Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • soufiane.elkharmo/tp-shell-student
  • gc_courses/sys-exploit/tp-shell-student
2 results
Show changes
Commits on Source (2)
......@@ -15,6 +15,7 @@ import time
import psutil
import re
def print_usage():
print("Usage: {} shell_executable_name".format(sys.argv[0]))
print("")
......@@ -22,145 +23,241 @@ def print_usage():
print(" if omitted, the script attempt to find it automatically")
# TODO: move in test class (failed so far)
test_failed = False
def test(func):
def wrapper(self):
global test_failed
print(func.__name__ + ": ", end="")
try:
func(self)
except Exception as e:
print(Fore.RED + "FAILED" + Fore.RESET)
logging.error(f"{type(e).__name__}:{str(e)}")
test_failed = True
return
print(Fore.GREEN + "SUCCESS" + Fore.RESET)
return wrapper
class Test:
def __init__(self, shell_exec: str) -> None:
self.shell_exec = Path(shell_exec).resolve()
class Cmd:
def __init__(self, cmd: list[str]) -> None:
self.cmd = cmd
def _get_exit_code_from_stdout(self, stdout: str) -> int:
# Find line with keyword "code"
for line in stdout.splitlines():
if "code" in line:
# Assumes that exit code is after keyword code and that it is the only digits
return int(''.join([c for c in line.split('code')[-1] if c.isdigit()]))
raise AssertionError('No exit code found')
def __str__(self) -> str:
if self.cmd is None:
return str(None)
else:
return ' '.join(self.cmd).strip()
def _execute_shell_command(self, command: list[str], cwd: str = tempfile.gettempdir(), duration_cmd: float = 0.2, timeout: int = 3):
""" Execute a shell command and tries to determine if the command is correctly executed
"""
def __iter__(self) -> str:
for c in self.cmd:
yield c
# Create a string out of a command
str_cmd = ' '.join(command)
class Shell:
def __init__(self, executable: str, cwd: str = tempfile.gettempdir()) -> None:
# Execute the shell and keep track of the process
shell_process = subprocess.Popen(
[self.shell_exec], cwd=cwd, encoding='utf-8',
self.executable = executable
self.shell_process = subprocess.Popen(
[self.executable], cwd=cwd, encoding='utf-8',
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
shell_ps = psutil.Process(shell_process.pid)
try:
# Execute the command and wait for it to finish
shell_process.stdin.write(str_cmd + '\n')
shell_process.stdin.flush()
self.shell_ps = psutil.Process(self.shell_process.pid)
# Keep track of last executed command
self.last_cmd = Cmd(None)
# No ouputs so far...
self.stdout = None
self.stderr = None
#TODO: we wait a bit here to ensure the child is created ? However it seems is it not needed, how to ensure it ?
# TODO: move out of Shell ?
def check_ophans(self, cmd: Cmd):
""" Check if an orphan process (child of 1) was executed with cmd
cmd: if None the tested command will be the last executed command
"""
if cmd is None:
if self.last_cmd is not None:
cmd = self.last_cmd
else:
return
str_cmd = str(cmd)
init_ps = psutil.Process(1)
for p in init_ps.children():
if p.cmdline() == str_cmd:
raise AssertionError('The command "{}" seem to be a child of process 1'.format(str_cmd))
def wait_children(self, test_zombies: bool = True, timeout: int = 3):
""" Wait for children of the shell to temrinate
test_zombies: if True the command will raise an AssertionError if some children are zombies
timeout: time after which a Timeout exception is raised if there are still children
"""
# Wait for all children and check zombies / timeout
start_time = time.time()
while True:
# Wait a bit before checking
time.sleep(0.1)
# Wait for children to die and check if some are zombies
# TODO: it can happen that the child is stuck (e.g. if it becomes a shell cause cannot execve) in that case the code bellow blocks
children = shell_ps.children()
while(children):
# Check if some children are zombies
children = self.shell_ps.children()
if test_zombies:
for c in children:
if c.status() == psutil.STATUS_ZOMBIE:
raise AssertionError('The shell child process {} is a zombie for command "{}"'.format(c, str_cmd))
time.sleep(0.1)
children = shell_ps.children()
raise AssertionError('The shell child process {} is a zombie (last command executed: {})'.format(c, self.last_cmd))
# No more children to wait for -> stop looping
if (len(children) == 0):
break
# as the command exceeded the timeout ?
duration = time.time() - start_time
if duration > timeout:
raise psutil.TimeoutExpired('The process took more than the timeout ({}s) to terminate (last command executed: {})'.format(duration, self.last_cmd))
# Check if command process has become a child of process 1 (e.g. by double fork)
init_ps = psutil.Process(1)
for c in init_ps.children():
if c.cmdline() == command:
raise AssertionError('The command "{}" seem to be a child of process 1'.format(str_cmd))
# update current working directory
cwd = shell_ps.cwd()
def exec_command(self, command: Cmd, wait_cmd: bool = True, timeout: int = 3):
"""Execute a command in the shell without existing the shell
wait_cmd: wait for all the command processes to finish and raise an error on zombies
timeout: terminate the command if it last longer than timeout (does not apply if wait_cmd = False)"""
# Execute the command
self.shell_process.stdin.write(str(command) + '\n')
self.shell_process.stdin.flush()
self.last_cmd = command
# Terminate shell by using exit command and returning shell outputs
stdout, stderr = shell_process.communicate(input='exit\n', timeout=timeout)
# Wait for the shell childs to finish while checking for zombies
if wait_cmd:
self.wait_children(timeout=timeout)
# Test if shell is still running
if shell_ps.is_running():
raise AssertionError('Shell is still running after en of communication, either exit is not working or the shell does not terminate on Ctrl+D')
return stdout, stderr, cwd
def exec_commands(self, commands: list[Cmd], wait_cmd: bool = True, timeout: int = 3):
"""Execute a list of commands in the shell without existing the shell and exits
wait_cmd: wait for all the command processes to finish and raise an error on zombies
timeout: terminate the command if it last longer than timeout (does not apply if wait_cmd = False)"""
for cmd in commands:
self.exec_command(cmd, wait_cmd, timeout)
self.exit()
def exit(self):
# We use communicate to be sure that all streams are closed (i.e. process terminated)
timeout = 1
try:
self.stdout, self.stderr = self.shell_process.communicate(input='exit\n', timeout=timeout)
except subprocess.TimeoutExpired:
shell_process.kill()
print('Timeout during command {}'.format(str_cmd))
raise subprocess.TimeoutExpired
self.shell_process.kill()
raise subprocess.TimeoutExpired('The exit command did not exit the shell after {}s'.format(timeout))
#TODO: should I check if process is still running at that point (stream closed but process alive ?)
@test
def test_simple_foregroundjob(self):
# sleep is one of the given exemples
self._execute_shell_command(['sleep', '1'], timeout=5)
def read_stdout(self):
if self.stdout is not None:
return self.stdout
else:
raise ValueError('stdout is None probably because exit was not called on the shell before accessing it')
@test
def test_successfull_foregroundjob(self):
# check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
cmd = ['ls', '-l', '--all', '--author', '-h', '-i', '-S']
std_stdout, std_stderr, _ = self._execute_shell_command(cmd)
# get "real" output
real = subprocess.run(cmd, cwd=tempfile.gettempdir(), capture_output=True, encoding='utf-8')
def read_stderr(self):
if self.stderr is not None:
return self.stderr
else:
raise ValueError('stderr is None probably because exit was not called on the shell before accessing it')
# check standard output
if not real.stdout in std_stdout:
raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stdout, std_stdout))
# check standard error
if std_stderr:
raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(std_stderr))
def get_cwd(self):
return self.shell_ps.cwd()
# check return code
std_returncode = self._get_exit_code_from_stdout(std_stdout)
if std_returncode != real.returncode:
raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode))
def is_running(self):
return self.shell_process.poll() == None
def test_error_foregroundjob(self, cmd: list[str]):
# check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
std_stdout, std_stderr, _ = self._execute_shell_command(cmd)
class Test:
def __init__(self, shell_exec: str) -> None:
self.shell_exec = Path(shell_exec).resolve()
self.shell = None # currently no shell run
#TODO: remove self.shell as it should be passed from function to function (or not in case also correct some function that does)
def _get_exit_code_from_stdout(self, stdout: str) -> int:
# Find line with keyword "code"
for line in stdout.splitlines():
if "code" in line:
# Assumes that exit code is after keyword code and that it is the only digits
return int(''.join([c for c in line.split('code')[-1] if c.isdigit()]))
raise AssertionError('No exit code found')
def _test_command_results(self, cmd: Cmd, shell: Shell, test_stdout: str, test_stderr: str, test_return: bool):
""" Test if the results (standard outputs and return code) of a command are the correct ones
test_stdout/test_stderr: a string indicating if the standard output should include the normal output ('include'),
be empty ('empty'), or not tested ('notest' or any other string)
test_return: should the return code be tested (relies on the computation of the return code from the standard output)
"""
# get "real" output
real = subprocess.run(cmd, cwd=tempfile.gettempdir(), capture_output=True, encoding='utf-8')
# check standard output
if not real.stderr in std_stderr:
raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stderr, std_stderr))
# TODO combine the two tests (the one below) in one fonction
shell_stdout = shell.read_stdout()
if test_stdout == 'include':
if not real.stdout in shell_stdout:
raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stdout, shell_stdout))
elif test_stdout == 'empty':
if shell_stdout:
raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(shell_stderr))
# do not check if stdout is empty because it will contain return code...
# check standard output
shell_stderr = shell.read_stderr()
if test_stderr == 'include':
if not real.stderr in shell_stderr:
raise AssertionError('The standard output of the command "{}" does not include the following correct result:\n{}\cmd result in shell:\n{}'.format(' '.join(cmd), real.stderr, shell_stderr))
elif test_stdout == 'empty':
if shell_stderr:
raise AssertionError('The standard error of the command "{}" shouldbe empty but contains:\n{}'.format(shell_stderr))
# check return code
std_returncode = self._get_exit_code_from_stdout(std_stdout)
if std_returncode != real.returncode:
raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode))
if test_return:
std_returncode = self._get_exit_code_from_stdout(shell_stdout)
if std_returncode != real.returncode:
raise AssertionError('The command "{}" should return {} but the shell indicates {}'.format(' '.join(cmd), real.returncode, std_returncode))
@test
def test_simple_foregroundjob(self):
# sleep is one of the given exemples
cmd = Cmd(['sleep', '1'])
shell = Shell(self.shell_exec)
shell.exec_commands([cmd], timeout=5)
@test
def test_successfull_foregroundjob(self):
# check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
cmd = Cmd(['ls', '-l', '--all', '--author', '-h', '-i', '-S'])
shell = Shell(self.shell_exec)
shell.exec_commands([cmd])
self._test_command_results(cmd, shell, test_stdout='include', test_stderr='empty', test_return=True)
def test_error_foregroundjob(self, cmd: Cmd):
# check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
shell = Shell(self.shell_exec)
shell.exec_commands([cmd])
self._test_command_results(cmd, shell, test_stdout='notest', test_stderr='include', test_return=True)
@test
def test_wrongcmd(self):
# check if command output (stdout, stderr and 0 exit status) is the one expected with ls command
str_cmd = 'ffof cf ee ewpqe pepfiwqnfe ff pife piwfpef pi efqplc c p fpc fpi fip qepi fpiaef pifipewq ipfqepif e pifeq fipqe pifewq pfiewa'
cmd = str_cmd.split(' ')
_, std_stderr, _ = self._execute_shell_command(cmd)
if not std_stderr:
cmd = Cmd(str_cmd.split(' '))
shell = Shell(self.shell_exec)
shell.exec_commands([cmd])
if not shell.read_stderr():
raise AssertionError('The command "{}" should return an error but stderr is empty'.format(str_cmd))
......@@ -174,12 +271,12 @@ class Test:
@test
def test_error_foregroundjob_1(self):
self.test_error_foregroundjob(['ls', '-l', '--all', '--author', '-h', '-i', '-S', 'thisfileshouldnotexist'])
self.test_error_foregroundjob(Cmd(['ls', '-l', '--all', '--author', '-h', '-i', '-S', 'thisfileshouldnotexist']))
test_error_foregroundjob_1(self)
@test
def test_error_foregroundjob_2(self):
self.test_error_foregroundjob(['stat', 'thisfileshouldnotexist'])
self.test_error_foregroundjob(Cmd(['stat', 'thisfileshouldnotexist']))
test_error_foregroundjob_2(self)
......@@ -187,21 +284,32 @@ class Test:
def test_builtin_exit(self):
# Cannot test exit because otherwise the shell will exit before some test on it (see execute_shell_command)
# An empty command is tested instead since the exit command is tested anyway at the end
self._execute_shell_command([''], timeout=1)
shell = Shell(self.shell_exec)
cmd = Cmd(['exit'])
shell.exec_command(cmd)
time.sleep(0.5) # wait to be sure that command was executed
if shell.is_running():
raise AssertionError('Command exit was sent but shell is still running')
shell.exit()
@test
def test_builtin_cd(self):
# Test existing directory
dir = tempfile.TemporaryDirectory()
_, _, cwd = self._execute_shell_command(['cd', dir.name], cwd='.', timeout=1)
if dir.name != cwd:
raise AssertionError('Changing directory failed: the directory shouldbe {} but it is {}'.format(dir, cwd))
cmd = Cmd(['cd', dir.name])
shell = Shell(self.shell_exec, cwd='.')
shell.exec_command(cmd, timeout=1)
time.sleep(0.5) # to be "sure" that the shell command executed
if dir.name != shell.get_cwd():
raise AssertionError('Changing directory failed: the directory shouldbe {} but it is {}'.format(dir, shell.get_cwd()))
shell.exit()
# Test non-existing directory
cmd = ['cd', 'thisfoldershouldnotexist']
_, stderr, _ = self._execute_shell_command(cmd, timeout=1)
if not stderr:
cmd = Cmd(['cd', 'thisfoldershouldnotexist'])
shell = Shell(self.shell_exec)
shell.exec_commands([cmd], timeout=1)
if not shell.read_stderr():
raise AssertionError('The command "{}" should return an error but stderr is empty'.format(' '.join(cmd)))
......@@ -221,6 +329,8 @@ if __name__ == "__main__":
# Empty command
t.test_builtin()
t.test_foregroundjobs()
sys.exit(test_failed)
# # Long nonsensical command
# execute_commandon_shell(tp_dir, tp_shell_name, b'ffof cf ee ewpqe pepfiwqnfe ff pife piwfpef pi efqplc c p fpc fpi fip qepi fpiaef pifipewq ipfqepif e pifeq fipqe pifewq pfiewa')
# # cd without check it works
......