aboutsummaryrefslogtreecommitdiff
path: root/src/test/test.py
blob: 9674240adb964d237ad27e7b513f6cf9af9ef5d2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# ############################################################################ #
#                                                                              #
#                                                         :::      ::::::::    #
#    test.py                                            :+:      :+:    :+:    #
#                                                     +:+ +:+         +:+      #
#    By: charles <charles.cabergs@gmail.com>        +#+  +:+       +#+         #
#                                                 +#+#+#+#+#+   +#+            #
#    Created: 2020/06/16 21:48:50 by charles           #+#    #+#              #
#    Updated: 2020/09/11 12:24:33 by charles          ###   ########.fr        #
#                                                                              #
# ############################################################################ #

import os
import sys
import subprocess
import shutil
import glob

import config
from test.captured import Captured
from test.result import Result


class Test:
    def __init__(self,
                 cmd: str,
                 setup: str = "",
                 files: [str] = [],
                 exports: {str: str} = {},
                 timeout: float = config.TIMEOUT):
        self.cmd = cmd
        self.setup = setup
        self.files = files
        self.exports = exports
        self.result = None
        self.timeout = timeout

    def run(self):
        expected = self._run_sandboxed(config.REFERENCE_PATH, "-c")
        actual   = self._run_sandboxed(config.MINISHELL_PATH, "-c")
        s = self.cmd
        if self.setup != "":
            s = "[SETUP {}] {}".format(self.setup, s)
        if len(self.exports) != 0:
            s = "[EXPORTS {}] {}".format(
                    ' '.join(["{}='{:.20}'".format(k, v) for k, v in self.exports.items()]), s)
        self.result = Result(s, self.files, expected, actual)
        self.result.put()

    def _run_sandboxed(self, shell_path: str, shell_option: str) -> Captured:
        """ run the command in a sandbox environment

            capture the output (stdout and stderr)
            capture the content of the watched files after the command is run
        """

        try:
            os.mkdir(config.SANDBOX_PATH)
        except OSError:
            pass
        if self.setup != "":
            try:
                setup_status = subprocess.run(
                    self.setup,
                    shell=True,
                    cwd=config.SANDBOX_PATH,
                    stderr=subprocess.STDOUT,
                    stdout=subprocess.PIPE,
                    check=True
                )
            except subprocess.CalledProcessError as e:
                print("Error: `{}` setup command failed for `{}`\n\twith '{}'"
                      .format(self.setup,
                              self.cmd,
                              "no stderr" if e.stdout is None else e.stdout.decode().strip()))
                sys.exit(1)

        process = subprocess.Popen(
            [shell_path, shell_option, self.cmd],
            stderr=subprocess.STDOUT,
            stdout=subprocess.PIPE,
            cwd=config.SANDBOX_PATH,
            env={
                'PATH': config.PATH_VARIABLE,
                'TERM': 'xterm-256color',
                **self.exports,
            },
        )
        try:
            process.wait(timeout=self.timeout)
        except subprocess.TimeoutExpired:
            return Captured.timeout()

        try:
            stdout, _ = process.communicate()
            output = stdout.decode()
        except UnicodeDecodeError:
            output = "UNICODE ERROR: {}".format(process.stdout)

        # capture watched files content
        files_content = []
        for file_name in self.files:
            try:
                with open(os.path.join(config.SANDBOX_PATH, file_name), "rb") as f:
                    files_content.append(f.read().decode())
            except FileNotFoundError as e:
                files_content.append(None)
        try:
            shutil.rmtree(config.SANDBOX_PATH)
        except:
            subprocess.run(["chmod", "777", *glob.glob(config.SANDBOX_PATH + "/*")], check=True)
            subprocess.run(["rm", "-rf", config.SANDBOX_PATH], check=True)
        return Captured(output, process.returncode, files_content)