This repository has been archived on 2025-04-03. You can view files and clone it, but cannot push or open issues or pull requests.
linux-bash-scripts/gitea/auto_mapper/auto_mapper.py

273 lines
11 KiB
Python

import argparse
import os
import socket
from gitea import *
from rich.console import Console
from rich.logging import RichHandler
from loguru import logger
from dotenv import load_dotenv
# fmt: off
# python-dotenv: pull settings from a .env file (if one exists) into the process
# environment before the os.getenv() lookups below run.
load_dotenv()
def str_to_bool(value):
    """Interpret a textual flag (e.g. an environment variable) as a boolean.

    Args:
        value: Text such as ``"true"``, ``"1"``, ``"yes"`` or ``"y"``. Matching
            is case-insensitive and ignores surrounding whitespace. ``None``
            (an unset variable) is accepted and treated as false.

    Returns:
        ``True`` for one of the recognised affirmative spellings, else ``False``.
    """
    if value is None:
        return False
    # str() lets callers pass non-string values (e.g. an actual bool) safely.
    return str(value).strip().lower() in ("true", "1", "yes", "y")
# Settings read from the environment. Command-line arguments parsed further
# down take precedence over these values.
#
# Plain text settings are None when the variable is unset.
GITEA_INSTANCE = os.getenv("GITEA_INSTANCE")  # host name or IP of the Gitea server
GITEA_PORT = os.getenv("GITEA_PORT")  # still text here; check_host() parses and validates it
GITEA_TOKEN = os.getenv("GITEA_TOKEN")  # API token handed to the Gitea client
# Boolean switches: an unset variable reads as "" and therefore as False.
GITEA_SSL : bool = str_to_bool(os.getenv("GITEA_SSL", ""))
DEBUG : bool = str_to_bool(os.getenv("DEBUG", ""))
SOURCE_ORGA = os.getenv("SOURCE_ORGA")  # organization whose teams are copied
DRY_RUN : bool = str_to_bool(os.getenv("DRY_RUN", ""))
# Comma-separated organization names. This is always a list (possibly empty),
# never None, so membership tests such as `name in EXCLUDE_ORGAS` cannot fail
# with a TypeError. Blank entries and padding around names are dropped.
EXCLUDE_ORGAS : list = [
    name.strip()
    for name in os.getenv("EXCLUDE_ORGAS", "").split(",")
    if name.strip()
]
UPDATE_TEAMS : bool = str_to_bool(os.getenv("UPDATE_TEAMS", ""))
OVERRIDE_TEAMS : bool = str_to_bool(os.getenv("OVERRIDE_TEAMS", ""))
# Command-line interface. The options are declared as (flag, keyword-arguments)
# pairs and registered in this order, which is also the order shown by --help.
_CLI_OPTIONS = (
    ("--host", {"help": "Specify the Gitea instance host"}),
    ("--port", {"help": "Specify the Gitea instance port"}),
    ("--token", {"help": "Specify the Gitea instance token"}),
    ("--ssl", {"help": "Specify if the Gitea instance uses SSL", "action": "store_true"}),
    ("--debug", {"help": "Enable debug logging", "action": "store_true"}),
    ("--source-orga", {"help": "Specify the source organization"}),
    ("--dry-run", {"help": "Enable dry-run mode, no changes will be made", "action": "store_true"}),
    ("--exclude", {"help": "Specify organizations to exclude", "nargs": "+"}),
    ("--update", {"help": "Updates existing teams in target organizations", "action": "store_true"}),
    ("--override", {"help": "Override existing teams in target organizations", "action": "store_true"}),
)
parser = argparse.ArgumentParser()
for _flag, _options in _CLI_OPTIONS:
    parser.add_argument(_flag, **_options)
args = parser.parse_args()
# Merge the command line over the environment: a truthy command-line value
# wins, otherwise the value read from the environment is kept. Because the
# switches are store_true flags, the command line can only turn a setting on,
# never off.
GITEA_INSTANCE : str = args.host or GITEA_INSTANCE
GITEA_PORT : int = args.port or GITEA_PORT  # NOTE: still text (or None) at this point
GITEA_TOKEN : str = args.token or GITEA_TOKEN
GITEA_SSL : bool = args.ssl or GITEA_SSL
DEBUG : bool = args.debug or DEBUG
SOURCE_ORGA : str = args.source_orga or SOURCE_ORGA
DRY_RUN : bool = args.dry_run or DRY_RUN
EXCLUDE_ORGAS : list = args.exclude or EXCLUDE_ORGAS
UPDATE_TEAMS : bool = args.update or UPDATE_TEAMS
OVERRIDE_TEAMS : bool = args.override or OVERRIDE_TEAMS
# fmt: on
# One Rich console is shared by the log handler and the status spinners below.
console = Console()
logger.remove()  # Remove default handler
logger.add(
    RichHandler(console=console, show_time=True, show_level=True, show_path=False),
    format="{time:YYYY-MM-DD HH:mm:ss} - {message}",
    # Use the merged DEBUG setting (environment or --debug). Looking only at
    # args.debug ignored DEBUG=true coming from the environment.
    level="DEBUG" if DEBUG else "INFO",
)
# fmt: off
def _state(flag):
    """Return the label shown in the startup summary for a boolean setting."""
    return "Enabled" if flag else "Disabled"


# Startup banner: echo the effective configuration before doing any work.
_excluded_summary = ", ".join(EXCLUDE_ORGAS) if EXCLUDE_ORGAS else "None"
logger.info("Starting Gitea Auto Mapper")
logger.debug("Debug logging enabled")
logger.info(f"Dry-run mode: {_state(DRY_RUN)}")
logger.info(f"Target Gitea instance: {GITEA_INSTANCE}:{GITEA_PORT}")
logger.info(f"Using SSL: {_state(GITEA_SSL)}")
logger.info(f"Source organization: {SOURCE_ORGA}")
logger.info(f"Excluded organizations: {_excluded_summary}")
logger.info(f"Update mode: {_state(UPDATE_TEAMS)}")
logger.info(f"Override mode: {_state(OVERRIDE_TEAMS)}")
# fmt: on
def check_host(host, port):
    """Validate a host/port pair and verify that it accepts TCP connections.

    Args:
        host: IPv4 address or host name of the server. A leading ``http://``
            or ``https://`` is tolerated and stripped before resolution.
        port: Port as an int or numeric string. A falsy value (``None``,
            ``""``, ``0``) falls back to 3000 with a warning.

    Raises:
        Exception: If the host is missing or cannot be resolved, the port is
            not an integer in 1..65535, or the TCP connection cannot be
            established within 5 seconds.
    """
    if not host:
        raise Exception("Host not specified")
    if not port:
        port = 3000
        logger.warning(f"Port not specified, defaulting to {port}")

    try:
        port_number = int(port)
    except (TypeError, ValueError):
        raise Exception(f"{port} is not a valid port") from None
    if not 1 <= port_number <= 65535:
        raise Exception(f"{port_number} is not a valid port")

    # Accept an optional URL scheme. Bare host names are allowed as well and
    # are validated by the name resolution below rather than rejected outright.
    for scheme in ("http://", "https://"):
        if host.startswith(scheme):
            host = host[len(scheme):]
            break
    try:
        socket.gethostbyname(host)
    except (OSError, UnicodeError) as e:
        raise Exception(f"{host} is not a valid host: {e}") from e

    logger.debug(f"Checking connection to {host}:{port_number}")
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(5)
            probe.connect((host, port_number))
    except OSError as e:
        raise Exception(f"Connection to {host}:{port_number} failed: {e}") from e
# Verify the endpoint is reachable before doing anything else (with a spinner)
with console.status("- Checking Gitea Endpoint", spinner="dots"):
    try:
        check_host(GITEA_INSTANCE, GITEA_PORT)
    except Exception as e:
        logger.error(f"Failed to check Gitea endpoint: {e}")
        logger.error(
            "Please double check the Gitea endpoint and make sure it is reachable"
        )
        # SystemExit unwinds the `with` block and runs normal interpreter
        # shutdown; os._exit() would skip both.
        raise SystemExit(1)
    else:
        logger.info(f"Gitea endpoint at {GITEA_INSTANCE}:{GITEA_PORT} is valid")
# Create a Gitea API client
gitea_client = None
with console.status("- Creating Gitea Client", spinner="dots"):
    # check_host() accepts a host that carries an http(s):// prefix and falls
    # back to port 3000 when none is configured. Apply the same normalisation
    # here so the base URL never becomes "http://http://..." or "...:None".
    client_host = GITEA_INSTANCE or ""
    for prefix in ("http://", "https://"):
        if client_host.startswith(prefix):
            client_host = client_host[len(prefix):]
            break
    client_port = GITEA_PORT or 3000
    client_scheme = "https" if GITEA_SSL else "http"
    logger.debug("Using SSL" if GITEA_SSL else "Not using SSL")
    try:
        gitea_client = Gitea(
            f"{client_scheme}://{client_host}:{client_port}", GITEA_TOKEN
        )
    except Exception as e:
        logger.error(f"Failed to create Gitea client: {e}")
if not gitea_client:
    # SystemExit (rather than os._exit) lets normal interpreter cleanup run.
    raise SystemExit(1)
# Confirm the token works by asking the server for its version and for the
# account the token belongs to.
with console.status("- Checking Gitea Token", spinner="dots") as status:
    try:
        server_version = gitea_client.get_version()
        current_user = gitea_client.get_user()
        logger.info(f"Connected to Gitea {server_version}")
        logger.info(f"Authenticated as {current_user.username}")
    except Exception as e:
        # NOTE(review): unlike the other steps, a failure here is only logged
        # and the script carries on. Confirm that this is intentional.
        logger.error(f"Failed to check Gitea token: {e}")
# Get the source organization
source_orga = None
with console.status("- Getting Source Organization", spinner="dots"):
    try:
        source_orga = Organization.request(gitea_client, SOURCE_ORGA)
        logger.info(f"Source organization is '{source_orga.name}'")
    except Exception as e:
        logger.error(f"Failed to get source organization: {e}")
if not source_orga:
    # Nothing can be copied without a source. SystemExit (rather than
    # os._exit) lets normal interpreter cleanup run.
    raise SystemExit(1)
# Get the source organization teams
source_orga_teams = None
with console.status("- Getting Source Organization Teams", spinner="dots"):
    try:
        # Fetch and filter in one step so a failure never leaves an unfiltered
        # list (still containing 'Owners') behind.
        source_orga_teams = [
            team for team in source_orga.get_teams() if team.name != "Owners"
        ]  # Skip the default team 'Owners'
        logger.info(f"Source organization has {len(source_orga_teams)} teams:")
        for team in source_orga_teams:
            logger.info(f" - {team.name}")
        logger.info("Note: The default team 'Owners' will always be skipped!")
    except Exception as e:
        logger.error(f"Failed to get source organization teams: {e}")
if not source_orga_teams:
    if source_orga_teams is not None:
        # The lookup worked but returned nothing to copy. Say so instead of
        # exiting without any explanation.
        logger.error("Source organization has no teams to copy")
    # SystemExit (rather than os._exit) lets normal interpreter cleanup run.
    raise SystemExit(1)
# Get all other organizations except for the source organization
all_orgas = None
with console.status("- Getting All Organizations", spinner="dots"):
    try:
        all_orgas = [
            orga for orga in gitea_client.get_orgs() if orga.name != source_orga.name
        ]
        logger.info(f"Found {len(all_orgas)} other organizations:")
        for orga in all_orgas:
            logger.info(f" - {orga.name}")
        logger.info(
            f"Note: The source organization {source_orga.name} will always be skipped!"
        )
    except Exception as e:
        logger.error(f"Failed to get all organizations: {e}")
if not all_orgas:
    if all_orgas is not None:
        # The lookup worked but there is no target to copy to. Say so instead
        # of exiting without any explanation.
        logger.error("No other organizations found, nothing to copy to")
    # SystemExit (rather than os._exit) lets normal interpreter cleanup run.
    raise SystemExit(1)
# Copy teams from source organization to all other organizations except for the source organization
with console.status("- Copying Teams", spinner="dots"):
    if DRY_RUN:
        logger.warning("Dry-run mode enabled, no changes will be made")
    if UPDATE_TEAMS:
        logger.info("Update mode enabled, existing teams will be updated")
    if OVERRIDE_TEAMS:
        logger.info(
            "Override mode enabled, existing teams will be deleted and re-created"
        )
    # EXCLUDE_ORGAS may be None when no exclusions were configured; treat that
    # as "exclude nothing" instead of failing on the membership test.
    excluded_orgas = set(EXCLUDE_ORGAS or [])
    for orga in all_orgas:
        if orga.name in excluded_orgas:
            logger.info(f"Skipping organization '{orga.name}'")
            continue
        logger.info(f"{source_orga.name} -> {orga.name}")
        # The target's team list is fetched once per organization rather than
        # once per source team.
        try:
            orga_teams = orga.get_teams()
        except Exception as e:
            logger.error(f"Failed to get teams of organization '{orga.name}': {e}")
            continue
        for team in source_orga_teams:
            try:
                # check if the team already exists in the target organization
                existing_team = next(
                    (t for t in orga_teams if t.name == team.name), None
                )
                if existing_team:
                    logger.debug(f"\tTeam {team.name} already exists in {orga.name}")
                if OVERRIDE_TEAMS and existing_team:
                    logger.info(f"\tDeleting existing team '{team.name}'")
                    if not DRY_RUN:
                        existing_team.delete()
                    # From here on, treat the team as absent so it is re-created.
                    existing_team = None
                if not existing_team:
                    # Logged in dry-run mode too, so a dry run shows what
                    # would happen.
                    logger.info(f"\tCreating team '{team.name}'")
                    if not DRY_RUN:
                        gitea_client.create_team(
                            org=orga,
                            name=team.name,
                            description=team.description,
                            permission="read",
                            includes_all_repositories=False,
                            can_create_org_repo=False,
                            units=[
                                "repo.code",
                                "repo.issues",
                                "repo.ext_issues",
                                "repo.wiki",
                                "repo.pulls",
                                "repo.releases",
                                "repo.ext_wiki",
                                "repo.actions",
                                "repo.projects",
                            ],
                            units_map={
                                "repo.code": "none",
                                "repo.ext_issues": "none",
                                "repo.ext_wiki": "none",
                                "repo.issues": "none",
                                "repo.projects": "none",
                                "repo.pulls": "none",
                                "repo.releases": "none",
                                "repo.wiki": "none",
                            },
                        )
                elif UPDATE_TEAMS:
                    # Existing teams are only touched when update mode was
                    # requested.
                    logger.info(f"\tUpdating existing team '{team.name}'")
                    if not DRY_RUN:
                        # NOTE(review): this only changes the local object;
                        # nothing in this script sends the new description to
                        # the server. Confirm whether py-gitea needs an
                        # explicit commit() here.
                        existing_team.description = team.description
            except Exception as e:
                logger.error(f"Failed to copy team '{team.name}': {e}")