aboutsummaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
authorCharles Cabergs <me@cacharle.xyz>2021-02-05 12:27:32 +0100
committerCharles Cabergs <me@cacharle.xyz>2021-02-05 12:27:32 +0100
commit904a033ae738e1c351f8fef71e2ec2418fc4db3d (patch)
tree3de4980582c109c4f0d19111a2b88eafec9b9b36 /src/test
parenta3e983f78dc4cbcf6f75f78fa2b3c57e09cd1b2b (diff)
downloadminishell_test-904a033ae738e1c351f8fef71e2ec2418fc4db3d.tar.gz
minishell_test-904a033ae738e1c351f8fef71e2ec2418fc4db3d.tar.bz2
minishell_test-904a033ae738e1c351f8fef71e2ec2418fc4db3d.zip
Renaming src -> minishell_test for package name, Renaming main.py -> __main__.py for package execution with python -m
Diffstat (limited to 'src/test')
-rw-r--r--src/test/__init__.py13
-rw-r--r--src/test/captured.py56
-rw-r--r--src/test/result.py246
-rw-r--r--src/test/test.py155
4 files changed, 0 insertions, 470 deletions
diff --git a/src/test/__init__.py b/src/test/__init__.py
deleted file mode 100644
index cf9949f..0000000
--- a/src/test/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# ############################################################################ #
-# #
-# ::: :::::::: #
-# __init__.py :+: :+: :+: #
-# +:+ +:+ +:+ #
-# By: charles <me@cacharle.xyz> +#+ +:+ +#+ #
-# +#+#+#+#+#+ +#+ #
-# Created: 2020/09/11 12:18:14 by charles #+# #+# #
-# Updated: 2020/09/11 20:18:10 by charles ### ########.fr #
-# #
-# ############################################################################ #
-
-from test.test import Test # noqa: F401
diff --git a/src/test/captured.py b/src/test/captured.py
deleted file mode 100644
index f7dae3e..0000000
--- a/src/test/captured.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# ############################################################################ #
-# #
-# ::: :::::::: #
-# captured.py :+: :+: :+: #
-# +:+ +:+ +:+ #
-# By: charles <me@cacharle.xyz> +#+ +:+ +#+ #
-# +#+#+#+#+#+ +#+ #
-# Created: 2020/09/11 12:16:25 by charles #+# #+# #
-# Updated: 2021/02/04 15:52:19 by charles ### ########.fr #
-# #
-# ############################################################################ #
-
-from typing import List, Optional
-
-import config
-
-
-class Captured:
- def __init__(
- self,
- output: str,
- status: int,
- files_content: List[Optional[str]],
- is_timeout: bool = False
- ):
- """Captured class
- output: captured content
- status: command status
- files_content: content of the files altered by the command
- is_timeout: the command has timed out
- """
- lines = output.split('\n')
- for i, l in enumerate(lines):
- if l.find(config.REFERENCE_ERROR_BEGIN) == 0:
- lines[i] = l.replace(config.REFERENCE_ERROR_BEGIN, config.MINISHELL_ERROR_BEGIN, 1)
- elif l.find(config.REFERENCE_PATH + ": ") == 0:
- lines[i] = l.replace(config.REFERENCE_PATH + ": ", config.MINISHELL_ERROR_BEGIN, 1)
- self.output = '\n'.join(lines)
-
- self.status = status
- self.files_content = files_content
- self.is_timeout = is_timeout
-
- def __eq__(self, other: object) -> bool:
- if not isinstance(other, Captured):
- raise NotImplementedError
- if self.is_timeout:
- return self.is_timeout == other.is_timeout
- return (self.output == other.output
- and self.status == other.status
- and all(x == y for x, y in zip(self.files_content, other.files_content)))
-
- @staticmethod
- def timeout():
- """Create a new captured timeout"""
- return Captured("", 0, [], is_timeout=True)
diff --git a/src/test/result.py b/src/test/result.py
deleted file mode 100644
index eff7b8b..0000000
--- a/src/test/result.py
+++ /dev/null
@@ -1,246 +0,0 @@
-# ############################################################################ #
-# #
-# ::: :::::::: #
-# result.py :+: :+: :+: #
-# +:+ +:+ +:+ #
-# By: charles <me@cacharle.xyz> +#+ +:+ +#+ #
-# +#+#+#+#+#+ +#+ #
-# Created: 2020/09/11 12:17:34 by charles #+# #+# #
-# Updated: 2021/02/05 01:36:44 by charles ### ########.fr #
-# #
-# ############################################################################ #
-
-import sys
-import re
-from typing import Match, List, Optional
-
-import config
-from test.captured import Captured
-
-
-class BaseResult:
- RED_CHARS = "\033[31m"
- GREEN_CHARS = "\033[32m"
- BLUE_CHARS = "\033[34m"
- BOLD_CHARS = "\033[1m"
- CLOSE_CHARS = "\033[0m"
-
- def __init__(self, cmd: str):
- self.cmd = cmd
- self.colored = True
- self.set_colors()
-
- @property
- def passed(self):
- """Check if the result passed"""
- raise NotImplementedError
-
- @property
- def failed(self):
- """Check if the result failed"""
- return not self.passed
-
- def __repr__(self):
- """Returns a representation of the result based on the verbosity"""
- if config.VERBOSE_LEVEL == 0:
- return self.green('.') if self.passed else self.red('!')
- if config.VERBOSE_LEVEL == 1:
- printed = self._escaped_cmd[:]
- if config.SHOW_RANGE:
- printed = "{:2}: ".format(self.index) + printed
- if len(printed) > config.TERM_COLS - 7:
- printed = printed[:config.TERM_COLS - 10] + "..."
- fmt = self.green("{:{width}} [PASS]") if self.passed else self.red("{:{width}} [FAIL]")
- return fmt.format(printed, width=config.TERM_COLS - 7)
- elif config.VERBOSE_LEVEL == 2:
- return self.full_diff()
- else:
- raise RuntimeError("Invalid verbose level")
-
- def put(self, index: int) -> None:
- """Print a summary of the result"""
- if config.VERBOSE_LEVEL == 2 and self.passed:
- return
- self.index = index
- print(self, end="")
- if config.VERBOSE_LEVEL == 0:
- sys.stdout.flush()
- else:
- print()
-
- def indicator(self, title: str, prefix: str) -> str:
- return self.bold(self.blue(prefix + " " + title))
-
- def full_diff(self) -> str:
- raise NotImplementedError
-
- @property
- def _escaped_cmd(self):
- """Escape common control characters"""
- c = self.cmd
- c = c.replace("\t", "\\t")
- c = c.replace("\n", "\\n")
- c = c.replace("\v", "\\v")
- c = c.replace("\r", "\\r")
- c = c.replace("\f", "\\f")
- return c
-
- @property
- def _header_with(self):
- return self.indicator("WITH {}".format(self._escaped_cmd), "|>") + '\n'
-
- def set_colors(self):
- """Set colors strings on or off based on self.colored"""
- if self.colored:
- self.color_red = self.RED_CHARS
- self.color_green = self.GREEN_CHARS
- self.color_blue = self.BLUE_CHARS
- self.color_bold = self.BOLD_CHARS
- self.color_close = self.CLOSE_CHARS
- else:
- self.color_red = ""
- self.color_green = ""
- self.color_blue = ""
- self.color_bold = ""
- self.color_close = ""
-
- def green(self, s):
- return self.color_green + s + self.color_close
-
- def red(self, s):
- return self.color_red + s + self.color_close
-
- def blue(self, s):
- return self.color_blue + s + self.color_close
-
- def bold(self, s):
- return self.color_bold + s + self.color_close
-
-
-class Result(BaseResult):
- def __init__(
- self,
- cmd: str,
- file_names: List[str],
- expected: Captured,
- actual: Captured,
- ):
- """Result class
- cmd: runned command
- file_names: names of watched files
- expected: expected capture
- actual: actual capture
- """
- self.file_names = file_names
- self.expected = expected
- self.actual = actual
- super().__init__(cmd)
-
- @property
- def passed(self):
- return self.actual == self.expected
-
- def header(self, title: str) -> str:
- return self.bold("|---------------------------------------{:-<40}".format(title))
-
- @property
- def expected_header(self) -> str:
- return self.green(self.header("EXPECTED")) + '\n'
-
- @property
- def actual_header(self) -> str:
- return self.red(self.header("ACTUAL")) + '\n'
-
- def cat_e(self, s: Optional[str]) -> str:
- """Pass a string through a cat -e like output"""
- if s is None:
- return "FROM TEST: File not created\n"
- s = s.replace("\n", "$\n")
- if len(s) < 2:
- return s
- if s[-1] != '\n':
- s += '\n'
- return s
-
- def file_diff(self, file_name: str, expected: Optional[str], actual: Optional[str]) -> str:
- """Difference between 2 files"""
- if expected == actual:
- return ""
- file_header = self.indicator("FILE {}".format(file_name), "|#") + '\n'
- return (
- file_header
- + self.expected_header
- + self.cat_e(expected)
- + self.actual_header
- + self.cat_e(actual)
- )
-
- def files_diff(self):
- """Difference between watched files"""
- return '\n'.join([self.file_diff(n, e, a) for n, e, a in
- zip(self.file_names,
- self.expected.files_content,
- self.actual.files_content)
- if e != a])
-
- def output_diff(self) -> str:
- """Difference in command output"""
- out = ""
- if self.actual.is_timeout:
- return "TIMEOUT\n"
- if self.expected.status != self.actual.status:
- out += self.indicator(
- "STATUS: expected {} actual {}"
- .format(self.expected.status, self.actual.status), "| "
- ) + '\n'
- if self.expected.output != self.actual.output:
- out += (self.expected_header
- + self.cat_e(self.expected.output)
- + self.actual_header
- + self.cat_e(self.actual.output))
- return out
-
- def full_diff(self) -> str:
- """Concat all difference reports"""
- return self._header_with + self.output_diff() + self.files_diff() + "=" * 80 + '\n'
-
-
-class LeakResult(BaseResult):
- def __init__(self, cmd: str, captured: Captured):
- self.captured = captured
- super().__init__(cmd)
-
- def _search_leak_kind(self, kind: str) -> Match:
- match = re.search(
- r"==\d+==\s+" + kind + r" lost: (?P<bytes>[0-9,]+) bytes in [0-9,]+ blocks",
- self.captured.output
- )
- if match is None:
- raise RuntimeError(
- "valgrind output parsing failed for `{}`:\n{}"
- .format(self.cmd, self.captured.output)
- )
- return match
-
- @property
- def _lost_bytes(self):
- if self.captured.output.find("All heap blocks were freed -- no leaks are possible") != -1:
- definite_bytes = 0
- indirect_bytes = 0
- else:
- definite_match = self._search_leak_kind("definitely")
- indirect_match = self._search_leak_kind("indirectly")
- definite_bytes = int(definite_match.group("bytes").replace(",", ""))
- indirect_bytes = int(indirect_match.group("bytes").replace(",", ""))
- return definite_bytes + indirect_bytes
-
- @property
- def passed(self):
- """Check if the result passed"""
- if self.captured.is_timeout:
- return False
- return self._lost_bytes == 0
-
- def full_diff(self) -> str:
- """Concat all difference reports"""
- return self._header_with + self.captured.output
diff --git a/src/test/test.py b/src/test/test.py
deleted file mode 100644
index ab68d1e..0000000
--- a/src/test/test.py
+++ /dev/null
@@ -1,155 +0,0 @@
-# ############################################################################ #
-# #
-# ::: :::::::: #
-# test.py :+: :+: :+: #
-# +:+ +:+ +:+ #
-# By: charles <charles.cabergs@gmail.com> +#+ +:+ +#+ #
-# +#+#+#+#+#+ +#+ #
-# Created: 2020/06/16 21:48:50 by charles #+# #+# #
-# Updated: 2021/02/05 01:37:44 by charles ### ########.fr #
-# #
-# ############################################################################ #
-
-import os
-import sys
-import subprocess
-from typing import Optional, List, Dict, Union, Callable
-
-import config
-from test.captured import Captured
-from test.result import Result, LeakResult
-import sandbox
-
-HookType = Union[Callable[[str], str], List[Callable[[str], str]]]
-HookStatusType = Union[Callable[[int], int], List[Callable[[int], int]]]
-
-
-class Test:
- def __init__(
- self,
- cmd: str,
- setup: str = "",
- files: List[str] = [],
- exports: Dict[str, str] = {},
- timeout: float = config.TIMEOUT,
- hook: HookType = [],
- hook_status: HookStatusType = [],
- ):
- """ Test class
- cmd: command to execute
- setup: command to execute before tested command
- files: files to watch (check content after test)
- exports: exported variables
- timeout: maximum amount of time taken by the test
- hook: function to execute on the output of the test
- hook_status: function to execute on status code
- """
- self.cmd = cmd
- self.setup = setup
- self.files = files
- self.exports = exports
- self.result: Optional[Union[Result, LeakResult]] = None
- self.timeout = timeout
- if type(hook) is not list:
- hook = [hook] # type: ignore
- if type(hook_status) is not list:
- hook_status = [hook_status] # type: ignore
- self.hook: List[Callable[[str], str]] = hook # type: ignore
- self.hook_status: List[Callable[[int], int]] = hook_status # type: ignore
-
- def run(self, index: int) -> None:
- """ Run the test for minishell and the reference shell and print the result out """
-
- if config.CHECK_LEAKS:
- self.hook = []
- self.hook_status = []
- captured = self._run_sandboxed([*config.VALGRIND_CMD, "-c"])
- if config.VERBOSE_LEVEL == 2:
- print(captured.output)
- self.result = LeakResult(self.full_cmd, captured)
- self.result.put(index)
- return
-
- expected = self._run_sandboxed([config.REFERENCE_PATH, *config.REFERENCE_ARGS, "-c"])
- actual = self._run_sandboxed([config.MINISHELL_PATH, "-c"])
- self.result = Result(self.full_cmd, self.files, expected, actual)
- self.result.put(index)
-
- def _run_sandboxed(self, shell_cmd: List[str]) -> Captured:
- """ Run the command in a sandbox environment """
- with sandbox.context():
- if self.setup != "":
- try:
- subprocess.run(
- self.setup,
- shell=True,
- cwd=config.SANDBOX_PATH,
- stderr=subprocess.STDOUT,
- stdout=subprocess.PIPE,
- check=True
- )
- except subprocess.CalledProcessError as e:
- print("Error: `{}` setup command failed for `{}`\n\twith '{}'"
- .format(self.setup,
- self.cmd,
- "no stderr" if e.stdout is None
- else e.stdout.decode().strip()))
- sys.exit(1)
- return self._run_capture(shell_cmd)
-
- def _run_capture(self, shell_cmd: List[str]) -> Captured:
- """ Capture the output (stdout and stderr)
- Capture the content of the watched files after the command is run
- """
- # run the command in the sandbox
- process = subprocess.Popen(
- [*shell_cmd, self.cmd],
- stderr=subprocess.STDOUT,
- stdout=subprocess.PIPE,
- cwd=config.SANDBOX_PATH,
- env={
- 'PATH': config.PATH_VARIABLE,
- 'TERM': 'xterm-256color',
- **self.exports,
- },
- )
-
- # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate
- try:
- stdout, _ = process.communicate(
- timeout=(self.timeout if not config.CHECK_LEAKS else config.CHECK_LEAKS_TIMEOUT))
- except subprocess.TimeoutExpired:
- process.kill()
- # _, _ = process.communicate(timeout=2)
- return Captured.timeout()
- try:
- output = stdout.decode()
- except UnicodeDecodeError:
- output = "UNICODE ERROR: {}".format(process.stdout)
-
- # capture watched files content
- files_content: List[Optional[str]] = []
- for file_name in self.files:
- try:
- with open(os.path.join(config.SANDBOX_PATH, file_name), "rb") as f:
- files_content.append(f.read().decode())
- except FileNotFoundError:
- files_content.append(None)
-
- # apply output/status hooks
- for hook in self.hook:
- output = hook(output)
- for hook_status in self.hook_status:
- process.returncode = hook_status(process.returncode)
- return Captured(output, process.returncode, files_content)
-
- @property
- def full_cmd(self):
- """ Return the command prefixed by the setup and exports """
- s = self.cmd
- if len(self.exports) != 0:
- s = "[EXPORTS {}] {}".format(
- ' '.join(["{}='{}'".format(k, v) for k, v in self.exports.items()]), s)
- if self.setup != "":
- s = "[SETUP {}] {}".format(self.setup, s)
- return s