From 904a033ae738e1c351f8fef71e2ec2418fc4db3d Mon Sep 17 00:00:00 2001 From: Charles Cabergs Date: Fri, 5 Feb 2021 12:27:32 +0100 Subject: Renaming src -> minishell_test for package name, Renaming main.py -> __main__.py for package execution with python -m --- minishell_test/test/test.py | 155 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 minishell_test/test/test.py (limited to 'minishell_test/test/test.py') diff --git a/minishell_test/test/test.py b/minishell_test/test/test.py new file mode 100644 index 0000000..ab68d1e --- /dev/null +++ b/minishell_test/test/test.py @@ -0,0 +1,155 @@ +# ############################################################################ # +# # +# ::: :::::::: # +# test.py :+: :+: :+: # +# +:+ +:+ +:+ # +# By: charles +#+ +:+ +#+ # +# +#+#+#+#+#+ +#+ # +# Created: 2020/06/16 21:48:50 by charles #+# #+# # +# Updated: 2021/02/05 01:37:44 by charles ### ########.fr # +# # +# ############################################################################ # + +import os +import sys +import subprocess +from typing import Optional, List, Dict, Union, Callable + +import config +from test.captured import Captured +from test.result import Result, LeakResult +import sandbox + +HookType = Union[Callable[[str], str], List[Callable[[str], str]]] +HookStatusType = Union[Callable[[int], int], List[Callable[[int], int]]] + + +class Test: + def __init__( + self, + cmd: str, + setup: str = "", + files: List[str] = [], + exports: Dict[str, str] = {}, + timeout: float = config.TIMEOUT, + hook: HookType = [], + hook_status: HookStatusType = [], + ): + """ Test class + cmd: command to execute + setup: command to execute before tested command + files: files to watch (check content after test) + exports: exported variables + timeout: maximum amount of time taken by the test + hook: function to execute on the output of the test + hook_status: function to execute on status code + """ + self.cmd = cmd + self.setup = setup + self.files = files + self.exports = exports + self.result: Optional[Union[Result, LeakResult]] = None + self.timeout = timeout + if type(hook) is not list: + hook = [hook] # type: ignore + if type(hook_status) is not list: + hook_status = [hook_status] # type: ignore + self.hook: List[Callable[[str], str]] = hook # type: ignore + self.hook_status: List[Callable[[int], int]] = hook_status # type: ignore + + def run(self, index: int) -> None: + """ Run the test for minishell and the reference shell and print the result out """ + + if config.CHECK_LEAKS: + self.hook = [] + self.hook_status = [] + captured = self._run_sandboxed([*config.VALGRIND_CMD, "-c"]) + if config.VERBOSE_LEVEL == 2: + print(captured.output) + self.result = LeakResult(self.full_cmd, captured) + self.result.put(index) + return + + expected = self._run_sandboxed([config.REFERENCE_PATH, *config.REFERENCE_ARGS, "-c"]) + actual = self._run_sandboxed([config.MINISHELL_PATH, "-c"]) + self.result = Result(self.full_cmd, self.files, expected, actual) + self.result.put(index) + + def _run_sandboxed(self, shell_cmd: List[str]) -> Captured: + """ Run the command in a sandbox environment """ + with sandbox.context(): + if self.setup != "": + try: + subprocess.run( + self.setup, + shell=True, + cwd=config.SANDBOX_PATH, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + check=True + ) + except subprocess.CalledProcessError as e: + print("Error: `{}` setup command failed for `{}`\n\twith '{}'" + .format(self.setup, + self.cmd, + "no stderr" if e.stdout is None + else e.stdout.decode().strip())) + sys.exit(1) + return self._run_capture(shell_cmd) + + def _run_capture(self, shell_cmd: List[str]) -> Captured: + """ Capture the output (stdout and stderr) + Capture the content of the watched files after the command is run + """ + # run the command in the sandbox + process = subprocess.Popen( + [*shell_cmd, self.cmd], + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + cwd=config.SANDBOX_PATH, + env={ + 'PATH': config.PATH_VARIABLE, + 'TERM': 'xterm-256color', + **self.exports, + }, + ) + + # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate + try: + stdout, _ = process.communicate( + timeout=(self.timeout if not config.CHECK_LEAKS else config.CHECK_LEAKS_TIMEOUT)) + except subprocess.TimeoutExpired: + process.kill() + # _, _ = process.communicate(timeout=2) + return Captured.timeout() + try: + output = stdout.decode() + except UnicodeDecodeError: + output = "UNICODE ERROR: {}".format(process.stdout) + + # capture watched files content + files_content: List[Optional[str]] = [] + for file_name in self.files: + try: + with open(os.path.join(config.SANDBOX_PATH, file_name), "rb") as f: + files_content.append(f.read().decode()) + except FileNotFoundError: + files_content.append(None) + + # apply output/status hooks + for hook in self.hook: + output = hook(output) + for hook_status in self.hook_status: + process.returncode = hook_status(process.returncode) + return Captured(output, process.returncode, files_content) + + @property + def full_cmd(self): + """ Return the command prefixed by the setup and exports """ + s = self.cmd + if len(self.exports) != 0: + s = "[EXPORTS {}] {}".format( + ' '.join(["{}='{}'".format(k, v) for k, v in self.exports.items()]), s) + if self.setup != "": + s = "[SETUP {}] {}".format(self.setup, s) + return s -- cgit