fix: export public keys

This commit is contained in:
张泊明518370910136 2021-10-11 14:46:56 +08:00
parent 13e7278cdd
commit 31bca87d4b
No known key found for this signature in database
GPG Key ID: FBEF5DE8B9F4C629
4 changed files with 381 additions and 373 deletions

View File

@ -47,7 +47,11 @@ def create_teams_and_repos_by_canvas_groups(group_prefix: str) -> None:
@app.command("get-public-keys", help="list all public keys on gitea") @app.command("get-public-keys", help="list all public keys on gitea")
def get_public_key_of_all_canvas_students() -> None: def get_public_key_of_all_canvas_students() -> None:
echo("\n".join(tea.pot.get_public_key_of_all_canvas_students())) res = []
for k, v in tea.pot.get_public_key_of_all_canvas_students().items():
keys = "\\n".join(v)
res.append(f"{k},{keys}")
echo("\n".join(res))
@app.command("clone-all-repos", help="clone all gitea repos to local") @app.command("clone-all-repos", help="clone all gitea repos to local")

View File

@ -1,6 +1,6 @@
import functools import functools
from datetime import datetime from datetime import datetime
from typing import Any, Callable, List, Optional, TypeVar from typing import Any, Callable, Dict, List, Optional, TypeVar
from joint_teapot.config import settings from joint_teapot.config import settings
from joint_teapot.utils.logger import logger from joint_teapot.utils.logger import logger
@ -88,7 +88,7 @@ class Teapot:
self.canvas.students, self.canvas.groups, convertor, convertor self.canvas.students, self.canvas.groups, convertor, convertor
) )
def get_public_key_of_all_canvas_students(self) -> List[str]: def get_public_key_of_all_canvas_students(self) -> Dict[str, List[str]]:
return self.gitea.get_public_key_of_canvas_students(self.canvas.students) return self.gitea.get_public_key_of_canvas_students(self.canvas.students)
def clone_all_repos(self) -> None: def clone_all_repos(self) -> None:

View File

@ -20,9 +20,9 @@ class Canvas:
self.canvas = PyCanvas("https://umjicanvas.com/", access_token) self.canvas = PyCanvas("https://umjicanvas.com/", access_token)
self.course = self.canvas.get_course(course_id) self.course = self.canvas.get_course(course_id)
logger.info(f"Canvas course loaded. {self.course}") logger.info(f"Canvas course loaded. {self.course}")
self.students = self.course.get_users( # types = ["student", "observer"]
enrollment_type=["student"], include=["email"] types = ["student"]
) self.students = self.course.get_users(enrollment_type=types, include=["email"])
for attr in ["sis_login_id", "sortable_name", "name"]: for attr in ["sis_login_id", "sortable_name", "name"]:
if not hasattr(self.students[0], attr): if not hasattr(self.students[0], attr):
raise Exception( raise Exception(

View File

@ -1,367 +1,371 @@
import re import re
from enum import Enum from enum import Enum
from functools import lru_cache from functools import lru_cache
from typing import Any, Callable, List, Optional, Tuple from typing import Any, Callable, Dict, List, Optional, Tuple
import focs_gitea import focs_gitea
from canvasapi.group import Group, GroupMembership from canvasapi.group import Group, GroupMembership
from canvasapi.paginated_list import PaginatedList from canvasapi.paginated_list import PaginatedList
from canvasapi.user import User from canvasapi.user import User
from focs_gitea.rest import ApiException from focs_gitea.rest import ApiException
from joint_teapot.config import settings from joint_teapot.config import settings
from joint_teapot.utils.logger import logger from joint_teapot.utils.logger import logger
from joint_teapot.utils.main import first from joint_teapot.utils.main import first
class PermissionEnum(Enum): class PermissionEnum(Enum):
read = "read" read = "read"
write = "write" write = "write"
admin = "admin" admin = "admin"
def default_repo_name_convertor(user: User) -> Optional[str]: def default_repo_name_convertor(user: User) -> Optional[str]:
id, name = user.sis_login_id, user.name id, name = user.sis_login_id, user.name
eng = re.sub("[\u4e00-\u9fa5]", "", name) eng = re.sub("[\u4e00-\u9fa5]", "", name)
eng = eng.replace(",", "") eng = eng.replace(",", "")
eng = "".join([word[0].capitalize() + word[1:] for word in eng.split()]) eng = "".join([word[0].capitalize() + word[1:] for word in eng.split()])
return f"{eng}{id}" return f"{eng}{id}"
def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
all_res = [] all_res = []
page = 1 page = 1
while True: while True:
res = method(*args, **kwargs, page=page) res = method(*args, **kwargs, page=page)
if not res: if not res:
break break
for item in res: for item in res:
all_res.append(item) all_res.append(item)
page += 1 page += 1
return all_res return all_res
class Gitea: class Gitea:
def __init__( def __init__(
self, self,
access_token: str = settings.gitea_access_token, access_token: str = settings.gitea_access_token,
org_name: str = settings.gitea_org_name, org_name: str = settings.gitea_org_name,
): ):
self.org_name = org_name self.org_name = org_name
configuration = focs_gitea.Configuration() configuration = focs_gitea.Configuration()
configuration.api_key["access_token"] = access_token configuration.api_key["access_token"] = access_token
self.api_client = focs_gitea.ApiClient(configuration) self.api_client = focs_gitea.ApiClient(configuration)
self.admin_api = focs_gitea.AdminApi(self.api_client) self.admin_api = focs_gitea.AdminApi(self.api_client)
self.miscellaneous_api = focs_gitea.MiscellaneousApi(self.api_client) self.miscellaneous_api = focs_gitea.MiscellaneousApi(self.api_client)
self.organization_api = focs_gitea.OrganizationApi(self.api_client) self.organization_api = focs_gitea.OrganizationApi(self.api_client)
self.issue_api = focs_gitea.IssueApi(self.api_client) self.issue_api = focs_gitea.IssueApi(self.api_client)
self.repository_api = focs_gitea.RepositoryApi(self.api_client) self.repository_api = focs_gitea.RepositoryApi(self.api_client)
self.settings_api = focs_gitea.SettingsApi(self.api_client) self.settings_api = focs_gitea.SettingsApi(self.api_client)
self.user_api = focs_gitea.UserApi(self.api_client) self.user_api = focs_gitea.UserApi(self.api_client)
logger.debug("Gitea initialized") logger.debug("Gitea initialized")
@lru_cache() @lru_cache()
def _get_team_id_by_name(self, name: str) -> int: def _get_team_id_by_name(self, name: str) -> int:
res = self.organization_api.team_search(self.org_name, q=str(name), limit=1) res = self.organization_api.team_search(self.org_name, q=str(name), limit=1)
if len(res["data"]) == 0: if len(res["data"]) == 0:
raise Exception(f"{name} not found by name in Gitea") raise Exception(f"{name} not found by name in Gitea")
return res["data"][0]["id"] return res["data"][0]["id"]
@lru_cache() @lru_cache()
def _get_username_by_canvas_student(self, student: User) -> str: def _get_username_by_canvas_student(self, student: User) -> str:
res = self.user_api.user_search(q=student.sis_login_id, limit=1) res = self.user_api.user_search(q=student.sis_login_id, limit=1)
if len(res["data"]) == 0: if len(res["data"]) == 0:
raise Exception(f"{student} not found in Gitea") raise Exception(f"{student} not found in Gitea")
return res["data"][0]["username"] return res["data"][0]["username"]
def add_canvas_students_to_teams( def add_canvas_students_to_teams(
self, students: PaginatedList, team_names: List[str] self, students: PaginatedList, team_names: List[str]
) -> None: ) -> None:
for team_name in team_names: for team_name in team_names:
team_id = self._get_team_id_by_name(team_name) team_id = self._get_team_id_by_name(team_name)
team_members = self.organization_api.org_list_team_members(team_id) team_members = self.organization_api.org_list_team_members(team_id)
for student in students: for student in students:
try: try:
username = self._get_username_by_canvas_student(student) username = self._get_username_by_canvas_student(student)
team_member = first(team_members, lambda x: x.login == username) team_member = first(team_members, lambda x: x.login == username)
if team_member is None: if team_member is None:
self.organization_api.org_add_team_member(team_id, username) self.organization_api.org_add_team_member(team_id, username)
logger.info(f"{student} added to team {team_name}") logger.info(f"{student} added to team {team_name}")
else: else:
team_members.remove(team_member) team_members.remove(team_member)
logger.warning(f"{student} already in team {team_name}") logger.warning(f"{student} already in team {team_name}")
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
for team_member in team_members: for team_member in team_members:
logger.error( logger.error(
f"{team_member.full_name} found in team {team_name} " f"{team_member.full_name} found in team {team_name} "
+ "but not found in Canvas students" + "but not found in Canvas students"
) )
def create_personal_repos_for_canvas_students( def create_personal_repos_for_canvas_students(
self, self,
students: PaginatedList, students: PaginatedList,
repo_name_convertor: Callable[ repo_name_convertor: Callable[
[User], Optional[str] [User], Optional[str]
] = default_repo_name_convertor, ] = default_repo_name_convertor,
) -> List[str]: ) -> List[str]:
repo_names = [] repo_names = []
for student in students: for student in students:
repo_name = repo_name_convertor(student) repo_name = repo_name_convertor(student)
if repo_name is None: if repo_name is None:
continue continue
repo_names.append(repo_name) repo_names.append(repo_name)
body = { body = {
"auto_init": False, "auto_init": False,
"default_branch": "master", "default_branch": "master",
"name": repo_name, "name": repo_name,
"private": True, "private": True,
"template": False, "template": False,
"trust_model": "default", "trust_model": "default",
} }
try: try:
try: try:
repo = self.organization_api.create_org_repo( repo = self.organization_api.create_org_repo(
self.org_name, body=body self.org_name, body=body
) )
logger.info( logger.info(
f"Personal repo {self.org_name}/{repo_name} for {student} created" f"Personal repo {self.org_name}/{repo_name} for {student} created"
) )
except ApiException as e: except ApiException as e:
if e.status == 409: if e.status == 409:
logger.warning( logger.warning(
f"Personal repo {self.org_name}/{repo_name} for {student} already exists" f"Personal repo {self.org_name}/{repo_name} for {student} already exists"
) )
else: else:
raise (e) raise (e)
username = self._get_username_by_canvas_student(student) username = self._get_username_by_canvas_student(student)
self.repository_api.repo_add_collaborator( self.repository_api.repo_add_collaborator(
self.org_name, repo_name, username self.org_name, repo_name, username
) )
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
return repo_names return repo_names
def create_teams_and_repos_by_canvas_groups( def create_teams_and_repos_by_canvas_groups(
self, self,
students: PaginatedList, students: PaginatedList,
groups: PaginatedList, groups: PaginatedList,
team_name_convertor: Callable[[str], Optional[str]] = lambda name: name, team_name_convertor: Callable[[str], Optional[str]] = lambda name: name,
repo_name_convertor: Callable[[str], Optional[str]] = lambda name: name, repo_name_convertor: Callable[[str], Optional[str]] = lambda name: name,
permission: PermissionEnum = PermissionEnum.write, permission: PermissionEnum = PermissionEnum.write,
) -> List[str]: ) -> List[str]:
repo_names = [] repo_names = []
teams = list_all(self.organization_api.org_list_teams, self.org_name) teams = list_all(self.organization_api.org_list_teams, self.org_name)
repos = list_all(self.organization_api.org_list_repos, self.org_name) repos = list_all(self.organization_api.org_list_repos, self.org_name)
group: Group group: Group
for group in groups: for group in groups:
team_name = team_name_convertor(group.name) team_name = team_name_convertor(group.name)
repo_name = repo_name_convertor(group.name) repo_name = repo_name_convertor(group.name)
if team_name is None or repo_name is None: if team_name is None or repo_name is None:
continue continue
team = first(teams, lambda team: team.name == team_name) team = first(teams, lambda team: team.name == team_name)
if team is None: if team is None:
team = self.organization_api.org_create_team( team = self.organization_api.org_create_team(
self.org_name, self.org_name,
body={ body={
"can_create_org_repo": False, "can_create_org_repo": False,
"includes_all_repositories": False, "includes_all_repositories": False,
"name": team_name, "name": team_name,
"permission": permission.value, "permission": permission.value,
"units": [ "units": [
"repo.code", "repo.code",
"repo.issues", "repo.issues",
"repo.ext_issues", "repo.ext_issues",
"repo.wiki", "repo.wiki",
"repo.pulls", "repo.pulls",
"repo.releases", "repo.releases",
"repo.projects", "repo.projects",
"repo.ext_wiki", "repo.ext_wiki",
], ],
}, },
) )
logger.info(f"{self.org_name}/{team_name} created") logger.info(f"{self.org_name}/{team_name} created")
repo = first(repos, lambda repo: repo.name == repo_name) repo = first(repos, lambda repo: repo.name == repo_name)
if repo is None: if repo is None:
repo_names.append(repo_name) repo_names.append(repo_name)
repo = self.organization_api.create_org_repo( repo = self.organization_api.create_org_repo(
self.org_name, self.org_name,
body={ body={
"auto_init": False, "auto_init": False,
"default_branch": "master", "default_branch": "master",
"name": repo_name, "name": repo_name,
"private": True, "private": True,
"template": False, "template": False,
"trust_model": "default", "trust_model": "default",
}, },
) )
logger.info(f"Team {team_name} created") logger.info(f"Team {team_name} created")
self.organization_api.org_add_team_repository( self.organization_api.org_add_team_repository(
team.id, self.org_name, repo_name team.id, self.org_name, repo_name
) )
membership: GroupMembership membership: GroupMembership
student_count = 0 student_count = 0
for membership in group.get_memberships(): for membership in group.get_memberships():
student = first(students, lambda s: s.id == membership.user_id) student = first(students, lambda s: s.id == membership.user_id)
if student is None: if student is None:
raise Exception( raise Exception(
f"student with user_id {membership.user_id} not found" f"student with user_id {membership.user_id} not found"
) )
username = self._get_username_by_canvas_student(student) username = self._get_username_by_canvas_student(student)
self.organization_api.org_add_team_member(team.id, username) self.organization_api.org_add_team_member(team.id, username)
self.repository_api.repo_add_collaborator( self.repository_api.repo_add_collaborator(
self.org_name, repo_name, username self.org_name, repo_name, username
) )
student_count += 1 student_count += 1
try: try:
self.repository_api.repo_delete_branch_protection( self.repository_api.repo_delete_branch_protection(
self.org_name, repo_name, "master" self.org_name, repo_name, "master"
) )
except ApiException as e: except ApiException as e:
if e.status != 404: if e.status != 404:
raise raise
try: try:
self.repository_api.repo_create_branch_protection( self.repository_api.repo_create_branch_protection(
self.org_name, self.org_name,
repo_name, repo_name,
body={ body={
"block_on_official_review_requests": True, "block_on_official_review_requests": True,
"block_on_outdated_branch": True, "block_on_outdated_branch": True,
"block_on_rejected_reviews": True, "block_on_rejected_reviews": True,
"branch_name": "master", "branch_name": "master",
"dismiss_stale_approvals": True, "dismiss_stale_approvals": True,
"enable_approvals_whitelist": False, "enable_approvals_whitelist": False,
"enable_merge_whitelist": False, "enable_merge_whitelist": False,
"enable_push": False, "enable_push": False,
"enable_push_whitelist": False, "enable_push_whitelist": False,
"enable_status_check": False, "enable_status_check": False,
"merge_whitelist_teams": [], "merge_whitelist_teams": [],
"merge_whitelist_usernames": [], "merge_whitelist_usernames": [],
"protected_file_patterns": "", "protected_file_patterns": "",
"push_whitelist_deploy_keys": False, "push_whitelist_deploy_keys": False,
"push_whitelist_teams": [], "push_whitelist_teams": [],
"push_whitelist_usernames": [], "push_whitelist_usernames": [],
"require_signed_commits": False, "require_signed_commits": False,
"required_approvals": max(student_count - 1, 0), "required_approvals": max(student_count - 1, 0),
"status_check_contexts": [], "status_check_contexts": [],
}, },
) )
except ApiException as e: except ApiException as e:
if e.status != 404: if e.status != 404:
raise raise
logger.info(f"{self.org_name}/{repo_name} jobs done") logger.info(f"{self.org_name}/{repo_name} jobs done")
return repo_names return repo_names
def get_public_key_of_canvas_students(self, students: PaginatedList) -> List[str]: def get_public_key_of_canvas_students(
res = [] self, students: PaginatedList
for student in students: ) -> Dict[str, List[str]]:
try: res = {}
username = self._get_username_by_canvas_student(student) for student in students:
res.extend( try:
[ username = self._get_username_by_canvas_student(student)
item.key keys = [
for item in list_all(self.user_api.user_list_keys, username) item.key
] for item in list_all(self.user_api.user_list_keys, username)
) ]
except Exception as e: if not keys:
logger.error(e) logger.info(f"{student} has not uploaded ssh keys to gitea")
return res continue
res[student.sis_login_id] = keys
def get_repo_releases(self, repo_name: str) -> List[Any]: except Exception as e:
res = [] logger.error(e)
try: return res
args = self.repository_api.repo_list_releases, self.org_name, repo_name
res = list_all(*args) def get_repo_releases(self, repo_name: str) -> List[Any]:
except ApiException as e: res = []
if e.status != 404: try:
raise args = self.repository_api.repo_list_releases, self.org_name, repo_name
return res res = list_all(*args)
except ApiException as e:
def get_all_repo_names(self) -> List[str]: if e.status != 404:
return [ raise
data.name return res
for data in list_all(self.organization_api.org_list_repos, self.org_name)
] def get_all_repo_names(self) -> List[str]:
return [
def get_no_collaborator_repos(self) -> List[str]: data.name
res = [] for data in list_all(self.organization_api.org_list_repos, self.org_name)
for data in list_all(self.organization_api.org_list_repos, self.org_name): ]
collaborators = self.repository_api.repo_list_collaborators(
self.org_name, data.name def get_no_collaborator_repos(self) -> List[str]:
) res = []
if collaborators: for data in list_all(self.organization_api.org_list_repos, self.org_name):
continue collaborators = self.repository_api.repo_list_collaborators(
logger.info(f"{self.org_name}/{data.name} has no collaborators") self.org_name, data.name
res.append(data.name) )
return res if collaborators:
continue
def get_repos_status(self) -> List[Tuple[str, int, int]]: logger.info(f"{self.org_name}/{data.name} has no collaborators")
res = [] res.append(data.name)
for repo in list_all(self.organization_api.org_list_repos, self.org_name): return res
try:
commits = self.repository_api.repo_get_all_commits( def get_repos_status(self) -> List[Tuple[str, int, int]]:
self.org_name, repo.name res = []
) for repo in list_all(self.organization_api.org_list_repos, self.org_name):
except ApiException as e: try:
if e.status != 409: commits = self.repository_api.repo_get_all_commits(
raise self.org_name, repo.name
commits = [] )
issues = self.issue_api.issue_list_issues( except ApiException as e:
self.org_name, repo.name, state="all" if e.status != 409:
) raise
# if not commits: commits = []
# logger.info(f"{self.org_name}/{repo.name} has no commits") issues = self.issue_api.issue_list_issues(
# res.append(repo.name) self.org_name, repo.name, state="all"
res.append((repo.name, len(commits), len(issues))) )
return res # if not commits:
# logger.info(f"{self.org_name}/{repo.name} has no commits")
def create_issue( # res.append(repo.name)
self, res.append((repo.name, len(commits), len(issues)))
repo_name: str, return res
title: str,
body: str, def create_issue(
assign_every_collaborators: bool = True, self,
) -> None: repo_name: str,
assignees = [] title: str,
if assign_every_collaborators: body: str,
assignees = [ assign_every_collaborators: bool = True,
item.username ) -> None:
for item in list_all( assignees = []
self.repository_api.repo_list_collaborators, if assign_every_collaborators:
self.org_name, assignees = [
repo_name, item.username
) for item in list_all(
] self.repository_api.repo_list_collaborators,
self.issue_api.issue_create_issue( self.org_name,
self.org_name, repo_name,
repo_name, )
body={"title": title, "body": body, "assignees": assignees}, ]
) self.issue_api.issue_create_issue(
self.org_name,
def check_exist_issue_by_title(self, repo_name: str, title: str) -> bool: repo_name,
for issue in list_all( body={"title": title, "body": body, "assignees": assignees},
self.issue_api.issue_list_issues, self.org_name, repo_name )
):
if issue.title == title: def check_exist_issue_by_title(self, repo_name: str, title: str) -> bool:
return True for issue in list_all(
return False self.issue_api.issue_list_issues, self.org_name, repo_name
):
def close_all_issues(self) -> None: if issue.title == title:
for repo in list_all(self.organization_api.org_list_repos, self.org_name): return True
for issue in list_all( return False
self.issue_api.issue_list_issues, self.org_name, repo.name
): def close_all_issues(self) -> None:
if issue.state != "closed": for repo in list_all(self.organization_api.org_list_repos, self.org_name):
self.issue_api.issue_edit_issue( for issue in list_all(
self.org_name, repo.name, issue.number, body={"state": "closed"} self.issue_api.issue_list_issues, self.org_name, repo.name
) ):
if issue.state != "closed":
def archieve_all_repos(self) -> None: self.issue_api.issue_edit_issue(
for repo in list_all(self.organization_api.org_list_repos, self.org_name): self.org_name, repo.name, issue.number, body={"state": "closed"}
self.repository_api.repo_edit( )
self.org_name, repo.name, body={"archived": True}
) def archieve_all_repos(self) -> None:
for repo in list_all(self.organization_api.org_list_repos, self.org_name):
self.repository_api.repo_edit(
if __name__ == "__main__": self.org_name, repo.name, body={"archived": True}
gitea = Gitea() )
if __name__ == "__main__":
gitea = Gitea()