feat: add exclusive tag for addition and deletion

This commit is contained in:
蔡雨翔524370910013 2025-09-19 22:54:39 +08:00
parent 594337a54d
commit a2f6a83b09
4 changed files with 479 additions and 95 deletions

View File

@ -174,7 +174,9 @@ def close_all_issues() -> None:
def close_issues( def close_issues(
repo_name: str, repo_name: str,
issue_numbers: List[int] = Argument(..., help="One or more issue numbers to close"), issue_numbers: List[int] = Argument(..., help="One or more issue numbers to close"),
dry_run: bool = Option(False, "--dry-run/--no-dry-run", help="Enable dry run (no changes will be made)"), dry_run: bool = Option(
False, "--dry-run/--no-dry-run", help="Enable dry run (no changes will be made)"
),
) -> None: ) -> None:
tea.pot.gitea.close_issues(repo_name, issue_numbers, dry_run) tea.pot.gitea.close_issues(repo_name, issue_numbers, dry_run)
@ -187,11 +189,19 @@ def label_issues(
label_name: str, label_name: str,
repo_name: str, repo_name: str,
issue_numbers: List[int] = Argument(..., help="One or more issue numbers to label"), issue_numbers: List[int] = Argument(..., help="One or more issue numbers to label"),
issue_color: Optional[str] = Option(None, "--color", help="Color for newly created label (hex without # or with #)"), issue_color: Optional[str] = Option(
dry_run: bool = Option(False, "--dry-run/--no-dry-run", help="Dry run: show what would be labeled without making changes"), None, "--color", help="Color for newly created label (hex without # or with #)"
),
dry_run: bool = Option(
False,
"--dry-run/--no-dry-run",
help="Dry run: show what would be labeled without making changes",
),
) -> None: ) -> None:
if dry_run: if dry_run:
logger.info(f"Dry run: would add label '{label_name}' to {repo_name} issues: {issue_numbers}") logger.info(
f"Dry run: would add label '{label_name}' to {repo_name} issues: {issue_numbers}"
)
return return
tea.pot.gitea.label_issues(repo_name, label_name, issue_numbers, color=issue_color) tea.pot.gitea.label_issues(repo_name, label_name, issue_numbers, color=issue_color)
@ -203,19 +213,41 @@ def label_issues(
def delete_label( def delete_label(
label_name: str, label_name: str,
repo_name: str, repo_name: str,
issue_numbers: List[int] = Argument(None, help="Zero or more issue numbers to remove the label from (if empty and --repo is set, delete repo label)"), issue_numbers: List[int] = Argument(
repo: bool = Option(False, "--repo", help="Delete the repo-level label when set and issue_numbers is empty"), None,
dry_run: bool = Option(False, "--dry-run/--no-dry-run", help="Dry run: show what would be removed without making changes"), help="Zero or more issue numbers to remove the label from (if empty and --repo is set, delete repo label)",
),
repo: bool = Option(
False,
"--repo",
help="Delete the repo-level label when set and issue_numbers is empty",
),
dry_run: bool = Option(
False,
"--dry-run/--no-dry-run",
help="Dry run: show what would be removed without making changes",
),
) -> None: ) -> None:
if dry_run: if dry_run:
if issue_numbers: if issue_numbers:
logger.info(f"Dry run: would remove label '{label_name}' from {repo_name} issues: {issue_numbers}") logger.info(
f"Dry run: would remove label '{label_name}' from {repo_name} issues: {issue_numbers}"
)
elif repo: elif repo:
logger.info(f"Dry run: would delete repo label '{label_name}' from {repo_name}") logger.info(
f"Dry run: would delete repo label '{label_name}' from {repo_name}"
)
else: else:
logger.info("Dry run: no action specified (provide issue numbers or --repo to delete repo label)") logger.info(
"Dry run: no action specified (provide issue numbers or --repo to delete repo label)"
)
return return
tea.pot.gitea.delete_label(repo_name, label_name, issue_numbers if issue_numbers else None, delete_repo_label=repo) tea.pot.gitea.delete_label(
repo_name,
label_name,
issue_numbers if issue_numbers else None,
delete_repo_label=repo,
)
@app.command( @app.command(
@ -299,18 +331,33 @@ def create_personal_channels_on_mm(
help="update Mattermost channels for student groups based on team information on Gitea; only add missing members", help="update Mattermost channels for student groups based on team information on Gitea; only add missing members",
) )
def update_group_channels_on_mm( def update_group_channels_on_mm(
prefix: str = Option("", help="Only process repositories starting with this prefix"), prefix: str = Option(
"", help="Only process repositories starting with this prefix"
),
suffix: str = Option("", help="Only process channels ending with this suffix"), suffix: str = Option("", help="Only process channels ending with this suffix"),
update_teaching_team: bool = Option(True, "--update-teaching-team/--no-update-teaching-team", help="Whether to update teaching team"), update_teaching_team: bool = Option(
dry_run: bool = Option(False, "--dry-run/--no-dry-run", help="Dry run: show what would be added without making changes"), True,
"--update-teaching-team/--no-update-teaching-team",
help="Whether to update teaching team",
),
dry_run: bool = Option(
False,
"--dry-run/--no-dry-run",
help="Dry run: show what would be added without making changes",
),
) -> None: ) -> None:
groups = { groups = {
group_name: members group_name: members
for group_name, members in tea.pot.gitea.get_all_teams().items() for group_name, members in tea.pot.gitea.get_all_teams().items()
if group_name.startswith(prefix) if group_name.startswith(prefix)
} }
logger.info(f"{len(groups)} group channel(s) to update" + (f" with suffix {suffix}" if suffix else "")) logger.info(
tea.pot.mattermost.update_channels_for_groups(groups, suffix, update_teaching_team, dry_run) f"{len(groups)} group channel(s) to update"
+ (f" with suffix {suffix}" if suffix else "")
)
tea.pot.mattermost.update_channels_for_groups(
groups, suffix, update_teaching_team, dry_run
)
@app.command( @app.command(
@ -318,10 +365,20 @@ def update_group_channels_on_mm(
help="update personal Mattermost channels for every student; only add missing members", help="update personal Mattermost channels for every student; only add missing members",
) )
def update_personal_channels_on_mm( def update_personal_channels_on_mm(
update_teaching_team: bool = Option(True, "--update-teaching-team/--no-update-teaching-team", help="Whether to update teaching team"), update_teaching_team: bool = Option(
dry_run: bool = Option(False, "--dry-run/--no-dry-run", help="Dry run: show what would be added without making changes"), True,
"--update-teaching-team/--no-update-teaching-team",
help="Whether to update teaching team",
),
dry_run: bool = Option(
False,
"--dry-run/--no-dry-run",
help="Dry run: show what would be added without making changes",
),
) -> None: ) -> None:
tea.pot.mattermost.update_channels_for_individuals(tea.pot.canvas.students, update_teaching_team, dry_run) tea.pot.mattermost.update_channels_for_individuals(
tea.pot.canvas.students, update_teaching_team, dry_run
)
@app.command( @app.command(

View File

@ -40,12 +40,12 @@ class Canvas:
re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title() re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title()
) # We only care english name ) # We only care english name
# Some users (like system users, announcers) might not have login_id # Some users (like system users, announcers) might not have login_id
if hasattr(student, 'login_id') and student.login_id: if hasattr(student, "login_id") and student.login_id:
student.sis_id = student.login_id student.sis_id = student.login_id
student.login_id = student.email.split("@")[0] student.login_id = student.email.split("@")[0]
else: else:
# For users without login_id, use email prefix as both sis_id and login_id # For users without login_id, use email prefix as both sis_id and login_id
if hasattr(student, 'email') and student.email: if hasattr(student, "email") and student.email:
student.login_id = student.email.split("@")[0] student.login_id = student.email.split("@")[0]
student.sis_id = student.login_id student.sis_id = student.login_id
else: else:

View File

@ -1,11 +1,11 @@
import re import re
from enum import Enum from enum import Enum
from functools import lru_cache from functools import lru_cache
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, cast
from urllib.parse import quote
import focs_gitea import focs_gitea
import requests import requests # type: ignore
from urllib.parse import quote
from canvasapi.group import Group, GroupMembership from canvasapi.group import Group, GroupMembership
from canvasapi.paginated_list import PaginatedList from canvasapi.paginated_list import PaginatedList
from canvasapi.user import User from canvasapi.user import User
@ -22,11 +22,13 @@ class PermissionEnum(Enum):
admin = "admin" admin = "admin"
T = TypeVar("T") def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
"""Call paginated API methods repeatedly and collect results.
The exact return element types vary depending on the API client. We use
def list_all(method: Callable[..., Iterable[T]], *args: Any, **kwargs: Any) -> List[T]: ``Any`` here to avoid over-constraining typing for the external client.
all_res = [] """
all_res: List[Any] = []
page = 1 page = 1
while True: while True:
res = method(*args, **kwargs, page=page) res = method(*args, **kwargs, page=page)
@ -479,14 +481,21 @@ class Gitea:
self.org_name, repo_name, issue.number, body={"state": "closed"} self.org_name, repo_name, issue.number, body={"state": "closed"}
) )
def close_issues(self, repo_name: str, issue_numbers: List[int], dry_run: bool = True) -> None: def close_issues(
self, repo_name: str, issue_numbers: List[int], dry_run: bool = True
) -> None:
if not issue_numbers: if not issue_numbers:
logger.warning("No issue numbers provided to close") logger.warning("No issue numbers provided to close")
return return
if dry_run: if dry_run:
logger.info("Dry run enabled. No changes will be made to issues.") logger.info("Dry run enabled. No changes will be made to issues.")
try: try:
issues = {issue.number: issue for issue in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)} issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e: except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}") logger.error(f"Failed to list issues for {repo_name}: {e}")
return return
@ -503,7 +512,9 @@ class Gitea:
if dry_run: if dry_run:
logger.info(f"Would close issue #{num} in {repo_name} (dry run)") logger.info(f"Would close issue #{num} in {repo_name} (dry run)")
continue continue
self.issue_api.issue_edit_issue(self.org_name, repo_name, num, body={"state": "closed"}) self.issue_api.issue_edit_issue(
self.org_name, repo_name, num, body={"state": "closed"}
)
logger.info(f"Closed issue #{num} in {repo_name}") logger.info(f"Closed issue #{num} in {repo_name}")
except ApiException as e: except ApiException as e:
logger.error(f"Failed to close issue #{num} in {repo_name}: {e}") logger.error(f"Failed to close issue #{num} in {repo_name}: {e}")
@ -579,17 +590,40 @@ class Gitea:
self.create_milestone(repo_name, milestone, description, due_date) self.create_milestone(repo_name, milestone, description, due_date)
logger.info(f"Created milestone {milestone} in {repo_name}") logger.info(f"Created milestone {milestone} in {repo_name}")
def label_issues(self, repo_name: str, label_name: str, issue_numbers: List[int], color: Optional[str] = None) -> None: def label_issues(
self,
repo_name: str,
label_name: str,
issue_numbers: List[int],
color: Optional[str] = None,
) -> None:
if not issue_numbers: if not issue_numbers:
logger.warning("No issue numbers provided to label") logger.warning("No issue numbers provided to label")
return return
try: try:
labels = self.issue_api.issue_list_labels(self.org_name, repo_name) labels = list(
cast(
Iterable[Any],
self.issue_api.issue_list_labels(self.org_name, repo_name),
)
)
except ApiException as e: except ApiException as e:
logger.error(f"Failed to list labels for {repo_name}: {e}") logger.error(f"Failed to list labels for {repo_name}: {e}")
return return
# Build a lookup for repo labels by name to read metadata like 'exclusive'
repo_label_name_to_obj: Dict[str, Any] = {}
try:
for l in labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
if isinstance(name, str):
repo_label_name_to_obj[name] = l
except Exception:
repo_label_name_to_obj = {}
label_obj = None label_obj = None
for l in labels: for l in labels:
if getattr(l, "name", None) == label_name: if getattr(l, "name", None) == label_name:
@ -600,40 +634,106 @@ class Gitea:
chosen_color = None chosen_color = None
if color: if color:
c = color.strip() c = color.strip()
if c.startswith('#'): if c.startswith("#"):
c = c[1:] c = c[1:]
if re.fullmatch(r"[0-9a-fA-F]{6}", c): if re.fullmatch(r"[0-9a-fA-F]{6}", c):
chosen_color = c chosen_color = c
else: else:
logger.warning(f"Provided color '{color}' is not a valid 3- or 6-digit hex; falling back to default") logger.warning(
f"Provided color '{color}' is not a valid 3- or 6-digit hex; falling back to default"
)
if chosen_color is None: if chosen_color is None:
chosen_color = "CCCCCC" chosen_color = "CCCCCC"
try: try:
# Create the label and mark it as exclusive so subsequent labels added by this
# command are treated as exclusive by Gitea (if supported by the server)
label_obj = self.issue_api.issue_create_label( label_obj = self.issue_api.issue_create_label(
self.org_name, self.org_name,
repo_name, repo_name,
body={"name": label_name, "color": chosen_color, "exclusive": False}, body={"name": label_name, "color": chosen_color, "exclusive": True},
)
logger.info(
f"Created label '{label_name}' in {self.org_name}/{repo_name}"
) )
logger.info(f"Created label '{label_name}' in {self.org_name}/{repo_name}")
except ApiException as e: except ApiException as e:
logger.error(f"Failed to create label {label_name} in {repo_name}: {e}") logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
if hasattr(e, 'body'): if hasattr(e, "body"):
logger.error(f"ApiException body: {getattr(e, 'body', None)}") logger.error(f"ApiException body: {getattr(e, 'body', None)}")
return return
label_id = getattr(label_obj, "id", None) else:
if label_id is None:
try: try:
existing_color = getattr(label_obj, "color", None) or (
label_obj.get("color") if isinstance(label_obj, dict) else None
)
if (
existing_color
and isinstance(existing_color, str)
and existing_color.startswith("#")
):
existing_color = existing_color[1:]
is_exclusive = getattr(label_obj, "exclusive", None)
if not is_exclusive:
enc_name = quote(label_name, safe="")
path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}"
url = f"{self.api_client.configuration.host}{path}"
token = (
getattr(self.api_client.configuration, "api_key", {}).get(
"access_token"
)
or settings.gitea_access_token
)
headers_local = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
payload = {"exclusive": True, "name": label_name}
if existing_color:
payload["color"] = existing_color
try:
resp = requests.patch(
url, headers=headers_local, json=payload, timeout=10
)
if resp.status_code in (200, 201):
logger.info(
f"Marked existing label '{label_name}' as exclusive in {repo_name}"
)
else:
logger.warning(
f"Failed to mark existing label '{label_name}' as exclusive: status={resp.status_code}, body={getattr(resp, 'text', None)}"
)
except Exception as e:
logger.warning(
f"Error while trying to mark label '{label_name}' exclusive: {e}"
)
except Exception:
logger.debug(
f"Could not ensure exclusive attribute for label '{label_name}' (continuing)"
)
# Determine numeric id of the label in a type-safe manner
label_id = None
try:
if isinstance(label_obj, dict):
label_id = label_obj.get("id") label_id = label_obj.get("id")
else:
label_id = getattr(label_obj, "id", None)
except Exception: except Exception:
label_id = None label_id = None
if label_id is None: if label_id is None:
logger.error(f"Unable to determine id of label '{label_name}' in {repo_name}") logger.error(
f"Unable to determine id of label '{label_name}' in {repo_name}"
)
return return
try: try:
issues = {issue.number: issue for issue in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)} issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e: except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}") logger.error(f"Failed to list issues for {repo_name}: {e}")
return return
@ -644,15 +744,20 @@ class Gitea:
logger.warning(f"Issue #{num} not found in {repo_name}") logger.warning(f"Issue #{num} not found in {repo_name}")
continue continue
def _extract_label_names_from_issue(issue_obj) -> List[str]: def _extract_label_names_from_issue(issue_obj: Any) -> List[str]:
try: try:
return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []] return [
getattr(l, "name", l)
for l in getattr(issue_obj, "labels", []) or []
]
except Exception: except Exception:
return [] return []
existing_label_names = _extract_label_names_from_issue(issue) existing_label_names = _extract_label_names_from_issue(issue)
if label_name in existing_label_names: if label_name in existing_label_names:
logger.info(f"Issue #{num} in {repo_name} already has label '{label_name}'") logger.info(
f"Issue #{num} in {repo_name} already has label '{label_name}'"
)
continue continue
existing_label_ids: List[int] = [] existing_label_ids: List[int] = []
@ -670,17 +775,32 @@ class Gitea:
existing_label_ids = [] existing_label_ids = []
if label_id in existing_label_ids: if label_id in existing_label_ids:
logger.info(f"Issue #{num} in {repo_name} already has label '{label_name}' (by id)") logger.info(
f"Issue #{num} in {repo_name} already has label '{label_name}' (by id)"
)
continue continue
# verification # verification
def _fetch_issue_labels_via_api() -> List[str]: def _fetch_issue_labels_via_api() -> List[str]:
try: try:
single = self.issue_api.issue_get_issue(self.org_name, repo_name, num) single = self.issue_api.issue_get_issue(
self.org_name, repo_name, num
)
return _extract_label_names_from_issue(single) return _extract_label_names_from_issue(single)
except Exception: except Exception:
try: try:
updated = next((i for i in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name) if getattr(i, "number", None) == num), None) updated = next(
(
i
for i in list_all(
self.issue_api.issue_list_issues,
self.org_name,
repo_name,
)
if getattr(i, "number", None) == num
),
None,
)
if updated is None: if updated is None:
return [] return []
return _extract_label_names_from_issue(updated) return _extract_label_names_from_issue(updated)
@ -690,42 +810,183 @@ class Gitea:
issue_labels = _fetch_issue_labels_via_api() issue_labels = _fetch_issue_labels_via_api()
# prepare low-level HTTP helpers for fallbacks # prepare low-level HTTP helpers for fallbacks
def _do_post_labels(issue_num: int, payload) -> requests.Response: def _do_post_labels(issue_num: int, payload: Any) -> Any:
path = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels" path = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels"
full_url = f"{self.api_client.configuration.host}{path}" full_url = f"{self.api_client.configuration.host}{path}"
token = getattr(self.api_client.configuration, 'api_key', {}) .get('access_token') or settings.gitea_access_token token = (
getattr(self.api_client.configuration, "api_key", {}).get(
"access_token"
)
or settings.gitea_access_token
)
headers_local = { headers_local = {
"Authorization": f"token {token}", "Authorization": f"token {token}",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
return requests.post(full_url, headers=headers_local, json=payload, timeout=10) return requests.post(
full_url, headers=headers_local, json=payload, timeout=10
)
# fallback: if label still not present, POST JSON object {"labels":[name]} to add-labels endpoint # fallback: if label still not present, POST JSON object {"labels":[name]} to add-labels endpoint
# determine if the target label is marked exclusive (force to bool)
target_is_exclusive: bool = False
try:
target_label_obj = repo_label_name_to_obj.get(label_name) or label_obj
_tmp_any: Any = getattr(target_label_obj, "exclusive", None) or (
target_label_obj.get("exclusive")
if isinstance(target_label_obj, dict)
else False
)
target_is_exclusive = bool(_tmp_any)
except Exception:
target_is_exclusive = False
if target_is_exclusive:
# find other exclusive labels present on this issue and remove them
other_exclusive_label_ids: List[int] = []
try:
for lname in issue_labels or []:
if lname == label_name:
continue
lobj = repo_label_name_to_obj.get(lname)
is_ex = False
if lobj is not None:
is_ex = bool(
getattr(lobj, "exclusive", None)
or (
lobj.get("exclusive")
if isinstance(lobj, dict)
else False
)
)
else:
# try to find by name in labels list
for ll in labels:
n = getattr(ll, "name", None) or (
ll.get("name") if isinstance(ll, dict) else None
)
if n == lname:
_tmp_any_ex: Any = getattr(
ll, "exclusive", None
) or (
ll.get("exclusive")
if isinstance(ll, dict)
else False
)
is_ex = bool(_tmp_any_ex)
break
if is_ex:
# determine id
lid = None
try:
candidate = next(
(
ll
for ll in labels
if (
getattr(ll, "name", None)
or (
ll.get("name")
if isinstance(ll, dict)
else None
)
)
== lname
),
None,
)
if candidate is not None:
lid = getattr(candidate, "id", None) or (
candidate.get("id")
if isinstance(candidate, dict)
else None
)
except Exception:
lid = None
if lid is not None:
other_exclusive_label_ids.append(lid)
except Exception:
other_exclusive_label_ids = []
# delete other exclusive labels by numeric id
token = (
getattr(self.api_client.configuration, "api_key", {}).get(
"access_token"
)
or settings.gitea_access_token
)
headers_local = {"Authorization": f"token {token}"}
for other_lid in other_exclusive_label_ids:
try:
path_id = f"/repos/{self.org_name}/{repo_name}/issues/{num}/labels/{other_lid}"
url_id = f"{self.api_client.configuration.host}{path_id}"
resp = requests.delete(
url_id, headers=headers_local, timeout=10
)
if resp.status_code in (200, 204):
logger.info(
f"Removed exclusive label id {other_lid} from {repo_name}#{num}"
)
else:
logger.warning(
f"Failed to remove exclusive label id {other_lid} from {repo_name}#{num}: status={resp.status_code}"
)
except Exception as e:
logger.warning(
f"Error removing exclusive label id {other_lid} from {repo_name}#{num}: {e}"
)
if label_name not in (issue_labels or []): if label_name not in (issue_labels or []):
try: try:
resp = _do_post_labels(num, {"labels": [label_name]}) resp = _do_post_labels(num, {"labels": [label_name]})
if resp.status_code not in (200, 201): if resp.status_code not in (200, 201):
logger.error(f"Failed to add label via add-labels endpoint for issue #{num}: status={resp.status_code}") logger.error(
f"Failed to add label via add-labels endpoint for issue #{num}: status={resp.status_code}"
)
except Exception as e: except Exception as e:
logger.error(f"Failed to POST object payload to issue #{num}: {e}") logger.error(f"Failed to POST object payload to issue #{num}: {e}")
# final verification # final verification
issue_labels = _fetch_issue_labels_via_api() issue_labels = _fetch_issue_labels_via_api()
if label_name in (issue_labels or []): if label_name in (issue_labels or []):
logger.info(f"Label '{label_name}' attached to issue #{num} in {repo_name}") logger.info(
f"Label '{label_name}' attached to issue #{num} in {repo_name}"
)
else: else:
logger.warning(f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts") logger.warning(
f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts"
)
def delete_label(
def delete_label(self, repo_name: str, label_name: str, issue_numbers: Optional[List[int]] = None, delete_repo_label: bool = False) -> None: self,
token = getattr(self.api_client.configuration, 'api_key', {}) .get('access_token') or settings.gitea_access_token repo_name: str,
label_name: str,
issue_numbers: Optional[List[int]] = None,
delete_repo_label: bool = False,
) -> None:
token = (
getattr(self.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {"Authorization": f"token {token}"} headers_local = {"Authorization": f"token {token}"}
repo_labels: List[Any] = []
try: try:
repo_labels = self.issue_api.issue_list_labels(self.org_name, repo_name) repo_labels = list(
label_name_to_id: Dict[str, int] = { cast(
(getattr(l, 'name', None) or (l.get('name') if isinstance(l, dict) else None)): (getattr(l, 'id', None) or (l.get('id') if isinstance(l, dict) else None)) Iterable[Any],
for l in repo_labels self.issue_api.issue_list_labels(self.org_name, repo_name),
} )
)
label_name_to_id: Dict[str, int] = {}
for l in repo_labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
lid = getattr(l, "id", None) or (
l.get("id") if isinstance(l, dict) else None
)
if isinstance(name, str) and isinstance(lid, int):
label_name_to_id[name] = lid
except Exception: except Exception:
label_name_to_id = {} label_name_to_id = {}
@ -733,45 +994,70 @@ class Gitea:
lid = label_name_to_id.get(label_name) lid = label_name_to_id.get(label_name)
if lid is None: if lid is None:
try: try:
for l in self.issue_api.issue_list_labels(self.org_name, repo_name): for l in repo_labels:
name = getattr(l, 'name', None) or (l.get('name') if isinstance(l, dict) else None) name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
if name == label_name: if name == label_name:
lid = getattr(l, 'id', None) or (l.get('id') if isinstance(l, dict) else None) lid = getattr(l, "id", None) or (
l.get("id") if isinstance(l, dict) else None
)
break break
except Exception: except Exception:
pass pass
if lid is None: if lid is None:
logger.warning(f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}") logger.warning(
f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}"
)
return return
path_id = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}" path_id = (
f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
)
url_id = f"{self.api_client.configuration.host}{path_id}" url_id = f"{self.api_client.configuration.host}{path_id}"
try: try:
resp = requests.delete(url_id, headers=headers_local, timeout=10) resp = requests.delete(url_id, headers=headers_local, timeout=10)
if resp.status_code in (200, 204): if resp.status_code in (200, 204):
logger.info(f"Removed label '{label_name}' from {repo_name}#{issue_num}") logger.info(
f"Removed label '{label_name}' from {repo_name}#{issue_num}"
)
return return
logger.warning(f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}") logger.warning(
f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}"
)
except Exception as e: except Exception as e:
logger.error(f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}") logger.error(
f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}"
)
def _delete_repo_label() -> None: def _delete_repo_label() -> None:
enc_name = quote(label_name, safe='') enc_name = quote(label_name, safe="")
path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}" path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}"
url = f"{self.api_client.configuration.host}{path}" url = f"{self.api_client.configuration.host}{path}"
try: try:
resp = requests.delete(url, headers=headers_local, timeout=10) resp = requests.delete(url, headers=headers_local, timeout=10)
if resp.status_code in (200, 204): if resp.status_code in (200, 204):
logger.info(f"Removed repo-level label '{label_name}' from {repo_name}") logger.info(
f"Removed repo-level label '{label_name}' from {repo_name}"
)
return return
logger.error(f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}") logger.error(
f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}"
)
except Exception as e: except Exception as e:
logger.error(f"Error deleting repo label '{label_name}' from {repo_name}: {e}") logger.error(
f"Error deleting repo label '{label_name}' from {repo_name}: {e}"
)
if issue_numbers: if issue_numbers:
try: try:
issues = {issue.number: issue for issue in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)} issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e: except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}") logger.error(f"Failed to list issues for {repo_name}: {e}")
return return
@ -784,7 +1070,10 @@ class Gitea:
if delete_repo_label: if delete_repo_label:
_delete_repo_label() _delete_repo_label()
else: else:
logger.warning("No issue numbers provided and --repo not set; nothing to do for delete_label") logger.warning(
"No issue numbers provided and --repo not set; nothing to do for delete_label"
)
if __name__ == "__main__": if __name__ == "__main__":
gitea = Gitea() gitea = Gitea()

View File

@ -113,7 +113,9 @@ class Mattermost:
for group_name, members in groups.items(): for group_name, members in groups.items():
channel_name = group_name + suffix channel_name = group_name + suffix
try: try:
channel = self.endpoint.channels.get_channel_by_name(self.team["id"], channel_name) channel = self.endpoint.channels.get_channel_by_name(
self.team["id"], channel_name
)
logger.info(f"Channel {channel_name} exists, updating members") logger.info(f"Channel {channel_name} exists, updating members")
except Exception: except Exception:
# channel does not exist # channel does not exist
@ -121,7 +123,9 @@ class Mattermost:
info_members = list(members) info_members = list(members)
if update_teaching_team: if update_teaching_team:
info_members = info_members + settings.mattermost_teaching_team info_members = info_members + settings.mattermost_teaching_team
logger.info(f"Dry run: would create channel {channel_name} and add members: {info_members}") logger.info(
f"Dry run: would create channel {channel_name} and add members: {info_members}"
)
continue continue
try: try:
channel = self.endpoint.channels.create_channel( channel = self.endpoint.channels.create_channel(
@ -141,7 +145,9 @@ class Mattermost:
current_members = set() current_members = set()
try: try:
mm_members = self.endpoint.channels.get_channel_members(channel["id"]) or [] mm_members = (
self.endpoint.channels.get_channel_members(channel["id"]) or []
)
for m in mm_members: for m in mm_members:
uname = None uname = None
if isinstance(m, dict): if isinstance(m, dict):
@ -169,22 +175,34 @@ class Mattermost:
logger.debug(f"Member {member} already in channel {channel_name}") logger.debug(f"Member {member} already in channel {channel_name}")
continue continue
if dry_run: if dry_run:
logger.info(f"Dry run: would add member {member} to channel {channel_name}") logger.info(
f"Dry run: would add member {member} to channel {channel_name}"
)
continue continue
try: try:
mmuser = self.endpoint.users.get_user_by_username(member) mmuser = self.endpoint.users.get_user_by_username(member)
except Exception: except Exception:
logger.warning(f"User {member} is not found on the Mattermost server") logger.warning(
f"User {member} is not found on the Mattermost server"
)
self.endpoint.posts.create_post( self.endpoint.posts.create_post(
{"channel_id": channel["id"], "message": f"User {member} is not found on the Mattermost server"} {
"channel_id": channel["id"],
"message": f"User {member} is not found on the Mattermost server",
}
) )
continue continue
try: try:
self.endpoint.channels.add_user(channel["id"], {"user_id": mmuser["id"]}) self.endpoint.channels.add_user(
channel["id"], {"user_id": mmuser["id"]}
)
except Exception: except Exception:
logger.warning(f"User {member} is not in the team") logger.warning(f"User {member} is not in the team")
self.endpoint.posts.create_post( self.endpoint.posts.create_post(
{"channel_id": channel["id"], "message": f"User {member} is not in the team"} {
"channel_id": channel["id"],
"message": f"User {member} is not in the team",
}
) )
logger.info(f"Added member {member} to channel {channel_name}") logger.info(f"Added member {member} to channel {channel_name}")
@ -261,14 +279,18 @@ class Mattermost:
display_name = student.name display_name = student.name
channel_name = student.sis_id channel_name = student.sis_id
try: try:
channel = self.endpoint.channels.get_channel_by_name(self.team["id"], channel_name) channel = self.endpoint.channels.get_channel_by_name(
self.team["id"], channel_name
)
logger.info(f"Channel {channel_name} exists, updating members") logger.info(f"Channel {channel_name} exists, updating members")
except Exception: except Exception:
if dry_run: if dry_run:
members_info = [student.login_id] members_info = [student.login_id]
if update_teaching_team: if update_teaching_team:
members_info = members_info + settings.mattermost_teaching_team members_info = members_info + settings.mattermost_teaching_team
logger.info(f"Dry run: would create channel {display_name} ({channel_name}) and add members: {members_info}") logger.info(
f"Dry run: would create channel {display_name} ({channel_name}) and add members: {members_info}"
)
continue continue
try: try:
channel = self.endpoint.channels.create_channel( channel = self.endpoint.channels.create_channel(
@ -279,7 +301,9 @@ class Mattermost:
"type": "P", "type": "P",
} }
) )
logger.info(f"Created channel {display_name} ({channel_name}) on Mattermost") logger.info(
f"Created channel {display_name} ({channel_name}) on Mattermost"
)
except Exception as e: except Exception as e:
logger.warning( logger.warning(
f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?" f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
@ -288,7 +312,9 @@ class Mattermost:
current_members = set() current_members = set()
try: try:
mm_members = self.endpoint.channels.get_channel_members(channel["id"]) or [] mm_members = (
self.endpoint.channels.get_channel_members(channel["id"]) or []
)
for m in mm_members: for m in mm_members:
uname = None uname = None
if isinstance(m, dict): if isinstance(m, dict):
@ -316,22 +342,34 @@ class Mattermost:
logger.debug(f"Member {member} already in channel {channel_name}") logger.debug(f"Member {member} already in channel {channel_name}")
continue continue
if dry_run: if dry_run:
logger.info(f"Dry run: would add member {member} to channel {channel_name}") logger.info(
f"Dry run: would add member {member} to channel {channel_name}"
)
continue continue
try: try:
mmuser = self.endpoint.users.get_user_by_username(member) mmuser = self.endpoint.users.get_user_by_username(member)
except Exception: except Exception:
logger.warning(f"User {member} is not found on the Mattermost server") logger.warning(
f"User {member} is not found on the Mattermost server"
)
self.endpoint.posts.create_post( self.endpoint.posts.create_post(
{"channel_id": channel["id"], "message": f"User {member} is not found on the Mattermost server"} {
"channel_id": channel["id"],
"message": f"User {member} is not found on the Mattermost server",
}
) )
continue continue
try: try:
self.endpoint.channels.add_user(channel["id"], {"user_id": mmuser["id"]}) self.endpoint.channels.add_user(
channel["id"], {"user_id": mmuser["id"]}
)
except Exception: except Exception:
logger.warning(f"User {member} is not in the team") logger.warning(f"User {member} is not in the team")
self.endpoint.posts.create_post( self.endpoint.posts.create_post(
{"channel_id": channel["id"], "message": f"User {member} is not in the team"} {
"channel_id": channel["id"],
"message": f"User {member} is not in the team",
}
) )
logger.info(f"Added member {member} to channel {channel_name}") logger.info(f"Added member {member} to channel {channel_name}")