forked from JOJ/Joint-Teapot
		
	
		
			
				
	
	
		
			372 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			372 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import re
 | |
| from enum import Enum
 | |
| from functools import lru_cache
 | |
| from typing import Any, Callable, Dict, List, Optional, Tuple
 | |
| 
 | |
| import focs_gitea
 | |
| from canvasapi.group import Group, GroupMembership
 | |
| from canvasapi.paginated_list import PaginatedList
 | |
| from canvasapi.user import User
 | |
| from focs_gitea.rest import ApiException
 | |
| 
 | |
| from joint_teapot.config import settings
 | |
| from joint_teapot.utils.logger import logger
 | |
| from joint_teapot.utils.main import first
 | |
| 
 | |
| 
 | |
class PermissionEnum(Enum):
    """Permission levels that can be granted to a Gitea team.

    The member's ``value`` is the string sent as the ``"permission"`` field
    when a team is created.
    """

    # Each member's value is the same word as its name.
    read = "read"
    write = "write"
    admin = "admin"
 | |
| 
 | |
| 
 | |
def default_repo_name_convertor(user: User) -> Optional[str]:
    """Build the default personal repository name for a Canvas user.

    The name is the user's display name with characters in the range
    U+4E00..U+9FA5 and commas removed, every whitespace-separated word given
    an upper-case first letter (the rest of the word is left untouched) and
    the words joined without separators, followed by the user's
    ``sis_login_id``.

    Args:
        user: Object exposing ``sis_login_id`` and ``name`` attributes.

    Returns:
        The generated repository name. This implementation always returns a
        string; the ``Optional`` return type matches the convertor callback
        signature, where ``None`` means "skip this user".
    """
    # `login_id` rather than `id`, so the builtin is not shadowed.
    login_id, name = user.sis_login_id, user.name
    latin_part = re.sub(r"[\u4e00-\u9fa5]", "", name).replace(",", "")
    # word[0].capitalize() + word[1:] keeps inner capitals ("mcDonald" ->
    # "McDonald"), unlike str.capitalize() on the whole word.
    camel_cased = "".join(
        word[0].capitalize() + word[1:] for word in latin_part.split()
    )
    return f"{camel_cased}{login_id}"
 | |
| 
 | |
| 
 | |
def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
    """Collect the items of every page returned by a paginated call.

    ``method`` is called repeatedly as ``method(*args, **kwargs, page=n)`` with
    ``n`` starting at 1 and increasing by one, until it returns a falsy value
    (for example an empty list or ``None``).

    Args:
        method: Callable that accepts a ``page`` keyword argument and returns
            an iterable of items for that page.
        *args: Positional arguments forwarded unchanged on every call.
        **kwargs: Keyword arguments forwarded unchanged on every call; must
            not contain ``page``.

    Returns:
        A list with the items of all pages, in the order they were returned.

    Note:
        The loop only stops on a falsy page, so ``method`` must eventually
        return one; a callable that ignores ``page`` would never terminate.
    """
    collected: List[Any] = []
    page = 1
    while True:
        batch = method(*args, **kwargs, page=page)
        if not batch:
            return collected
        collected.extend(batch)
        page += 1
 | |
| 
 | |
| 
 | |
class Gitea:
    """Helper around the ``focs_gitea`` API clients, scoped to one organization.

    Bundles the operations used to mirror Canvas students and groups into
    Gitea teams and repositories, plus a few bulk helpers for issues,
    releases and archiving. All calls use ``self.org_name`` as the owner.
    """

    # Repository units requested for every team created from a Canvas group.
    _TEAM_UNITS: Tuple[str, ...] = (
        "repo.code",
        "repo.issues",
        "repo.ext_issues",
        "repo.wiki",
        "repo.pulls",
        "repo.releases",
        "repo.projects",
        "repo.ext_wiki",
    )

    def __init__(
        self,
        access_token: str = settings.gitea_access_token,
        org_name: str = settings.gitea_org_name,
    ):
        """Create the API clients.

        Args:
            access_token: Stored as the ``access_token`` API key of the client
                configuration.
            org_name: Organization that every method operates on.
        """
        self.org_name = org_name
        configuration = focs_gitea.Configuration()
        configuration.api_key["access_token"] = access_token
        self.api_client = focs_gitea.ApiClient(configuration)
        self.admin_api = focs_gitea.AdminApi(self.api_client)
        self.miscellaneous_api = focs_gitea.MiscellaneousApi(self.api_client)
        self.organization_api = focs_gitea.OrganizationApi(self.api_client)
        self.issue_api = focs_gitea.IssueApi(self.api_client)
        self.repository_api = focs_gitea.RepositoryApi(self.api_client)
        self.settings_api = focs_gitea.SettingsApi(self.api_client)
        self.user_api = focs_gitea.UserApi(self.api_client)
        # Per-instance lookup caches. A functools.lru_cache on the methods
        # would key on `self` and keep every instance alive for as long as the
        # cache exists.
        self._team_id_cache: Dict[str, int] = {}
        self._username_cache: Dict[Any, str] = {}
        logger.debug("Gitea initialized")

    @staticmethod
    def _private_repo_body(repo_name: str) -> Dict[str, Any]:
        """Return the request body used to create a private, empty repo."""
        return {
            "auto_init": False,
            "default_branch": "master",
            "name": repo_name,
            "private": True,
            "template": False,
            "trust_model": "default",
        }

    def _get_team_id_by_name(self, name: str) -> int:
        """Return the id of the first team the team search yields for ``name``.

        Successful lookups are cached per instance; failed lookups are not.

        Raises:
            Exception: If the search returns no team.
        """
        if name in self._team_id_cache:
            return self._team_id_cache[name]
        # NOTE(review): limit=1 takes whatever the search ranks first; whether
        # that is always an exact name match is not visible here - confirm.
        res = self.organization_api.team_search(self.org_name, q=str(name), limit=1)
        if not res["data"]:
            raise Exception(f"{name} not found by name in Gitea")
        team_id = res["data"][0]["id"]
        self._team_id_cache[name] = team_id
        return team_id

    def _get_username_by_canvas_student(self, student: User) -> str:
        """Return the Gitea username found by searching ``student.sis_login_id``.

        Successful lookups are cached per instance, keyed by the login id that
        was used as the search query; failed lookups are not cached.

        Raises:
            Exception: If the user search returns no user.
        """
        login_id = student.sis_login_id
        if login_id in self._username_cache:
            return self._username_cache[login_id]
        res = self.user_api.user_search(q=login_id, limit=1)
        if not res["data"]:
            raise Exception(f"{student} not found in Gitea")
        username = res["data"][0]["username"]
        self._username_cache[login_id] = username
        return username

    def add_canvas_students_to_teams(
        self, students: PaginatedList, team_names: List[str]
    ) -> None:
        """Add every Canvas student to each of the named teams.

        Failures for a single student are logged and do not stop the loop.
        After each team is processed, listed team members that matched no
        student are reported with an error log entry.

        Raises:
            Exception: If one of ``team_names`` cannot be resolved to a team.
        """
        for team_name in team_names:
            team_id = self._get_team_id_by_name(team_name)
            # NOTE(review): this is a single, unpaginated call; confirm it
            # returns all members for large teams.
            team_members = self.organization_api.org_list_team_members(team_id)
            for student in students:
                try:
                    username = self._get_username_by_canvas_student(student)
                    team_member = first(team_members, lambda x: x.login == username)
                    if team_member is None:
                        self.organization_api.org_add_team_member(team_id, username)
                        logger.info(f"{student} added to team {team_name}")
                    else:
                        # Matched members are removed so that only unmatched
                        # ones remain for the report below.
                        team_members.remove(team_member)
                        logger.warning(f"{student} already in team {team_name}")
                except Exception as e:
                    # Best effort: keep going with the remaining students.
                    logger.error(e)
            for team_member in team_members:
                logger.error(
                    f"{team_member.full_name} found in team {team_name} "
                    + "but not found in Canvas students"
                )

    def create_personal_repos_for_canvas_students(
        self,
        students: PaginatedList,
        repo_name_convertor: Callable[
            [User], Optional[str]
        ] = default_repo_name_convertor,
    ) -> List[str]:
        """Create one private repo per student and add them as collaborator.

        Args:
            students: Canvas students to process.
            repo_name_convertor: Maps a student to a repo name; students for
                which it returns ``None`` are skipped.

        Returns:
            The repo names of all non-skipped students, including repos that
            already existed or whose creation failed.
        """
        repo_names = []
        for student in students:
            repo_name = repo_name_convertor(student)
            if repo_name is None:
                continue
            repo_names.append(repo_name)
            try:
                try:
                    self.organization_api.create_org_repo(
                        self.org_name, body=self._private_repo_body(repo_name)
                    )
                    logger.info(
                        f"Personal repo {self.org_name}/{repo_name} for {student} created"
                    )
                except ApiException as e:
                    # 409 is treated as "repo already exists": warn and go on
                    # to (re-)add the collaborator.
                    if e.status != 409:
                        raise
                    logger.warning(
                        f"Personal repo {self.org_name}/{repo_name} for {student} already exists"
                    )
                username = self._get_username_by_canvas_student(student)
                self.repository_api.repo_add_collaborator(
                    self.org_name, repo_name, username
                )
            except Exception as e:
                # Best effort: log and continue with the next student.
                logger.error(e)
        return repo_names

    def _reset_master_branch_protection(
        self, repo_name: str, required_approvals: int
    ) -> None:
        """Delete and re-create the ``master`` branch protection of a repo.

        A 404 from either call is ignored; any other ``ApiException``
        propagates.
        """
        try:
            self.repository_api.repo_delete_branch_protection(
                self.org_name, repo_name, "master"
            )
        except ApiException as e:
            if e.status != 404:
                raise
        try:
            self.repository_api.repo_create_branch_protection(
                self.org_name,
                repo_name,
                body={
                    "block_on_official_review_requests": True,
                    "block_on_outdated_branch": True,
                    "block_on_rejected_reviews": True,
                    "branch_name": "master",
                    "dismiss_stale_approvals": True,
                    "enable_approvals_whitelist": False,
                    "enable_merge_whitelist": True,
                    "enable_push": True,
                    "enable_push_whitelist": True,
                    "enable_status_check": False,
                    "merge_whitelist_teams": ["Owners"],
                    "merge_whitelist_usernames": [],
                    "protected_file_patterns": "",
                    "push_whitelist_deploy_keys": False,
                    "push_whitelist_teams": ["Owners"],
                    "push_whitelist_usernames": [],
                    "require_signed_commits": False,
                    "required_approvals": required_approvals,
                    "status_check_contexts": [],
                },
            )
        except ApiException as e:
            if e.status != 404:
                raise

    def create_teams_and_repos_by_canvas_groups(
        self,
        students: PaginatedList,
        groups: PaginatedList,
        team_name_convertor: Callable[[str], Optional[str]] = lambda name: name,
        repo_name_convertor: Callable[[str], Optional[str]] = lambda name: name,
        permission: PermissionEnum = PermissionEnum.write,
    ) -> List[str]:
        """Create a team and a private repo for every Canvas group.

        For each group: the team and repo are created if no team/repo with
        that name is listed for the organization, the repo is attached to the
        team, each group member is added to the team and as a collaborator,
        and the ``master`` branch protection is reset so that every other
        group member has to approve (``max(member_count - 1, 0)`` approvals).

        Args:
            students: Canvas students, used to resolve membership user ids.
            groups: Canvas groups to mirror.
            team_name_convertor: Maps a group name to a team name.
            repo_name_convertor: Maps a group name to a repo name. A group is
                skipped when either convertor returns ``None``.
            permission: Permission given to newly created teams.

        Returns:
            Names of the repos that were newly created by this call.

        Raises:
            Exception: If a group member is not among ``students`` or has no
                Gitea user.
            ApiException: For API errors that are not explicitly tolerated.
        """
        repo_names = []
        teams = list_all(self.organization_api.org_list_teams, self.org_name)
        repos = list_all(self.organization_api.org_list_repos, self.org_name)
        group: Group
        for group in groups:
            team_name = team_name_convertor(group.name)
            repo_name = repo_name_convertor(group.name)
            if team_name is None or repo_name is None:
                continue
            team = first(teams, lambda team: team.name == team_name)
            if team is None:
                team = self.organization_api.org_create_team(
                    self.org_name,
                    body={
                        "can_create_org_repo": False,
                        "includes_all_repositories": False,
                        "name": team_name,
                        "permission": permission.value,
                        "units": list(self._TEAM_UNITS),
                    },
                )
                logger.info(f"{self.org_name}/{team_name} created")
            if first(repos, lambda repo: repo.name == repo_name) is None:
                repo_names.append(repo_name)
                self.organization_api.create_org_repo(
                    self.org_name, body=self._private_repo_body(repo_name)
                )
                # The message used to say "Team ... created" although a repo
                # had just been created.
                logger.info(f"Repo {self.org_name}/{repo_name} created")
            self.organization_api.org_add_team_repository(
                team.id, self.org_name, repo_name
            )
            membership: GroupMembership
            student_count = 0
            for membership in group.get_memberships():
                student = first(students, lambda s: s.id == membership.user_id)
                if student is None:
                    raise Exception(
                        f"student with user_id {membership.user_id} not found"
                    )
                username = self._get_username_by_canvas_student(student)
                self.organization_api.org_add_team_member(team.id, username)
                self.repository_api.repo_add_collaborator(
                    self.org_name, repo_name, username
                )
                student_count += 1
            self._reset_master_branch_protection(
                repo_name, max(student_count - 1, 0)
            )
            logger.info(f"{self.org_name}/{repo_name} jobs done")
        return repo_names

    def get_public_key_of_canvas_students(
        self, students: PaginatedList
    ) -> Dict[str, List[str]]:
        """Return the uploaded public keys of each student.

        Returns:
            Mapping from ``sis_login_id`` to the student's keys. Students
            without keys, or whose lookup failed (logged as error), are
            omitted.
        """
        res: Dict[str, List[str]] = {}
        for student in students:
            try:
                username = self._get_username_by_canvas_student(student)
                keys = [
                    item.key
                    for item in list_all(self.user_api.user_list_keys, username)
                ]
                if not keys:
                    logger.info(f"{student} has not uploaded ssh keys to gitea")
                    continue
                res[student.sis_login_id] = keys
            except Exception as e:
                # Best effort: log and continue with the next student.
                logger.error(e)
        return res

    def get_repo_releases(self, repo_name: str) -> List[Any]:
        """Return all releases of a repo, or an empty list on a 404.

        Raises:
            ApiException: For any API error other than 404.
        """
        try:
            return list_all(
                self.repository_api.repo_list_releases, self.org_name, repo_name
            )
        except ApiException as e:
            if e.status != 404:
                raise
        return []

    def get_all_repo_names(self) -> List[str]:
        """Return the names of all repos listed for the organization."""
        return [
            data.name
            for data in list_all(self.organization_api.org_list_repos, self.org_name)
        ]

    def get_no_collaborator_repos(self) -> List[str]:
        """Return the names of organization repos that list no collaborators."""
        res = []
        for data in list_all(self.organization_api.org_list_repos, self.org_name):
            collaborators = self.repository_api.repo_list_collaborators(
                self.org_name, data.name
            )
            if collaborators:
                continue
            logger.info(f"{self.org_name}/{data.name} has no collaborators")
            res.append(data.name)
        return res

    def get_repos_status(self) -> List[Tuple[str, int, int]]:
        """Return ``(repo name, commit count, issue count)`` for every repo.

        A 409 from the commit listing is treated as "no commits". Issues are
        listed with ``state="all"``.

        NOTE(review): commits and issues are each fetched with one unpaginated
        call, so the counts are the lengths of those single responses -
        confirm whether that is the full total.

        Raises:
            ApiException: For commit-listing errors other than 409, or any
                issue-listing error.
        """
        res = []
        for repo in list_all(self.organization_api.org_list_repos, self.org_name):
            try:
                commits = self.repository_api.repo_get_all_commits(
                    self.org_name, repo.name
                )
            except ApiException as e:
                if e.status != 409:
                    raise
                commits = []
            issues = self.issue_api.issue_list_issues(
                self.org_name, repo.name, state="all"
            )
            res.append((repo.name, len(commits), len(issues)))
        return res

    def create_issue(
        self,
        repo_name: str,
        title: str,
        body: str,
        assign_every_collaborators: bool = True,
    ) -> None:
        """Create an issue in a repo of the organization.

        Args:
            repo_name: Target repo.
            title: Issue title.
            body: Issue body text.
            assign_every_collaborators: When true, every collaborator of the
                repo is set as assignee; otherwise nobody is assigned.
        """
        assignees = []
        if assign_every_collaborators:
            # NOTE(review): collaborators are read via `.username` here while
            # team members are matched via `.login` elsewhere in this class -
            # confirm both attributes exist on the returned objects.
            assignees = [
                item.username
                for item in list_all(
                    self.repository_api.repo_list_collaborators,
                    self.org_name,
                    repo_name,
                )
            ]
        self.issue_api.issue_create_issue(
            self.org_name,
            repo_name,
            body={"title": title, "body": body, "assignees": assignees},
        )

    def check_exist_issue_by_title(self, repo_name: str, title: str) -> bool:
        """Return whether a listed issue of the repo has exactly this title.

        The issues are listed without a ``state`` argument, so only what the
        API returns by default is searched.
        """
        issues = list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)
        return any(issue.title == title for issue in issues)

    def close_all_issues(self) -> None:
        """Close every listed issue that is not closed, in every repo."""
        for repo in list_all(self.organization_api.org_list_repos, self.org_name):
            for issue in list_all(
                self.issue_api.issue_list_issues, self.org_name, repo.name
            ):
                if issue.state != "closed":
                    self.issue_api.issue_edit_issue(
                        self.org_name, repo.name, issue.number, body={"state": "closed"}
                    )

    def archive_all_repos(self) -> None:
        """Mark every repo of the organization as archived."""
        for repo in list_all(self.organization_api.org_list_repos, self.org_name):
            self.repository_api.repo_edit(
                self.org_name, repo.name, body={"archived": True}
            )

    def archieve_all_repos(self) -> None:
        """Misspelled original name, kept for existing callers.

        Prefer :meth:`archive_all_repos`.
        """
        self.archive_all_repos()
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Running this module directly only builds a client with the default
    # arguments taken from `settings`; nothing else is executed.
    gitea = Gitea()
 |