aboutsummaryrefslogtreecommitdiff
path: root/src/test/test.py
blob: 48f05a002955b993dff44f424543057908781e94 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# ############################################################################ #
#                                                                              #
#                                                         :::      ::::::::    #
#    test.py                                            :+:      :+:    :+:    #
#                                                     +:+ +:+         +:+      #
#    By: charles <charles.cabergs@gmail.com>        +#+  +:+       +#+         #
#                                                 +#+#+#+#+#+   +#+            #
#    Created: 2020/06/16 21:48:50 by charles           #+#    #+#              #
#    Updated: 2020/09/11 20:00:52 by charles          ###   ########.fr        #
#                                                                              #
# ############################################################################ #

import os
import sys
import subprocess
import time

import config
from test.captured import Captured
from test.result import Result
import sandbox


class Test:
    """One test case: a shell command run under both the reference shell and
    the shell under test, with the two captures handed to a `Result`.

    Attributes:
        cmd:     command string passed to each shell after the `-c` option
        setup:   command run with `shell=True` in the sandbox before `cmd`
                 ("" means no setup)
        files:   names (relative to the sandbox) of files whose content is
                 captured after the command ran
        exports: extra environment variables given to the shell
        timeout: seconds allowed for the shell to finish
        signal:  if not None, signal sent to the shell 0.2s after it started
        hook:    if not None, callable applied to the decoded output before
                 it is stored in the capture
        result:  `Result` of the last `run()`, None until then
    """

    def __init__(self,
                 cmd: str,
                 setup: str = "",
                 files: [str] = None,
                 exports: {str: str} = None,
                 timeout: float = config.TIMEOUT,
                 signal=None,
                 hook=None):
        self.cmd = cmd
        self.setup = setup
        # None stands for "empty": a literal [] / {} default would be one
        # object shared (and mutable) across every Test instance.
        self.files = files if files is not None else []
        self.exports = exports if exports is not None else {}
        self.result = None
        self.timeout = timeout
        self.signal = signal
        self.hook = hook

    def run(self):
        """ run the command with the reference shell then with minishell,
            build the Result from both captures and call its `put()`
        """

        expected = self._run_sandboxed(config.REFERENCE_PATH, "-c")
        actual = self._run_sandboxed(config.MINISHELL_PATH, "-c")

        # label for the result: the command prefixed by its setup and exports
        label = self.cmd
        if self.setup != "":
            label = "[SETUP {}] {}".format(self.setup, label)
        if self.exports:
            # values are truncated to 20 characters to keep the label short
            exports_repr = ' '.join(
                "{}='{:.20}'".format(k, v) for k, v in self.exports.items())
            label = "[EXPORTS {}] {}".format(exports_repr, label)

        self.result = Result(label, self.files, expected, actual)
        self.result.put()

    def _run_sandboxed(self, shell_path: str, shell_option: str) -> Captured:
        """ run the command in a sandbox environment

            capture the output (stdout and stderr)
            capture the content of the watched files after the command is run

            returns `Captured.timeout()` if the shell did not finish (process
            exited and output pipe closed) within `self.timeout` seconds,
            exits the program with status 1 if the setup command fails
        """

        sandbox.create()
        self._run_setup()

        try:
            process = subprocess.Popen(
                [shell_path, shell_option, self.cmd],
                stderr=subprocess.STDOUT,
                stdout=subprocess.PIPE,
                cwd=config.SANDBOX_PATH,
                env={
                    'PATH': config.PATH_VARIABLE,
                    'TERM': 'xterm-256color',
                    **self.exports,
                },
            )
            if self.signal is not None:
                # give the shell some time to start before signaling it
                time.sleep(0.2)
                process.send_signal(self.signal)

            try:
                # communicate() drains the pipe while waiting: wait() alone
                # deadlocks once the child fills the OS pipe buffer.
                raw_output, _ = process.communicate(timeout=self.timeout)
            except subprocess.TimeoutExpired:
                # The child is not killed automatically on timeout.
                # wait() rather than communicate(): a grandchild may still
                # hold the pipe open and would block a read until it exits.
                process.kill()
                process.wait()
                if process.stdout is not None:
                    process.stdout.close()
                return Captured.timeout()

            # capture watched files content (None for a missing file)
            files_content = [self._read_watched_file(name) for name in self.files]
        finally:
            # the sandbox is removed on every path out of the run
            sandbox.remove()

        output = self._decode(raw_output)
        if self.hook is not None:
            output = self.hook(output)
        return Captured(output, process.returncode, files_content)

    def _run_setup(self):
        """ run the setup command in the sandbox, if there is one

            prints an error and exits the program with status 1 on failure
        """

        if self.setup == "":
            return
        try:
            subprocess.run(
                self.setup,
                shell=True,
                cwd=config.SANDBOX_PATH,
                stderr=subprocess.STDOUT,
                stdout=subprocess.PIPE,
                check=True
            )
        except subprocess.CalledProcessError as e:
            print("Error: `{}` setup command failed for `{}`\n\twith '{}'"
                  .format(self.setup,
                          self.cmd,
                          "no stderr" if e.stdout is None else e.stdout.decode().strip()))
            sys.exit(1)

    def _read_watched_file(self, file_name: str):
        """ return the decoded content of a sandbox file, None if it is missing """

        try:
            with open(os.path.join(config.SANDBOX_PATH, file_name), "rb") as f:
                raw = f.read()
        except FileNotFoundError:
            return None
        return self._decode(raw)

    @staticmethod
    def _decode(raw: bytes) -> str:
        """ decode bytes with the default codec (UTF-8)

            undecodable data yields a "UNICODE ERROR: ..." string holding the
            repr of the raw bytes, so the content still takes part in the
            expected/actual comparison
        """

        try:
            return raw.decode()
        except UnicodeDecodeError:
            return "UNICODE ERROR: {}".format(raw)