Compare commits

..

17 Commits

Author SHA1 Message Date
77064ac37c
chore: better help
All checks were successful
build / trigger-build-image (push) Successful in 15s
2025-10-29 20:38:22 -07:00
332e522051
feat: joj3-check-env ignore submitter
All checks were successful
build / trigger-build-image (push) Successful in 24s
2025-10-29 20:36:11 -07:00
69e097e04b
fix: remove git lock
All checks were successful
build / trigger-build-image (push) Successful in 28s
2025-10-28 19:37:08 -07:00
f4fb5eae05
feat: add filelock back
All checks were successful
build / trigger-build-image (push) Successful in 47s
2025-10-25 09:05:23 -07:00
3a511660bb
feat: remove all .lock files
All checks were successful
build / trigger-build-image (push) Successful in 14s
2025-10-09 00:14:34 -07:00
9fc7649696
feat: remove more locks
All checks were successful
build / trigger-build-image (push) Successful in 12s
2025-10-08 20:29:43 -07:00
e160023cbf
chore: log more commits length
All checks were successful
build / trigger-build-image (push) Successful in 16s
2025-10-08 20:21:27 -07:00
99d889ee12
feat: check stderr isatty for colorize
All checks were successful
build / trigger-build-image (push) Successful in 15s
2025-10-03 00:00:44 -07:00
f755fb44f6
chore: log unwatch
All checks were successful
build / trigger-build-image (push) Successful in 16s
2025-09-28 18:07:11 -07:00
94d3f993b2
feat: remove output repos in joj3-check-gitea-token
All checks were successful
build / trigger-build-image (push) Successful in 15s
2025-09-21 06:15:10 -07:00
aa33dcc2f1
feat: list orgs in joj3-check-gitea-token
All checks were successful
build / trigger-build-image (push) Successful in 8s
2025-09-21 06:02:58 -07:00
e24545324d
revert: "fix: disable file log in joj3-check-gitea-token"
This reverts commit 3dc6667716.
2025-09-21 06:01:04 -07:00
3dc6667716
fix: disable file log in joj3-check-gitea-token
All checks were successful
build / trigger-build-image (push) Successful in 8s
2025-09-21 05:26:08 -07:00
5b6c61af6d
fix: echo user
All checks were successful
build / trigger-build-image (push) Successful in 10s
2025-09-21 05:03:13 -07:00
8264152022
feat: joj3-check-gitea-token
All checks were successful
build / trigger-build-image (push) Successful in 9s
2025-09-21 05:01:03 -07:00
992f450004
feat: check current gitea user for joj3
All checks were successful
build / trigger-build-image (push) Successful in 24s
2025-09-21 04:56:14 -07:00
5478052c23
chore: updgrade to latest hooks 2025-09-21 04:53:13 -07:00
9 changed files with 58 additions and 821 deletions

View File

@ -60,24 +60,11 @@ clone all gitea repos to local
close all issues and pull requests in gitea organization close all issues and pull requests in gitea organization
### `create-group-channels-on-mm` ### `create-channels-on-mm`
create Mattermost channels for student groups based on team information on Gitea create channels for student groups according to group information on gitea. Optionally specify a prefix to ignore all repos whose names do not start with it. Optionally specify a suffix to add to all channels created.
**Options**: Example: `python3 -m joint_teapot create-channels-on-mm --prefix p1 --suffix -private --invite-teaching-team` will fetch all repos whose names start with `"p1"` and create channels on mm for these repos like "p1team1-private". Members of a repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels.
- `--prefix TEXT`: Only process repositories starting with this prefix
- `--suffix TEXT`: Add suffix to created channels
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
Example: `joint-teapot create-group-channels-on-mm --prefix "hteam" --suffix "-gitea"` will Create channels for webhook integration. Members of "hteam*" repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels.
### `create-personal-channels-on-mm`
create personal Mattermost channels for every student
**Options**:
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
### `create-comment` ### `create-comment`
@ -138,26 +125,6 @@ Example: `python3 -m joint_teapot unsubscribe-from-repos '\d{12}$'` will remove
upload assignment grades to canvas from grade file (GRADE.txt by default), read the first line as grade, the rest as comments upload assignment grades to canvas from grade file (GRADE.txt by default), read the first line as grade, the rest as comments
### `label-issues`
add a label to specific issues in a repository, create labels if not exist, with dry-run disabled by default. You may adjust the color of the label with `--color "#******"` if the label doesn't exist.
### `delete-labels`
remove a label from specific issues or delete the repository labels, with dry-run disabled by default.
### `close-issues`
close one or more specific issues in a repository, with dry-run disabled by default.
### `update-group-channels-on-mm`
update group Mattermost channels for student groups based on team information on Gitea. It will only add missing users, never delete anyone.
### `update-personal-channels-on-mm`
update personal Mattermost channels for every student. It will only add missing users, never delete anyone.
## License ## License
MIT MIT

View File

@ -6,7 +6,7 @@ from pathlib import Path
from time import sleep from time import sleep
from typing import TYPE_CHECKING, List, Optional from typing import TYPE_CHECKING, List, Optional
# from filelock import FileLock from filelock import FileLock
from git import Repo from git import Repo
from typer import Argument, Exit, Option, Typer, echo from typer import Argument, Exit, Option, Typer, echo
@ -167,51 +167,6 @@ def close_all_issues() -> None:
tea.pot.gitea.close_all_issues() tea.pot.gitea.close_all_issues()
@app.command(
"close-issues",
help="close specific issues in a repository",
)
def close_issues(
repo_name: str,
issue_numbers: List[int] = Argument(..., help="One or more issue numbers to close"),
) -> None:
tea.pot.gitea.close_issues(repo_name, issue_numbers)
@app.command(
"label-issues",
help="add a label to specific issues in a repository",
)
def label_issues(
label_name: str,
repo_name: str,
issue_numbers: List[int] = Argument(..., help="One or more issue numbers to label"),
issue_color: Optional[str] = Option(
None, "--color", help="Color for newly created label (hex without # or with #)"
),
) -> None:
tea.pot.gitea.label_issues(repo_name, label_name, issue_numbers, color=issue_color)
@app.command(
"delete-label",
help="remove a label from specific issues or delete the repository label",
)
def delete_label(
label_name: str,
repo_name: str,
issue_numbers: List[int] = Argument(
None,
help="issue numbers to remove the label from",
),
) -> None:
tea.pot.gitea.delete_label(
repo_name,
label_name,
issue_numbers if issue_numbers else None,
)
@app.command( @app.command(
"archive-repos", help="archive repos in gitea organization according to regex" "archive-repos", help="archive repos in gitea organization according to regex"
) )
@ -288,49 +243,6 @@ def create_personal_channels_on_mm(
tea.pot.create_channels_for_individuals(invite_teaching_team) tea.pot.create_channels_for_individuals(invite_teaching_team)
@app.command(
"update-group-channels-on-mm",
help="update Mattermost channels for student groups based on team information on Gitea; only add missing members",
)
def update_group_channels_on_mm(
prefix: str = Option(
"", help="Only process repositories starting with this prefix"
),
suffix: str = Option("", help="Only process channels ending with this suffix"),
update_teaching_team: bool = Option(
True,
"--update-teaching-team/--no-update-teaching-team",
help="Whether to update teaching team",
),
) -> None:
groups = {
group_name: members
for group_name, members in tea.pot.gitea.get_all_teams().items()
if group_name.startswith(prefix)
}
logger.info(
f"{len(groups)} group channel(s) to update"
+ (f" with suffix {suffix}" if suffix else "")
)
tea.pot.mattermost.update_channels_for_groups(groups, suffix, update_teaching_team)
@app.command(
"update-personal-channels-on-mm",
help="update personal Mattermost channels for every student; only add missing members",
)
def update_personal_channels_on_mm(
update_teaching_team: bool = Option(
True,
"--update-teaching-team/--no-update-teaching-team",
help="Whether to update teaching team",
),
) -> None:
tea.pot.mattermost.update_channels_for_individuals(
tea.pot.canvas.students, update_teaching_team
)
@app.command( @app.command(
"create-webhooks-for-mm", "create-webhooks-for-mm",
help="create a pair of webhooks on gitea and mm for all student groups on gitea, " help="create a pair of webhooks on gitea and mm for all student groups on gitea, "
@ -480,8 +392,7 @@ def joj3_all_env(
f"try to acquire lock, file path: {lock_file_path}, " f"try to acquire lock, file path: {lock_file_path}, "
+ f"timeout: {settings.joj3_lock_file_timeout}" + f"timeout: {settings.joj3_lock_file_timeout}"
) )
if True: # disable the file lock temporarily with FileLock(lock_file_path, timeout=settings.joj3_lock_file_timeout).acquire():
# with FileLock(lock_file_path, timeout=settings.joj3_lock_file_timeout).acquire():
logger.info("file lock acquired") logger.info("file lock acquired")
retry_interval = 1 retry_interval = 1
git_push_ok = False git_push_ok = False
@ -590,6 +501,9 @@ def joj3_check_env(
"Example: --penalty-config 24=0.75,48=0.5" "Example: --penalty-config 24=0.75,48=0.5"
), ),
), ),
ignore_submitter: bool = Option(
False, help="ignore submitter when checking submission count"
),
) -> None: ) -> None:
app.pretty_exceptions_enable = False app.pretty_exceptions_enable = False
set_settings(Settings(_env_file=env_path)) set_settings(Settings(_env_file=env_path))
@ -608,7 +522,7 @@ def joj3_check_env(
penalty_config, penalty_config,
) )
count_msg, count_failed = tea.pot.joj3_check_submission_count( count_msg, count_failed = tea.pot.joj3_check_submission_count(
env, grading_repo_name, group_config, scoreboard_filename env, grading_repo_name, group_config, scoreboard_filename, ignore_submitter
) )
echo( echo(
json.dumps( json.dumps(
@ -621,6 +535,16 @@ def joj3_check_env(
logger.info("joj3-check-env done") logger.info("joj3-check-env done")
@app.command("joj3-check-gitea-token")
def joj3_check_gitea_token(
env_path: str = Argument("", help="path to .env file")
) -> None:
app.pretty_exceptions_enable = False
set_settings(Settings(_env_file=env_path))
set_logger(settings.stderr_log_level)
tea.pot.gitea.organization_api.org_list_repos(settings.gitea_org_name)
if __name__ == "__main__": if __name__ == "__main__":
try: try:
app() app()

View File

@ -40,7 +40,7 @@ class Settings(BaseSettings):
joj_sid: str = "" joj_sid: str = ""
# joj3 # joj3
joj3_lock_file_path: str = ".git/teapot.lock" joj3_lock_file_path: str = ".git/teapot-joj3-all-env.lock"
joj3_lock_file_timeout: int = 30 joj3_lock_file_timeout: int = 30
# moss # moss

View File

@ -359,7 +359,9 @@ class Teapot:
grading_repo_name: str, grading_repo_name: str,
group_config: str, group_config: str,
scoreboard_filename: str, scoreboard_filename: str,
ignore_submitter: bool,
) -> Tuple[str, bool]: ) -> Tuple[str, bool]:
submitter = env.github_actor
submitter_repo_name = env.github_repository.split("/")[-1] submitter_repo_name = env.github_repository.split("/")[-1]
repo: Repo = self.git.get_repo(grading_repo_name) repo: Repo = self.git.get_repo(grading_repo_name)
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
@ -386,11 +388,13 @@ class Teapot:
time_windows.append(since) time_windows.append(since)
valid_items.append((name, max_count, time_period, since)) valid_items.append((name, max_count, time_period, since))
logger.info(f"valid items: {valid_items}, time windows: {time_windows}") logger.info(f"valid items: {valid_items}, time windows: {time_windows}")
all_commits = [] matched_commits = []
all_commits_length = 0
if time_windows: if time_windows:
earliest_since = min(time_windows).strftime("%Y-%m-%dT%H:%M:%S") earliest_since = min(time_windows).strftime("%Y-%m-%dT%H:%M:%S")
commits = repo.iter_commits(paths=scoreboard_filename, since=earliest_since) commits = repo.iter_commits(paths=scoreboard_filename, since=earliest_since)
for commit in commits: for commit in commits:
all_commits_length += 1
lines = commit.message.strip().splitlines() lines = commit.message.strip().splitlines()
if not lines: if not lines:
continue continue
@ -400,25 +404,28 @@ class Teapot:
d = match.groupdict() d = match.groupdict()
if ( if (
env.joj3_conf_name != d["exercise_name"] env.joj3_conf_name != d["exercise_name"]
or env.github_actor != d["submitter"]
or submitter_repo_name != d["submitter_repo_name"] or submitter_repo_name != d["submitter_repo_name"]
): ):
continue continue
if not ignore_submitter and submitter != d["submitter"]:
continue
groups_line = next((l for l in lines if l.startswith("groups: ")), None) groups_line = next((l for l in lines if l.startswith("groups: ")), None)
commit_groups = ( commit_groups = (
groups_line[len("groups: ") :].split(",") if groups_line else [] groups_line[len("groups: ") :].split(",") if groups_line else []
) )
all_commits.append( matched_commits.append(
{ {
"time": commit.committed_datetime, "time": commit.committed_datetime,
"groups": [g.strip() for g in commit_groups], "groups": [g.strip() for g in commit_groups],
} }
) )
logger.info(f"all commits length: {len(all_commits)}") logger.info(
f"matched commits length: {len(matched_commits)}, all commits length: {all_commits_length}"
)
for name, max_count, time_period, since in valid_items: for name, max_count, time_period, since in valid_items:
submit_count = 0 submit_count = 0
time_limit = now - timedelta(hours=time_period) time_limit = now - timedelta(hours=time_period)
for commit in all_commits: for commit in matched_commits:
if commit["time"] < time_limit: if commit["time"] < time_limit:
continue continue
if name: if name:
@ -428,7 +435,7 @@ class Teapot:
continue continue
submit_count += 1 submit_count += 1
logger.info( logger.info(
f"submitter {env.github_actor} is submitting for the {submit_count + 1} time, " f"submitter {submitter} is submitting for the {submit_count + 1} time, "
f"{min(0, max_count - submit_count - 1)} time(s) remaining, " f"{min(0, max_count - submit_count - 1)} time(s) remaining, "
f"group={name}, " f"group={name}, "
f"time period={time_period} hour(s), " f"time period={time_period} hour(s), "

View File

@ -38,10 +38,7 @@ def set_logger(
) -> None: ) -> None:
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True) logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
logger.remove() logger.remove()
logger.add( logger.add(stderr, level=stderr_log_level, colorize=stderr.isatty())
stderr,
level=stderr_log_level,
)
logger.add(settings.log_file_path, level="DEBUG") logger.add(settings.log_file_path, level="DEBUG")

View File

@ -39,19 +39,8 @@ class Canvas:
student.name = ( student.name = (
re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title() re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title()
) # We only care english name ) # We only care english name
# Some users (like system users, announcers) might not have login_id student.sis_id = student.login_id
if hasattr(student, "login_id") and student.login_id: student.login_id = student.email.split("@")[0]
student.sis_id = student.login_id
student.login_id = student.email.split("@")[0]
else:
# For users without login_id, use email prefix as both sis_id and login_id
if hasattr(student, "email") and student.email:
student.login_id = student.email.split("@")[0]
student.sis_id = student.login_id
else:
# Fallback for users without email
student.login_id = f"user_{student.id}"
student.sis_id = student.login_id
return student return student
self.students = [ self.students = [

View File

@ -90,28 +90,23 @@ class Git:
retry_interval = 2 retry_interval = 2
while retry_interval and auto_retry: while retry_interval and auto_retry:
try: try:
current_branch = ""
if repo.head.is_detached:
current_branch = repo.head.commit.hexsha
else:
current_branch = repo.active_branch.name
if clean_git_lock: if clean_git_lock:
lock_files = [ locks_removed_count = 0
"index.lock", for root, _, files in os.walk(os.path.join(repo_dir, ".git")):
"HEAD.lock", for filename in files:
"fetch-pack.lock", if filename.endswith(".lock"):
"logs/HEAD.lock", lock_file_path = os.path.join(root, filename)
"packed-refs.lock", if (
"config.lock", os.path.relpath(lock_file_path, repo_dir)
f"refs/heads/{current_branch}.lock", == settings.joj3_lock_file_path
f"refs/remotes/origin/{current_branch}.lock", ):
f"refs/heads/{checkout_dest}.lock", continue
f"refs/remotes/origin/{checkout_dest}.lock", try:
] os.remove(lock_file_path)
for lock_file in lock_files: locks_removed_count += 1
lock_path = os.path.join(repo_dir, ".git", lock_file) except OSError as e:
if os.path.exists(lock_path): logger.warning(f"Error removing lock file: {e}")
os.remove(lock_path) logger.info(f"Removed {locks_removed_count} lock files")
repo.git.fetch("--tags", "--all", "-f") repo.git.fetch("--tags", "--all", "-f")
repo.git.reset("--hard", reset_target) repo.git.reset("--hard", reset_target)
repo.git.clean("-d", "-f", "-x") repo.git.clean("-d", "-f", "-x")

View File

@ -1,11 +1,9 @@
import re import re
from enum import Enum from enum import Enum
from functools import lru_cache from functools import lru_cache
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, cast from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar
from urllib.parse import quote
import focs_gitea import focs_gitea
import requests # type: ignore
from canvasapi.group import Group, GroupMembership from canvasapi.group import Group, GroupMembership
from canvasapi.paginated_list import PaginatedList from canvasapi.paginated_list import PaginatedList
from canvasapi.user import User from canvasapi.user import User
@ -22,13 +20,11 @@ class PermissionEnum(Enum):
admin = "admin" admin = "admin"
def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]: T = TypeVar("T")
"""Call paginated API methods repeatedly and collect results.
The exact return element types vary depending on the API client. We use
``Any`` here to avoid over-constraining typing for the external client. def list_all(method: Callable[..., Iterable[T]], *args: Any, **kwargs: Any) -> List[T]:
""" all_res = []
all_res: List[Any] = []
page = 1 page = 1
while True: while True:
res = method(*args, **kwargs, page=page) res = method(*args, **kwargs, page=page)
@ -40,165 +36,6 @@ def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]
return all_res return all_res
def _get_color(c: Optional[str]) -> str:
if not c:
return "CCCCCC"
s = c.strip()
if s.startswith("#"):
s = s[1:]
if re.fullmatch(r"[0-9a-fA-F]{6}", s):
return s
logger.warning(f"Provided color '{c}' is not a valid hex; falling back to #CCCCCC")
return "CCCCCC"
def _label_id_from_obj(obj: Any) -> Optional[int]:
try:
if isinstance(obj, dict):
return obj.get("id")
return getattr(obj, "id", None)
except Exception:
return None
def _get_repo_labels(gitea: Any, repo_name: str) -> List[Any]:
try:
return list(
cast(
Iterable[Any],
gitea.issue_api.issue_list_labels(gitea.org_name, repo_name),
)
)
except ApiException as e:
logger.error(f"Failed to list labels for {repo_name}: {e}")
return []
def _list_issues_map(gitea: Any, repo_name: str) -> Dict[int, Any]:
try:
return {
issue.number: issue
for issue in list_all(
gitea.issue_api.issue_list_issues, gitea.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return {}
def _api_post_labels(gitea: Any, repo_name: str, issue_num: int, payload: Any) -> Any:
path = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels"
full_url = f"{gitea.api_client.configuration.host}{path}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
return requests.post(full_url, headers=headers_local, json=payload, timeout=10)
def _patch_label_exclusive(
gitea: Any, repo_name: str, name: str, label_obj: Any
) -> None:
try:
existing_color = getattr(label_obj, "color", None) or (
label_obj.get("color") if isinstance(label_obj, dict) else None
)
if (
existing_color
and isinstance(existing_color, str)
and existing_color.startswith("#")
):
existing_color = existing_color[1:]
enc_name = quote(name, safe="")
path = f"/repos/{gitea.org_name}/{repo_name}/labels/{enc_name}"
url = f"{gitea.api_client.configuration.host}{path}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
payload = {"exclusive": True, "name": name}
if existing_color:
payload["color"] = existing_color
resp = requests.patch(url, headers=headers_local, json=payload, timeout=10)
if resp.status_code in (200, 201):
logger.info(f"Marked existing label '{name}' as exclusive in {repo_name}")
else:
logger.warning(
f"Failed to mark existing label '{name}' as exclusive: status={resp.status_code}"
)
except Exception as e:
logger.debug(f"Could not patch label exclusive for {name}: {e}")
def _delete_issue_label_by_id(
gitea: Any, repo_name: str, issue_num: int, lid: int
) -> None:
try:
path_id = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
url_id = f"{gitea.api_client.configuration.host}{path_id}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {"Authorization": f"token {token}"}
resp = requests.delete(url_id, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(f"Removed label id {lid} from {repo_name}#{issue_num}")
else:
logger.warning(
f"Failed to remove label id {lid} from {repo_name}#{issue_num}: status={resp.status_code}"
)
except Exception as e:
logger.warning(
f"Error removing label id {lid} from {repo_name}#{issue_num}: {e}"
)
def _create_label(
gitea: Any, repo_name: str, label_name: str, color: Optional[str], labels: List[Any]
) -> Optional[Any]:
for l in labels:
if getattr(l, "name", None) == label_name:
try:
is_ex = getattr(l, "exclusive", None)
if not bool(is_ex):
_patch_label_exclusive(gitea, repo_name, label_name, l)
except Exception:
return l
return l
chosen_color = _get_color(color)
try:
new = gitea.issue_api.issue_create_label(
gitea.org_name,
repo_name,
body={"name": label_name, "color": chosen_color, "exclusive": True},
)
logger.info(f"Created label '{label_name}' in {gitea.org_name}/{repo_name}")
return new
except ApiException as e:
logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
if hasattr(e, "body"):
logger.error(f"ApiException body: {getattr(e, 'body', None)}")
return None
def _extract_label_names(issue_obj: Any) -> List[str]:
try:
return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []]
except Exception:
return []
class Gitea: class Gitea:
def __init__( def __init__(
self, self,
@ -640,44 +477,6 @@ class Gitea:
self.org_name, repo_name, issue.number, body={"state": "closed"} self.org_name, repo_name, issue.number, body={"state": "closed"}
) )
def close_issues(
self, repo_name: str, issue_numbers: List[int], dry_run: bool = False
) -> None:
if not issue_numbers:
logger.warning("No issue numbers provided to close")
return
if dry_run:
logger.info("Dry run enabled. No changes will be made to issues.")
try:
issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return
for num in issue_numbers:
issue = issues.get(num)
if issue is None:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
if getattr(issue, "state", "") == "closed":
logger.info(f"Issue #{num} in {repo_name} already closed")
continue
try:
if dry_run:
logger.info(f"Would close issue #{num} in {repo_name} (dry run)")
continue
self.issue_api.issue_edit_issue(
self.org_name, repo_name, num, body={"state": "closed"}
)
logger.info(f"Closed issue #{num} in {repo_name}")
except ApiException as e:
logger.error(f"Failed to close issue #{num} in {repo_name}: {e}")
def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None: def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
if dry_run: if dry_run:
logger.info("Dry run enabled. No changes will be made to the repositories.") logger.info("Dry run enabled. No changes will be made to the repositories.")
@ -695,6 +494,7 @@ class Gitea:
self.repository_api.user_current_delete_subscription( self.repository_api.user_current_delete_subscription(
self.org_name, repo.name self.org_name, repo.name
) )
logger.info(f"Unwatched {repo.name}")
def get_all_teams(self) -> Dict[str, List[str]]: def get_all_teams(self) -> Dict[str, List[str]]:
res: Dict[str, List[str]] = {} res: Dict[str, List[str]] = {}
@ -749,241 +549,6 @@ class Gitea:
self.create_milestone(repo_name, milestone, description, due_date) self.create_milestone(repo_name, milestone, description, due_date)
logger.info(f"Created milestone {milestone} in {repo_name}") logger.info(f"Created milestone {milestone} in {repo_name}")
def label_issues(
self,
repo_name: str,
label_name: str,
issue_numbers: List[int],
color: Optional[str] = None,
) -> None:
if not issue_numbers:
logger.warning("No issue numbers provided to label")
return
labels = _get_repo_labels(self, repo_name)
if not labels:
logger.warning(f"No labels found for {repo_name}")
return
repo_label_name_to_obj: Dict[str, Any] = {}
for l in labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
if isinstance(name, str):
repo_label_name_to_obj[name] = l
label_obj = _create_label(self, repo_name, label_name, color, labels)
if label_obj is None:
logger.error(f"Unable to ensure label '{label_name}' exists in {repo_name}")
return
label_id = _label_id_from_obj(label_obj)
if label_id is None:
logger.error(f"Unable to find id of label '{label_name}' in {repo_name}")
return
issues_map = _list_issues_map(self, repo_name)
if not issues_map:
return
for num in issue_numbers:
issue = issues_map.get(num)
if issue is None:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
existing_label_names = _extract_label_names(issue)
if label_name in existing_label_names:
logger.info(
f"Issue #{num} in {repo_name} already has label '{label_name}'"
)
continue
try:
current = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
issue_labels = _extract_label_names(current)
except Exception:
issue_labels = existing_label_names
try:
target_obj = repo_label_name_to_obj.get(label_name) or label_obj
target_is_exclusive = bool(
getattr(target_obj, "exclusive", None)
or (
target_obj.get("exclusive")
if isinstance(target_obj, dict)
else False
)
)
except Exception:
target_is_exclusive = False
if target_is_exclusive:
for lname in issue_labels or []:
if lname == label_name:
continue
lobj = repo_label_name_to_obj.get(lname)
is_ex = False
if lobj is not None:
is_ex = bool(
getattr(lobj, "exclusive", None)
or (
lobj.get("exclusive")
if isinstance(lobj, dict)
else False
)
)
if is_ex:
lid = _label_id_from_obj(lobj) if lobj is not None else None
if lid is not None:
_delete_issue_label_by_id(self, repo_name, num, lid)
if label_name not in (issue_labels or []):
try:
resp = _api_post_labels(
self, repo_name, num, {"labels": [label_name]}
)
if getattr(resp, "status_code", None) not in (200, 201):
logger.error(
f"Failed to add label via add-labels endpoint for issue #{num}: status={getattr(resp, 'status_code', None)}"
)
except Exception as e:
logger.error(f"Failed to POST labels to issue #{num}: {e}")
# verification
try:
final = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
final_labels = _extract_label_names(final)
except Exception:
final_labels = []
if label_name in (final_labels or []):
logger.info(
f"Label '{label_name}' attached to issue #{num} in {repo_name}"
)
else:
logger.warning(
f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts"
)
def delete_label(
self,
repo_name: str,
label_name: str,
issue_numbers: Optional[List[int]] = None,
delete_repo_label: bool = False,
) -> None:
token = (
getattr(self.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {"Authorization": f"token {token}"}
repo_labels: List[Any] = []
try:
repo_labels = list(
cast(
Iterable[Any],
self.issue_api.issue_list_labels(self.org_name, repo_name),
)
)
label_name_to_id: Dict[str, int] = {}
for l in repo_labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
lid = getattr(l, "id", None) or (
l.get("id") if isinstance(l, dict) else None
)
if isinstance(name, str) and isinstance(lid, int):
label_name_to_id[name] = lid
except Exception:
label_name_to_id = {}
def _delete_issue_label(issue_num: int) -> None:
lid = label_name_to_id.get(label_name)
if lid is None:
try:
for l in repo_labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
if name == label_name:
lid = getattr(l, "id", None) or (
l.get("id") if isinstance(l, dict) else None
)
break
except Exception:
logger.warning(
f"Could not determine id of label '{label_name}' in {repo_name}"
)
if lid is None:
logger.warning(
f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}"
)
return
path_id = (
f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
)
url_id = f"{self.api_client.configuration.host}{path_id}"
try:
resp = requests.delete(url_id, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(
f"Removed label '{label_name}' from {repo_name}#{issue_num}"
)
return
logger.warning(
f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}"
)
except Exception as e:
logger.error(
f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}"
)
def _delete_repo_label() -> None:
enc_name = quote(label_name, safe="")
path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}"
url = f"{self.api_client.configuration.host}{path}"
try:
resp = requests.delete(url, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(
f"Removed repo-level label '{label_name}' from {repo_name}"
)
return
logger.error(
f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}"
)
except Exception as e:
logger.error(
f"Error deleting repo label '{label_name}' from {repo_name}: {e}"
)
if issue_numbers:
try:
issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return
for num in issue_numbers:
if num not in issues:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
_delete_issue_label(num)
else:
if delete_repo_label:
_delete_repo_label()
else:
logger.warning(
"No issue numbers provided and --repo not set; nothing to do for delete_label"
)
if __name__ == "__main__": if __name__ == "__main__":
gitea = Gitea() gitea = Gitea()

View File

@ -103,109 +103,6 @@ class Mattermost:
) )
logger.info(f"Added member {member} to channel {channel_name}") logger.info(f"Added member {member} to channel {channel_name}")
def update_channels_for_groups(
self,
groups: Dict[str, List[str]],
suffix: str = "",
update_teaching_team: bool = True,
dry_run: bool = False,
) -> None:
for group_name, members in groups.items():
channel_name = group_name + suffix
try:
channel = self.endpoint.channels.get_channel_by_name(
self.team["id"], channel_name
)
logger.info(f"Channel {channel_name} exists, updating members")
except Exception:
# channel does not exist
if dry_run:
info_members = list(members)
if update_teaching_team:
info_members = info_members + settings.mattermost_teaching_team
logger.info(
f"Dry run: would create channel {channel_name} and add members: {info_members}"
)
continue
try:
channel = self.endpoint.channels.create_channel(
{
"team_id": self.team["id"],
"name": channel_name,
"display_name": channel_name,
"type": "P",
}
)
logger.info(f"Created channel {channel_name} on Mattermost")
except Exception as e:
logger.warning(
f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
)
continue
current_members = set()
try:
mm_members = (
self.endpoint.channels.get_channel_members(channel["id"]) or []
)
for m in mm_members:
uname = None
if isinstance(m, dict):
uname = m.get("username") or m.get("name")
if not uname and "user" in m and isinstance(m["user"], dict):
uname = m["user"].get("username") or m["user"].get("name")
if not uname and "user_id" in m:
try:
u = self.endpoint.users.get_user(m["user_id"]) or {}
if isinstance(u, dict):
uname = u.get("username") or u.get("name")
except Exception:
uname = None
if uname:
current_members.add(uname.lower())
except Exception:
current_members = set()
add_members = list(members)
if update_teaching_team:
add_members = add_members + settings.mattermost_teaching_team
for member in add_members:
if member.lower() in current_members:
logger.debug(f"Member {member} already in channel {channel_name}")
continue
if dry_run:
logger.info(
f"Dry run: would add member {member} to channel {channel_name}"
)
continue
try:
mmuser = self.endpoint.users.get_user_by_username(member)
except Exception:
logger.warning(
f"User {member} is not found on the Mattermost server"
)
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not found on the Mattermost server",
}
)
continue
try:
self.endpoint.channels.add_user(
channel["id"], {"user_id": mmuser["id"]}
)
except Exception:
logger.warning(f"User {member} is not in the team")
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not in the team",
}
)
logger.info(f"Added member {member} to channel {channel_name}")
def create_channels_for_individuals( def create_channels_for_individuals(
self, self,
students: PaginatedList, students: PaginatedList,
@ -269,110 +166,6 @@ class Mattermost:
logger.info(f"Added member {member} to channel {channel_name}") logger.info(f"Added member {member} to channel {channel_name}")
def update_channels_for_individuals(
self,
students: PaginatedList,
update_teaching_team: bool = True,
dry_run: bool = False,
) -> None:
for student in students:
display_name = student.name
channel_name = student.sis_id
try:
channel = self.endpoint.channels.get_channel_by_name(
self.team["id"], channel_name
)
logger.info(f"Channel {channel_name} exists, updating members")
except Exception:
if dry_run:
members_info = [student.login_id]
if update_teaching_team:
members_info = members_info + settings.mattermost_teaching_team
logger.info(
f"Dry run: would create channel {display_name} ({channel_name}) and add members: {members_info}"
)
continue
try:
channel = self.endpoint.channels.create_channel(
{
"team_id": self.team["id"],
"name": channel_name,
"display_name": display_name,
"type": "P",
}
)
logger.info(
f"Created channel {display_name} ({channel_name}) on Mattermost"
)
except Exception as e:
logger.warning(
f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
)
continue
current_members = set()
try:
mm_members = (
self.endpoint.channels.get_channel_members(channel["id"]) or []
)
for m in mm_members:
uname = None
if isinstance(m, dict):
uname = m.get("username") or m.get("name")
if not uname and "user" in m and isinstance(m["user"], dict):
uname = m["user"].get("username") or m["user"].get("name")
if not uname and "user_id" in m:
try:
u = self.endpoint.users.get_user(m["user_id"]) or {}
if isinstance(u, dict):
uname = u.get("username") or u.get("name")
except Exception:
uname = None
if uname:
current_members.add(uname.lower())
except Exception:
current_members = set()
members = [student.login_id]
if update_teaching_team:
members = members + settings.mattermost_teaching_team
for member in members:
if member.lower() in current_members:
logger.debug(f"Member {member} already in channel {channel_name}")
continue
if dry_run:
logger.info(
f"Dry run: would add member {member} to channel {channel_name}"
)
continue
try:
mmuser = self.endpoint.users.get_user_by_username(member)
except Exception:
logger.warning(
f"User {member} is not found on the Mattermost server"
)
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not found on the Mattermost server",
}
)
continue
try:
self.endpoint.channels.add_user(
channel["id"], {"user_id": mmuser["id"]}
)
except Exception:
logger.warning(f"User {member} is not in the team")
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not in the team",
}
)
logger.info(f"Added member {member} to channel {channel_name}")
def create_webhooks_for_repos( def create_webhooks_for_repos(
self, repos: List[str], gitea: Gitea, gitea_suffix: bool self, repos: List[str], gitea: Gitea, gitea_suffix: bool
) -> None: ) -> None: