GiteaPC/giteapc/repo.py

372 lines
11 KiB
Python

import giteapc.gitea as gitea
from giteapc.repos import LocalRepo, RemoteRepo
from giteapc.util import *
from giteapc.config import REPO_FOLDER, PREFIX_FOLDER
import fnmatch
import shutil
import os
import re
#
# Determine full repo names from short versions
#
def local_match(name):
"""Find all local repositories with the specified name."""
return [ r for r in LocalRepo.all() if r.name == name ]
def remote_match(name):
"""Find all remote repositories matching the given name."""
return [ r for r in gitea.all_remote_repos() if r.name == name ]
def resolve(name, local_only=False, remote_only=False):
assert not (local_only and remote_only)
# Resolve full names directly
if "/" in name:
if not remote_only and LocalRepo.exists(name):
return LocalRepo(name)
if local_only:
raise ResolveMissingException(name, local_only, remote_only)
r = gitea.repo_get(name)
if r is None:
raise ResolveMissingException(name, local_only, remote_only)
return r
# Match local names without owners
if not remote_only:
r = local_match(name)
if len(r) == 0 and local_only:
raise ResolveMissingException(name, local_only, remote_only)
elif len(r) == 1:
return r[0]
elif len(r) > 1:
raise ResolveAmbiguousException(name, r, "local")
# Match remote names without owners
if not local_only:
r = remote_match(name)
if len(r) == 0:
raise ResolveMissingException(name, local_only, remote_only)
elif len(r) == 1:
return r[0]
else:
raise ResolveAmbiguousException(name, r, "remote")
#
# Utilities
#
def print_repo(r, branches=None, tags=None, has_giteapc=True):
color = "ARGYBMW"[sum(map(ord,r.owner[:5])) % 7]
print(colors()[color] +
"{}{_}/{W}{}{_}".format(r.owner, r.name, **colors()), end="")
print(" (remote)" if r.remote else " (local)", end="")
if r.remote and r.parent:
print(" {A}[{}]{_}".format(r.parent.fullname, **colors()), end="")
if r.remote and has_giteapc == False:
print(" {R}(NOT SUPPORTED){_}".format(**colors()), end="")
print("")
if r.remote:
print(("\n" + r.description).replace("\n", "\n ")[1:])
else:
print(" {W}HEAD:{_}".format(**colors()), r.describe(all=True))
print(" {W}Path:{_}".format(**colors()), r.folder, end="")
if os.path.islink(r.folder):
print(" ->", os.readlink(r.folder))
else:
print("")
branches = r.branches()
tags = r.tags()
if branches:
print(" {W}Branches:{_}".format(**colors()), end="")
for b in branches:
if "local" in b and b["local"] == False:
print(" {A}{}{_}".format(b["name"], **colors()), end="")
else:
print(" " + b["name"], end="")
print("")
if tags:
print(" {W}Tags:{_}".format(**colors()),
" ".join(t["name"] for t in reversed(tags)))
def pretty_repo(r):
color = "ARGYBMW"[sum(map(ord,r.owner[:5])) % 7]
return colors()[color] + "{}{_}/{W}{}{_}".format(r.owner,r.name,**colors())
def split_config(name):
"""Splits REPOSITORY[@VERSION][:CONFIGURATION] into components."""
RE_CONFIG = re.compile(r'^([^@:]+)(?:@([^@:]+))?(?:[:]([^@:]+))?')
m = re.match(RE_CONFIG, name)
if m is None:
return None
repo, version, config = m[1], m[2] or "", m[3] or ""
return repo, version, config
def make_config(name, version, config):
version = f"@{version}" if version else ""
config = f":{config}" if config else ""
return name + version + config
#
# repo list command
#
def list(*args, remote=False):
if len(args) > 1:
return fatal("repo list: too many arguments")
if remote:
# Since there aren't many repositories under [giteapc], just list them
# and then filter by hand (this avoids some requests and makes search
# results more consistent with local repositories)
repos = gitea.all_remote_repos()
else:
repos = LocalRepo.all()
# - Ignore case in pattern
# - Add '*' at start and end to match substrings only
pattern = args[0].lower() if args else "*"
if not pattern.startswith("*"):
pattern = "*" + pattern
if not pattern.endswith("*"):
pattern = pattern + "*"
# Filter
repos = [ r for r in repos
if fnmatch.fnmatch(r.fullname.lower(), pattern)
or r.remote and fnmatch.fnmatch(r.description.lower(), pattern) ]
# Print
if repos == []:
if args:
print(f"no repository matching '{args[0]}'")
else:
print(f"no repository")
return 1
else:
for r in repos:
print_repo(r)
return 0
#
# repo fetch command
#
def fetch(*args, use_ssh=False, use_https=False, force=False, update=False):
# Use HTTPS by default
if use_ssh and use_https:
return fatal("repo fetch: --ssh and --https are mutually exclusive")
protocol = "ssh" if use_ssh else "https"
# With no arguments, fetch all local repositories
if args == ():
for r in LocalRepo.all():
msg(f"Fetching {pretty_repo(r)}...")
r.fetch()
if update:
r.pull()
return 0
for spec in args:
name, version, config = split_config(spec)
r = resolve(name)
# If this is a local repository, just git fetch
if not r.remote:
if version:
msg("Checking out {W}{}{_}".format(version, **colors()))
r.checkout(version)
msg(f"Fetching {pretty_repo(r)}...")
r.fetch()
if update:
r.pull()
continue
msg(f"Cloning {pretty_repo(r)}...")
# For remote repositories, make sure the repository supports GiteaPC
has_tag = "giteapc" in gitea.repo_topics(r)
if has_tag or force:
LocalRepo.clone(r, protocol)
if not has_tag and force:
warn(f"{r.fullname} doesn't have the [giteapc] tag")
if not has_tag and not force:
fatal(f"{r.fullname} doesn't have the [giteapc] tag, use -f to force")
#
# repo show command
#
def show(*args, remote=False, path=False):
if remote and path:
raise Error("repo show: -r and -p are exclusive")
if not remote:
for name in args:
r = resolve(name, local_only=True)
if path:
print(LocalRepo.path(r.fullname))
else:
print_repo(r)
return 0
repos = []
for name in args:
r = resolve(name, remote_only=True)
branches = gitea.repo_branches(r)
tags = gitea.repo_tags(r)
topics = gitea.repo_topics(r)
repos.append((r, branches, tags, "giteapc" in topics))
for (r, branches, tags, has_giteapc) in repos:
print_repo(r, branches, tags, has_giteapc=has_giteapc)
#
# repo build command
#
def build(*args, install=False, skip_configure=False, update=False):
if len(args) < 1:
return fatal("repo build: specify at least one repository")
specs = []
for spec in args:
repo, version, config = split_config(spec)
repo = resolve(repo, local_only=True)
specs.append((repo, version, config))
for (r, version, config) in specs:
pretty = pretty_repo(r)
config_string = f" for {config}" if config else ""
if version != "":
msg("{}: Checking out {W}{}{_}".format(pretty, version, **colors()))
r.checkout(version)
if update:
previous = r.describe()
r.pull()
new = r.describe()
if new == previous:
msg("{}: Still at {W}{}{_}, skipping build".format(pretty,
previous, **colors()))
continue
# Check that the project has a Makefile
if not os.path.exists(r.makefile):
raise Error(f"{r.fullname} has no giteapc.make")
env = os.environ.copy()
if config:
env["GITEAPC_CONFIG"] = config
env["GITEAPC_PREFIX"] = PREFIX_FOLDER
if not skip_configure:
msg(f"{pretty}: Configuring{config_string}")
r.make("configure", env)
msg(f"{pretty}: Building")
r.make("build", env)
if install:
msg(f"{pretty}: Installing")
r.make("install", env)
msg(f"{pretty}: Done! :D")
#
# repo install command
#
def install(*args, use_https=False, use_ssh=False, update=False, yes=False):
if args == ():
return 0
def recursive_fetch(spec, fetched_so_far):
repos_to_build = []
repo, version, config = split_config(spec)
r = resolve(repo, local_only=True)
if r.fullname not in fetched_so_far:
fetched_so_far.add(r.fullname)
fetch(repo, use_https=use_https, use_ssh=use_ssh, update=update)
for dep_spec in r.dependencies():
rtb, fsf = recursive_fetch(dep_spec, fetched_so_far)
repos_to_build += rtb
fetched_so_far = fetched_so_far.union(fsf)
return repos_to_build + [(r, version, config)], fetched_so_far
# First download every repository, and only then build
repos_to_build = []
fetched_so_far = set()
for spec in args:
rtb, fsf = recursive_fetch(spec, fetched_so_far)
repos_to_build += rtb
fetched_so_far = fetched_so_far.union(fsf)
# Eliminate duplicates and look for version collisions
rd = dict()
for (r, version, config) in repos_to_build:
name = r.fullname
if name not in rd:
rd[name] = (version, config)
elif rd[name] != (version, config):
s1 = make_config(name, *rd[name])
s2 = make_config(name, version, config)
return fatal(f"repo install: cannot install both {s1} and {s2}")
msg("Will install:", ", ".join(make_config(pretty_repo(r), version, config)
for (r, version, config) in repos_to_build))
if not yes:
msg("Is that okay (Y/n)? ", end="")
confirm = input().strip()
if confirm not in ["Y", "y", ""]:
return 1
for (r, version, config) in repos_to_build:
build(make_config(r.fullname, version, config), install=True,
update=update)
#
# repo uninstall command
#
def uninstall(*args, keep=False):
if len(args) < 1:
return fatal("repo uninstall: specify at least one repository")
for name in args:
r = resolve(name, local_only=True)
msg(f"{pretty_repo(r)}: Uninstalling")
env = os.environ.copy()
env["GITEAPC_PREFIX"] = PREFIX_FOLDER
r.make("uninstall", env)
if not keep:
msg("{}: {R}Removing files{_}".format(pretty_repo(r), **colors()))
if os.path.isdir(r.folder):
shutil.rmtree(r.folder)
elif os.path.islink(r.folder):
os.remove(r.folder)
else:
raise Error(f"cannot handle {r.folder} (not a folder/symlink)")
parent = os.path.dirname(r.folder)
if not os.listdir(parent):
os.rmdir(parent)