This repository has been archived on 2025-04-03. You can view files and clone it, but cannot push or open issues or pull requests.
linux-bash-scripts/gitea/auto_mapper/auto_mapper.py

233 lines
8.8 KiB
Python

import argparse
import os
import socket
import sys
from gitea import *
from rich.console import Console
from rich.spinner import Spinner
from rich.logging import RichHandler
from loguru import logger
# fmt: off
parser = argparse.ArgumentParser()
parser.add_argument("--host", help="Specify the Gitea instance host")
parser.add_argument("--port", help="Specify the Gitea instance port")
parser.add_argument("--token", help="Specify the Gitea instance token")
parser.add_argument("--ssl", help="Specify if the Gitea instance uses SSL", action="store_true")
parser.add_argument("--debug", help="Enable debug logging", action="store_true")
parser.add_argument("--source-orga", help="Specify the source organization")
parser.add_argument("--dry-run", help="Enable dry-run mode, no changes will be made", action="store_true")
parser.add_argument("--exclude", help="Specify organizations to exclude", nargs="+")
parser.add_argument("--update", help="Updates existing teams in target organizations", action="store_true")
parser.add_argument("--override", help="Override existing teams in target organizations", action="store_true")
args = parser.parse_args()
GITEA_INSTANCE : str = args.host if args.host else None
GITEA_PORT : int = args.port if args.port else None
GITEA_TOKEN : str = args.token if args.token else None
GITEA_SSL : bool = args.ssl if args.ssl else False
SOURCE_ORGA : str = args.source_orga if args.source_orga else None
DRY_RUN : bool = args.dry_run if args.dry_run else False
EXCLUDE_ORGAS : list = args.exclude if args.exclude else []
UPDATE_TEAMS : bool = args.update if args.update else False
OVERRIDE_TEAMS : bool = args.override if args.override else False
# fmt: on
console = Console()
logger.remove() # Remove default handler
logger.add(
RichHandler(console=console, show_time=True, show_level=True, show_path=False),
format="{time:YYYY-MM-DD HH:mm:ss} - {message}",
level="DEBUG" if args.debug else "INFO",
)
def check_host(host, port):
if not host:
raise Exception("Host not specified")
if not port:
port = 3000
logger.warning(f"Port not specified, defaulting to {port}")
try:
port = int(port)
if port < 1 or port > 65535:
raise ValueError("Invalid port number")
except ValueError:
raise Exception(f"{port} is not a valid port")
try:
socket.inet_aton(host)
except socket.error:
if host.startswith("http://"):
host = host[7:]
elif host.startswith("https://"):
host = host[8:]
else:
raise Exception(f"{host} is not a valid host")
try:
socket.gethostbyname(host)
except Exception as e:
raise Exception(f"{host} is not a valid host: {e}")
try:
logger.debug(f"Checking connection to {host}:{port}")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
s.connect((host, port))
s.close()
except Exception as e:
raise Exception(f"Connection to {host}:{port} failed: {e}")
# Log with a spinner
with console.status("- Checking Gitea Endpoint", spinner="dots") as status:
try:
check_host(GITEA_INSTANCE, GITEA_PORT)
logger.info("Gitea endpoint is valid")
except Exception as e:
logger.error(f"Failed to check Gitea endpoint: {e}")
# Create a Gitea API client
gitea_client = None
with console.status("- Creating Gitea Client", spinner="dots") as status:
try:
if GITEA_SSL:
logger.debug("Using SSL")
gitea_client = Gitea(f"https://{GITEA_INSTANCE}:{GITEA_PORT}", GITEA_TOKEN)
else:
logger.debug("Not using SSL")
gitea_client = Gitea(f"http://{GITEA_INSTANCE}:{GITEA_PORT}", GITEA_TOKEN)
except Exception as e:
logger.error(f"Failed to create Gitea client: {e}")
if not gitea_client:
os._exit(1)
# Check if the token is valid
with console.status("- Checking Gitea Token", spinner="dots") as status:
try:
v = gitea_client.get_version()
u = gitea_client.get_user()
logger.info(f"Connected to Gitea {v}")
logger.info(f"Authenticated as {u.username}")
except Exception as e:
logger.error(f"Failed to check Gitea token: {e}")
# Get the source organization
source_orga = None
with console.status("- Getting Source Organization", spinner="dots") as status:
try:
source_orga = Organization.request(gitea_client, SOURCE_ORGA)
logger.info(f"Source organization is '{source_orga.name}'")
except Exception as e:
logger.error(f"Failed to get source organization: {e}")
if not source_orga:
os._exit(1)
# Get the source organization teams
source_orga_teams = None
with console.status("- Getting Source Organization Teams", spinner="dots") as status:
try:
source_orga_teams = source_orga.get_teams()
source_orga_teams = [
team for team in source_orga_teams if team.name != "Owners"
] # Skip the default team 'Owners'
logger.info(f"Source organization has {len(source_orga_teams)} teams:")
for team in source_orga_teams:
logger.info(f" - {team.name}")
logger.info("Note: The default team 'Owners' will always be skipped!")
except Exception as e:
logger.error(f"Failed to get source organization teams: {e}")
if not source_orga_teams:
os._exit(1)
# Get all other organizations except for the source organization
all_orgas = None
with console.status("- Getting All Organizations", spinner="dots") as status:
try:
all_orgas = gitea_client.get_orgs()
all_orgas = [orga for orga in all_orgas if orga.name != source_orga.name]
logger.info(f"Found {len(all_orgas)} other organizations:")
for orga in all_orgas:
logger.info(f" - {orga.name}")
logger.info(
f"Note: The source organization {source_orga.name} will always be skipped!"
)
except Exception as e:
logger.error(f"Failed to get all organizations: {e}")
if not all_orgas:
os._exit(1)
# Copy teams from source organization to all other organizations except for the source organization
with console.status("- Copying Teams", spinner="dots") as status:
if DRY_RUN:
logger.warning("Dry-run mode enabled, no changes will be made")
if OVERRIDE_TEAMS:
logger.info("Update mode enabled, existing teams will be updated")
for orga in all_orgas:
if orga.name in EXCLUDE_ORGAS:
logger.info(f"Skipping organization '{orga.name}'")
continue
logger.info(f"{source_orga.name} -> {orga.name}")
for team in source_orga_teams:
try:
# check if the team already exists in the target organization
existing_team = orga.get_team(team.name, True)
if OVERRIDE_TEAMS and existing_team:
logger.info(f"\tDeleting existing team '{team.name}'")
if not DRY_RUN:
existing_team.delete()
existing_team = None
logger.info(f"\tCreating team '{team.name}'")
if not DRY_RUN and not existing_team:
new_team = gitea_client.create_team(
org=orga,
name=team.name,
description=team.description,
permission="read",
includes_all_repositories=False,
can_create_org_repo=False,
units=[
"repo.code",
"repo.issues",
"repo.ext_issues",
"repo.wiki",
"repo.pulls",
"repo.releases",
"repo.ext_wiki",
"repo.actions",
"repo.projects",
],
units_map={
"repo.code": "none",
"repo.ext_issues": "none",
"repo.ext_wiki": "none",
"repo.issues": "none",
"repo.projects": "none",
"repo.pulls": "none",
"repo.releases": "none",
"repo.wiki": "none",
},
)
if not DRY_RUN and existing_team:
logger.info(f"\tUpdating existing team '{team.name}'")
existing_team.description = team.description
except Exception as e:
logger.error(f"Failed to copy team '{team.name}': {e}")