diff --git a/joint_teapot/workers/gitea.py b/joint_teapot/workers/gitea.py index 159633a..35f3c89 100644 --- a/joint_teapot/workers/gitea.py +++ b/joint_teapot/workers/gitea.py @@ -4,6 +4,8 @@ from functools import lru_cache from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar import focs_gitea +import requests +from urllib.parse import quote from canvasapi.group import Group, GroupMembership from canvasapi.paginated_list import PaginatedList from canvasapi.user import User @@ -548,6 +550,212 @@ class Gitea: self.create_milestone(repo_name, milestone, description, due_date) logger.info(f"Created milestone {milestone} in {repo_name}") + def label_issues(self, repo_name: str, label_name: str, issue_numbers: List[int], color: Optional[str] = None) -> None: + if not issue_numbers: + logger.warning("No issue numbers provided to label") + return + + try: + labels = self.issue_api.issue_list_labels(self.org_name, repo_name) + except ApiException as e: + logger.error(f"Failed to list labels for {repo_name}: {e}") + return + + label_obj = None + for l in labels: + if getattr(l, "name", None) == label_name: + label_obj = l + break + + if not label_obj: + chosen_color = None + if color: + c = color.strip() + if c.startswith('#'): + c = c[1:] + if re.fullmatch(r"[0-9a-fA-F]{6}", c): + chosen_color = c + else: + logger.warning(f"Provided color '{color}' is not a valid 3- or 6-digit hex; falling back to default") + if chosen_color is None: + chosen_color = "CCCCCC" + try: + label_obj = self.issue_api.issue_create_label( + self.org_name, + repo_name, + body={"name": label_name, "color": chosen_color, "exclusive": False}, + ) + logger.info(f"Created label '{label_name}' in {self.org_name}/{repo_name}") + except ApiException as e: + logger.error(f"Failed to create label {label_name} in {repo_name}: {e}") + if hasattr(e, 'body'): + logger.error(f"ApiException body: {getattr(e, 'body', None)}") + return + + label_id = getattr(label_obj, "id", None) + if label_id is None: + try: + label_id = label_obj.get("id") + except Exception: + label_id = None + + if label_id is None: + logger.error(f"Unable to determine id of label '{label_name}' in {repo_name}") + return + + try: + issues = {issue.number: issue for issue in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)} + except ApiException as e: + logger.error(f"Failed to list issues for {repo_name}: {e}") + return + + for num in issue_numbers: + issue = issues.get(num) + if issue is None: + logger.warning(f"Issue #{num} not found in {repo_name}") + continue + + def _extract_label_names_from_issue(issue_obj) -> List[str]: + try: + return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []] + except Exception: + return [] + + existing_label_names = _extract_label_names_from_issue(issue) + if label_name in existing_label_names: + logger.info(f"Issue #{num} in {repo_name} already has label '{label_name}'") + continue + + existing_label_ids: List[int] = [] + try: + for l in getattr(issue, "labels", []) or []: + lid = getattr(l, "id", None) + if lid is None: + try: + lid = l.get("id") + except Exception: + lid = None + if lid is not None: + existing_label_ids.append(lid) + except Exception: + existing_label_ids = [] + + if label_id in existing_label_ids: + logger.info(f"Issue #{num} in {repo_name} already has label '{label_name}' (by id)") + continue + + # verification + def _fetch_issue_labels_via_api() -> List[str]: + try: + single = self.issue_api.issue_get_issue(self.org_name, repo_name, num) + return _extract_label_names_from_issue(single) + except Exception: + try: + updated = next((i for i in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name) if getattr(i, "number", None) == num), None) + if updated is None: + return [] + return _extract_label_names_from_issue(updated) + except Exception: + return [] + + issue_labels = _fetch_issue_labels_via_api() + + # prepare low-level HTTP helpers for fallbacks + def _do_post_labels(issue_num: int, payload) -> requests.Response: + path = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels" + full_url = f"{self.api_client.configuration.host}{path}" + token = getattr(self.api_client.configuration, 'api_key', {}) .get('access_token') or settings.gitea_access_token + headers_local = { + "Authorization": f"token {token}", + "Content-Type": "application/json", + } + return requests.post(full_url, headers=headers_local, json=payload, timeout=10) + + # fallback: if label still not present, POST JSON object {"labels":[name]} to add-labels endpoint + if label_name not in (issue_labels or []): + try: + resp = _do_post_labels(num, {"labels": [label_name]}) + if resp.status_code not in (200, 201): + logger.error(f"Failed to add label via add-labels endpoint for issue #{num}: status={resp.status_code}") + except Exception as e: + logger.error(f"Failed to POST object payload to issue #{num}: {e}") + + # final verification + issue_labels = _fetch_issue_labels_via_api() + if label_name in (issue_labels or []): + logger.info(f"Label '{label_name}' attached to issue #{num} in {repo_name}") + else: + logger.warning(f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts") + + + def delete_label(self, repo_name: str, label_name: str, issue_numbers: Optional[List[int]] = None, delete_repo_label: bool = False) -> None: + token = getattr(self.api_client.configuration, 'api_key', {}) .get('access_token') or settings.gitea_access_token + headers_local = {"Authorization": f"token {token}"} + try: + repo_labels = self.issue_api.issue_list_labels(self.org_name, repo_name) + label_name_to_id: Dict[str, int] = { + (getattr(l, 'name', None) or (l.get('name') if isinstance(l, dict) else None)): (getattr(l, 'id', None) or (l.get('id') if isinstance(l, dict) else None)) + for l in repo_labels + } + except Exception: + label_name_to_id = {} + + def _delete_issue_label(issue_num: int) -> None: + lid = label_name_to_id.get(label_name) + if lid is None: + try: + for l in self.issue_api.issue_list_labels(self.org_name, repo_name): + name = getattr(l, 'name', None) or (l.get('name') if isinstance(l, dict) else None) + if name == label_name: + lid = getattr(l, 'id', None) or (l.get('id') if isinstance(l, dict) else None) + break + except Exception: + pass + + if lid is None: + logger.warning(f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}") + return + + path_id = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}" + url_id = f"{self.api_client.configuration.host}{path_id}" + try: + resp = requests.delete(url_id, headers=headers_local, timeout=10) + if resp.status_code in (200, 204): + logger.info(f"Removed label '{label_name}' from {repo_name}#{issue_num}") + return + logger.warning(f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}") + except Exception as e: + logger.error(f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}") + + def _delete_repo_label() -> None: + enc_name = quote(label_name, safe='') + path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}" + url = f"{self.api_client.configuration.host}{path}" + try: + resp = requests.delete(url, headers=headers_local, timeout=10) + if resp.status_code in (200, 204): + logger.info(f"Removed repo-level label '{label_name}' from {repo_name}") + return + logger.error(f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}") + except Exception as e: + logger.error(f"Error deleting repo label '{label_name}' from {repo_name}: {e}") + + if issue_numbers: + try: + issues = {issue.number: issue for issue in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)} + except ApiException as e: + logger.error(f"Failed to list issues for {repo_name}: {e}") + return + for num in issue_numbers: + if num not in issues: + logger.warning(f"Issue #{num} not found in {repo_name}") + continue + _delete_issue_label(num) + else: + if delete_repo_label: + _delete_repo_label() + else: + logger.warning("No issue numbers provided and --repo not set; nothing to do for delete_label") if __name__ == "__main__": gitea = Gitea()