forked from JOJ/Joint-Teapot
		
	
		
			
				
	
	
		
			152 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			152 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
import sys
 | 
						|
from time import sleep
 | 
						|
from typing import List, Optional
 | 
						|
 | 
						|
from joint_teapot.utils.logger import logger
 | 
						|
 | 
						|
current_path = sys.path[0]
 | 
						|
sys.path.remove(current_path)
 | 
						|
from git import Repo
 | 
						|
from git.exc import GitCommandError
 | 
						|
from git.remote import PushInfoList
 | 
						|
 | 
						|
sys.path.insert(0, current_path)
 | 
						|
 | 
						|
from joint_teapot.config import settings
 | 
						|
 | 
						|
 | 
						|
class Git:
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        git_host: str = "",
 | 
						|
        org_name: str = "",
 | 
						|
        repos_dir: str = "",
 | 
						|
    ):
 | 
						|
        git_host = git_host or settings.git_host
 | 
						|
        org_name = org_name or settings.gitea_org_name
 | 
						|
        repos_dir = repos_dir or settings.repos_dir
 | 
						|
        self.git_host = git_host
 | 
						|
        self.org_name = org_name
 | 
						|
        self.repos_dir = repos_dir
 | 
						|
        if not os.path.isdir(self.repos_dir):
 | 
						|
            raise Exception(f"{self.repos_dir} does not exist! Create it first.")
 | 
						|
        logger.debug("Git initialized")
 | 
						|
        logger.info(f"repos dir: {self.repos_dir}")
 | 
						|
 | 
						|
    def clone_repo(
 | 
						|
        self,
 | 
						|
        repo_name: str,
 | 
						|
        branch: str = settings.default_branch,
 | 
						|
        auto_retry: bool = True,
 | 
						|
    ) -> Optional[Repo]:
 | 
						|
        repo = None
 | 
						|
        repo_dir = os.path.join(self.repos_dir, repo_name)
 | 
						|
        retry_interval = 2
 | 
						|
        while retry_interval and auto_retry:
 | 
						|
            try:
 | 
						|
                repo = Repo.clone_from(
 | 
						|
                    f"{self.git_host}/{self.org_name}/{repo_name}.git",
 | 
						|
                    repo_dir,
 | 
						|
                    branch=branch,
 | 
						|
                )
 | 
						|
                retry_interval = 0
 | 
						|
            except GitCommandError as e:
 | 
						|
                if "Connection refused" in e.stderr or "Connection reset" in e.stderr:
 | 
						|
                    logger.warning(
 | 
						|
                        f"{repo_name} connection refused/reset in clone. "
 | 
						|
                        "Probably by JI firewall."
 | 
						|
                    )
 | 
						|
                    logger.info(f"wait for {retry_interval} seconds to retry...")
 | 
						|
                    sleep(retry_interval)
 | 
						|
                    if retry_interval < 64:
 | 
						|
                        retry_interval *= 2
 | 
						|
                elif f"Remote branch {branch} not found in upstream origin" in e.stderr:
 | 
						|
                    retry_interval = 0
 | 
						|
                    logger.error(f"{repo_name} origin/{branch} not found")
 | 
						|
                else:
 | 
						|
                    raise
 | 
						|
        return repo
 | 
						|
 | 
						|
    def get_repo(self, repo_name: str) -> Optional[Repo]:
 | 
						|
        repo_dir = os.path.join(self.repos_dir, repo_name)
 | 
						|
        if os.path.exists(repo_dir):
 | 
						|
            return Repo(repo_dir)
 | 
						|
        return self.clone_repo(repo_name)
 | 
						|
 | 
						|
    def repo_clean_and_checkout(
 | 
						|
        self,
 | 
						|
        repo_name: str,
 | 
						|
        checkout_dest: str,
 | 
						|
        *,
 | 
						|
        auto_retry: bool = True,
 | 
						|
        clean_git_lock: bool = False,
 | 
						|
        reset_target: str = f"origin/{settings.default_branch}",
 | 
						|
    ) -> str:
 | 
						|
        repo_dir = os.path.join(self.repos_dir, repo_name)
 | 
						|
        repo = self.get_repo(repo_name)
 | 
						|
        if not repo:
 | 
						|
            return repo_dir
 | 
						|
        retry_interval = 2
 | 
						|
        while retry_interval and auto_retry:
 | 
						|
            try:
 | 
						|
                if clean_git_lock:
 | 
						|
                    lock_files = [
 | 
						|
                        "index.lock",
 | 
						|
                        "HEAD.lock",
 | 
						|
                        "fetch-pack.lock",
 | 
						|
                        "logs/HEAD.lock",
 | 
						|
                        "packed-refs.lock",
 | 
						|
                        "config.lock",
 | 
						|
                    ]
 | 
						|
                    for lock_file in lock_files:
 | 
						|
                        lock_path = os.path.join(repo_dir, ".git", lock_file)
 | 
						|
                        if os.path.exists(lock_path):
 | 
						|
                            os.remove(lock_path)
 | 
						|
                repo.git.fetch("--tags", "--all", "-f")
 | 
						|
                repo.git.reset("--hard", reset_target)
 | 
						|
                repo.git.clean("-d", "-f", "-x")
 | 
						|
                repo.git.checkout(checkout_dest)
 | 
						|
                retry_interval = 0
 | 
						|
            except GitCommandError as e:
 | 
						|
                if "Connection refused" in e.stderr or "Connection reset" in e.stderr:
 | 
						|
                    logger.warning(
 | 
						|
                        f"{repo_name} connection refused/reset in fetch. "
 | 
						|
                        "Probably by JI firewall."
 | 
						|
                    )
 | 
						|
                    logger.info(f"wait for {retry_interval} seconds to retry...")
 | 
						|
                    sleep(retry_interval)
 | 
						|
                    if retry_interval < 64:
 | 
						|
                        retry_interval *= 2
 | 
						|
                elif (
 | 
						|
                    f"Remote branch {settings.default_branch} not found in upstream origin"
 | 
						|
                    in e.stderr
 | 
						|
                ):
 | 
						|
                    retry_interval = 0
 | 
						|
                    logger.error(
 | 
						|
                        f"{repo_name} origin/{settings.default_branch} not found"
 | 
						|
                    )
 | 
						|
                else:
 | 
						|
                    retry_interval = 0
 | 
						|
                    logger.exception(e)
 | 
						|
        return repo_dir
 | 
						|
 | 
						|
    def add_commit(
 | 
						|
        self, repo_name: str, files_to_add: List[str], commit_message: str
 | 
						|
    ) -> None:
 | 
						|
        repo: Repo = self.get_repo(repo_name)
 | 
						|
        for file in files_to_add:
 | 
						|
            try:
 | 
						|
                repo.index.add(file)
 | 
						|
            except OSError:
 | 
						|
                logger.warning(
 | 
						|
                    f'File path "{file}" does not exist. Skipping this file.'
 | 
						|
                )
 | 
						|
                continue
 | 
						|
        if repo.is_dirty(untracked_files=True) or repo.index.diff(None):
 | 
						|
            repo.index.commit(commit_message)
 | 
						|
 | 
						|
    def push(self, repo_name: str) -> PushInfoList:
 | 
						|
        repo: Repo = self.get_repo(repo_name)
 | 
						|
        return repo.remote(name="origin").push()
 |