990 lines
38 KiB
Python
990 lines
38 KiB
Python
import re
|
|
from enum import Enum
|
|
from functools import lru_cache
|
|
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, cast
|
|
from urllib.parse import quote
|
|
|
|
import focs_gitea
|
|
import requests # type: ignore
|
|
from canvasapi.group import Group, GroupMembership
|
|
from canvasapi.paginated_list import PaginatedList
|
|
from canvasapi.user import User
|
|
from focs_gitea.rest import ApiException
|
|
|
|
from joint_teapot.config import settings
|
|
from joint_teapot.utils.logger import logger
|
|
from joint_teapot.utils.main import default_repo_name_convertor, first
|
|
|
|
|
|
class PermissionEnum(Enum):
    """Permission levels that can be assigned to a Gitea team.

    Each member's value is the literal string that is placed in the
    ``permission`` field of the team-creation request body.
    """

    read = "read"
    write = "write"
    admin = "admin"
|
|
|
|
|
|
def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
    """Collect every item from a page-numbered API method.

    ``method`` is invoked as ``method(*args, **kwargs, page=N)`` with ``N``
    starting at 1 and increasing by one until a call returns a falsy result
    (for example an empty list or ``None``).

    Element types depend on the external API client, hence ``Any``.

    Returns:
        A flat list with the items of all non-empty pages, in page order.
    """
    collected: List[Any] = []
    page_no = 1
    # The walrus keeps "fetch, test for emptiness, consume" in one place.
    while batch := method(*args, **kwargs, page=page_no):
        collected.extend(batch)
        page_no += 1
    return collected
|
|
|
|
|
|
def _get_color(c: Optional[str]) -> str:
|
|
if not c:
|
|
return "CCCCCC"
|
|
s = c.strip()
|
|
if s.startswith("#"):
|
|
s = s[1:]
|
|
if re.fullmatch(r"[0-9a-fA-F]{6}", s):
|
|
return s
|
|
logger.warning(f"Provided color '{c}' is not a valid hex; falling back to #CCCCCC")
|
|
return "CCCCCC"
|
|
|
|
|
|
def _label_id_from_obj(obj: Any) -> Optional[int]:
|
|
try:
|
|
if isinstance(obj, dict):
|
|
return obj.get("id")
|
|
return getattr(obj, "id", None)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _get_repo_labels(gitea: Any, repo_name: str) -> List[Any]:
|
|
try:
|
|
return list(
|
|
cast(
|
|
Iterable[Any],
|
|
gitea.issue_api.issue_list_labels(gitea.org_name, repo_name),
|
|
)
|
|
)
|
|
except ApiException as e:
|
|
logger.error(f"Failed to list labels for {repo_name}: {e}")
|
|
return []
|
|
|
|
|
|
def _list_issues_map(gitea: Any, repo_name: str) -> Dict[int, Any]:
    """Build a mapping of issue number to issue object for ``repo_name``.

    All pages of ``issue_list_issues`` are walked via ``list_all``; no state
    filter is passed. On ``ApiException`` the error is logged and an empty
    dict is returned.
    """
    by_number: Dict[int, Any] = {}
    try:
        for entry in list_all(
            gitea.issue_api.issue_list_issues, gitea.org_name, repo_name
        ):
            by_number[entry.number] = entry
    except ApiException as err:
        logger.error(f"Failed to list issues for {repo_name}: {err}")
        return {}
    return by_number
|
|
|
|
|
|
def _api_post_labels(gitea: Any, repo_name: str, issue_num: int, payload: Any) -> Any:
    """POST ``payload`` as JSON to the issue's ``/labels`` endpoint.

    The request bypasses the generated API client and uses ``requests``
    directly, authenticating with the client's configured access token (or the
    token from settings when the client has none).

    Returns:
        The ``requests`` response object; the status is not checked here.
    """
    config = gitea.api_client.configuration
    access_token = (
        getattr(config, "api_key", {}).get("access_token")
        or settings.gitea_access_token
    )
    endpoint = (
        f"{config.host}"
        f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels"
    )
    return requests.post(
        endpoint,
        headers={
            "Authorization": f"token {access_token}",
            "Content-Type": "application/json",
        },
        json=payload,
        timeout=10,
    )
|
|
|
|
|
|
def _patch_label_exclusive(
    gitea: Any, repo_name: str, name: str, label_obj: Any
) -> None:
    """Best-effort: flag an already existing repository label as exclusive.

    Sends a PATCH with ``{"exclusive": True, "name": name}`` (plus the label's
    current color, when known, so it is not lost) straight through
    ``requests``.

    Args:
        gitea: Client object exposing ``org_name`` and ``api_client``.
        repo_name: Repository that owns the label.
        name: Label name; sent in the payload and used in log messages.
        label_obj: The existing label, either an API object or a dict. Its
            ``id`` and ``color`` are read when present.

    Never raises: any failure is logged and swallowed, because the caller
    treats exclusivity as a nice-to-have.
    """
    try:
        is_dict = isinstance(label_obj, dict)
        existing_color = getattr(label_obj, "color", None) or (
            label_obj.get("color") if is_dict else None
        )
        if isinstance(existing_color, str) and existing_color.startswith("#"):
            existing_color = existing_color[1:]

        # Gitea's edit-label route is /repos/{owner}/{repo}/labels/{id} with a
        # numeric id, so address the label by id. The URL-encoded name is kept
        # only as a fallback for label objects that carry no usable id.
        label_id = getattr(label_obj, "id", None) or (
            label_obj.get("id") if is_dict else None
        )
        label_ref = str(label_id) if isinstance(label_id, int) else quote(name, safe="")

        config = gitea.api_client.configuration
        url = f"{config.host}/repos/{gitea.org_name}/{repo_name}/labels/{label_ref}"
        token = (
            getattr(config, "api_key", {}).get("access_token")
            or settings.gitea_access_token
        )
        headers_local = {
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        }
        payload: Dict[str, Any] = {"exclusive": True, "name": name}
        if existing_color:
            payload["color"] = existing_color

        resp = requests.patch(url, headers=headers_local, json=payload, timeout=10)
        if resp.status_code in (200, 201):
            logger.info(f"Marked existing label '{name}' as exclusive in {repo_name}")
        else:
            logger.warning(
                f"Failed to mark existing label '{name}' as exclusive: status={resp.status_code}"
            )
    except Exception as e:
        logger.debug(f"Could not patch label exclusive for {name}: {e}")
|
|
|
|
|
|
def _delete_issue_label_by_id(
    gitea: Any, repo_name: str, issue_num: int, lid: int
) -> None:
    """Detach the label with numeric id ``lid`` from one issue.

    Issues a raw DELETE through ``requests``. Status 200/204 is logged as
    success; any other status, and any exception, is logged as a warning.
    Nothing is raised to the caller.
    """
    try:
        config = gitea.api_client.configuration
        access_token = (
            getattr(config, "api_key", {}).get("access_token")
            or settings.gitea_access_token
        )
        target = (
            f"{config.host}"
            f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
        )
        response = requests.delete(
            target, headers={"Authorization": f"token {access_token}"}, timeout=10
        )
        if response.status_code not in (200, 204):
            logger.warning(
                f"Failed to remove label id {lid} from {repo_name}#{issue_num}: status={response.status_code}"
            )
        else:
            logger.info(f"Removed label id {lid} from {repo_name}#{issue_num}")
    except Exception as err:
        logger.warning(
            f"Error removing label id {lid} from {repo_name}#{issue_num}: {err}"
        )
|
|
|
|
|
|
def _create_label(
|
|
gitea: Any, repo_name: str, label_name: str, color: Optional[str], labels: List[Any]
|
|
) -> Optional[Any]:
|
|
for l in labels:
|
|
if getattr(l, "name", None) == label_name:
|
|
try:
|
|
is_ex = getattr(l, "exclusive", None)
|
|
if not bool(is_ex):
|
|
_patch_label_exclusive(gitea, repo_name, label_name, l)
|
|
except Exception:
|
|
return l
|
|
return l
|
|
|
|
chosen_color = _get_color(color)
|
|
try:
|
|
new = gitea.issue_api.issue_create_label(
|
|
gitea.org_name,
|
|
repo_name,
|
|
body={"name": label_name, "color": chosen_color, "exclusive": True},
|
|
)
|
|
logger.info(f"Created label '{label_name}' in {gitea.org_name}/{repo_name}")
|
|
return new
|
|
except ApiException as e:
|
|
logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
|
|
if hasattr(e, "body"):
|
|
logger.error(f"ApiException body: {getattr(e, 'body', None)}")
|
|
return None
|
|
|
|
|
|
def _extract_label_names(issue_obj: Any) -> List[str]:
|
|
try:
|
|
return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []]
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
class Gitea:
    """Convenience wrapper around the ``focs_gitea`` client for one organization.

    All operations act on the organization stored in ``self.org_name``. Besides
    the generated API objects (``issue_api``, ``repository_api``, ...), a few
    label operations talk to the REST API directly through ``requests``.
    """

    def __init__(
        self,
        access_token: str = "",  # nosec
        org_name: str = "",
        domain_name: str = "",
        suffix: str = "",
    ):
        """Configure the API client.

        Every empty argument falls back to the matching ``settings.gitea_*``
        value. The API host is ``https://{domain_name}{suffix}/api/v1``.
        """
        access_token = access_token or settings.gitea_access_token
        org_name = org_name or settings.gitea_org_name
        domain_name = domain_name or settings.gitea_domain_name
        suffix = suffix or settings.gitea_suffix
        self.org_name = org_name
        configuration = focs_gitea.Configuration()
        configuration.api_key["access_token"] = access_token
        configuration.host = f"https://{domain_name}{suffix}/api/v1"
        configuration.debug = settings.gitea_debug
        # Drop the handlers attached to the generated client's own loggers.
        for v in configuration.logger.values():
            v.handlers = []
        self.api_client = focs_gitea.ApiClient(configuration)
        self.admin_api = focs_gitea.AdminApi(self.api_client)
        self.miscellaneous_api = focs_gitea.MiscellaneousApi(self.api_client)
        self.organization_api = focs_gitea.OrganizationApi(self.api_client)
        self.issue_api = focs_gitea.IssueApi(self.api_client)
        self.repository_api = focs_gitea.RepositoryApi(self.api_client)
        self.settings_api = focs_gitea.SettingsApi(self.api_client)
        self.user_api = focs_gitea.UserApi(self.api_client)
        logger.debug("Gitea initialized")

    @staticmethod
    def _label_field(label: Any, field: str) -> Any:
        """Read ``field`` from a label given as an API object or a plain dict."""
        return getattr(label, field, None) or (
            label.get(field) if isinstance(label, dict) else None
        )

    # NOTE: lru_cache on a method keys on (self, name) and keeps the instance
    # referenced for as long as the cache entry lives.
    @lru_cache()
    def _get_team_id_by_name(self, name: str) -> int:
        """Return the id of the first team the search returns for ``name``.

        Raises:
            Exception: when the search yields no team.
        """
        res = self.organization_api.team_search(
            self.org_name, q=str(name), limit=1
        ).to_dict()
        if not res["data"]:
            raise Exception(
                f"{name} not found by name in Gitea. Possible reason: you did not join this team."
            )
        return res["data"][0]["id"]

    @lru_cache()
    def _get_username_by_canvas_student(self, student: User) -> str:
        """Derive the Gitea username from the student's SJTU e-mail address.

        The username is the part before ``@`` of an address that ends with
        ``@sjtu.edu.cn`` and contains exactly one ``@``.

        Raises:
            Exception: when the student has no such e-mail address.
        """
        if (
            student.email is not None
            and student.email.count("@") == 1
            and student.email.endswith("@sjtu.edu.cn")
        ):
            return student.email.split("@")[0]
        raise Exception(f"Can not get username of {student}, an SJTU email is expected")

    def add_canvas_students_to_teams(
        self, students: PaginatedList, team_names: List[str]
    ) -> None:
        """Add every Canvas student to each named team.

        Students already in a team are only reported. Team members that match
        no Canvas student are logged as errors. Per-student failures are logged
        and do not stop the loop.
        """
        for team_name in team_names:
            team_id = self._get_team_id_by_name(team_name)
            team_members = self.organization_api.org_list_team_members(team_id)
            for student in students:
                try:
                    username = self._get_username_by_canvas_student(student)
                    team_member = first(team_members, lambda x: x.login == username)
                    if team_member is None:
                        self.organization_api.org_add_team_member(team_id, username)
                        logger.info(f"{student} added to team {team_name}")
                    else:
                        # Matched members are removed so that only unmatched
                        # ones remain for the report below.
                        team_members.remove(team_member)
                        logger.warning(f"{student} already in team {team_name}")
                except Exception as e:
                    logger.error(e)
            for team_member in team_members:
                logger.error(
                    f"{team_member.full_name} found in team {team_name} "
                    + "but not found in Canvas students"
                )

    def create_personal_repos_for_canvas_students(
        self,
        students: PaginatedList,
        repo_name_convertor: Callable[
            [User], Optional[str]
        ] = default_repo_name_convertor,
        template: str = "",
    ) -> List[str]:
        """Create one private repo per student and add the student to it.

        Students for which ``repo_name_convertor`` returns ``None`` are
        skipped. With an empty ``template`` a blank repo is created, otherwise
        it is generated from that template repo. An already existing repo
        (HTTP 409) is tolerated. Other errors are logged per student.

        Returns:
            The repo names of all non-skipped students, whether or not the
            creation succeeded.
        """
        repo_names = []
        for student in students:
            repo_name = repo_name_convertor(student)
            if repo_name is None:
                continue
            repo_names.append(repo_name)
            try:
                try:
                    if template == "":
                        body = {
                            "auto_init": False,
                            "default_branch": settings.default_branch,
                            "name": repo_name,
                            "private": True,
                            "template": False,
                            "trust_model": "default",
                        }
                        self.organization_api.create_org_repo(self.org_name, body=body)
                    else:
                        body = {
                            "default_branch": settings.default_branch,
                            "git_content": True,
                            "git_hooks": True,
                            "labels": True,
                            "name": repo_name,
                            "owner": self.org_name,
                            "private": True,
                            "protected_branch": True,
                        }
                        self.repository_api.generate_repo(
                            self.org_name, template, body=body
                        )
                    logger.info(
                        f"Personal repo {self.org_name}/{repo_name} for {student} created"
                    )
                except ApiException as e:
                    if e.status != 409:
                        # Bare raise keeps the original traceback intact.
                        raise
                    logger.warning(
                        f"Personal repo {self.org_name}/{repo_name} for {student} already exists"
                    )
                username = self._get_username_by_canvas_student(student)
                self.repository_api.repo_add_collaborator(
                    self.org_name, repo_name, username
                )
            except Exception as e:
                logger.error(e)
        return repo_names

    def create_teams_and_repos_by_canvas_groups(
        self,
        students: PaginatedList,
        groups: PaginatedList,
        team_name_convertor: Callable[[str], Optional[str]] = lambda name: name,
        repo_name_convertor: Callable[[str], Optional[str]] = lambda name: name,
        template: str = "",
        permission: PermissionEnum = PermissionEnum.write,
    ) -> List[str]:
        """Create a team and a repo for each Canvas group and wire them up.

        For every group whose converted team and repo names are both not
        ``None``: create the team and the repo if missing, attach the repo to
        the team, add the group's members as team members and collaborators,
        and replace the branch protection of the default branch.

        Returns:
            Names of the repos that did not exist before this call.

        Raises:
            Exception: when a group membership refers to an unknown student.
            ApiException: for branch-protection errors other than 404.
        """
        repo_names = []
        teams = list_all(self.organization_api.org_list_teams, self.org_name)
        repos = list_all(self.organization_api.org_list_repos, self.org_name)
        group: Group
        for group in groups:
            team_name = team_name_convertor(group.name)
            repo_name = repo_name_convertor(group.name)
            if team_name is None or repo_name is None:
                continue
            team = first(teams, lambda team: team.name == team_name)
            if team is None:
                team = self.organization_api.org_create_team(
                    self.org_name,
                    body={
                        "can_create_org_repo": False,
                        "includes_all_repositories": False,
                        "name": team_name,
                        "permission": permission.value,
                        "units": [
                            "repo.code",
                            "repo.issues",
                            "repo.ext_issues",
                            "repo.wiki",
                            "repo.pulls",
                            "repo.releases",
                            "repo.projects",
                            "repo.ext_wiki",
                        ],
                    },
                )
                logger.info(f"Team {team_name} created")
            if first(repos, lambda repo: repo.name == repo_name) is None:
                repo_names.append(repo_name)
                if template == "":
                    self.organization_api.create_org_repo(
                        self.org_name,
                        body={
                            "auto_init": False,
                            "default_branch": settings.default_branch,
                            "name": repo_name,
                            "private": True,
                            "template": False,
                            "trust_model": "default",
                        },
                    )
                else:
                    self.repository_api.generate_repo(
                        self.org_name,
                        template,
                        body={
                            "default_branch": settings.default_branch,
                            "git_content": True,
                            "git_hooks": True,
                            "labels": True,
                            "name": repo_name,
                            "owner": self.org_name,
                            "private": True,
                            "protected_branch": True,
                        },
                    )
                # Report the repo that was just created (not the team name).
                logger.info(f"{self.org_name}/{repo_name} created")
            try:
                self.organization_api.org_add_team_repository(
                    team.id, self.org_name, repo_name
                )
            except Exception as e:
                logger.warning(e)
            membership: GroupMembership
            student_count = 0
            for membership in group.get_memberships():
                student = first(students, lambda s: s.id == membership.user_id)
                student_count += 1
                if student is None:
                    raise Exception(
                        f"student with user_id {membership.user_id} not found"
                    )
                try:
                    username = self._get_username_by_canvas_student(student)
                except Exception as e:
                    logger.warning(e)
                    continue
                try:
                    self.organization_api.org_add_team_member(team.id, username)
                    self.repository_api.repo_add_collaborator(
                        self.org_name, repo_name, username
                    )
                except Exception as e:
                    logger.error(e)
                    continue
            # Recreate the protection rule from scratch; a missing rule (404)
            # is fine.
            try:
                self.repository_api.repo_delete_branch_protection(
                    self.org_name, repo_name, settings.default_branch
                )
            except ApiException as e:
                if e.status != 404:
                    raise
            try:
                self.repository_api.repo_create_branch_protection(
                    self.org_name,
                    repo_name,
                    body={
                        "block_on_official_review_requests": True,
                        "block_on_outdated_branch": True,
                        "block_on_rejected_reviews": True,
                        "branch_name": settings.default_branch,
                        "dismiss_stale_approvals": True,
                        "enable_approvals_whitelist": False,
                        "enable_merge_whitelist": False,
                        "enable_push": True,
                        "enable_push_whitelist": True,
                        "merge_whitelist_teams": [],
                        "merge_whitelist_usernames": [],
                        "protected_file_patterns": "",
                        "push_whitelist_deploy_keys": False,
                        "push_whitelist_teams": ["Owners"],
                        "push_whitelist_usernames": [],
                        "require_signed_commits": False,
                        # Everyone in the group except the author must approve.
                        "required_approvals": max(student_count - 1, 0),
                        "enable_status_check": True,
                        "status_check_contexts": ["Run JOJ3 on Push / run (push)"],
                    },
                )
            except ApiException as e:
                if e.status != 404:
                    raise
            logger.info(f"{self.org_name}/{repo_name} jobs done")
        return repo_names

    def get_public_key_of_canvas_students(
        self, students: PaginatedList
    ) -> Dict[str, List[str]]:
        """Map each student's ``login_id`` to the SSH keys uploaded to Gitea.

        Students without keys are omitted; per-student errors are logged.
        """
        res: Dict[str, List[str]] = {}
        for student in students:
            try:
                username = self._get_username_by_canvas_student(student)
                keys = [
                    item.key
                    for item in list_all(self.user_api.user_list_keys, username)
                ]
                if not keys:
                    logger.info(f"{student} has not uploaded ssh keys to gitea")
                    continue
                res[student.login_id] = keys
            except Exception as e:
                logger.error(e)
        return res

    def get_repo_releases(self, repo_name: str) -> List[Any]:
        """Return all releases of ``repo_name``; an HTTP 404 yields ``[]``."""
        try:
            return list_all(
                self.repository_api.repo_list_releases, self.org_name, repo_name
            )
        except ApiException as e:
            if e.status != 404:
                raise
        return []

    def get_all_repo_names(self) -> List[str]:
        """Return the names of all repos in the organization."""
        return [
            data.name
            for data in list_all(self.organization_api.org_list_repos, self.org_name)
        ]

    def get_no_collaborator_repos(self) -> List[str]:
        """Return (and log) the names of org repos without any collaborator."""
        res = []
        for data in list_all(self.organization_api.org_list_repos, self.org_name):
            collaborators = self.repository_api.repo_list_collaborators(
                self.org_name, data.name
            )
            if collaborators:
                continue
            logger.info(f"{self.org_name}/{data.name} has no collaborators")
            res.append(data.name)
        return res

    def get_repos_status(self) -> Dict[str, Tuple[int, int]]:
        """Map each repo name to ``(commit count, issue count)``.

        Both counts are the lengths of a single API response (no paging is
        requested here). An HTTP 409 while fetching commits is treated as
        zero commits.
        """
        res: Dict[str, Tuple[int, int]] = {}
        for repo in list_all(self.organization_api.org_list_repos, self.org_name):
            commits = []
            try:
                commits = self.repository_api.repo_get_all_commits(
                    self.org_name, repo.name
                )
            except ApiException as e:
                if e.status != 409:
                    raise
            issues = self.issue_api.issue_list_issues(
                self.org_name, repo.name, state="all"
            )
            res[repo.name] = (len(commits), len(issues))
        return res

    def create_issue(
        self,
        repo_name: str,
        title: str,
        body: str,
        assign_every_collaborators: bool = True,
        milestone: str = "",
        labels: Optional[List[str]] = None,
    ) -> None:
        """Create an issue in ``repo_name``.

        Args:
            repo_name: Target repository in the organization.
            title: Issue title.
            body: Issue body text.
            assign_every_collaborators: Assign all repo collaborators if true.
            milestone: Title of an existing milestone; a warning is logged and
                no milestone is set when it does not exist.
            labels: Names of existing repo labels to attach. ``None`` (the
                default) or an empty list attaches none.
        """
        assignees = []
        if assign_every_collaborators:
            assignees = [
                item.login
                for item in list_all(
                    self.repository_api.repo_list_collaborators,
                    self.org_name,
                    repo_name,
                )
            ]
        milestone_id = None
        if milestone:
            milestone_list = self.issue_api.issue_get_milestones_list(
                self.org_name, repo_name
            )
            if milestone not in [m.title for m in milestone_list]:
                logger.warning(f"Milestone {milestone} does not exist in {repo_name}")
            else:
                milestone_id = first(
                    [m.id for m in milestone_list if m.title == milestone]
                )
        labels_id = []
        if labels:
            labels_list = self.issue_api.issue_list_labels(self.org_name, repo_name)
            labels_id = [label.id for label in labels_list if label.name in labels]
            if not labels_id:
                logger.warning(f"no label matches {labels}")
        self.issue_api.issue_create_issue(
            self.org_name,
            repo_name,
            body={
                "title": title,
                "body": body,
                "assignees": assignees,
                "milestone": milestone_id,
                "labels": labels_id,
            },
        )
        logger.info(f'Created issue "{title}" in {repo_name}')

    def create_comment(
        self,
        repo_name: str,
        index: int,
        body: str,
    ) -> None:
        """Post ``body`` as a comment on issue ``index`` of ``repo_name``."""
        self.issue_api.issue_create_comment(
            self.org_name,
            repo_name,
            index,
            body={"body": body},
        )
        logger.info(f"Created comment in {repo_name}/issues/{index}")

    def create_milestone(
        self,
        repo_name: str,
        title: str,
        description: str,
        due_on: str,
    ) -> None:
        """Create a milestone; ``due_on`` is a date string or ``""`` for none.

        A non-empty ``due_on`` gets ``T23:59:59.999+08:00`` appended, i.e. the
        end of that day at UTC+8.
        """
        payload = {"title": title, "description": description}
        if due_on != "":
            payload["due_on"] = due_on + "T23:59:59.999+08:00"
        self.issue_api.issue_create_milestone(self.org_name, repo_name, body=payload)

    def check_exist_issue_by_title(self, repo_name: str, title: str) -> bool:
        """Return whether a listed issue of ``repo_name`` has exactly ``title``."""
        return any(
            issue.title == title
            for issue in list_all(
                self.issue_api.issue_list_issues, self.org_name, repo_name
            )
        )

    def close_all_issues(self) -> None:
        """Close every listed, not yet closed issue in all org repos."""
        for repo_name in self.get_all_repo_names():
            issues = list_all(
                self.issue_api.issue_list_issues, self.org_name, repo_name
            )
            for issue in issues:
                if issue.state != "closed":
                    self.issue_api.issue_edit_issue(
                        self.org_name, repo_name, issue.number, body={"state": "closed"}
                    )

    def close_issues(
        self, repo_name: str, issue_numbers: List[int], dry_run: bool = False
    ) -> None:
        """Close the given issues of ``repo_name``.

        Unknown and already closed issues are reported and skipped. With
        ``dry_run`` the intended actions are only logged. API errors are
        logged, never raised.
        """
        if not issue_numbers:
            logger.warning("No issue numbers provided to close")
            return
        if dry_run:
            logger.info("Dry run enabled. No changes will be made to issues.")
        try:
            issues = {
                issue.number: issue
                for issue in list_all(
                    self.issue_api.issue_list_issues, self.org_name, repo_name
                )
            }
        except ApiException as e:
            logger.error(f"Failed to list issues for {repo_name}: {e}")
            return

        for num in issue_numbers:
            issue = issues.get(num)
            if issue is None:
                logger.warning(f"Issue #{num} not found in {repo_name}")
                continue
            if getattr(issue, "state", "") == "closed":
                logger.info(f"Issue #{num} in {repo_name} already closed")
                continue
            if dry_run:
                logger.info(f"Would close issue #{num} in {repo_name} (dry run)")
                continue
            try:
                self.issue_api.issue_edit_issue(
                    self.org_name, repo_name, num, body={"state": "closed"}
                )
                logger.info(f"Closed issue #{num} in {repo_name}")
            except ApiException as e:
                logger.error(f"Failed to close issue #{num} in {repo_name}: {e}")

    def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
        """Archive all org repos whose full name matches ``regex``.

        With ``dry_run`` (the default) nothing is changed and each match is
        only reported as a repo that would be archived.
        """
        if dry_run:
            logger.info("Dry run enabled. No changes will be made to the repositories.")
        logger.info(f"Archiving repos with name matching {regex}")
        for repo_name in self.get_all_repo_names():
            if not re.fullmatch(regex, repo_name):
                continue
            if dry_run:
                # Do not claim success for an action that was not performed.
                logger.info(f"Would archive {repo_name} (dry run)")
                continue
            self.repository_api.repo_edit(
                self.org_name, repo_name, body={"archived": True}
            )
            logger.info(f"Archived {repo_name}")

    def unwatch_all_repos(self) -> None:
        """Delete the current user's subscription to every org repo."""
        for repo in list_all(self.organization_api.org_list_repos, self.org_name):
            self.repository_api.user_current_delete_subscription(
                self.org_name, repo.name
            )

    def get_all_teams(self) -> Dict[str, List[str]]:
        """Map team name to lower-cased member logins, skipping ``Owners``.

        Teams whose member list cannot be fetched are logged and left out.
        """
        res: Dict[str, List[str]] = {}
        for team in list_all(self.organization_api.org_list_teams, self.org_name):
            if team.name == "Owners":
                continue
            team_id = team.id
            try:
                members = [
                    m.login.lower()
                    for m in self.organization_api.org_list_team_members(team_id)
                ]
            except ApiException as e:
                logger.warning(
                    f"Failed to get members of team {team_id} in {self.org_name}: {e}"
                )
                continue
            res[team.name] = members
        return res

    def unsubscribe_from_repos(self, pattern: str) -> None:
        """Unsubscribe from org repos whose name contains a match of ``pattern``.

        Uses ``re.search``, so the pattern may match anywhere in the name.
        """
        subscriptions = [
            sub
            for sub in self.user_api.user_current_list_subscriptions()
            if sub.owner.login == self.org_name
            and re.search(pattern, sub.name) is not None
        ]
        if not subscriptions:
            logger.warning(f"No subscribed repo matches the pattern {pattern}")
            return
        logger.info(
            f"{len(subscriptions)} subscriptions match the pattern {pattern}: {[s.name for s in subscriptions]}"
        )
        for sub in subscriptions:
            self.repository_api.user_current_delete_subscription(
                self.org_name, sub.name
            )
            logger.info(f"Unsubscribed from {sub.name}")

    def create_milestones(
        self, milestone: str, regex: str, due_date: str, description: str
    ) -> None:
        """Create ``milestone`` in every org repo whose name matches ``regex``.

        Repos that already list a milestone with that title are skipped.
        """
        for repo_name in self.get_all_repo_names():
            if not re.fullmatch(regex, repo_name):
                continue
            milestone_list = self.issue_api.issue_get_milestones_list(
                self.org_name, repo_name
            )
            if milestone in [m.title for m in milestone_list]:
                logger.warning(f"Milestone {milestone} already exists in {repo_name}")
                continue
            self.create_milestone(repo_name, milestone, description, due_date)
            logger.info(f"Created milestone {milestone} in {repo_name}")

    def label_issues(
        self,
        repo_name: str,
        label_name: str,
        issue_numbers: List[int],
        color: Optional[str] = None,
    ) -> None:
        """Attach ``label_name`` to the given issues, creating it if needed.

        When the target label is exclusive, other exclusive labels on an issue
        are removed first. Each issue is re-read afterwards to verify that the
        label is attached. Failures are logged, never raised.

        Args:
            repo_name: Repository in the organization.
            label_name: Name of the label to attach.
            issue_numbers: Issue numbers to label; empty means nothing to do.
            color: Color for a newly created label (hex, ``#`` optional).
        """
        if not issue_numbers:
            logger.warning("No issue numbers provided to label")
            return

        labels = _get_repo_labels(self, repo_name)
        if not labels:
            # NOTE(review): this also stops label creation in a repo that has
            # no labels yet - confirm that this is intended.
            logger.warning(f"No labels found for {repo_name}")
            return

        repo_label_name_to_obj: Dict[str, Any] = {}
        for label in labels:
            name = self._label_field(label, "name")
            if isinstance(name, str):
                repo_label_name_to_obj[name] = label

        label_obj = _create_label(self, repo_name, label_name, color, labels)
        if label_obj is None:
            logger.error(f"Unable to ensure label '{label_name}' exists in {repo_name}")
            return

        label_id = _label_id_from_obj(label_obj)
        if label_id is None:
            logger.error(f"Unable to find id of label '{label_name}' in {repo_name}")
            return
        issues_map = _list_issues_map(self, repo_name)
        if not issues_map:
            return

        # The target label does not change between issues, so its exclusive
        # flag is computed once (from the label list fetched above).
        try:
            target_obj = repo_label_name_to_obj.get(label_name) or label_obj
            target_is_exclusive = bool(self._label_field(target_obj, "exclusive"))
        except Exception:
            target_is_exclusive = False

        for num in issue_numbers:
            issue = issues_map.get(num)
            if issue is None:
                logger.warning(f"Issue #{num} not found in {repo_name}")
                continue

            existing_label_names = _extract_label_names(issue)
            if label_name in existing_label_names:
                logger.info(
                    f"Issue #{num} in {repo_name} already has label '{label_name}'"
                )
                continue

            # Prefer a fresh read of the issue; fall back to the listed data.
            try:
                current = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
                issue_labels = _extract_label_names(current)
            except Exception:
                issue_labels = existing_label_names

            if target_is_exclusive:
                for lname in issue_labels or []:
                    if lname == label_name:
                        continue
                    lobj = repo_label_name_to_obj.get(lname)
                    if lobj is None:
                        continue
                    if not self._label_field(lobj, "exclusive"):
                        continue
                    lid = _label_id_from_obj(lobj)
                    if lid is not None:
                        _delete_issue_label_by_id(self, repo_name, num, lid)

            if label_name not in (issue_labels or []):
                try:
                    resp = _api_post_labels(
                        self, repo_name, num, {"labels": [label_name]}
                    )
                    if getattr(resp, "status_code", None) not in (200, 201):
                        logger.error(
                            f"Failed to add label via add-labels endpoint for issue #{num}: status={getattr(resp, 'status_code', None)}"
                        )
                except Exception as e:
                    logger.error(f"Failed to POST labels to issue #{num}: {e}")

            # verification
            try:
                final = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
                final_labels = _extract_label_names(final)
            except Exception:
                final_labels = []
            if label_name in (final_labels or []):
                logger.info(
                    f"Label '{label_name}' attached to issue #{num} in {repo_name}"
                )
            else:
                logger.warning(
                    f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts"
                )

    def delete_label(
        self,
        repo_name: str,
        label_name: str,
        issue_numbers: Optional[List[int]] = None,
        delete_repo_label: bool = False,
    ) -> None:
        """Remove a label from issues, or delete it from the repository.

        Args:
            repo_name: Repository in the organization.
            label_name: Name of the label.
            issue_numbers: When non-empty, the label is detached from these
                issues only and ``delete_repo_label`` is not consulted.
            delete_repo_label: Without issue numbers, delete the label from
                the repository itself.

        All HTTP and API failures are logged, never raised.
        """
        token = (
            getattr(self.api_client.configuration, "api_key", {}).get("access_token")
            or settings.gitea_access_token
        )
        headers_local = {"Authorization": f"token {token}"}
        repo_labels: List[Any] = []
        label_name_to_id: Dict[str, int] = {}
        try:
            repo_labels = list(
                cast(
                    Iterable[Any],
                    self.issue_api.issue_list_labels(self.org_name, repo_name),
                )
            )
            for label in repo_labels:
                name = self._label_field(label, "name")
                lid = self._label_field(label, "id")
                if isinstance(name, str) and isinstance(lid, int):
                    label_name_to_id[name] = lid
        except Exception:
            label_name_to_id = {}

        def _delete_issue_label(issue_num: int) -> None:
            """Detach the label from one issue using its numeric id."""
            lid = label_name_to_id.get(label_name)
            if lid is None:
                # Second chance: accept an id that is not a plain int.
                try:
                    for label in repo_labels:
                        if self._label_field(label, "name") == label_name:
                            lid = self._label_field(label, "id")
                            break
                except Exception:
                    logger.warning(
                        f"Could not determine id of label '{label_name}' in {repo_name}"
                    )

            if lid is None:
                logger.warning(
                    f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}"
                )
                return

            path_id = (
                f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
            )
            url_id = f"{self.api_client.configuration.host}{path_id}"
            try:
                resp = requests.delete(url_id, headers=headers_local, timeout=10)
                if resp.status_code in (200, 204):
                    logger.info(
                        f"Removed label '{label_name}' from {repo_name}#{issue_num}"
                    )
                    return
                logger.warning(
                    f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}"
                )
            except Exception as e:
                logger.error(
                    f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}"
                )

        def _delete_repo_label() -> None:
            """Delete the label definition from the repository."""
            # The repo-label route is keyed by numeric id; the encoded name is
            # only a fallback when the id could not be resolved.
            lid = label_name_to_id.get(label_name)
            label_ref = str(lid) if lid is not None else quote(label_name, safe="")
            path = f"/repos/{self.org_name}/{repo_name}/labels/{label_ref}"
            url = f"{self.api_client.configuration.host}{path}"
            try:
                resp = requests.delete(url, headers=headers_local, timeout=10)
                if resp.status_code in (200, 204):
                    logger.info(
                        f"Removed repo-level label '{label_name}' from {repo_name}"
                    )
                    return
                logger.error(
                    f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}"
                )
            except Exception as e:
                logger.error(
                    f"Error deleting repo label '{label_name}' from {repo_name}: {e}"
                )

        if issue_numbers:
            try:
                issues = {
                    issue.number: issue
                    for issue in list_all(
                        self.issue_api.issue_list_issues, self.org_name, repo_name
                    )
                }
            except ApiException as e:
                logger.error(f"Failed to list issues for {repo_name}: {e}")
                return
            for num in issue_numbers:
                if num not in issues:
                    logger.warning(f"Issue #{num} not found in {repo_name}")
                    continue
                _delete_issue_label(num)
        elif delete_repo_label:
            _delete_repo_label()
        else:
            logger.warning(
                "No issue numbers provided and --repo not set; nothing to do for delete_label"
            )
|
|
|
|
|
|
if __name__ == "__main__":
    # Running the module directly only constructs a client; with no arguments
    # every connection parameter is taken from ``settings``.
    gitea = Gitea()
|