forked from JOJ/Joint-Teapot
		
	Compare commits
	
		
			17 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 77064ac37c | |||
| 332e522051 | |||
| 69e097e04b | |||
| f4fb5eae05 | |||
| 3a511660bb | |||
| 9fc7649696 | |||
| e160023cbf | |||
| 99d889ee12 | |||
| f755fb44f6 | |||
| 94d3f993b2 | |||
| aa33dcc2f1 | |||
| e24545324d | |||
| 3dc6667716 | |||
| 5b6c61af6d | |||
| 8264152022 | |||
| 992f450004 | |||
| 5478052c23 | 
							
								
								
									
										39
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										39
									
								
								README.md
									
									
									
									
									
								
							| 
						 | 
					@ -60,24 +60,11 @@ clone all gitea repos to local
 | 
				
			||||||
 | 
					
 | 
				
			||||||
close all issues and pull requests in gitea organization
 | 
					close all issues and pull requests in gitea organization
 | 
				
			||||||
 | 
					
 | 
				
			||||||
### `create-group-channels-on-mm`
 | 
					### `create-channels-on-mm`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
create Mattermost channels for student groups based on team information on Gitea
 | 
					create channels for student groups according to group information on gitea. Optionally specify a prefix to ignore all repos whose names do not start with it. Optionally specify a suffix to add to all channels created.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
**Options**:
 | 
					Example: `python3 -m joint_teapot create-channels-on-mm --prefix p1 --suffix -private --invite-teaching-team` will fetch all repos whose names start with `"p1"` and create channels on mm for these repos like "p1team1-private". Members of a repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels.
 | 
				
			||||||
- `--prefix TEXT`: Only process repositories starting with this prefix
 | 
					 | 
				
			||||||
- `--suffix TEXT`: Add suffix to created channels
 | 
					 | 
				
			||||||
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
Example: `joint-teapot create-group-channels-on-mm --prefix "hteam" --suffix "-gitea"` will Create channels for webhook integration. Members of "hteam*" repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
### `create-personal-channels-on-mm`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
create personal Mattermost channels for every student
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
**Options**:
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
### `create-comment`
 | 
					### `create-comment`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -138,26 +125,6 @@ Example: `python3 -m joint_teapot unsubscribe-from-repos '\d{12}$'` will remove
 | 
				
			||||||
 | 
					
 | 
				
			||||||
upload assignment grades to canvas from grade file (GRADE.txt by default), read the first line as grade, the rest as comments
 | 
					upload assignment grades to canvas from grade file (GRADE.txt by default), read the first line as grade, the rest as comments
 | 
				
			||||||
 | 
					
 | 
				
			||||||
### `label-issues`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
add a label to specific issues in a repository, create labels if not exist, with dry-run disabled by default. You may adjust the color of the label with `--color "#******"` if the label doesn't exist.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
### `delete-labels`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
remove a label from specific issues or delete the repository labels, with dry-run disabled by default.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
### `close-issues`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
close one or more specific issues in a repository, with dry-run disabled by default.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
### `update-group-channels-on-mm`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
update group Mattermost channels for student groups based on team information on Gitea. It will only add missing users, never delete anyone.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
### `update-personal-channels-on-mm`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
update personal Mattermost channels for every student. It will only add missing users, never delete anyone.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
## License
 | 
					## License
 | 
				
			||||||
 | 
					
 | 
				
			||||||
MIT
 | 
					MIT
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -6,7 +6,7 @@ from pathlib import Path
 | 
				
			||||||
from time import sleep
 | 
					from time import sleep
 | 
				
			||||||
from typing import TYPE_CHECKING, List, Optional
 | 
					from typing import TYPE_CHECKING, List, Optional
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# from filelock import FileLock
 | 
					from filelock import FileLock
 | 
				
			||||||
from git import Repo
 | 
					from git import Repo
 | 
				
			||||||
from typer import Argument, Exit, Option, Typer, echo
 | 
					from typer import Argument, Exit, Option, Typer, echo
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -167,51 +167,6 @@ def close_all_issues() -> None:
 | 
				
			||||||
    tea.pot.gitea.close_all_issues()
 | 
					    tea.pot.gitea.close_all_issues()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@app.command(
 | 
					 | 
				
			||||||
    "close-issues",
 | 
					 | 
				
			||||||
    help="close specific issues in a repository",
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def close_issues(
 | 
					 | 
				
			||||||
    repo_name: str,
 | 
					 | 
				
			||||||
    issue_numbers: List[int] = Argument(..., help="One or more issue numbers to close"),
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    tea.pot.gitea.close_issues(repo_name, issue_numbers)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@app.command(
 | 
					 | 
				
			||||||
    "label-issues",
 | 
					 | 
				
			||||||
    help="add a label to specific issues in a repository",
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def label_issues(
 | 
					 | 
				
			||||||
    label_name: str,
 | 
					 | 
				
			||||||
    repo_name: str,
 | 
					 | 
				
			||||||
    issue_numbers: List[int] = Argument(..., help="One or more issue numbers to label"),
 | 
					 | 
				
			||||||
    issue_color: Optional[str] = Option(
 | 
					 | 
				
			||||||
        None, "--color", help="Color for newly created label (hex without # or with #)"
 | 
					 | 
				
			||||||
    ),
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    tea.pot.gitea.label_issues(repo_name, label_name, issue_numbers, color=issue_color)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@app.command(
 | 
					 | 
				
			||||||
    "delete-label",
 | 
					 | 
				
			||||||
    help="remove a label from specific issues or delete the repository label",
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def delete_label(
 | 
					 | 
				
			||||||
    label_name: str,
 | 
					 | 
				
			||||||
    repo_name: str,
 | 
					 | 
				
			||||||
    issue_numbers: List[int] = Argument(
 | 
					 | 
				
			||||||
        None,
 | 
					 | 
				
			||||||
        help="issue numbers to remove the label from",
 | 
					 | 
				
			||||||
    ),
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    tea.pot.gitea.delete_label(
 | 
					 | 
				
			||||||
        repo_name,
 | 
					 | 
				
			||||||
        label_name,
 | 
					 | 
				
			||||||
        issue_numbers if issue_numbers else None,
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@app.command(
 | 
					@app.command(
 | 
				
			||||||
    "archive-repos", help="archive repos in gitea organization according to regex"
 | 
					    "archive-repos", help="archive repos in gitea organization according to regex"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
| 
						 | 
					@ -288,49 +243,6 @@ def create_personal_channels_on_mm(
 | 
				
			||||||
    tea.pot.create_channels_for_individuals(invite_teaching_team)
 | 
					    tea.pot.create_channels_for_individuals(invite_teaching_team)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@app.command(
 | 
					 | 
				
			||||||
    "update-group-channels-on-mm",
 | 
					 | 
				
			||||||
    help="update Mattermost channels for student groups based on team information on Gitea; only add missing members",
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def update_group_channels_on_mm(
 | 
					 | 
				
			||||||
    prefix: str = Option(
 | 
					 | 
				
			||||||
        "", help="Only process repositories starting with this prefix"
 | 
					 | 
				
			||||||
    ),
 | 
					 | 
				
			||||||
    suffix: str = Option("", help="Only process channels ending with this suffix"),
 | 
					 | 
				
			||||||
    update_teaching_team: bool = Option(
 | 
					 | 
				
			||||||
        True,
 | 
					 | 
				
			||||||
        "--update-teaching-team/--no-update-teaching-team",
 | 
					 | 
				
			||||||
        help="Whether to update teaching team",
 | 
					 | 
				
			||||||
    ),
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    groups = {
 | 
					 | 
				
			||||||
        group_name: members
 | 
					 | 
				
			||||||
        for group_name, members in tea.pot.gitea.get_all_teams().items()
 | 
					 | 
				
			||||||
        if group_name.startswith(prefix)
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    logger.info(
 | 
					 | 
				
			||||||
        f"{len(groups)} group channel(s) to update"
 | 
					 | 
				
			||||||
        + (f" with suffix {suffix}" if suffix else "")
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
    tea.pot.mattermost.update_channels_for_groups(groups, suffix, update_teaching_team)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@app.command(
 | 
					 | 
				
			||||||
    "update-personal-channels-on-mm",
 | 
					 | 
				
			||||||
    help="update personal Mattermost channels for every student; only add missing members",
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def update_personal_channels_on_mm(
 | 
					 | 
				
			||||||
    update_teaching_team: bool = Option(
 | 
					 | 
				
			||||||
        True,
 | 
					 | 
				
			||||||
        "--update-teaching-team/--no-update-teaching-team",
 | 
					 | 
				
			||||||
        help="Whether to update teaching team",
 | 
					 | 
				
			||||||
    ),
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    tea.pot.mattermost.update_channels_for_individuals(
 | 
					 | 
				
			||||||
        tea.pot.canvas.students, update_teaching_team
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@app.command(
 | 
					@app.command(
 | 
				
			||||||
    "create-webhooks-for-mm",
 | 
					    "create-webhooks-for-mm",
 | 
				
			||||||
    help="create a pair of webhooks on gitea and mm for all student groups on gitea, "
 | 
					    help="create a pair of webhooks on gitea and mm for all student groups on gitea, "
 | 
				
			||||||
| 
						 | 
					@ -480,8 +392,7 @@ def joj3_all_env(
 | 
				
			||||||
        f"try to acquire lock, file path: {lock_file_path}, "
 | 
					        f"try to acquire lock, file path: {lock_file_path}, "
 | 
				
			||||||
        + f"timeout: {settings.joj3_lock_file_timeout}"
 | 
					        + f"timeout: {settings.joj3_lock_file_timeout}"
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    if True:  # disable the file lock temporarily
 | 
					    with FileLock(lock_file_path, timeout=settings.joj3_lock_file_timeout).acquire():
 | 
				
			||||||
        # with FileLock(lock_file_path, timeout=settings.joj3_lock_file_timeout).acquire():
 | 
					 | 
				
			||||||
        logger.info("file lock acquired")
 | 
					        logger.info("file lock acquired")
 | 
				
			||||||
        retry_interval = 1
 | 
					        retry_interval = 1
 | 
				
			||||||
        git_push_ok = False
 | 
					        git_push_ok = False
 | 
				
			||||||
| 
						 | 
					@ -590,6 +501,9 @@ def joj3_check_env(
 | 
				
			||||||
            "Example: --penalty-config 24=0.75,48=0.5"
 | 
					            "Example: --penalty-config 24=0.75,48=0.5"
 | 
				
			||||||
        ),
 | 
					        ),
 | 
				
			||||||
    ),
 | 
					    ),
 | 
				
			||||||
 | 
					    ignore_submitter: bool = Option(
 | 
				
			||||||
 | 
					        False, help="ignore submitter when checking submission count"
 | 
				
			||||||
 | 
					    ),
 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
    app.pretty_exceptions_enable = False
 | 
					    app.pretty_exceptions_enable = False
 | 
				
			||||||
    set_settings(Settings(_env_file=env_path))
 | 
					    set_settings(Settings(_env_file=env_path))
 | 
				
			||||||
| 
						 | 
					@ -608,7 +522,7 @@ def joj3_check_env(
 | 
				
			||||||
        penalty_config,
 | 
					        penalty_config,
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    count_msg, count_failed = tea.pot.joj3_check_submission_count(
 | 
					    count_msg, count_failed = tea.pot.joj3_check_submission_count(
 | 
				
			||||||
        env, grading_repo_name, group_config, scoreboard_filename
 | 
					        env, grading_repo_name, group_config, scoreboard_filename, ignore_submitter
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    echo(
 | 
					    echo(
 | 
				
			||||||
        json.dumps(
 | 
					        json.dumps(
 | 
				
			||||||
| 
						 | 
					@ -621,6 +535,16 @@ def joj3_check_env(
 | 
				
			||||||
    logger.info("joj3-check-env done")
 | 
					    logger.info("joj3-check-env done")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@app.command("joj3-check-gitea-token")
 | 
				
			||||||
 | 
					def joj3_check_gitea_token(
 | 
				
			||||||
 | 
					    env_path: str = Argument("", help="path to .env file")
 | 
				
			||||||
 | 
					) -> None:
 | 
				
			||||||
 | 
					    app.pretty_exceptions_enable = False
 | 
				
			||||||
 | 
					    set_settings(Settings(_env_file=env_path))
 | 
				
			||||||
 | 
					    set_logger(settings.stderr_log_level)
 | 
				
			||||||
 | 
					    tea.pot.gitea.organization_api.org_list_repos(settings.gitea_org_name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == "__main__":
 | 
					if __name__ == "__main__":
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        app()
 | 
					        app()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -40,7 +40,7 @@ class Settings(BaseSettings):
 | 
				
			||||||
    joj_sid: str = ""
 | 
					    joj_sid: str = ""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # joj3
 | 
					    # joj3
 | 
				
			||||||
    joj3_lock_file_path: str = ".git/teapot.lock"
 | 
					    joj3_lock_file_path: str = ".git/teapot-joj3-all-env.lock"
 | 
				
			||||||
    joj3_lock_file_timeout: int = 30
 | 
					    joj3_lock_file_timeout: int = 30
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # moss
 | 
					    # moss
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -359,7 +359,9 @@ class Teapot:
 | 
				
			||||||
        grading_repo_name: str,
 | 
					        grading_repo_name: str,
 | 
				
			||||||
        group_config: str,
 | 
					        group_config: str,
 | 
				
			||||||
        scoreboard_filename: str,
 | 
					        scoreboard_filename: str,
 | 
				
			||||||
 | 
					        ignore_submitter: bool,
 | 
				
			||||||
    ) -> Tuple[str, bool]:
 | 
					    ) -> Tuple[str, bool]:
 | 
				
			||||||
 | 
					        submitter = env.github_actor
 | 
				
			||||||
        submitter_repo_name = env.github_repository.split("/")[-1]
 | 
					        submitter_repo_name = env.github_repository.split("/")[-1]
 | 
				
			||||||
        repo: Repo = self.git.get_repo(grading_repo_name)
 | 
					        repo: Repo = self.git.get_repo(grading_repo_name)
 | 
				
			||||||
        now = datetime.now(timezone.utc)
 | 
					        now = datetime.now(timezone.utc)
 | 
				
			||||||
| 
						 | 
					@ -386,11 +388,13 @@ class Teapot:
 | 
				
			||||||
            time_windows.append(since)
 | 
					            time_windows.append(since)
 | 
				
			||||||
            valid_items.append((name, max_count, time_period, since))
 | 
					            valid_items.append((name, max_count, time_period, since))
 | 
				
			||||||
        logger.info(f"valid items: {valid_items}, time windows: {time_windows}")
 | 
					        logger.info(f"valid items: {valid_items}, time windows: {time_windows}")
 | 
				
			||||||
        all_commits = []
 | 
					        matched_commits = []
 | 
				
			||||||
 | 
					        all_commits_length = 0
 | 
				
			||||||
        if time_windows:
 | 
					        if time_windows:
 | 
				
			||||||
            earliest_since = min(time_windows).strftime("%Y-%m-%dT%H:%M:%S")
 | 
					            earliest_since = min(time_windows).strftime("%Y-%m-%dT%H:%M:%S")
 | 
				
			||||||
            commits = repo.iter_commits(paths=scoreboard_filename, since=earliest_since)
 | 
					            commits = repo.iter_commits(paths=scoreboard_filename, since=earliest_since)
 | 
				
			||||||
            for commit in commits:
 | 
					            for commit in commits:
 | 
				
			||||||
 | 
					                all_commits_length += 1
 | 
				
			||||||
                lines = commit.message.strip().splitlines()
 | 
					                lines = commit.message.strip().splitlines()
 | 
				
			||||||
                if not lines:
 | 
					                if not lines:
 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
| 
						 | 
					@ -400,25 +404,28 @@ class Teapot:
 | 
				
			||||||
                d = match.groupdict()
 | 
					                d = match.groupdict()
 | 
				
			||||||
                if (
 | 
					                if (
 | 
				
			||||||
                    env.joj3_conf_name != d["exercise_name"]
 | 
					                    env.joj3_conf_name != d["exercise_name"]
 | 
				
			||||||
                    or env.github_actor != d["submitter"]
 | 
					 | 
				
			||||||
                    or submitter_repo_name != d["submitter_repo_name"]
 | 
					                    or submitter_repo_name != d["submitter_repo_name"]
 | 
				
			||||||
                ):
 | 
					                ):
 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
 | 
					                if not ignore_submitter and submitter != d["submitter"]:
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
                groups_line = next((l for l in lines if l.startswith("groups: ")), None)
 | 
					                groups_line = next((l for l in lines if l.startswith("groups: ")), None)
 | 
				
			||||||
                commit_groups = (
 | 
					                commit_groups = (
 | 
				
			||||||
                    groups_line[len("groups: ") :].split(",") if groups_line else []
 | 
					                    groups_line[len("groups: ") :].split(",") if groups_line else []
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
                all_commits.append(
 | 
					                matched_commits.append(
 | 
				
			||||||
                    {
 | 
					                    {
 | 
				
			||||||
                        "time": commit.committed_datetime,
 | 
					                        "time": commit.committed_datetime,
 | 
				
			||||||
                        "groups": [g.strip() for g in commit_groups],
 | 
					                        "groups": [g.strip() for g in commit_groups],
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
        logger.info(f"all commits length: {len(all_commits)}")
 | 
					        logger.info(
 | 
				
			||||||
 | 
					            f"matched commits length: {len(matched_commits)}, all commits length: {all_commits_length}"
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
        for name, max_count, time_period, since in valid_items:
 | 
					        for name, max_count, time_period, since in valid_items:
 | 
				
			||||||
            submit_count = 0
 | 
					            submit_count = 0
 | 
				
			||||||
            time_limit = now - timedelta(hours=time_period)
 | 
					            time_limit = now - timedelta(hours=time_period)
 | 
				
			||||||
            for commit in all_commits:
 | 
					            for commit in matched_commits:
 | 
				
			||||||
                if commit["time"] < time_limit:
 | 
					                if commit["time"] < time_limit:
 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
                if name:
 | 
					                if name:
 | 
				
			||||||
| 
						 | 
					@ -428,7 +435,7 @@ class Teapot:
 | 
				
			||||||
                        continue
 | 
					                        continue
 | 
				
			||||||
                submit_count += 1
 | 
					                submit_count += 1
 | 
				
			||||||
            logger.info(
 | 
					            logger.info(
 | 
				
			||||||
                f"submitter {env.github_actor} is submitting for the {submit_count + 1} time, "
 | 
					                f"submitter {submitter} is submitting for the {submit_count + 1} time, "
 | 
				
			||||||
                f"{min(0, max_count - submit_count - 1)} time(s) remaining, "
 | 
					                f"{min(0, max_count - submit_count - 1)} time(s) remaining, "
 | 
				
			||||||
                f"group={name}, "
 | 
					                f"group={name}, "
 | 
				
			||||||
                f"time period={time_period} hour(s), "
 | 
					                f"time period={time_period} hour(s), "
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -38,10 +38,7 @@ def set_logger(
 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
    logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
 | 
					    logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
 | 
				
			||||||
    logger.remove()
 | 
					    logger.remove()
 | 
				
			||||||
    logger.add(
 | 
					    logger.add(stderr, level=stderr_log_level, colorize=stderr.isatty())
 | 
				
			||||||
        stderr,
 | 
					 | 
				
			||||||
        level=stderr_log_level,
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
    logger.add(settings.log_file_path, level="DEBUG")
 | 
					    logger.add(settings.log_file_path, level="DEBUG")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -39,19 +39,8 @@ class Canvas:
 | 
				
			||||||
            student.name = (
 | 
					            student.name = (
 | 
				
			||||||
                re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title()
 | 
					                re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title()
 | 
				
			||||||
            )  # We only care english name
 | 
					            )  # We only care english name
 | 
				
			||||||
            # Some users (like system users, announcers) might not have login_id
 | 
					 | 
				
			||||||
            if hasattr(student, "login_id") and student.login_id:
 | 
					 | 
				
			||||||
            student.sis_id = student.login_id
 | 
					            student.sis_id = student.login_id
 | 
				
			||||||
            student.login_id = student.email.split("@")[0]
 | 
					            student.login_id = student.email.split("@")[0]
 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                # For users without login_id, use email prefix as both sis_id and login_id
 | 
					 | 
				
			||||||
                if hasattr(student, "email") and student.email:
 | 
					 | 
				
			||||||
                    student.login_id = student.email.split("@")[0]
 | 
					 | 
				
			||||||
                    student.sis_id = student.login_id
 | 
					 | 
				
			||||||
                else:
 | 
					 | 
				
			||||||
                    # Fallback for users without email
 | 
					 | 
				
			||||||
                    student.login_id = f"user_{student.id}"
 | 
					 | 
				
			||||||
                    student.sis_id = student.login_id
 | 
					 | 
				
			||||||
            return student
 | 
					            return student
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.students = [
 | 
					        self.students = [
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -90,28 +90,23 @@ class Git:
 | 
				
			||||||
        retry_interval = 2
 | 
					        retry_interval = 2
 | 
				
			||||||
        while retry_interval and auto_retry:
 | 
					        while retry_interval and auto_retry:
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                current_branch = ""
 | 
					 | 
				
			||||||
                if repo.head.is_detached:
 | 
					 | 
				
			||||||
                    current_branch = repo.head.commit.hexsha
 | 
					 | 
				
			||||||
                else:
 | 
					 | 
				
			||||||
                    current_branch = repo.active_branch.name
 | 
					 | 
				
			||||||
                if clean_git_lock:
 | 
					                if clean_git_lock:
 | 
				
			||||||
                    lock_files = [
 | 
					                    locks_removed_count = 0
 | 
				
			||||||
                        "index.lock",
 | 
					                    for root, _, files in os.walk(os.path.join(repo_dir, ".git")):
 | 
				
			||||||
                        "HEAD.lock",
 | 
					                        for filename in files:
 | 
				
			||||||
                        "fetch-pack.lock",
 | 
					                            if filename.endswith(".lock"):
 | 
				
			||||||
                        "logs/HEAD.lock",
 | 
					                                lock_file_path = os.path.join(root, filename)
 | 
				
			||||||
                        "packed-refs.lock",
 | 
					                                if (
 | 
				
			||||||
                        "config.lock",
 | 
					                                    os.path.relpath(lock_file_path, repo_dir)
 | 
				
			||||||
                        f"refs/heads/{current_branch}.lock",
 | 
					                                    == settings.joj3_lock_file_path
 | 
				
			||||||
                        f"refs/remotes/origin/{current_branch}.lock",
 | 
					                                ):
 | 
				
			||||||
                        f"refs/heads/{checkout_dest}.lock",
 | 
					                                    continue
 | 
				
			||||||
                        f"refs/remotes/origin/{checkout_dest}.lock",
 | 
					                                try:
 | 
				
			||||||
                    ]
 | 
					                                    os.remove(lock_file_path)
 | 
				
			||||||
                    for lock_file in lock_files:
 | 
					                                    locks_removed_count += 1
 | 
				
			||||||
                        lock_path = os.path.join(repo_dir, ".git", lock_file)
 | 
					                                except OSError as e:
 | 
				
			||||||
                        if os.path.exists(lock_path):
 | 
					                                    logger.warning(f"Error removing lock file: {e}")
 | 
				
			||||||
                            os.remove(lock_path)
 | 
					                    logger.info(f"Removed {locks_removed_count} lock files")
 | 
				
			||||||
                repo.git.fetch("--tags", "--all", "-f")
 | 
					                repo.git.fetch("--tags", "--all", "-f")
 | 
				
			||||||
                repo.git.reset("--hard", reset_target)
 | 
					                repo.git.reset("--hard", reset_target)
 | 
				
			||||||
                repo.git.clean("-d", "-f", "-x")
 | 
					                repo.git.clean("-d", "-f", "-x")
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,11 +1,9 @@
 | 
				
			||||||
import re
 | 
					import re
 | 
				
			||||||
from enum import Enum
 | 
					from enum import Enum
 | 
				
			||||||
from functools import lru_cache
 | 
					from functools import lru_cache
 | 
				
			||||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, cast
 | 
					from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar
 | 
				
			||||||
from urllib.parse import quote
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
import focs_gitea
 | 
					import focs_gitea
 | 
				
			||||||
import requests  # type: ignore
 | 
					 | 
				
			||||||
from canvasapi.group import Group, GroupMembership
 | 
					from canvasapi.group import Group, GroupMembership
 | 
				
			||||||
from canvasapi.paginated_list import PaginatedList
 | 
					from canvasapi.paginated_list import PaginatedList
 | 
				
			||||||
from canvasapi.user import User
 | 
					from canvasapi.user import User
 | 
				
			||||||
| 
						 | 
					@ -22,13 +20,11 @@ class PermissionEnum(Enum):
 | 
				
			||||||
    admin = "admin"
 | 
					    admin = "admin"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
 | 
					T = TypeVar("T")
 | 
				
			||||||
    """Call paginated API methods repeatedly and collect results.
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    The exact return element types vary depending on the API client. We use
 | 
					
 | 
				
			||||||
    ``Any`` here to avoid over-constraining typing for the external client.
 | 
					def list_all(method: Callable[..., Iterable[T]], *args: Any, **kwargs: Any) -> List[T]:
 | 
				
			||||||
    """
 | 
					    all_res = []
 | 
				
			||||||
    all_res: List[Any] = []
 | 
					 | 
				
			||||||
    page = 1
 | 
					    page = 1
 | 
				
			||||||
    while True:
 | 
					    while True:
 | 
				
			||||||
        res = method(*args, **kwargs, page=page)
 | 
					        res = method(*args, **kwargs, page=page)
 | 
				
			||||||
| 
						 | 
					@ -40,165 +36,6 @@ def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]
 | 
				
			||||||
    return all_res
 | 
					    return all_res
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def _get_color(c: Optional[str]) -> str:
 | 
					 | 
				
			||||||
    if not c:
 | 
					 | 
				
			||||||
        return "CCCCCC"
 | 
					 | 
				
			||||||
    s = c.strip()
 | 
					 | 
				
			||||||
    if s.startswith("#"):
 | 
					 | 
				
			||||||
        s = s[1:]
 | 
					 | 
				
			||||||
    if re.fullmatch(r"[0-9a-fA-F]{6}", s):
 | 
					 | 
				
			||||||
        return s
 | 
					 | 
				
			||||||
    logger.warning(f"Provided color '{c}' is not a valid hex; falling back to #CCCCCC")
 | 
					 | 
				
			||||||
    return "CCCCCC"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def _label_id_from_obj(obj: Any) -> Optional[int]:
 | 
					 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        if isinstance(obj, dict):
 | 
					 | 
				
			||||||
            return obj.get("id")
 | 
					 | 
				
			||||||
        return getattr(obj, "id", None)
 | 
					 | 
				
			||||||
    except Exception:
 | 
					 | 
				
			||||||
        return None
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def _get_repo_labels(gitea: Any, repo_name: str) -> List[Any]:
 | 
					 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        return list(
 | 
					 | 
				
			||||||
            cast(
 | 
					 | 
				
			||||||
                Iterable[Any],
 | 
					 | 
				
			||||||
                gitea.issue_api.issue_list_labels(gitea.org_name, repo_name),
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
    except ApiException as e:
 | 
					 | 
				
			||||||
        logger.error(f"Failed to list labels for {repo_name}: {e}")
 | 
					 | 
				
			||||||
        return []
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def _list_issues_map(gitea: Any, repo_name: str) -> Dict[int, Any]:
 | 
					 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        return {
 | 
					 | 
				
			||||||
            issue.number: issue
 | 
					 | 
				
			||||||
            for issue in list_all(
 | 
					 | 
				
			||||||
                gitea.issue_api.issue_list_issues, gitea.org_name, repo_name
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    except ApiException as e:
 | 
					 | 
				
			||||||
        logger.error(f"Failed to list issues for {repo_name}: {e}")
 | 
					 | 
				
			||||||
        return {}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def _api_post_labels(gitea: Any, repo_name: str, issue_num: int, payload: Any) -> Any:
 | 
					 | 
				
			||||||
    path = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels"
 | 
					 | 
				
			||||||
    full_url = f"{gitea.api_client.configuration.host}{path}"
 | 
					 | 
				
			||||||
    token = (
 | 
					 | 
				
			||||||
        getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
 | 
					 | 
				
			||||||
        or settings.gitea_access_token
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
    headers_local = {
 | 
					 | 
				
			||||||
        "Authorization": f"token {token}",
 | 
					 | 
				
			||||||
        "Content-Type": "application/json",
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    return requests.post(full_url, headers=headers_local, json=payload, timeout=10)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def _patch_label_exclusive(
 | 
					 | 
				
			||||||
    gitea: Any, repo_name: str, name: str, label_obj: Any
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        existing_color = getattr(label_obj, "color", None) or (
 | 
					 | 
				
			||||||
            label_obj.get("color") if isinstance(label_obj, dict) else None
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        if (
 | 
					 | 
				
			||||||
            existing_color
 | 
					 | 
				
			||||||
            and isinstance(existing_color, str)
 | 
					 | 
				
			||||||
            and existing_color.startswith("#")
 | 
					 | 
				
			||||||
        ):
 | 
					 | 
				
			||||||
            existing_color = existing_color[1:]
 | 
					 | 
				
			||||||
        enc_name = quote(name, safe="")
 | 
					 | 
				
			||||||
        path = f"/repos/{gitea.org_name}/{repo_name}/labels/{enc_name}"
 | 
					 | 
				
			||||||
        url = f"{gitea.api_client.configuration.host}{path}"
 | 
					 | 
				
			||||||
        token = (
 | 
					 | 
				
			||||||
            getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
 | 
					 | 
				
			||||||
            or settings.gitea_access_token
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        headers_local = {
 | 
					 | 
				
			||||||
            "Authorization": f"token {token}",
 | 
					 | 
				
			||||||
            "Content-Type": "application/json",
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        payload = {"exclusive": True, "name": name}
 | 
					 | 
				
			||||||
        if existing_color:
 | 
					 | 
				
			||||||
            payload["color"] = existing_color
 | 
					 | 
				
			||||||
        resp = requests.patch(url, headers=headers_local, json=payload, timeout=10)
 | 
					 | 
				
			||||||
        if resp.status_code in (200, 201):
 | 
					 | 
				
			||||||
            logger.info(f"Marked existing label '{name}' as exclusive in {repo_name}")
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            logger.warning(
 | 
					 | 
				
			||||||
                f"Failed to mark existing label '{name}' as exclusive: status={resp.status_code}"
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
    except Exception as e:
 | 
					 | 
				
			||||||
        logger.debug(f"Could not patch label exclusive for {name}: {e}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def _delete_issue_label_by_id(
 | 
					 | 
				
			||||||
    gitea: Any, repo_name: str, issue_num: int, lid: int
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        path_id = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
 | 
					 | 
				
			||||||
        url_id = f"{gitea.api_client.configuration.host}{path_id}"
 | 
					 | 
				
			||||||
        token = (
 | 
					 | 
				
			||||||
            getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
 | 
					 | 
				
			||||||
            or settings.gitea_access_token
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        headers_local = {"Authorization": f"token {token}"}
 | 
					 | 
				
			||||||
        resp = requests.delete(url_id, headers=headers_local, timeout=10)
 | 
					 | 
				
			||||||
        if resp.status_code in (200, 204):
 | 
					 | 
				
			||||||
            logger.info(f"Removed label id {lid} from {repo_name}#{issue_num}")
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            logger.warning(
 | 
					 | 
				
			||||||
                f"Failed to remove label id {lid} from {repo_name}#{issue_num}: status={resp.status_code}"
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
    except Exception as e:
 | 
					 | 
				
			||||||
        logger.warning(
 | 
					 | 
				
			||||||
            f"Error removing label id {lid} from {repo_name}#{issue_num}: {e}"
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def _create_label(
 | 
					 | 
				
			||||||
    gitea: Any, repo_name: str, label_name: str, color: Optional[str], labels: List[Any]
 | 
					 | 
				
			||||||
) -> Optional[Any]:
 | 
					 | 
				
			||||||
    for l in labels:
 | 
					 | 
				
			||||||
        if getattr(l, "name", None) == label_name:
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                is_ex = getattr(l, "exclusive", None)
 | 
					 | 
				
			||||||
                if not bool(is_ex):
 | 
					 | 
				
			||||||
                    _patch_label_exclusive(gitea, repo_name, label_name, l)
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                return l
 | 
					 | 
				
			||||||
            return l
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    chosen_color = _get_color(color)
 | 
					 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        new = gitea.issue_api.issue_create_label(
 | 
					 | 
				
			||||||
            gitea.org_name,
 | 
					 | 
				
			||||||
            repo_name,
 | 
					 | 
				
			||||||
            body={"name": label_name, "color": chosen_color, "exclusive": True},
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        logger.info(f"Created label '{label_name}' in {gitea.org_name}/{repo_name}")
 | 
					 | 
				
			||||||
        return new
 | 
					 | 
				
			||||||
    except ApiException as e:
 | 
					 | 
				
			||||||
        logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
 | 
					 | 
				
			||||||
        if hasattr(e, "body"):
 | 
					 | 
				
			||||||
            logger.error(f"ApiException body: {getattr(e, 'body', None)}")
 | 
					 | 
				
			||||||
        return None
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def _extract_label_names(issue_obj: Any) -> List[str]:
 | 
					 | 
				
			||||||
    try:
 | 
					 | 
				
			||||||
        return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []]
 | 
					 | 
				
			||||||
    except Exception:
 | 
					 | 
				
			||||||
        return []
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class Gitea:
 | 
					class Gitea:
 | 
				
			||||||
    def __init__(
 | 
					    def __init__(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
| 
						 | 
					@ -640,44 +477,6 @@ class Gitea:
 | 
				
			||||||
                        self.org_name, repo_name, issue.number, body={"state": "closed"}
 | 
					                        self.org_name, repo_name, issue.number, body={"state": "closed"}
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def close_issues(
 | 
					 | 
				
			||||||
        self, repo_name: str, issue_numbers: List[int], dry_run: bool = False
 | 
					 | 
				
			||||||
    ) -> None:
 | 
					 | 
				
			||||||
        if not issue_numbers:
 | 
					 | 
				
			||||||
            logger.warning("No issue numbers provided to close")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
        if dry_run:
 | 
					 | 
				
			||||||
            logger.info("Dry run enabled. No changes will be made to issues.")
 | 
					 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
            issues = {
 | 
					 | 
				
			||||||
                issue.number: issue
 | 
					 | 
				
			||||||
                for issue in list_all(
 | 
					 | 
				
			||||||
                    self.issue_api.issue_list_issues, self.org_name, repo_name
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        except ApiException as e:
 | 
					 | 
				
			||||||
            logger.error(f"Failed to list issues for {repo_name}: {e}")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        for num in issue_numbers:
 | 
					 | 
				
			||||||
            issue = issues.get(num)
 | 
					 | 
				
			||||||
            if issue is None:
 | 
					 | 
				
			||||||
                logger.warning(f"Issue #{num} not found in {repo_name}")
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
            if getattr(issue, "state", "") == "closed":
 | 
					 | 
				
			||||||
                logger.info(f"Issue #{num} in {repo_name} already closed")
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                if dry_run:
 | 
					 | 
				
			||||||
                    logger.info(f"Would close issue #{num} in {repo_name} (dry run)")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                self.issue_api.issue_edit_issue(
 | 
					 | 
				
			||||||
                    self.org_name, repo_name, num, body={"state": "closed"}
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                logger.info(f"Closed issue #{num} in {repo_name}")
 | 
					 | 
				
			||||||
            except ApiException as e:
 | 
					 | 
				
			||||||
                logger.error(f"Failed to close issue #{num} in {repo_name}: {e}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
 | 
					    def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
 | 
				
			||||||
        if dry_run:
 | 
					        if dry_run:
 | 
				
			||||||
            logger.info("Dry run enabled. No changes will be made to the repositories.")
 | 
					            logger.info("Dry run enabled. No changes will be made to the repositories.")
 | 
				
			||||||
| 
						 | 
					@ -695,6 +494,7 @@ class Gitea:
 | 
				
			||||||
            self.repository_api.user_current_delete_subscription(
 | 
					            self.repository_api.user_current_delete_subscription(
 | 
				
			||||||
                self.org_name, repo.name
 | 
					                self.org_name, repo.name
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					            logger.info(f"Unwatched {repo.name}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def get_all_teams(self) -> Dict[str, List[str]]:
 | 
					    def get_all_teams(self) -> Dict[str, List[str]]:
 | 
				
			||||||
        res: Dict[str, List[str]] = {}
 | 
					        res: Dict[str, List[str]] = {}
 | 
				
			||||||
| 
						 | 
					@ -749,241 +549,6 @@ class Gitea:
 | 
				
			||||||
            self.create_milestone(repo_name, milestone, description, due_date)
 | 
					            self.create_milestone(repo_name, milestone, description, due_date)
 | 
				
			||||||
            logger.info(f"Created milestone {milestone} in {repo_name}")
 | 
					            logger.info(f"Created milestone {milestone} in {repo_name}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def label_issues(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        repo_name: str,
 | 
					 | 
				
			||||||
        label_name: str,
 | 
					 | 
				
			||||||
        issue_numbers: List[int],
 | 
					 | 
				
			||||||
        color: Optional[str] = None,
 | 
					 | 
				
			||||||
    ) -> None:
 | 
					 | 
				
			||||||
        if not issue_numbers:
 | 
					 | 
				
			||||||
            logger.warning("No issue numbers provided to label")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        labels = _get_repo_labels(self, repo_name)
 | 
					 | 
				
			||||||
        if not labels:
 | 
					 | 
				
			||||||
            logger.warning(f"No labels found for {repo_name}")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        repo_label_name_to_obj: Dict[str, Any] = {}
 | 
					 | 
				
			||||||
        for l in labels:
 | 
					 | 
				
			||||||
            name = getattr(l, "name", None) or (
 | 
					 | 
				
			||||||
                l.get("name") if isinstance(l, dict) else None
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            if isinstance(name, str):
 | 
					 | 
				
			||||||
                repo_label_name_to_obj[name] = l
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        label_obj = _create_label(self, repo_name, label_name, color, labels)
 | 
					 | 
				
			||||||
        if label_obj is None:
 | 
					 | 
				
			||||||
            logger.error(f"Unable to ensure label '{label_name}' exists in {repo_name}")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        label_id = _label_id_from_obj(label_obj)
 | 
					 | 
				
			||||||
        if label_id is None:
 | 
					 | 
				
			||||||
            logger.error(f"Unable to find id of label '{label_name}' in {repo_name}")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
        issues_map = _list_issues_map(self, repo_name)
 | 
					 | 
				
			||||||
        if not issues_map:
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        for num in issue_numbers:
 | 
					 | 
				
			||||||
            issue = issues_map.get(num)
 | 
					 | 
				
			||||||
            if issue is None:
 | 
					 | 
				
			||||||
                logger.warning(f"Issue #{num} not found in {repo_name}")
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            existing_label_names = _extract_label_names(issue)
 | 
					 | 
				
			||||||
            if label_name in existing_label_names:
 | 
					 | 
				
			||||||
                logger.info(
 | 
					 | 
				
			||||||
                    f"Issue #{num} in {repo_name} already has label '{label_name}'"
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                current = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
 | 
					 | 
				
			||||||
                issue_labels = _extract_label_names(current)
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                issue_labels = existing_label_names
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                target_obj = repo_label_name_to_obj.get(label_name) or label_obj
 | 
					 | 
				
			||||||
                target_is_exclusive = bool(
 | 
					 | 
				
			||||||
                    getattr(target_obj, "exclusive", None)
 | 
					 | 
				
			||||||
                    or (
 | 
					 | 
				
			||||||
                        target_obj.get("exclusive")
 | 
					 | 
				
			||||||
                        if isinstance(target_obj, dict)
 | 
					 | 
				
			||||||
                        else False
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                target_is_exclusive = False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if target_is_exclusive:
 | 
					 | 
				
			||||||
                for lname in issue_labels or []:
 | 
					 | 
				
			||||||
                    if lname == label_name:
 | 
					 | 
				
			||||||
                        continue
 | 
					 | 
				
			||||||
                    lobj = repo_label_name_to_obj.get(lname)
 | 
					 | 
				
			||||||
                    is_ex = False
 | 
					 | 
				
			||||||
                    if lobj is not None:
 | 
					 | 
				
			||||||
                        is_ex = bool(
 | 
					 | 
				
			||||||
                            getattr(lobj, "exclusive", None)
 | 
					 | 
				
			||||||
                            or (
 | 
					 | 
				
			||||||
                                lobj.get("exclusive")
 | 
					 | 
				
			||||||
                                if isinstance(lobj, dict)
 | 
					 | 
				
			||||||
                                else False
 | 
					 | 
				
			||||||
                            )
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                    if is_ex:
 | 
					 | 
				
			||||||
                        lid = _label_id_from_obj(lobj) if lobj is not None else None
 | 
					 | 
				
			||||||
                        if lid is not None:
 | 
					 | 
				
			||||||
                            _delete_issue_label_by_id(self, repo_name, num, lid)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if label_name not in (issue_labels or []):
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    resp = _api_post_labels(
 | 
					 | 
				
			||||||
                        self, repo_name, num, {"labels": [label_name]}
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    if getattr(resp, "status_code", None) not in (200, 201):
 | 
					 | 
				
			||||||
                        logger.error(
 | 
					 | 
				
			||||||
                            f"Failed to add label via add-labels endpoint for issue #{num}: status={getattr(resp, 'status_code', None)}"
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                except Exception as e:
 | 
					 | 
				
			||||||
                    logger.error(f"Failed to POST labels to issue #{num}: {e}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # verification
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                final = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
 | 
					 | 
				
			||||||
                final_labels = _extract_label_names(final)
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                final_labels = []
 | 
					 | 
				
			||||||
            if label_name in (final_labels or []):
 | 
					 | 
				
			||||||
                logger.info(
 | 
					 | 
				
			||||||
                    f"Label '{label_name}' attached to issue #{num} in {repo_name}"
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                logger.warning(
 | 
					 | 
				
			||||||
                    f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts"
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def delete_label(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        repo_name: str,
 | 
					 | 
				
			||||||
        label_name: str,
 | 
					 | 
				
			||||||
        issue_numbers: Optional[List[int]] = None,
 | 
					 | 
				
			||||||
        delete_repo_label: bool = False,
 | 
					 | 
				
			||||||
    ) -> None:
 | 
					 | 
				
			||||||
        token = (
 | 
					 | 
				
			||||||
            getattr(self.api_client.configuration, "api_key", {}).get("access_token")
 | 
					 | 
				
			||||||
            or settings.gitea_access_token
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        headers_local = {"Authorization": f"token {token}"}
 | 
					 | 
				
			||||||
        repo_labels: List[Any] = []
 | 
					 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
            repo_labels = list(
 | 
					 | 
				
			||||||
                cast(
 | 
					 | 
				
			||||||
                    Iterable[Any],
 | 
					 | 
				
			||||||
                    self.issue_api.issue_list_labels(self.org_name, repo_name),
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            label_name_to_id: Dict[str, int] = {}
 | 
					 | 
				
			||||||
            for l in repo_labels:
 | 
					 | 
				
			||||||
                name = getattr(l, "name", None) or (
 | 
					 | 
				
			||||||
                    l.get("name") if isinstance(l, dict) else None
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                lid = getattr(l, "id", None) or (
 | 
					 | 
				
			||||||
                    l.get("id") if isinstance(l, dict) else None
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                if isinstance(name, str) and isinstance(lid, int):
 | 
					 | 
				
			||||||
                    label_name_to_id[name] = lid
 | 
					 | 
				
			||||||
        except Exception:
 | 
					 | 
				
			||||||
            label_name_to_id = {}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        def _delete_issue_label(issue_num: int) -> None:
 | 
					 | 
				
			||||||
            lid = label_name_to_id.get(label_name)
 | 
					 | 
				
			||||||
            if lid is None:
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    for l in repo_labels:
 | 
					 | 
				
			||||||
                        name = getattr(l, "name", None) or (
 | 
					 | 
				
			||||||
                            l.get("name") if isinstance(l, dict) else None
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                        if name == label_name:
 | 
					 | 
				
			||||||
                            lid = getattr(l, "id", None) or (
 | 
					 | 
				
			||||||
                                l.get("id") if isinstance(l, dict) else None
 | 
					 | 
				
			||||||
                            )
 | 
					 | 
				
			||||||
                            break
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    logger.warning(
 | 
					 | 
				
			||||||
                        f"Could not determine id of label '{label_name}' in {repo_name}"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if lid is None:
 | 
					 | 
				
			||||||
                logger.warning(
 | 
					 | 
				
			||||||
                    f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}"
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            path_id = (
 | 
					 | 
				
			||||||
                f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            url_id = f"{self.api_client.configuration.host}{path_id}"
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                resp = requests.delete(url_id, headers=headers_local, timeout=10)
 | 
					 | 
				
			||||||
                if resp.status_code in (200, 204):
 | 
					 | 
				
			||||||
                    logger.info(
 | 
					 | 
				
			||||||
                        f"Removed label '{label_name}' from {repo_name}#{issue_num}"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    return
 | 
					 | 
				
			||||||
                logger.warning(
 | 
					 | 
				
			||||||
                    f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}"
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
            except Exception as e:
 | 
					 | 
				
			||||||
                logger.error(
 | 
					 | 
				
			||||||
                    f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}"
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        def _delete_repo_label() -> None:
 | 
					 | 
				
			||||||
            enc_name = quote(label_name, safe="")
 | 
					 | 
				
			||||||
            path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}"
 | 
					 | 
				
			||||||
            url = f"{self.api_client.configuration.host}{path}"
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                resp = requests.delete(url, headers=headers_local, timeout=10)
 | 
					 | 
				
			||||||
                if resp.status_code in (200, 204):
 | 
					 | 
				
			||||||
                    logger.info(
 | 
					 | 
				
			||||||
                        f"Removed repo-level label '{label_name}' from {repo_name}"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    return
 | 
					 | 
				
			||||||
                logger.error(
 | 
					 | 
				
			||||||
                    f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}"
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
            except Exception as e:
 | 
					 | 
				
			||||||
                logger.error(
 | 
					 | 
				
			||||||
                    f"Error deleting repo label '{label_name}' from {repo_name}: {e}"
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if issue_numbers:
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                issues = {
 | 
					 | 
				
			||||||
                    issue.number: issue
 | 
					 | 
				
			||||||
                    for issue in list_all(
 | 
					 | 
				
			||||||
                        self.issue_api.issue_list_issues, self.org_name, repo_name
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            except ApiException as e:
 | 
					 | 
				
			||||||
                logger.error(f"Failed to list issues for {repo_name}: {e}")
 | 
					 | 
				
			||||||
                return
 | 
					 | 
				
			||||||
            for num in issue_numbers:
 | 
					 | 
				
			||||||
                if num not in issues:
 | 
					 | 
				
			||||||
                    logger.warning(f"Issue #{num} not found in {repo_name}")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                _delete_issue_label(num)
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            if delete_repo_label:
 | 
					 | 
				
			||||||
                _delete_repo_label()
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                logger.warning(
 | 
					 | 
				
			||||||
                    "No issue numbers provided and --repo not set; nothing to do for delete_label"
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == "__main__":
 | 
					if __name__ == "__main__":
 | 
				
			||||||
    gitea = Gitea()
 | 
					    gitea = Gitea()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -103,109 +103,6 @@ class Mattermost:
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                logger.info(f"Added member {member} to channel {channel_name}")
 | 
					                logger.info(f"Added member {member} to channel {channel_name}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def update_channels_for_groups(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        groups: Dict[str, List[str]],
 | 
					 | 
				
			||||||
        suffix: str = "",
 | 
					 | 
				
			||||||
        update_teaching_team: bool = True,
 | 
					 | 
				
			||||||
        dry_run: bool = False,
 | 
					 | 
				
			||||||
    ) -> None:
 | 
					 | 
				
			||||||
        for group_name, members in groups.items():
 | 
					 | 
				
			||||||
            channel_name = group_name + suffix
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                channel = self.endpoint.channels.get_channel_by_name(
 | 
					 | 
				
			||||||
                    self.team["id"], channel_name
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                logger.info(f"Channel {channel_name} exists, updating members")
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                # channel does not exist
 | 
					 | 
				
			||||||
                if dry_run:
 | 
					 | 
				
			||||||
                    info_members = list(members)
 | 
					 | 
				
			||||||
                    if update_teaching_team:
 | 
					 | 
				
			||||||
                        info_members = info_members + settings.mattermost_teaching_team
 | 
					 | 
				
			||||||
                    logger.info(
 | 
					 | 
				
			||||||
                        f"Dry run: would create channel {channel_name} and add members: {info_members}"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    channel = self.endpoint.channels.create_channel(
 | 
					 | 
				
			||||||
                        {
 | 
					 | 
				
			||||||
                            "team_id": self.team["id"],
 | 
					 | 
				
			||||||
                            "name": channel_name,
 | 
					 | 
				
			||||||
                            "display_name": channel_name,
 | 
					 | 
				
			||||||
                            "type": "P",
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    logger.info(f"Created channel {channel_name} on Mattermost")
 | 
					 | 
				
			||||||
                except Exception as e:
 | 
					 | 
				
			||||||
                    logger.warning(
 | 
					 | 
				
			||||||
                        f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            current_members = set()
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                mm_members = (
 | 
					 | 
				
			||||||
                    self.endpoint.channels.get_channel_members(channel["id"]) or []
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                for m in mm_members:
 | 
					 | 
				
			||||||
                    uname = None
 | 
					 | 
				
			||||||
                    if isinstance(m, dict):
 | 
					 | 
				
			||||||
                        uname = m.get("username") or m.get("name")
 | 
					 | 
				
			||||||
                        if not uname and "user" in m and isinstance(m["user"], dict):
 | 
					 | 
				
			||||||
                            uname = m["user"].get("username") or m["user"].get("name")
 | 
					 | 
				
			||||||
                        if not uname and "user_id" in m:
 | 
					 | 
				
			||||||
                            try:
 | 
					 | 
				
			||||||
                                u = self.endpoint.users.get_user(m["user_id"]) or {}
 | 
					 | 
				
			||||||
                                if isinstance(u, dict):
 | 
					 | 
				
			||||||
                                    uname = u.get("username") or u.get("name")
 | 
					 | 
				
			||||||
                            except Exception:
 | 
					 | 
				
			||||||
                                uname = None
 | 
					 | 
				
			||||||
                    if uname:
 | 
					 | 
				
			||||||
                        current_members.add(uname.lower())
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                current_members = set()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            add_members = list(members)
 | 
					 | 
				
			||||||
            if update_teaching_team:
 | 
					 | 
				
			||||||
                add_members = add_members + settings.mattermost_teaching_team
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            for member in add_members:
 | 
					 | 
				
			||||||
                if member.lower() in current_members:
 | 
					 | 
				
			||||||
                    logger.debug(f"Member {member} already in channel {channel_name}")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                if dry_run:
 | 
					 | 
				
			||||||
                    logger.info(
 | 
					 | 
				
			||||||
                        f"Dry run: would add member {member} to channel {channel_name}"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    mmuser = self.endpoint.users.get_user_by_username(member)
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    logger.warning(
 | 
					 | 
				
			||||||
                        f"User {member} is not found on the Mattermost server"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    self.endpoint.posts.create_post(
 | 
					 | 
				
			||||||
                        {
 | 
					 | 
				
			||||||
                            "channel_id": channel["id"],
 | 
					 | 
				
			||||||
                            "message": f"User {member} is not found on the Mattermost server",
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    self.endpoint.channels.add_user(
 | 
					 | 
				
			||||||
                        channel["id"], {"user_id": mmuser["id"]}
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    logger.warning(f"User {member} is not in the team")
 | 
					 | 
				
			||||||
                    self.endpoint.posts.create_post(
 | 
					 | 
				
			||||||
                        {
 | 
					 | 
				
			||||||
                            "channel_id": channel["id"],
 | 
					 | 
				
			||||||
                            "message": f"User {member} is not in the team",
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                logger.info(f"Added member {member} to channel {channel_name}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def create_channels_for_individuals(
 | 
					    def create_channels_for_individuals(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        students: PaginatedList,
 | 
					        students: PaginatedList,
 | 
				
			||||||
| 
						 | 
					@ -269,110 +166,6 @@ class Mattermost:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                logger.info(f"Added member {member} to channel {channel_name}")
 | 
					                logger.info(f"Added member {member} to channel {channel_name}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def update_channels_for_individuals(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        students: PaginatedList,
 | 
					 | 
				
			||||||
        update_teaching_team: bool = True,
 | 
					 | 
				
			||||||
        dry_run: bool = False,
 | 
					 | 
				
			||||||
    ) -> None:
 | 
					 | 
				
			||||||
        for student in students:
 | 
					 | 
				
			||||||
            display_name = student.name
 | 
					 | 
				
			||||||
            channel_name = student.sis_id
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                channel = self.endpoint.channels.get_channel_by_name(
 | 
					 | 
				
			||||||
                    self.team["id"], channel_name
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                logger.info(f"Channel {channel_name} exists, updating members")
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                if dry_run:
 | 
					 | 
				
			||||||
                    members_info = [student.login_id]
 | 
					 | 
				
			||||||
                    if update_teaching_team:
 | 
					 | 
				
			||||||
                        members_info = members_info + settings.mattermost_teaching_team
 | 
					 | 
				
			||||||
                    logger.info(
 | 
					 | 
				
			||||||
                        f"Dry run: would create channel {display_name} ({channel_name}) and add members: {members_info}"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    channel = self.endpoint.channels.create_channel(
 | 
					 | 
				
			||||||
                        {
 | 
					 | 
				
			||||||
                            "team_id": self.team["id"],
 | 
					 | 
				
			||||||
                            "name": channel_name,
 | 
					 | 
				
			||||||
                            "display_name": display_name,
 | 
					 | 
				
			||||||
                            "type": "P",
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    logger.info(
 | 
					 | 
				
			||||||
                        f"Created channel {display_name} ({channel_name}) on Mattermost"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                except Exception as e:
 | 
					 | 
				
			||||||
                    logger.warning(
 | 
					 | 
				
			||||||
                        f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            current_members = set()
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                mm_members = (
 | 
					 | 
				
			||||||
                    self.endpoint.channels.get_channel_members(channel["id"]) or []
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                for m in mm_members:
 | 
					 | 
				
			||||||
                    uname = None
 | 
					 | 
				
			||||||
                    if isinstance(m, dict):
 | 
					 | 
				
			||||||
                        uname = m.get("username") or m.get("name")
 | 
					 | 
				
			||||||
                        if not uname and "user" in m and isinstance(m["user"], dict):
 | 
					 | 
				
			||||||
                            uname = m["user"].get("username") or m["user"].get("name")
 | 
					 | 
				
			||||||
                        if not uname and "user_id" in m:
 | 
					 | 
				
			||||||
                            try:
 | 
					 | 
				
			||||||
                                u = self.endpoint.users.get_user(m["user_id"]) or {}
 | 
					 | 
				
			||||||
                                if isinstance(u, dict):
 | 
					 | 
				
			||||||
                                    uname = u.get("username") or u.get("name")
 | 
					 | 
				
			||||||
                            except Exception:
 | 
					 | 
				
			||||||
                                uname = None
 | 
					 | 
				
			||||||
                    if uname:
 | 
					 | 
				
			||||||
                        current_members.add(uname.lower())
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                current_members = set()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            members = [student.login_id]
 | 
					 | 
				
			||||||
            if update_teaching_team:
 | 
					 | 
				
			||||||
                members = members + settings.mattermost_teaching_team
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            for member in members:
 | 
					 | 
				
			||||||
                if member.lower() in current_members:
 | 
					 | 
				
			||||||
                    logger.debug(f"Member {member} already in channel {channel_name}")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                if dry_run:
 | 
					 | 
				
			||||||
                    logger.info(
 | 
					 | 
				
			||||||
                        f"Dry run: would add member {member} to channel {channel_name}"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    mmuser = self.endpoint.users.get_user_by_username(member)
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    logger.warning(
 | 
					 | 
				
			||||||
                        f"User {member} is not found on the Mattermost server"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    self.endpoint.posts.create_post(
 | 
					 | 
				
			||||||
                        {
 | 
					 | 
				
			||||||
                            "channel_id": channel["id"],
 | 
					 | 
				
			||||||
                            "message": f"User {member} is not found on the Mattermost server",
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    self.endpoint.channels.add_user(
 | 
					 | 
				
			||||||
                        channel["id"], {"user_id": mmuser["id"]}
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    logger.warning(f"User {member} is not in the team")
 | 
					 | 
				
			||||||
                    self.endpoint.posts.create_post(
 | 
					 | 
				
			||||||
                        {
 | 
					 | 
				
			||||||
                            "channel_id": channel["id"],
 | 
					 | 
				
			||||||
                            "message": f"User {member} is not in the team",
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                logger.info(f"Added member {member} to channel {channel_name}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def create_webhooks_for_repos(
 | 
					    def create_webhooks_for_repos(
 | 
				
			||||||
        self, repos: List[str], gitea: Gitea, gitea_suffix: bool
 | 
					        self, repos: List[str], gitea: Gitea, gitea_suffix: bool
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> None:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue
	
	Block a user