273 lines
11 KiB
Python
273 lines
11 KiB
Python
import argparse
|
|
import os
|
|
import socket
|
|
from gitea import *
|
|
from rich.console import Console
|
|
from rich.logging import RichHandler
|
|
from loguru import logger
|
|
from dotenv import load_dotenv
|
|
|
|
# fmt: off
|
|
|
|
# Load settings from a local .env file (python-dotenv) so that the
# os.getenv() lookups below can see them.
load_dotenv()
|
|
|
|
def str_to_bool(value) -> bool:
    """Interpret a textual flag as a boolean.

    Args:
        value: Usually a string such as ``"true"`` or ``"0"``. ``None`` and
            real booleans are accepted as well so callers do not have to
            pre-check values coming from the environment.

    Returns:
        ``True`` when the value (ignoring case and surrounding whitespace)
        is one of ``true``, ``1``, ``yes`` or ``y``; ``False`` otherwise.
    """
    if isinstance(value, bool):
        return value
    if value is None:
        # An unset variable means "not enabled" rather than a crash.
        return False
    return str(value).strip().lower() in {"true", "1", "yes", "y"}
|
|
|
|
# Defaults read from the environment (or the .env file loaded above).
# Command-line arguments parsed further down take precedence over these.
# Each variable is looked up once; unset boolean flags fall back to False.
GITEA_INSTANCE : str  = os.getenv("GITEA_INSTANCE")
# NOTE: a port taken from the environment is still a string at this point;
# check_host() converts it with int() when it validates the endpoint.
GITEA_PORT     : int  = os.getenv("GITEA_PORT") or 0
GITEA_TOKEN    : str  = os.getenv("GITEA_TOKEN")
GITEA_SSL      : bool = str_to_bool(os.getenv("GITEA_SSL", ""))
DEBUG          : bool = str_to_bool(os.getenv("DEBUG", ""))
SOURCE_ORGA    : str  = os.getenv("SOURCE_ORGA")
DRY_RUN        : bool = str_to_bool(os.getenv("DRY_RUN", ""))
# Comma-separated list. Surrounding whitespace and empty entries are dropped
# so that "a, b" or a trailing comma still match organization names exactly.
EXCLUDE_ORGAS  : list = [
    name.strip()
    for name in os.getenv("EXCLUDE_ORGAS", "").split(",")
    if name.strip()
]
UPDATE_TEAMS   : bool = str_to_bool(os.getenv("UPDATE_TEAMS", ""))
OVERRIDE_TEAMS : bool = str_to_bool(os.getenv("OVERRIDE_TEAMS", ""))
|
|
|
|
# Command-line interface. Options are registered from a table, in the order
# they should appear in --help; each entry is (flag, add_argument kwargs).
_CLI_OPTIONS = (
    ("--host", {"help": "Specify the Gitea instance host"}),
    ("--port", {"help": "Specify the Gitea instance port"}),
    ("--token", {"help": "Specify the Gitea instance token"}),
    ("--ssl", {"help": "Specify if the Gitea instance uses SSL", "action": "store_true"}),
    ("--debug", {"help": "Enable debug logging", "action": "store_true"}),
    ("--source-orga", {"help": "Specify the source organization"}),
    ("--dry-run", {"help": "Enable dry-run mode, no changes will be made", "action": "store_true"}),
    ("--exclude", {"help": "Specify organizations to exclude", "nargs": "+"}),
    ("--update", {"help": "Updates existing teams in target organizations", "action": "store_true"}),
    ("--override", {"help": "Override existing teams in target organizations", "action": "store_true"}),
)

parser = argparse.ArgumentParser()
for _flag, _options in _CLI_OPTIONS:
    parser.add_argument(_flag, **_options)
args = parser.parse_args()
|
|
|
|
# Command-line values win over the environment defaults. Because the boolean
# options are store_true flags, the command line can only switch a feature
# on; a flag that is absent keeps whatever the environment configured.
GITEA_INSTANCE : str  = args.host or GITEA_INSTANCE
GITEA_PORT     : int  = args.port or GITEA_PORT
GITEA_TOKEN    : str  = args.token or GITEA_TOKEN
GITEA_SSL      : bool = args.ssl or GITEA_SSL
DEBUG          : bool = args.debug or DEBUG
SOURCE_ORGA    : str  = args.source_orga or SOURCE_ORGA
DRY_RUN        : bool = args.dry_run or DRY_RUN
EXCLUDE_ORGAS  : list = args.exclude or EXCLUDE_ORGAS
UPDATE_TEAMS   : bool = args.update or UPDATE_TEAMS
OVERRIDE_TEAMS : bool = args.override or OVERRIDE_TEAMS
|
|
|
|
# fmt: on
|
|
|
|
# Shared Rich console: used both for log output and for the status spinners.
console = Console()
logger.remove()  # Remove default handler
logger.add(
    RichHandler(console=console, show_time=True, show_level=True, show_path=False),
    format="{time:YYYY-MM-DD HH:mm:ss} - {message}",
    # Use the merged DEBUG setting (environment + --debug) rather than only
    # args.debug, so DEBUG=true from the environment enables debug output too.
    level="DEBUG" if DEBUG else "INFO",
)
|
|
|
|
# fmt: off
|
|
|
|
def _switch_label(flag):
    """Render a boolean setting as the label used in the startup summary."""
    return "Enabled" if flag else "Disabled"


# Print the effective configuration once, before any work is done.
_excluded_summary = ", ".join(EXCLUDE_ORGAS) if EXCLUDE_ORGAS else "None"

logger.info("Starting Gitea Auto Mapper")
logger.debug("Debug logging enabled")
logger.info(f"Dry-run mode: {_switch_label(DRY_RUN)}")
logger.info(f"Target Gitea instance: {GITEA_INSTANCE}:{GITEA_PORT}")
logger.info(f"Using SSL: {_switch_label(GITEA_SSL)}")
logger.info(f"Source organization: {SOURCE_ORGA}")
logger.info(f"Excluded organizations: {_excluded_summary}")
logger.info(f"Update mode: {_switch_label(UPDATE_TEAMS)}")
logger.info(f"Override mode: {_switch_label(OVERRIDE_TEAMS)}")
|
|
|
|
# fmt: on
|
|
|
|
|
|
def check_host(host, port, timeout=5):
    """Validate a host/port pair and verify that a TCP connection can be made.

    Args:
        host: IPv4 address or host name. A leading ``http://`` or
            ``https://`` is tolerated and stripped before the check.
        port: Port number as int or numeric string. A falsy value falls
            back to 3000 (a warning is logged).
        timeout: Seconds to wait for the TCP connection attempt.

    Raises:
        ValueError: If the host is missing or cannot be resolved, or if the
            port is not an integer in the range 1-65535.
        ConnectionError: If the TCP connection cannot be established.

    Both exception types derive from ``Exception``, so callers that catch
    ``Exception`` keep working.
    """
    if not host:
        raise ValueError("Host not specified")

    if not port:
        port = 3000
        logger.warning(f"Port not specified, defaulting to {port}")

    try:
        port_number = int(port)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"{port} is not a valid port") from exc
    if not 1 <= port_number <= 65535:
        raise ValueError(f"{port} is not a valid port")

    # Accept a URL-style value by dropping the scheme. Plain host names and
    # IP addresses are passed through unchanged.
    for scheme in ("http://", "https://"):
        if host.startswith(scheme):
            host = host[len(scheme):]
            break
    if not host:
        raise ValueError("Host not specified")

    # gethostbyname() accepts dotted IPv4 addresses as well as names, so a
    # single lookup validates both forms.
    try:
        socket.gethostbyname(host)
    except (OSError, UnicodeError) as exc:
        raise ValueError(f"{host} is not a valid host: {exc}") from exc

    logger.debug(f"Checking connection to {host}:{port_number}")
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(timeout)
            probe.connect((host, port_number))
    except OSError as exc:
        raise ConnectionError(
            f"Connection to {host}:{port_number} failed: {exc}"
        ) from exc
|
|
|
|
|
|
# Log with a spinner
|
|
with console.status("- Checking Gitea Endpoint", spinner="dots"):
    try:
        check_host(GITEA_INSTANCE, GITEA_PORT)
    except Exception as e:
        logger.error(f"Failed to check Gitea endpoint: {e}")
        logger.error(
            "Please double check the Gitea endpoint and make sure it is reachable"
        )
        # Exit through SystemExit instead of os._exit(): the latter ends the
        # process immediately, so the surrounding status context would never
        # be closed and pending output might not be flushed.
        raise SystemExit(1)
    else:
        logger.info(f"Gitea endpoint at {GITEA_INSTANCE}:{GITEA_PORT} is valid")
|
|
|
|
# Create a Gitea API client
|
|
gitea_client = None
with console.status("- Creating Gitea Client", spinner="dots"):
    try:
        if GITEA_SSL:
            logger.debug("Using SSL")
            scheme = "https"
        else:
            logger.debug("Not using SSL")
            scheme = "http"
        # NOTE(review): GITEA_PORT is interpolated exactly as configured.
        # When it is unset, check_host() only applies its fallback port to
        # its own local variable, so that fallback is not reflected in this
        # URL - confirm whether the default should be applied here as well.
        gitea_client = Gitea(f"{scheme}://{GITEA_INSTANCE}:{GITEA_PORT}", GITEA_TOKEN)
    except Exception as e:
        logger.error(f"Failed to create Gitea client: {e}")

if not gitea_client:
    # SystemExit (rather than os._exit) lets the interpreter shut down
    # normally, flushing output and running cleanup handlers.
    raise SystemExit(1)
|
|
|
|
# Check if the token is valid
|
|
with console.status("- Checking Gitea Token", spinner="dots"):
    try:
        # Both requests must succeed before anything is reported.
        server_version = gitea_client.get_version()
        current_user = gitea_client.get_user()
        logger.info(f"Connected to Gitea {server_version}")
        logger.info(f"Authenticated as {current_user.username}")
    except Exception as e:
        logger.error(f"Failed to check Gitea token: {e}")
        # Stop here, like every other setup step does on failure, instead of
        # continuing with a client that could not authenticate.
        raise SystemExit(1)
|
|
|
|
# Get the source organization
|
|
source_orga = None
with console.status("- Getting Source Organization", spinner="dots"):
    try:
        source_orga = Organization.request(gitea_client, SOURCE_ORGA)
        logger.info(f"Source organization is '{source_orga.name}'")
    except Exception as e:
        logger.error(f"Failed to get source organization: {e}")

if not source_orga:
    # SystemExit (rather than os._exit) lets the interpreter shut down
    # normally, flushing output and running cleanup handlers.
    raise SystemExit(1)
|
|
|
|
# Get the source organization teams
|
|
source_orga_teams = None
with console.status("- Getting Source Organization Teams", spinner="dots"):
    try:
        # Fetch and filter in one step so that a failure can never leave the
        # unfiltered list (including 'Owners') behind.
        source_orga_teams = [
            team for team in source_orga.get_teams() if team.name != "Owners"
        ]  # Skip the default team 'Owners'
        logger.info(f"Source organization has {len(source_orga_teams)} teams:")
        for team in source_orga_teams:
            logger.info(f" - {team.name}")
        logger.info("Note: The default team 'Owners' will always be skipped!")
    except Exception as e:
        logger.error(f"Failed to get source organization teams: {e}")

if not source_orga_teams:
    if source_orga_teams is not None:
        # The lookup worked but there is nothing to copy; say so instead of
        # exiting without any explanation.
        logger.error("Source organization has no teams to copy")
    # SystemExit (rather than os._exit) lets the interpreter shut down
    # normally, flushing output and running cleanup handlers.
    raise SystemExit(1)
|
|
|
|
# Get all other organizations except for the source organization
|
|
all_orgas = None
with console.status("- Getting All Organizations", spinner="dots"):
    try:
        # Every organization except the source is a potential copy target.
        all_orgas = [
            orga for orga in gitea_client.get_orgs() if orga.name != source_orga.name
        ]
        logger.info(f"Found {len(all_orgas)} other organizations:")
        for orga in all_orgas:
            logger.info(f" - {orga.name}")
        logger.info(
            f"Note: The source organization {source_orga.name} will always be skipped!"
        )
    except Exception as e:
        logger.error(f"Failed to get all organizations: {e}")

if not all_orgas:
    if all_orgas is not None:
        # The lookup worked but found no targets; say so instead of exiting
        # without any explanation.
        logger.error("No target organizations found, nothing to do")
    # SystemExit (rather than os._exit) lets the interpreter shut down
    # normally, flushing output and running cleanup handlers.
    raise SystemExit(1)
|
|
|
|
# Copy teams from source organization to all other organizations except for the source organization
|
|
with console.status("- Copying Teams", spinner="dots"):
    if DRY_RUN:
        logger.warning("Dry-run mode enabled, no changes will be made")

    # Report each mode under its own flag; the update message used to be
    # printed when override mode was enabled.
    if UPDATE_TEAMS:
        logger.info("Update mode enabled, existing teams will be updated")
    if OVERRIDE_TEAMS:
        logger.info("Override mode enabled, existing teams will be deleted and recreated")

    for orga in all_orgas:
        if orga.name in EXCLUDE_ORGAS:
            logger.info(f"Skipping organization '{orga.name}'")
            continue

        logger.info(f"{source_orga.name} -> {orga.name}")

        # List the target organization's teams once per organization rather
        # than once per source team. Keep the first team for each name.
        try:
            existing_by_name = {}
            for target_team in orga.get_teams():
                existing_by_name.setdefault(target_team.name, target_team)
        except Exception as e:
            logger.error(f"Failed to list teams of '{orga.name}': {e}")
            continue

        for team in source_orga_teams:
            try:
                existing_team = existing_by_name.get(team.name)
                if existing_team:
                    logger.debug(f"\tTeam {team.name} already exists in {orga.name}")

                if OVERRIDE_TEAMS and existing_team:
                    logger.info(f"\tDeleting existing team '{team.name}'")
                    if not DRY_RUN:
                        existing_team.delete()
                    # Treat the team as missing so it is recreated below.
                    existing_team = None

                if not existing_team:
                    # Logged in dry-run mode too, so the planned action is
                    # visible; only the API call is skipped.
                    logger.info(f"\tCreating team '{team.name}'")
                    if not DRY_RUN:
                        gitea_client.create_team(
                            org=orga,
                            name=team.name,
                            description=team.description,
                            permission="read",
                            includes_all_repositories=False,
                            can_create_org_repo=False,
                            units=[
                                "repo.code",
                                "repo.issues",
                                "repo.ext_issues",
                                "repo.wiki",
                                "repo.pulls",
                                "repo.releases",
                                "repo.ext_wiki",
                                "repo.actions",
                                "repo.projects",
                            ],
                            units_map={
                                "repo.code": "none",
                                "repo.ext_issues": "none",
                                "repo.ext_wiki": "none",
                                "repo.issues": "none",
                                "repo.projects": "none",
                                "repo.pulls": "none",
                                "repo.releases": "none",
                                "repo.wiki": "none",
                            },
                        )
                elif UPDATE_TEAMS:
                    # Existing teams are only touched when update mode was
                    # requested; otherwise they are left as they are.
                    logger.info(f"\tUpdating existing team '{team.name}'")
                    if not DRY_RUN:
                        # NOTE(review): this only assigns the attribute on
                        # the local object. If the client library needs an
                        # explicit commit/save call to persist the change,
                        # add it here - TODO confirm against its Team API.
                        existing_team.description = team.description
            except Exception as e:
                logger.error(f"Failed to copy team '{team.name}': {e}")
|