forked from JOJ/Joint-Teapot
		
	Compare commits
	
		
			No commits in common. "594337a54d2bc695bb5a5d23053aef757910340f" and "b64ef1198ec808d1f3d6db64dfc412a9fe071fb5" have entirely different histories.
		
	
	
		
			594337a54d
			...
			b64ef1198e
		
	
		
							
								
								
									
										39
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										39
									
								
								README.md
									
									
									
									
									
								
							| 
						 | 
					@ -60,24 +60,11 @@ clone all gitea repos to local
 | 
				
			||||||
 | 
					
 | 
				
			||||||
close all issues and pull requests in gitea organization
 | 
					close all issues and pull requests in gitea organization
 | 
				
			||||||
 | 
					
 | 
				
			||||||
### `create-group-channels-on-mm`
 | 
					### `create-channels-on-mm`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
create Mattermost channels for student groups based on team information on Gitea
 | 
					create channels for student groups according to group information on gitea. Optionally specify a prefix to ignore all repos whose names do not start with it. Optionally specify a suffix to add to all channels created.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
**Options**:
 | 
					Example: `python3 -m joint_teapot create-channels-on-mm --prefix p1 --suffix -private --invite-teaching-team` will fetch all repos whose names start with `"p1"` and create channels on mm for these repos like "p1team1-private". Members of a repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels.
 | 
				
			||||||
- `--prefix TEXT`: Only process repositories starting with this prefix
 | 
					 | 
				
			||||||
- `--suffix TEXT`: Add suffix to created channels
 | 
					 | 
				
			||||||
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
Example: `joint-teapot create-group-channels-on-mm --prefix "hteam" --suffix "-gitea"` will Create channels for webhook integration. Members of "hteam*" repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
### `create-personal-channels-on-mm`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
create personal Mattermost channels for every student
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
**Options**:
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
### `create-comment`
 | 
					### `create-comment`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -138,26 +125,6 @@ Example: `python3 -m joint_teapot unsubscribe-from-repos '\d{12}$'` will remove
 | 
				
			||||||
 | 
					
 | 
				
			||||||
upload assignment grades to canvas from grade file (GRADE.txt by default), read the first line as grade, the rest as comments
 | 
					upload assignment grades to canvas from grade file (GRADE.txt by default), read the first line as grade, the rest as comments
 | 
				
			||||||
 | 
					
 | 
				
			||||||
### `label-issues`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
add a label to specific issues in a repository, create labels if not exist, with dry-run disabled by default. You may adjust the color of the label with `--color "#******"` if the label doesn't exist.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
### `delete-labels`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
remove a label from specific issues or delete the repository labels, with dry-run disabled by default.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
### `close-issues`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
close one or more specific issues in a repository, with dry-run disabled by default.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
### `update-group-channels-on-mm`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
update Mattermost channels for student groups based on team information on Gitea. It will only add missing users, never delete anyone.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
### `update-personal-channels-on-mm`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
update personal Mattermost channels for every student. It will only add missing users, never delete anyone.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
## License
 | 
					## License
 | 
				
			||||||
 | 
					
 | 
				
			||||||
MIT
 | 
					MIT
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -167,57 +167,6 @@ def close_all_issues() -> None:
 | 
				
			||||||
    tea.pot.gitea.close_all_issues()
 | 
					    tea.pot.gitea.close_all_issues()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@app.command(
 | 
					 | 
				
			||||||
    "close-issues",
 | 
					 | 
				
			||||||
    help="close specific issues in a repository: joint-teapot close-issues REPO_NAME ISSUE_NUMBER [ISSUE_NUMBER ...]",
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def close_issues(
 | 
					 | 
				
			||||||
    repo_name: str,
 | 
					 | 
				
			||||||
    issue_numbers: List[int] = Argument(..., help="One or more issue numbers to close"),
 | 
					 | 
				
			||||||
    dry_run: bool = Option(False, "--dry-run/--no-dry-run", help="Enable dry run (no changes will be made)"),
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    tea.pot.gitea.close_issues(repo_name, issue_numbers, dry_run)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@app.command(
 | 
					 | 
				
			||||||
    "label-issues",
 | 
					 | 
				
			||||||
    help="add a label to specific issues in a repository: joint-teapot label-issues TAG_NAME REPO_NAME ISSUE_NUMBER [ISSUE_NUMBER ...]",
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def label_issues(
 | 
					 | 
				
			||||||
    label_name: str,
 | 
					 | 
				
			||||||
    repo_name: str,
 | 
					 | 
				
			||||||
    issue_numbers: List[int] = Argument(..., help="One or more issue numbers to label"),
 | 
					 | 
				
			||||||
    issue_color: Optional[str] = Option(None, "--color", help="Color for newly created label (hex without # or with #)"),
 | 
					 | 
				
			||||||
    dry_run: bool = Option(False, "--dry-run/--no-dry-run", help="Dry run: show what would be labeled without making changes"),
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    if dry_run:
 | 
					 | 
				
			||||||
        logger.info(f"Dry run: would add label '{label_name}' to {repo_name} issues: {issue_numbers}")
 | 
					 | 
				
			||||||
        return
 | 
					 | 
				
			||||||
    tea.pot.gitea.label_issues(repo_name, label_name, issue_numbers, color=issue_color)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@app.command(
 | 
					 | 
				
			||||||
    "delete-label",
 | 
					 | 
				
			||||||
    help="remove a label from specific issues or delete the repository label: joint-teapot delete-label TAG_NAME REPO_NAME [ISSUE_NUMBER ...]",
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def delete_label(
 | 
					 | 
				
			||||||
    label_name: str,
 | 
					 | 
				
			||||||
    repo_name: str,
 | 
					 | 
				
			||||||
    issue_numbers: List[int] = Argument(None, help="Zero or more issue numbers to remove the label from (if empty and --repo is set, delete repo label)"),
 | 
					 | 
				
			||||||
    repo: bool = Option(False, "--repo", help="Delete the repo-level label when set and issue_numbers is empty"),
 | 
					 | 
				
			||||||
    dry_run: bool = Option(False, "--dry-run/--no-dry-run", help="Dry run: show what would be removed without making changes"),
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    if dry_run:
 | 
					 | 
				
			||||||
        if issue_numbers:
 | 
					 | 
				
			||||||
            logger.info(f"Dry run: would remove label '{label_name}' from {repo_name} issues: {issue_numbers}")
 | 
					 | 
				
			||||||
        elif repo:
 | 
					 | 
				
			||||||
            logger.info(f"Dry run: would delete repo label '{label_name}' from {repo_name}")
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            logger.info("Dry run: no action specified (provide issue numbers or --repo to delete repo label)")
 | 
					 | 
				
			||||||
        return
 | 
					 | 
				
			||||||
    tea.pot.gitea.delete_label(repo_name, label_name, issue_numbers if issue_numbers else None, delete_repo_label=repo)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@app.command(
 | 
					@app.command(
 | 
				
			||||||
    "archive-repos", help="archive repos in gitea organization according to regex"
 | 
					    "archive-repos", help="archive repos in gitea organization according to regex"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
| 
						 | 
					@ -294,36 +243,6 @@ def create_personal_channels_on_mm(
 | 
				
			||||||
    tea.pot.create_channels_for_individuals(invite_teaching_team)
 | 
					    tea.pot.create_channels_for_individuals(invite_teaching_team)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@app.command(
 | 
					 | 
				
			||||||
    "update-group-channels-on-mm",
 | 
					 | 
				
			||||||
    help="update Mattermost channels for student groups based on team information on Gitea; only add missing members",
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def update_group_channels_on_mm(
 | 
					 | 
				
			||||||
    prefix: str = Option("", help="Only process repositories starting with this prefix"),
 | 
					 | 
				
			||||||
    suffix: str = Option("", help="Only process channels ending with this suffix"),
 | 
					 | 
				
			||||||
    update_teaching_team: bool = Option(True, "--update-teaching-team/--no-update-teaching-team", help="Whether to update teaching team"),
 | 
					 | 
				
			||||||
    dry_run: bool = Option(False, "--dry-run/--no-dry-run", help="Dry run: show what would be added without making changes"),
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    groups = {
 | 
					 | 
				
			||||||
        group_name: members
 | 
					 | 
				
			||||||
        for group_name, members in tea.pot.gitea.get_all_teams().items()
 | 
					 | 
				
			||||||
        if group_name.startswith(prefix)
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    logger.info(f"{len(groups)} group channel(s) to update" + (f" with suffix {suffix}" if suffix else ""))
 | 
					 | 
				
			||||||
    tea.pot.mattermost.update_channels_for_groups(groups, suffix, update_teaching_team, dry_run)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@app.command(
 | 
					 | 
				
			||||||
    "update-personal-channels-on-mm",
 | 
					 | 
				
			||||||
    help="update personal Mattermost channels for every student; only add missing members",
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
def update_personal_channels_on_mm(
 | 
					 | 
				
			||||||
    update_teaching_team: bool = Option(True, "--update-teaching-team/--no-update-teaching-team", help="Whether to update teaching team"),
 | 
					 | 
				
			||||||
    dry_run: bool = Option(False, "--dry-run/--no-dry-run", help="Dry run: show what would be added without making changes"),
 | 
					 | 
				
			||||||
) -> None:
 | 
					 | 
				
			||||||
    tea.pot.mattermost.update_channels_for_individuals(tea.pot.canvas.students, update_teaching_team, dry_run)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@app.command(
 | 
					@app.command(
 | 
				
			||||||
    "create-webhooks-for-mm",
 | 
					    "create-webhooks-for-mm",
 | 
				
			||||||
    help="create a pair of webhooks on gitea and mm for all student groups on gitea, "
 | 
					    help="create a pair of webhooks on gitea and mm for all student groups on gitea, "
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -39,19 +39,8 @@ class Canvas:
 | 
				
			||||||
            student.name = (
 | 
					            student.name = (
 | 
				
			||||||
                re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title()
 | 
					                re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title()
 | 
				
			||||||
            )  # We only care english name
 | 
					            )  # We only care english name
 | 
				
			||||||
            # Some users (like system users, announcers) might not have login_id
 | 
					            student.sis_id = student.login_id
 | 
				
			||||||
            if hasattr(student, 'login_id') and student.login_id:
 | 
					            student.login_id = student.email.split("@")[0]
 | 
				
			||||||
                student.sis_id = student.login_id
 | 
					 | 
				
			||||||
                student.login_id = student.email.split("@")[0]
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                # For users without login_id, use email prefix as both sis_id and login_id
 | 
					 | 
				
			||||||
                if hasattr(student, 'email') and student.email:
 | 
					 | 
				
			||||||
                    student.login_id = student.email.split("@")[0]
 | 
					 | 
				
			||||||
                    student.sis_id = student.login_id
 | 
					 | 
				
			||||||
                else:
 | 
					 | 
				
			||||||
                    # Fallback for users without email
 | 
					 | 
				
			||||||
                    student.login_id = f"user_{student.id}"
 | 
					 | 
				
			||||||
                    student.sis_id = student.login_id
 | 
					 | 
				
			||||||
            return student
 | 
					            return student
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.students = [
 | 
					        self.students = [
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -4,8 +4,6 @@ from functools import lru_cache
 | 
				
			||||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar
 | 
					from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import focs_gitea
 | 
					import focs_gitea
 | 
				
			||||||
import requests
 | 
					 | 
				
			||||||
from urllib.parse import quote
 | 
					 | 
				
			||||||
from canvasapi.group import Group, GroupMembership
 | 
					from canvasapi.group import Group, GroupMembership
 | 
				
			||||||
from canvasapi.paginated_list import PaginatedList
 | 
					from canvasapi.paginated_list import PaginatedList
 | 
				
			||||||
from canvasapi.user import User
 | 
					from canvasapi.user import User
 | 
				
			||||||
| 
						 | 
					@ -479,35 +477,6 @@ class Gitea:
 | 
				
			||||||
                        self.org_name, repo_name, issue.number, body={"state": "closed"}
 | 
					                        self.org_name, repo_name, issue.number, body={"state": "closed"}
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def close_issues(self, repo_name: str, issue_numbers: List[int], dry_run: bool = True) -> None:
 | 
					 | 
				
			||||||
        if not issue_numbers:
 | 
					 | 
				
			||||||
            logger.warning("No issue numbers provided to close")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
        if dry_run:
 | 
					 | 
				
			||||||
            logger.info("Dry run enabled. No changes will be made to issues.")
 | 
					 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
            issues = {issue.number: issue for issue in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)}
 | 
					 | 
				
			||||||
        except ApiException as e:
 | 
					 | 
				
			||||||
            logger.error(f"Failed to list issues for {repo_name}: {e}")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        for num in issue_numbers:
 | 
					 | 
				
			||||||
            issue = issues.get(num)
 | 
					 | 
				
			||||||
            if issue is None:
 | 
					 | 
				
			||||||
                logger.warning(f"Issue #{num} not found in {repo_name}")
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
            if getattr(issue, "state", "") == "closed":
 | 
					 | 
				
			||||||
                logger.info(f"Issue #{num} in {repo_name} already closed")
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                if dry_run:
 | 
					 | 
				
			||||||
                    logger.info(f"Would close issue #{num} in {repo_name} (dry run)")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                self.issue_api.issue_edit_issue(self.org_name, repo_name, num, body={"state": "closed"})
 | 
					 | 
				
			||||||
                logger.info(f"Closed issue #{num} in {repo_name}")
 | 
					 | 
				
			||||||
            except ApiException as e:
 | 
					 | 
				
			||||||
                logger.error(f"Failed to close issue #{num} in {repo_name}: {e}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
 | 
					    def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
 | 
				
			||||||
        if dry_run:
 | 
					        if dry_run:
 | 
				
			||||||
            logger.info("Dry run enabled. No changes will be made to the repositories.")
 | 
					            logger.info("Dry run enabled. No changes will be made to the repositories.")
 | 
				
			||||||
| 
						 | 
					@ -579,212 +548,6 @@ class Gitea:
 | 
				
			||||||
            self.create_milestone(repo_name, milestone, description, due_date)
 | 
					            self.create_milestone(repo_name, milestone, description, due_date)
 | 
				
			||||||
            logger.info(f"Created milestone {milestone} in {repo_name}")
 | 
					            logger.info(f"Created milestone {milestone} in {repo_name}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def label_issues(self, repo_name: str, label_name: str, issue_numbers: List[int], color: Optional[str] = None) -> None:
 | 
					 | 
				
			||||||
        if not issue_numbers:
 | 
					 | 
				
			||||||
            logger.warning("No issue numbers provided to label")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
            labels = self.issue_api.issue_list_labels(self.org_name, repo_name)
 | 
					 | 
				
			||||||
        except ApiException as e:
 | 
					 | 
				
			||||||
            logger.error(f"Failed to list labels for {repo_name}: {e}")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        label_obj = None
 | 
					 | 
				
			||||||
        for l in labels:
 | 
					 | 
				
			||||||
            if getattr(l, "name", None) == label_name:
 | 
					 | 
				
			||||||
                label_obj = l
 | 
					 | 
				
			||||||
                break
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if not label_obj:
 | 
					 | 
				
			||||||
            chosen_color = None
 | 
					 | 
				
			||||||
            if color:
 | 
					 | 
				
			||||||
                c = color.strip()
 | 
					 | 
				
			||||||
                if c.startswith('#'):
 | 
					 | 
				
			||||||
                    c = c[1:]
 | 
					 | 
				
			||||||
                if re.fullmatch(r"[0-9a-fA-F]{6}", c):
 | 
					 | 
				
			||||||
                    chosen_color = c
 | 
					 | 
				
			||||||
                else:
 | 
					 | 
				
			||||||
                    logger.warning(f"Provided color '{color}' is not a valid 3- or 6-digit hex; falling back to default")
 | 
					 | 
				
			||||||
            if chosen_color is None:
 | 
					 | 
				
			||||||
                chosen_color = "CCCCCC"
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                label_obj = self.issue_api.issue_create_label(
 | 
					 | 
				
			||||||
                    self.org_name,
 | 
					 | 
				
			||||||
                    repo_name,
 | 
					 | 
				
			||||||
                    body={"name": label_name, "color": chosen_color, "exclusive": False},
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                logger.info(f"Created label '{label_name}' in {self.org_name}/{repo_name}")
 | 
					 | 
				
			||||||
            except ApiException as e:
 | 
					 | 
				
			||||||
                logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
 | 
					 | 
				
			||||||
                if hasattr(e, 'body'):
 | 
					 | 
				
			||||||
                    logger.error(f"ApiException body: {getattr(e, 'body', None)}")
 | 
					 | 
				
			||||||
                return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        label_id = getattr(label_obj, "id", None)
 | 
					 | 
				
			||||||
        if label_id is None:
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                label_id = label_obj.get("id")
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                label_id = None
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if label_id is None:
 | 
					 | 
				
			||||||
            logger.error(f"Unable to determine id of label '{label_name}' in {repo_name}")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
            issues = {issue.number: issue for issue in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)}
 | 
					 | 
				
			||||||
        except ApiException as e:
 | 
					 | 
				
			||||||
            logger.error(f"Failed to list issues for {repo_name}: {e}")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        for num in issue_numbers:
 | 
					 | 
				
			||||||
            issue = issues.get(num)
 | 
					 | 
				
			||||||
            if issue is None:
 | 
					 | 
				
			||||||
                logger.warning(f"Issue #{num} not found in {repo_name}")
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            def _extract_label_names_from_issue(issue_obj) -> List[str]:
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []]
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    return []
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            existing_label_names = _extract_label_names_from_issue(issue)
 | 
					 | 
				
			||||||
            if label_name in existing_label_names:
 | 
					 | 
				
			||||||
                logger.info(f"Issue #{num} in {repo_name} already has label '{label_name}'")
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            existing_label_ids: List[int] = []
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                for l in getattr(issue, "labels", []) or []:
 | 
					 | 
				
			||||||
                    lid = getattr(l, "id", None)
 | 
					 | 
				
			||||||
                    if lid is None:
 | 
					 | 
				
			||||||
                        try:
 | 
					 | 
				
			||||||
                            lid = l.get("id")
 | 
					 | 
				
			||||||
                        except Exception:
 | 
					 | 
				
			||||||
                            lid = None
 | 
					 | 
				
			||||||
                    if lid is not None:
 | 
					 | 
				
			||||||
                        existing_label_ids.append(lid)
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                existing_label_ids = []
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if label_id in existing_label_ids:
 | 
					 | 
				
			||||||
                logger.info(f"Issue #{num} in {repo_name} already has label '{label_name}' (by id)")
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # verification
 | 
					 | 
				
			||||||
            def _fetch_issue_labels_via_api() -> List[str]:
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    single = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
 | 
					 | 
				
			||||||
                    return _extract_label_names_from_issue(single)
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    try:
 | 
					 | 
				
			||||||
                        updated = next((i for i in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name) if getattr(i, "number", None) == num), None)
 | 
					 | 
				
			||||||
                        if updated is None:
 | 
					 | 
				
			||||||
                            return []
 | 
					 | 
				
			||||||
                        return _extract_label_names_from_issue(updated)
 | 
					 | 
				
			||||||
                    except Exception:
 | 
					 | 
				
			||||||
                        return []
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            issue_labels = _fetch_issue_labels_via_api()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # prepare low-level HTTP helpers for fallbacks
 | 
					 | 
				
			||||||
            def _do_post_labels(issue_num: int, payload) -> requests.Response:
 | 
					 | 
				
			||||||
                path = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels"
 | 
					 | 
				
			||||||
                full_url = f"{self.api_client.configuration.host}{path}"
 | 
					 | 
				
			||||||
                token = getattr(self.api_client.configuration, 'api_key', {}) .get('access_token') or settings.gitea_access_token
 | 
					 | 
				
			||||||
                headers_local = {
 | 
					 | 
				
			||||||
                    "Authorization": f"token {token}",
 | 
					 | 
				
			||||||
                    "Content-Type": "application/json",
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
                return requests.post(full_url, headers=headers_local, json=payload, timeout=10)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # fallback: if label still not present, POST JSON object {"labels":[name]} to add-labels endpoint
 | 
					 | 
				
			||||||
            if label_name not in (issue_labels or []):
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    resp = _do_post_labels(num, {"labels": [label_name]})
 | 
					 | 
				
			||||||
                    if resp.status_code not in (200, 201):
 | 
					 | 
				
			||||||
                        logger.error(f"Failed to add label via add-labels endpoint for issue #{num}: status={resp.status_code}")
 | 
					 | 
				
			||||||
                except Exception as e:
 | 
					 | 
				
			||||||
                    logger.error(f"Failed to POST object payload to issue #{num}: {e}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # final verification
 | 
					 | 
				
			||||||
            issue_labels = _fetch_issue_labels_via_api()
 | 
					 | 
				
			||||||
            if label_name in (issue_labels or []):
 | 
					 | 
				
			||||||
                logger.info(f"Label '{label_name}' attached to issue #{num} in {repo_name}")
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                logger.warning(f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def delete_label(self, repo_name: str, label_name: str, issue_numbers: Optional[List[int]] = None, delete_repo_label: bool = False) -> None:
 | 
					 | 
				
			||||||
        token = getattr(self.api_client.configuration, 'api_key', {}) .get('access_token') or settings.gitea_access_token
 | 
					 | 
				
			||||||
        headers_local = {"Authorization": f"token {token}"}
 | 
					 | 
				
			||||||
        try:
 | 
					 | 
				
			||||||
            repo_labels = self.issue_api.issue_list_labels(self.org_name, repo_name)
 | 
					 | 
				
			||||||
            label_name_to_id: Dict[str, int] = {
 | 
					 | 
				
			||||||
                (getattr(l, 'name', None) or (l.get('name') if isinstance(l, dict) else None)): (getattr(l, 'id', None) or (l.get('id') if isinstance(l, dict) else None))
 | 
					 | 
				
			||||||
                for l in repo_labels
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
        except Exception:
 | 
					 | 
				
			||||||
            label_name_to_id = {}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        def _delete_issue_label(issue_num: int) -> None:
 | 
					 | 
				
			||||||
            lid = label_name_to_id.get(label_name)
 | 
					 | 
				
			||||||
            if lid is None:
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    for l in self.issue_api.issue_list_labels(self.org_name, repo_name):
 | 
					 | 
				
			||||||
                        name = getattr(l, 'name', None) or (l.get('name') if isinstance(l, dict) else None)
 | 
					 | 
				
			||||||
                        if name == label_name:
 | 
					 | 
				
			||||||
                            lid = getattr(l, 'id', None) or (l.get('id') if isinstance(l, dict) else None)
 | 
					 | 
				
			||||||
                            break
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    pass
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if lid is None:
 | 
					 | 
				
			||||||
                logger.warning(f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}")
 | 
					 | 
				
			||||||
                return
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            path_id = f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
 | 
					 | 
				
			||||||
            url_id = f"{self.api_client.configuration.host}{path_id}"
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                resp = requests.delete(url_id, headers=headers_local, timeout=10)
 | 
					 | 
				
			||||||
                if resp.status_code in (200, 204):
 | 
					 | 
				
			||||||
                    logger.info(f"Removed label '{label_name}' from {repo_name}#{issue_num}")
 | 
					 | 
				
			||||||
                    return
 | 
					 | 
				
			||||||
                logger.warning(f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}")
 | 
					 | 
				
			||||||
            except Exception as e:
 | 
					 | 
				
			||||||
                logger.error(f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        def _delete_repo_label() -> None:
 | 
					 | 
				
			||||||
            enc_name = quote(label_name, safe='')
 | 
					 | 
				
			||||||
            path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}"
 | 
					 | 
				
			||||||
            url = f"{self.api_client.configuration.host}{path}"
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                resp = requests.delete(url, headers=headers_local, timeout=10)
 | 
					 | 
				
			||||||
                if resp.status_code in (200, 204):
 | 
					 | 
				
			||||||
                    logger.info(f"Removed repo-level label '{label_name}' from {repo_name}")
 | 
					 | 
				
			||||||
                    return
 | 
					 | 
				
			||||||
                logger.error(f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}")
 | 
					 | 
				
			||||||
            except Exception as e:
 | 
					 | 
				
			||||||
                logger.error(f"Error deleting repo label '{label_name}' from {repo_name}: {e}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if issue_numbers:
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                issues = {issue.number: issue for issue in list_all(self.issue_api.issue_list_issues, self.org_name, repo_name)}
 | 
					 | 
				
			||||||
            except ApiException as e:
 | 
					 | 
				
			||||||
                logger.error(f"Failed to list issues for {repo_name}: {e}")
 | 
					 | 
				
			||||||
                return
 | 
					 | 
				
			||||||
            for num in issue_numbers:
 | 
					 | 
				
			||||||
                if num not in issues:
 | 
					 | 
				
			||||||
                    logger.warning(f"Issue #{num} not found in {repo_name}")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                _delete_issue_label(num)
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            if delete_repo_label:
 | 
					 | 
				
			||||||
                _delete_repo_label()
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                logger.warning("No issue numbers provided and --repo not set; nothing to do for delete_label")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == "__main__":
 | 
					if __name__ == "__main__":
 | 
				
			||||||
    gitea = Gitea()
 | 
					    gitea = Gitea()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -103,91 +103,6 @@ class Mattermost:
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                logger.info(f"Added member {member} to channel {channel_name}")
 | 
					                logger.info(f"Added member {member} to channel {channel_name}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def update_channels_for_groups(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        groups: Dict[str, List[str]],
 | 
					 | 
				
			||||||
        suffix: str = "",
 | 
					 | 
				
			||||||
        update_teaching_team: bool = True,
 | 
					 | 
				
			||||||
        dry_run: bool = False,
 | 
					 | 
				
			||||||
    ) -> None:
 | 
					 | 
				
			||||||
        for group_name, members in groups.items():
 | 
					 | 
				
			||||||
            channel_name = group_name + suffix
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                channel = self.endpoint.channels.get_channel_by_name(self.team["id"], channel_name)
 | 
					 | 
				
			||||||
                logger.info(f"Channel {channel_name} exists, updating members")
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                # channel does not exist
 | 
					 | 
				
			||||||
                if dry_run:
 | 
					 | 
				
			||||||
                    info_members = list(members)
 | 
					 | 
				
			||||||
                    if update_teaching_team:
 | 
					 | 
				
			||||||
                        info_members = info_members + settings.mattermost_teaching_team
 | 
					 | 
				
			||||||
                    logger.info(f"Dry run: would create channel {channel_name} and add members: {info_members}")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    channel = self.endpoint.channels.create_channel(
 | 
					 | 
				
			||||||
                        {
 | 
					 | 
				
			||||||
                            "team_id": self.team["id"],
 | 
					 | 
				
			||||||
                            "name": channel_name,
 | 
					 | 
				
			||||||
                            "display_name": channel_name,
 | 
					 | 
				
			||||||
                            "type": "P",
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    logger.info(f"Created channel {channel_name} on Mattermost")
 | 
					 | 
				
			||||||
                except Exception as e:
 | 
					 | 
				
			||||||
                    logger.warning(
 | 
					 | 
				
			||||||
                        f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            current_members = set()
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                mm_members = self.endpoint.channels.get_channel_members(channel["id"]) or []
 | 
					 | 
				
			||||||
                for m in mm_members:
 | 
					 | 
				
			||||||
                    uname = None
 | 
					 | 
				
			||||||
                    if isinstance(m, dict):
 | 
					 | 
				
			||||||
                        uname = m.get("username") or m.get("name")
 | 
					 | 
				
			||||||
                        if not uname and "user" in m and isinstance(m["user"], dict):
 | 
					 | 
				
			||||||
                            uname = m["user"].get("username") or m["user"].get("name")
 | 
					 | 
				
			||||||
                        if not uname and "user_id" in m:
 | 
					 | 
				
			||||||
                            try:
 | 
					 | 
				
			||||||
                                u = self.endpoint.users.get_user(m["user_id"]) or {}
 | 
					 | 
				
			||||||
                                if isinstance(u, dict):
 | 
					 | 
				
			||||||
                                    uname = u.get("username") or u.get("name")
 | 
					 | 
				
			||||||
                            except Exception:
 | 
					 | 
				
			||||||
                                uname = None
 | 
					 | 
				
			||||||
                    if uname:
 | 
					 | 
				
			||||||
                        current_members.add(uname.lower())
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                current_members = set()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            add_members = list(members)
 | 
					 | 
				
			||||||
            if update_teaching_team:
 | 
					 | 
				
			||||||
                add_members = add_members + settings.mattermost_teaching_team
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            for member in add_members:
 | 
					 | 
				
			||||||
                if member.lower() in current_members:
 | 
					 | 
				
			||||||
                    logger.debug(f"Member {member} already in channel {channel_name}")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                if dry_run:
 | 
					 | 
				
			||||||
                    logger.info(f"Dry run: would add member {member} to channel {channel_name}")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    mmuser = self.endpoint.users.get_user_by_username(member)
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    logger.warning(f"User {member} is not found on the Mattermost server")
 | 
					 | 
				
			||||||
                    self.endpoint.posts.create_post(
 | 
					 | 
				
			||||||
                        {"channel_id": channel["id"], "message": f"User {member} is not found on the Mattermost server"}
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    self.endpoint.channels.add_user(channel["id"], {"user_id": mmuser["id"]})
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    logger.warning(f"User {member} is not in the team")
 | 
					 | 
				
			||||||
                    self.endpoint.posts.create_post(
 | 
					 | 
				
			||||||
                        {"channel_id": channel["id"], "message": f"User {member} is not in the team"}
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                logger.info(f"Added member {member} to channel {channel_name}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def create_channels_for_individuals(
 | 
					    def create_channels_for_individuals(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        students: PaginatedList,
 | 
					        students: PaginatedList,
 | 
				
			||||||
| 
						 | 
					@ -251,90 +166,6 @@ class Mattermost:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                logger.info(f"Added member {member} to channel {channel_name}")
 | 
					                logger.info(f"Added member {member} to channel {channel_name}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def update_channels_for_individuals(
 | 
					 | 
				
			||||||
        self,
 | 
					 | 
				
			||||||
        students: PaginatedList,
 | 
					 | 
				
			||||||
        update_teaching_team: bool = True,
 | 
					 | 
				
			||||||
        dry_run: bool = False,
 | 
					 | 
				
			||||||
    ) -> None:
 | 
					 | 
				
			||||||
        for student in students:
 | 
					 | 
				
			||||||
            display_name = student.name
 | 
					 | 
				
			||||||
            channel_name = student.sis_id
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                channel = self.endpoint.channels.get_channel_by_name(self.team["id"], channel_name)
 | 
					 | 
				
			||||||
                logger.info(f"Channel {channel_name} exists, updating members")
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                if dry_run:
 | 
					 | 
				
			||||||
                    members_info = [student.login_id]
 | 
					 | 
				
			||||||
                    if update_teaching_team:
 | 
					 | 
				
			||||||
                        members_info = members_info + settings.mattermost_teaching_team
 | 
					 | 
				
			||||||
                    logger.info(f"Dry run: would create channel {display_name} ({channel_name}) and add members: {members_info}")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    channel = self.endpoint.channels.create_channel(
 | 
					 | 
				
			||||||
                        {
 | 
					 | 
				
			||||||
                            "team_id": self.team["id"],
 | 
					 | 
				
			||||||
                            "name": channel_name,
 | 
					 | 
				
			||||||
                            "display_name": display_name,
 | 
					 | 
				
			||||||
                            "type": "P",
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    logger.info(f"Created channel {display_name} ({channel_name}) on Mattermost")
 | 
					 | 
				
			||||||
                except Exception as e:
 | 
					 | 
				
			||||||
                    logger.warning(
 | 
					 | 
				
			||||||
                        f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            current_members = set()
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                mm_members = self.endpoint.channels.get_channel_members(channel["id"]) or []
 | 
					 | 
				
			||||||
                for m in mm_members:
 | 
					 | 
				
			||||||
                    uname = None
 | 
					 | 
				
			||||||
                    if isinstance(m, dict):
 | 
					 | 
				
			||||||
                        uname = m.get("username") or m.get("name")
 | 
					 | 
				
			||||||
                        if not uname and "user" in m and isinstance(m["user"], dict):
 | 
					 | 
				
			||||||
                            uname = m["user"].get("username") or m["user"].get("name")
 | 
					 | 
				
			||||||
                        if not uname and "user_id" in m:
 | 
					 | 
				
			||||||
                            try:
 | 
					 | 
				
			||||||
                                u = self.endpoint.users.get_user(m["user_id"]) or {}
 | 
					 | 
				
			||||||
                                if isinstance(u, dict):
 | 
					 | 
				
			||||||
                                    uname = u.get("username") or u.get("name")
 | 
					 | 
				
			||||||
                            except Exception:
 | 
					 | 
				
			||||||
                                uname = None
 | 
					 | 
				
			||||||
                    if uname:
 | 
					 | 
				
			||||||
                        current_members.add(uname.lower())
 | 
					 | 
				
			||||||
            except Exception:
 | 
					 | 
				
			||||||
                current_members = set()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            members = [student.login_id]
 | 
					 | 
				
			||||||
            if update_teaching_team:
 | 
					 | 
				
			||||||
                members = members + settings.mattermost_teaching_team
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            for member in members:
 | 
					 | 
				
			||||||
                if member.lower() in current_members:
 | 
					 | 
				
			||||||
                    logger.debug(f"Member {member} already in channel {channel_name}")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                if dry_run:
 | 
					 | 
				
			||||||
                    logger.info(f"Dry run: would add member {member} to channel {channel_name}")
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    mmuser = self.endpoint.users.get_user_by_username(member)
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    logger.warning(f"User {member} is not found on the Mattermost server")
 | 
					 | 
				
			||||||
                    self.endpoint.posts.create_post(
 | 
					 | 
				
			||||||
                        {"channel_id": channel["id"], "message": f"User {member} is not found on the Mattermost server"}
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                try:
 | 
					 | 
				
			||||||
                    self.endpoint.channels.add_user(channel["id"], {"user_id": mmuser["id"]})
 | 
					 | 
				
			||||||
                except Exception:
 | 
					 | 
				
			||||||
                    logger.warning(f"User {member} is not in the team")
 | 
					 | 
				
			||||||
                    self.endpoint.posts.create_post(
 | 
					 | 
				
			||||||
                        {"channel_id": channel["id"], "message": f"User {member} is not in the team"}
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                logger.info(f"Added member {member} to channel {channel_name}")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def create_webhooks_for_repos(
 | 
					    def create_webhooks_for_repos(
 | 
				
			||||||
        self, repos: List[str], gitea: Gitea, gitea_suffix: bool
 | 
					        self, repos: List[str], gitea: Gitea, gitea_suffix: bool
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> None:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue
	
	Block a user