Compare commits


12 Commits

10 changed files with 847 additions and 84 deletions


@@ -60,11 +60,24 @@ clone all gitea repos to local
close all issues and pull requests in gitea organization
-### `create-channels-on-mm`
### `create-group-channels-on-mm`
-create channels for student groups according to group information on gitea. Optionally specify a prefix to ignore all repos whose names do not start with it. Optionally specify a suffix to add to all channels created.
create Mattermost channels for student groups based on team information on Gitea
-Example: `python3 -m joint_teapot create-channels-on-mm --prefix p1 --suffix -private --invite-teaching-team` will fetch all repos whose names start with `"p1"` and create channels on mm for these repos like "p1team1-private". Members of a repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels.
**Options**:
- `--prefix TEXT`: Only process repositories starting with this prefix
- `--suffix TEXT`: Add suffix to created channels
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
Example: `joint-teapot create-group-channels-on-mm --prefix "hteam" --suffix "-gitea"` will create channels for webhook integration. Members of each "hteam*" repo will be added to the corresponding channel, and the teaching team (adjust in `.env`) will be invited to the channels.
### `create-personal-channels-on-mm`
create personal Mattermost channels for every student
**Options**:
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
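For example, `python3 -m joint_teapot create-personal-channels-on-mm --no-invite-teaching-team` should create the personal channels without inviting the teaching team.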
### `create-comment`
@@ -125,6 +138,26 @@ Example: `python3 -m joint_teapot unsubscribe-from-repos '\d{12}$'` will remove
upload assignment grades to canvas from grade file (GRADE.txt by default), read the first line as grade, the rest as comments
### `label-issues`
add a label to specific issues in a repository, creating the label if it does not exist; dry-run is disabled by default. You may set the color of a newly created label with `--color "#******"`.
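For example, `python3 -m joint_teapot label-issues bug p1team1 1 2 3 --color "#ee0701"` should add the label `bug` to issues 1, 2, and 3 of the repo `p1team1`, creating the label with the given color if it does not exist yet (label, repo, and color here are only illustrative).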
### `delete-label`
remove a label from specific issues, or delete the label from the repository; dry-run is disabled by default.
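For example, `python3 -m joint_teapot delete-label bug p1team1 1 2` should remove the label `bug` from issues 1 and 2 of the repo `p1team1` (names are only illustrative).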
### `close-issues`
close one or more specific issues in a repository, with dry-run disabled by default.
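For example, `python3 -m joint_teapot close-issues p1team1 4 5` should close issues 4 and 5 in the repo `p1team1` (repo name and issue numbers are only illustrative).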
### `update-group-channels-on-mm`
update group Mattermost channels for student groups based on team information on Gitea. It will only add missing users, never delete anyone.
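For example, `python3 -m joint_teapot update-group-channels-on-mm --prefix p1 --suffix -private` should look up Gitea teams whose names start with "p1" and add any missing members to the matching "*-private" channels (prefix and suffix here are only illustrative).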
### `update-personal-channels-on-mm`
update personal Mattermost channels for every student. It will only add missing users, never delete anyone.
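For example, `python3 -m joint_teapot update-personal-channels-on-mm --no-update-teaching-team` should add each missing student to their personal channel without adding the teaching team.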
## License
MIT


@@ -6,7 +6,7 @@ from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING, List, Optional
-from filelock import FileLock
# from filelock import FileLock
from git import Repo
from typer import Argument, Exit, Option, Typer, echo
@@ -167,6 +167,51 @@ def close_all_issues() -> None:
tea.pot.gitea.close_all_issues()
@app.command(
"close-issues",
help="close specific issues in a repository",
)
def close_issues(
repo_name: str,
issue_numbers: List[int] = Argument(..., help="One or more issue numbers to close"),
) -> None:
tea.pot.gitea.close_issues(repo_name, issue_numbers)
@app.command(
"label-issues",
help="add a label to specific issues in a repository",
)
def label_issues(
label_name: str,
repo_name: str,
issue_numbers: List[int] = Argument(..., help="One or more issue numbers to label"),
issue_color: Optional[str] = Option(
None, "--color", help="Color for newly created label (hex without # or with #)"
),
) -> None:
tea.pot.gitea.label_issues(repo_name, label_name, issue_numbers, color=issue_color)
@app.command(
"delete-label",
help="remove a label from specific issues or delete the repository label",
)
def delete_label(
label_name: str,
repo_name: str,
issue_numbers: List[int] = Argument(
None,
help="issue numbers to remove the label from",
),
) -> None:
tea.pot.gitea.delete_label(
repo_name,
label_name,
issue_numbers if issue_numbers else None,
)
@app.command(
"archive-repos", help="archive repos in gitea organization according to regex"
)
@@ -243,6 +288,49 @@ def create_personal_channels_on_mm(
tea.pot.create_channels_for_individuals(invite_teaching_team)
@app.command(
"update-group-channels-on-mm",
help="update Mattermost channels for student groups based on team information on Gitea; only add missing members",
)
def update_group_channels_on_mm(
prefix: str = Option(
"", help="Only process repositories starting with this prefix"
),
suffix: str = Option("", help="Only process channels ending with this suffix"),
update_teaching_team: bool = Option(
True,
"--update-teaching-team/--no-update-teaching-team",
help="Whether to update teaching team",
),
) -> None:
groups = {
group_name: members
for group_name, members in tea.pot.gitea.get_all_teams().items()
if group_name.startswith(prefix)
}
logger.info(
f"{len(groups)} group channel(s) to update"
+ (f" with suffix {suffix}" if suffix else "")
)
tea.pot.mattermost.update_channels_for_groups(groups, suffix, update_teaching_team)
@app.command(
"update-personal-channels-on-mm",
help="update personal Mattermost channels for every student; only add missing members",
)
def update_personal_channels_on_mm(
update_teaching_team: bool = Option(
True,
"--update-teaching-team/--no-update-teaching-team",
help="Whether to update teaching team",
),
) -> None:
tea.pot.mattermost.update_channels_for_individuals(
tea.pot.canvas.students, update_teaching_team
)
@app.command(
"create-webhooks-for-mm",
help="create a pair of webhooks on gitea and mm for all student groups on gitea, "
@@ -349,7 +437,6 @@ def joj3_all_env(
submitter_repo_name = env.github_repository.split("/")[-1]
penalty_factor = joj3.get_penalty_factor(end_time, penalty_config)
total_score = joj3.get_total_score(env.joj3_output_path)
-total_score = round(total_score - abs(total_score) * (1 - penalty_factor))
res = {
"totalScore": total_score,
"cappedTotalScore": (
@@ -389,12 +476,13 @@ def joj3_all_env(
lock_file_path = os.path.join(
settings.repos_dir, grading_repo_name, settings.joj3_lock_file_path
)
-logger.debug(
logger.info(
f"try to acquire lock, file path: {lock_file_path}, "
+ f"timeout: {settings.joj3_lock_file_timeout}"
)
-with FileLock(lock_file_path, timeout=settings.joj3_lock_file_timeout).acquire():
-logger.debug("file lock acquired")
if True:  # disable the file lock temporarily
# with FileLock(lock_file_path, timeout=settings.joj3_lock_file_timeout).acquire():
logger.info("file lock acquired")
retry_interval = 1
git_push_ok = False
while not git_push_ok:
@@ -424,9 +512,7 @@ def joj3_all_env(
os.path.join(repo_path, scoreboard_filename),
exercise_name,
submitter_repo_name,
-total_score,
)
-failed_stage = joj3.get_failed_stage_from_file(env.joj3_output_path)
tea.pot.git.add_commit(
grading_repo_name,
[scoreboard_filename],
@@ -436,7 +522,6 @@ def joj3_all_env(
f"gitea actions link: {gitea_actions_url}\n"
f"gitea issue link: {gitea_issue_url}\n"
f"groups: {env.joj3_groups}\n"
-f"failed stage: {failed_stage}\n"
),
)
if not skip_failed_table:
@@ -505,9 +590,6 @@ def joj3_check_env(
"Example: --penalty-config 24=0.75,48=0.5"
),
),
-ignore_submitter: bool = Option(
-False, help="ignore submitter when checking submission count"
-),
) -> None:
app.pretty_exceptions_enable = False
set_settings(Settings(_env_file=env_path))
@@ -526,7 +608,7 @@ def joj3_check_env(
penalty_config,
)
count_msg, count_failed = tea.pot.joj3_check_submission_count(
-env, grading_repo_name, group_config, scoreboard_filename, ignore_submitter
env, grading_repo_name, group_config, scoreboard_filename
)
echo(
json.dumps(
@@ -539,16 +621,6 @@ def joj3_check_env(
logger.info("joj3-check-env done")
-@app.command("joj3-check-gitea-token")
-def joj3_check_gitea_token(
-env_path: str = Argument("", help="path to .env file")
-) -> None:
-app.pretty_exceptions_enable = False
-set_settings(Settings(_env_file=env_path))
-set_logger(settings.stderr_log_level)
-tea.pot.gitea.organization_api.org_list_repos(settings.gitea_org_name)
if __name__ == "__main__":
try:
app()


@@ -40,7 +40,7 @@ class Settings(BaseSettings):
joj_sid: str = ""
# joj3
-joj3_lock_file_path: str = ".git/teapot-joj3-all-env.lock"
joj3_lock_file_path: str = ".git/teapot.lock"
joj3_lock_file_timeout: int = 30
# moss


@@ -266,6 +266,7 @@ class Teapot:
):
if issue.title.startswith(title_prefix):
joj3_issue = issue
logger.info(f"found joj3 issue: #{joj3_issue.number}")
break
else:
new_issue = True
@@ -292,7 +293,9 @@ class Teapot:
submitter_repo_name,
body={"title": title, "body": comment, "labels": [label_id]},
)
logger.info(f"created joj3 issue: #{joj3_issue.number}")
gitea_issue_url = joj3_issue.html_url
logger.info(f"gitea issue url: {gitea_issue_url}")
if not new_issue:
self.gitea.issue_api.issue_edit_issue(
self.gitea.org_name,
@@ -356,9 +359,7 @@ class Teapot:
grading_repo_name: str,
group_config: str,
scoreboard_filename: str,
-ignore_submitter: bool,
) -> Tuple[str, bool]:
-submitter = env.github_actor
submitter_repo_name = env.github_repository.split("/")[-1]
repo: Repo = self.git.get_repo(grading_repo_name)
now = datetime.now(timezone.utc)
@@ -385,13 +386,11 @@ class Teapot:
time_windows.append(since)
valid_items.append((name, max_count, time_period, since))
logger.info(f"valid items: {valid_items}, time windows: {time_windows}")
-matched_commits = []
-all_commits_length = 0
all_commits = []
if time_windows:
earliest_since = min(time_windows).strftime("%Y-%m-%dT%H:%M:%S")
commits = repo.iter_commits(paths=scoreboard_filename, since=earliest_since)
for commit in commits:
-all_commits_length += 1
lines = commit.message.strip().splitlines()
if not lines:
continue
@@ -401,28 +400,25 @@ class Teapot:
d = match.groupdict()
if (
env.joj3_conf_name != d["exercise_name"]
or env.github_actor != d["submitter"]
or submitter_repo_name != d["submitter_repo_name"]
):
continue
-if not ignore_submitter and submitter != d["submitter"]:
-continue
groups_line = next((l for l in lines if l.startswith("groups: ")), None)
commit_groups = (
groups_line[len("groups: ") :].split(",") if groups_line else []
)
-matched_commits.append(
all_commits.append(
{
"time": commit.committed_datetime,
"groups": [g.strip() for g in commit_groups],
}
)
-logger.info(
-f"matched commits length: {len(matched_commits)}, all commits length: {all_commits_length}"
-)
logger.info(f"all commits length: {len(all_commits)}")
for name, max_count, time_period, since in valid_items:
submit_count = 0
time_limit = now - timedelta(hours=time_period)
-for commit in matched_commits:
for commit in all_commits:
if commit["time"] < time_limit:
continue
if name:
@@ -432,7 +428,7 @@ class Teapot:
continue
submit_count += 1
logger.info(
-f"submitter {submitter} is submitting for the {submit_count + 1} time, "
f"submitter {env.github_actor} is submitting for the {submit_count + 1} time, "
f"{min(0, max_count - submit_count - 1)} time(s) remaining, "
f"group={name}, "
f"time period={time_period} hour(s), "


@@ -7,7 +7,6 @@ from typing import Any, Dict, List, Optional, Tuple
from pydantic_settings import BaseSettings
-from joint_teapot.config import settings
from joint_teapot.utils.logger import logger
@@ -42,7 +41,6 @@ def generate_scoreboard(
scoreboard_file_path: str,
exercise_name: str,
submitter_repo_name: str,
-exercise_total_score: int,
) -> None:
if not scoreboard_file_path.endswith(".csv"):
logger.error(
@@ -101,6 +99,11 @@ def generate_scoreboard(
for row in data:
row.insert(index, "")
exercise_total_score = 0
for stage in stages:
for result in stage["results"]:
exercise_total_score += result["score"]
submitter_row[columns.index(exercise_name)] = str(exercise_total_score)
total = 0
@@ -143,18 +146,6 @@ def get_failed_table_from_file(table_file_path: str) -> List[List[str]]:
return data
-def get_failed_stage_from_file(score_file_path: str) -> str:
-with open(score_file_path) as json_file:
-stages: List[Dict[str, Any]] = json.load(json_file)
-failed_stage = ""
-for stage in stages:
-if stage["force_quit"] == True:
-failed_stage = stage["name"]
-break
-return failed_stage
def update_failed_table_from_score_file(
data: List[List[str]],
score_file_path: str,
@@ -162,23 +153,31 @@ def update_failed_table_from_score_file(
repo_link: str,
action_link: str,
) -> None:
-failed_stage = get_failed_stage_from_file(score_file_path)
# get info from score file
with open(score_file_path) as json_file:
stages: List[Dict[str, Any]] = json.load(json_file)
failed_name = ""
for stage in stages:
if stage["force_quit"] == True:
failed_name = stage["name"]
break
# append to failed table
now = datetime.now().strftime("%Y-%m-%d %H:%M")
repo = f"[{repo_name}]({repo_link})"
-failure = f"[{failed_stage}]({action_link})"
failure = f"[{failed_name}]({action_link})"
row_found = False
for i, row in enumerate(data[:]):
if row[1] == repo:
row_found = True
-if failed_stage == "":
if failed_name == "":
data.remove(row)
else:
data[i][0] = now
data[i][2] = failure
break
-if not row_found and failed_stage != "":
if not row_found and failed_name != "":
data.append([now, repo, failure])
@@ -244,7 +243,7 @@ def generate_title_and_comment(
f"Generated at {now} from [Gitea Actions #{run_number}]({action_link}), "
f"commit {commit_hash}, "
f"triggered by @{submitter}, "
-f"run ID [`{run_id}`](https://focs.ji.sjtu.edu.cn/joj-mon/d/{settings.gitea_org_name}?var-Filters=RunID%7C%3D%7C{run_id}).\n"
f"run ID `{run_id}`.\n"
"Powered by [JOJ3](https://github.com/joint-online-judge/JOJ3) and "
"[Joint-Teapot](https://github.com/BoYanZh/Joint-Teapot) with ❤️.\n"
)


@@ -38,7 +38,10 @@ def set_logger(
) -> None:
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
logger.remove()
-logger.add(stderr, level=stderr_log_level, colorize=stderr.isatty())
logger.add(
stderr,
level=stderr_log_level,
)
logger.add(settings.log_file_path, level="DEBUG")


@@ -39,8 +39,19 @@ class Canvas:
student.name = (
re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title()
) # We only care english name
-student.sis_id = student.login_id
-student.login_id = student.email.split("@")[0]
# Some users (like system users, announcers) might not have login_id
if hasattr(student, "login_id") and student.login_id:
student.sis_id = student.login_id
student.login_id = student.email.split("@")[0]
else:
# For users without login_id, use email prefix as both sis_id and login_id
if hasattr(student, "email") and student.email:
student.login_id = student.email.split("@")[0]
student.sis_id = student.login_id
else:
# Fallback for users without email
student.login_id = f"user_{student.id}"
student.sis_id = student.login_id
return student
self.students = [


@@ -90,23 +90,28 @@ class Git:
retry_interval = 2
while retry_interval and auto_retry:
try:
current_branch = ""
if repo.head.is_detached:
current_branch = repo.head.commit.hexsha
else:
current_branch = repo.active_branch.name
if clean_git_lock:
-locks_removed_count = 0
-for root, _, files in os.walk(os.path.join(repo_dir, ".git")):
-for filename in files:
-if filename.endswith(".lock"):
-lock_file_path = os.path.join(root, filename)
-if (
-os.path.relpath(lock_file_path, repo_dir)
-== settings.joj3_lock_file_path
-):
-continue
-try:
-os.remove(lock_file_path)
-locks_removed_count += 1
-except OSError as e:
-logger.warning(f"error removing lock file: {e}")
-logger.info(f"removed {locks_removed_count} lock files")
lock_files = [
"index.lock",
"HEAD.lock",
"fetch-pack.lock",
"logs/HEAD.lock",
"packed-refs.lock",
"config.lock",
f"refs/heads/{current_branch}.lock",
f"refs/remotes/origin/{current_branch}.lock",
f"refs/heads/{checkout_dest}.lock",
f"refs/remotes/origin/{checkout_dest}.lock",
]
for lock_file in lock_files:
lock_path = os.path.join(repo_dir, ".git", lock_file)
if os.path.exists(lock_path):
os.remove(lock_path)
repo.git.fetch("--tags", "--all", "-f")
repo.git.reset("--hard", reset_target)
repo.git.clean("-d", "-f", "-x")
@@ -143,7 +148,9 @@ class Git:
try:
repo.index.add(file)
except OSError:
-logger.warning(f'file path "{file}" does not exist, skipped')
logger.warning(
f'File path "{file}" does not exist. Skipping this file.'
)
continue
if repo.is_dirty(untracked_files=True) or repo.index.diff(None):
repo.index.commit(commit_message)


@@ -1,9 +1,11 @@
import re
from enum import Enum
from functools import lru_cache
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, cast
from urllib.parse import quote
import focs_gitea
import requests # type: ignore
from canvasapi.group import Group, GroupMembership
from canvasapi.paginated_list import PaginatedList
from canvasapi.user import User
@@ -20,11 +22,13 @@ class PermissionEnum(Enum):
admin = "admin"
-T = TypeVar("T")
-def list_all(method: Callable[..., Iterable[T]], *args: Any, **kwargs: Any) -> List[T]:
-all_res = []
def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
"""Call paginated API methods repeatedly and collect results.
The exact return element types vary depending on the API client. We use
``Any`` here to avoid over-constraining typing for the external client.
"""
all_res: List[Any] = []
page = 1
while True:
res = method(*args, **kwargs, page=page)
@@ -36,6 +40,165 @@ def list_all(method: Callable[..., Iterable[T]], *args: Any, **kwargs: Any) -> L
return all_res
def _get_color(c: Optional[str]) -> str:
if not c:
return "CCCCCC"
s = c.strip()
if s.startswith("#"):
s = s[1:]
if re.fullmatch(r"[0-9a-fA-F]{6}", s):
return s
logger.warning(f"Provided color '{c}' is not a valid hex; falling back to #CCCCCC")
return "CCCCCC"
def _label_id_from_obj(obj: Any) -> Optional[int]:
try:
if isinstance(obj, dict):
return obj.get("id")
return getattr(obj, "id", None)
except Exception:
return None
def _get_repo_labels(gitea: Any, repo_name: str) -> List[Any]:
try:
return list(
cast(
Iterable[Any],
gitea.issue_api.issue_list_labels(gitea.org_name, repo_name),
)
)
except ApiException as e:
logger.error(f"Failed to list labels for {repo_name}: {e}")
return []
def _list_issues_map(gitea: Any, repo_name: str) -> Dict[int, Any]:
try:
return {
issue.number: issue
for issue in list_all(
gitea.issue_api.issue_list_issues, gitea.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return {}
def _api_post_labels(gitea: Any, repo_name: str, issue_num: int, payload: Any) -> Any:
path = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels"
full_url = f"{gitea.api_client.configuration.host}{path}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
return requests.post(full_url, headers=headers_local, json=payload, timeout=10)
def _patch_label_exclusive(
gitea: Any, repo_name: str, name: str, label_obj: Any
) -> None:
try:
existing_color = getattr(label_obj, "color", None) or (
label_obj.get("color") if isinstance(label_obj, dict) else None
)
if (
existing_color
and isinstance(existing_color, str)
and existing_color.startswith("#")
):
existing_color = existing_color[1:]
enc_name = quote(name, safe="")
path = f"/repos/{gitea.org_name}/{repo_name}/labels/{enc_name}"
url = f"{gitea.api_client.configuration.host}{path}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
payload = {"exclusive": True, "name": name}
if existing_color:
payload["color"] = existing_color
resp = requests.patch(url, headers=headers_local, json=payload, timeout=10)
if resp.status_code in (200, 201):
logger.info(f"Marked existing label '{name}' as exclusive in {repo_name}")
else:
logger.warning(
f"Failed to mark existing label '{name}' as exclusive: status={resp.status_code}"
)
except Exception as e:
logger.debug(f"Could not patch label exclusive for {name}: {e}")
def _delete_issue_label_by_id(
gitea: Any, repo_name: str, issue_num: int, lid: int
) -> None:
try:
path_id = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
url_id = f"{gitea.api_client.configuration.host}{path_id}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {"Authorization": f"token {token}"}
resp = requests.delete(url_id, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(f"Removed label id {lid} from {repo_name}#{issue_num}")
else:
logger.warning(
f"Failed to remove label id {lid} from {repo_name}#{issue_num}: status={resp.status_code}"
)
except Exception as e:
logger.warning(
f"Error removing label id {lid} from {repo_name}#{issue_num}: {e}"
)
def _create_label(
gitea: Any, repo_name: str, label_name: str, color: Optional[str], labels: List[Any]
) -> Optional[Any]:
for l in labels:
if getattr(l, "name", None) == label_name:
try:
is_ex = getattr(l, "exclusive", None)
if not bool(is_ex):
_patch_label_exclusive(gitea, repo_name, label_name, l)
except Exception:
return l
return l
chosen_color = _get_color(color)
try:
new = gitea.issue_api.issue_create_label(
gitea.org_name,
repo_name,
body={"name": label_name, "color": chosen_color, "exclusive": True},
)
logger.info(f"Created label '{label_name}' in {gitea.org_name}/{repo_name}")
return new
except ApiException as e:
logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
if hasattr(e, "body"):
logger.error(f"ApiException body: {getattr(e, 'body', None)}")
return None
def _extract_label_names(issue_obj: Any) -> List[str]:
try:
return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []]
except Exception:
return []
class Gitea:
def __init__(
self,
@@ -477,6 +640,44 @@ class Gitea:
self.org_name, repo_name, issue.number, body={"state": "closed"}
)
def close_issues(
self, repo_name: str, issue_numbers: List[int], dry_run: bool = False
) -> None:
if not issue_numbers:
logger.warning("No issue numbers provided to close")
return
if dry_run:
logger.info("Dry run enabled. No changes will be made to issues.")
try:
issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return
for num in issue_numbers:
issue = issues.get(num)
if issue is None:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
if getattr(issue, "state", "") == "closed":
logger.info(f"Issue #{num} in {repo_name} already closed")
continue
try:
if dry_run:
logger.info(f"Would close issue #{num} in {repo_name} (dry run)")
continue
self.issue_api.issue_edit_issue(
self.org_name, repo_name, num, body={"state": "closed"}
)
logger.info(f"Closed issue #{num} in {repo_name}")
except ApiException as e:
logger.error(f"Failed to close issue #{num} in {repo_name}: {e}")
def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
if dry_run:
logger.info("Dry run enabled. No changes will be made to the repositories.")
@@ -494,7 +695,6 @@ class Gitea:
self.repository_api.user_current_delete_subscription(
self.org_name, repo.name
)
-logger.info(f"Unwatched {repo.name}")
def get_all_teams(self) -> Dict[str, List[str]]:
res: Dict[str, List[str]] = {}
@@ -549,6 +749,241 @@
self.create_milestone(repo_name, milestone, description, due_date)
logger.info(f"Created milestone {milestone} in {repo_name}")
def label_issues(
self,
repo_name: str,
label_name: str,
issue_numbers: List[int],
color: Optional[str] = None,
) -> None:
if not issue_numbers:
logger.warning("No issue numbers provided to label")
return
labels = _get_repo_labels(self, repo_name)
if not labels:
logger.warning(f"No labels found for {repo_name}")
return
repo_label_name_to_obj: Dict[str, Any] = {}
for l in labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
if isinstance(name, str):
repo_label_name_to_obj[name] = l
label_obj = _create_label(self, repo_name, label_name, color, labels)
if label_obj is None:
logger.error(f"Unable to ensure label '{label_name}' exists in {repo_name}")
return
label_id = _label_id_from_obj(label_obj)
if label_id is None:
logger.error(f"Unable to find id of label '{label_name}' in {repo_name}")
return
issues_map = _list_issues_map(self, repo_name)
if not issues_map:
return
for num in issue_numbers:
issue = issues_map.get(num)
if issue is None:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
existing_label_names = _extract_label_names(issue)
if label_name in existing_label_names:
logger.info(
f"Issue #{num} in {repo_name} already has label '{label_name}'"
)
continue
try:
current = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
issue_labels = _extract_label_names(current)
except Exception:
issue_labels = existing_label_names
try:
target_obj = repo_label_name_to_obj.get(label_name) or label_obj
target_is_exclusive = bool(
getattr(target_obj, "exclusive", None)
or (
target_obj.get("exclusive")
if isinstance(target_obj, dict)
else False
)
)
except Exception:
target_is_exclusive = False
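# if the target label is exclusive, drop any other exclusive labels already on the issue before adding it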
if target_is_exclusive:
for lname in issue_labels or []:
if lname == label_name:
continue
lobj = repo_label_name_to_obj.get(lname)
is_ex = False
if lobj is not None:
is_ex = bool(
getattr(lobj, "exclusive", None)
or (
lobj.get("exclusive")
if isinstance(lobj, dict)
else False
)
)
if is_ex:
lid = _label_id_from_obj(lobj) if lobj is not None else None
if lid is not None:
_delete_issue_label_by_id(self, repo_name, num, lid)
if label_name not in (issue_labels or []):
try:
resp = _api_post_labels(
self, repo_name, num, {"labels": [label_name]}
)
if getattr(resp, "status_code", None) not in (200, 201):
logger.error(
f"Failed to add label via add-labels endpoint for issue #{num}: status={getattr(resp, 'status_code', None)}"
)
except Exception as e:
logger.error(f"Failed to POST labels to issue #{num}: {e}")
# verification
try:
final = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
final_labels = _extract_label_names(final)
except Exception:
final_labels = []
if label_name in (final_labels or []):
logger.info(
f"Label '{label_name}' attached to issue #{num} in {repo_name}"
)
else:
logger.warning(
f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts"
)
def delete_label(
self,
repo_name: str,
label_name: str,
issue_numbers: Optional[List[int]] = None,
delete_repo_label: bool = False,
) -> None:
token = (
getattr(self.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {"Authorization": f"token {token}"}
repo_labels: List[Any] = []
try:
repo_labels = list(
cast(
Iterable[Any],
self.issue_api.issue_list_labels(self.org_name, repo_name),
)
)
label_name_to_id: Dict[str, int] = {}
for l in repo_labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
lid = getattr(l, "id", None) or (
l.get("id") if isinstance(l, dict) else None
)
if isinstance(name, str) and isinstance(lid, int):
label_name_to_id[name] = lid
except Exception:
label_name_to_id = {}
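# helper: remove the label from a single issue, resolving the label's numeric id from the repository label list first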
def _delete_issue_label(issue_num: int) -> None:
lid = label_name_to_id.get(label_name)
if lid is None:
try:
for l in repo_labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
if name == label_name:
lid = getattr(l, "id", None) or (
l.get("id") if isinstance(l, dict) else None
)
break
except Exception:
logger.warning(
f"Could not determine id of label '{label_name}' in {repo_name}"
)
if lid is None:
logger.warning(
f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}"
)
return
path_id = (
f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
)
url_id = f"{self.api_client.configuration.host}{path_id}"
try:
resp = requests.delete(url_id, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(
f"Removed label '{label_name}' from {repo_name}#{issue_num}"
)
return
logger.warning(
f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}"
)
except Exception as e:
logger.error(
f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}"
)
def _delete_repo_label() -> None:
enc_name = quote(label_name, safe="")
path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}"
url = f"{self.api_client.configuration.host}{path}"
try:
resp = requests.delete(url, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(
f"Removed repo-level label '{label_name}' from {repo_name}"
)
return
logger.error(
f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}"
)
except Exception as e:
logger.error(
f"Error deleting repo label '{label_name}' from {repo_name}: {e}"
)
if issue_numbers:
try:
issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return
for num in issue_numbers:
if num not in issues:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
_delete_issue_label(num)
else:
if delete_repo_label:
_delete_repo_label()
else:
logger.warning(
"No issue numbers provided and --repo not set; nothing to do for delete_label"
)
if __name__ == "__main__":
gitea = Gitea()


@@ -103,6 +103,109 @@ class Mattermost:
)
logger.info(f"Added member {member} to channel {channel_name}")
def update_channels_for_groups(
self,
groups: Dict[str, List[str]],
suffix: str = "",
update_teaching_team: bool = True,
dry_run: bool = False,
) -> None:
for group_name, members in groups.items():
channel_name = group_name + suffix
try:
channel = self.endpoint.channels.get_channel_by_name(
self.team["id"], channel_name
)
logger.info(f"Channel {channel_name} exists, updating members")
except Exception:
# channel does not exist
if dry_run:
info_members = list(members)
if update_teaching_team:
info_members = info_members + settings.mattermost_teaching_team
logger.info(
f"Dry run: would create channel {channel_name} and add members: {info_members}"
)
continue
try:
channel = self.endpoint.channels.create_channel(
{
"team_id": self.team["id"],
"name": channel_name,
"display_name": channel_name,
"type": "P",
}
)
logger.info(f"Created channel {channel_name} on Mattermost")
except Exception as e:
logger.warning(
f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
)
continue
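# collect the usernames already in the channel so that only missing members are added below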
current_members = set()
try:
mm_members = (
self.endpoint.channels.get_channel_members(channel["id"]) or []
)
for m in mm_members:
uname = None
if isinstance(m, dict):
uname = m.get("username") or m.get("name")
if not uname and "user" in m and isinstance(m["user"], dict):
uname = m["user"].get("username") or m["user"].get("name")
if not uname and "user_id" in m:
try:
u = self.endpoint.users.get_user(m["user_id"]) or {}
if isinstance(u, dict):
uname = u.get("username") or u.get("name")
except Exception:
uname = None
if uname:
current_members.add(uname.lower())
except Exception:
current_members = set()
add_members = list(members)
if update_teaching_team:
add_members = add_members + settings.mattermost_teaching_team
for member in add_members:
if member.lower() in current_members:
logger.debug(f"Member {member} already in channel {channel_name}")
continue
if dry_run:
logger.info(
f"Dry run: would add member {member} to channel {channel_name}"
)
continue
try:
mmuser = self.endpoint.users.get_user_by_username(member)
except Exception:
logger.warning(
f"User {member} is not found on the Mattermost server"
)
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not found on the Mattermost server",
}
)
continue
try:
self.endpoint.channels.add_user(
channel["id"], {"user_id": mmuser["id"]}
)
except Exception:
logger.warning(f"User {member} is not in the team")
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not in the team",
}
)
logger.info(f"Added member {member} to channel {channel_name}")
def create_channels_for_individuals(
self,
students: PaginatedList,
@@ -166,6 +269,110 @@
logger.info(f"Added member {member} to channel {channel_name}")
def update_channels_for_individuals(
self,
students: PaginatedList,
update_teaching_team: bool = True,
dry_run: bool = False,
) -> None:
for student in students:
display_name = student.name
channel_name = student.sis_id
try:
channel = self.endpoint.channels.get_channel_by_name(
self.team["id"], channel_name
)
logger.info(f"Channel {channel_name} exists, updating members")
except Exception:
if dry_run:
members_info = [student.login_id]
if update_teaching_team:
members_info = members_info + settings.mattermost_teaching_team
logger.info(
f"Dry run: would create channel {display_name} ({channel_name}) and add members: {members_info}"
)
continue
try:
channel = self.endpoint.channels.create_channel(
{
"team_id": self.team["id"],
"name": channel_name,
"display_name": display_name,
"type": "P",
}
)
logger.info(
f"Created channel {display_name} ({channel_name}) on Mattermost"
)
except Exception as e:
logger.warning(
f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
)
continue
current_members = set()
try:
mm_members = (
self.endpoint.channels.get_channel_members(channel["id"]) or []
)
for m in mm_members:
uname = None
if isinstance(m, dict):
uname = m.get("username") or m.get("name")
if not uname and "user" in m and isinstance(m["user"], dict):
uname = m["user"].get("username") or m["user"].get("name")
if not uname and "user_id" in m:
try:
u = self.endpoint.users.get_user(m["user_id"]) or {}
if isinstance(u, dict):
uname = u.get("username") or u.get("name")
except Exception:
uname = None
if uname:
current_members.add(uname.lower())
except Exception:
current_members = set()
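# desired members: the student plus, optionally, the teaching team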
members = [student.login_id]
if update_teaching_team:
members = members + settings.mattermost_teaching_team
for member in members:
if member.lower() in current_members:
logger.debug(f"Member {member} already in channel {channel_name}")
continue
if dry_run:
logger.info(
f"Dry run: would add member {member} to channel {channel_name}"
)
continue
try:
mmuser = self.endpoint.users.get_user_by_username(member)
except Exception:
logger.warning(
f"User {member} is not found on the Mattermost server"
)
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not found on the Mattermost server",
}
)
continue
try:
self.endpoint.channels.add_user(
channel["id"], {"user_id": mmuser["id"]}
)
except Exception:
logger.warning(f"User {member} is not in the team")
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not in the team",
}
)
logger.info(f"Added member {member} to channel {channel_name}")
def create_webhooks_for_repos(
self, repos: List[str], gitea: Gitea, gitea_suffix: bool
) -> None: