From 061b6a74ca5076511ae9bcf6a5c8ded8ddb9f8a6 Mon Sep 17 00:00:00 2001 From: Guillaume Chanel <Guillaume.Chanel@unige.ch> Date: Thu, 13 Oct 2022 12:38:31 +0200 Subject: [PATCH] Refactor code with a Shell and Cmd class This refactoring allowed to correct several bugs. TODO: non-blocking I/O TODO: other tests for the next part of the TP --- test/test.py | 290 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 200 insertions(+), 90 deletions(-) diff --git a/test/test.py b/test/test.py index a4aacc4..58a6eea 100644 --- a/test/test.py +++ b/test/test.py @@ -15,6 +15,7 @@ import time import psutil import re + def print_usage(): print("Usage: {} shell_executable_name".format(sys.argv[0])) print("") @@ -22,145 +23,241 @@ def print_usage(): print(" if omitted, the script attempt to find it automatically") +# TODO: move in test class (failed so far) +test_failed = False def test(func): def wrapper(self): + global test_failed print(func.__name__ + ": ", end="") try: func(self) except Exception as e: print(Fore.RED + "FAILED" + Fore.RESET) logging.error(f"{type(e).__name__}:{str(e)}") + test_failed = True return print(Fore.GREEN + "SUCCESS" + Fore.RESET) return wrapper -class Test: - def __init__(self, shell_exec: str) -> None: - self.shell_exec = Path(shell_exec).resolve() - +class Cmd: + def __init__(self, cmd: list[str]) -> None: + self.cmd = cmd - def _get_exit_code_from_stdout(self, stdout: str) -> int: - # Find line with keyword "code" - for line in stdout.splitlines(): - if "code" in line: - # Assumes that exit code is after keyword code and that it is the only digits - return int(''.join([c for c in line.split('code')[-1] if c.isdigit()])) - raise AssertionError('No exit code found') + def __str__(self) -> str: + if self.cmd is None: + return str(None) + else: + return ' '.join(self.cmd).strip() - def _execute_shell_command(self, command: list[str], cwd: str = tempfile.gettempdir(), duration_cmd: float = 0.2, timeout: int = 3): - """ Execute a shell command and tries to determine if the command is correctly executed - """ + def __iter__(self) -> str: + for c in self.cmd: + yield c - # Create a string out of a command - str_cmd = ' '.join(command) +class Shell: + def __init__(self, executable: str, cwd: str = tempfile.gettempdir()) -> None: # Execute the shell and keep track of the process - shell_process = subprocess.Popen( - [self.shell_exec], cwd=cwd, encoding='utf-8', + self.executable = executable + self.shell_process = subprocess.Popen( + [self.executable], cwd=cwd, encoding='utf-8', stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) - shell_ps = psutil.Process(shell_process.pid) - try: - # Execute the command and wait for it to finish - shell_process.stdin.write(str_cmd + '\n') - shell_process.stdin.flush() + self.shell_ps = psutil.Process(self.shell_process.pid) + + # Keep track of last executed command + self.last_cmd = Cmd(None) + + # No ouputs so far... + self.stdout = None + self.stderr = None - #TODO: we wait a bit here to ensure the child is created ? However it seems is it not needed, how to ensure it ? + + # TODO: move out of Shell ? + def check_ophans(self, cmd: Cmd): + """ Check if an orphan process (child of 1) was executed with cmd + cmd: if None the tested command will be the last executed command + """ + if cmd is None: + if self.last_cmd is not None: + cmd = self.last_cmd + else: + return + str_cmd = str(cmd) + init_ps = psutil.Process(1) + for p in init_ps.children(): + if p.cmdline() == str_cmd: + raise AssertionError('The command "{}" seem to be a child of process 1'.format(str_cmd)) + + + def wait_children(self, test_zombies: bool = True, timeout: int = 3): + """ Wait for children of the shell to temrinate + test_zombies: if True the command will raise an AssertionError if some children are zombies + timeout: time after which a Timeout exception is raised if there are still children + """ + # Wait for all children and check zombies / timeout + start_time = time.time() + while True: + # Wait a bit before checking time.sleep(0.1) - # Wait for children to die and check if some are zombies - # TODO: it can happen that the child is stuck (e.g. if it becomes a shell cause cannot execve) in that case the code bellow blocks - children = shell_ps.children() - while(children): + # Check if some children are zombies + children = self.shell_ps.children() + if test_zombies: for c in children: if c.status() == psutil.STATUS_ZOMBIE: - raise AssertionError('The shell child process {} is a zombie for command "{}"'.format(c, str_cmd)) - time.sleep(0.1) - children = shell_ps.children() + raise AssertionError('The shell child process {} is a zombie (last command executed: {})'.format(c, self.last_cmd)) + + # No more children to wait for -> stop looping + if (len(children) == 0): + break + # as the command exceeded the timeout ? + duration = time.time() - start_time + if duration > timeout: + raise psutil.TimeoutExpired('The process took more than the timeout ({}s) to terminate (last command executed: {})'.format(duration, self.last_cmd)) - # Check if command process has become a child of process 1 (e.g. by double fork) - init_ps = psutil.Process(1) - for c in init_ps.children(): - if c.cmdline() == command: - raise AssertionError('The command "{}" seem to be a child of process 1'.format(str_cmd)) - # update current working directory - cwd = shell_ps.cwd() + def exec_command(self, command: Cmd, wait_cmd: bool = True, timeout: int = 3): + """Execute a command in the shell without existing the shell + wait_cmd: wait for all the command processes to finish and raise an error on zombies + timeout: terminate the command if it last longer than timeout (does not apply if wait_cmd = False)""" + # Execute the command + self.shell_process.stdin.write(str(command) + '\n') + self.shell_process.stdin.flush() + self.last_cmd = command - # Terminate shell by using exit command and returning shell outputs - stdout, stderr = shell_process.communicate(input='exit\n', timeout=timeout) + # Wait for the shell childs to finish while checking for zombies + if wait_cmd: + self.wait_children(timeout=timeout) - # Test if shell is still running - if shell_ps.is_running(): - raise AssertionError('Shell is still running after en of communication, either exit is not working or the shell does not terminate on Ctrl+D') - return stdout, stderr, cwd + def exec_commands(self, commands: list[Cmd], wait_cmd: bool = True, timeout: int = 3): + """Execute a list of commands in the shell without existing the shell and exits + wait_cmd: wait for all the command processes to finish and raise an error on zombies + timeout: terminate the command if it last longer than timeout (does not apply if wait_cmd = False)""" + for cmd in commands: + self.exec_command(cmd, wait_cmd, timeout) + self.exit() + + def exit(self): + # We use communicate to be sure that all streams are closed (i.e. process terminated) + timeout = 1 + try: + self.stdout, self.stderr = self.shell_process.communicate(input='exit\n', timeout=timeout) except subprocess.TimeoutExpired: - shell_process.kill() - print('Timeout during command {}'.format(str_cmd)) - raise subprocess.TimeoutExpired + self.shell_process.kill() + raise subprocess.TimeoutExpired('The exit command did not exit the shell after {}s'.format(timeout)) + #TODO: should I check if process is still running at that point (stream closed but process alive ?) - @test - def test_simple_foregroundjob(self): - # sleep is one of the given exemples - self._execute_shell_command(['sleep', '1'], timeout=5) + def read_stdout(self): + if self.stdout is not None: + return self.stdout + else: + raise ValueError('stdout is None probably because exit was not called on the shell before accessing it') - @test - def test_successfull_foregroundjob(self): - # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command - cmd = ['ls', '-l', '--all', '--author', '-h', '-i', '-S'] - std_stdout, std_stderr, _ = self._execute_shell_command(cmd) - # get "real" output - real = subprocess.run(cmd, cwd=tempfile.gettempdir(), capture_output=True, encoding='utf-8') + def read_stderr(self): + if self.stderr is not None: + return self.stderr + else: + raise ValueError('stderr is None probably because exit was not called on the shell before accessing it') - # check standard output - if not real.stdout in std_stdout: - raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stdout, std_stdout)) - # check standard error - if std_stderr: - raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(std_stderr)) + def get_cwd(self): + return self.shell_ps.cwd() - # check return code - std_returncode = self._get_exit_code_from_stdout(std_stdout) - if std_returncode != real.returncode: - raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode)) + def is_running(self): + return self.shell_process.poll() == None - def test_error_foregroundjob(self, cmd: list[str]): - # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command - std_stdout, std_stderr, _ = self._execute_shell_command(cmd) +class Test: + def __init__(self, shell_exec: str) -> None: + self.shell_exec = Path(shell_exec).resolve() + self.shell = None # currently no shell run + #TODO: remove self.shell as it should be passed from function to function (or not in case also correct some function that does) + + + def _get_exit_code_from_stdout(self, stdout: str) -> int: + # Find line with keyword "code" + for line in stdout.splitlines(): + if "code" in line: + # Assumes that exit code is after keyword code and that it is the only digits + return int(''.join([c for c in line.split('code')[-1] if c.isdigit()])) + raise AssertionError('No exit code found') + + def _test_command_results(self, cmd: Cmd, shell: Shell, test_stdout: str, test_stderr: str, test_return: bool): + """ Test if the results (standard outputs and return code) of a command are the correct ones + test_stdout/test_stderr: a string indicating if the standard output should include the normal output ('include'), + be empty ('empty'), or not tested ('notest' or any other string) + test_return: should the return code be tested (relies on the computation of the return code from the standard output) + """ # get "real" output real = subprocess.run(cmd, cwd=tempfile.gettempdir(), capture_output=True, encoding='utf-8') # check standard output - if not real.stderr in std_stderr: - raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stderr, std_stderr)) + # TODO combine the two tests (the one below) in one fonction + shell_stdout = shell.read_stdout() + if test_stdout == 'include': + if not real.stdout in shell_stdout: + raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stdout, shell_stdout)) + elif test_stdout == 'empty': + if shell_stdout: + raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(shell_stderr)) - # do not check if stdout is empty because it will contain return code... + # check standard output + shell_stderr = shell.read_stderr() + if test_stderr == 'include': + if not real.stderr in shell_stderr: + raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stderr, shell_stderr)) + elif test_stdout == 'empty': + if shell_stderr: + raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(shell_stderr)) # check return code - std_returncode = self._get_exit_code_from_stdout(std_stdout) - if std_returncode != real.returncode: - raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode)) + if test_return: + std_returncode = self._get_exit_code_from_stdout(shell_stdout) + if std_returncode != real.returncode: + raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode)) + + + @test + def test_simple_foregroundjob(self): + # sleep is one of the given exemples + cmd = Cmd(['sleep', '1']) + shell = Shell(self.shell_exec) + shell.exec_commands([cmd], timeout=5) + + + @test + def test_successfull_foregroundjob(self): + # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command + cmd = Cmd(['ls', '-l', '--all', '--author', '-h', '-i', '-S']) + shell = Shell(self.shell_exec) + shell.exec_commands([cmd]) + self._test_command_results(cmd, shell, test_stdout='include', test_stderr='empty', test_return=True) + + + def test_error_foregroundjob(self, cmd: Cmd): + # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command + shell = Shell(self.shell_exec) + shell.exec_commands([cmd]) + self._test_command_results(cmd, shell, test_stdout='notest', test_stderr='include', test_return=True) @test def test_wrongcmd(self): # check if command output (stdout, stderr and 0 exit status) is the one expected with ls command str_cmd = 'ffof cf ee ewpqe pepfiwqnfe ff pife piwfpef pi efqplc c p fpc fpi fip qepi fpiaef pifipewq ipfqepif e pifeq fipqe pifewq pfiewa' - cmd = str_cmd.split(' ') - _, std_stderr, _ = self._execute_shell_command(cmd) - - if not std_stderr: + cmd = Cmd(str_cmd.split(' ')) + shell = Shell(self.shell_exec) + shell.exec_commands([cmd]) + if not shell.read_stderr(): raise AssertionError('The command "{}" should return an error but stderr is empty'.format(str_cmd)) @@ -174,12 +271,12 @@ class Test: @test def test_error_foregroundjob_1(self): - self.test_error_foregroundjob(['ls', '-l', '--all', '--author', '-h', '-i', '-S', 'thisfileshouldnotexist']) + self.test_error_foregroundjob(Cmd(['ls', '-l', '--all', '--author', '-h', '-i', '-S', 'thisfileshouldnotexist'])) test_error_foregroundjob_1(self) @test def test_error_foregroundjob_2(self): - self.test_error_foregroundjob(['stat', 'thisfileshouldnotexist']) + self.test_error_foregroundjob(Cmd(['stat', 'thisfileshouldnotexist'])) test_error_foregroundjob_2(self) @@ -187,21 +284,32 @@ class Test: def test_builtin_exit(self): # Cannot test exit because otherwise the shell will exit before some test on it (see execute_shell_command) # An empty command is tested instead since the exit command is tested anyway at the end - self._execute_shell_command([''], timeout=1) + shell = Shell(self.shell_exec) + cmd = Cmd(['exit']) + shell.exec_command(cmd) + time.sleep(0.5) # wait to be sure that command was executed + if shell.is_running(): + raise AssertionError('Command exit was sent but shell is still running') + shell.exit() @test def test_builtin_cd(self): # Test existing directory dir = tempfile.TemporaryDirectory() - _, _, cwd = self._execute_shell_command(['cd', dir.name], cwd='.', timeout=1) - if dir.name != cwd: - raise AssertionError('Changing directory failed: the directory shouldbe {} but it is {}'.format(dir, cwd)) + cmd = Cmd(['cd', dir.name]) + shell = Shell(self.shell_exec, cwd='.') + shell.exec_command(cmd, timeout=1) + time.sleep(0.5) # to be "sure" that the shell command executed + if dir.name != shell.get_cwd(): + raise AssertionError('Changing directory failed: the directory shouldbe {} but it is {}'.format(dir, shell.get_cwd())) + shell.exit() # Test non-existing directory - cmd = ['cd', 'thisfoldershouldnotexist'] - _, stderr, _ = self._execute_shell_command(cmd, timeout=1) - if not stderr: + cmd = Cmd(['cd', 'thisfoldershouldnotexist']) + shell = Shell(self.shell_exec) + shell.exec_commands([cmd], timeout=1) + if not shell.read_stderr(): raise AssertionError('The command "{}" should return an error but stderr is empty'.format(' '.join(cmd))) @@ -221,6 +329,8 @@ if __name__ == "__main__": # Empty command t.test_builtin() t.test_foregroundjobs() + + sys.exit(test_failed) # # Long nonsensical command # execute_commandon_shell(tp_dir, tp_shell_name, b'ffof cf ee ewpqe pepfiwqnfe ff pife piwfpef pi efqplc c p fpc fpi fip qepi fpiaef pifipewq ipfqepif e pifeq fipqe pifewq pfiewa') # # cd without check it works -- GitLab