1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
# ############################################################################ #
# #
# ::: :::::::: #
# test.py :+: :+: :+: #
# +:+ +:+ +:+ #
# By: charles <me@cacharle.xyz> +#+ +:+ +#+ #
# +#+#+#+#+#+ +#+ #
# Created: 2020/09/27 11:36:32 by charles #+# #+# #
# Updated: 2020/09/29 15:15:46 by cacharle ### ########.fr #
# #
# ############################################################################ #
import os
import time
import subprocess
import config
import test.philo as philo
import test.error as error
from helper import current_ms
class Test:
_tests = []
_exec_path = None
_fail_summaries = []
@classmethod
def run_all(cls, exec_path: str):
cls._exec_path = exec_path
for t in cls._tests:
t.run()
@staticmethod
def new_error(error_cmd: [str]):
Test(error_cmd=error_cmd)
def __init__(
self,
philo_num: int = None,
timeout_die: int = None,
timeout_eat: int = None,
timeout_sleep: int = None,
meal_num: int = None,
error_cmd: [str] = None,
infinite: bool = False
):
self._philo_num = philo_num
self._timeout_die = timeout_die
self._timeout_eat = timeout_eat
self._timeout_sleep = timeout_sleep
self._meal_num = meal_num
self._error_cmd = error_cmd
self._infinite = infinite
Test._tests.append(self)
def run(self):
try:
self._run_tested()
except error.Philo as e:
self._print_fail(e.summary)
Test._fail_summaries.append(self._argv_str + '\n' + e.full_summary)
else:
self._print_pass()
@classmethod
def write_failed(cls):
with open(config.RESULT_FILE, "w") as f:
f.write('\n\n'.join(cls._fail_summaries))
def _run_tested(self):
process = subprocess.Popen(
self._argv(),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
if self._error_cmd is not None:
self._check_error(process)
elif self._infinite:
try:
self._check_output(process.stdout, died=False)
process.wait(timeout=config.INFINITE_WAIT_TIME)
except subprocess.TimeoutExpired:
pass
else:
raise ShouldBeInfinite
else:
self._check_output(process.stdout)
process.wait(timeout=config.TIMEOUT)
def _check_output(self, stream, died: bool = True):
start_time = current_ms()
table = philo.Table(
self._philo_num, self._timeout_die, self._timeout_eat, self._timeout_sleep,
1 if self._meal_num is None else self._meal_num)
for i, line in enumerate(stream):
line = line.decode()[:-1]
table.add_log(philo.Log(line, self._philo_num, start_time))
table.check()
if i > 1000:
break
if died:
if not table.dead and self._philo_num != 0:
raise philo.error.Log(table._logs, "one philosopher should have died")
else:
if table.dead:
raise philo.error.Log(table._logs, "infinite shouldn't die")
def _check_error(self, process):
try:
out, _ = process.communicate(timeout=config.TIMEOUT_ERROR)
except subprocess.TimeoutExpired:
raise error.ShouldFail("no error message")
if process.returncode == 0:
raise error.ShouldFail("non zero status code: {}".format(process.returncode))
if out.decode().count('\n') != 1:
raise error.ShouldFail("no error message")
def _argv(self, basename=False):
exec_path = os.path.basename(Test._exec_path) if basename else Test._exec_path
if self._error_cmd is not None:
return [exec_path, *self._error_cmd]
else:
argv = [
exec_path,
str(self._philo_num),
str(self._timeout_die),
str(self._timeout_eat),
str(self._timeout_sleep)
]
if self._meal_num is not None:
argv.append(str(self._meal_num))
return argv
@property
def _argv_str(self):
return ' '.join(self._argv(basename=True))
RED_CHARS = "\033[31m"
GREEN_CHARS = "\033[32m"
CLOSE_CHARS = "\033[0m"
def _print_fail(self, msg):
print("{}[FAIL] {}: {}{}".format(Test.RED_CHARS, self._argv_str, msg, Test.CLOSE_CHARS))
def _print_pass(self):
print("{}[PASS] {}{}".format(Test.GREEN_CHARS, self._argv_str, Test.CLOSE_CHARS))
|