diff options
| author | Charles Cabergs <me@cacharle.xyz> | 2021-02-05 12:27:32 +0100 |
|---|---|---|
| committer | Charles Cabergs <me@cacharle.xyz> | 2021-02-05 12:27:32 +0100 |
| commit | 904a033ae738e1c351f8fef71e2ec2418fc4db3d (patch) | |
| tree | 3de4980582c109c4f0d19111a2b88eafec9b9b36 /src/test | |
| parent | a3e983f78dc4cbcf6f75f78fa2b3c57e09cd1b2b (diff) | |
| download | minishell_test-904a033ae738e1c351f8fef71e2ec2418fc4db3d.tar.gz minishell_test-904a033ae738e1c351f8fef71e2ec2418fc4db3d.tar.bz2 minishell_test-904a033ae738e1c351f8fef71e2ec2418fc4db3d.zip | |
Renaming src -> minishell_test for package name, Renaming main.py -> __main__.py for package execution with python -m
Diffstat (limited to 'src/test')
| -rw-r--r-- | src/test/__init__.py | 13 | ||||
| -rw-r--r-- | src/test/captured.py | 56 | ||||
| -rw-r--r-- | src/test/result.py | 246 | ||||
| -rw-r--r-- | src/test/test.py | 155 |
4 files changed, 0 insertions, 470 deletions
diff --git a/src/test/__init__.py b/src/test/__init__.py deleted file mode 100644 index cf9949f..0000000 --- a/src/test/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# ############################################################################ # -# # -# ::: :::::::: # -# __init__.py :+: :+: :+: # -# +:+ +:+ +:+ # -# By: charles <me@cacharle.xyz> +#+ +:+ +#+ # -# +#+#+#+#+#+ +#+ # -# Created: 2020/09/11 12:18:14 by charles #+# #+# # -# Updated: 2020/09/11 20:18:10 by charles ### ########.fr # -# # -# ############################################################################ # - -from test.test import Test # noqa: F401 diff --git a/src/test/captured.py b/src/test/captured.py deleted file mode 100644 index f7dae3e..0000000 --- a/src/test/captured.py +++ /dev/null @@ -1,56 +0,0 @@ -# ############################################################################ # -# # -# ::: :::::::: # -# captured.py :+: :+: :+: # -# +:+ +:+ +:+ # -# By: charles <me@cacharle.xyz> +#+ +:+ +#+ # -# +#+#+#+#+#+ +#+ # -# Created: 2020/09/11 12:16:25 by charles #+# #+# # -# Updated: 2021/02/04 15:52:19 by charles ### ########.fr # -# # -# ############################################################################ # - -from typing import List, Optional - -import config - - -class Captured: - def __init__( - self, - output: str, - status: int, - files_content: List[Optional[str]], - is_timeout: bool = False - ): - """Captured class - output: captured content - status: command status - files_content: content of the files altered by the command - is_timeout: the command has timed out - """ - lines = output.split('\n') - for i, l in enumerate(lines): - if l.find(config.REFERENCE_ERROR_BEGIN) == 0: - lines[i] = l.replace(config.REFERENCE_ERROR_BEGIN, config.MINISHELL_ERROR_BEGIN, 1) - elif l.find(config.REFERENCE_PATH + ": ") == 0: - lines[i] = l.replace(config.REFERENCE_PATH + ": ", config.MINISHELL_ERROR_BEGIN, 1) - self.output = '\n'.join(lines) - - self.status = status - self.files_content = files_content - self.is_timeout = is_timeout - - def __eq__(self, other: object) -> bool: - if not isinstance(other, Captured): - raise NotImplementedError - if self.is_timeout: - return self.is_timeout == other.is_timeout - return (self.output == other.output - and self.status == other.status - and all(x == y for x, y in zip(self.files_content, other.files_content))) - - @staticmethod - def timeout(): - """Create a new captured timeout""" - return Captured("", 0, [], is_timeout=True) diff --git a/src/test/result.py b/src/test/result.py deleted file mode 100644 index eff7b8b..0000000 --- a/src/test/result.py +++ /dev/null @@ -1,246 +0,0 @@ -# ############################################################################ # -# # -# ::: :::::::: # -# result.py :+: :+: :+: # -# +:+ +:+ +:+ # -# By: charles <me@cacharle.xyz> +#+ +:+ +#+ # -# +#+#+#+#+#+ +#+ # -# Created: 2020/09/11 12:17:34 by charles #+# #+# # -# Updated: 2021/02/05 01:36:44 by charles ### ########.fr # -# # -# ############################################################################ # - -import sys -import re -from typing import Match, List, Optional - -import config -from test.captured import Captured - - -class BaseResult: - RED_CHARS = "\033[31m" - GREEN_CHARS = "\033[32m" - BLUE_CHARS = "\033[34m" - BOLD_CHARS = "\033[1m" - CLOSE_CHARS = "\033[0m" - - def __init__(self, cmd: str): - self.cmd = cmd - self.colored = True - self.set_colors() - - @property - def passed(self): - """Check if the result passed""" - raise NotImplementedError - - @property - def failed(self): - """Check if the result failed""" - return not self.passed - - def __repr__(self): - """Returns a representation of the result based on the verbosity""" - if config.VERBOSE_LEVEL == 0: - return self.green('.') if self.passed else self.red('!') - if config.VERBOSE_LEVEL == 1: - printed = self._escaped_cmd[:] - if config.SHOW_RANGE: - printed = "{:2}: ".format(self.index) + printed - if len(printed) > config.TERM_COLS - 7: - printed = printed[:config.TERM_COLS - 10] + "..." - fmt = self.green("{:{width}} [PASS]") if self.passed else self.red("{:{width}} [FAIL]") - return fmt.format(printed, width=config.TERM_COLS - 7) - elif config.VERBOSE_LEVEL == 2: - return self.full_diff() - else: - raise RuntimeError("Invalid verbose level") - - def put(self, index: int) -> None: - """Print a summary of the result""" - if config.VERBOSE_LEVEL == 2 and self.passed: - return - self.index = index - print(self, end="") - if config.VERBOSE_LEVEL == 0: - sys.stdout.flush() - else: - print() - - def indicator(self, title: str, prefix: str) -> str: - return self.bold(self.blue(prefix + " " + title)) - - def full_diff(self) -> str: - raise NotImplementedError - - @property - def _escaped_cmd(self): - """Escape common control characters""" - c = self.cmd - c = c.replace("\t", "\\t") - c = c.replace("\n", "\\n") - c = c.replace("\v", "\\v") - c = c.replace("\r", "\\r") - c = c.replace("\f", "\\f") - return c - - @property - def _header_with(self): - return self.indicator("WITH {}".format(self._escaped_cmd), "|>") + '\n' - - def set_colors(self): - """Set colors strings on or off based on self.colored""" - if self.colored: - self.color_red = self.RED_CHARS - self.color_green = self.GREEN_CHARS - self.color_blue = self.BLUE_CHARS - self.color_bold = self.BOLD_CHARS - self.color_close = self.CLOSE_CHARS - else: - self.color_red = "" - self.color_green = "" - self.color_blue = "" - self.color_bold = "" - self.color_close = "" - - def green(self, s): - return self.color_green + s + self.color_close - - def red(self, s): - return self.color_red + s + self.color_close - - def blue(self, s): - return self.color_blue + s + self.color_close - - def bold(self, s): - return self.color_bold + s + self.color_close - - -class Result(BaseResult): - def __init__( - self, - cmd: str, - file_names: List[str], - expected: Captured, - actual: Captured, - ): - """Result class - cmd: runned command - file_names: names of watched files - expected: expected capture - actual: actual capture - """ - self.file_names = file_names - self.expected = expected - self.actual = actual - super().__init__(cmd) - - @property - def passed(self): - return self.actual == self.expected - - def header(self, title: str) -> str: - return self.bold("|---------------------------------------{:-<40}".format(title)) - - @property - def expected_header(self) -> str: - return self.green(self.header("EXPECTED")) + '\n' - - @property - def actual_header(self) -> str: - return self.red(self.header("ACTUAL")) + '\n' - - def cat_e(self, s: Optional[str]) -> str: - """Pass a string through a cat -e like output""" - if s is None: - return "FROM TEST: File not created\n" - s = s.replace("\n", "$\n") - if len(s) < 2: - return s - if s[-1] != '\n': - s += '\n' - return s - - def file_diff(self, file_name: str, expected: Optional[str], actual: Optional[str]) -> str: - """Difference between 2 files""" - if expected == actual: - return "" - file_header = self.indicator("FILE {}".format(file_name), "|#") + '\n' - return ( - file_header - + self.expected_header - + self.cat_e(expected) - + self.actual_header - + self.cat_e(actual) - ) - - def files_diff(self): - """Difference between watched files""" - return '\n'.join([self.file_diff(n, e, a) for n, e, a in - zip(self.file_names, - self.expected.files_content, - self.actual.files_content) - if e != a]) - - def output_diff(self) -> str: - """Difference in command output""" - out = "" - if self.actual.is_timeout: - return "TIMEOUT\n" - if self.expected.status != self.actual.status: - out += self.indicator( - "STATUS: expected {} actual {}" - .format(self.expected.status, self.actual.status), "| " - ) + '\n' - if self.expected.output != self.actual.output: - out += (self.expected_header - + self.cat_e(self.expected.output) - + self.actual_header - + self.cat_e(self.actual.output)) - return out - - def full_diff(self) -> str: - """Concat all difference reports""" - return self._header_with + self.output_diff() + self.files_diff() + "=" * 80 + '\n' - - -class LeakResult(BaseResult): - def __init__(self, cmd: str, captured: Captured): - self.captured = captured - super().__init__(cmd) - - def _search_leak_kind(self, kind: str) -> Match: - match = re.search( - r"==\d+==\s+" + kind + r" lost: (?P<bytes>[0-9,]+) bytes in [0-9,]+ blocks", - self.captured.output - ) - if match is None: - raise RuntimeError( - "valgrind output parsing failed for `{}`:\n{}" - .format(self.cmd, self.captured.output) - ) - return match - - @property - def _lost_bytes(self): - if self.captured.output.find("All heap blocks were freed -- no leaks are possible") != -1: - definite_bytes = 0 - indirect_bytes = 0 - else: - definite_match = self._search_leak_kind("definitely") - indirect_match = self._search_leak_kind("indirectly") - definite_bytes = int(definite_match.group("bytes").replace(",", "")) - indirect_bytes = int(indirect_match.group("bytes").replace(",", "")) - return definite_bytes + indirect_bytes - - @property - def passed(self): - """Check if the result passed""" - if self.captured.is_timeout: - return False - return self._lost_bytes == 0 - - def full_diff(self) -> str: - """Concat all difference reports""" - return self._header_with + self.captured.output diff --git a/src/test/test.py b/src/test/test.py deleted file mode 100644 index ab68d1e..0000000 --- a/src/test/test.py +++ /dev/null @@ -1,155 +0,0 @@ -# ############################################################################ # -# # -# ::: :::::::: # -# test.py :+: :+: :+: # -# +:+ +:+ +:+ # -# By: charles <charles.cabergs@gmail.com> +#+ +:+ +#+ # -# +#+#+#+#+#+ +#+ # -# Created: 2020/06/16 21:48:50 by charles #+# #+# # -# Updated: 2021/02/05 01:37:44 by charles ### ########.fr # -# # -# ############################################################################ # - -import os -import sys -import subprocess -from typing import Optional, List, Dict, Union, Callable - -import config -from test.captured import Captured -from test.result import Result, LeakResult -import sandbox - -HookType = Union[Callable[[str], str], List[Callable[[str], str]]] -HookStatusType = Union[Callable[[int], int], List[Callable[[int], int]]] - - -class Test: - def __init__( - self, - cmd: str, - setup: str = "", - files: List[str] = [], - exports: Dict[str, str] = {}, - timeout: float = config.TIMEOUT, - hook: HookType = [], - hook_status: HookStatusType = [], - ): - """ Test class - cmd: command to execute - setup: command to execute before tested command - files: files to watch (check content after test) - exports: exported variables - timeout: maximum amount of time taken by the test - hook: function to execute on the output of the test - hook_status: function to execute on status code - """ - self.cmd = cmd - self.setup = setup - self.files = files - self.exports = exports - self.result: Optional[Union[Result, LeakResult]] = None - self.timeout = timeout - if type(hook) is not list: - hook = [hook] # type: ignore - if type(hook_status) is not list: - hook_status = [hook_status] # type: ignore - self.hook: List[Callable[[str], str]] = hook # type: ignore - self.hook_status: List[Callable[[int], int]] = hook_status # type: ignore - - def run(self, index: int) -> None: - """ Run the test for minishell and the reference shell and print the result out """ - - if config.CHECK_LEAKS: - self.hook = [] - self.hook_status = [] - captured = self._run_sandboxed([*config.VALGRIND_CMD, "-c"]) - if config.VERBOSE_LEVEL == 2: - print(captured.output) - self.result = LeakResult(self.full_cmd, captured) - self.result.put(index) - return - - expected = self._run_sandboxed([config.REFERENCE_PATH, *config.REFERENCE_ARGS, "-c"]) - actual = self._run_sandboxed([config.MINISHELL_PATH, "-c"]) - self.result = Result(self.full_cmd, self.files, expected, actual) - self.result.put(index) - - def _run_sandboxed(self, shell_cmd: List[str]) -> Captured: - """ Run the command in a sandbox environment """ - with sandbox.context(): - if self.setup != "": - try: - subprocess.run( - self.setup, - shell=True, - cwd=config.SANDBOX_PATH, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - check=True - ) - except subprocess.CalledProcessError as e: - print("Error: `{}` setup command failed for `{}`\n\twith '{}'" - .format(self.setup, - self.cmd, - "no stderr" if e.stdout is None - else e.stdout.decode().strip())) - sys.exit(1) - return self._run_capture(shell_cmd) - - def _run_capture(self, shell_cmd: List[str]) -> Captured: - """ Capture the output (stdout and stderr) - Capture the content of the watched files after the command is run - """ - # run the command in the sandbox - process = subprocess.Popen( - [*shell_cmd, self.cmd], - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - cwd=config.SANDBOX_PATH, - env={ - 'PATH': config.PATH_VARIABLE, - 'TERM': 'xterm-256color', - **self.exports, - }, - ) - - # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate - try: - stdout, _ = process.communicate( - timeout=(self.timeout if not config.CHECK_LEAKS else config.CHECK_LEAKS_TIMEOUT)) - except subprocess.TimeoutExpired: - process.kill() - # _, _ = process.communicate(timeout=2) - return Captured.timeout() - try: - output = stdout.decode() - except UnicodeDecodeError: - output = "UNICODE ERROR: {}".format(process.stdout) - - # capture watched files content - files_content: List[Optional[str]] = [] - for file_name in self.files: - try: - with open(os.path.join(config.SANDBOX_PATH, file_name), "rb") as f: - files_content.append(f.read().decode()) - except FileNotFoundError: - files_content.append(None) - - # apply output/status hooks - for hook in self.hook: - output = hook(output) - for hook_status in self.hook_status: - process.returncode = hook_status(process.returncode) - return Captured(output, process.returncode, files_content) - - @property - def full_cmd(self): - """ Return the command prefixed by the setup and exports """ - s = self.cmd - if len(self.exports) != 0: - s = "[EXPORTS {}] {}".format( - ' '.join(["{}='{}'".format(k, v) for k, v in self.exports.items()]), s) - if self.setup != "": - s = "[SETUP {}] {}".format(self.setup, s) - return s |
