aboutsummaryrefslogtreecommitdiff
path: root/src/test.py
blob: 49059f641d99c59417fdb86ae798426a8589bcd0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# ############################################################################ #
#                                                                              #
#                                                         :::      ::::::::    #
#    test.py                                            :+:      :+:    :+:    #
#                                                     +:+ +:+         +:+      #
#    By: charles <me@cacharle.xyz>                  +#+  +:+       +#+         #
#                                                 +#+#+#+#+#+   +#+            #
#    Created: 2020/09/27 11:36:32 by charles           #+#    #+#              #
#    Updated: 2020/10/01 11:41:21 by cacharle         ###   ########.fr        #
#                                                                              #
# ############################################################################ #

import os
import time
import subprocess

import config
import philo
from helper import current_ms, red, green


class Test:
    _tests = []
    _exec_path = None
    _fail_summaries = []

    @classmethod
    def run_all(cls, exec_path: str):
        cls._exec_path = exec_path
        for t in cls._tests:
            t.run()

    @staticmethod
    def new_error(error_cmd: [str]):
        Test(error_cmd=error_cmd)

    def __init__(
        self,
        philo_num:     int   = None,
        timeout_die:   int   = None,
        timeout_eat:   int   = None,
        timeout_sleep: int   = None,
        meal_num:      int   = None,
        error_cmd:     [str] = None,
    ):
        self._philo_num     = philo_num
        self._timeout_die   = timeout_die
        self._timeout_eat   = timeout_eat
        self._timeout_sleep = timeout_sleep
        self._meal_num      = meal_num
        self._error_cmd     = error_cmd
        Test._tests.append(self)

    def run(self):
        try:
            self._run_tested()
        except philo.error.Philo as e:
            self._print_fail(e.summary)
            Test._fail_summaries.append(self._argv_str + '\n' + e.full_summary)
        else:
            self._print_pass()

    @classmethod
    def write_failed(cls):
        with open(config.RESULT_FILE, "w") as f:
            f.write('\n\n'.join(cls._fail_summaries))

    @classmethod
    def print_summary(cls):
        fail_total = len(cls._fail_summaries)
        pass_total = len(cls._tests) - fail_total
        print("Summary: Total {}   {}   {}".format(
            len(cls._tests),
            green("[PASS] {:3}".format(pass_total)),
            red("[FAIL] {:3}".format(fail_total))
        ))

    def _run_tested(self):
        start_time = current_ms()
        process = subprocess.Popen(
            self._argv(),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT
        )
        if self._error_cmd is not None:
            self._check_error(process)
            return

        out = ""
        try:
            out, _ = process.communicate(timeout=1)
        except subprocess.TimeoutExpired as e:
            out = e.stdout
        end_time = current_ms()
        try:
            out = out.decode()
        except UnicodeDecodeError:
            pass # TODO

        table = philo.Table(
            self._philo_num,
            self._timeout_die,
            self._timeout_eat,
            self._timeout_sleep,
            1 if self._meal_num is None else self._meal_num
        )
        for line in out.split('\n')[:-1]:
            table.add_log(philo.Log(line, self._philo_num, start_time, end_time))
            table.check()

    def _check_error(self, process):
        try:
            out, _ = process.communicate(timeout=config.TIMEOUT_ERROR)
        except subprocess.TimeoutExpired:
            raise error.ShouldFail("no error message")
        if process.returncode == 0:
            raise error.ShouldFail("non zero status code: {}".format(process.returncode))
        if out.decode().count('\n') != 1:
            raise error.ShouldFail("no error message")

    def _argv(self, basename=False):
        exec_path = os.path.basename(Test._exec_path) if basename else Test._exec_path
        if self._error_cmd is not None:
            return [exec_path, *self._error_cmd]
        else:
            argv = [
                exec_path,
                str(self._philo_num),
                str(self._timeout_die),
                str(self._timeout_eat),
                str(self._timeout_sleep)
            ]
            if self._meal_num is not None:
                argv.append(str(self._meal_num))
            return argv

    @property
    def _argv_str(self):
        return ' '.join(self._argv(basename=True))

    def _print_fail(self, msg):
        print(red("[FAIL] {}: {}".format(self._argv_str, msg)))

    def _print_pass(self):
        print(green("[PASS] {}".format(self._argv_str)))