feat: new commands in mm channels and gitea issues #6

Open
蔡雨翔524370910013 wants to merge 12 commits from arthurcai/Joint-Teapot:master into master
6 changed files with 793 additions and 18 deletions

View File

@ -1,41 +1,41 @@
repos: repos:
- repo: https://github.com/pre-commit/pre-commit-hooks - repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0 rev: v6.0.0
hooks: hooks:
- id: check-yaml - id: check-yaml
- id: end-of-file-fixer - id: end-of-file-fixer
- id: trailing-whitespace - id: trailing-whitespace
- id: requirements-txt-fixer - id: requirements-txt-fixer
- repo: https://github.com/pre-commit/mirrors-mypy - repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.4.1" rev: "v1.18.2"
hooks: hooks:
- id: mypy - id: mypy
additional_dependencies: additional_dependencies:
- pydantic - pydantic
- repo: https://github.com/asottile/pyupgrade - repo: https://github.com/asottile/pyupgrade
rev: v3.9.0 rev: v3.20.0
hooks: hooks:
- id: pyupgrade - id: pyupgrade
- repo: https://github.com/hadialqattan/pycln - repo: https://github.com/hadialqattan/pycln
rev: v2.4.0 rev: v2.5.0
hooks: hooks:
- id: pycln - id: pycln
args: [-a] args: [-a]
- repo: https://github.com/PyCQA/bandit - repo: https://github.com/PyCQA/bandit
rev: '1.7.5' rev: '1.8.6'
hooks: hooks:
- id: bandit - id: bandit
- repo: https://github.com/PyCQA/isort - repo: https://github.com/PyCQA/isort
rev: 5.12.0 rev: 6.0.1
hooks: hooks:
- id: isort - id: isort
args: ["--profile", "black", "--filter-files"] args: ["--profile", "black", "--filter-files"]
- repo: https://github.com/psf/black - repo: https://github.com/psf/black
rev: 23.7.0 rev: 25.9.0
hooks: hooks:
- id: black - id: black
- repo: https://github.com/Lucas-C/pre-commit-hooks - repo: https://github.com/Lucas-C/pre-commit-hooks
rev: v1.5.1 rev: v1.5.5
hooks: hooks:
- id: remove-crlf - id: remove-crlf
- id: remove-tabs - id: remove-tabs

View File

@ -60,11 +60,24 @@ clone all gitea repos to local
close all issues and pull requests in gitea organization close all issues and pull requests in gitea organization
### `create-channels-on-mm` ### `create-group-channels-on-mm`
create channels for student groups according to group information on gitea. Optionally specify a prefix to ignore all repos whose names do not start with it. Optionally specify a suffix to add to all channels created. create Mattermost channels for student groups based on team information on Gitea
Example: `python3 -m joint_teapot create-channels-on-mm --prefix p1 --suffix -private --invite-teaching-team` will fetch all repos whose names start with `"p1"` and create channels on mm for these repos like "p1team1-private". Members of a repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels. **Options**:
- `--prefix TEXT`: Only process repositories starting with this prefix
- `--suffix TEXT`: Add suffix to created channels
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
Example: `joint-teapot create-group-channels-on-mm --prefix "hteam" --suffix "-gitea"` will Create channels for webhook integration. Members of "hteam*" repo will be added to the corresponding channel. And teaching team (adjust in `.env`) will be invited to the channels.
### `create-personal-channels-on-mm`
create personal Mattermost channels for every student
**Options**:
- `--invite-teaching-team/--no-invite-teaching-team`: Whether to invite teaching team (default: invite)
### `create-comment` ### `create-comment`
@ -125,6 +138,26 @@ Example: `python3 -m joint_teapot unsubscribe-from-repos '\d{12}$'` will remove
upload assignment grades to canvas from grade file (GRADE.txt by default), read the first line as grade, the rest as comments upload assignment grades to canvas from grade file (GRADE.txt by default), read the first line as grade, the rest as comments
### `label-issues`
add a label to specific issues in a repository, create labels if not exist, with dry-run disabled by default. You may adjust the color of the label with `--color "#******"` if the label doesn't exist.
### `delete-labels`
remove a label from specific issues or delete the repository labels, with dry-run disabled by default.
### `close-issues`
close one or more specific issues in a repository, with dry-run disabled by default.
### `update-group-channels-on-mm`
update group Mattermost channels for student groups based on team information on Gitea. It will only add missing users, never delete anyone.
### `update-personal-channels-on-mm`
update personal Mattermost channels for every student. It will only add missing users, never delete anyone.
## License ## License
MIT MIT

View File

@ -167,6 +167,51 @@ def close_all_issues() -> None:
tea.pot.gitea.close_all_issues() tea.pot.gitea.close_all_issues()
@app.command(
"close-issues",
help="close specific issues in a repository",
)
def close_issues(
repo_name: str,
issue_numbers: List[int] = Argument(..., help="One or more issue numbers to close"),
) -> None:
tea.pot.gitea.close_issues(repo_name, issue_numbers)
@app.command(
"label-issues",
help="add a label to specific issues in a repository",
)
def label_issues(
label_name: str,
repo_name: str,
issue_numbers: List[int] = Argument(..., help="One or more issue numbers to label"),
issue_color: Optional[str] = Option(
None, "--color", help="Color for newly created label (hex without # or with #)"
),
) -> None:
tea.pot.gitea.label_issues(repo_name, label_name, issue_numbers, color=issue_color)
@app.command(
"delete-label",
help="remove a label from specific issues or delete the repository label",
)
def delete_label(
label_name: str,
repo_name: str,
issue_numbers: List[int] = Argument(
None,
help="issue numbers to remove the label from",
),
) -> None:
tea.pot.gitea.delete_label(
repo_name,
label_name,
issue_numbers if issue_numbers else None,
)
@app.command( @app.command(
"archive-repos", help="archive repos in gitea organization according to regex" "archive-repos", help="archive repos in gitea organization according to regex"
) )
@ -243,6 +288,49 @@ def create_personal_channels_on_mm(
tea.pot.create_channels_for_individuals(invite_teaching_team) tea.pot.create_channels_for_individuals(invite_teaching_team)
@app.command(
"update-group-channels-on-mm",
help="update Mattermost channels for student groups based on team information on Gitea; only add missing members",
)
def update_group_channels_on_mm(
prefix: str = Option(
"", help="Only process repositories starting with this prefix"
),
suffix: str = Option("", help="Only process channels ending with this suffix"),
update_teaching_team: bool = Option(
True,
"--update-teaching-team/--no-update-teaching-team",
help="Whether to update teaching team",
),
) -> None:
groups = {
group_name: members
for group_name, members in tea.pot.gitea.get_all_teams().items()
if group_name.startswith(prefix)
}
logger.info(
f"{len(groups)} group channel(s) to update"
+ (f" with suffix {suffix}" if suffix else "")
)
tea.pot.mattermost.update_channels_for_groups(groups, suffix, update_teaching_team)
@app.command(
"update-personal-channels-on-mm",
help="update personal Mattermost channels for every student; only add missing members",
)
def update_personal_channels_on_mm(
update_teaching_team: bool = Option(
True,
"--update-teaching-team/--no-update-teaching-team",
help="Whether to update teaching team",
),
) -> None:
tea.pot.mattermost.update_channels_for_individuals(
tea.pot.canvas.students, update_teaching_team
)
@app.command( @app.command(
"create-webhooks-for-mm", "create-webhooks-for-mm",
help="create a pair of webhooks on gitea and mm for all student groups on gitea, " help="create a pair of webhooks on gitea and mm for all student groups on gitea, "

View File

@ -39,8 +39,19 @@ class Canvas:
student.name = ( student.name = (
re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title() re.sub(r"[^\x00-\x7F]+", "", student.name).strip().title()
) # We only care english name ) # We only care english name
student.sis_id = student.login_id # Some users (like system users, announcers) might not have login_id
student.login_id = student.email.split("@")[0] if hasattr(student, "login_id") and student.login_id:
student.sis_id = student.login_id
student.login_id = student.email.split("@")[0]
else:
# For users without login_id, use email prefix as both sis_id and login_id
if hasattr(student, "email") and student.email:
student.login_id = student.email.split("@")[0]
student.sis_id = student.login_id
else:
# Fallback for users without email
student.login_id = f"user_{student.id}"
student.sis_id = student.login_id
return student return student
self.students = [ self.students = [

View File

@ -1,9 +1,11 @@
import re import re
from enum import Enum from enum import Enum
from functools import lru_cache from functools import lru_cache
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, cast
from urllib.parse import quote
import focs_gitea import focs_gitea
import requests # type: ignore
from canvasapi.group import Group, GroupMembership from canvasapi.group import Group, GroupMembership
from canvasapi.paginated_list import PaginatedList from canvasapi.paginated_list import PaginatedList
from canvasapi.user import User from canvasapi.user import User
@ -20,11 +22,13 @@ class PermissionEnum(Enum):
admin = "admin" admin = "admin"
T = TypeVar("T") def list_all(method: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
"""Call paginated API methods repeatedly and collect results.
The exact return element types vary depending on the API client. We use
def list_all(method: Callable[..., Iterable[T]], *args: Any, **kwargs: Any) -> List[T]: ``Any`` here to avoid over-constraining typing for the external client.
all_res = [] """
all_res: List[Any] = []
page = 1 page = 1
while True: while True:
res = method(*args, **kwargs, page=page) res = method(*args, **kwargs, page=page)
@ -36,6 +40,165 @@ def list_all(method: Callable[..., Iterable[T]], *args: Any, **kwargs: Any) -> L
return all_res return all_res
def _get_color(c: Optional[str]) -> str:
if not c:
return "CCCCCC"
s = c.strip()
if s.startswith("#"):
s = s[1:]
if re.fullmatch(r"[0-9a-fA-F]{6}", s):
return s
logger.warning(f"Provided color '{c}' is not a valid hex; falling back to #CCCCCC")
return "CCCCCC"
def _label_id_from_obj(obj: Any) -> Optional[int]:
try:
if isinstance(obj, dict):
return obj.get("id")
return getattr(obj, "id", None)
except Exception:
return None
def _get_repo_labels(gitea: Any, repo_name: str) -> List[Any]:
try:
return list(
cast(
Iterable[Any],
gitea.issue_api.issue_list_labels(gitea.org_name, repo_name),
)
)
except ApiException as e:
logger.error(f"Failed to list labels for {repo_name}: {e}")
return []
def _list_issues_map(gitea: Any, repo_name: str) -> Dict[int, Any]:
try:
return {
issue.number: issue
for issue in list_all(
gitea.issue_api.issue_list_issues, gitea.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return {}
def _api_post_labels(gitea: Any, repo_name: str, issue_num: int, payload: Any) -> Any:
path = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels"
full_url = f"{gitea.api_client.configuration.host}{path}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
return requests.post(full_url, headers=headers_local, json=payload, timeout=10)
def _patch_label_exclusive(
gitea: Any, repo_name: str, name: str, label_obj: Any
) -> None:
try:
existing_color = getattr(label_obj, "color", None) or (
label_obj.get("color") if isinstance(label_obj, dict) else None
)
if (
existing_color
and isinstance(existing_color, str)
and existing_color.startswith("#")
):
existing_color = existing_color[1:]
enc_name = quote(name, safe="")
path = f"/repos/{gitea.org_name}/{repo_name}/labels/{enc_name}"
url = f"{gitea.api_client.configuration.host}{path}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
payload = {"exclusive": True, "name": name}
if existing_color:
payload["color"] = existing_color
resp = requests.patch(url, headers=headers_local, json=payload, timeout=10)
if resp.status_code in (200, 201):
logger.info(f"Marked existing label '{name}' as exclusive in {repo_name}")
else:
logger.warning(
f"Failed to mark existing label '{name}' as exclusive: status={resp.status_code}"
)
except Exception as e:
logger.debug(f"Could not patch label exclusive for {name}: {e}")
def _delete_issue_label_by_id(
gitea: Any, repo_name: str, issue_num: int, lid: int
) -> None:
try:
path_id = f"/repos/{gitea.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
url_id = f"{gitea.api_client.configuration.host}{path_id}"
token = (
getattr(gitea.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {"Authorization": f"token {token}"}
resp = requests.delete(url_id, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(f"Removed label id {lid} from {repo_name}#{issue_num}")
else:
logger.warning(
f"Failed to remove label id {lid} from {repo_name}#{issue_num}: status={resp.status_code}"
)
except Exception as e:
logger.warning(
f"Error removing label id {lid} from {repo_name}#{issue_num}: {e}"
)
def _create_label(
gitea: Any, repo_name: str, label_name: str, color: Optional[str], labels: List[Any]
) -> Optional[Any]:
for l in labels:
if getattr(l, "name", None) == label_name:
try:
is_ex = getattr(l, "exclusive", None)
if not bool(is_ex):
_patch_label_exclusive(gitea, repo_name, label_name, l)
except Exception:
return l
return l
chosen_color = _get_color(color)
try:
new = gitea.issue_api.issue_create_label(
gitea.org_name,
repo_name,
body={"name": label_name, "color": chosen_color, "exclusive": True},
)
logger.info(f"Created label '{label_name}' in {gitea.org_name}/{repo_name}")
return new
except ApiException as e:
logger.error(f"Failed to create label {label_name} in {repo_name}: {e}")
if hasattr(e, "body"):
logger.error(f"ApiException body: {getattr(e, 'body', None)}")
return None
def _extract_label_names(issue_obj: Any) -> List[str]:
try:
return [getattr(l, "name", l) for l in getattr(issue_obj, "labels", []) or []]
except Exception:
return []
class Gitea: class Gitea:
def __init__( def __init__(
self, self,
@ -477,6 +640,44 @@ class Gitea:
self.org_name, repo_name, issue.number, body={"state": "closed"} self.org_name, repo_name, issue.number, body={"state": "closed"}
) )
def close_issues(
self, repo_name: str, issue_numbers: List[int], dry_run: bool = False
) -> None:
if not issue_numbers:
logger.warning("No issue numbers provided to close")
return
if dry_run:
logger.info("Dry run enabled. No changes will be made to issues.")
try:
issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return
for num in issue_numbers:
issue = issues.get(num)
if issue is None:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
if getattr(issue, "state", "") == "closed":
logger.info(f"Issue #{num} in {repo_name} already closed")
continue
try:
if dry_run:
logger.info(f"Would close issue #{num} in {repo_name} (dry run)")
continue
self.issue_api.issue_edit_issue(
self.org_name, repo_name, num, body={"state": "closed"}
)
logger.info(f"Closed issue #{num} in {repo_name}")
except ApiException as e:
logger.error(f"Failed to close issue #{num} in {repo_name}: {e}")
def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None: def archive_repos(self, regex: str = ".+", dry_run: bool = True) -> None:
if dry_run: if dry_run:
logger.info("Dry run enabled. No changes will be made to the repositories.") logger.info("Dry run enabled. No changes will be made to the repositories.")
@ -548,6 +749,241 @@ class Gitea:
self.create_milestone(repo_name, milestone, description, due_date) self.create_milestone(repo_name, milestone, description, due_date)
logger.info(f"Created milestone {milestone} in {repo_name}") logger.info(f"Created milestone {milestone} in {repo_name}")
def label_issues(
self,
repo_name: str,
label_name: str,
issue_numbers: List[int],
color: Optional[str] = None,
) -> None:
if not issue_numbers:
logger.warning("No issue numbers provided to label")
return
labels = _get_repo_labels(self, repo_name)
if not labels:
logger.warning(f"No labels found for {repo_name}")
return
repo_label_name_to_obj: Dict[str, Any] = {}
for l in labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
if isinstance(name, str):
repo_label_name_to_obj[name] = l
label_obj = _create_label(self, repo_name, label_name, color, labels)
if label_obj is None:
logger.error(f"Unable to ensure label '{label_name}' exists in {repo_name}")
return
label_id = _label_id_from_obj(label_obj)
if label_id is None:
logger.error(f"Unable to find id of label '{label_name}' in {repo_name}")
return
issues_map = _list_issues_map(self, repo_name)
if not issues_map:
return
for num in issue_numbers:
issue = issues_map.get(num)
if issue is None:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
existing_label_names = _extract_label_names(issue)
if label_name in existing_label_names:
logger.info(
f"Issue #{num} in {repo_name} already has label '{label_name}'"
)
continue
try:
current = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
issue_labels = _extract_label_names(current)
except Exception:
issue_labels = existing_label_names
try:
target_obj = repo_label_name_to_obj.get(label_name) or label_obj
target_is_exclusive = bool(
getattr(target_obj, "exclusive", None)
or (
target_obj.get("exclusive")
if isinstance(target_obj, dict)
else False
)
)
except Exception:
target_is_exclusive = False
if target_is_exclusive:
for lname in issue_labels or []:
if lname == label_name:
continue
lobj = repo_label_name_to_obj.get(lname)
is_ex = False
if lobj is not None:
is_ex = bool(
getattr(lobj, "exclusive", None)
or (
lobj.get("exclusive")
if isinstance(lobj, dict)
else False
)
)
if is_ex:
lid = _label_id_from_obj(lobj) if lobj is not None else None
if lid is not None:
_delete_issue_label_by_id(self, repo_name, num, lid)
if label_name not in (issue_labels or []):
try:
resp = _api_post_labels(
self, repo_name, num, {"labels": [label_name]}
)
if getattr(resp, "status_code", None) not in (200, 201):
logger.error(
f"Failed to add label via add-labels endpoint for issue #{num}: status={getattr(resp, 'status_code', None)}"
)
except Exception as e:
logger.error(f"Failed to POST labels to issue #{num}: {e}")
# verification
try:
final = self.issue_api.issue_get_issue(self.org_name, repo_name, num)
final_labels = _extract_label_names(final)
except Exception:
final_labels = []
if label_name in (final_labels or []):
logger.info(
f"Label '{label_name}' attached to issue #{num} in {repo_name}"
)
else:
logger.warning(
f"Label '{label_name}' not attached to issue #{num} in {repo_name} after attempts"
)
def delete_label(
self,
repo_name: str,
label_name: str,
issue_numbers: Optional[List[int]] = None,
delete_repo_label: bool = False,
) -> None:
token = (
getattr(self.api_client.configuration, "api_key", {}).get("access_token")
or settings.gitea_access_token
)
headers_local = {"Authorization": f"token {token}"}
repo_labels: List[Any] = []
try:
repo_labels = list(
cast(
Iterable[Any],
self.issue_api.issue_list_labels(self.org_name, repo_name),
)
)
label_name_to_id: Dict[str, int] = {}
for l in repo_labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
lid = getattr(l, "id", None) or (
l.get("id") if isinstance(l, dict) else None
)
if isinstance(name, str) and isinstance(lid, int):
label_name_to_id[name] = lid
except Exception:
label_name_to_id = {}
def _delete_issue_label(issue_num: int) -> None:
lid = label_name_to_id.get(label_name)
if lid is None:
try:
for l in repo_labels:
name = getattr(l, "name", None) or (
l.get("name") if isinstance(l, dict) else None
)
if name == label_name:
lid = getattr(l, "id", None) or (
l.get("id") if isinstance(l, dict) else None
)
break
except Exception:
logger.warning(
f"Could not determine id of label '{label_name}' in {repo_name}"
)
if lid is None:
logger.warning(
f"No numeric id found for label '{label_name}' in {repo_name}; skipping issue-level delete for issue #{issue_num}"
)
return
path_id = (
f"/repos/{self.org_name}/{repo_name}/issues/{issue_num}/labels/{lid}"
)
url_id = f"{self.api_client.configuration.host}{path_id}"
try:
resp = requests.delete(url_id, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(
f"Removed label '{label_name}' from {repo_name}#{issue_num}"
)
return
logger.warning(
f"Numeric-id DELETE returned status {resp.status_code} for {repo_name}#{issue_num}: body={getattr(resp, 'text', None)}"
)
except Exception as e:
logger.error(
f"Numeric-id DELETE error for {repo_name}#{issue_num}: {e}"
)
def _delete_repo_label() -> None:
enc_name = quote(label_name, safe="")
path = f"/repos/{self.org_name}/{repo_name}/labels/{enc_name}"
url = f"{self.api_client.configuration.host}{path}"
try:
resp = requests.delete(url, headers=headers_local, timeout=10)
if resp.status_code in (200, 204):
logger.info(
f"Removed repo-level label '{label_name}' from {repo_name}"
)
return
logger.error(
f"Failed to delete repo label '{label_name}' from {repo_name}: status={resp.status_code}, body={getattr(resp, 'text', None)}"
)
except Exception as e:
logger.error(
f"Error deleting repo label '{label_name}' from {repo_name}: {e}"
)
if issue_numbers:
try:
issues = {
issue.number: issue
for issue in list_all(
self.issue_api.issue_list_issues, self.org_name, repo_name
)
}
except ApiException as e:
logger.error(f"Failed to list issues for {repo_name}: {e}")
return
for num in issue_numbers:
if num not in issues:
logger.warning(f"Issue #{num} not found in {repo_name}")
continue
_delete_issue_label(num)
else:
if delete_repo_label:
_delete_repo_label()
else:
logger.warning(
"No issue numbers provided and --repo not set; nothing to do for delete_label"
)
if __name__ == "__main__": if __name__ == "__main__":
gitea = Gitea() gitea = Gitea()

View File

@ -103,6 +103,109 @@ class Mattermost:
) )
logger.info(f"Added member {member} to channel {channel_name}") logger.info(f"Added member {member} to channel {channel_name}")
def update_channels_for_groups(
self,
groups: Dict[str, List[str]],
suffix: str = "",
update_teaching_team: bool = True,
dry_run: bool = False,
) -> None:
for group_name, members in groups.items():
channel_name = group_name + suffix
try:
channel = self.endpoint.channels.get_channel_by_name(
self.team["id"], channel_name
)
logger.info(f"Channel {channel_name} exists, updating members")
except Exception:
# channel does not exist
if dry_run:
info_members = list(members)
if update_teaching_team:
info_members = info_members + settings.mattermost_teaching_team
logger.info(
f"Dry run: would create channel {channel_name} and add members: {info_members}"
)
continue
try:
channel = self.endpoint.channels.create_channel(
{
"team_id": self.team["id"],
"name": channel_name,
"display_name": channel_name,
"type": "P",
}
)
logger.info(f"Created channel {channel_name} on Mattermost")
except Exception as e:
logger.warning(
f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
)
continue
current_members = set()
try:
mm_members = (
self.endpoint.channels.get_channel_members(channel["id"]) or []
)
for m in mm_members:
uname = None
if isinstance(m, dict):
uname = m.get("username") or m.get("name")
if not uname and "user" in m and isinstance(m["user"], dict):
uname = m["user"].get("username") or m["user"].get("name")
if not uname and "user_id" in m:
try:
u = self.endpoint.users.get_user(m["user_id"]) or {}
if isinstance(u, dict):
uname = u.get("username") or u.get("name")
except Exception:
uname = None
if uname:
current_members.add(uname.lower())
except Exception:
current_members = set()
add_members = list(members)
if update_teaching_team:
add_members = add_members + settings.mattermost_teaching_team
for member in add_members:
if member.lower() in current_members:
logger.debug(f"Member {member} already in channel {channel_name}")
continue
if dry_run:
logger.info(
f"Dry run: would add member {member} to channel {channel_name}"
)
continue
try:
mmuser = self.endpoint.users.get_user_by_username(member)
except Exception:
logger.warning(
f"User {member} is not found on the Mattermost server"
)
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not found on the Mattermost server",
}
)
continue
try:
self.endpoint.channels.add_user(
channel["id"], {"user_id": mmuser["id"]}
)
except Exception:
logger.warning(f"User {member} is not in the team")
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not in the team",
}
)
logger.info(f"Added member {member} to channel {channel_name}")
def create_channels_for_individuals( def create_channels_for_individuals(
self, self,
students: PaginatedList, students: PaginatedList,
@ -166,6 +269,110 @@ class Mattermost:
logger.info(f"Added member {member} to channel {channel_name}") logger.info(f"Added member {member} to channel {channel_name}")
def update_channels_for_individuals(
self,
students: PaginatedList,
update_teaching_team: bool = True,
dry_run: bool = False,
) -> None:
for student in students:
display_name = student.name
channel_name = student.sis_id
try:
channel = self.endpoint.channels.get_channel_by_name(
self.team["id"], channel_name
)
logger.info(f"Channel {channel_name} exists, updating members")
except Exception:
if dry_run:
members_info = [student.login_id]
if update_teaching_team:
members_info = members_info + settings.mattermost_teaching_team
logger.info(
f"Dry run: would create channel {display_name} ({channel_name}) and add members: {members_info}"
)
continue
try:
channel = self.endpoint.channels.create_channel(
{
"team_id": self.team["id"],
"name": channel_name,
"display_name": display_name,
"type": "P",
}
)
logger.info(
f"Created channel {display_name} ({channel_name}) on Mattermost"
)
except Exception as e:
logger.warning(
f"Error when creating channel {channel_name}: {e} Perhaps channel already exists?"
)
continue
current_members = set()
try:
mm_members = (
self.endpoint.channels.get_channel_members(channel["id"]) or []
)
for m in mm_members:
uname = None
if isinstance(m, dict):
uname = m.get("username") or m.get("name")
if not uname and "user" in m and isinstance(m["user"], dict):
uname = m["user"].get("username") or m["user"].get("name")
if not uname and "user_id" in m:
try:
u = self.endpoint.users.get_user(m["user_id"]) or {}
if isinstance(u, dict):
uname = u.get("username") or u.get("name")
except Exception:
uname = None
if uname:
current_members.add(uname.lower())
except Exception:
current_members = set()
members = [student.login_id]
if update_teaching_team:
members = members + settings.mattermost_teaching_team
for member in members:
if member.lower() in current_members:
logger.debug(f"Member {member} already in channel {channel_name}")
continue
if dry_run:
logger.info(
f"Dry run: would add member {member} to channel {channel_name}"
)
continue
try:
mmuser = self.endpoint.users.get_user_by_username(member)
except Exception:
logger.warning(
f"User {member} is not found on the Mattermost server"
)
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not found on the Mattermost server",
}
)
continue
try:
self.endpoint.channels.add_user(
channel["id"], {"user_id": mmuser["id"]}
)
except Exception:
logger.warning(f"User {member} is not in the team")
self.endpoint.posts.create_post(
{
"channel_id": channel["id"],
"message": f"User {member} is not in the team",
}
)
logger.info(f"Added member {member} to channel {channel_name}")
def create_webhooks_for_repos( def create_webhooks_for_repos(
self, repos: List[str], gitea: Gitea, gitea_suffix: bool self, repos: List[str], gitea: Gitea, gitea_suffix: bool
) -> None: ) -> None: