diff options
Diffstat (limited to 'minishell_test/test')
| -rw-r--r-- | minishell_test/test/__init__.py | 13 | ||||
| -rw-r--r-- | minishell_test/test/captured.py | 56 | ||||
| -rw-r--r-- | minishell_test/test/result.py | 246 | ||||
| -rw-r--r-- | minishell_test/test/test.py | 155 |
4 files changed, 470 insertions, 0 deletions
diff --git a/minishell_test/test/__init__.py b/minishell_test/test/__init__.py new file mode 100644 index 0000000..cf9949f --- /dev/null +++ b/minishell_test/test/__init__.py @@ -0,0 +1,13 @@ +# ############################################################################ # +# # +# ::: :::::::: # +# __init__.py :+: :+: :+: # +# +:+ +:+ +:+ # +# By: charles <me@cacharle.xyz> +#+ +:+ +#+ # +# +#+#+#+#+#+ +#+ # +# Created: 2020/09/11 12:18:14 by charles #+# #+# # +# Updated: 2020/09/11 20:18:10 by charles ### ########.fr # +# # +# ############################################################################ # + +from test.test import Test # noqa: F401 diff --git a/minishell_test/test/captured.py b/minishell_test/test/captured.py new file mode 100644 index 0000000..f7dae3e --- /dev/null +++ b/minishell_test/test/captured.py @@ -0,0 +1,56 @@ +# ############################################################################ # +# # +# ::: :::::::: # +# captured.py :+: :+: :+: # +# +:+ +:+ +:+ # +# By: charles <me@cacharle.xyz> +#+ +:+ +#+ # +# +#+#+#+#+#+ +#+ # +# Created: 2020/09/11 12:16:25 by charles #+# #+# # +# Updated: 2021/02/04 15:52:19 by charles ### ########.fr # +# # +# ############################################################################ # + +from typing import List, Optional + +import config + + +class Captured: + def __init__( + self, + output: str, + status: int, + files_content: List[Optional[str]], + is_timeout: bool = False + ): + """Captured class + output: captured content + status: command status + files_content: content of the files altered by the command + is_timeout: the command has timed out + """ + lines = output.split('\n') + for i, l in enumerate(lines): + if l.find(config.REFERENCE_ERROR_BEGIN) == 0: + lines[i] = l.replace(config.REFERENCE_ERROR_BEGIN, config.MINISHELL_ERROR_BEGIN, 1) + elif l.find(config.REFERENCE_PATH + ": ") == 0: + lines[i] = l.replace(config.REFERENCE_PATH + ": ", config.MINISHELL_ERROR_BEGIN, 1) + self.output = '\n'.join(lines) + + self.status = status + self.files_content = files_content + self.is_timeout = is_timeout + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Captured): + raise NotImplementedError + if self.is_timeout: + return self.is_timeout == other.is_timeout + return (self.output == other.output + and self.status == other.status + and all(x == y for x, y in zip(self.files_content, other.files_content))) + + @staticmethod + def timeout(): + """Create a new captured timeout""" + return Captured("", 0, [], is_timeout=True) diff --git a/minishell_test/test/result.py b/minishell_test/test/result.py new file mode 100644 index 0000000..eff7b8b --- /dev/null +++ b/minishell_test/test/result.py @@ -0,0 +1,246 @@ +# ############################################################################ # +# # +# ::: :::::::: # +# result.py :+: :+: :+: # +# +:+ +:+ +:+ # +# By: charles <me@cacharle.xyz> +#+ +:+ +#+ # +# +#+#+#+#+#+ +#+ # +# Created: 2020/09/11 12:17:34 by charles #+# #+# # +# Updated: 2021/02/05 01:36:44 by charles ### ########.fr # +# # +# ############################################################################ # + +import sys +import re +from typing import Match, List, Optional + +import config +from test.captured import Captured + + +class BaseResult: + RED_CHARS = "\033[31m" + GREEN_CHARS = "\033[32m" + BLUE_CHARS = "\033[34m" + BOLD_CHARS = "\033[1m" + CLOSE_CHARS = "\033[0m" + + def __init__(self, cmd: str): + self.cmd = cmd + self.colored = True + self.set_colors() + + @property + def passed(self): + """Check if the result passed""" + raise NotImplementedError + + @property + def failed(self): + """Check if the result failed""" + return not self.passed + + def __repr__(self): + """Returns a representation of the result based on the verbosity""" + if config.VERBOSE_LEVEL == 0: + return self.green('.') if self.passed else self.red('!') + if config.VERBOSE_LEVEL == 1: + printed = self._escaped_cmd[:] + if config.SHOW_RANGE: + printed = "{:2}: ".format(self.index) + printed + if len(printed) > config.TERM_COLS - 7: + printed = printed[:config.TERM_COLS - 10] + "..." + fmt = self.green("{:{width}} [PASS]") if self.passed else self.red("{:{width}} [FAIL]") + return fmt.format(printed, width=config.TERM_COLS - 7) + elif config.VERBOSE_LEVEL == 2: + return self.full_diff() + else: + raise RuntimeError("Invalid verbose level") + + def put(self, index: int) -> None: + """Print a summary of the result""" + if config.VERBOSE_LEVEL == 2 and self.passed: + return + self.index = index + print(self, end="") + if config.VERBOSE_LEVEL == 0: + sys.stdout.flush() + else: + print() + + def indicator(self, title: str, prefix: str) -> str: + return self.bold(self.blue(prefix + " " + title)) + + def full_diff(self) -> str: + raise NotImplementedError + + @property + def _escaped_cmd(self): + """Escape common control characters""" + c = self.cmd + c = c.replace("\t", "\\t") + c = c.replace("\n", "\\n") + c = c.replace("\v", "\\v") + c = c.replace("\r", "\\r") + c = c.replace("\f", "\\f") + return c + + @property + def _header_with(self): + return self.indicator("WITH {}".format(self._escaped_cmd), "|>") + '\n' + + def set_colors(self): + """Set colors strings on or off based on self.colored""" + if self.colored: + self.color_red = self.RED_CHARS + self.color_green = self.GREEN_CHARS + self.color_blue = self.BLUE_CHARS + self.color_bold = self.BOLD_CHARS + self.color_close = self.CLOSE_CHARS + else: + self.color_red = "" + self.color_green = "" + self.color_blue = "" + self.color_bold = "" + self.color_close = "" + + def green(self, s): + return self.color_green + s + self.color_close + + def red(self, s): + return self.color_red + s + self.color_close + + def blue(self, s): + return self.color_blue + s + self.color_close + + def bold(self, s): + return self.color_bold + s + self.color_close + + +class Result(BaseResult): + def __init__( + self, + cmd: str, + file_names: List[str], + expected: Captured, + actual: Captured, + ): + """Result class + cmd: runned command + file_names: names of watched files + expected: expected capture + actual: actual capture + """ + self.file_names = file_names + self.expected = expected + self.actual = actual + super().__init__(cmd) + + @property + def passed(self): + return self.actual == self.expected + + def header(self, title: str) -> str: + return self.bold("|---------------------------------------{:-<40}".format(title)) + + @property + def expected_header(self) -> str: + return self.green(self.header("EXPECTED")) + '\n' + + @property + def actual_header(self) -> str: + return self.red(self.header("ACTUAL")) + '\n' + + def cat_e(self, s: Optional[str]) -> str: + """Pass a string through a cat -e like output""" + if s is None: + return "FROM TEST: File not created\n" + s = s.replace("\n", "$\n") + if len(s) < 2: + return s + if s[-1] != '\n': + s += '\n' + return s + + def file_diff(self, file_name: str, expected: Optional[str], actual: Optional[str]) -> str: + """Difference between 2 files""" + if expected == actual: + return "" + file_header = self.indicator("FILE {}".format(file_name), "|#") + '\n' + return ( + file_header + + self.expected_header + + self.cat_e(expected) + + self.actual_header + + self.cat_e(actual) + ) + + def files_diff(self): + """Difference between watched files""" + return '\n'.join([self.file_diff(n, e, a) for n, e, a in + zip(self.file_names, + self.expected.files_content, + self.actual.files_content) + if e != a]) + + def output_diff(self) -> str: + """Difference in command output""" + out = "" + if self.actual.is_timeout: + return "TIMEOUT\n" + if self.expected.status != self.actual.status: + out += self.indicator( + "STATUS: expected {} actual {}" + .format(self.expected.status, self.actual.status), "| " + ) + '\n' + if self.expected.output != self.actual.output: + out += (self.expected_header + + self.cat_e(self.expected.output) + + self.actual_header + + self.cat_e(self.actual.output)) + return out + + def full_diff(self) -> str: + """Concat all difference reports""" + return self._header_with + self.output_diff() + self.files_diff() + "=" * 80 + '\n' + + +class LeakResult(BaseResult): + def __init__(self, cmd: str, captured: Captured): + self.captured = captured + super().__init__(cmd) + + def _search_leak_kind(self, kind: str) -> Match: + match = re.search( + r"==\d+==\s+" + kind + r" lost: (?P<bytes>[0-9,]+) bytes in [0-9,]+ blocks", + self.captured.output + ) + if match is None: + raise RuntimeError( + "valgrind output parsing failed for `{}`:\n{}" + .format(self.cmd, self.captured.output) + ) + return match + + @property + def _lost_bytes(self): + if self.captured.output.find("All heap blocks were freed -- no leaks are possible") != -1: + definite_bytes = 0 + indirect_bytes = 0 + else: + definite_match = self._search_leak_kind("definitely") + indirect_match = self._search_leak_kind("indirectly") + definite_bytes = int(definite_match.group("bytes").replace(",", "")) + indirect_bytes = int(indirect_match.group("bytes").replace(",", "")) + return definite_bytes + indirect_bytes + + @property + def passed(self): + """Check if the result passed""" + if self.captured.is_timeout: + return False + return self._lost_bytes == 0 + + def full_diff(self) -> str: + """Concat all difference reports""" + return self._header_with + self.captured.output diff --git a/minishell_test/test/test.py b/minishell_test/test/test.py new file mode 100644 index 0000000..ab68d1e --- /dev/null +++ b/minishell_test/test/test.py @@ -0,0 +1,155 @@ +# ############################################################################ # +# # +# ::: :::::::: # +# test.py :+: :+: :+: # +# +:+ +:+ +:+ # +# By: charles <charles.cabergs@gmail.com> +#+ +:+ +#+ # +# +#+#+#+#+#+ +#+ # +# Created: 2020/06/16 21:48:50 by charles #+# #+# # +# Updated: 2021/02/05 01:37:44 by charles ### ########.fr # +# # +# ############################################################################ # + +import os +import sys +import subprocess +from typing import Optional, List, Dict, Union, Callable + +import config +from test.captured import Captured +from test.result import Result, LeakResult +import sandbox + +HookType = Union[Callable[[str], str], List[Callable[[str], str]]] +HookStatusType = Union[Callable[[int], int], List[Callable[[int], int]]] + + +class Test: + def __init__( + self, + cmd: str, + setup: str = "", + files: List[str] = [], + exports: Dict[str, str] = {}, + timeout: float = config.TIMEOUT, + hook: HookType = [], + hook_status: HookStatusType = [], + ): + """ Test class + cmd: command to execute + setup: command to execute before tested command + files: files to watch (check content after test) + exports: exported variables + timeout: maximum amount of time taken by the test + hook: function to execute on the output of the test + hook_status: function to execute on status code + """ + self.cmd = cmd + self.setup = setup + self.files = files + self.exports = exports + self.result: Optional[Union[Result, LeakResult]] = None + self.timeout = timeout + if type(hook) is not list: + hook = [hook] # type: ignore + if type(hook_status) is not list: + hook_status = [hook_status] # type: ignore + self.hook: List[Callable[[str], str]] = hook # type: ignore + self.hook_status: List[Callable[[int], int]] = hook_status # type: ignore + + def run(self, index: int) -> None: + """ Run the test for minishell and the reference shell and print the result out """ + + if config.CHECK_LEAKS: + self.hook = [] + self.hook_status = [] + captured = self._run_sandboxed([*config.VALGRIND_CMD, "-c"]) + if config.VERBOSE_LEVEL == 2: + print(captured.output) + self.result = LeakResult(self.full_cmd, captured) + self.result.put(index) + return + + expected = self._run_sandboxed([config.REFERENCE_PATH, *config.REFERENCE_ARGS, "-c"]) + actual = self._run_sandboxed([config.MINISHELL_PATH, "-c"]) + self.result = Result(self.full_cmd, self.files, expected, actual) + self.result.put(index) + + def _run_sandboxed(self, shell_cmd: List[str]) -> Captured: + """ Run the command in a sandbox environment """ + with sandbox.context(): + if self.setup != "": + try: + subprocess.run( + self.setup, + shell=True, + cwd=config.SANDBOX_PATH, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + check=True + ) + except subprocess.CalledProcessError as e: + print("Error: `{}` setup command failed for `{}`\n\twith '{}'" + .format(self.setup, + self.cmd, + "no stderr" if e.stdout is None + else e.stdout.decode().strip())) + sys.exit(1) + return self._run_capture(shell_cmd) + + def _run_capture(self, shell_cmd: List[str]) -> Captured: + """ Capture the output (stdout and stderr) + Capture the content of the watched files after the command is run + """ + # run the command in the sandbox + process = subprocess.Popen( + [*shell_cmd, self.cmd], + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + cwd=config.SANDBOX_PATH, + env={ + 'PATH': config.PATH_VARIABLE, + 'TERM': 'xterm-256color', + **self.exports, + }, + ) + + # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate + try: + stdout, _ = process.communicate( + timeout=(self.timeout if not config.CHECK_LEAKS else config.CHECK_LEAKS_TIMEOUT)) + except subprocess.TimeoutExpired: + process.kill() + # _, _ = process.communicate(timeout=2) + return Captured.timeout() + try: + output = stdout.decode() + except UnicodeDecodeError: + output = "UNICODE ERROR: {}".format(process.stdout) + + # capture watched files content + files_content: List[Optional[str]] = [] + for file_name in self.files: + try: + with open(os.path.join(config.SANDBOX_PATH, file_name), "rb") as f: + files_content.append(f.read().decode()) + except FileNotFoundError: + files_content.append(None) + + # apply output/status hooks + for hook in self.hook: + output = hook(output) + for hook_status in self.hook_status: + process.returncode = hook_status(process.returncode) + return Captured(output, process.returncode, files_content) + + @property + def full_cmd(self): + """ Return the command prefixed by the setup and exports """ + s = self.cmd + if len(self.exports) != 0: + s = "[EXPORTS {}] {}".format( + ' '.join(["{}='{}'".format(k, v) for k, v in self.exports.items()]), s) + if self.setup != "": + s = "[SETUP {}] {}".format(self.setup, s) + return s |
