From 69f1552044b935c39adde17830758ba0ea241726 Mon Sep 17 00:00:00 2001 From: arthurcai Date: Sun, 21 Sep 2025 23:20:34 +0800 Subject: [PATCH] refactor: label issues update and fix dry-run issues --- joint_teapot/app.py | 64 +--- joint_teapot/workers/gitea.py | 530 ++++++++++++++-------------------- 2 files changed, 226 insertions(+), 368 deletions(-) diff --git a/joint_teapot/app.py b/joint_teapot/app.py index 4ba1198..c04fc80 100644 --- a/joint_teapot/app.py +++ b/joint_teapot/app.py @@ -169,21 +169,18 @@ def close_all_issues() -> None: @app.command( "close-issues", - help="close specific issues in a repository: joint-teapot close-issues REPO_NAME ISSUE_NUMBER [ISSUE_NUMBER ...]", + help="close specific issues in a repository", ) def close_issues( repo_name: str, issue_numbers: List[int] = Argument(..., help="One or more issue numbers to close"), - dry_run: bool = Option( - False, "--dry-run/--no-dry-run", help="Enable dry run (no changes will be made)" - ), ) -> None: - tea.pot.gitea.close_issues(repo_name, issue_numbers, dry_run) + tea.pot.gitea.close_issues(repo_name, issue_numbers) @app.command( "label-issues", - help="add a label to specific issues in a repository: joint-teapot label-issues TAG_NAME REPO_NAME ISSUE_NUMBER [ISSUE_NUMBER ...]", + help="add a label to specific issues in a repository", ) def label_issues( label_name: str, @@ -192,61 +189,26 @@ def label_issues( issue_color: Optional[str] = Option( None, "--color", help="Color for newly created label (hex without # or with #)" ), - dry_run: bool = Option( - False, - "--dry-run/--no-dry-run", - help="Dry run: show what would be labeled without making changes", - ), ) -> None: - if dry_run: - logger.info( - f"Dry run: would add label '{label_name}' to {repo_name} issues: {issue_numbers}" - ) - return tea.pot.gitea.label_issues(repo_name, label_name, issue_numbers, color=issue_color) @app.command( "delete-label", - help="remove a label from specific issues or delete the repository label: joint-teapot delete-label TAG_NAME REPO_NAME [ISSUE_NUMBER ...]", + help="remove a label from specific issues or delete the repository label", ) def delete_label( label_name: str, repo_name: str, issue_numbers: List[int] = Argument( None, - help="Zero or more issue numbers to remove the label from (if empty and --repo is set, delete repo label)", - ), - repo: bool = Option( - False, - "--repo", - help="Delete the repo-level label when set and issue_numbers is empty", - ), - dry_run: bool = Option( - False, - "--dry-run/--no-dry-run", - help="Dry run: show what would be removed without making changes", + help="issue numbers to remove the label from", ), ) -> None: - if dry_run: - if issue_numbers: - logger.info( - f"Dry run: would remove label '{label_name}' from {repo_name} issues: {issue_numbers}" - ) - elif repo: - logger.info( - f"Dry run: would delete repo label '{label_name}' from {repo_name}" - ) - else: - logger.info( - "Dry run: no action specified (provide issue numbers or --repo to delete repo label)" - ) - return tea.pot.gitea.delete_label( repo_name, label_name, issue_numbers if issue_numbers else None, - delete_repo_label=repo, ) @@ -340,11 +302,6 @@ def update_group_channels_on_mm( "--update-teaching-team/--no-update-teaching-team", help="Whether to update teaching team", ), - dry_run: bool = Option( - False, - "--dry-run/--no-dry-run", - help="Dry run: show what would be added without making changes", - ), ) -> None: groups = { group_name: members @@ -355,9 +312,7 @@ def update_group_channels_on_mm( f"{len(groups)} group channel(s) to update" + (f" with suffix {suffix}" if suffix else "") ) - tea.pot.mattermost.update_channels_for_groups( - groups, suffix, update_teaching_team, dry_run - ) + tea.pot.mattermost.update_channels_for_groups(groups, suffix, update_teaching_team) @app.command( @@ -370,14 +325,9 @@ def update_personal_channels_on_mm( "--update-teaching-team/--no-update-teaching-team", help="Whether to update teaching team", ), - dry_run: bool = Option( - False, - "--dry-run/--no-dry-run", - help="Dry run: show what would be added without making changes", - ), ) -> None: tea.pot.mattermost.update_channels_for_individuals( - tea.pot.canvas.students, update_teaching_team, dry_run + tea.pot.canvas.students, update_teaching_team ) diff --git a/joint_teapot/workers/gitea.py b/joint_teapot/workers/gitea.py index 70dfb31..25fa919 100644 --- a/joint_teapot/workers/gitea.py +++ b/joint_teapot/workers/gitea.py @@ -40,6 +40,165 @@ def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any] return all_res +def _get_color(c: Optional[str]) -> str: + if not c: + return "CCCCCC" + s = c.strip() + if s.startswith("#"): + s = s[1:] + if re.fullmatch(r"[0-9a-fA-F]{6}", s): + return s + logger.warning(f"Provided color '{c}' is not a valid hex; falling back to #CCCCCC") + return "CCCCCC" + + +def _label_id_from_obj(obj: Any) -> Optional[int]: + try: + if isinstance(obj, dict): + return obj.get("id") + return getattr(obj, "id", None) + except Exception: + return None + + +def _get_repo_labels(gitea: Any, repo_name: str) -> List[Any]: + try: + return list( + cast( + Iterable[Any], + gitea.issue_api.issue_list_labels(gitea.org_name, repo_name), + ) + ) + except ApiException as e: + logger.error(f"Failed to list labels for {repo_name}: {e}") + return [] + + +def _list_issues_map(gitea: Any, repo_name: str) -> Dict[int, Any]: + try: + return { + issue.number: issue + for issue in list_all( + gitea.issue_api.issue_list_issues, gitea.org_name, repo_name + ) + } + except ApiException as e: + logger.error(f"Failed to list issues for {repo_name}: {e}") + return {} + + +def _api_post_labels(gitea: Any, repo_name: str, issue_num: int, payload: Any) -> Any: + path = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels" + full_url = f"{gitea.api_client.configuration.host}{path}" + token = ( + getattr(gitea.api_client.configuration, "api_key", {}).get("access_token") + or settings.gitea_access_token + ) + headers_local = { + "Authorization": f"token {token}", + "Content-Type": "application/json", + } + return requests.post(full_url, headers=headers_local, json=payload, timeout=10) + + +def _patch_label_exclusive( + gitea: Any, repo_name: str, name: str, label_obj: Any +) -> None: + try: + existing_color = getattr(label_obj, "color", None) or ( + label_obj.get("color") if isinstance(label_obj, dict) else None + ) + if ( + existing_color + and isinstance(existing_color, str) + and existing_color.startswith("#") + ): + existing_color = existing_color[1:] + enc_name = quote(name, safe="") + path = f"/repos/{gitea.org_name}/{repo_name}/labels/{enc_name}" + url = f"{gitea.api_client.configuration.host}{path}" + token = ( + getattr(gitea.api_client.configuration, "api_key", {}).get("access_token") + or settings.gitea_access_token + ) + headers_local = { + "Authorization": f"token {token}", + "Content-Type": "application/json", + } + payload = {"exclusive": True, "name": name} + if existing_color: + payload["color"] = existing_color + resp = requests.patch(url, headers=headers_local, json=payload, timeout=10) + if resp.status_code in (200, 201): + logger.info(f"Marked existing label '{name}' as exclusive in {repo_name}") + else: + logger.warning( + f"Failed to mark existing label '{name}' as exclusive: status={resp.status_code}" + ) + except Exception as e: + logger.debug(f"Could not patch label exclusive for {name}: {e}") + + +def _delete_issue_label_by_id( + gitea: Any, repo_name: str, issue_num: int, lid: int +) -> None: + try: + path_id = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}" + url_id = f"{gitea.api_client.configuration.host}{path_id}" + token = ( + getattr(gitea.api_client.configuration, "api_key", {}).get("access_token") + or settings.gitea_access_token + ) + headers_local = {"Authorization": f"token {token}"} + resp = requests.delete(url_id, headers=headers_local, timeout=10) + if resp.status_code in (200, 204): + logger.info(f"Removed label id {lid} from {repo_name}#{issue_num}") + else: + logger.warning( + f"Failed to remove label id {lid} from {repo_name}#{issue_num}: status={resp.status_code}" + ) + except Exception as e: + logger.warning( + f"Error removing label id {lid} from {repo_name}#{issue_num}: {e}" + ) + + +def _create_label( + gitea: Any, repo_name: str, label_name: str, color: Optional[str], labels: List[Any] +) -> Optional[Any]: + for l in labels: + if getattr(l, "name", None) == label_name: + try: + is_ex = getattr(l, "exclusive", None) + if not bool(is_ex): + _patch_label_exclusive(gitea, repo_name, label_name, l) + except Exception: + return l + return l + + chosen_color = _get_color(color) + try: + new = gitea.issue_api.issue_create_label( + gitea.org_name, + repo_name, + body={"name": label_name, "color": chosen_color, "exclusive": True}, + ) + logger.info(f"Created label '{label_name}' in {gitea.org_name}/{repo_name}") + return new + except ApiException as e: + logger.error(f"Failed to create label {label_name} in {repo_name}: {e}") + if hasattr(e, "body"): + logger.error(f"ApiException body: {getattr(e, 'body', None)}") + return None + + +def _extract_label_names(issue_obj: Any) -> List[str]: + try: + return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []] + except Exception: + return [] + + class Gitea: def __init__( self, @@ -482,7 +641,7 @@ class Gitea: ) def close_issues( - self, repo_name: str, issue_numbers: List[int], dry_run: bool = True + self, repo_name: str, issue_numbers: List[int], dry_run: bool = False ) -> None: if not issue_numbers: logger.warning("No issue numbers provided to close") @@ -601,354 +760,103 @@ class Gitea: logger.warning("No issue numbers provided to label") return - try: - labels = list( - cast( - Iterable[Any], - self.issue_api.issue_list_labels(self.org_name, repo_name), - ) - ) - except ApiException as e: - logger.error(f"Failed to list labels for {repo_name}: {e}") + labels = _get_repo_labels(self, repo_name) + if not labels: + logger.warning(f"No labels found for {repo_name}") return - # Build a lookup for repo labels by name to read metadata like 'exclusive' repo_label_name_to_obj: Dict[str, Any] = {} - try: - for l in labels: - name = getattr(l, "name", None) or ( - l.get("name") if isinstance(l, dict) else None - ) - if isinstance(name, str): - repo_label_name_to_obj[name] = l - except Exception: - repo_label_name_to_obj = {} - - label_obj = None for l in labels: - if getattr(l, "name", None) == label_name: - label_obj = l - break - - if not label_obj: - chosen_color = None - if color: - c = color.strip() - if c.startswith("#"): - c = c[1:] - if re.fullmatch(r"[0-9a-fA-F]{6}", c): - chosen_color = c - else: - logger.warning( - f"Provided color '{color}' is not a valid 3- or 6-digit hex; falling back to default" - ) - if chosen_color is None: - chosen_color = "CCCCCC" - try: - # Create the label and mark it as exclusive so subsequent labels added by this - # command are treated as exclusive by Gitea (if supported by the server) - label_obj = self.issue_api.issue_create_label( - self.org_name, - repo_name, - body={"name": label_name, "color": chosen_color, "exclusive": True}, - ) - logger.info( - f"Created label '{label_name}' in {self.org_name}/{repo_name}" - ) - except ApiException as e: - logger.error(f"Failed to create label {label_name} in {repo_name}: {e}") - if hasattr(e, "body"): - logger.error(f"ApiException body: {getattr(e, 'body', None)}") - return - - else: - try: - existing_color = getattr(label_obj, "color", None) or ( - label_obj.get("color") if isinstance(label_obj, dict) else None - ) - if ( - existing_color - and isinstance(existing_color, str) - and existing_color.startswith("#") - ): - existing_color = existing_color[1:] - is_exclusive = getattr(label_obj, "exclusive", None) - if not is_exclusive: - enc_name = quote(label_name, safe="") - path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}" - url = f"{self.api_client.configuration.host}{path}" - token = ( - getattr(self.api_client.configuration, "api_key", {}).get( - "access_token" - ) - or settings.gitea_access_token - ) - headers_local = { - "Authorization": f"token {token}", - "Content-Type": "application/json", - } - payload = {"exclusive": True, "name": label_name} - if existing_color: - payload["color"] = existing_color - try: - resp = requests.patch( - url, headers=headers_local, json=payload, timeout=10 - ) - if resp.status_code in (200, 201): - logger.info( - f"Marked existing label '{label_name}' as exclusive in {repo_name}" - ) - else: - logger.warning( - f"Failed to mark existing label '{label_name}' as exclusive: status={resp.status_code}, body={getattr(resp, 'text', None)}" - ) - except Exception as e: - logger.warning( - f"Error while trying to mark label '{label_name}' exclusive: {e}" - ) - except Exception: - logger.debug( - f"Could not ensure exclusive attribute for label '{label_name}' (continuing)" - ) - - # Determine numeric id of the label in a type-safe manner - label_id = None - try: - if isinstance(label_obj, dict): - label_id = label_obj.get("id") - else: - label_id = getattr(label_obj, "id", None) - except Exception: - label_id = None - - if label_id is None: - logger.error( - f"Unable to determine id of label '{label_name}' in {repo_name}" + name = getattr(l, "name", None) or ( + l.get("name") if isinstance(l, dict) else None ) + if isinstance(name, str): + repo_label_name_to_obj[name] = l + + label_obj = _create_label(self, repo_name, label_name, color, labels) + if label_obj is None: + logger.error(f"Unable to ensure label '{label_name}' exists in {repo_name}") return - try: - issues = { - issue.number: issue - for issue in list_all( - self.issue_api.issue_list_issues, self.org_name, repo_name - ) - } - except ApiException as e: - logger.error(f"Failed to list issues for {repo_name}: {e}") + label_id = _label_id_from_obj(label_obj) + if label_id is None: + logger.error(f"Unable to find id of label '{label_name}' in {repo_name}") + return + issues_map = _list_issues_map(self, repo_name) + if not issues_map: return for num in issue_numbers: - issue = issues.get(num) + issue = issues_map.get(num) if issue is None: logger.warning(f"Issue #{num} not found in {repo_name}") continue - def _extract_label_names_from_issue(issue_obj: Any) -> List[str]: - try: - return [ - getattr(l, "name", l) - for l in getattr(issue_obj, "labels", []) or [] - ] - except Exception: - return [] - - existing_label_names = _extract_label_names_from_issue(issue) + existing_label_names = _extract_label_names(issue) if label_name in existing_label_names: logger.info( f"Issue #{num} in {repo_name} already has label '{label_name}'" ) continue - existing_label_ids: List[int] = [] try: - for l in getattr(issue, "labels", []) or []: - lid = getattr(l, "id", None) - if lid is None: - try: - lid = l.get("id") - except Exception: - lid = None - if lid is not None: - existing_label_ids.append(lid) + current = self.issue_api.issue_get_issue(self.org_name, repo_name, num) + issue_labels = _extract_label_names(current) except Exception: - existing_label_ids = [] + issue_labels = existing_label_names - if label_id in existing_label_ids: - logger.info( - f"Issue #{num} in {repo_name} already has label '{label_name}' (by id)" - ) - continue - - # verification - def _fetch_issue_labels_via_api() -> List[str]: - try: - single = self.issue_api.issue_get_issue( - self.org_name, repo_name, num - ) - return _extract_label_names_from_issue(single) - except Exception: - try: - updated = next( - ( - i - for i in list_all( - self.issue_api.issue_list_issues, - self.org_name, - repo_name, - ) - if getattr(i, "number", None) == num - ), - None, - ) - if updated is None: - return [] - return _extract_label_names_from_issue(updated) - except Exception: - return [] - - issue_labels = _fetch_issue_labels_via_api() - - # prepare low-level HTTP helpers for fallbacks - def _do_post_labels(issue_num: int, payload: Any) -> Any: - path = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels" - full_url = f"{self.api_client.configuration.host}{path}" - token = ( - getattr(self.api_client.configuration, "api_key", {}).get( - "access_token" - ) - or settings.gitea_access_token - ) - headers_local = { - "Authorization": f"token {token}", - "Content-Type": "application/json", - } - return requests.post( - full_url, headers=headers_local, json=payload, timeout=10 - ) - - # fallback: if label still not present, POST JSON object {"labels":[name]} to add-labels endpoint - # determine if the target label is marked exclusive (force to bool) - target_is_exclusive: bool = False try: - target_label_obj = repo_label_name_to_obj.get(label_name) or label_obj - _tmp_any: Any = getattr(target_label_obj, "exclusive", None) or ( - target_label_obj.get("exclusive") - if isinstance(target_label_obj, dict) - else False + target_obj = repo_label_name_to_obj.get(label_name) or label_obj + target_is_exclusive = bool( + getattr(target_obj, "exclusive", None) + or ( + target_obj.get("exclusive") + if isinstance(target_obj, dict) + else False + ) ) - target_is_exclusive = bool(_tmp_any) except Exception: target_is_exclusive = False if target_is_exclusive: - # find other exclusive labels present on this issue and remove them - other_exclusive_label_ids: List[int] = [] - try: - for lname in issue_labels or []: - if lname == label_name: - continue - lobj = repo_label_name_to_obj.get(lname) - is_ex = False - if lobj is not None: - is_ex = bool( - getattr(lobj, "exclusive", None) - or ( - lobj.get("exclusive") - if isinstance(lobj, dict) - else False - ) + for lname in issue_labels or []: + if lname == label_name: + continue + lobj = repo_label_name_to_obj.get(lname) + is_ex = False + if lobj is not None: + is_ex = bool( + getattr(lobj, "exclusive", None) + or ( + lobj.get("exclusive") + if isinstance(lobj, dict) + else False ) - else: - # try to find by name in labels list - for ll in labels: - n = getattr(ll, "name", None) or ( - ll.get("name") if isinstance(ll, dict) else None - ) - if n == lname: - _tmp_any_ex: Any = getattr( - ll, "exclusive", None - ) or ( - ll.get("exclusive") - if isinstance(ll, dict) - else False - ) - is_ex = bool(_tmp_any_ex) - break - if is_ex: - # determine id - lid = None - try: - candidate = next( - ( - ll - for ll in labels - if ( - getattr(ll, "name", None) - or ( - ll.get("name") - if isinstance(ll, dict) - else None - ) - ) - == lname - ), - None, - ) - if candidate is not None: - lid = getattr(candidate, "id", None) or ( - candidate.get("id") - if isinstance(candidate, dict) - else None - ) - except Exception: - lid = None - if lid is not None: - other_exclusive_label_ids.append(lid) - except Exception: - other_exclusive_label_ids = [] - - # delete other exclusive labels by numeric id - token = ( - getattr(self.api_client.configuration, "api_key", {}).get( - "access_token" - ) - or settings.gitea_access_token - ) - headers_local = {"Authorization": f"token {token}"} - for other_lid in other_exclusive_label_ids: - try: - path_id = f"/repos/{self.org_name}/{repo_name}/issues/{num}/labels/{other_lid}" - url_id = f"{self.api_client.configuration.host}{path_id}" - resp = requests.delete( - url_id, headers=headers_local, timeout=10 - ) - if resp.status_code in (200, 204): - logger.info( - f"Removed exclusive label id {other_lid} from {repo_name}#{num}" - ) - else: - logger.warning( - f"Failed to remove exclusive label id {other_lid} from {repo_name}#{num}: status={resp.status_code}" - ) - except Exception as e: - logger.warning( - f"Error removing exclusive label id {other_lid} from {repo_name}#{num}: {e}" ) + if is_ex: + lid = _label_id_from_obj(lobj) if lobj is not None else None + if lid is not None: + _delete_issue_label_by_id(self, repo_name, num, lid) if label_name not in (issue_labels or []): try: - resp = _do_post_labels(num, {"labels": [label_name]}) - if resp.status_code not in (200, 201): + resp = _api_post_labels( + self, repo_name, num, {"labels": [label_name]} + ) + if getattr(resp, "status_code", None) not in (200, 201): logger.error( - f"Failed to add label via add-labels endpoint for issue #{num}: status={resp.status_code}" + f"Failed to add label via add-labels endpoint for issue #{num}: status={getattr(resp, 'status_code', None)}" ) except Exception as e: - logger.error(f"Failed to POST object payload to issue #{num}: {e}") + logger.error(f"Failed to POST labels to issue #{num}: {e}") - # final verification - issue_labels = _fetch_issue_labels_via_api() - if label_name in (issue_labels or []): + # verification + try: + final = self.issue_api.issue_get_issue(self.org_name, repo_name, num) + final_labels = _extract_label_names(final) + except Exception: + final_labels = [] + if label_name in (final_labels or []): logger.info( f"Label '{label_name}' attached to issue #{num} in {repo_name}" )