Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
P
practical-work-manager
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
ISC
projects
practical-work-manager
Commits
fd0fc0ea
Commit
fd0fc0ea
authored
1 month ago
by
Adrien Lescourt
Browse files
Options
Downloads
Patches
Plain Diff
Refactoring
parent
1eaa9f09
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
pwm
+411
-432
411 additions, 432 deletions
pwm
with
411 additions
and
432 deletions
pwm
+
411
−
432
View file @
fd0fc0ea
...
@@ -2,508 +2,487 @@
...
@@ -2,508 +2,487 @@
"""
"""
Practical Work Manager (pwm)
Practical Work Manager (pwm)
Steven Liatti, Florent Gluck
Steven Liatti, Florent Gluck
, Adrien Lescourt
2020-202
2
2020-202
5
"""
"""
from
argpar
se
import
Namespace
from
dataclas
se
s
import
dataclass
import
os
from
requests.models
import
Response
from
typing
import
Any
,
Dict
,
List
,
Optional
from
typing
import
Any
,
Dict
,
List
,
Optional
import
argparse
import
json
import
os
import
requests
import
requests
import
subprocess
import
subprocess
import
argparse
from
requests.models
import
Response
import
yaml
import
yaml
import
json
BASE_URL
:
str
=
'
https://gitedu.hesge.ch
'
BASE_API_URL
:
str
=
BASE_URL
+
'
/api/v4
'
TOKEN_URL
:
str
=
BASE_URL
+
'
/profile/personal_access_tokens
'
def
create_group
(
token
:
str
,
name
:
str
,
visibility
:
str
=
'
private
'
)
->
str
:
"""
Create gitlab group from name and visibility given. Need valid api token.
Return group_id created.
"""
params
=
{
'
path
'
:
name
,
'
name
'
:
name
,
'
visibility
'
:
visibility
}
headers
=
{
'
PRIVATE-TOKEN
'
:
token
}
group
=
requests
.
post
(
BASE_API_URL
+
'
/groups
'
,
BASE_URL
:
str
=
"
https://gitedu.hesge.ch
"
params
=
params
,
headers
=
headers
).
json
()
BASE_API_URL
:
str
=
BASE_URL
+
"
/api/v4
"
if
'
message
'
in
group
:
TOKEN_URL
:
str
=
BASE_URL
+
"
/-/user_settings/personal_access_tokens
"
print
(
'
Error in creating group: %s
'
%
group
)
exit
(
1
)
print
(
"
Group
'"
+
group
[
'
name
'
]
+
"'
with id
'"
+
str
(
group
[
'
id
'
])
+
"'
and visibility
'"
+
group
[
'
visibility
'
]
+
"'
available at
'"
+
group
[
'
web_url
'
]
+
"'"
)
return
str
(
group
[
'
id
'
])
@dataclass
class
User
:
def
__init__
(
self
,
user_id
:
str
)
->
None
:
self
.
id
=
user_id
def
delete_group
(
token
:
str
,
group_id
:
str
):
"""
Delete a group and all subprojects.
"""
headers
=
{
'
PRIVATE-TOKEN
'
:
token
}
deleted_group
=
requests
.
delete
(
BASE_API_URL
+
'
/groups/
'
+
group_id
,
headers
=
headers
).
json
()
if
'
message
'
in
deleted_group
:
if
deleted_group
[
'
message
'
]
!=
'
202 Accepted
'
:
print
(
'
Error in deleting group: %s
'
%
deleted_group
)
exit
(
1
)
else
:
print
(
'
Group
'
+
group_id
+
'
successfully deleted
'
)
@dataclass
class
Group
:
def
__init__
(
self
,
name
:
str
,
user_ids
:
List
[
User
])
->
None
:
self
.
name
=
name
self
.
user_ids
=
user_ids
def
emails_to_ids
(
emails
:
List
[
str
],
headers
:
Dict
[
str
,
str
])
->
List
[
int
]:
"""
Get students ids from their emails
"""
user_ids
=
[]
for
email
in
emails
:
# Dirty and hackish way that attempts to extract the username from the email.
# It's inefficient, but currently there is no way to reliably obtain
# the username from the email.
# new strategy: - stop when we reach the "." or when we get more than 1 result
# Example of tricky users:
# pierre-louis.roden@etu.hesge.ch -> login AAI: pierrelo.roden
# joel.ferreirapinto@etu.hesge.ch -> login AAI: joelfili.ferreira
username
=
email
.
split
(
"
@
"
)[
0
]
#print("Email: ",email)
while
len
(
username
)
>
1
:
#print("Guessed username: ",username)
user_requested
=
requests
.
get
(
BASE_API_URL
+
'
/users
'
,
params
=
{
'
search
'
:
username
},
headers
=
headers
).
json
()
nb_users
=
len
(
user_requested
)
if
nb_users
==
0
:
#print('No user %s found, another try...' % email)
lastchar
=
username
[
-
1
]
username
=
username
.
rstrip
(
lastchar
)
if
username
[
-
1
]
==
'
.
'
:
# stop if we reach the . -> probably at least one wrong person match the first name
break
continue
elif
nb_users
>
1
:
print
(
'
Too many users found for email %s, aborting.
'
%
email
)
exit
(
1
)
#print(json.dumps(user_requested, indent=4))
@dataclass
user_ids
.
append
(
user_requested
[
0
][
'
id
'
])
class
DataSource
:
return
user_ids
def
__init__
(
self
,
groups
:
List
[
Group
])
->
None
:
self
.
groups
=
groups
print
(
'
User %s not found, aborting.
'
%
email
)
@classmethod
exit
(
1
)
def
from_yaml
(
cls
,
yaml_file_path
:
str
)
->
"
DataSource
"
:
if
not
cls
.
_validate_yaml
(
yaml_file_path
):
print
(
"
Syntax error in YAML file.
\n
Aborted.
"
)
exit
(
1
)
groups
=
[]
repos
=
yaml
.
full_load
(
yaml_file_path
)
for
repo
in
repos
:
if
"
name
"
in
repo
:
name
=
repo
[
"
name
"
]
else
:
name
=
repo
[
"
users_login_aai
"
]
groups
.
append
(
Group
(
name
,
repo
[
"
users
"
]))
return
cls
(
groups
)
@staticmethod
def
_validate_yaml
(
yaml_file_path
)
->
bool
:
with
open
(
args
.
repos_file
)
as
f
:
repos
=
yaml
.
full_load
(
f
)
for
repo
in
repos
:
if
"
name
"
in
repo
:
pass
elif
"
users_login_aai
"
in
repo
:
pass
else
:
return
False
return
True
class
Gitlab
:
def
__init__
(
self
,
token
:
str
):
self
.
token
=
token
def
create_group
(
self
,
group
:
Group
,
visibility
:
str
=
"
private
"
)
->
str
:
"""
Create gitlab group, Returns group_id created.
"""
params
=
{
"
path
"
:
group
.
name
,
"
name
"
:
group
.
name
,
"
visibility
"
:
visibility
}
headers
=
{
"
PRIVATE-TOKEN
"
:
self
.
token
}
group_res
=
requests
.
post
(
BASE_API_URL
+
"
/group_ress
"
,
params
=
params
,
headers
=
headers
).
json
()
if
"
message
"
in
group_res
:
print
(
"
Error in creating group_res: %s
"
%
group_res
)
exit
(
1
)
def
create_repository
(
token
:
str
,
group_id
:
str
,
emails
:
List
[
str
],
name
:
str
,
import_url
:
Optional
[
str
],
expires_at
:
Optional
[
str
]):
print
(
"""
"
group_res
'"
Create repository in group_id, with members from emails, a name, and
+
group_res
[
"
name
"
]
optional import_url and expiration date.
+
"'
with id
'"
"""
+
str
(
group_res
[
"
id
"
])
headers
=
{
'
PRIVATE-TOKEN
'
:
token
}
+
"'
and visibility
'"
+
group_res
[
"
visibility
"
]
# Create project from name, import_url (if given) and group_id
+
"'
available at
'"
params
=
{
'
name
'
:
name
,
'
namespace_id
'
:
group_id
,
'
visibility
'
:
'
private
'
}
+
group_res
[
"
web_url
"
]
if
import_url
:
+
"'"
params
[
'
import_url
'
]
=
import_url
)
project
=
requests
.
post
(
BASE_API_URL
+
'
/projects
'
,
return
str
(
group_res
[
"
id
"
])
params
=
params
,
headers
=
headers
).
json
()
if
'
message
'
in
project
:
def
delete_group
(
self
,
group_id
:
str
):
print
(
'
Error in creating project: %s
'
%
project
)
"""
Delete a group and all subprojects.
"""
exit
(
1
)
headers
=
{
"
PRIVATE-TOKEN
"
:
self
.
token
}
print
(
"
Project
'"
+
project
[
'
name
'
]
+
"'
at
'"
+
deleted_group
=
requests
.
delete
(
project
[
'
web_url
'
]
+
"'
created
"
)
BASE_API_URL
+
"
/groups/
"
+
group_id
,
headers
=
headers
).
json
()
# Allow users with developer access level to push and merge on master
if
"
message
"
in
deleted_group
:
access_level
=
40
if
deleted_group
[
"
message
"
]
!=
"
202 Accepted
"
:
params
=
{
'
name
'
:
'
master
'
,
'
push_access_level
'
:
str
(
print
(
"
Error in deleting group: %s
"
%
deleted_group
)
access_level
),
'
merge_access_level
'
:
str
(
access_level
)}
exit
(
1
)
requests
.
post
(
BASE_API_URL
+
'
/projects/
'
+
else
:
str
(
project
[
'
id
'
])
+
'
/protected_branches
'
,
params
=
params
,
headers
=
headers
).
json
()
print
(
"
Group
"
+
group_id
+
"
successfully deleted
"
)
# Get students ids from their emails
def
create_repository
(
user_ids
=
emails_to_ids
(
emails
,
headers
)
self
,
group_id
:
str
,
# Add each student as maintainer (level 40)
users
:
List
[
User
],
for
user_id
in
user_ids
:
name
:
str
,
params
=
{
'
user_id
'
:
user_id
,
'
access_level
'
:
access_level
}
import_url
:
Optional
[
str
],
if
expires_at
:
expires_at
:
Optional
[
str
],
params
[
'
expires_at
'
]
=
expires_at
):
new_user
=
requests
.
post
(
BASE_API_URL
+
'
/projects/
'
+
str
(
"""
project
[
'
id
'
])
+
'
/members
'
,
params
=
params
,
headers
=
headers
).
json
()
Create repository in group_id, with members from user_ids, a name, and
if
'
message
'
in
new_user
:
optional import_url and expiration date.
print
(
'
Error in adding user: %s
'
%
new_user
)
"""
else
:
headers
=
{
"
PRIVATE-TOKEN
"
:
self
.
token
}
out
=
(
"
Adding
'"
+
new_user
[
'
name
'
]
+
"'
(
"
+
new_user
[
'
username
'
]
+
"
) in
'"
+
project
[
'
name
'
]
+
"'
with access level:
"
+
str
(
new_user
[
'
access_level
'
]))
# Create project from name, import_url (if given) and group_id
params
=
{
"
name
"
:
name
,
"
namespace_id
"
:
group_id
,
"
visibility
"
:
"
private
"
}
if
import_url
:
params
[
"
import_url
"
]
=
import_url
project
=
requests
.
post
(
BASE_API_URL
+
"
/projects
"
,
params
=
params
,
headers
=
headers
).
json
()
if
"
message
"
in
project
:
print
(
"
Error in creating project: %s
"
%
project
)
exit
(
1
)
print
(
"
Project
'"
+
project
[
"
name
"
]
+
"'
at
'"
+
project
[
"
web_url
"
]
+
"'
created
"
)
# Allow users with developer access level to push and merge on master
access_level
=
40
params
=
{
"
name
"
:
"
master
"
,
"
push_access_level
"
:
str
(
access_level
),
"
merge_access_level
"
:
str
(
access_level
),
}
requests
.
post
(
BASE_API_URL
+
"
/projects/
"
+
str
(
project
[
"
id
"
])
+
"
/protected_branches
"
,
params
=
params
,
headers
=
headers
,
).
json
()
# Add each student as maintainer (level 40)
for
user
in
users
:
params
=
{
"
user_id
"
:
user
.
id
,
"
access_level
"
:
access_level
}
if
expires_at
:
if
expires_at
:
out
+=
"
, expires at:
"
+
new_user
[
'
expires_at
'
]
params
[
"
expires_at
"
]
=
expires_at
print
(
out
)
new_user
=
requests
.
post
(
BASE_API_URL
+
"
/projects/
"
+
str
(
project
[
"
id
"
])
+
"
/members
"
,
params
=
params
,
def
paginate_responses
(
url
:
str
,
headers
:
Dict
[
str
,
str
],
params
:
Dict
[
str
,
Any
])
->
List
[
Response
]:
headers
=
headers
,
"""
).
json
()
Manage gitlab pagination, max 100 results by request
if
"
message
"
in
new_user
:
"""
print
(
"
Error in adding user: %s
"
%
new_user
)
responses
=
[
requests
.
get
(
url
,
params
=
params
,
headers
=
headers
)]
else
:
last_response
=
responses
[
len
(
responses
)
-
1
]
out
=
(
"
Adding
'"
while
last_response
.
status_code
==
200
and
len
(
last_response
.
headers
[
'
X-Next-Page
'
])
!=
0
:
+
new_user
[
"
name
"
]
next_page
=
last_response
.
headers
[
'
X-Next-Page
'
]
+
"'
(
"
params
[
'
page
'
]
=
next_page
+
new_user
[
"
username
"
]
responses
.
append
(
requests
.
get
(
url
,
params
=
params
,
headers
=
headers
))
+
"
) in
'"
+
project
[
"
name
"
]
+
"'
with access level:
"
+
str
(
new_user
[
"
access_level
"
])
)
if
expires_at
:
out
+=
"
, expires at:
"
+
new_user
[
"
expires_at
"
]
print
(
out
)
@staticmethod
def
_paginate_responses
(
url
:
str
,
headers
:
Dict
[
str
,
str
],
params
:
Dict
[
str
,
Any
]
)
->
List
[
Response
]:
"""
Manage gitlab pagination, max 100 results by request
"""
responses
=
[
requests
.
get
(
url
,
params
=
params
,
headers
=
headers
)]
last_response
=
responses
[
len
(
responses
)
-
1
]
last_response
=
responses
[
len
(
responses
)
-
1
]
return
responses
if
last_response
.
status_code
!=
200
:
print
(
last_response
.
text
)
return
[]
def
get_members
(
token
:
str
,
id
:
str
)
->
List
:
"""
while
(
Return members list from given id
last_response
.
status_code
==
200
"""
and
len
(
last_response
.
headers
[
"
X-Next-Page
"
])
!=
0
url
=
BASE_API_URL
+
'
/projects/
'
+
id
+
'
/members
'
):
next_page
=
last_response
.
headers
[
"
X-Next-Page
"
]
headers
=
{
'
PRIVATE-TOKEN
'
:
token
}
params
[
"
page
"
]
=
next_page
params
=
{
'
simple
'
:
'
true
'
,
'
order_by
'
:
'
name
'
,
responses
.
append
(
requests
.
get
(
url
,
params
=
params
,
headers
=
headers
))
'
sort
'
:
'
asc
'
,
'
per_page
'
:
100
}
last_response
=
responses
[
len
(
responses
)
-
1
]
responses
=
paginate_responses
(
url
,
headers
,
params
)
return
responses
members
=
[]
for
r
in
responses
:
def
get_users_in_repository
(
self
,
id
:
str
)
->
List
:
members
+=
r
.
json
()
"""
return
members
Return members list from given id
"""
# BUG: does not work if the repo members are inherited from a group
def
get_projects
(
token
:
str
,
id
:
str
,
source
:
str
=
'
group
'
)
->
List
:
url
=
BASE_API_URL
+
"
/projects/
"
+
id
+
"
/members
"
"""
Return projects list from given id and source (
'
group
'
or
'
forks
'
)
headers
=
{
"
PRIVATE-TOKEN
"
:
self
.
token
}
"""
params
=
{
"
simple
"
:
"
true
"
,
"
order_by
"
:
"
name
"
,
"
sort
"
:
"
asc
"
,
"
per_page
"
:
100
}
if
source
==
'
forks
'
:
responses
=
self
.
_paginate_responses
(
url
,
headers
,
params
)
url
=
BASE_API_URL
+
'
/projects/
'
+
id
+
'
/forks
'
else
:
members
=
[]
url
=
BASE_API_URL
+
'
/groups/
'
+
id
+
'
/projects
'
for
r
in
responses
:
members
+=
r
.
json
()
headers
=
{
'
PRIVATE-TOKEN
'
:
token
}
return
members
params
=
{
'
simple
'
:
'
true
'
,
'
order_by
'
:
'
name
'
,
'
sort
'
:
'
asc
'
,
'
per_page
'
:
100
}
def
get_projects_in_group
(
self
,
id
:
str
)
->
List
:
responses
=
paginate_responses
(
url
,
headers
,
params
)
"""
Return projects list from given group id
projects
=
[]
"""
for
r
in
responses
:
url
=
BASE_API_URL
+
"
/groups/
"
+
id
+
"
/projects
"
projects
+=
r
.
json
()
headers
=
{
"
PRIVATE-TOKEN
"
:
self
.
token
}
return
projects
params
=
{
"
simple
"
:
"
true
"
,
"
order_by
"
:
"
name
"
,
"
sort
"
:
"
asc
"
,
"
per_page
"
:
100
}
responses
=
self
.
_paginate_responses
(
url
,
headers
,
params
)
projects
=
[]
for
r
in
responses
:
projects
+=
r
.
json
()
return
projects
def
clone_all
(
self
,
id
:
str
,
directory
:
str
,
until_date
:
Optional
[
str
]):
"""
Clone all repositories from a group id in directory (created in function).
"""
try
:
os
.
mkdir
(
directory
)
except
OSError
:
print
(
"
Creation of the directory
'
%s
'
failed, exit
\n
"
%
directory
)
exit
(
1
)
headers
=
{
"
PRIVATE-TOKEN
"
:
self
.
token
}
repositories
=
self
.
get_projects_in_group
(
id
)
def
clone_all
(
token
:
str
,
id
:
str
,
directory
:
str
,
until_date
:
Optional
[
str
],
source
:
str
=
'
group
'
,
use_http
:
bool
=
True
):
for
repo
in
repositories
:
"""
repo_url
=
BASE_API_URL
+
"
/projects/
"
+
str
(
repo
[
"
id
"
])
+
"
/members
"
Clone all repositories (from a group or
"
forks of
"
) from id (group or
members
=
requests
.
get
(
repo_url
,
headers
=
headers
).
json
()
project id) in directory (created in function).
if
"
message
"
in
members
:
"""
print
(
"
Error retrieving members:
"
+
members
[
"
message
"
])
try
:
exit
(
1
)
os
.
mkdir
(
directory
)
except
OSError
:
print
(
"
Creation of the directory
'
%s
'
failed, exit
\n
"
%
directory
)
exit
(
1
)
headers
=
{
'
PRIVATE-TOKEN
'
:
token
}
repositories
=
get_projects
(
token
,
id
,
source
)
for
repo
in
repositories
:
repo_url
=
BASE_API_URL
+
'
/projects/
'
+
str
(
repo
[
'
id
'
])
+
'
/members
'
members
=
requests
.
get
(
repo_url
,
headers
=
headers
).
json
()
if
'
message
'
in
members
:
print
(
'
Error retrieving members:
'
+
members
[
'
message
'
])
exit
(
1
)
ssh_url_to_repo
=
repo
[
'
ssh_url_to_repo
'
]
ssh_url_to_repo
=
repo
[
"
ssh_url_to_repo
"
]
web_url
=
repo
[
'
web_url
'
]
web_url
=
repo
[
"
web_url
"
]
members_names
=
''
members_names
=
""
for
member
in
members
:
for
member
in
members
:
if
member
[
'
access_level
'
]
>
20
:
# Access level greater than "Reporter"
if
member
[
"
access_level
"
]
>
20
:
# Access level greater than "Reporter"
members_names
+=
member
[
'
username
'
]
+
'
,
'
members_names
+=
member
[
"
username
"
]
+
"
,
"
if
source
==
'
forks
'
:
repo_local_name
=
repo
[
"
path
"
]
repo_local_name
=
repo
[
'
namespace
'
][
'
path
'
]
else
:
repo_local_name
=
repo
[
'
path
'
]
print
(
'
Members:
'
+
members_names
)
print
(
"
Members:
"
+
members_names
)
print
(
'
Web url:
'
+
web_url
)
print
(
"
Web url:
"
+
web_url
)
print
(
'
Cloning in
"'
+
directory
+
'
/
'
+
repo_local_name
+
'"'
)
print
(
'
Cloning in
"'
+
directory
+
"
/
"
+
repo_local_name
+
'"'
)
if
use_http
:
scheme
=
"
https://
"
scheme
=
"
https://
"
after_https
=
BASE_URL
.
find
(
scheme
)
+
len
(
scheme
)
after_https
=
BASE_URL
.
find
(
scheme
)
+
len
(
scheme
)
url
=
BASE_URL
[:
after_https
]
+
"
{}:{}@
"
.
format
(
"
gitlab-ci-token
"
,
token
)
+
BASE_URL
[
after_https
:]
url
=
(
subprocess
.
run
([
"
git
"
,
"
clone
"
,
"
-q
"
,
web_url
.
replace
(
BASE_URL
,
url
),
BASE_URL
[:
after_https
]
directory
+
'
/
'
+
repo_local_name
])
+
"
{}:{}@
"
.
format
(
"
gitlab-ci-token
"
,
self
.
token
)
else
:
+
BASE_URL
[
after_https
:]
subprocess
.
run
([
"
git
"
,
"
clone
"
,
"
-q
"
,
ssh_url_to_repo
,
)
directory
+
'
/
'
+
repo_local_name
])
if
until_date
:
commit_id
=
subprocess
.
check_output
([
"
git
"
,
"
rev-list
"
,
"
-n
"
,
"
1
"
,
"
--before=
\"
"
+
until_date
+
"
\"
"
,
"
master
"
],
cwd
=
directory
+
'
/
'
+
repo_local_name
).
decode
(
'
utf-8
'
).
rstrip
()
subprocess
.
run
(
subprocess
.
run
(
[
"
git
"
,
"
checkout
"
,
"
-q
"
,
str
(
commit_id
)],
[
cwd
=
directory
+
'
/
'
+
repo_local_name
)
"
git
"
,
print
(
"
Checkout at
"
+
str
(
commit_id
)
+
"
\n
"
)
"
clone
"
,
else
:
"
-q
"
,
print
()
web_url
.
replace
(
BASE_URL
,
url
),
directory
+
"
/
"
+
repo_local_name
,
]
def
validate_yaml
(
args
):
)
"""
Verify that the yaml file is valid by checking:
if
until_date
:
- the yaml syntax
commit_id
=
(
- that a user id can be retrieved from the email (through some hackish inference)
subprocess
.
check_output
(
This function stops the program if a check fails.
[
"""
"
git
"
,
with
open
(
args
.
repos_file
)
as
f
:
"
rev-list
"
,
repos
=
yaml
.
full_load
(
f
)
"
-n
"
,
for
repo
in
repos
:
"
1
"
,
if
'
name
'
in
repo
:
'
--before=
"'
+
until_date
+
'"'
,
name
=
repo
[
'
name
'
]
"
master
"
,
elif
'
emails
'
in
repo
:
],
name
=
repo
[
'
emails
'
][
0
].
split
(
'
@
'
)[
0
]
cwd
=
directory
+
"
/
"
+
repo_local_name
,
)
.
decode
(
"
utf-8
"
)
.
rstrip
()
)
subprocess
.
run
(
[
"
git
"
,
"
checkout
"
,
"
-q
"
,
str
(
commit_id
)],
cwd
=
directory
+
"
/
"
+
repo_local_name
,
)
print
(
"
Checkout at
"
+
str
(
commit_id
)
+
"
\n
"
)
else
:
else
:
print
(
'
Syntax error in YAML file.
\n
Aborted.
'
)
print
()
exit
(
1
)
headers
=
{
'
PRIVATE-TOKEN
'
:
args
.
token
}
# check the user id can sucessfully be retrieved from the email address
emails_to_ids
(
repo
[
'
emails
'
],
headers
)
def
command_create_group_repos
(
args
):
def
command_create_group_repos
(
args
):
"""
"""
Combine create_group and create_repository. For each repository listed in
For each repository listed in given file, create a group and a repo in group.
given file, create a repo in group.
Add users in every repo created
"""
validate_yaml
(
args
)
if
args
.
visibility
:
group_id
=
create_group
(
args
.
token
,
args
.
group_name
,
args
.
visibility
)
else
:
group_id
=
create_group
(
args
.
token
,
args
.
group_name
)
print
()
with
open
(
args
.
repos_file
)
as
f
:
repos
=
yaml
.
full_load
(
f
)
for
repo
in
repos
:
if
'
name
'
in
repo
:
name
=
repo
[
'
name
'
]
elif
'
emails
'
in
repo
:
name
=
repo
[
'
emails
'
][
0
].
split
(
'
@
'
)[
0
]
else
:
print
(
'
Syntax error in YAML file.
'
)
delete_group
(
args
.
token
,
group_id
)
print
(
'
Deleted group.
\n
Aborted.
'
)
exit
(
1
)
create_repository
(
args
.
token
,
group_id
,
repo
[
'
emails
'
],
name
,
args
.
import_url
,
args
.
expires_at
)
print
()
def
command_create_repos
(
args
):
"""
Create a set of repositories inside the specified (existing) group. For each repository listed
in the given file, create a repository.
"""
validate_yaml
(
args
)
group_id
=
args
.
group_id
with
open
(
args
.
repos_file
)
as
f
:
repos
=
yaml
.
full_load
(
f
)
for
repo
in
repos
:
if
'
name
'
in
repo
:
name
=
repo
[
'
name
'
]
elif
'
emails
'
in
repo
:
name
=
repo
[
'
emails
'
][
0
].
split
(
'
@
'
)[
0
]
else
:
print
(
'
YAML file not correct, exit and delete group
'
)
delete_group
(
args
.
token
,
group_id
)
exit
(
1
)
create_repository
(
args
.
token
,
group_id
,
repo
[
'
emails
'
],
name
,
args
.
import_url
,
args
.
expires_at
)
print
(
'
created repo:
'
,
repo
[
'
emails
'
],
name
)
print
()
def
command_create_group
(
args
):
"""
Call create_group
"""
"""
if
args
.
visibility
:
gl
=
Gitlab
(
args
.
token
)
create_group
(
args
.
token
,
args
.
group_name
,
args
.
visibility
)
ds
=
DataSource
.
from_yaml
(
args
.
repos_file
)
else
:
group_id
=
gl
.
create_group
(
args
.
group_name
,
args
.
visibility
)
create_group
(
args
.
token
,
args
.
group_name
)
for
group
in
ds
.
groups
:
gl
.
create_repository
(
group_id
,
def
command_create_repository
(
args
):
group
.
user_ids
,
"""
group
.
name
,
Call create_repository
args
.
import_url
,
"""
args
.
expires_at
,
if
args
.
name
:
)
name
=
args
.
name
print
(
f
"
created repo:
{
group
.
name
}
with the users:
{
group
.
user_ids
}
"
)
else
:
print
()
name
=
args
.
emails
.
split
(
'
@
'
)[
0
]
create_repository
(
args
.
token
,
args
.
group_id
,
args
.
emails
.
split
(
'
,
'
),
name
,
args
.
import_url
,
args
.
expires_at
)
def
command_clone_all
(
args
):
def
command_clone_all
(
args
):
"""
gl
=
Gitlab
(
args
.
token
)
Call clone_all
gl
.
clone_all
(
args
.
id
,
args
.
directory
,
args
.
until_date
)
"""
if
args
.
forks
:
clone_all
(
args
.
token
,
args
.
id
,
args
.
directory
,
def
command_list_projects
(
args
):
args
.
until_date
,
'
forks
'
,
args
.
use_http
)
gl
=
Gitlab
(
args
.
token
)
else
:
projects
=
gl
.
get_projects_in_group
(
args
.
id
)
clone_all
(
args
.
token
,
args
.
id
,
args
.
directory
,
args
.
until_date
,
args
.
use_http
)
if
args
.
show
:
if
args
.
show
==
"
all
"
:
print
(
json
.
dumps
(
projects
,
indent
=
2
))
def
command_list
(
args
):
elif
args
.
show
==
"
url
"
:
"""
results
=
list
(
map
(
lambda
p
:
p
[
"
http_url_to_repo
"
],
projects
))
Call get_projects or get_members
for
r
in
results
:
"""
print
(
r
)
if
args
.
members
:
elif
args
.
show
==
"
ssh
"
:
members
=
get_members
(
args
.
token
,
args
.
id
)
results
=
list
(
map
(
lambda
p
:
p
[
"
ssh_url_to_repo
"
],
projects
))
if
args
.
show
:
for
r
in
results
:
if
args
.
show
==
'
all
'
:
print
(
r
)
print
(
json
.
dumps
(
members
,
indent
=
2
))
elif
args
.
show
==
'
url
'
:
results
=
list
(
map
(
lambda
p
:
p
[
'
web_url
'
],
members
))
for
r
in
results
:
print
(
r
)
else
:
names
=
list
(
map
(
lambda
p
:
p
[
'
username
'
],
members
))
for
name
in
names
:
print
(
name
)
else
:
else
:
names
=
list
(
map
(
lambda
p
:
p
[
'
user
name
'
],
member
s
))
names
=
list
(
map
(
lambda
p
:
p
[
"
name
"
],
project
s
))
for
name
in
names
:
for
name
in
names
:
print
(
name
)
print
(
name
)
else
:
else
:
projects
=
get_projects
(
args
.
token
,
args
.
id
)
names
=
list
(
map
(
lambda
p
:
p
[
"
name
"
],
projects
))
if
args
.
show
:
for
name
in
names
:
if
args
.
show
==
'
all
'
:
print
(
name
)
print
(
json
.
dumps
(
projects
,
indent
=
2
))
elif
args
.
show
==
'
url
'
:
results
=
list
(
map
(
lambda
p
:
p
[
'
http_url_to_repo
'
],
projects
))
def
command_list_users
(
args
):
for
r
in
results
:
gl
=
Gitlab
(
args
.
token
)
print
(
r
)
members
=
gl
.
get_users_in_repository
(
args
.
id
)
elif
args
.
show
==
'
ssh
'
:
if
args
.
show
:
results
=
list
(
map
(
lambda
p
:
p
[
'
ssh_url_to_repo
'
],
projects
))
if
args
.
show
==
"
all
"
:
for
r
in
results
:
print
(
json
.
dumps
(
members
,
indent
=
2
))
print
(
r
)
elif
args
.
show
==
"
url
"
:
else
:
results
=
list
(
map
(
lambda
p
:
p
[
"
web_url
"
],
members
))
names
=
list
(
map
(
lambda
p
:
p
[
'
name
'
],
projects
))
for
r
in
results
:
for
name
in
names
:
print
(
r
)
print
(
name
)
else
:
else
:
names
=
list
(
map
(
lambda
p
:
p
[
'
name
'
],
project
s
))
names
=
list
(
map
(
lambda
p
:
p
[
"
user
name
"
],
member
s
))
for
name
in
names
:
for
name
in
names
:
print
(
name
)
print
(
name
)
else
:
names
=
list
(
map
(
lambda
p
:
p
[
"
username
"
],
members
))
for
name
in
names
:
print
(
name
)
if
__name__
==
'
__main__
'
:
if
__name__
==
"
__main__
"
:
parser
=
argparse
.
ArgumentParser
(
description
=
'
Practical Work Manager -
\
parser
=
argparse
.
ArgumentParser
(
Manage students PW - Create group, projects or clone repositories
'
)
description
=
"
Practical Work Manager -
\
Manage students PW - Create group, projects or clone repositories
"
)
parser
.
set_defaults
(
func
=
lambda
_
:
parser
.
print_help
())
parser
.
set_defaults
(
func
=
lambda
_
:
parser
.
print_help
())
parser
.
add_argument
(
"
-t
"
,
"
--token
"
,
metavar
=
"
TOKEN
"
,
parser
.
add_argument
(
help
=
"
Create a token here:
"
+
TOKEN_URL
)
"
-t
"
,
"
--token
"
,
metavar
=
"
TOKEN
"
,
help
=
f
"
Create a token here:
{
TOKEN_URL
}
. -t is not needed if the env var GITEDU_TOKEN is set.
"
)
subparsers
=
parser
.
add_subparsers
(
subparsers
=
parser
.
add_subparsers
(
metavar
=
'
(group_repos | group | repo | clone | list)
'
)
metavar
=
"
(group_repos | clone | list_project | list_users)
"
)
parser_group_repos
=
subparsers
.
add_parser
(
parser_group_repos
=
subparsers
.
add_parser
(
'
group_repos
'
,
help
=
'
Create group and repos associated
'
)
"
group_repos
"
,
help
=
"
Create group and repositories from file
"
)
parser_group_repos
.
add_argument
(
parser_group_repos
.
add_argument
(
"
group_name
"
,
metavar
=
"
GROUP_NAME
"
,
help
=
"
The group name.
"
)
"
group_name
"
,
metavar
=
"
GROUP_NAME
"
,
help
=
"
The group name.
"
)
parser_group_repos
.
add_argument
(
parser_group_repos
.
add_argument
(
"
repos_file
"
,
metavar
=
"
REPOS_FILE
"
,
help
=
"
YAML file with projects names and/or students emails.
"
)
"
repos_file
"
,
metavar
=
"
REPOS_FILE
"
,
help
=
"
YAML file with projects names and/or students emails.
"
,
)
parser_group_repos
.
add_argument
(
parser_group_repos
.
add_argument
(
"
--visibility
"
,
help
=
"
Group visibility. By default private.
"
)
"
--visibility
"
,
help
=
"
Group visibility. By default private.
"
,
default
=
"
private
"
parser_group_repos
.
add_argument
(
"
-i
"
,
"
--import_url
"
,
)
help
=
"
Import the publicly accessible project by URL given here (optional).
"
)
parser_group_repos
.
add_argument
(
parser_group_repos
.
add_argument
(
"
-x
"
,
"
--expires_at
"
,
"
-i
"
,
help
=
"
Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).
"
)
"
--import_url
"
,
help
=
"
Import the publicly accessible project by URL given here (optional).
"
,
)
parser_group_repos
.
add_argument
(
"
-x
"
,
"
--expires_at
"
,
help
=
"
Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).
"
,
)
parser_group_repos
.
set_defaults
(
func
=
command_create_group_repos
)
parser_group_repos
.
set_defaults
(
func
=
command_create_group_repos
)
parser_repos
=
subparsers
.
add_parser
(
parser_clone
=
subparsers
.
add_parser
(
"
clone
"
,
help
=
"
Clone the repositories locally
"
)
'
repos
'
,
help
=
'
Create repos inside existing group
'
)
parser_repos
.
add_argument
(
"
group_id
"
,
metavar
=
"
GROUP_ID
"
,
help
=
"
The existing group id.
"
)
parser_repos
.
add_argument
(
"
repos_file
"
,
metavar
=
"
REPOS_FILE
"
,
help
=
"
YAML file with projects names and/or students emails.
"
)
parser_repos
.
add_argument
(
"
-i
"
,
"
--import_url
"
,
help
=
"
Import the publicly accessible project by URL given here (optional).
"
)
parser_repos
.
add_argument
(
"
-x
"
,
"
--expires_at
"
,
help
=
"
Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).
"
)
parser_repos
.
set_defaults
(
func
=
command_create_repos
)
parser_group
=
subparsers
.
add_parser
(
'
group
'
,
help
=
'
Create gitlab group
'
)
parser_group
.
add_argument
(
"
group_name
"
,
metavar
=
"
GROUP_NAME
"
,
help
=
"
The group name.
"
)
parser_group
.
add_argument
(
"
--visibility
"
,
help
=
"
Group visibility. By default private.
"
)
parser_group
.
set_defaults
(
func
=
command_create_group
)
parser_repo
=
subparsers
.
add_parser
(
'
repo
'
,
help
=
'
Create gitlab project
'
)
parser_repo
.
add_argument
(
"
group_id
"
,
metavar
=
"
GROUP_ID
"
,
help
=
"
The group id (int) where to store the created new project.
"
)
parser_repo
.
add_argument
(
"
emails
"
,
metavar
=
"
EMAILS
"
,
help
=
"
Emails list of students working in this project, separated by commas (email1,email2).
"
)
parser_repo
.
add_argument
(
"
-n
"
,
"
--name
"
,
help
=
"
The project name. If blank, take the first student name (from email) as name.
"
)
parser_repo
.
add_argument
(
"
-i
"
,
"
--import_url
"
,
help
=
"
Import the publicly accessible project by URL given here (optional).
"
)
parser_repo
.
add_argument
(
"
-x
"
,
"
--expires_at
"
,
help
=
"
Expiration date to kick off students from this project, at 00:00:00. YYYY-MM-DD format (optional).
"
)
parser_repo
.
set_defaults
(
func
=
command_create_repository
)
parser_clone
=
subparsers
.
add_parser
(
'
clone
'
,
help
=
'
Clone the repositories locally
'
)
group_clone
=
parser_clone
.
add_mutually_exclusive_group
()
group_clone
=
parser_clone
.
add_mutually_exclusive_group
()
group_clone
.
add_argument
(
"
-g
"
,
"
--group
"
,
action
=
"
store_true
"
,
group_clone
.
add_argument
(
help
=
"
Clone repositories from a group (with group_id) (default behavior).
"
)
"
-g
"
,
group_clone
.
add_argument
(
"
-f
"
,
"
--forks
"
,
action
=
"
store_true
"
,
"
--group
"
,
help
=
"
Clone forks of a project (with project_id).
"
)
action
=
"
store_true
"
,
help
=
"
Clone repositories from a group (with group_id) (default behavior).
"
,
)
parser_clone
.
add_argument
(
"
id
"
,
metavar
=
"
ID
"
,
help
=
"
The group_id (int) of the projects.
"
,
)
parser_clone
.
add_argument
(
parser_clone
.
add_argument
(
"
id
"
,
metavar
=
"
ID
"
,
help
=
"
The group_id (int) of the projects or the project_id (int) of the forks.
"
)
"
directory
"
,
metavar
=
"
DIRECTORY
"
,
help
=
"
Local directory where clone all repositories.
"
,
)
parser_clone
.
add_argument
(
parser_clone
.
add_argument
(
"
directory
"
,
metavar
=
"
DIRECTORY
"
,
help
=
"
Local directory where clone all repositories.
"
)
"
-u
"
,
"
--until_date
"
,
help
=
'
Do a git checkout for all repositories at given date, format
"
YYYY-MM-DD hh:mm
"
(optional).
'
,
)
parser_clone
.
add_argument
(
parser_clone
.
add_argument
(
"
-u
"
,
"
--until_date
"
,
help
=
"
Do a git checkout for all repositories at given date, format
\"
YYYY-MM-DD hh:mm
\"
(optional).
"
)
"
--use_http
"
,
parser_clone
.
add_argument
(
"
--use_http
"
,
help
=
"
Use the HTTP client instead of SSH. False by default.
"
,
action
=
'
store_true
'
)
help
=
"
Use the HTTP client instead of SSH. False by default.
"
,
action
=
"
store_true
"
,
)
parser_clone
.
set_defaults
(
func
=
command_clone_all
)
parser_clone
.
set_defaults
(
func
=
command_clone_all
)
parser_list
=
subparsers
.
add_parser
(
parser_list
=
subparsers
.
add_parser
(
'
list
'
,
help
=
"
List group
'
s projects or project
'
s members
"
)
"
list_projects
"
,
help
=
"
List all project in a group
"
group_list
=
parser_list
.
add_mutually_exclusive_group
()
)
group_list
.
add_argument
(
parser_list
.
add_argument
(
"
id
"
,
metavar
=
"
ID
"
,
help
=
"
The group_id (int).
"
)
"
-p
"
,
"
--projects
"
,
action
=
"
store_true
"
,
help
=
"
List group
'
s projects (default
"
)
parser_list
.
add_argument
(
group_list
.
add_argument
(
"
-s
"
,
"
-m
"
,
"
--members
"
,
action
=
"
store_true
"
,
help
=
"
List project
'
s members
"
)
"
--show
"
,
help
=
"
Amount of informations (default name) : [all | name | url | ssh]
"
,
)
parser_list
.
set_defaults
(
func
=
command_list_projects
)
parser_list
=
subparsers
.
add_parser
(
"
list_users
"
,
help
=
"
List all users in a repository
"
)
parser_list
.
add_argument
(
parser_list
.
add_argument
(
"
id
"
,
metavar
=
"
ID
"
,
help
=
"
The group_id or the project_id (int).
"
)
"
id
"
,
metavar
=
"
ID
"
,
help
=
"
The repository project_id (int).
"
)
parser_list
.
add_argument
(
parser_list
.
add_argument
(
"
-s
"
,
"
--show
"
,
help
=
"
Amount of informations (default name) : [all | name | url | ssh]
"
)
"
-s
"
,
parser_list
.
set_defaults
(
func
=
command_list
)
"
--show
"
,
help
=
"
Amount of informations (default name) : [all | name | url | ssh]
"
,
)
parser_list
.
set_defaults
(
func
=
command_list_users
)
args
=
parser
.
parse_args
()
args
=
parser
.
parse_args
()
if
not
args
.
token
:
if
not
args
.
token
:
home
=
os
.
environ
.
get
(
'
HOME
'
)
if
os
.
environ
.
get
(
"
GITEDU_TOKEN
"
):
if
home
:
args
.
token
=
os
.
environ
.
get
(
"
GITEDU_TOKEN
"
)
token_file
=
home
+
'
/.gitedu_token
'
if
os
.
path
.
isfile
(
token_file
):
with
open
(
token_file
)
as
file
:
args
.
token
=
file
.
read
().
strip
()
elif
os
.
environ
.
get
(
'
GITEDU_TOKEN
'
):
args
.
token
=
os
.
environ
.
get
(
'
GITEDU_TOKEN
'
)
else
:
else
:
print
(
'
Error: you must give a valid api token. Create a token here:
'
+
TOKEN_URL
)
print
(
"
Error: you must give a valid api token. Create a token here:
"
+
TOKEN_URL
)
exit
(
1
)
exit
(
1
)
args
.
func
(
args
)
args
.
func
(
args
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment