From fd0fc0eadc81c55e35199972107afe96ec70ad2f Mon Sep 17 00:00:00 2001 From: Adrien Lescourt <adrien@lescourt.net> Date: Wed, 9 Apr 2025 12:58:58 +0200 Subject: [PATCH] Refactoring --- pwm | 843 +++++++++++++++++++++++++++++------------------------------- 1 file changed, 411 insertions(+), 432 deletions(-) diff --git a/pwm b/pwm index 2d30be1..07fa11f 100755 --- a/pwm +++ b/pwm @@ -2,508 +2,487 @@ """ Practical Work Manager (pwm) -Steven Liatti, Florent Gluck -2020-2022 +Steven Liatti, Florent Gluck, Adrien Lescourt +2020-2025 """ -from argparse import Namespace -import os +from dataclasses import dataclass +from requests.models import Response from typing import Any, Dict, List, Optional +import argparse +import json +import os import requests import subprocess -import argparse -from requests.models import Response import yaml -import json - -BASE_URL: str = 'https://gitedu.hesge.ch' -BASE_API_URL: str = BASE_URL + '/api/v4' -TOKEN_URL: str = BASE_URL + '/profile/personal_access_tokens' - -def create_group(token: str, name: str, visibility: str = 'private') -> str: - """ - Create gitlab group from name and visibility given. Need valid api token. - Return group_id created. - """ - params = {'path': name, 'name': name, 'visibility': visibility} - headers = {'PRIVATE-TOKEN': token} - group = requests.post(BASE_API_URL + '/groups', - params=params, headers=headers).json() - if 'message' in group: - print('Error in creating group: %s' % group) - exit(1) +BASE_URL: str = "https://gitedu.hesge.ch" +BASE_API_URL: str = BASE_URL + "/api/v4" +TOKEN_URL: str = BASE_URL + "/-/user_settings/personal_access_tokens" - print("Group '" + group['name'] + "' with id '" + str(group['id']) + "' and visibility '" + - group['visibility'] + "' available at '" + group['web_url'] + "'") - return str(group['id']) +@dataclass +class User: + def __init__(self, user_id: str) -> None: + self.id = user_id -def delete_group(token: str, group_id: str): - """ - Delete a group and all subprojects. - """ - headers = {'PRIVATE-TOKEN': token} - deleted_group = requests.delete( - BASE_API_URL + '/groups/' + group_id, headers=headers).json() - if 'message' in deleted_group: - if deleted_group['message'] != '202 Accepted': - print('Error in deleting group: %s' % deleted_group) - exit(1) - else: - print('Group ' + group_id + ' successfully deleted') +@dataclass +class Group: + def __init__(self, name: str, user_ids: List[User]) -> None: + self.name = name + self.user_ids = user_ids -def emails_to_ids(emails: List[str], headers: Dict[str, str]) -> List[int]: - """ - Get students ids from their emails - """ - user_ids = [] - - for email in emails: - # Dirty and hackish way that attempts to extract the username from the email. - # It's inefficient, but currently there is no way to reliably obtain - # the username from the email. - # new strategy: - stop when we reach the "." or when we get more than 1 result - # Example of tricky users: - # pierre-louis.roden@etu.hesge.ch -> login AAI: pierrelo.roden - # joel.ferreirapinto@etu.hesge.ch -> login AAI: joelfili.ferreira - username = email.split("@")[0] - #print("Email: ",email) - - while len(username) > 1: - #print("Guessed username: ",username) - user_requested = requests.get(BASE_API_URL + '/users', params={'search': username}, headers=headers).json() - - nb_users = len(user_requested) - if nb_users == 0: - #print('No user %s found, another try...' % email) - lastchar = username[-1] - username = username.rstrip(lastchar) - if username[-1] == '.': # stop if we reach the . -> probably at least one wrong person match the first name - break - continue - elif nb_users > 1: - print('Too many users found for email %s, aborting.' % email) - exit(1) - #print(json.dumps(user_requested, indent=4)) - user_ids.append(user_requested[0]['id']) - return user_ids +@dataclass +class DataSource: + def __init__(self, groups: List[Group]) -> None: + self.groups = groups - print('User %s not found, aborting.' % email) - exit(1) + @classmethod + def from_yaml(cls, yaml_file_path: str) -> "DataSource": + if not cls._validate_yaml(yaml_file_path): + print("Syntax error in YAML file.\nAborted.") + exit(1) + groups = [] + repos = yaml.full_load(yaml_file_path) + for repo in repos: + if "name" in repo: + name = repo["name"] + else: + name = repo["users_login_aai"] + groups.append(Group(name, repo["users"])) + return cls(groups) + + + @staticmethod + def _validate_yaml(yaml_file_path) -> bool: + with open(args.repos_file) as f: + repos = yaml.full_load(f) + for repo in repos: + if "name" in repo: + pass + elif "users_login_aai" in repo: + pass + else: + return False + return True + + +class Gitlab: + def __init__(self, token: str): + self.token = token + + def create_group(self, group: Group, visibility: str = "private") -> str: + """Create gitlab group, Returns group_id created.""" + + params = {"path": group.name, "name": group.name, "visibility": visibility} + headers = {"PRIVATE-TOKEN": self.token} + group_res = requests.post( + BASE_API_URL + "/group_ress", params=params, headers=headers + ).json() + if "message" in group_res: + print("Error in creating group_res: %s" % group_res) + exit(1) -def create_repository(token: str, group_id: str, emails: List[str], name: str, import_url: Optional[str], expires_at: Optional[str]): - """ - Create repository in group_id, with members from emails, a name, and - optional import_url and expiration date. - """ - headers = {'PRIVATE-TOKEN': token} - - # Create project from name, import_url (if given) and group_id - params = {'name': name, 'namespace_id': group_id, 'visibility': 'private'} - if import_url: - params['import_url'] = import_url - project = requests.post(BASE_API_URL + '/projects', - params=params, headers=headers).json() - if 'message' in project: - print('Error in creating project: %s' % project) - exit(1) - print("Project '" + project['name'] + "' at '" + - project['web_url'] + "' created") - - # Allow users with developer access level to push and merge on master - access_level = 40 - params = {'name': 'master', 'push_access_level': str( - access_level), 'merge_access_level': str(access_level)} - requests.post(BASE_API_URL + '/projects/' + - str(project['id']) + '/protected_branches', params=params, headers=headers).json() - - # Get students ids from their emails - user_ids = emails_to_ids(emails, headers) - - # Add each student as maintainer (level 40) - for user_id in user_ids: - params = {'user_id': user_id, 'access_level': access_level} - if expires_at: - params['expires_at'] = expires_at - new_user = requests.post(BASE_API_URL + '/projects/' + str( - project['id']) + '/members', params=params, headers=headers).json() - if 'message' in new_user: - print('Error in adding user: %s' % new_user) - else: - out = ("Adding '" + new_user['name'] + "' (" + new_user['username'] + ") in '" - + project['name'] + "' with access level: " + str(new_user['access_level'])) + print( + "group_res '" + + group_res["name"] + + "' with id '" + + str(group_res["id"]) + + "' and visibility '" + + group_res["visibility"] + + "' available at '" + + group_res["web_url"] + + "'" + ) + return str(group_res["id"]) + + def delete_group(self, group_id: str): + """Delete a group and all subprojects.""" + headers = {"PRIVATE-TOKEN": self.token} + deleted_group = requests.delete( + BASE_API_URL + "/groups/" + group_id, headers=headers + ).json() + if "message" in deleted_group: + if deleted_group["message"] != "202 Accepted": + print("Error in deleting group: %s" % deleted_group) + exit(1) + else: + print("Group " + group_id + " successfully deleted") + + def create_repository( + self, + group_id: str, + users: List[User], + name: str, + import_url: Optional[str], + expires_at: Optional[str], + ): + """ + Create repository in group_id, with members from user_ids, a name, and + optional import_url and expiration date. + """ + headers = {"PRIVATE-TOKEN": self.token} + + # Create project from name, import_url (if given) and group_id + params = {"name": name, "namespace_id": group_id, "visibility": "private"} + if import_url: + params["import_url"] = import_url + project = requests.post( + BASE_API_URL + "/projects", params=params, headers=headers + ).json() + if "message" in project: + print("Error in creating project: %s" % project) + exit(1) + print( + "Project '" + project["name"] + "' at '" + project["web_url"] + "' created" + ) + + # Allow users with developer access level to push and merge on master + access_level = 40 + params = { + "name": "master", + "push_access_level": str(access_level), + "merge_access_level": str(access_level), + } + requests.post( + BASE_API_URL + "/projects/" + str(project["id"]) + "/protected_branches", + params=params, + headers=headers, + ).json() + + # Add each student as maintainer (level 40) + for user in users: + params = {"user_id": user.id, "access_level": access_level} if expires_at: - out += ", expires at: " + new_user['expires_at'] - print(out) - - -def paginate_responses(url: str, headers: Dict[str, str], params: Dict[str, Any]) -> List[Response]: - """ - Manage gitlab pagination, max 100 results by request - """ - responses = [requests.get(url, params=params, headers=headers)] - last_response = responses[len(responses) - 1] - - while last_response.status_code == 200 and len(last_response.headers['X-Next-Page']) != 0: - next_page = last_response.headers['X-Next-Page'] - params['page'] = next_page - responses.append(requests.get(url, params=params, headers=headers)) + params["expires_at"] = expires_at + new_user = requests.post( + BASE_API_URL + "/projects/" + str(project["id"]) + "/members", + params=params, + headers=headers, + ).json() + if "message" in new_user: + print("Error in adding user: %s" % new_user) + else: + out = ( + "Adding '" + + new_user["name"] + + "' (" + + new_user["username"] + + ") in '" + + project["name"] + + "' with access level: " + + str(new_user["access_level"]) + ) + if expires_at: + out += ", expires at: " + new_user["expires_at"] + print(out) + + @staticmethod + def _paginate_responses( + url: str, headers: Dict[str, str], params: Dict[str, Any] + ) -> List[Response]: + """ + Manage gitlab pagination, max 100 results by request + """ + responses = [requests.get(url, params=params, headers=headers)] last_response = responses[len(responses) - 1] - return responses - - -def get_members(token: str, id: str) -> List: - """ - Return members list from given id - """ - url = BASE_API_URL + '/projects/' + id + '/members' - - headers = {'PRIVATE-TOKEN': token} - params = {'simple': 'true', 'order_by': 'name', - 'sort': 'asc', 'per_page': 100} - responses = paginate_responses(url, headers, params) - - members = [] - for r in responses: - members += r.json() - return members - - -def get_projects(token: str, id: str, source: str = 'group') -> List: - """ - Return projects list from given id and source ('group' or 'forks') - """ - if source == 'forks': - url = BASE_API_URL + '/projects/' + id + '/forks' - else: - url = BASE_API_URL + '/groups/' + id + '/projects' - - headers = {'PRIVATE-TOKEN': token} - params = {'simple': 'true', 'order_by': 'name', - 'sort': 'asc', 'per_page': 100} - responses = paginate_responses(url, headers, params) - - projects = [] - for r in responses: - projects += r.json() - return projects + if last_response.status_code != 200: + print(last_response.text) + return [] + + while ( + last_response.status_code == 200 + and len(last_response.headers["X-Next-Page"]) != 0 + ): + next_page = last_response.headers["X-Next-Page"] + params["page"] = next_page + responses.append(requests.get(url, params=params, headers=headers)) + last_response = responses[len(responses) - 1] + + return responses + + def get_users_in_repository(self, id: str) -> List: + """ + Return members list from given id + """ + # BUG: does not work if the repo members are inherited from a group + url = BASE_API_URL + "/projects/" + id + "/members" + + headers = {"PRIVATE-TOKEN": self.token} + params = {"simple": "true", "order_by": "name", "sort": "asc", "per_page": 100} + responses = self._paginate_responses(url, headers, params) + + members = [] + for r in responses: + members += r.json() + return members + + def get_projects_in_group(self, id: str) -> List: + """ + Return projects list from given group id + """ + url = BASE_API_URL + "/groups/" + id + "/projects" + headers = {"PRIVATE-TOKEN": self.token} + params = {"simple": "true", "order_by": "name", "sort": "asc", "per_page": 100} + responses = self._paginate_responses(url, headers, params) + + projects = [] + for r in responses: + projects += r.json() + return projects + + def clone_all(self, id: str, directory: str, until_date: Optional[str]): + """ + Clone all repositories from a group id in directory (created in function). + """ + try: + os.mkdir(directory) + except OSError: + print("Creation of the directory '%s' failed, exit\n" % directory) + exit(1) + headers = {"PRIVATE-TOKEN": self.token} + repositories = self.get_projects_in_group(id) -def clone_all(token: str, id: str, directory: str, until_date: Optional[str], source: str = 'group', use_http: bool = True): - """ - Clone all repositories (from a group or "forks of") from id (group or - project id) in directory (created in function). - """ - try: - os.mkdir(directory) - except OSError: - print("Creation of the directory '%s' failed, exit\n" % directory) - exit(1) - - headers = {'PRIVATE-TOKEN': token} - repositories = get_projects(token, id, source) - - for repo in repositories: - repo_url = BASE_API_URL + '/projects/' + str(repo['id']) + '/members' - members = requests.get(repo_url, headers=headers).json() - if 'message' in members: - print('Error retrieving members: ' + members['message']) - exit(1) + for repo in repositories: + repo_url = BASE_API_URL + "/projects/" + str(repo["id"]) + "/members" + members = requests.get(repo_url, headers=headers).json() + if "message" in members: + print("Error retrieving members: " + members["message"]) + exit(1) - ssh_url_to_repo = repo['ssh_url_to_repo'] - web_url = repo['web_url'] - members_names = '' + ssh_url_to_repo = repo["ssh_url_to_repo"] + web_url = repo["web_url"] + members_names = "" - for member in members: - if member['access_level'] > 20: # Access level greater than "Reporter" - members_names += member['username'] + ', ' + for member in members: + if member["access_level"] > 20: # Access level greater than "Reporter" + members_names += member["username"] + ", " - if source == 'forks': - repo_local_name = repo['namespace']['path'] - else: - repo_local_name = repo['path'] + repo_local_name = repo["path"] - print('Members: ' + members_names) - print('Web url: ' + web_url) - print('Cloning in "' + directory + '/' + repo_local_name + '"') + print("Members: " + members_names) + print("Web url: " + web_url) + print('Cloning in "' + directory + "/" + repo_local_name + '"') - if use_http: scheme = "https://" after_https = BASE_URL.find(scheme) + len(scheme) - url = BASE_URL[:after_https] + "{}:{}@".format("gitlab-ci-token", token) + BASE_URL[after_https:] - subprocess.run(["git", "clone", "-q", web_url.replace(BASE_URL, url), - directory + '/' + repo_local_name]) - else: - subprocess.run(["git", "clone", "-q", ssh_url_to_repo, - directory + '/' + repo_local_name]) - - if until_date: - commit_id = subprocess.check_output([ - "git", "rev-list", "-n", "1", "--before=\"" + until_date + "\"", - "master"], cwd=directory + '/' + repo_local_name).decode('utf-8').rstrip() + url = ( + BASE_URL[:after_https] + + "{}:{}@".format("gitlab-ci-token", self.token) + + BASE_URL[after_https:] + ) subprocess.run( - ["git", "checkout", "-q", str(commit_id)], - cwd=directory + '/' + repo_local_name) - print("Checkout at " + str(commit_id) + "\n") - else: - print() - - -def validate_yaml(args): - """ - Verify that the yaml file is valid by checking: - - the yaml syntax - - that a user id can be retrieved from the email (through some hackish inference) - This function stops the program if a check fails. - """ - with open(args.repos_file) as f: - repos = yaml.full_load(f) - for repo in repos: - if 'name' in repo: - name = repo['name'] - elif 'emails' in repo: - name = repo['emails'][0].split('@')[0] + [ + "git", + "clone", + "-q", + web_url.replace(BASE_URL, url), + directory + "/" + repo_local_name, + ] + ) + + if until_date: + commit_id = ( + subprocess.check_output( + [ + "git", + "rev-list", + "-n", + "1", + '--before="' + until_date + '"', + "master", + ], + cwd=directory + "/" + repo_local_name, + ) + .decode("utf-8") + .rstrip() + ) + subprocess.run( + ["git", "checkout", "-q", str(commit_id)], + cwd=directory + "/" + repo_local_name, + ) + print("Checkout at " + str(commit_id) + "\n") else: - print('Syntax error in YAML file.\nAborted.') - exit(1) - headers = {'PRIVATE-TOKEN': args.token} - # check the user id can sucessfully be retrieved from the email address - emails_to_ids(repo['emails'], headers) + print() def command_create_group_repos(args): """ - Combine create_group and create_repository. For each repository listed in - given file, create a repo in group. - """ - validate_yaml(args) - - if args.visibility: - group_id = create_group(args.token, args.group_name, args.visibility) - else: - group_id = create_group(args.token, args.group_name) - print() - - with open(args.repos_file) as f: - repos = yaml.full_load(f) - for repo in repos: - if 'name' in repo: - name = repo['name'] - elif 'emails' in repo: - name = repo['emails'][0].split('@')[0] - else: - print('Syntax error in YAML file.') - delete_group(args.token, group_id) - print('Deleted group.\nAborted.') - exit(1) - create_repository(args.token, group_id, repo['emails'], name, args.import_url, args.expires_at) - print() - - -def command_create_repos(args): - """ - Create a set of repositories inside the specified (existing) group. For each repository listed - in the given file, create a repository. - """ - validate_yaml(args) - - group_id = args.group_id - - with open(args.repos_file) as f: - repos = yaml.full_load(f) - for repo in repos: - if 'name' in repo: - name = repo['name'] - elif 'emails' in repo: - name = repo['emails'][0].split('@')[0] - else: - print('YAML file not correct, exit and delete group') - delete_group(args.token, group_id) - exit(1) - create_repository(args.token, group_id, repo['emails'], name, args.import_url, args.expires_at) - print('created repo:',repo['emails'],name) - print() - - -def command_create_group(args): - """ - Call create_group + For each repository listed in given file, create a group and a repo in group. + Add users in every repo created """ - if args.visibility: - create_group(args.token, args.group_name, args.visibility) - else: - create_group(args.token, args.group_name) - - -def command_create_repository(args): - """ - Call create_repository - """ - if args.name: - name = args.name - else: - name = args.emails.split('@')[0] - create_repository(args.token, args.group_id, args.emails.split( - ','), name, args.import_url, args.expires_at) + gl = Gitlab(args.token) + ds = DataSource.from_yaml(args.repos_file) + group_id = gl.create_group(args.group_name, args.visibility) + for group in ds.groups: + gl.create_repository( + group_id, + group.user_ids, + group.name, + args.import_url, + args.expires_at, + ) + print(f"created repo: {group.name} with the users: {group.user_ids}") + print() def command_clone_all(args): - """ - Call clone_all - """ - if args.forks: - clone_all(args.token, args.id, args.directory, - args.until_date, 'forks', args.use_http) - else: - clone_all(args.token, args.id, args.directory, args.until_date, args.use_http) - - -def command_list(args): - """ - Call get_projects or get_members - """ - if args.members: - members = get_members(args.token, args.id) - if args.show: - if args.show == 'all': - print(json.dumps(members, indent=2)) - elif args.show == 'url': - results = list(map(lambda p: p['web_url'], members)) - for r in results: - print(r) - else: - names = list(map(lambda p: p['username'], members)) - for name in names: - print(name) + gl = Gitlab(args.token) + gl.clone_all(args.id, args.directory, args.until_date) + + +def command_list_projects(args): + gl = Gitlab(args.token) + projects = gl.get_projects_in_group(args.id) + if args.show: + if args.show == "all": + print(json.dumps(projects, indent=2)) + elif args.show == "url": + results = list(map(lambda p: p["http_url_to_repo"], projects)) + for r in results: + print(r) + elif args.show == "ssh": + results = list(map(lambda p: p["ssh_url_to_repo"], projects)) + for r in results: + print(r) else: - names = list(map(lambda p: p['username'], members)) + names = list(map(lambda p: p["name"], projects)) for name in names: print(name) else: - projects = get_projects(args.token, args.id) - if args.show: - if args.show == 'all': - print(json.dumps(projects, indent=2)) - elif args.show == 'url': - results = list(map(lambda p: p['http_url_to_repo'], projects)) - for r in results: - print(r) - elif args.show == 'ssh': - results = list(map(lambda p: p['ssh_url_to_repo'], projects)) - for r in results: - print(r) - else: - names = list(map(lambda p: p['name'], projects)) - for name in names: - print(name) + names = list(map(lambda p: p["name"], projects)) + for name in names: + print(name) + + +def command_list_users(args): + gl = Gitlab(args.token) + members = gl.get_users_in_repository(args.id) + if args.show: + if args.show == "all": + print(json.dumps(members, indent=2)) + elif args.show == "url": + results = list(map(lambda p: p["web_url"], members)) + for r in results: + print(r) else: - names = list(map(lambda p: p['name'], projects)) + names = list(map(lambda p: p["username"], members)) for name in names: print(name) + else: + names = list(map(lambda p: p["username"], members)) + for name in names: + print(name) -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Practical Work Manager - \ - Manage students PW - Create group, projects or clone repositories') +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Practical Work Manager - \ + Manage students PW - Create group, projects or clone repositories" + ) parser.set_defaults(func=lambda _: parser.print_help()) - parser.add_argument("-t", "--token", metavar="TOKEN", - help="Create a token here: " + TOKEN_URL) + parser.add_argument( + "-t", "--token", metavar="TOKEN", help=f"Create a token here: {TOKEN_URL}. -t is not needed if the env var GITEDU_TOKEN is set." + ) subparsers = parser.add_subparsers( - metavar='(group_repos | group | repo | clone | list)') + metavar="(group_repos | clone | list_project | list_users)" + ) parser_group_repos = subparsers.add_parser( - 'group_repos', help='Create group and repos associated') + "group_repos", help="Create group and repositories from file" + ) parser_group_repos.add_argument( - "group_name", metavar="GROUP_NAME", help="The group name.") + "group_name", metavar="GROUP_NAME", help="The group name." + ) parser_group_repos.add_argument( - "repos_file", metavar="REPOS_FILE", help="YAML file with projects names and/or students emails.") + "repos_file", + metavar="REPOS_FILE", + help="YAML file with projects names and/or students emails.", + ) parser_group_repos.add_argument( - "--visibility", help="Group visibility. By default private.") - parser_group_repos.add_argument("-i", "--import_url", - help="Import the publicly accessible project by URL given here (optional).") - parser_group_repos.add_argument("-x", "--expires_at", - help="Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).") + "--visibility", help="Group visibility. By default private.", default="private" + ) + parser_group_repos.add_argument( + "-i", + "--import_url", + help="Import the publicly accessible project by URL given here (optional).", + ) + parser_group_repos.add_argument( + "-x", + "--expires_at", + help="Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).", + ) parser_group_repos.set_defaults(func=command_create_group_repos) - parser_repos = subparsers.add_parser( - 'repos', help='Create repos inside existing group') - parser_repos.add_argument( - "group_id", metavar="GROUP_ID", help="The existing group id.") - parser_repos.add_argument( - "repos_file", metavar="REPOS_FILE", help="YAML file with projects names and/or students emails.") - parser_repos.add_argument("-i", "--import_url", - help="Import the publicly accessible project by URL given here (optional).") - parser_repos.add_argument("-x", "--expires_at", - help="Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).") - parser_repos.set_defaults(func=command_create_repos) - - parser_group = subparsers.add_parser('group', help='Create gitlab group') - parser_group.add_argument( - "group_name", metavar="GROUP_NAME", help="The group name.") - parser_group.add_argument( - "--visibility", help="Group visibility. By default private.") - parser_group.set_defaults(func=command_create_group) - - parser_repo = subparsers.add_parser('repo', help='Create gitlab project') - parser_repo.add_argument( - "group_id", metavar="GROUP_ID", help="The group id (int) where to store the created new project.") - parser_repo.add_argument( - "emails", metavar="EMAILS", help="Emails list of students working in this project, separated by commas (email1,email2).") - parser_repo.add_argument( - "-n", "--name", help="The project name. If blank, take the first student name (from email) as name.") - parser_repo.add_argument("-i", "--import_url", - help="Import the publicly accessible project by URL given here (optional).") - parser_repo.add_argument("-x", "--expires_at", - help="Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).") - parser_repo.set_defaults(func=command_create_repository) - - parser_clone = subparsers.add_parser( - 'clone', help='Clone the repositories locally') + parser_clone = subparsers.add_parser("clone", help="Clone the repositories locally") group_clone = parser_clone.add_mutually_exclusive_group() - group_clone.add_argument("-g", "--group", action="store_true", - help="Clone repositories from a group (with group_id) (default behavior).") - group_clone.add_argument("-f", "--forks", action="store_true", - help="Clone forks of a project (with project_id).") + group_clone.add_argument( + "-g", + "--group", + action="store_true", + help="Clone repositories from a group (with group_id) (default behavior).", + ) + parser_clone.add_argument( + "id", + metavar="ID", + help="The group_id (int) of the projects.", + ) parser_clone.add_argument( - "id", metavar="ID", help="The group_id (int) of the projects or the project_id (int) of the forks.") + "directory", + metavar="DIRECTORY", + help="Local directory where clone all repositories.", + ) parser_clone.add_argument( - "directory", metavar="DIRECTORY", help="Local directory where clone all repositories.") + "-u", + "--until_date", + help='Do a git checkout for all repositories at given date, format "YYYY-MM-DD hh:mm" (optional).', + ) parser_clone.add_argument( - "-u", "--until_date", help="Do a git checkout for all repositories at given date, format \"YYYY-MM-DD hh:mm\" (optional).") - parser_clone.add_argument("--use_http", help="Use the HTTP client instead of SSH. False by default.", action='store_true') + "--use_http", + help="Use the HTTP client instead of SSH. False by default.", + action="store_true", + ) parser_clone.set_defaults(func=command_clone_all) parser_list = subparsers.add_parser( - 'list', help="List group's projects or project's members") - group_list = parser_list.add_mutually_exclusive_group() - group_list.add_argument( - "-p", "--projects", action="store_true", help="List group's projects (default") - group_list.add_argument( - "-m", "--members", action="store_true", help="List project's members") + "list_projects", help="List all project in a group" + ) + parser_list.add_argument("id", metavar="ID", help="The group_id (int).") + parser_list.add_argument( + "-s", + "--show", + help="Amount of informations (default name) : [all | name | url | ssh]", + ) + parser_list.set_defaults(func=command_list_projects) + + parser_list = subparsers.add_parser( + "list_users", help="List all users in a repository" + ) parser_list.add_argument( - "id", metavar="ID", help="The group_id or the project_id (int).") + "id", metavar="ID", help="The repository project_id (int)." + ) parser_list.add_argument( - "-s", "--show", help="Amount of informations (default name) : [all | name | url | ssh]") - parser_list.set_defaults(func=command_list) + "-s", + "--show", + help="Amount of informations (default name) : [all | name | url | ssh]", + ) + parser_list.set_defaults(func=command_list_users) args = parser.parse_args() if not args.token: - home = os.environ.get('HOME') - if home: - token_file = home + '/.gitedu_token' - if os.path.isfile(token_file): - with open(token_file) as file: - args.token = file.read().strip() - elif os.environ.get('GITEDU_TOKEN'): - args.token = os.environ.get('GITEDU_TOKEN') + if os.environ.get("GITEDU_TOKEN"): + args.token = os.environ.get("GITEDU_TOKEN") else: - print('Error: you must give a valid api token. Create a token here: ' + TOKEN_URL) + print( + "Error: you must give a valid api token. Create a token here: " + + TOKEN_URL + ) exit(1) args.func(args) -- GitLab