import re
from enum import Enum
from functools import lru_cache
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, cast
from urllib.parse import quote

import focs_gitea
import requests  # type: ignore
from canvasapi.group import Group, GroupMembership
from canvasapi.paginated_list import PaginatedList
from canvasapi.user import User
from focs_gitea.rest import ApiException

from joint_teapot.config import settings
from joint_teapot.utils.logger import logger
from joint_teapot.utils.main import default_repo_name_convertor, first


class PermissionEnum(Enum):
    """Permission levels that can be passed when creating a Gitea team."""

    read = "read"
    write = "write"
    admin = "admin"


def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
    """Call a paginated API method repeatedly and collect every result.

    ``method`` is invoked with ``page=1, 2, ...`` (in addition to ``args`` and
    ``kwargs``) until it returns a falsy/empty page.

    The exact element types vary depending on the API client, so ``Any`` is
    used to avoid over-constraining typing for the external client.
    """
    all_res: List[Any] = []
    page = 1
    while True:
        res = method(*args, **kwargs, page=page)
        if not res:
            break
        all_res.extend(res)
        page += 1
    return all_res


def _attr_or_key(obj: Any, key: str, default: Any = None) -> Any:
    """Read ``key`` from ``obj`` as an attribute, falling back to a dict item.

    A falsy attribute value is treated as missing. ``default`` is returned only
    when ``obj`` is not a dict and the attribute is missing/falsy.
    """
    return getattr(obj, key, None) or (
        obj.get(key) if isinstance(obj, dict) else default
    )


def _issue_label_names(issue_obj: Any) -> List[Any]:
    """Return the names of the labels attached to ``issue_obj``.

    A label without a ``name`` attribute is returned as-is. A non-iterable
    ``labels`` attribute yields an empty list.
    """
    try:
        return [
            getattr(label, "name", label)
            for label in getattr(issue_obj, "labels", []) or []
        ]
    except TypeError:
        return []


def _issue_label_ids(issue_obj: Any) -> List[int]:
    """Return the ids of the labels attached to ``issue_obj`` (best effort)."""
    ids: List[int] = []
    try:
        for label in getattr(issue_obj, "labels", []) or []:
            lid = getattr(label, "id", None)
            if lid is None and isinstance(label, dict):
                lid = label.get("id")
            if lid is not None:
                ids.append(lid)
    except TypeError:
        return []
    return ids


def _other_exclusive_label_ids(
    issue_label_names: Iterable[Any],
    target_name: str,
    labels_by_name: Dict[str, Any],
) -> List[int]:
    """Return ids of exclusive labels on an issue, except ``target_name``.

    ``labels_by_name`` maps repository label names to their label objects and
    is used to look up the ``exclusive`` flag and the ``id``.
    """
    ids: List[int] = []
    for name in issue_label_names:
        # Names may be non-string objects when a label had no ``name``.
        if not isinstance(name, str) or name == target_name:
            continue
        label = labels_by_name.get(name)
        if label is None or not _attr_or_key(label, "exclusive", False):
            continue
        lid = _attr_or_key(label, "id")
        if lid is not None:
            ids.append(lid)
    return ids


class Gitea:
    """Wrapper around the ``focs_gitea`` API clients for one organization."""

    def __init__(
        self,
        access_token: str = "",  # nosec
        org_name: str = "",
        domain_name: str = "",
        suffix: str = "",
    ):
        """Create the API clients.

        Every empty argument falls back to the corresponding value in
        ``settings`` (``gitea_access_token``, ``gitea_org_name``,
        ``gitea_domain_name``, ``gitea_suffix``).
        """
        access_token = access_token or settings.gitea_access_token
        org_name = org_name or settings.gitea_org_name
        domain_name = domain_name or settings.gitea_domain_name
        suffix = suffix or settings.gitea_suffix
        self.org_name = org_name
        configuration = focs_gitea.Configuration()
        configuration.api_key["access_token"] = access_token
        configuration.host = f"https://{domain_name}{suffix}/api/v1"
        configuration.debug = settings.gitea_debug
        # Drop the handlers installed on the client's own loggers.
        for v in configuration.logger.values():
            v.handlers = []
        self.api_client = focs_gitea.ApiClient(configuration)
        self.admin_api = focs_gitea.AdminApi(self.api_client)
        self.miscellaneous_api = focs_gitea.MiscellaneousApi(self.api_client)
        self.organization_api = focs_gitea.OrganizationApi(self.api_client)
        self.issue_api = focs_gitea.IssueApi(self.api_client)
        self.repository_api = focs_gitea.RepositoryApi(self.api_client)
        self.settings_api = focs_gitea.SettingsApi(self.api_client)
        self.user_api = focs_gitea.UserApi(self.api_client)
        logger.debug("Gitea initialized")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _raw_api_url(self, path: str) -> str:
        """Return the absolute URL of ``path`` under the configured API host."""
        return f"{self.api_client.configuration.host}{path}"

    def _raw_auth_headers(self, json_body: bool = False) -> Dict[str, str]:
        """Return headers for direct ``requests`` calls to the Gitea API.

        The token is read from the API client configuration, falling back to
        ``settings.gitea_access_token``. ``json_body`` adds a JSON
        ``Content-Type`` header.
        """
        token = (
            getattr(self.api_client.configuration, "api_key", {}).get("access_token")
            or settings.gitea_access_token
        )
        headers = {"Authorization": f"token {token}"}
        if json_body:
            headers["Content-Type"] = "application/json"
        return headers

    def _create_org_repo(self, repo_name: str, template: str) -> None:
        """Create a private repo in the organization.

        An empty ``template`` creates a blank repo; otherwise the repo is
        generated from the template repo ``{org_name}/{template}``.
        """
        if template == "":
            self.organization_api.create_org_repo(
                self.org_name,
                body={
                    "auto_init": False,
                    "default_branch": settings.default_branch,
                    "name": repo_name,
                    "private": True,
                    "template": False,
                    "trust_model": "default",
                },
            )
        else:
            self.repository_api.generate_repo(
                self.org_name,
                template,
                body={
                    "default_branch": settings.default_branch,
                    "git_content": True,
                    "git_hooks": True,
                    "labels": True,
                    "name": repo_name,
                    "owner": self.org_name,
                    "private": True,
                    "protected_branch": True,
                },
            )

    # NOTE: ``lru_cache`` on a method keys on ``self`` and keeps the instance
    # alive for the lifetime of the cache; kept as-is for compatibility.
    @lru_cache()
    def _get_team_id_by_name(self, name: str) -> int:
        """Return the id of the first team returned by a search for ``name``.

        Raises:
            Exception: if the search returns no team.
        """
        res = self.organization_api.team_search(
            self.org_name, q=str(name), limit=1
        ).to_dict()
        if len(res["data"] or []) == 0:
            raise Exception(
                f"{name} not found by name in Gitea. Possible reason: you did not join this team."
            )
        return res["data"][0]["id"]

    @lru_cache()
    def _get_username_by_canvas_student(self, student: User) -> str:
        """Return the local part of the student's ``@sjtu.edu.cn`` email.

        Raises:
            Exception: if the student has no email, or the email does not
                contain exactly one ``@`` or does not end with
                ``@sjtu.edu.cn``.
        """
        if (
            student.email is not None
            and student.email.count("@") == 1
            and student.email.endswith("@sjtu.edu.cn")
        ):
            return student.email.split("@")[0]
        raise Exception(f"Can not get username of {student}, an SJTU email is expected")

    # ------------------------------------------------------------------
    # Teams and repositories
    # ------------------------------------------------------------------

    def add_canvas_students_to_teams(
        self, students: PaginatedList, team_names: List[str]
    ) -> None:
        """Add every student to each of the named teams.

        Students already in a team are reported with a warning. Team members
        that match no student are reported with an error. Failures for a
        single student are logged and do not stop the loop.
        """
        for team_name in team_names:
            team_id = self._get_team_id_by_name(team_name)
            team_members = self.organization_api.org_list_team_members(team_id)
            for student in students:
                try:
                    username = self._get_username_by_canvas_student(student)
                    team_member = first(team_members, lambda x: x.login == username)
                    if team_member is None:
                        self.organization_api.org_add_team_member(team_id, username)
                        logger.info(f"{student} added to team {team_name}")
                    else:
                        # Matched members are removed so that only unmatched
                        # ones remain for the report below.
                        team_members.remove(team_member)
                        logger.warning(f"{student} already in team {team_name}")
                except Exception as e:
                    logger.error(e)
            for team_member in team_members:
                logger.error(
                    f"{team_member.full_name} found in team {team_name} "
                    + "but not found in Canvas students"
                )

    def create_personal_repos_for_canvas_students(
        self,
        students: PaginatedList,
        repo_name_convertor: Callable[
            [User], Optional[str]
        ] = default_repo_name_convertor,
        template: str = "",
    ) -> List[str]:
        """Create one private repo per student and add them as collaborator.

        Students for which ``repo_name_convertor`` returns ``None`` are
        skipped. An already existing repo (HTTP 409) is not an error: the
        student is still added as collaborator. Other failures are logged per
        student.

        Returns:
            The repo names of all non-skipped students, whether or not the
            repo was created successfully.
        """
        repo_names = []
        for student in students:
            repo_name = repo_name_convertor(student)
            if repo_name is None:
                continue
            repo_names.append(repo_name)
            try:
                try:
                    self._create_org_repo(repo_name, template)
                    logger.info(
                        f"Personal repo {self.org_name}/{repo_name} for {student} created"
                    )
                except ApiException as e:
                    if e.status != 409:
                        raise
                    logger.warning(
                        f"Personal repo {self.org_name}/{repo_name} for {student} already exists"
                    )
                username = self._get_username_by_canvas_student(student)
                self.repository_api.repo_add_collaborator(
                    self.org_name, repo_name, username
                )
            except Exception as e:
                logger.error(e)
        return repo_names

    def create_teams_and_repos_by_canvas_groups(
        self,
        students: PaginatedList,
        groups: PaginatedList,
        team_name_convertor: Callable[[str], Optional[str]] = lambda name: name,
        repo_name_convertor: Callable[[str], Optional[str]] = lambda name: name,
        template: str = "",
        permission: PermissionEnum = PermissionEnum.write,
    ) -> List[str]:
        """Create a team and a repo for each Canvas group.

        For each group: the team and the repo are created when missing, the
        repo is attached to the team, group members are added to the team and
        to the repo, and the branch protection of ``settings.default_branch``
        is re-created.

        Groups for which either convertor returns ``None`` are skipped.

        Returns:
            The names of the repos created by this call.

        Raises:
            Exception: if a group membership refers to a user id that is not
                in ``students``.
        """
        repo_names = []
        teams = list_all(self.organization_api.org_list_teams, self.org_name)
        repos = list_all(self.organization_api.org_list_repos, self.org_name)
        # One lookup table instead of a linear scan per membership.
        students_by_id: Dict[Any, User] = {}
        for s in students:
            students_by_id.setdefault(s.id, s)
        group: Group
        for group in groups:
            team_name = team_name_convertor(group.name)
            repo_name = repo_name_convertor(group.name)
            if team_name is None or repo_name is None:
                continue
            team = first(teams, lambda team: team.name == team_name)
            if team is None:
                team = self.organization_api.org_create_team(
                    self.org_name,
                    body={
                        "can_create_org_repo": False,
                        "includes_all_repositories": False,
                        "name": team_name,
                        "permission": permission.value,
                        "units": [
                            "repo.code",
                            "repo.issues",
                            "repo.ext_issues",
                            "repo.wiki",
                            "repo.pulls",
                            "repo.releases",
                            "repo.projects",
                            "repo.ext_wiki",
                        ],
                    },
                )
                logger.info(f"Team {team_name} created")
            if first(repos, lambda repo: repo.name == repo_name) is None:
                repo_names.append(repo_name)
                self._create_org_repo(repo_name, template)
                logger.info(f"{self.org_name}/{repo_name} created")
            try:
                self.organization_api.org_add_team_repository(
                    team.id, self.org_name, repo_name
                )
            except Exception as e:
                logger.warning(e)
            membership: GroupMembership
            student_count = 0
            for membership in group.get_memberships():
                student = students_by_id.get(membership.user_id)
                student_count += 1
                if student is None:
                    raise Exception(
                        f"student with user_id {membership.user_id} not found"
                    )
                try:
                    username = self._get_username_by_canvas_student(student)
                except Exception as e:
                    logger.warning(e)
                    continue
                try:
                    self.organization_api.org_add_team_member(team.id, username)
                    self.repository_api.repo_add_collaborator(
                        self.org_name, repo_name, username
                    )
                except Exception as e:
                    logger.error(e)
                    continue
            # Delete then re-create the protection rule; a 404 from either
            # call is ignored.
            try:
                self.repository_api.repo_delete_branch_protection(
                    self.org_name, repo_name, settings.default_branch
                )
            except ApiException as e:
                if e.status != 404:
                    raise
            try:
                self.repository_api.repo_create_branch_protection(
                    self.org_name,
                    repo_name,
                    body={
                        "block_on_official_review_requests": True,
                        "block_on_outdated_branch": True,
                        "block_on_rejected_reviews": True,
                        "branch_name": settings.default_branch,
                        "dismiss_stale_approvals": True,
                        "enable_approvals_whitelist": False,
                        "enable_merge_whitelist": False,
                        "enable_push": True,
                        "enable_push_whitelist": True,
                        "merge_whitelist_teams": [],
                        "merge_whitelist_usernames": [],
                        "protected_file_patterns": "",
                        "push_whitelist_deploy_keys": False,
                        "push_whitelist_teams": ["Owners"],
                        "push_whitelist_usernames": [],
                        "require_signed_commits": False,
                        # Everyone in the group except the author.
                        "required_approvals": max(student_count - 1, 0),
                        "enable_status_check": True,
                        "status_check_contexts": ["Run JOJ3 on Push / run (push)"],
                    },
                )
            except ApiException as e:
                if e.status != 404:
                    raise
            logger.info(f"{self.org_name}/{repo_name} jobs done")
        return repo_names

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_public_key_of_canvas_students(
        self, students: PaginatedList
    ) -> Dict[str, List[str]]:
        """Return the uploaded SSH keys of each student, keyed by ``login_id``.

        Students without keys are omitted. Failures are logged per student.
        """
        res = {}
        for student in students:
            try:
                username = self._get_username_by_canvas_student(student)
                keys = [
                    item.key
                    for item in list_all(self.user_api.user_list_keys, username)
                ]
                if not keys:
                    logger.info(f"{student} has not uploaded ssh keys to gitea")
                    continue
                res[student.login_id] = keys
            except Exception as e:
                logger.error(e)
        return res

    def get_repo_releases(self, repo_name: str) -> List[Any]:
        """Return all releases of a repo, or ``[]`` when the API answers 404."""
        try:
            return list_all(
                self.repository_api.repo_list_releases, self.org_name, repo_name
            )
        except ApiException as e:
            if e.status != 404:
                raise
        return []

    def get_all_repo_names(self) -> List[str]:
        """Return the names of all repos of the organization."""
        return [
            data.name
            for data in list_all(self.organization_api.org_list_repos, self.org_name)
        ]

    def get_no_collaborator_repos(self) -> List[str]:
        """Return the names of the organization repos without collaborators."""
        res = []
        for data in list_all(self.organization_api.org_list_repos, self.org_name):
            collaborators = self.repository_api.repo_list_collaborators(
                self.org_name, data.name
            )
            if collaborators:
                continue
            logger.info(f"{self.org_name}/{data.name} has no collaborators")
            res.append(data.name)
        return res

    def get_repos_status(self) -> Dict[str, Tuple[int, int]]:
        """Return ``(commit count, issue count)`` per repo name.

        Counts are the lengths of the lists returned by a single call to the
        commit and issue listing endpoints. A 409 from the commit listing is
        counted as zero commits.
        """
        res = {}
        for repo in list_all(self.organization_api.org_list_repos, self.org_name):
            commits: List[Any] = []
            try:
                commits = self.repository_api.repo_get_all_commits(
                    self.org_name, repo.name
                )
            except ApiException as e:
                if e.status != 409:
                    raise
            issues = self.issue_api.issue_list_issues(
                self.org_name, repo.name, state="all"
            )
            res[repo.name] = (len(commits), len(issues))
        return res

    # ------------------------------------------------------------------
    # Issues, comments and milestones
    # ------------------------------------------------------------------

    def create_issue(
        self,
        repo_name: str,
        title: str,
        body: str,
        assign_every_collaborators: bool = True,
        milestone: str = "",
        labels: Optional[List[str]] = None,
    ) -> None:
        """Create an issue in a repo of the organization.

        Args:
            repo_name: target repository.
            title: issue title.
            body: issue body.
            assign_every_collaborators: assign every repo collaborator.
            milestone: title of an existing milestone to attach; an unknown
                title is reported with a warning and ignored.
            labels: names of existing labels to attach; unknown names are
                ignored, and a warning is logged when none matches.
        """
        assignees = []
        if assign_every_collaborators:
            assignees = [
                item.login
                for item in list_all(
                    self.repository_api.repo_list_collaborators,
                    self.org_name,
                    repo_name,
                )
            ]
        milestone_id = None
        if milestone:
            milestone_list = self.issue_api.issue_get_milestones_list(
                self.org_name, repo_name
            )
            matching = [m for m in milestone_list if m.title == milestone]
            if matching:
                milestone_id = matching[0].id
            else:
                logger.warning(f"Milestone {milestone} does not exist in {repo_name}")
        labels_id = []
        if labels:
            labels_list = self.issue_api.issue_list_labels(self.org_name, repo_name)
            labels_id = [l.id for l in labels_list if l.name in labels]
            if not labels_id:
                logger.warning(f"no label matches {labels}")
        self.issue_api.issue_create_issue(
            self.org_name,
            repo_name,
            body={
                "title": title,
                "body": body,
                "assignees": assignees,
                "milestone": milestone_id,
                "labels": labels_id,
            },
        )
        logger.info(f'Created issue "{title}" in {repo_name}')

    def create_comment(
        self,
        repo_name: str,
        index: int,
        body: str,
    ) -> None:
        """Add a comment with text ``body`` to issue ``index`` of a repo."""
        self.issue_api.issue_create_comment(
            self.org_name,
            repo_name,
            index,
            body={"body": body},
        )
        logger.info(f"Created comment in {repo_name}/issues/{index}")

    def create_milestone(
        self,
        repo_name: str,
        title: str,
        description: str,
        due_on: str,
    ) -> None:
        """Create a milestone in a repo.

        A non-empty ``due_on`` gets ``T23:59:59.999+08:00`` appended before it
        is sent; an empty ``due_on`` creates a milestone without due date.
        """
        payload = {"title": title, "description": description}
        if due_on != "":
            payload["due_on"] = due_on + "T23:59:59.999+08:00"
        self.issue_api.issue_create_milestone(
            self.org_name,
            repo_name,
            body=payload,
        )

    def check_exist_issue_by_title(self, repo_name: str, title: str) -> bool:
        """Return whether a listed issue of the repo has exactly this title."""
        return any(
            issue.title == title
            for issue in list_all(
                self.issue_api.issue_list_issues, self.org_name, repo_name
            )
        )

    def close_all_issues(self) -> None:
        """Close every listed, non-closed issue in all organization repos."""
        for repo_name in self.get_all_repo_names():
            issues = list_all(
                self.issue_api.issue_list_issues, self.org_name, repo_name
            )
            for issue in issues:
                if issue.state != "closed":
                    self.issue_api.issue_edit_issue(
                        self.org_name, repo_name, issue.number, body={"state": "closed"}
                    )

    def close_issues(
        self, repo_name: str, issue_numbers: List[int], dry_run: bool = True
    ) -> None:
        """Close the given issues of a repo.

        With ``dry_run`` (the default) nothing is changed and the intended
        actions are only logged. Unknown or already closed issues are skipped.
        API failures are logged and do not raise.
        """
        if not issue_numbers:
            logger.warning("No issue numbers provided to close")
            return
        if dry_run:
            logger.info("Dry run enabled. No changes will be made to issues.")
        try:
            issues = {
                issue.number: issue
                for issue in list_all(
                    self.issue_api.issue_list_issues, self.org_name, repo_name
                )
            }
        except ApiException as e:
            logger.error(f"Failed to list issues for {repo_name}: {e}")
            return
        for num in issue_numbers:
            issue = issues.get(num)
            if issue is None:
                logger.warning(f"Issue #{num} not found in {repo_name}")
                continue
            if getattr(issue, "state", "") == "closed":
                logger.info(f"Issue #{num} in {repo_name} already closed")
                continue
            if dry_run:
                logger.info(f"Would close issue #{num} in {repo_name} (dry run)")
                continue
            try:
                self.issue_api.issue_edit_issue(
                    self.org_name, repo_name, num, body={"state": "closed"}
                )
                logger.info(f"Closed issue #{num} in {repo_name}")
            except ApiException as e:
                logger.error(f"Failed to close issue #{num} in {repo_name}: {e}")

    def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
        """Archive every organization repo whose full name matches ``regex``.

        With ``dry_run`` (the default) nothing is changed and the repos that
        would be archived are only logged.
        """
        if dry_run:
            logger.info("Dry run enabled. No changes will be made to the repositories.")
        logger.info(f"Archiving repos with name matching {regex}")
        pattern = re.compile(regex)
        for repo_name in self.get_all_repo_names():
            if not pattern.fullmatch(repo_name):
                continue
            if dry_run:
                logger.info(f"Would archive {repo_name} (dry run)")
                continue
            self.repository_api.repo_edit(
                self.org_name, repo_name, body={"archived": True}
            )
            # Report success only after the API call returned.
            logger.info(f"Archived {repo_name}")

    def unwatch_all_repos(self) -> None:
        """Delete the current user's subscription to every organization repo."""
        for repo in list_all(self.organization_api.org_list_repos, self.org_name):
            self.repository_api.user_current_delete_subscription(
                self.org_name, repo.name
            )

    def get_all_teams(self) -> Dict[str, List[str]]:
        """Return the lower-cased member logins of each team, by team name.

        The ``Owners`` team is skipped. Teams whose members cannot be listed
        are logged and omitted.
        """
        res: Dict[str, List[str]] = {}
        for team in list_all(self.organization_api.org_list_teams, self.org_name):
            if team.name == "Owners":
                continue
            team_id = team.id
            try:
                members = [
                    m.login.lower()
                    for m in self.organization_api.org_list_team_members(team_id)
                ]
            except ApiException as e:
                logger.warning(
                    f"Failed to get members of team {team_id} in {self.org_name}: {e}"
                )
                continue
            res[team.name] = members
        return res

    def unsubscribe_from_repos(self, pattern: str) -> None:
        """Unsubscribe the current user from organization repos.

        Only subscriptions owned by the organization whose repo name contains
        a match of the regex ``pattern`` (``re.search``) are removed.
        """
        subscriptions = [
            sub
            for sub in self.user_api.user_current_list_subscriptions()
            if sub.owner.login == self.org_name
            and re.search(pattern, sub.name) is not None
        ]
        if not subscriptions:
            logger.warning(f"No subscribed repo matches the pattern {pattern}")
            return
        logger.info(
            f"{len(subscriptions)} subscriptions match the pattern {pattern}: {[s.name for s in subscriptions]}"
        )
        for sub in subscriptions:
            self.repository_api.user_current_delete_subscription(
                self.org_name, sub.name
            )
            logger.info(f"Unsubscribed from {sub.name}")

    def create_milestones(
        self, milestone: str, regex: str, due_date: str, description: str
    ) -> None:
        """Create a milestone in every repo whose full name matches ``regex``.

        Repos that already list a milestone with this title are skipped with a
        warning.
        """
        for repo_name in self.get_all_repo_names():
            if not re.fullmatch(regex, repo_name):
                continue
            milestone_list = self.issue_api.issue_get_milestones_list(
                self.org_name, repo_name
            )
            if milestone in [m.title for m in milestone_list]:
                logger.warning(f"Milestone {milestone} already exists in {repo_name}")
                continue
            self.create_milestone(repo_name, milestone, description, due_date)
            logger.info(f"Created milestone {milestone} in {repo_name}")

    # ------------------------------------------------------------------
    # Labels
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_label_color(color: Optional[str]) -> str:
        """Return ``color`` as six hex digits without ``#``.

        A three-digit value is expanded by doubling each digit. ``None`` or an
        empty string yields the default ``CCCCCC``; an invalid value does too,
        with a warning.
        """
        default_color = "CCCCCC"
        if not color:
            return default_color
        c = color.strip()
        if c.startswith("#"):
            c = c[1:]
        if re.fullmatch(r"[0-9a-fA-F]{6}", c):
            return c
        if re.fullmatch(r"[0-9a-fA-F]{3}", c):
            return "".join(ch * 2 for ch in c)
        logger.warning(
            f"Provided color '{color}' is not a valid 3- or 6-digit hex; falling back to default"
        )
        return default_color

    def _mark_label_exclusive(
        self, repo_name: str, label_name: str, label_obj: Any
    ) -> None:
        """Best effort: PATCH an existing repo label so it becomes exclusive.

        Does nothing when the label already reports a truthy ``exclusive``.
        Failures are logged as warnings and never raise.
        """
        if getattr(label_obj, "exclusive", None):
            return
        existing_color = _attr_or_key(label_obj, "color")
        if isinstance(existing_color, str) and existing_color.startswith("#"):
            existing_color = existing_color[1:]
        # The label endpoint is addressed by numeric id; the URL-encoded name
        # is only a fallback for when no id is available.
        label_id = _attr_or_key(label_obj, "id")
        identifier = str(label_id) if label_id is not None else quote(label_name, safe="")
        url = self._raw_api_url(
            f"/repos/{self.org_name}/{repo_name}/labels/{identifier}"
        )
        payload = {"exclusive": True, "name": label_name}
        if existing_color:
            payload["color"] = existing_color
        try:
            resp = requests.patch(
                url,
                headers=self._raw_auth_headers(json_body=True),
                json=payload,
                timeout=10,
            )
        except requests.RequestException as e:
            logger.warning(
                f"Error while trying to mark label '{label_name}' exclusive: {e}"
            )
            return
        if resp.status_code in (200, 201):
            logger.info(
                f"Marked existing label '{label_name}' as exclusive in {repo_name}"
            )
        else:
            logger.warning(
                f"Failed to mark existing label '{label_name}' as exclusive: status={resp.status_code}, body={getattr(resp, 'text', None)}"
            )

    def _fetch_issue_label_names(self, repo_name: str, num: int) -> List[Any]:
        """Return the current label names of an issue, or ``[]`` on failure.

        Tries the single-issue endpoint first and falls back to scanning the
        issue list.
        """
        try:
            single = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
            return _issue_label_names(single)
        except Exception:
            try:
                listed = list_all(
                    self.issue_api.issue_list_issues, self.org_name, repo_name
                )
            except Exception:
                return []
            updated = next(
                (i for i in listed if getattr(i, "number", None) == num), None
            )
            return [] if updated is None else _issue_label_names(updated)

    def _remove_exclusive_label(
        self, repo_name: str, num: int, other_lid: int
    ) -> None:
        """Best effort: detach label id ``other_lid`` from issue ``num``."""
        url = self._raw_api_url(
            f"/repos/{self.org_name}/{repo_name}/issues/{num}/labels/{other_lid}"
        )
        try:
            resp = requests.delete(url, headers=self._raw_auth_headers(), timeout=10)
        except requests.RequestException as e:
            logger.warning(
                f"Error removing exclusive label id {other_lid} from {repo_name}#{num}: {e}"
            )
            return
        if resp.status_code in (200, 204):
            logger.info(
                f"Removed exclusive label id {other_lid} from {repo_name}#{num}"
            )
        else:
            logger.warning(
                f"Failed to remove exclusive label id {other_lid} from {repo_name}#{num}: status={resp.status_code}"
            )

    def label_issues(
        self,
        repo_name: str,
        label_name: str,
        issue_numbers: List[int],
        color: Optional[str] = None,
    ) -> None:
        """Attach a label to the given issues, creating the label if needed.

        A missing label is created as exclusive with ``color`` (3- or 6-digit
        hex, optional leading ``#``; default ``CCCCCC``). An existing label is
        marked exclusive on a best-effort basis. When the label object reports
        itself as exclusive, other exclusive labels are removed from each
        issue before the label is added.

        Listing/creation failures are logged and end the call without raising.

        Args:
            repo_name: target repository.
            label_name: name of the label to attach.
            issue_numbers: numbers of the issues to label.
            color: color used only when the label has to be created.
        """
        if not issue_numbers:
            logger.warning("No issue numbers provided to label")
            return
        try:
            labels = list(
                cast(
                    Iterable[Any],
                    self.issue_api.issue_list_labels(self.org_name, repo_name),
                )
            )
        except ApiException as e:
            logger.error(f"Failed to list labels for {repo_name}: {e}")
            return

        # Lookup of repo labels by name, used for 'exclusive' and 'id'.
        labels_by_name: Dict[str, Any] = {}
        for label in labels:
            name = _attr_or_key(label, "name")
            if isinstance(name, str):
                labels_by_name.setdefault(name, label)

        label_obj = labels_by_name.get(label_name)
        if label_obj is None:
            try:
                # Created as exclusive so the server treats it that way
                # (if supported by the server).
                label_obj = self.issue_api.issue_create_label(
                    self.org_name,
                    repo_name,
                    body={
                        "name": label_name,
                        "color": self._normalize_label_color(color),
                        "exclusive": True,
                    },
                )
                logger.info(
                    f"Created label '{label_name}' in {self.org_name}/{repo_name}"
                )
            except ApiException as e:
                logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
                if hasattr(e, "body"):
                    logger.error(f"ApiException body: {getattr(e, 'body', None)}")
                return
        else:
            self._mark_label_exclusive(repo_name, label_name, label_obj)

        if isinstance(label_obj, dict):
            label_id = label_obj.get("id")
        else:
            label_id = getattr(label_obj, "id", None)
        if label_id is None:
            logger.error(
                f"Unable to determine id of label '{label_name}' in {repo_name}"
            )
            return

        # Loop-invariant: read from the label object obtained above (the
        # listed object is not refreshed after the PATCH).
        target_is_exclusive = bool(_attr_or_key(label_obj, "exclusive", False))

        try:
            issues = {
                issue.number: issue
                for issue in list_all(
                    self.issue_api.issue_list_issues, self.org_name, repo_name
                )
            }
        except ApiException as e:
            logger.error(f"Failed to list issues for {repo_name}: {e}")
            return

        for num in issue_numbers:
            issue = issues.get(num)
            if issue is None:
                logger.warning(f"Issue #{num} not found in {repo_name}")
                continue
            if label_name in _issue_label_names(issue):
                logger.info(
                    f"Issue #{num} in {repo_name} already has label '{label_name}'"
                )
                continue
            if label_id in _issue_label_ids(issue):
                logger.info(
                    f"Issue #{num} in {repo_name} already has label '{label_name}' (by id)"
                )
                continue

            issue_labels = self._fetch_issue_label_names(repo_name, num)

            if target_is_exclusive:
                for other_lid in _other_exclusive_label_ids(
                    issue_labels, label_name, labels_by_name
                ):
                    self._remove_exclusive_label(repo_name, num, other_lid)

            if label_name not in issue_labels:
                url = self._raw_api_url(
                    f"/repos/{self.org_name}/{repo_name}/issues/{num}/labels"
                )
                try:
                    resp = requests.post(
                        url,
                        headers=self._raw_auth_headers(json_body=True),
                        json={"labels": [label_name]},
                        timeout=10,
                    )
                    if resp.status_code not in (200, 201):
                        logger.error(
                            f"Failed to add label via add-labels endpoint for issue #{num}: status={resp.status_code}"
                        )
                except requests.RequestException as e:
                    logger.error(f"Failed to POST object payload to issue #{num}: {e}")

            # Final verification against the server state.
            issue_labels = self._fetch_issue_label_names(repo_name, num)
            if label_name in issue_labels:
                logger.info(
                    f"Label '{label_name}' attached to issue #{num} in {repo_name}"
                )
            else:
                logger.warning(
                    f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts"
                )

    def delete_label(
        self,
        repo_name: str,
        label_name: str,
        issue_numbers: Optional[List[int]] = None,
        delete_repo_label: bool = False,
    ) -> None:
        """Remove a label from issues, or delete it from the repo.

        When ``issue_numbers`` is non-empty the label is detached from those
        issues only. Otherwise, when ``delete_repo_label`` is true, the
        repo-level label is deleted. HTTP failures are logged and never raise.

        Args:
            repo_name: target repository.
            label_name: name of the label.
            issue_numbers: issues to detach the label from.
            delete_repo_label: delete the label itself when no issue numbers
                are given.
        """
        headers = self._raw_auth_headers()

        repo_labels: List[Any] = []
        try:
            repo_labels = list(
                cast(
                    Iterable[Any],
                    self.issue_api.issue_list_labels(self.org_name, repo_name),
                )
            )
        except Exception as e:
            # Best effort: without the listing the numeric id stays unknown.
            logger.warning(f"Failed to list labels for {repo_name}: {e}")

        label_id: Optional[int] = None
        for label in repo_labels:
            if _attr_or_key(label, "name") == label_name:
                label_id = _attr_or_key(label, "id")
                break

        if issue_numbers:
            try:
                issues = {
                    issue.number: issue
                    for issue in list_all(
                        self.issue_api.issue_list_issues, self.org_name, repo_name
                    )
                }
            except ApiException as e:
                logger.error(f"Failed to list issues for {repo_name}: {e}")
                return
            for num in issue_numbers:
                if num not in issues:
                    logger.warning(f"Issue #{num} not found in {repo_name}")
                    continue
                self._delete_issue_label(repo_name, label_name, label_id, num, headers)
            return

        if not delete_repo_label:
            logger.warning(
                "No issue numbers provided and --repo not set; nothing to do for delete_label"
            )
            return

        # The label endpoint is addressed by numeric id; the URL-encoded name
        # is only a fallback for when the id could not be determined.
        identifier = str(label_id) if label_id is not None else quote(label_name, safe="")
        url = self._raw_api_url(
            f"/repos/{self.org_name}/{repo_name}/labels/{identifier}"
        )
        try:
            resp = requests.delete(url, headers=headers, timeout=10)
        except requests.RequestException as e:
            logger.error(
                f"Error deleting repo label '{label_name}' from {repo_name}: {e}"
            )
            return
        if resp.status_code in (200, 204):
            logger.info(f"Removed repo-level label '{label_name}' from {repo_name}")
        else:
            logger.error(
                f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}"
            )

    def _delete_issue_label(
        self,
        repo_name: str,
        label_name: str,
        label_id: Optional[int],
        issue_num: int,
        headers: Dict[str, str],
    ) -> None:
        """Detach label ``label_id`` from one issue; log the outcome."""
        if label_id is None:
            logger.warning(
                f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}"
            )
            return
        url = self._raw_api_url(
            f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{label_id}"
        )
        try:
            resp = requests.delete(url, headers=headers, timeout=10)
        except requests.RequestException as e:
            logger.error(f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}")
            return
        if resp.status_code in (200, 204):
            logger.info(f"Removed label '{label_name}' from {repo_name}#{issue_num}")
        else:
            logger.warning(
                f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}"
            )


if __name__ == "__main__":
    gitea = Gitea()