aboutsummaryrefslogtreecommitdiff
path: root/minishell_test/test
diff options
context:
space:
mode:
Diffstat (limited to 'minishell_test/test')
-rw-r--r--minishell_test/test/__init__.py13
-rw-r--r--minishell_test/test/captured.py56
-rw-r--r--minishell_test/test/result.py246
-rw-r--r--minishell_test/test/test.py155
4 files changed, 470 insertions, 0 deletions
diff --git a/minishell_test/test/__init__.py b/minishell_test/test/__init__.py
new file mode 100644
index 0000000..cf9949f
--- /dev/null
+++ b/minishell_test/test/__init__.py
@@ -0,0 +1,13 @@
+# ############################################################################ #
+# #
+# ::: :::::::: #
+# __init__.py :+: :+: :+: #
+# +:+ +:+ +:+ #
+# By: charles <me@cacharle.xyz> +#+ +:+ +#+ #
+# +#+#+#+#+#+ +#+ #
+# Created: 2020/09/11 12:18:14 by charles #+# #+# #
+# Updated: 2020/09/11 20:18:10 by charles ### ########.fr #
+# #
+# ############################################################################ #
+
+from test.test import Test # noqa: F401
diff --git a/minishell_test/test/captured.py b/minishell_test/test/captured.py
new file mode 100644
index 0000000..f7dae3e
--- /dev/null
+++ b/minishell_test/test/captured.py
@@ -0,0 +1,56 @@
+# ############################################################################ #
+# #
+# ::: :::::::: #
+# captured.py :+: :+: :+: #
+# +:+ +:+ +:+ #
+# By: charles <me@cacharle.xyz> +#+ +:+ +#+ #
+# +#+#+#+#+#+ +#+ #
+# Created: 2020/09/11 12:16:25 by charles #+# #+# #
+# Updated: 2021/02/04 15:52:19 by charles ### ########.fr #
+# #
+# ############################################################################ #
+
+from typing import List, Optional
+
+import config
+
+
+class Captured:
+ def __init__(
+ self,
+ output: str,
+ status: int,
+ files_content: List[Optional[str]],
+ is_timeout: bool = False
+ ):
+ """Captured class
+ output: captured content
+ status: command status
+ files_content: content of the files altered by the command
+ is_timeout: the command has timed out
+ """
+ lines = output.split('\n')
+ for i, l in enumerate(lines):
+ if l.find(config.REFERENCE_ERROR_BEGIN) == 0:
+ lines[i] = l.replace(config.REFERENCE_ERROR_BEGIN, config.MINISHELL_ERROR_BEGIN, 1)
+ elif l.find(config.REFERENCE_PATH + ": ") == 0:
+ lines[i] = l.replace(config.REFERENCE_PATH + ": ", config.MINISHELL_ERROR_BEGIN, 1)
+ self.output = '\n'.join(lines)
+
+ self.status = status
+ self.files_content = files_content
+ self.is_timeout = is_timeout
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, Captured):
+ raise NotImplementedError
+ if self.is_timeout:
+ return self.is_timeout == other.is_timeout
+ return (self.output == other.output
+ and self.status == other.status
+ and all(x == y for x, y in zip(self.files_content, other.files_content)))
+
+ @staticmethod
+ def timeout():
+ """Create a new captured timeout"""
+ return Captured("", 0, [], is_timeout=True)
diff --git a/minishell_test/test/result.py b/minishell_test/test/result.py
new file mode 100644
index 0000000..eff7b8b
--- /dev/null
+++ b/minishell_test/test/result.py
@@ -0,0 +1,246 @@
+# ############################################################################ #
+# #
+# ::: :::::::: #
+# result.py :+: :+: :+: #
+# +:+ +:+ +:+ #
+# By: charles <me@cacharle.xyz> +#+ +:+ +#+ #
+# +#+#+#+#+#+ +#+ #
+# Created: 2020/09/11 12:17:34 by charles #+# #+# #
+# Updated: 2021/02/05 01:36:44 by charles ### ########.fr #
+# #
+# ############################################################################ #
+
+import sys
+import re
+from typing import Match, List, Optional
+
+import config
+from test.captured import Captured
+
+
+class BaseResult:
+ RED_CHARS = "\033[31m"
+ GREEN_CHARS = "\033[32m"
+ BLUE_CHARS = "\033[34m"
+ BOLD_CHARS = "\033[1m"
+ CLOSE_CHARS = "\033[0m"
+
+ def __init__(self, cmd: str):
+ self.cmd = cmd
+ self.colored = True
+ self.set_colors()
+
+ @property
+ def passed(self):
+ """Check if the result passed"""
+ raise NotImplementedError
+
+ @property
+ def failed(self):
+ """Check if the result failed"""
+ return not self.passed
+
+ def __repr__(self):
+ """Returns a representation of the result based on the verbosity"""
+ if config.VERBOSE_LEVEL == 0:
+ return self.green('.') if self.passed else self.red('!')
+ if config.VERBOSE_LEVEL == 1:
+ printed = self._escaped_cmd[:]
+ if config.SHOW_RANGE:
+ printed = "{:2}: ".format(self.index) + printed
+ if len(printed) > config.TERM_COLS - 7:
+ printed = printed[:config.TERM_COLS - 10] + "..."
+ fmt = self.green("{:{width}} [PASS]") if self.passed else self.red("{:{width}} [FAIL]")
+ return fmt.format(printed, width=config.TERM_COLS - 7)
+ elif config.VERBOSE_LEVEL == 2:
+ return self.full_diff()
+ else:
+ raise RuntimeError("Invalid verbose level")
+
+ def put(self, index: int) -> None:
+ """Print a summary of the result"""
+ if config.VERBOSE_LEVEL == 2 and self.passed:
+ return
+ self.index = index
+ print(self, end="")
+ if config.VERBOSE_LEVEL == 0:
+ sys.stdout.flush()
+ else:
+ print()
+
+ def indicator(self, title: str, prefix: str) -> str:
+ return self.bold(self.blue(prefix + " " + title))
+
+ def full_diff(self) -> str:
+ raise NotImplementedError
+
+ @property
+ def _escaped_cmd(self):
+ """Escape common control characters"""
+ c = self.cmd
+ c = c.replace("\t", "\\t")
+ c = c.replace("\n", "\\n")
+ c = c.replace("\v", "\\v")
+ c = c.replace("\r", "\\r")
+ c = c.replace("\f", "\\f")
+ return c
+
+ @property
+ def _header_with(self):
+ return self.indicator("WITH {}".format(self._escaped_cmd), "|>") + '\n'
+
+ def set_colors(self):
+ """Set colors strings on or off based on self.colored"""
+ if self.colored:
+ self.color_red = self.RED_CHARS
+ self.color_green = self.GREEN_CHARS
+ self.color_blue = self.BLUE_CHARS
+ self.color_bold = self.BOLD_CHARS
+ self.color_close = self.CLOSE_CHARS
+ else:
+ self.color_red = ""
+ self.color_green = ""
+ self.color_blue = ""
+ self.color_bold = ""
+ self.color_close = ""
+
+ def green(self, s):
+ return self.color_green + s + self.color_close
+
+ def red(self, s):
+ return self.color_red + s + self.color_close
+
+ def blue(self, s):
+ return self.color_blue + s + self.color_close
+
+ def bold(self, s):
+ return self.color_bold + s + self.color_close
+
+
+class Result(BaseResult):
+ def __init__(
+ self,
+ cmd: str,
+ file_names: List[str],
+ expected: Captured,
+ actual: Captured,
+ ):
+ """Result class
+ cmd: runned command
+ file_names: names of watched files
+ expected: expected capture
+ actual: actual capture
+ """
+ self.file_names = file_names
+ self.expected = expected
+ self.actual = actual
+ super().__init__(cmd)
+
+ @property
+ def passed(self):
+ return self.actual == self.expected
+
+ def header(self, title: str) -> str:
+ return self.bold("|---------------------------------------{:-<40}".format(title))
+
+ @property
+ def expected_header(self) -> str:
+ return self.green(self.header("EXPECTED")) + '\n'
+
+ @property
+ def actual_header(self) -> str:
+ return self.red(self.header("ACTUAL")) + '\n'
+
+ def cat_e(self, s: Optional[str]) -> str:
+ """Pass a string through a cat -e like output"""
+ if s is None:
+ return "FROM TEST: File not created\n"
+ s = s.replace("\n", "$\n")
+ if len(s) < 2:
+ return s
+ if s[-1] != '\n':
+ s += '\n'
+ return s
+
+ def file_diff(self, file_name: str, expected: Optional[str], actual: Optional[str]) -> str:
+ """Difference between 2 files"""
+ if expected == actual:
+ return ""
+ file_header = self.indicator("FILE {}".format(file_name), "|#") + '\n'
+ return (
+ file_header
+ + self.expected_header
+ + self.cat_e(expected)
+ + self.actual_header
+ + self.cat_e(actual)
+ )
+
+ def files_diff(self):
+ """Difference between watched files"""
+ return '\n'.join([self.file_diff(n, e, a) for n, e, a in
+ zip(self.file_names,
+ self.expected.files_content,
+ self.actual.files_content)
+ if e != a])
+
+ def output_diff(self) -> str:
+ """Difference in command output"""
+ out = ""
+ if self.actual.is_timeout:
+ return "TIMEOUT\n"
+ if self.expected.status != self.actual.status:
+ out += self.indicator(
+ "STATUS: expected {} actual {}"
+ .format(self.expected.status, self.actual.status), "| "
+ ) + '\n'
+ if self.expected.output != self.actual.output:
+ out += (self.expected_header
+ + self.cat_e(self.expected.output)
+ + self.actual_header
+ + self.cat_e(self.actual.output))
+ return out
+
+ def full_diff(self) -> str:
+ """Concat all difference reports"""
+ return self._header_with + self.output_diff() + self.files_diff() + "=" * 80 + '\n'
+
+
+class LeakResult(BaseResult):
+ def __init__(self, cmd: str, captured: Captured):
+ self.captured = captured
+ super().__init__(cmd)
+
+ def _search_leak_kind(self, kind: str) -> Match:
+ match = re.search(
+ r"==\d+==\s+" + kind + r" lost: (?P<bytes>[0-9,]+) bytes in [0-9,]+ blocks",
+ self.captured.output
+ )
+ if match is None:
+ raise RuntimeError(
+ "valgrind output parsing failed for `{}`:\n{}"
+ .format(self.cmd, self.captured.output)
+ )
+ return match
+
+ @property
+ def _lost_bytes(self):
+ if self.captured.output.find("All heap blocks were freed -- no leaks are possible") != -1:
+ definite_bytes = 0
+ indirect_bytes = 0
+ else:
+ definite_match = self._search_leak_kind("definitely")
+ indirect_match = self._search_leak_kind("indirectly")
+ definite_bytes = int(definite_match.group("bytes").replace(",", ""))
+ indirect_bytes = int(indirect_match.group("bytes").replace(",", ""))
+ return definite_bytes + indirect_bytes
+
+ @property
+ def passed(self):
+ """Check if the result passed"""
+ if self.captured.is_timeout:
+ return False
+ return self._lost_bytes == 0
+
+ def full_diff(self) -> str:
+ """Concat all difference reports"""
+ return self._header_with + self.captured.output
diff --git a/minishell_test/test/test.py b/minishell_test/test/test.py
new file mode 100644
index 0000000..ab68d1e
--- /dev/null
+++ b/minishell_test/test/test.py
@@ -0,0 +1,155 @@
+# ############################################################################ #
+# #
+# ::: :::::::: #
+# test.py :+: :+: :+: #
+# +:+ +:+ +:+ #
+# By: charles <charles.cabergs@gmail.com> +#+ +:+ +#+ #
+# +#+#+#+#+#+ +#+ #
+# Created: 2020/06/16 21:48:50 by charles #+# #+# #
+# Updated: 2021/02/05 01:37:44 by charles ### ########.fr #
+# #
+# ############################################################################ #
+
+import os
+import sys
+import subprocess
+from typing import Optional, List, Dict, Union, Callable
+
+import config
+from test.captured import Captured
+from test.result import Result, LeakResult
+import sandbox
+
+HookType = Union[Callable[[str], str], List[Callable[[str], str]]]
+HookStatusType = Union[Callable[[int], int], List[Callable[[int], int]]]
+
+
+class Test:
+ def __init__(
+ self,
+ cmd: str,
+ setup: str = "",
+ files: List[str] = [],
+ exports: Dict[str, str] = {},
+ timeout: float = config.TIMEOUT,
+ hook: HookType = [],
+ hook_status: HookStatusType = [],
+ ):
+ """ Test class
+ cmd: command to execute
+ setup: command to execute before tested command
+ files: files to watch (check content after test)
+ exports: exported variables
+ timeout: maximum amount of time taken by the test
+ hook: function to execute on the output of the test
+ hook_status: function to execute on status code
+ """
+ self.cmd = cmd
+ self.setup = setup
+ self.files = files
+ self.exports = exports
+ self.result: Optional[Union[Result, LeakResult]] = None
+ self.timeout = timeout
+ if type(hook) is not list:
+ hook = [hook] # type: ignore
+ if type(hook_status) is not list:
+ hook_status = [hook_status] # type: ignore
+ self.hook: List[Callable[[str], str]] = hook # type: ignore
+ self.hook_status: List[Callable[[int], int]] = hook_status # type: ignore
+
+ def run(self, index: int) -> None:
+ """ Run the test for minishell and the reference shell and print the result out """
+
+ if config.CHECK_LEAKS:
+ self.hook = []
+ self.hook_status = []
+ captured = self._run_sandboxed([*config.VALGRIND_CMD, "-c"])
+ if config.VERBOSE_LEVEL == 2:
+ print(captured.output)
+ self.result = LeakResult(self.full_cmd, captured)
+ self.result.put(index)
+ return
+
+ expected = self._run_sandboxed([config.REFERENCE_PATH, *config.REFERENCE_ARGS, "-c"])
+ actual = self._run_sandboxed([config.MINISHELL_PATH, "-c"])
+ self.result = Result(self.full_cmd, self.files, expected, actual)
+ self.result.put(index)
+
+ def _run_sandboxed(self, shell_cmd: List[str]) -> Captured:
+ """ Run the command in a sandbox environment """
+ with sandbox.context():
+ if self.setup != "":
+ try:
+ subprocess.run(
+ self.setup,
+ shell=True,
+ cwd=config.SANDBOX_PATH,
+ stderr=subprocess.STDOUT,
+ stdout=subprocess.PIPE,
+ check=True
+ )
+ except subprocess.CalledProcessError as e:
+ print("Error: `{}` setup command failed for `{}`\n\twith '{}'"
+ .format(self.setup,
+ self.cmd,
+ "no stderr" if e.stdout is None
+ else e.stdout.decode().strip()))
+ sys.exit(1)
+ return self._run_capture(shell_cmd)
+
+ def _run_capture(self, shell_cmd: List[str]) -> Captured:
+ """ Capture the output (stdout and stderr)
+ Capture the content of the watched files after the command is run
+ """
+ # run the command in the sandbox
+ process = subprocess.Popen(
+ [*shell_cmd, self.cmd],
+ stderr=subprocess.STDOUT,
+ stdout=subprocess.PIPE,
+ cwd=config.SANDBOX_PATH,
+ env={
+ 'PATH': config.PATH_VARIABLE,
+ 'TERM': 'xterm-256color',
+ **self.exports,
+ },
+ )
+
+ # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate
+ try:
+ stdout, _ = process.communicate(
+ timeout=(self.timeout if not config.CHECK_LEAKS else config.CHECK_LEAKS_TIMEOUT))
+ except subprocess.TimeoutExpired:
+ process.kill()
+ # _, _ = process.communicate(timeout=2)
+ return Captured.timeout()
+ try:
+ output = stdout.decode()
+ except UnicodeDecodeError:
+ output = "UNICODE ERROR: {}".format(process.stdout)
+
+ # capture watched files content
+ files_content: List[Optional[str]] = []
+ for file_name in self.files:
+ try:
+ with open(os.path.join(config.SANDBOX_PATH, file_name), "rb") as f:
+ files_content.append(f.read().decode())
+ except FileNotFoundError:
+ files_content.append(None)
+
+ # apply output/status hooks
+ for hook in self.hook:
+ output = hook(output)
+ for hook_status in self.hook_status:
+ process.returncode = hook_status(process.returncode)
+ return Captured(output, process.returncode, files_content)
+
+ @property
+ def full_cmd(self):
+ """ Return the command prefixed by the setup and exports """
+ s = self.cmd
+ if len(self.exports) != 0:
+ s = "[EXPORTS {}] {}".format(
+ ' '.join(["{}='{}'".format(k, v) for k, v in self.exports.items()]), s)
+ if self.setup != "":
+ s = "[SETUP {}] {}".format(self.setup, s)
+ return s