Compare commits

..

12 Commits

10 changed files with 847 additions and 84 deletions

View File

@ -60,11 +60,24 @@ clone all gitea repos to local
close all issues and pull requests in gitea organization
### `create-channels-on-mm`
### `create-group-channels-on-mm`
create channels for student groups according to group information on gitea. Optionally specify a prefix to ignore all repos whose names do not start with it. Optionally specify a suffix to add to all channels created.
create Mattermost channels for student groups based on team information on Gitea
Example: `python3 -m joint_teapot create-channels-on-mm --prefix p1 --suffix -private --invite-teaching-team` will fetch all repos whose names start with `"p1"` and create channels on mm for these repos like "p1team1-private". Members of a repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels.
**Options**:
- `--prefix TEXT`: Only process repositories starting with this prefix
- `--suffix TEXT`: Add suffix to created channels
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
Example: `joint-teapot create-group-channels-on-mm --prefix "hteam" --suffix "-gitea"` will Create channels for webhook integration. Members of "hteam*" repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels.
### `create-personal-channels-on-mm`
create personal Mattermost channels for every student
**Options**:
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
### `create-comment`
@ -125,6 +138,26 @@ Example: `python3 -m joint_teapot unsubscribe-from-repos '\d{12}$'` will remove
upload assignment grades to canvas from grade file (GRADE.txt by default), read the first line as grade, the rest as comments
### `label-issues`
add a label to specific issues in a repository, create labels if not exist, with dry-run disabled by default. You may adjust the color of the label with `--color "#******"` if the label doesn't exist.
### `delete-labels`
remove a label from specific issues or delete the repository labels, with dry-run disabled by default.
### `close-issues`
close one or more specific issues in a repository, with dry-run disabled by default.
### `update-group-channels-on-mm`
update group Mattermost channels for student groups based on team information on Gitea. It will only add missing users, never delete anyone.
### `update-personal-channels-on-mm`
update personal Mattermost channels for every student. It will only add missing users, never delete anyone.
## License
MIT

View File

@ -6,7 +6,7 @@ from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING, List, Optional
from filelock import FileLock
# from filelock import FileLock
from git import Repo
from typer import Argument, Exit, Option, Typer, echo
@ -167,6 +167,51 @@ def close_all_issues() -> None:
tea.pot.gitea.close_all_issues()
@app.command(
"close-issues",
help="close specific issues in a repository",
)
def close_issues(
repo_name: str,
issue_numbers: List[int] = Argument(..., help="One or more issue numbers to close"),
) -> None:
tea.pot.gitea.close_issues(repo_name, issue_numbers)
@app.command(
"label-issues",
help="add a label to specific issues in a repository",
)
def label_issues(
label_name: str,
repo_name: str,
issue_numbers: List[int] = Argument(..., help="One or more issue numbers to label"),
issue_color: Optional[str] = Option(
None, "--color", help="Color for newly created label (hex without # or with #)"
),
) -> None:
tea.pot.gitea.label_issues(repo_name, label_name, issue_numbers, color=issue_color)
@app.command(
"delete-label",
help="remove a label from specific issues or delete the repository label",
)
def delete_label(
label_name: str,
repo_name: str,
issue_numbers: List[int] = Argument(
None,
help="issue numbers to remove the label from",
),
) -> None:
tea.pot.gitea.delete_label(
repo_name,
label_name,
issue_numbers if issue_numbers else None,
)
@app.command(
"archive-repos", help="archive repos in gitea organization according to regex"
)
@ -243,6 +288,49 @@ def create_personal_channels_on_mm(
tea.pot.create_channels_for_individuals(invite_teaching_team)
@app.command(
"update-group-channels-on-mm",
help="update Mattermost channels for student groups based on team information on Gitea; only add missing members",
)
def update_group_channels_on_mm(
prefix: str = Option(
"", help="Only process repositories starting with this prefix"
),
suffix: str = Option("", help="Only process channels ending with this suffix"),
update_teaching_team: bool = Option(
True,
"--update-teaching-team/--no-update-teaching-team",
help="Whether to update teaching team",
),
) -> None:
groups = {
group_name: members
for group_name, members in tea.pot.gitea.get_all_teams().items()
if group_name.startswith(prefix)
}
logger.info(
f"{len(groups)} group channel(s) to update"
+ (f" with suffix {suffix}" if suffix else "")
)
tea.pot.mattermost.update_channels_for_groups(groups, suffix, update_teaching_team)
@app.command(
"update-personal-channels-on-mm",
help="update personal Mattermost channels for every student; only add missing members",
)
def update_personal_channels_on_mm(
update_teaching_team: bool = Option(
True,
"--update-teaching-team/--no-update-teaching-team",
help="Whether to update teaching team",
),
) -> None:
tea.pot.mattermost.update_channels_for_individuals(
tea.pot.canvas.students, update_teaching_team
)
@app.command(
"create-webhooks-for-mm",
help="create a pair of webhooks on gitea and mm for all student groups on gitea, "
@ -349,7 +437,6 @@ def joj3_all_env(
submitter_repo_name = env.github_repository.split("/")[-1]
penalty_factor = joj3.get_penalty_factor(end_time, penalty_config)
total_score = joj3.get_total_score(env.joj3_output_path)
total_score = round(total_score - abs(total_score) * (1 - penalty_factor))
res = {
"totalScore": total_score,
"cappedTotalScore": (
@ -389,12 +476,13 @@ def joj3_all_env(
lock_file_path = os.path.join(
settings.repos_dir, grading_repo_name, settings.joj3_lock_file_path
)
logger.debug(
logger.info(
f"try to acquire lock, file path: {lock_file_path}, "
+ f"timeout: {settings.joj3_lock_file_timeout}"
)
with FileLock(lock_file_path, timeout=settings.joj3_lock_file_timeout).acquire():
logger.debug("file lock acquired")
if True: # disable the file lock temporarily
# with FileLock(lock_file_path, timeout=settings.joj3_lock_file_timeout).acquire():
logger.info("file lock acquired")
retry_interval = 1
git_push_ok = False
while not git_push_ok:
@ -424,9 +512,7 @@ def joj3_all_env(
os.path.join(repo_path, scoreboard_filename),
exercise_name,
submitter_repo_name,
total_score,
)
failed_stage = joj3.get_failed_stage_from_file(env.joj3_output_path)
tea.pot.git.add_commit(
grading_repo_name,
[scoreboard_filename],
@ -436,7 +522,6 @@ def joj3_all_env(
f"gitea actions link: {gitea_actions_url}\n"
f"gitea issue link: {gitea_issue_url}\n"
f"groups: {env.joj3_groups}\n"
f"failed stage: {failed_stage}\n"
),
)
if not skip_failed_table:
@ -505,9 +590,6 @@ def joj3_check_env(
"Example: --penalty-config 24=0.75,48=0.5"
),
),
ignore_submitter: bool = Option(
False, help="ignore submitter when checking submission count"
),
) -> None:
app.pretty_exceptions_enable = False
set_settings(Settings(_env_file=env_path))
@ -526,7 +608,7 @@ def joj3_check_env(
penalty_config,
)
count_msg, count_failed = tea.pot.joj3_check_submission_count(
env, grading_repo_name, group_config, scoreboard_filename, ignore_submitter
env, grading_repo_name, group_config, scoreboard_filename
)
echo(
json.dumps(
@ -539,16 +621,6 @@ def joj3_check_env(
logger.info("joj3-check-env done")
@app.command("joj3-check-gitea-token")
def joj3_check_gitea_token(
env_path: str = Argument("", help="path to .env file")
) -> None:
app.pretty_exceptions_enable = False
set_settings(Settings(_env_file=env_path))
set_logger(settings.stderr_log_level)
tea.pot.gitea.organization_api.org_list_repos(settings.gitea_org_name)
if __name__ == "__main__":
try:
app()

View File

@ -40,7 +40,7 @@ class Settings(BaseSettings):
joj_sid: str = ""
# joj3
joj3_lock_file_path: str = ".git/teapot-joj3-all-env.lock"
joj3_lock_file_path: str = ".git/teapot.lock"
joj3_lock_file_timeout: int = 30
# moss

View File

@ -266,6 +266,7 @@ class Teapot:
):
if issue.title.startswith(title_prefix):
joj3_issue = issue
logger.info(f"found joj3 issue: #{joj3_issue.number}")
break
else:
new_issue = True
@ -292,7 +293,9 @@ class Teapot:
submitter_repo_name,
body={"title": title, "body": comment, "labels": [label_id]},
)
logger.info(f"created joj3 issue: #{joj3_issue.number}")
gitea_issue_url = joj3_issue.html_url
logger.info(f"gitea issue url: {gitea_issue_url}")
if not new_issue:
self.gitea.issue_api.issue_edit_issue(
self.gitea.org_name,
@ -356,9 +359,7 @@ class Teapot:
grading_repo_name: str,
group_config: str,
scoreboard_filename: str,
ignore_submitter: bool,
) -> Tuple[str, bool]:
submitter = env.github_actor
submitter_repo_name = env.github_repository.split("/")[-1]
repo: Repo = self.git.get_repo(grading_repo_name)
now = datetime.now(timezone.utc)
@ -385,13 +386,11 @@ class Teapot:
time_windows.append(since)
valid_items.append((name, max_count, time_period, since))
logger.info(f"valid items: {valid_items}, time windows: {time_windows}")
matched_commits = []
all_commits_length = 0
all_commits = []
if time_windows:
earliest_since = min(time_windows).strftime("%Y-%m-%dT%H:%M:%S")
commits = repo.iter_commits(paths=scoreboard_filename, since=earliest_since)
for commit in commits:
all_commits_length += 1
lines = commit.message.strip().splitlines()
if not lines:
continue
@ -401,28 +400,25 @@ class Teapot:
d = match.groupdict()
if (
env.joj3_conf_name != d["exercise_name"]
or env.github_actor != d["submitter"]
or submitter_repo_name != d["submitter_repo_name"]
):
continue
if not ignore_submitter and submitter != d["submitter"]:
continue
groups_line = next((l for l in lines if l.startswith("groups: ")), None)
commit_groups = (
groups_line[len("groups: ") :].split(",") if groups_line else []
)
matched_commits.append(
all_commits.append(
{
"time": commit.committed_datetime,
"groups": [g.strip() for g in commit_groups],
}
)
logger.info(
f"matched commits length: {len(matched_commits)}, all commits length: {all_commits_length}"
)
logger.info(f"all commits length: {len(all_commits)}")
for name, max_count, time_period, since in valid_items:
submit_count = 0
time_limit = now - timedelta(hours=time_period)
for commit in matched_commits:
for commit in all_commits:
if commit["time"] < time_limit:
continue
if name:
@ -432,7 +428,7 @@ class Teapot:
continue
submit_count += 1
logger.info(
f"submitter {submitter} is submitting for the {submit_count + 1} time, "
f"submitter {env.github_actor} is submitting for the {submit_count + 1} time, "
f"{min(0, max_count - submit_count - 1)} time(s) remaining, "
f"group={name}, "
f"time period={time_period} hour(s), "

View File

@ -7,7 +7,6 @@ from typing import Any, Dict, List, Optional, Tuple
from pydantic_settings import BaseSettings
from joint_teapot.config import settings
from joint_teapot.utils.logger import logger
@ -42,7 +41,6 @@ def generate_scoreboard(
scoreboard_file_path: str,
exercise_name: str,
submitter_repo_name: str,
exercise_total_score: int,
) -> None:
if not scoreboard_file_path.endswith(".csv"):
logger.error(
@ -101,6 +99,11 @@ def generate_scoreboard(
for row in data:
row.insert(index, "")
exercise_total_score = 0
for stage in stages:
for result in stage["results"]:
exercise_total_score += result["score"]
exercise_total_score = exercise_total_score
submitter_row[columns.index(exercise_name)] = str(exercise_total_score)
total = 0
@ -143,18 +146,6 @@ def get_failed_table_from_file(table_file_path: str) -> List[List[str]]:
return data
def get_failed_stage_from_file(score_file_path: str) -> str:
with open(score_file_path) as json_file:
stages: List[Dict[str, Any]] = json.load(json_file)
failed_stage = ""
for stage in stages:
if stage["force_quit"] == True:
failed_stage = stage["name"]
break
return failed_stage
def update_failed_table_from_score_file(
data: List[List[str]],
score_file_path: str,
@ -162,23 +153,31 @@ def update_failed_table_from_score_file(
repo_link: str,
action_link: str,
) -> None:
failed_stage = get_failed_stage_from_file(score_file_path)
# get info from score file
with open(score_file_path) as json_file:
stages: List[Dict[str, Any]] = json.load(json_file)
failed_name = ""
for stage in stages:
if stage["force_quit"] == True:
failed_name = stage["name"]
break
# append to failed table
now = datetime.now().strftime("%Y-%m-%d %H:%M")
repo = f"[{repo_name}]({repo_link})"
failure = f"[{failed_stage}]({action_link})"
failure = f"[{failed_name}]({action_link})"
row_found = False
for i, row in enumerate(data[:]):
if row[1] == repo:
row_found = True
if failed_stage == "":
if failed_name == "":
data.remove(row)
else:
data[i][0] = now
data[i][2] = failure
break
if not row_found and failed_stage != "":
if not row_found and failed_name != "":
data.append([now, repo, failure])
@ -244,7 +243,7 @@ def generate_title_and_comment(
f"Generated at {now} from [Gitea Actions #{run_number}]({action_link}), "
f"commit {commit_hash}, "
f"triggered by @{submitter}, "
f"run ID [`{run_id}`](https://focs.ji.sjtu.edu.cn/joj-mon/d/{settings.gitea_org_name}?var-Filters=RunID%7C%3D%7C{run_id}).\n"
f"run ID `{run_id}`.\n"
"Powered by [JOJ3](https://github.com/joint-online-judge/JOJ3) and "
"[Joint-Teapot](https://github.com/BoYanZh/Joint-Teapot) with ❤️.\n"
)

View File

@ -38,7 +38,10 @@ def set_logger(
) -> None:
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
logger.remove()
logger.add(stderr, level=stderr_log_level, colorize=stderr.isatty())
logger.add(
stderr,
level=stderr_log_level,
)
logger.add(settings.log_file_path, level="DEBUG")

View File

@ -39,8 +39,19 @@ class Canvas:
student.name = (
re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title()
) # We only care english name
student.sis_id = student.login_id
student.login_id = student.email.split("@")[0]
# Some users (like system users, announcers) might not have login_id
if hasattr(student, "login_id") and student.login_id:
student.sis_id = student.login_id
student.login_id = student.email.split("@")[0]
else:
# For users without login_id, use email prefix as both sis_id and login_id
if hasattr(student, "email") and student.email:
student.login_id = student.email.split("@")[0]
student.sis_id = student.login_id
else:
# Fallback for users without email
student.login_id = f"user_{student.id}"
student.sis_id = student.login_id
return student
self.students = [

View File

@ -90,23 +90,28 @@ class Git:
retry_interval = 2
while retry_interval and auto_retry:
try:
current_branch = ""
if repo.head.is_detached:
current_branch = repo.head.commit.hexsha
else:
current_branch = repo.active_branch.name
if clean_git_lock:
locks_removed_count = 0
for root, _, files in os.walk(os.path.join(repo_dir, ".git")):
for filename in files:
if filename.endswith(".lock"):
lock_file_path = os.path.join(root, filename)
if (
os.path.relpath(lock_file_path, repo_dir)
== settings.joj3_lock_file_path
):
continue
try:
os.remove(lock_file_path)
locks_removed_count += 1
except OSError as e:
logger.warning(f"error removing lock file: {e}")
logger.info(f"removed {locks_removed_count} lock files")
lock_files = [
"index.lock",
"HEAD.lock",
"fetch-pack.lock",
"logs/HEAD.lock",
"packed-refs.lock",
"config.lock",
f"refs/heads/{current_branch}.lock",
f"refs/remotes/origin/{current_branch}.lock",
f"refs/heads/{checkout_dest}.lock",
f"refs/remotes/origin/{checkout_dest}.lock",
]
for lock_file in lock_files:
lock_path = os.path.join(repo_dir, ".git", lock_file)
if os.path.exists(lock_path):
os.remove(lock_path)
repo.git.fetch("--tags", "--all", "-f")
repo.git.reset("--hard", reset_target)
repo.git.clean("-d", "-f", "-x")
@ -143,7 +148,9 @@ class Git:
try:
repo.index.add(file)
except OSError:
logger.warning(f'file path "{file}" does not exist, skipped')
logger.warning(
f'File path "{file}" does not exist. Skipping this file.'
)
continue
if repo.is_dirty(untracked_files=True) or repo.index.diff(None):
repo.index.commit(commit_message)

View File

@ -1,9 +1,11 @@
import re
from enum import Enum
from functools import lru_cache
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, cast
from urllib.parse import quote
import focs_gitea
import requests # type: ignore
from canvasapi.group import Group, GroupMembership
from canvasapi.paginated_list import PaginatedList
from canvasapi.user import User
@ -20,11 +22,13 @@ class PermissionEnum(Enum):
admin = "admin"
T = TypeVar("T")
def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
"""Call paginated API methods repeatedly and collect results.
def list_all(method: Callable[..., Iterable[T]], *args: Any, **kwargs: Any) -> List[T]:
all_res = []
The exact return element types vary depending on the API client. We use
``Any`` here to avoid over-constraining typing for the external client.
"""
all_res: List[Any] = []
page = 1
while True:
res = method(*args, **kwargs, page=page)
@ -36,6 +40,165 @@ def list_all(method: Callable[..., Iterable[T]], *args: Any, **kwargs: Any) -> L
return all_res
def _get_color(c: Optional[str]) -> str:
if not c:
return "CCCCCC"
s = c.strip()
if s.startswith("#"):
s = s[1:]
if re.fullmatch(r"[0-9a-fA-F]{6}", s):
return s
logger.warning(f"Provided color '{c}' is not a valid hex; falling back to #CCCCCC")
return "CCCCCC"
def _label_id_from_obj(obj: Any) -> Optional[int]:
try:
if isinstance(obj, dict):
return obj.get("id")
return getattr(obj, "id", None)
except Exception:
return None
def _get_repo_labels(gitea: Any, repo_name: str) -> List[Any]:
try:
return list(
cast(
Iterable[Any],
gitea.issue_api.issue_list_labels(gitea.org_name, repo_name),
)
)
except ApiException as e:
logger.error(f"Failed to list labels for {repo_name}: {e}")
return []
def _list_issues_map(gitea: Any, repo_name: str) -> Dict[int, Any]:
try:
return {
issue.number: issue
for issue in list_all(
gitea.issue_api.issue_list_issues, gitea.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return {}
def _api_post_labels(gitea: Any, repo_name: str, issue_num: int, payload: Any) -> Any:
path = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels"
full_url = f"{gitea.api_client.configuration.host}{path}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
return requests.post(full_url, headers=headers_local, json=payload, timeout=10)
def _patch_label_exclusive(
gitea: Any, repo_name: str, name: str, label_obj: Any
) -> None:
try:
existing_color = getattr(label_obj, "color", None) or (
label_obj.get("color") if isinstance(label_obj, dict) else None
)
if (
existing_color
and isinstance(existing_color, str)
and existing_color.startswith("#")
):
existing_color = existing_color[1:]
enc_name = quote(name, safe="")
path = f"/repos/{gitea.org_name}/{repo_name}/labels/{enc_name}"
url = f"{gitea.api_client.configuration.host}{path}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
payload = {"exclusive": True, "name": name}
if existing_color:
payload["color"] = existing_color
resp = requests.patch(url, headers=headers_local, json=payload, timeout=10)
if resp.status_code in (200, 201):
logger.info(f"Marked existing label '{name}' as exclusive in {repo_name}")
else:
logger.warning(
f"Failed to mark existing label '{name}' as exclusive: status={resp.status_code}"
)
except Exception as e:
logger.debug(f"Could not patch label exclusive for {name}: {e}")
def _delete_issue_label_by_id(
gitea: Any, repo_name: str, issue_num: int, lid: int
) -> None:
try:
path_id = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
url_id = f"{gitea.api_client.configuration.host}{path_id}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {"Authorization": f"token {token}"}
resp = requests.delete(url_id, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(f"Removed label id {lid} from {repo_name}#{issue_num}")
else:
logger.warning(
f"Failed to remove label id {lid} from {repo_name}#{issue_num}: status={resp.status_code}"
)
except Exception as e:
logger.warning(
f"Error removing label id {lid} from {repo_name}#{issue_num}: {e}"
)
def _create_label(
gitea: Any, repo_name: str, label_name: str, color: Optional[str], labels: List[Any]
) -> Optional[Any]:
for l in labels:
if getattr(l, "name", None) == label_name:
try:
is_ex = getattr(l, "exclusive", None)
if not bool(is_ex):
_patch_label_exclusive(gitea, repo_name, label_name, l)
except Exception:
return l
return l
chosen_color = _get_color(color)
try:
new = gitea.issue_api.issue_create_label(
gitea.org_name,
repo_name,
body={"name": label_name, "color": chosen_color, "exclusive": True},
)
logger.info(f"Created label '{label_name}' in {gitea.org_name}/{repo_name}")
return new
except ApiException as e:
logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
if hasattr(e, "body"):
logger.error(f"ApiException body: {getattr(e, 'body', None)}")
return None
def _extract_label_names(issue_obj: Any) -> List[str]:
try:
return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []]
except Exception:
return []
class Gitea:
def __init__(
self,
@ -477,6 +640,44 @@ class Gitea:
self.org_name, repo_name, issue.number, body={"state": "closed"}
)
def close_issues(
self, repo_name: str, issue_numbers: List[int], dry_run: bool = False
) -> None:
if not issue_numbers:
logger.warning("No issue numbers provided to close")
return
if dry_run:
logger.info("Dry run enabled. No changes will be made to issues.")
try:
issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return
for num in issue_numbers:
issue = issues.get(num)
if issue is None:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
if getattr(issue, "state", "") == "closed":
logger.info(f"Issue #{num} in {repo_name} already closed")
continue
try:
if dry_run:
logger.info(f"Would close issue #{num} in {repo_name} (dry run)")
continue
self.issue_api.issue_edit_issue(
self.org_name, repo_name, num, body={"state": "closed"}
)
logger.info(f"Closed issue #{num} in {repo_name}")
except ApiException as e:
logger.error(f"Failed to close issue #{num} in {repo_name}: {e}")
def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
if dry_run:
logger.info("Dry run enabled. No changes will be made to the repositories.")
@ -494,7 +695,6 @@ class Gitea:
self.repository_api.user_current_delete_subscription(
self.org_name, repo.name
)
logger.info(f"Unwatched {repo.name}")
def get_all_teams(self) -> Dict[str, List[str]]:
res: Dict[str, List[str]] = {}
@ -549,6 +749,241 @@ class Gitea:
self.create_milestone(repo_name, milestone, description, due_date)
logger.info(f"Created milestone {milestone} in {repo_name}")
def label_issues(
self,
repo_name: str,
label_name: str,
issue_numbers: List[int],
color: Optional[str] = None,
) -> None:
if not issue_numbers:
logger.warning("No issue numbers provided to label")
return
labels = _get_repo_labels(self, repo_name)
if not labels:
logger.warning(f"No labels found for {repo_name}")
return
repo_label_name_to_obj: Dict[str, Any] = {}
for l in labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
if isinstance(name, str):
repo_label_name_to_obj[name] = l
label_obj = _create_label(self, repo_name, label_name, color, labels)
if label_obj is None:
logger.error(f"Unable to ensure label '{label_name}' exists in {repo_name}")
return
label_id = _label_id_from_obj(label_obj)
if label_id is None:
logger.error(f"Unable to find id of label '{label_name}' in {repo_name}")
return
issues_map = _list_issues_map(self, repo_name)
if not issues_map:
return
for num in issue_numbers:
issue = issues_map.get(num)
if issue is None:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
existing_label_names = _extract_label_names(issue)
if label_name in existing_label_names:
logger.info(
f"Issue #{num} in {repo_name} already has label '{label_name}'"
)
continue
try:
current = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
issue_labels = _extract_label_names(current)
except Exception:
issue_labels = existing_label_names
try:
target_obj = repo_label_name_to_obj.get(label_name) or label_obj
target_is_exclusive = bool(
getattr(target_obj, "exclusive", None)
or (
target_obj.get("exclusive")
if isinstance(target_obj, dict)
else False
)
)
except Exception:
target_is_exclusive = False
if target_is_exclusive:
for lname in issue_labels or []:
if lname == label_name:
continue
lobj = repo_label_name_to_obj.get(lname)
is_ex = False
if lobj is not None:
is_ex = bool(
getattr(lobj, "exclusive", None)
or (
lobj.get("exclusive")
if isinstance(lobj, dict)
else False
)
)
if is_ex:
lid = _label_id_from_obj(lobj) if lobj is not None else None
if lid is not None:
_delete_issue_label_by_id(self, repo_name, num, lid)
if label_name not in (issue_labels or []):
try:
resp = _api_post_labels(
self, repo_name, num, {"labels": [label_name]}
)
if getattr(resp, "status_code", None) not in (200, 201):
logger.error(
f"Failed to add label via add-labels endpoint for issue #{num}: status={getattr(resp, 'status_code', None)}"
)
except Exception as e:
logger.error(f"Failed to POST labels to issue #{num}: {e}")
# verification
try:
final = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
final_labels = _extract_label_names(final)
except Exception:
final_labels = []
if label_name in (final_labels or []):
logger.info(
f"Label '{label_name}' attached to issue #{num} in {repo_name}"
)
else:
logger.warning(
f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts"
)
def delete_label(
self,
repo_name: str,
label_name: str,
issue_numbers: Optional[List[int]] = None,
delete_repo_label: bool = False,
) -> None:
token = (
getattr(self.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {"Authorization": f"token {token}"}
repo_labels: List[Any] = []
try:
repo_labels = list(
cast(
Iterable[Any],
self.issue_api.issue_list_labels(self.org_name, repo_name),
)
)
label_name_to_id: Dict[str, int] = {}
for l in repo_labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
lid = getattr(l, "id", None) or (
l.get("id") if isinstance(l, dict) else None
)
if isinstance(name, str) and isinstance(lid, int):
label_name_to_id[name] = lid
except Exception:
label_name_to_id = {}
def _delete_issue_label(issue_num: int) -> None:
lid = label_name_to_id.get(label_name)
if lid is None:
try:
for l in repo_labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
if name == label_name:
lid = getattr(l, "id", None) or (
l.get("id") if isinstance(l, dict) else None
)
break
except Exception:
logger.warning(
f"Could not determine id of label '{label_name}' in {repo_name}"
)
if lid is None:
logger.warning(
f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}"
)
return
path_id = (
f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
)
url_id = f"{self.api_client.configuration.host}{path_id}"
try:
resp = requests.delete(url_id, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(
f"Removed label '{label_name}' from {repo_name}#{issue_num}"
)
return
logger.warning(
f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}"
)
except Exception as e:
logger.error(
f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}"
)
def _delete_repo_label() -> None:
enc_name = quote(label_name, safe="")
path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}"
url = f"{self.api_client.configuration.host}{path}"
try:
resp = requests.delete(url, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(
f"Removed repo-level label '{label_name}' from {repo_name}"
)
return
logger.error(
f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}"
)
except Exception as e:
logger.error(
f"Error deleting repo label '{label_name}' from {repo_name}: {e}"
)
if issue_numbers:
try:
issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return
for num in issue_numbers:
if num not in issues:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
_delete_issue_label(num)
else:
if delete_repo_label:
_delete_repo_label()
else:
logger.warning(
"No issue numbers provided and --repo not set; nothing to do for delete_label"
)
if __name__ == "__main__":
gitea = Gitea()

View File

@ -103,6 +103,109 @@ class Mattermost:
)
logger.info(f"Added member {member} to channel {channel_name}")
def update_channels_for_groups(
self,
groups: Dict[str, List[str]],
suffix: str = "",
update_teaching_team: bool = True,
dry_run: bool = False,
) -> None:
for group_name, members in groups.items():
channel_name = group_name + suffix
try:
channel = self.endpoint.channels.get_channel_by_name(
self.team["id"], channel_name
)
logger.info(f"Channel {channel_name} exists, updating members")
except Exception:
# channel does not exist
if dry_run:
info_members = list(members)
if update_teaching_team:
info_members = info_members + settings.mattermost_teaching_team
logger.info(
f"Dry run: would create channel {channel_name} and add members: {info_members}"
)
continue
try:
channel = self.endpoint.channels.create_channel(
{
"team_id": self.team["id"],
"name": channel_name,
"display_name": channel_name,
"type": "P",
}
)
logger.info(f"Created channel {channel_name} on Mattermost")
except Exception as e:
logger.warning(
f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
)
continue
current_members = set()
try:
mm_members = (
self.endpoint.channels.get_channel_members(channel["id"]) or []
)
for m in mm_members:
uname = None
if isinstance(m, dict):
uname = m.get("username") or m.get("name")
if not uname and "user" in m and isinstance(m["user"], dict):
uname = m["user"].get("username") or m["user"].get("name")
if not uname and "user_id" in m:
try:
u = self.endpoint.users.get_user(m["user_id"]) or {}
if isinstance(u, dict):
uname = u.get("username") or u.get("name")
except Exception:
uname = None
if uname:
current_members.add(uname.lower())
except Exception:
current_members = set()
add_members = list(members)
if update_teaching_team:
add_members = add_members + settings.mattermost_teaching_team
for member in add_members:
if member.lower() in current_members:
logger.debug(f"Member {member} already in channel {channel_name}")
continue
if dry_run:
logger.info(
f"Dry run: would add member {member} to channel {channel_name}"
)
continue
try:
mmuser = self.endpoint.users.get_user_by_username(member)
except Exception:
logger.warning(
f"User {member} is not found on the Mattermost server"
)
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not found on the Mattermost server",
}
)
continue
try:
self.endpoint.channels.add_user(
channel["id"], {"user_id": mmuser["id"]}
)
except Exception:
logger.warning(f"User {member} is not in the team")
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not in the team",
}
)
logger.info(f"Added member {member} to channel {channel_name}")
def create_channels_for_individuals(
self,
students: PaginatedList,
@ -166,6 +269,110 @@ class Mattermost:
logger.info(f"Added member {member} to channel {channel_name}")
def update_channels_for_individuals(
self,
students: PaginatedList,
update_teaching_team: bool = True,
dry_run: bool = False,
) -> None:
for student in students:
display_name = student.name
channel_name = student.sis_id
try:
channel = self.endpoint.channels.get_channel_by_name(
self.team["id"], channel_name
)
logger.info(f"Channel {channel_name} exists, updating members")
except Exception:
if dry_run:
members_info = [student.login_id]
if update_teaching_team:
members_info = members_info + settings.mattermost_teaching_team
logger.info(
f"Dry run: would create channel {display_name} ({channel_name}) and add members: {members_info}"
)
continue
try:
channel = self.endpoint.channels.create_channel(
{
"team_id": self.team["id"],
"name": channel_name,
"display_name": display_name,
"type": "P",
}
)
logger.info(
f"Created channel {display_name} ({channel_name}) on Mattermost"
)
except Exception as e:
logger.warning(
f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
)
continue
current_members = set()
try:
mm_members = (
self.endpoint.channels.get_channel_members(channel["id"]) or []
)
for m in mm_members:
uname = None
if isinstance(m, dict):
uname = m.get("username") or m.get("name")
if not uname and "user" in m and isinstance(m["user"], dict):
uname = m["user"].get("username") or m["user"].get("name")
if not uname and "user_id" in m:
try:
u = self.endpoint.users.get_user(m["user_id"]) or {}
if isinstance(u, dict):
uname = u.get("username") or u.get("name")
except Exception:
uname = None
if uname:
current_members.add(uname.lower())
except Exception:
current_members = set()
members = [student.login_id]
if update_teaching_team:
members = members + settings.mattermost_teaching_team
for member in members:
if member.lower() in current_members:
logger.debug(f"Member {member} already in channel {channel_name}")
continue
if dry_run:
logger.info(
f"Dry run: would add member {member} to channel {channel_name}"
)
continue
try:
mmuser = self.endpoint.users.get_user_by_username(member)
except Exception:
logger.warning(
f"User {member} is not found on the Mattermost server"
)
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not found on the Mattermost server",
}
)
continue
try:
self.endpoint.channels.add_user(
channel["id"], {"user_id": mmuser["id"]}
)
except Exception:
logger.warning(f"User {member} is not in the team")
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not in the team",
}
)
logger.info(f"Added member {member} to channel {channel_name}")
def create_webhooks_for_repos(
self, repos: List[str], gitea: Gitea, gitea_suffix: bool
) -> None: