forked from JOJ/Joint-Teapot
		
	
		
			
				
	
	
		
			1080 lines
		
	
	
		
			43 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1080 lines
		
	
	
		
			43 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import re
 | 
						|
from enum import Enum
 | 
						|
from functools import lru_cache
 | 
						|
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, cast
 | 
						|
from urllib.parse import quote
 | 
						|
 | 
						|
import focs_gitea
 | 
						|
import requests  # type: ignore
 | 
						|
from canvasapi.group import Group, GroupMembership
 | 
						|
from canvasapi.paginated_list import PaginatedList
 | 
						|
from canvasapi.user import User
 | 
						|
from focs_gitea.rest import ApiException
 | 
						|
 | 
						|
from joint_teapot.config import settings
 | 
						|
from joint_teapot.utils.logger import logger
 | 
						|
from joint_teapot.utils.main import default_repo_name_convertor, first
 | 
						|
 | 
						|
 | 
						|
class PermissionEnum(Enum):
    """Permission levels accepted for a team; each value is the string sent to the API."""

    read = "read"
    write = "write"
    admin = "admin"
 | 
						|
 | 
						|
 | 
						|
def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
    """Fetch every page of a paginated API method and return the merged items.

    ``method`` is called with ``*args`` and ``**kwargs`` plus a ``page``
    keyword that starts at 1 and grows by one per call. Fetching stops at the
    first falsy response (for example an empty list or ``None``).

    Element types depend on the wrapped API client, hence ``Any``.
    """
    collected: List[Any] = []
    page_number = 1
    batch = method(*args, **kwargs, page=page_number)
    while batch:
        collected.extend(batch)
        page_number += 1
        batch = method(*args, **kwargs, page=page_number)
    return collected
 | 
						|
 | 
						|
 | 
						|
class Gitea:
 | 
						|
    def __init__(
        self,
        access_token: str = "",  # nosec
        org_name: str = "",
        domain_name: str = "",
        suffix: str = "",
    ):
        """Build the Gitea API client and one API wrapper per endpoint family.

        Every argument left empty falls back to the matching
        ``settings.gitea_*`` value.
        """
        token = access_token or settings.gitea_access_token
        organization = org_name or settings.gitea_org_name
        host_domain = domain_name or settings.gitea_domain_name
        host_suffix = suffix or settings.gitea_suffix
        self.org_name = organization

        config = focs_gitea.Configuration()
        config.api_key["access_token"] = token
        config.host = f"https://{host_domain}{host_suffix}/api/v1"
        config.debug = settings.gitea_debug
        # Detach every handler from the loggers owned by the client configuration.
        for client_logger in config.logger.values():
            client_logger.handlers = []

        client = focs_gitea.ApiClient(config)
        self.api_client = client
        self.admin_api = focs_gitea.AdminApi(client)
        self.miscellaneous_api = focs_gitea.MiscellaneousApi(client)
        self.organization_api = focs_gitea.OrganizationApi(client)
        self.issue_api = focs_gitea.IssueApi(client)
        self.repository_api = focs_gitea.RepositoryApi(client)
        self.settings_api = focs_gitea.SettingsApi(client)
        self.user_api = focs_gitea.UserApi(client)
        logger.debug("Gitea initialized")
 | 
						|
 | 
						|
    @lru_cache()
 | 
						|
    def _get_team_id_by_name(self, name: str) -> int:
 | 
						|
        res = self.organization_api.team_search(
 | 
						|
            self.org_name, q=str(name), limit=1
 | 
						|
        ).to_dict()
 | 
						|
        if len(res["data"] or []) == 0:
 | 
						|
            raise Exception(
 | 
						|
                f"{name} not found by name in Gitea. Possible reason: you did not join this team."
 | 
						|
            )
 | 
						|
        return res["data"][0]["id"]
 | 
						|
 | 
						|
    @lru_cache()
 | 
						|
    def _get_username_by_canvas_student(self, student: User) -> str:
 | 
						|
        if (
 | 
						|
            student.email is not None
 | 
						|
            and student.email.count("@") == 1
 | 
						|
            and student.email.endswith("@sjtu.edu.cn")
 | 
						|
        ):
 | 
						|
            return student.email.split("@")[0]
 | 
						|
        raise Exception(f"Can not get username of {student}, an SJTU email is expected")
 | 
						|
 | 
						|
    def add_canvas_students_to_teams(
        self, students: PaginatedList, team_names: List[str]
    ) -> None:
        """Add every Canvas student to each of the named teams.

        Students already in a team are only reported with a warning. Members
        returned for a team that match no Canvas student are reported as
        errors afterwards. A failure for one student is logged and does not
        stop the loop.
        """
        for team_name in team_names:
            team_id = self._get_team_id_by_name(team_name)
            # Members still in this list after the student loop matched no student.
            unmatched_members = self.organization_api.org_list_team_members(team_id)
            for student in students:
                try:
                    username = self._get_username_by_canvas_student(student)
                    existing = first(
                        unmatched_members, lambda member: member.login == username
                    )
                    if existing is not None:
                        unmatched_members.remove(existing)
                        logger.warning(f"{student} already in team {team_name}")
                        continue
                    self.organization_api.org_add_team_member(team_id, username)
                    logger.info(f"{student} added to team {team_name}")
                except Exception as err:
                    logger.error(err)
            for leftover in unmatched_members:
                logger.error(
                    f"{leftover.full_name} found in team {team_name} "
                    "but not found in Canvas students"
                )
 | 
						|
 | 
						|
    def create_personal_repos_for_canvas_students(
        self,
        students: PaginatedList,
        repo_name_convertor: Callable[
            [User], Optional[str]
        ] = default_repo_name_convertor,
        template: str = "",
    ) -> List[str]:
        """Create a private personal repository for each student and add them to it.

        Students for whom ``repo_name_convertor`` returns ``None`` are skipped.
        With an empty ``template`` a blank repository is created; otherwise it
        is generated from the template repository of that name in the
        organization. A 409 response on creation is logged as "already exists"
        and the student is still added as collaborator. Any other failure is
        logged and the loop moves on.

        Returns:
            The repository names of all non-skipped students, whether or not
            creation succeeded.
        """
        handled_names = []
        for student in students:
            repo_name = repo_name_convertor(student)
            if repo_name is None:
                continue
            handled_names.append(repo_name)
            try:
                try:
                    if template == "":
                        blank_repo = {
                            "auto_init": False,
                            "default_branch": settings.default_branch,
                            "name": repo_name,
                            "private": True,
                            "template": False,
                            "trust_model": "default",
                        }
                        self.organization_api.create_org_repo(
                            self.org_name, body=blank_repo
                        )
                    else:
                        from_template = {
                            "default_branch": settings.default_branch,
                            "git_content": True,
                            "git_hooks": True,
                            "labels": True,
                            "name": repo_name,
                            "owner": self.org_name,
                            "private": True,
                            "protected_branch": True,
                        }
                        self.repository_api.generate_repo(
                            self.org_name, template, body=from_template
                        )
                    logger.info(
                        f"Personal repo {self.org_name}/{repo_name} for {student} created"
                    )
                except ApiException as api_err:
                    # Only a 409 is tolerated here; the outer handler logs the rest.
                    if api_err.status != 409:
                        raise
                    logger.warning(
                        f"Personal repo {self.org_name}/{repo_name} for {student} already exists"
                    )
                username = self._get_username_by_canvas_student(student)
                self.repository_api.repo_add_collaborator(
                    self.org_name, repo_name, username
                )
            except Exception as err:
                logger.error(err)
        return handled_names
 | 
						|
 | 
						|
    def create_teams_and_repos_by_canvas_groups(
        self,
        students: PaginatedList,
        groups: PaginatedList,
        team_name_convertor: Callable[[str], Optional[str]] = lambda name: name,
        repo_name_convertor: Callable[[str], Optional[str]] = lambda name: name,
        template: str = "",
        permission: PermissionEnum = PermissionEnum.write,
    ) -> List[str]:
        """Create one team and one repository per Canvas group and wire them together.

        For each group whose converted team and repo names are both not ``None``:

        1. the team is created if no existing team has that name;
        2. the repository is created if no existing repo has that name (blank,
           or generated from ``template`` when it is non-empty);
        3. the repository is attached to the team (failures only logged);
        4. every group member is added to the team and as a collaborator;
        5. the protection of ``settings.default_branch`` is deleted and created
           again, with ``required_approvals`` set to the member count minus one.

        Args:
            students: Canvas students used to resolve group memberships.
            groups: Canvas groups to mirror.
            team_name_convertor: Maps a group name to a team name, or ``None`` to skip.
            repo_name_convertor: Maps a group name to a repo name, or ``None`` to skip.
            template: Name of a template repository in the organization, or ``""``.
            permission: Permission given to newly created teams.

        Returns:
            Names of the repositories created by this call.

        Raises:
            Exception: if a membership refers to a user id missing from ``students``.
            ApiException: from the branch-protection calls, unless the status is 404.
        """
        repo_names = []
        teams = list_all(self.organization_api.org_list_teams, self.org_name)
        repos = list_all(self.organization_api.org_list_repos, self.org_name)
        group: Group
        for group in groups:
            team_name = team_name_convertor(group.name)
            repo_name = repo_name_convertor(group.name)
            if team_name is None or repo_name is None:
                continue
            team = first(teams, lambda team: team.name == team_name)
            if team is None:
                team = self.organization_api.org_create_team(
                    self.org_name,
                    body={
                        "can_create_org_repo": False,
                        "includes_all_repositories": False,
                        "name": team_name,
                        "permission": permission.value,
                        "units": [
                            "repo.code",
                            "repo.issues",
                            "repo.ext_issues",
                            "repo.wiki",
                            "repo.pulls",
                            "repo.releases",
                            "repo.projects",
                            "repo.ext_wiki",
                        ],
                    },
                )
                logger.info(f"Team {team_name} created")
            if first(repos, lambda repo: repo.name == repo_name) is None:
                repo_names.append(repo_name)
                if template == "":
                    self.organization_api.create_org_repo(
                        self.org_name,
                        body={
                            "auto_init": False,
                            "default_branch": settings.default_branch,
                            "name": repo_name,
                            "private": True,
                            "template": False,
                            "trust_model": "default",
                        },
                    )
                else:
                    self.repository_api.generate_repo(
                        self.org_name,
                        template,
                        body={
                            "default_branch": settings.default_branch,
                            "git_content": True,
                            "git_hooks": True,
                            "labels": True,
                            "name": repo_name,
                            "owner": self.org_name,
                            "private": True,
                            "protected_branch": True,
                        },
                    )
                # Fixed: this used to print the team name for the created repository.
                logger.info(f"{self.org_name}/{repo_name} created")
            try:
                self.organization_api.org_add_team_repository(
                    team.id, self.org_name, repo_name
                )
            except Exception as e:
                logger.warning(e)
            membership: GroupMembership
            student_count = 0
            for membership in group.get_memberships():
                student = first(students, lambda s: s.id == membership.user_id)
                student_count += 1
                if student is None:
                    raise Exception(
                        f"student with user_id {membership.user_id} not found"
                    )
                try:
                    username = self._get_username_by_canvas_student(student)
                except Exception as e:
                    # No usable username: skip this member but keep it counted.
                    logger.warning(e)
                    continue
                try:
                    self.organization_api.org_add_team_member(team.id, username)
                    self.repository_api.repo_add_collaborator(
                        self.org_name, repo_name, username
                    )
                except Exception as e:
                    logger.error(e)
                    continue
            # Remove any existing protection first; a 404 here is ignored.
            try:
                self.repository_api.repo_delete_branch_protection(
                    self.org_name, repo_name, settings.default_branch
                )
            except ApiException as e:
                if e.status != 404:
                    raise
            try:
                self.repository_api.repo_create_branch_protection(
                    self.org_name,
                    repo_name,
                    body={
                        "block_on_official_review_requests": True,
                        "block_on_outdated_branch": True,
                        "block_on_rejected_reviews": True,
                        "branch_name": settings.default_branch,
                        "dismiss_stale_approvals": True,
                        "enable_approvals_whitelist": False,
                        "enable_merge_whitelist": False,
                        "enable_push": True,
                        "enable_push_whitelist": True,
                        "merge_whitelist_teams": [],
                        "merge_whitelist_usernames": [],
                        "protected_file_patterns": "",
                        "push_whitelist_deploy_keys": False,
                        "push_whitelist_teams": ["Owners"],
                        "push_whitelist_usernames": [],
                        "require_signed_commits": False,
                        # One fewer than the number of memberships, never negative.
                        "required_approvals": max(student_count - 1, 0),
                        "enable_status_check": True,
                        "status_check_contexts": ["Run JOJ3 on Push / run (push)"],
                    },
                )
            except ApiException as e:
                if e.status != 404:
                    raise
            logger.info(f"{self.org_name}/{repo_name} jobs done")
        return repo_names
 | 
						|
 | 
						|
    def get_public_key_of_canvas_students(
        self, students: PaginatedList
    ) -> Dict[str, List[str]]:
        """Collect the public keys each student has registered on Gitea.

        Returns:
            A mapping from ``student.login_id`` to that student's key strings.
            Students without keys are logged and left out; a failure for one
            student is logged and that student is skipped.
        """
        keys_by_login = {}
        for student in students:
            try:
                username = self._get_username_by_canvas_student(student)
                key_entries = list_all(self.user_api.user_list_keys, username)
                keys = [entry.key for entry in key_entries]
                if keys:
                    keys_by_login[student.login_id] = keys
                else:
                    logger.info(f"{student} has not uploaded ssh keys to gitea")
            except Exception as err:
                logger.error(err)
        return keys_by_login
 | 
						|
 | 
						|
    def get_repo_releases(self, repo_name: str) -> List[Any]:
        """Return all releases of ``repo_name``, or an empty list on a 404 response.

        Raises:
            ApiException: for any API error whose status is not 404.
        """
        try:
            return list_all(
                self.repository_api.repo_list_releases, self.org_name, repo_name
            )
        except ApiException as err:
            if err.status == 404:
                return []
            raise
 | 
						|
 | 
						|
    def get_all_repo_names(self) -> List[str]:
        """Return the name of every repository listed for the organization."""
        names = []
        for repo in list_all(self.organization_api.org_list_repos, self.org_name):
            names.append(repo.name)
        return names
 | 
						|
 | 
						|
    def get_no_collaborator_repos(self) -> List[str]:
        """Return the names of organization repos whose collaborator listing is empty."""
        without_collaborators = []
        for repo in list_all(self.organization_api.org_list_repos, self.org_name):
            has_collaborators = self.repository_api.repo_list_collaborators(
                self.org_name, repo.name
            )
            if not has_collaborators:
                logger.info(f"{self.org_name}/{repo.name} has no collaborators")
                without_collaborators.append(repo.name)
        return without_collaborators
 | 
						|
 | 
						|
    def get_repos_status(self) -> Dict[str, Tuple[int, int]]:
        """Map each repo name to ``(number of commits, number of issues)``.

        The counts are the lengths of single, non-paginated API responses.
        A 409 from the commit listing leaves the commit count at 0
        (presumably Gitea's answer for an empty repository; confirm).

        Raises:
            ApiException: for commit-listing errors other than 409, and for
                any issue-listing error.
        """
        status = {}
        for repo in list_all(self.organization_api.org_list_repos, self.org_name):
            commit_list = []
            try:
                commit_list = self.repository_api.repo_get_all_commits(
                    self.org_name, repo.name
                )
            except ApiException as err:
                if err.status != 409:
                    raise
            issue_list = self.issue_api.issue_list_issues(
                self.org_name, repo.name, state="all"
            )
            status[repo.name] = (len(commit_list), len(issue_list))
        return status
 | 
						|
 | 
						|
    def create_issue(
        self,
        repo_name: str,
        title: str,
        body: str,
        assign_every_collaborators: bool = True,
        milestone: str = "",
        labels: Optional[list[str]] = None,
    ) -> None:
        """Create an issue in ``repo_name``.

        Args:
            repo_name: Repository inside the organization.
            title: Issue title.
            body: Issue text.
            assign_every_collaborators: Assign all repository collaborators when true.
            milestone: Title of an existing milestone to attach. If no
                milestone has that title, a warning is logged and the issue is
                created without one.
            labels: Names of existing labels to attach; unknown names are
                ignored. ``None`` (the default) means no labels. The default
                used to be a shared mutable ``[]``.
        """
        wanted_labels = labels or []
        assignees = []
        if assign_every_collaborators:
            assignees = [
                item.login
                for item in list_all(
                    self.repository_api.repo_list_collaborators,
                    self.org_name,
                    repo_name,
                )
            ]
        milestone_id = None
        if milestone:
            milestone_list = self.issue_api.issue_get_milestones_list(
                self.org_name, repo_name
            )
            # One pass: take the id of the first milestone with the wanted title.
            matching_ids = [m.id for m in milestone_list if m.title == milestone]
            if matching_ids:
                milestone_id = matching_ids[0]
            else:
                logger.warning(f"Milestone {milestone} does not exist in {repo_name}")
        labels_id = []
        if wanted_labels:
            labels_list = self.issue_api.issue_list_labels(self.org_name, repo_name)
            labels_id = [l.id for l in labels_list if l.name in wanted_labels]
            if not labels_id:
                logger.warning(f"no label matches {wanted_labels}")
        self.issue_api.issue_create_issue(
            self.org_name,
            repo_name,
            body={
                "title": title,
                "body": body,
                "assignees": assignees,
                "milestone": milestone_id,
                "labels": labels_id,
            },
        )
        logger.info(f'Created issue "{title}" in {repo_name}')
 | 
						|
 | 
						|
    def create_comment(
        self,
        repo_name: str,
        index: int,
        body: str,
    ) -> None:
        """Post ``body`` as a comment on issue number ``index`` of ``repo_name``."""
        comment_payload = {"body": body}
        self.issue_api.issue_create_comment(
            self.org_name, repo_name, index, body=comment_payload
        )
        logger.info(f"Created comment in {repo_name}/issues/{index}")
 | 
						|
 | 
						|
    def create_milestone(
 | 
						|
        self,
 | 
						|
        repo_name: str,
 | 
						|
        title: str,
 | 
						|
        description: str,
 | 
						|
        due_on: str,
 | 
						|
    ) -> None:
 | 
						|
        if due_on == "":
 | 
						|
            self.issue_api.issue_create_milestone(
 | 
						|
                self.org_name,
 | 
						|
                repo_name,
 | 
						|
                body={"title": title, "description": description},
 | 
						|
            )
 | 
						|
            return
 | 
						|
        self.issue_api.issue_create_milestone(
 | 
						|
            self.org_name,
 | 
						|
            repo_name,
 | 
						|
            body={
 | 
						|
                "title": title,
 | 
						|
                "description": description,
 | 
						|
                "due_on": due_on + "T23:59:59.999+08:00",
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
    def check_exist_issue_by_title(self, repo_name: str, title: str) -> bool:
        """Return whether any listed issue of ``repo_name`` has exactly this title."""
        listed_issues = list_all(
            self.issue_api.issue_list_issues, self.org_name, repo_name
        )
        return any(issue.title == title for issue in listed_issues)
 | 
						|
 | 
						|
    def close_all_issues(self) -> None:
        """Close every listed issue that is not already closed, in every repository."""
        for repo_name in self.get_all_repo_names():
            listed = list_all(
                self.issue_api.issue_list_issues, self.org_name, repo_name
            )
            for issue in listed:
                if issue.state == "closed":
                    continue
                self.issue_api.issue_edit_issue(
                    self.org_name, repo_name, issue.number, body={"state": "closed"}
                )
 | 
						|
 | 
						|
    def close_issues(
        self, repo_name: str, issue_numbers: List[int], dry_run: bool = True
    ) -> None:
        """Close the given issues of ``repo_name``.

        Numbers that match no listed issue, and issues that are already
        closed, are skipped with a log line. With ``dry_run`` (the default)
        nothing is modified and each would-be change is only logged. API
        errors are logged, not raised.
        """
        if not issue_numbers:
            logger.warning("No issue numbers provided to close")
            return
        if dry_run:
            logger.info("Dry run enabled. No changes will be made to issues.")

        try:
            issues_by_number = {}
            for listed in list_all(
                self.issue_api.issue_list_issues, self.org_name, repo_name
            ):
                issues_by_number[listed.number] = listed
        except ApiException as err:
            logger.error(f"Failed to list issues for {repo_name}: {err}")
            return

        for num in issue_numbers:
            target = issues_by_number.get(num)
            if target is None:
                logger.warning(f"Issue #{num} not found in {repo_name}")
                continue
            if getattr(target, "state", "") == "closed":
                logger.info(f"Issue #{num} in {repo_name} already closed")
                continue
            if dry_run:
                logger.info(f"Would close issue #{num} in {repo_name} (dry run)")
                continue
            try:
                self.issue_api.issue_edit_issue(
                    self.org_name, repo_name, num, body={"state": "closed"}
                )
                logger.info(f"Closed issue #{num} in {repo_name}")
            except ApiException as err:
                logger.error(f"Failed to close issue #{num} in {repo_name}: {err}")
 | 
						|
 | 
						|
    def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
        """Archive every organization repository whose full name matches ``regex``.

        With ``dry_run`` (the default) nothing is modified; each repository
        that would be archived is logged as such. Otherwise "Archived" is
        logged only after the API call returns. It used to be logged before
        the call, and in dry runs too.
        """
        if dry_run:
            logger.info("Dry run enabled. No changes will be made to the repositories.")
        logger.info(f"Archiving repos with name matching {regex}")
        pattern = re.compile(regex)
        for repo_name in self.get_all_repo_names():
            if not pattern.fullmatch(repo_name):
                continue
            if dry_run:
                logger.info(f"Would archive {repo_name} (dry run)")
                continue
            self.repository_api.repo_edit(
                self.org_name, repo_name, body={"archived": True}
            )
            logger.info(f"Archived {repo_name}")
 | 
						|
 | 
						|
    def unwatch_all_repos(self) -> None:
        """Delete the current user's subscription to every organization repository."""
        org_repos = list_all(self.organization_api.org_list_repos, self.org_name)
        for org_repo in org_repos:
            self.repository_api.user_current_delete_subscription(
                self.org_name, org_repo.name
            )
 | 
						|
 | 
						|
    def get_all_teams(self) -> Dict[str, List[str]]:
        """Map each team name to the lower-cased logins of its members.

        The team named ``"Owners"`` is left out. A team whose member listing
        fails with ``ApiException`` is logged and left out as well.
        """
        members_by_team: Dict[str, List[str]] = {}
        for team in list_all(self.organization_api.org_list_teams, self.org_name):
            if team.name == "Owners":
                continue
            team_id = team.id
            try:
                listed_members = self.organization_api.org_list_team_members(team_id)
                logins = [member.login.lower() for member in listed_members]
            except ApiException as err:
                logger.warning(
                    f"Failed to get members of team {team_id} in {self.org_name}: {err}"
                )
            else:
                members_by_team[team.name] = logins
        return members_by_team
 | 
						|
 | 
						|
    def unsubscribe_from_repos(self, pattern: str) -> None:
        """Unsubscribe the current user from matching organization repositories.

        A subscription matches when its owner login equals the organization
        name and ``re.search(pattern, name)`` finds a match in its name.
        """
        matched = []
        for candidate in self.user_api.user_current_list_subscriptions():
            if candidate.owner.login != self.org_name:
                continue
            if re.search(pattern, candidate.name) is None:
                continue
            matched.append(candidate)
        if not matched:
            logger.warning(f"No subscribed repo matches the pattern {pattern}")
            return
        matched_names = [entry.name for entry in matched]
        logger.info(
            f"{len(matched)} subscriptions match the pattern {pattern}: {matched_names}"
        )
        for entry in matched:
            self.repository_api.user_current_delete_subscription(
                self.org_name, entry.name
            )
            logger.info(f"Unsubscribed from {entry.name}")
 | 
						|
 | 
						|
    def create_milestones(
        self, milestone: str, regex: str, due_date: str, description: str
    ) -> None:
        """Create ``milestone`` in every repository whose full name matches ``regex``.

        Repositories that already list a milestone with that title are
        skipped with a warning.
        """
        for repo_name in self.get_all_repo_names():
            if re.fullmatch(regex, repo_name) is None:
                continue
            existing_titles = [
                entry.title
                for entry in self.issue_api.issue_get_milestones_list(
                    self.org_name, repo_name
                )
            ]
            if milestone in existing_titles:
                logger.warning(f"Milestone {milestone} already exists in {repo_name}")
                continue
            self.create_milestone(repo_name, milestone, description, due_date)
            logger.info(f"Created milestone {milestone} in {repo_name}")
 | 
						|
 | 
						|
    def label_issues(
        self,
        repo_name: str,
        label_name: str,
        issue_numbers: List[int],
        color: Optional[str] = None,
    ) -> None:
        """Attach the label ``label_name`` to the given issues of a repository.

        The label is looked up among the repository's labels and created
        (flagged ``exclusive``) when it is missing. An existing label that is
        not yet exclusive is patched to become so on a best-effort basis. For
        every requested issue that does not carry the label yet, other
        exclusive labels are removed first when the target label is itself
        exclusive, then the label is added and the result is verified by
        re-reading the issue.

        Failures reported by the API or the HTTP layer are logged rather than
        raised.

        Args:
            repo_name: Repository (inside ``self.org_name``) holding the issues.
            label_name: Name of the label to attach.
            issue_numbers: Issue numbers to label; an empty list is a no-op.
            color: Optional colour for a newly created label, given as a 3- or
                6-digit hex string with or without a leading ``#``. A missing
                or invalid value falls back to ``CCCCCC``.
        """
        if not issue_numbers:
            logger.warning("No issue numbers provided to label")
            return

        def _field(obj: Any, key: str) -> Any:
            """Read ``key`` from an API model object or, failing that, a dict."""
            return getattr(obj, key, None) or (
                obj.get(key) if isinstance(obj, dict) else None
            )

        def _url(path: str) -> str:
            """Build an absolute API URL from a path below the configured host."""
            return f"{self.api_client.configuration.host}{path}"

        def _headers(json_body: bool = False) -> Dict[str, str]:
            """Return auth headers for the raw ``requests`` calls."""
            token = (
                getattr(self.api_client.configuration, "api_key", {}).get(
                    "access_token"
                )
                or settings.gitea_access_token
            )
            headers = {"Authorization": f"token {token}"}
            if json_body:
                headers["Content-Type"] = "application/json"
            return headers

        def _normalize_color(raw: Optional[str]) -> Optional[str]:
            """Return ``raw`` as six hex digits without ``#``, or None."""
            if not raw:
                return None
            digits = raw.strip()
            if digits.startswith("#"):
                digits = digits[1:]
            if re.fullmatch(r"[0-9a-fA-F]{3}", digits):
                # Expand CSS-style shorthand ("abc" -> "aabbcc").
                digits = "".join(ch * 2 for ch in digits)
            if re.fullmatch(r"[0-9a-fA-F]{6}", digits):
                return digits
            logger.warning(
                f"Provided color '{raw}' is not a valid 3- or 6-digit hex; falling back to default"
            )
            return None

        def _label_names(issue_obj: Any) -> List[str]:
            """Names of the labels carried by an issue object."""
            return [
                getattr(lbl, "name", lbl)
                for lbl in getattr(issue_obj, "labels", None) or []
            ]

        def _fetch_label_names(issue_num: int) -> List[str]:
            """Re-read one issue's label names, falling back to a full listing."""
            try:
                single = self.issue_api.issue_get_issue(
                    self.org_name, repo_name, issue_num
                )
                return _label_names(single)
            except Exception as e:
                logger.debug(
                    f"Could not fetch issue #{issue_num} in {repo_name} directly: {e}"
                )
            try:
                for candidate in list_all(
                    self.issue_api.issue_list_issues, self.org_name, repo_name
                ):
                    if getattr(candidate, "number", None) == issue_num:
                        return _label_names(candidate)
            except Exception as e:
                logger.debug(
                    f"Could not list issues of {repo_name} to read labels: {e}"
                )
            return []

        try:
            labels = list(
                cast(
                    Iterable[Any],
                    self.issue_api.issue_list_labels(self.org_name, repo_name),
                )
            )
        except ApiException as e:
            logger.error(f"Failed to list labels for {repo_name}: {e}")
            return

        # Lookup of repo labels by name, used to read metadata like 'exclusive'.
        labels_by_name: Dict[str, Any] = {}
        for lbl in labels:
            name = _field(lbl, "name")
            if isinstance(name, str):
                labels_by_name[name] = lbl

        def _other_exclusive_ids(names: Iterable[Any]) -> List[int]:
            """Ids of exclusive repo labels among ``names``, except the target."""
            found: List[int] = []
            for lname in names:
                if not isinstance(lname, str) or lname == label_name:
                    continue
                other = labels_by_name.get(lname)
                if other is None or not _field(other, "exclusive"):
                    continue
                other_id = _field(other, "id")
                if other_id is not None:
                    found.append(other_id)
            return found

        label_obj = next(
            (lbl for lbl in labels if _field(lbl, "name") == label_name), None
        )
        created_now = label_obj is None

        if created_now:
            chosen_color = _normalize_color(color) or "CCCCCC"
            try:
                # Create the label and mark it as exclusive so subsequent labels
                # added by this command are treated as exclusive by Gitea (if
                # supported by the server).
                label_obj = self.issue_api.issue_create_label(
                    self.org_name,
                    repo_name,
                    body={"name": label_name, "color": chosen_color, "exclusive": True},
                )
                logger.info(
                    f"Created label '{label_name}' in {self.org_name}/{repo_name}"
                )
            except ApiException as e:
                logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
                if hasattr(e, "body"):
                    logger.error(f"ApiException body: {getattr(e, 'body', None)}")
                return

        label_id = _field(label_obj, "id")
        if label_id is None:
            logger.error(
                f"Unable to determine id of label '{label_name}' in {repo_name}"
            )
            return

        if not created_now and not _field(label_obj, "exclusive"):
            # Best effort: flag the pre-existing label as exclusive. The edit
            # endpoint addresses the label by its numeric id, not by its name.
            existing_color = _field(label_obj, "color")
            if isinstance(existing_color, str) and existing_color.startswith("#"):
                existing_color = existing_color[1:]
            payload: Dict[str, Any] = {"exclusive": True, "name": label_name}
            if existing_color:
                payload["color"] = existing_color
            try:
                resp = requests.patch(
                    _url(f"/repos/{self.org_name}/{repo_name}/labels/{label_id}"),
                    headers=_headers(json_body=True),
                    json=payload,
                    timeout=10,
                )
            except Exception as e:
                logger.warning(
                    f"Error while trying to mark label '{label_name}' exclusive: {e}"
                )
            else:
                if resp.status_code in (200, 201):
                    logger.info(
                        f"Marked existing label '{label_name}' as exclusive in {repo_name}"
                    )
                else:
                    logger.warning(
                        f"Failed to mark existing label '{label_name}' as exclusive: status={resp.status_code}, body={getattr(resp, 'text', None)}"
                    )

        # Loop-invariant. Reflects the label as it was listed/created, not a
        # PATCH performed just above.
        target_is_exclusive = bool(
            _field(labels_by_name.get(label_name) or label_obj, "exclusive")
        )

        try:
            issues = {
                issue.number: issue
                for issue in list_all(
                    self.issue_api.issue_list_issues, self.org_name, repo_name
                )
            }
        except ApiException as e:
            logger.error(f"Failed to list issues for {repo_name}: {e}")
            return

        for num in issue_numbers:
            issue = issues.get(num)
            if issue is None:
                logger.warning(f"Issue #{num} not found in {repo_name}")
                continue

            if label_name in _label_names(issue):
                logger.info(
                    f"Issue #{num} in {repo_name} already has label '{label_name}'"
                )
                continue

            attached_ids = [
                lid
                for lid in (
                    _field(lbl, "id") for lbl in getattr(issue, "labels", None) or []
                )
                if lid is not None
            ]
            if label_id in attached_ids:
                logger.info(
                    f"Issue #{num} in {repo_name} already has label '{label_name}' (by id)"
                )
                continue

            # Re-read the issue so decisions are based on its current labels.
            issue_labels = _fetch_label_names(num)

            if target_is_exclusive:
                # NOTE(review): this removes every other exclusive label on the
                # issue, regardless of any scope prefix - confirm intended.
                for other_lid in _other_exclusive_ids(issue_labels):
                    try:
                        resp = requests.delete(
                            _url(
                                f"/repos/{self.org_name}/{repo_name}/issues/{num}/labels/{other_lid}"
                            ),
                            headers=_headers(),
                            timeout=10,
                        )
                        if resp.status_code in (200, 204):
                            logger.info(
                                f"Removed exclusive label id {other_lid} from {repo_name}#{num}"
                            )
                        else:
                            logger.warning(
                                f"Failed to remove exclusive label id {other_lid} from {repo_name}#{num}: status={resp.status_code}"
                            )
                    except Exception as e:
                        logger.warning(
                            f"Error removing exclusive label id {other_lid} from {repo_name}#{num}: {e}"
                        )

            if label_name not in issue_labels:
                try:
                    resp = requests.post(
                        _url(f"/repos/{self.org_name}/{repo_name}/issues/{num}/labels"),
                        headers=_headers(json_body=True),
                        json={"labels": [label_name]},
                        timeout=10,
                    )
                    if resp.status_code not in (200, 201):
                        logger.error(
                            f"Failed to add label via add-labels endpoint for issue #{num}: status={resp.status_code}"
                        )
                except Exception as e:
                    logger.error(f"Failed to POST object payload to issue #{num}: {e}")

            # final verification
            if label_name in _fetch_label_names(num):
                logger.info(
                    f"Label '{label_name}' attached to issue #{num} in {repo_name}"
                )
            else:
                logger.warning(
                    f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts"
                )
 | 
						|
 | 
						|
    def delete_label(
        self,
        repo_name: str,
        label_name: str,
        issue_numbers: Optional[List[int]] = None,
        delete_repo_label: bool = False,
    ) -> None:
        """Detach a label from specific issues, or delete it from the repository.

        When ``issue_numbers`` is non-empty, the label is removed from each
        listed issue that exists in the repository and ``delete_repo_label`` is
        not consulted. Otherwise the label itself is deleted from the
        repository if ``delete_repo_label`` is true; if it is false, a warning
        is logged and nothing is done.

        Both operations address the label by its numeric id, resolved from the
        repository's label listing. Failures are logged rather than raised.

        Args:
            repo_name: Repository (inside ``self.org_name``) to operate on.
            label_name: Name of the label to detach or delete.
            issue_numbers: Issues to detach the label from.
            delete_repo_label: Delete the repository-level label when no issue
                numbers are given.
        """

        def _field(obj: Any, key: str) -> Any:
            """Read ``key`` from an API model object or, failing that, a dict."""
            return getattr(obj, key, None) or (
                obj.get(key) if isinstance(obj, dict) else None
            )

        token = (
            getattr(self.api_client.configuration, "api_key", {}).get("access_token")
            or settings.gitea_access_token
        )
        headers_local = {"Authorization": f"token {token}"}

        repo_labels: List[Any] = []
        try:
            repo_labels = list(
                cast(
                    Iterable[Any],
                    self.issue_api.issue_list_labels(self.org_name, repo_name),
                )
            )
        except Exception as e:
            # Keep going: the per-operation code below reports the missing id.
            logger.warning(f"Failed to list labels for {repo_name}: {e}")

        # Numeric id of the target label, or None when it cannot be resolved.
        label_id: Any = next(
            (
                lid
                for lid in (
                    _field(lbl, "id")
                    for lbl in repo_labels
                    if _field(lbl, "name") == label_name
                )
                if lid is not None
            ),
            None,
        )

        def _delete_issue_label(issue_num: int) -> None:
            """Detach the target label from one issue."""
            if label_id is None:
                logger.warning(
                    f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}"
                )
                return

            path_id = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{label_id}"
            url_id = f"{self.api_client.configuration.host}{path_id}"
            try:
                resp = requests.delete(url_id, headers=headers_local, timeout=10)
            except Exception as e:
                logger.error(
                    f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}"
                )
                return
            if resp.status_code in (200, 204):
                logger.info(
                    f"Removed label '{label_name}' from {repo_name}#{issue_num}"
                )
                return
            logger.warning(
                f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}"
            )

        def _delete_repo_label() -> None:
            """Delete the target label from the repository itself."""
            if label_id is None:
                logger.warning(
                    f"Label '{label_name}' not found in {repo_name}; nothing to delete"
                )
                return

            # The delete-label endpoint addresses the label by numeric id; a
            # label name in the path does not identify the label.
            path = f"/repos/{self.org_name}/{repo_name}/labels/{label_id}"
            url = f"{self.api_client.configuration.host}{path}"
            try:
                resp = requests.delete(url, headers=headers_local, timeout=10)
            except Exception as e:
                logger.error(
                    f"Error deleting repo label '{label_name}' from {repo_name}: {e}"
                )
                return
            if resp.status_code in (200, 204):
                logger.info(
                    f"Removed repo-level label '{label_name}' from {repo_name}"
                )
                return
            logger.error(
                f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}"
            )

        if not issue_numbers:
            if delete_repo_label:
                _delete_repo_label()
            else:
                logger.warning(
                    "No issue numbers provided and --repo not set; nothing to do for delete_label"
                )
            return

        try:
            issues = {
                issue.number: issue
                for issue in list_all(
                    self.issue_api.issue_list_issues, self.org_name, repo_name
                )
            }
        except ApiException as e:
            logger.error(f"Failed to list issues for {repo_name}: {e}")
            return

        for num in issue_numbers:
            if num not in issues:
                logger.warning(f"Issue #{num} not found in {repo_name}")
                continue
            _delete_issue_label(num)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    gitea = Gitea()
 |