feat: new commands in mm channels and gitea issues #6

Open
蔡雨翔524370910013 wants to merge 12 commits from arthurcai/Joint-Teapot:master into master
Showing only changes of commit 0e37b5d444 - Show all commits

View File

@ -4,6 +4,8 @@ from functools import lru_cache
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar
import focs_gitea
import requests
from urllib.parse import quote
from canvasapi.group import Group, GroupMembership
from canvasapi.paginated_list import PaginatedList
from canvasapi.user import User
@ -548,6 +550,212 @@ class Gitea:
self.create_milestone(repo_name, milestone, description, due_date)
logger.info(f"Created milestone {milestone} in {repo_name}")
def label_issues(self, repo_name: str, label_name: str, issue_numbers: List[int], color: Optional[str] = None) -> None:
if not issue_numbers:
logger.warning("No issue numbers provided to label")
return
try:
labels = self.issue_api.issue_list_labels(self.org_name, repo_name)
except ApiException as e:
logger.error(f"Failed to list labels for {repo_name}: {e}")
return
label_obj = None
for l in labels:
if getattr(l, "name", None) == label_name:
label_obj = l
break
if not label_obj:
chosen_color = None
if color:
c = color.strip()
if c.startswith('#'):
c = c[1:]
if re.fullmatch(r"[0-9a-fA-F]{6}", c):
chosen_color = c
else:
logger.warning(f"Provided color '{color}' is not a valid 3- or 6-digit hex; falling back to default")
if chosen_color is None:
chosen_color = "CCCCCC"
try:
label_obj = self.issue_api.issue_create_label(
self.org_name,
repo_name,
body={"name": label_name, "color": chosen_color, "exclusive": False},
)
logger.info(f"Created label '{label_name}' in {self.org_name}/{repo_name}")
except ApiException as e:
logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
if hasattr(e, 'body'):
logger.error(f"ApiException body: {getattr(e, 'body', None)}")
return

function body too long

function body too long
label_id = getattr(label_obj, "id", None)
if label_id is None:
try:
label_id = label_obj.get("id")
except Exception:
label_id = None
if label_id is None:
logger.error(f"Unable to determine id of label '{label_name}' in {repo_name}")
return
try:
issues = {issue.number: issue for issue in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return
for num in issue_numbers:
issue = issues.get(num)
if issue is None:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
def _extract_label_names_from_issue(issue_obj) -> List[str]:
try:
return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []]
except Exception:
return []
existing_label_names = _extract_label_names_from_issue(issue)
if label_name in existing_label_names:
logger.info(f"Issue #{num} in {repo_name} already has label '{label_name}'")
continue
existing_label_ids: List[int] = []
try:
for l in getattr(issue, "labels", []) or []:
lid = getattr(l, "id", None)
if lid is None:
try:
lid = l.get("id")
except Exception:
lid = None
if lid is not None:
existing_label_ids.append(lid)
except Exception:
existing_label_ids = []
if label_id in existing_label_ids:
logger.info(f"Issue #{num} in {repo_name} already has label '{label_name}' (by id)")
continue
# verification
def _fetch_issue_labels_via_api() -> List[str]:
try:
single = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
return _extract_label_names_from_issue(single)
except Exception:
try:
updated = next((i for i in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name) if getattr(i, "number", None) == num), None)
if updated is None:
return []
return _extract_label_names_from_issue(updated)
except Exception:
return []
issue_labels = _fetch_issue_labels_via_api()
# prepare low-level HTTP helpers for fallbacks
def _do_post_labels(issue_num: int, payload) -> requests.Response:
path = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels"
full_url = f"{self.api_client.configuration.host}{path}"
token = getattr(self.api_client.configuration, 'api_key', {}) .get('access_token') or settings.gitea_access_token
headers_local = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
return requests.post(full_url, headers=headers_local, json=payload, timeout=10)
# fallback: if label still not present, POST JSON object {"labels":[name]} to add-labels endpoint
if label_name not in (issue_labels or []):
try:
resp = _do_post_labels(num, {"labels": [label_name]})
if resp.status_code not in (200, 201):
logger.error(f"Failed to add label via add-labels endpoint for issue #{num}: status={resp.status_code}")
except Exception as e:
logger.error(f"Failed to POST object payload to issue #{num}: {e}")
# final verification
issue_labels = _fetch_issue_labels_via_api()
if label_name in (issue_labels or []):
logger.info(f"Label '{label_name}' attached to issue #{num} in {repo_name}")
else:
logger.warning(f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts")
def delete_label(self, repo_name: str, label_name: str, issue_numbers: Optional[List[int]] = None, delete_repo_label: bool = False) -> None:
token = getattr(self.api_client.configuration, 'api_key', {}) .get('access_token') or settings.gitea_access_token
headers_local = {"Authorization": f"token {token}"}
try:
repo_labels = self.issue_api.issue_list_labels(self.org_name, repo_name)
label_name_to_id: Dict[str, int] = {
(getattr(l, 'name', None) or (l.get('name') if isinstance(l, dict) else None)): (getattr(l, 'id', None) or (l.get('id') if isinstance(l, dict) else None))
for l in repo_labels
}
except Exception:
label_name_to_id = {}
def _delete_issue_label(issue_num: int) -> None:
lid = label_name_to_id.get(label_name)
if lid is None:
try:
for l in self.issue_api.issue_list_labels(self.org_name, repo_name):
name = getattr(l, 'name', None) or (l.get('name') if isinstance(l, dict) else None)
if name == label_name:
lid = getattr(l, 'id', None) or (l.get('id') if isinstance(l, dict) else None)
break
except Exception:
pass
if lid is None:
logger.warning(f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}")
return
path_id = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
url_id = f"{self.api_client.configuration.host}{path_id}"
try:
resp = requests.delete(url_id, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(f"Removed label '{label_name}' from {repo_name}#{issue_num}")
return
logger.warning(f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}")
except Exception as e:
logger.error(f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}")
def _delete_repo_label() -> None:
enc_name = quote(label_name, safe='')
path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}"
url = f"{self.api_client.configuration.host}{path}"
try:
resp = requests.delete(url, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(f"Removed repo-level label '{label_name}' from {repo_name}")
return
logger.error(f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}")
except Exception as e:
logger.error(f"Error deleting repo label '{label_name}' from {repo_name}: {e}")
if issue_numbers:
try:
issues = {issue.number: issue for issue in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return
for num in issue_numbers:
if num not in issues:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
_delete_issue_label(num)
else:
if delete_repo_label:
_delete_repo_label()
else:
logger.warning("No issue numbers provided and --repo not set; nothing to do for delete_label")
if __name__ == "__main__":
gitea = Gitea()