Gitlab Community Edition Instance

Commit 906b8506 authored by mhellka's avatar mhellka
Browse files

Moved more functionality to CliContext and Workspace

parent f2b55011
import os
import urllib.parse
PATH_TYPES = (str,)
if hasattr(os, "PathLike"):
......@@ -15,3 +16,19 @@ class cached_property(object):
return self
value = obj.__dict__[self.func.__name__] = self.func(obj)
return value
def url_split_auth(url: str):
""" Extract and remove auth information from an URL. Return a (cleaned-url, username, password) tuple.
The cleaned-url will have any auth information removed from the netloc part.
Username and password may be None if not present.
"""
split = urllib.parse.urlsplit(url)
username, password = split.username, split.password
if username or password:
netloc = split.host
if split.port:
netloc += ":" + str(split.port)
url = urllib.parse.urlunsplit(split._replace(netloc=netloc))
return url, username, password
......@@ -7,8 +7,8 @@ import typing
import requests
from pycdstar3.model import ApiError, JsonObject, FileDownload, FormUpdate
from pycdstar3._utils import PATH_TYPES
from pycdstar3.model import ApiError, JsonObject, FileDownload, FormUpdate
__all__ = "CDStar", "CDStarVault", "FormUpdate", "ApiError"
......
......@@ -7,7 +7,6 @@ import json
import sys
from pycdstar3 import __version__ as VERSION, ApiError
from pycdstar3.cli._utils import Printer
__ALL__ = ["main", "printer"]
......@@ -25,10 +24,6 @@ subparsers = parser.add_subparsers(title="available commands",
description='Run "COMAMND -h" to get help for a specific command.',
metavar="COMMAND")
#: This should be used to print optional messages to the user.
#: Messages are printed to stderr, so only use it for complementary information, not for the primary results.
printer = Printer(level=0, file=sys.stderr)
def _autodiscover_commands():
""" Autodiscover and import all modules in the pycdstar3.cli.commands namespace.
......@@ -57,34 +52,29 @@ def main(*args): # noqa: C901
print("pycdstar3-" + VERSION)
sys.exit(0)
# Set root logging level based on verbosity setting
if opts.quiet:
printer.set_verbosity(-1)
else:
printer.set_verbosity(opts.verbose)
if not hasattr(opts, "main"):
# No sub-command specified
parser.print_help()
return 1
from pycdstar3.cli.context import CliContext
ctx = CliContext(opts, workdir=".")
try:
from pycdstar3.cli.context import CliContext
ctx = CliContext(opts, workdir=".")
return opts.main(ctx, opts) or 0
except KeyboardInterrupt:
printer("Exiting...")
ctx.print("Exiting...")
return 0
except CliError as e:
printer.error(str(e))
ctx.print.error(str(e))
return e.return_code
except ApiError as e:
printer.error(str(e))
printer.v("Full error response:")
printer.v(json.dumps(e.json, indent=2), indent=2)
ctx.print.error(str(e))
ctx.print.v("Full error response:")
ctx.print.v(json.dumps(e.json, indent=2), indent=2)
return e.status
except Exception as e:
printer.fatal("Uncaught exception ({}: {}). Exiting...", type(e).__name__, e)
ctx.print.fatal("Uncaught exception ({}: {}). Exiting...", type(e).__name__, e)
return 1
......
......@@ -2,7 +2,6 @@
Manage access control lists (ACL)
"""
from pycdstar3 import FormUpdate
from pycdstar3.cli import printer
def register(subparsers):
......@@ -23,10 +22,10 @@ def acl_set(ctx, args):
subject = args.SUBJECT
permissions = args.PERMISSIONS
printer("Set ACL for {!r} on /{}/{} -> [{}]", subject, vault, archive, ', '.join(permissions))
ctx.print("Set ACL for {!r} on /{}/{} -> [{}]", subject, vault, archive, ', '.join(permissions))
with archive.api.begin(autocommit=True):
update = FormUpdate()
update.acl(subject, *permissions)
client.update_archive(vault, archive, form=update)
printer("Done")
ctx.print("Done")
......@@ -5,7 +5,7 @@ import os
import sys
from pycdstar3.cli._utils import hbytes
from pycdstar3.cli import printer, CliError
from pycdstar3.cli import CliError
def register(subparsers):
......@@ -50,7 +50,7 @@ def get(ctx, args): # noqa: C901
if partfile and os.path.exists(partfile):
if resume:
offset = os.path.getsize(partfile)
printer("Resuming download at: {}", hbytes(offset))
ctx.print("Resuming download at: {}", hbytes(offset))
else:
raise CliError("Found partial download, but --resume is not set: " + partfile)
......@@ -67,10 +67,10 @@ def get(ctx, args): # noqa: C901
if ispipe and out.isatty() and not dl.type.startswith("text/"):
raise CliError("Not printing binary data ({}) a terminal".format(dl.type))
if progress and not printer.quiet:
if progress and not ctx.print.quiet:
from tqdm import tqdm
pbar = tqdm(total=dl.size+offset, initial=offset, unit='b', unit_scale=True, unit_divisor=1024,
dynamic_ncols=True, file=printer.file)
dynamic_ncols=True, file=ctx.print.file)
def write(chunk):
pbar.update(len(chunk))
......@@ -80,7 +80,7 @@ def get(ctx, args): # noqa: C901
pbar.close()
out.close()
printer.v("Downloading {} ({})", file, hbytes(dl.size + offset))
ctx.print.v("Downloading {} ({})", file, hbytes(dl.size + offset))
for chunk in dl.iter_content():
write(chunk)
......@@ -92,7 +92,7 @@ def get(ctx, args): # noqa: C901
else:
os.rename(partfile, dst)
printer.v("Done!", file, vault, archive, hbytes(dl.size + offset))
ctx.print.v("Done!", file, vault, archive, hbytes(dl.size + offset))
except (KeyboardInterrupt, Exception):
if close:
......@@ -101,6 +101,6 @@ def get(ctx, args): # noqa: C901
try:
os.remove(partfile)
except OSError:
printer("Failed to delete partial download: {}", partfile)
ctx.print("Failed to delete partial download: {}", partfile)
raise
raise
......@@ -8,14 +8,12 @@ If the main --config parameter is set, the configuration is saved at the specifi
instead of the current working directory.
"""
import configparser
import os
import re
from pycdstar3 import __url__ as help_url
from pycdstar3.api import CDStar
from pycdstar3.cli import printer, CliError
from pycdstar3.cli.context import CONFDIR_NAME, CONFIG_NAME
from pycdstar3.cli import CliError
from pycdstar3.cli.context import Workspace
def register(subparsers):
......@@ -25,14 +23,14 @@ def register(subparsers):
def command(ctx, args):
workspace = os.path.abspath(args.PATH)
confdir = os.path.join(workspace, CONFDIR_NAME)
conffile = os.path.join(confdir, CONFIG_NAME)
if os.path.exists(confdir) or os.path.exists(conffile):
raise CliError("Config directory '{}' already exists.".format(confdir))
root = os.path.abspath(args.PATH)
work = Workspace(root)
printer("Creating workspace in: {}", workspace)
printer()
if os.path.exists(work.configdir) or os.path.exists(work.configfile):
raise CliError("Config directory '{}' already exists.".format(work.configdir))
ctx.print("Creating workspace in: {}", work.root)
ctx.print()
server = ask("CDSTAR Server URI", args.server, r"^https?://.*/?$")
if not server.endswith("/"):
......@@ -41,28 +39,22 @@ def command(ctx, args):
server += "v3/"
try:
printer(" Connecting to {} ... ", server, end="")
ctx.print(" Connecting to {} ... ", server, end="")
info = CDStar(server).service_info()
printer("OK ({})", info.version.cdstar)
ctx.print("OK ({})", info.version.cdstar)
vaults = info.vaults or []
printer(" Available vaults: {}", ', '.join(sorted(vaults)))
ctx.print(" Available vaults: {}", ', '.join(sorted(vaults)))
except Exception:
raise CliError("Failed to connect")
printer()
ctx.print()
vault = ask("Select a vault", args.vault or (info.vaults and info.vaults[0]), choice=vaults, rx=".+")
config = configparser.ConfigParser()
config["DEFAULT"]["server"] = server
config["DEFAULT"]["vault"] = vault
os.mkdir(confdir, 0o700)
with open(conffile, 'x') as fp:
fp.write("# This is a pycdstar3 workspace config file. See {}\n".format(help_url))
config.write(fp)
work.store_config(server=server, vault=vault)
printer()
printer("Config saved to: {}", confdir)
ctx.print()
ctx.print.v("Config saved to: {}", work.configfile)
ctx.print("Created workspace directory: {}", work.root)
def ask(q, default=None, rx=None, choice=None, password=False):
......@@ -81,13 +73,13 @@ def ask(q, default=None, rx=None, choice=None, password=False):
if not val:
if default:
return default
printer("No input. Try again...")
print("No input. Try again...")
continue
if rx and not re.match(rx, val):
printer("This does not look right. Try again...")
print("This does not look right. Try again...")
continue
if choice and val not in choice:
printer("Please select one of: {}", choice)
print("Please select one of: {}", choice)
continue
return val
......@@ -6,7 +6,6 @@ from collections import defaultdict
import iso8601
from pycdstar3.cli import printer
from pycdstar3.cli._utils import hbytes
......@@ -62,7 +61,8 @@ def ls(ctx, args):
n += 1
b += file['size']
print(file2str(fmt, file))
printer("\nTotal: {:,} files {:,} bytes", n, b)
ctx.print()
ctx.print("Total: {:,} files {:,} bytes", n, b)
def file2str(fmt, file):
......
......@@ -16,7 +16,7 @@ you can simply re-run the same command.
import os
from pycdstar3 import FormUpdate
from pycdstar3.cli import printer, CliError
from pycdstar3.cli import CliError
from pycdstar3.cli._utils import hbytes, kvtype, globtype
......@@ -72,13 +72,13 @@ def collect_files(path, hidden=False):
raise CliError("Not a file: " + path)
def filter_files(files, inc_rules, exc_rules):
def filter_files(files, inc_rules, exc_rules, debug):
for file in files:
if inc_rules and not any(rule.match(file) for rule in inc_rules):
printer.vv("Skipping: {} (not included)", file)
debug("Skipping: {} (not included)", file)
continue
if any(rule.match(file) for rule in exc_rules):
printer.vv("Skipping: {} (excluded)", file)
debug("Skipping: {} (excluded)", file)
continue
yield file
......@@ -112,7 +112,7 @@ def command(ctx, args): # noqa: C901
total = 0
for path in args.PATH:
files = collect_files(path, args.include_hidden)
files = filter_files(files, inc_rules, exc_rules)
files = filter_files(files, inc_rules, exc_rules, debug=ctx.print.vv)
for file in files:
fstat = os.stat(file)
uploads[remote_name(".", file, prefix, args.flat)] = file, fstat
......@@ -128,9 +128,10 @@ def command(ctx, args): # noqa: C901
if args.dry_run:
for target in sorted(uploads):
printer(target)
printer("\nWould upload {} files ({}) to {}",
len(uploads), hbytes(total), "new archive" if archive == 'new' else "archive: " + vault + "/" + archive)
ctx.print(target)
ctx.print("\nWould upload {} files ({}) to {}",
len(uploads), hbytes(total),
"new archive" if archive == 'new' else "archive: " + vault + "/" + archive)
return
form = FormUpdate()
......@@ -142,21 +143,21 @@ def command(ctx, args): # noqa: C901
with client.begin(autocommit=True):
if archive == 'new':
archive = client.create_archive(vault, form=form)['id']
printer("Created new archive: /{}/{} ", vault, archive)
ctx.print("Created new archive: /{}/{} ", vault, archive)
elif form.fields:
client.update_archive(vault, archive, form=form)
printer("Updated archive: /{}/{} ", vault, archive)
ctx.print("Updated archive: /{}/{} ", vault, archive)
elif not client.exists(vault, archive):
raise CliError("Archive /{}/{} does not exist.".format(vault, archive))
printer("Uploading {} files ({}) to archive: /{}/{}", len(uploads), hbytes(total), vault, archive)
ctx.print("Uploading {} files ({}) to archive: /{}/{}", len(uploads), hbytes(total), vault, archive)
pbar = None
if args.progress and not printer.quiet:
if args.progress and not ctx.print.quiet:
from tqdm import tqdm
total = sum(stat.st_size for (file, stat) in uploads.values())
pbar = tqdm(total=total, unit='b', unit_scale=True, unit_divisor=1024, dynamic_ncols=True,
file=printer.file)
file=ctx.print.file)
try:
for i, target in enumerate(sorted(uploads)):
......@@ -170,10 +171,10 @@ def command(ctx, args): # noqa: C901
chunks = (chunk for chunk in chunks if not pbar.update(len(chunk)))
fp = chunks
else:
printer(line)
ctx.print(line)
client.put_file(vault, archive, target, fp)
finally:
if pbar:
pbar.close()
printer("Done! Uploaded {} files ({}) to archive: /{}/{}", len(uploads), hbytes(total), vault, archive)
ctx.print("Done! Uploaded {} files ({}) to archive: /{}/{}", len(uploads), hbytes(total), vault, archive)
import configparser
import os
import urllib.parse
import sys
import typing
from pycdstar3._utils import cached_property
from pycdstar3.cli import printer, CliError
from pycdstar3.cli._utils import walk_up
from pycdstar3 import CDStar
from pycdstar3 import CDStar, __url__ as help_url
from pycdstar3._utils import cached_property, url_split_auth
from pycdstar3.cli import CliError
from pycdstar3.cli._utils import walk_up, Printer
CONFDIR_NAME = '.cdstar'
CONFIG_NAME = "pycdstar.conf"
CONFIG_NAME = "workspace.conf"
CONFIG_SECTION = "pycdstar3"
ENV_PREFIX = "CDSTAR_"
class CliContext:
""" Search for config files and archive directory in or above the current working directory. """
""" Provide context and tools to CLI commands.
This class may provide anything that is used by more than one command. For example a ready to use CDSTAR client,
workspace config settings, default vault or other global settings. Some properties may raise CliError ask for
user input.
"""
def __init__(self, args, workdir="."):
self.args = args
self.workdir = os.path.abspath(workdir)
self.workdir = os.path.abspath(str(workdir))
@cached_property
def client(self):
server = self.server_uri
url = urllib.parse.urlsplit(server)
if url.username:
auth = (url.username, url.password or self._ask_pass(url))
return CDStar(server, auth=auth)
return CDStar(server)
def print(self) -> Printer:
""" An instance of :class:`pycdstar3.cli._utils.Printer` to print optional messages to the user.
Messages are printed to stderr, so only use it for complementary information, not for the primary results.
"""
printer = Printer(level=0, file=sys.stderr)
@cached_property
def vault(self):
return self._get_mandatory("vault")
if self.args.quiet:
printer.set_verbosity(-1)
else:
printer.set_verbosity(self.args.verbose)
return printer
@cached_property
def server_uri(self):
return self._get_mandatory("server")
def client(self) -> CDStar:
server_url = self._require_setting("server")
url, user, pwd = url_split_auth(server_url)
if user:
return CDStar(url, auth=(user, pwd or self._ask_pass(server_url)))
return CDStar(url)
@cached_property
def config(self):
if self.confdir:
conf = os.path.join(self.confdir, CONFIG_NAME)
if os.path.isfile(conf):
return Config(conf)
def vault(self):
return self._require_setting("vault")
@cached_property
def confdir(self):
confdir = _find_config(self.workdir)
if confdir:
printer.vv("Found config directory: {}", confdir)
return confdir
def _get_mandatory(self, name):
result = getattr(self.args, name) or os.environ.get(ENV_PREFIX + name.upper()) or (
self.config and self.config[name])
if not result:
raise CliError("Missing --{} parameter.\n\n"
" Alternatively, set the {} environment variable or create a workspace."
.format(name, ENV_PREFIX + name.upper()))
return result
def workspace(self) -> "Workspace":
# TODO: Allow to set a workspace via ENV
ws = _find_workspace(self.workdir)
if ws:
self.print.vv("Found workspace directory: {}", ws.root)
return ws
def _get_setting(self, name) -> typing.Any:
""" Look for a setting in command-line arguments, environment variables or workspace configuration. """
return getattr(self.args, name, None) \
or os.environ.get(ENV_PREFIX + name.upper()) \
or (self.workspace and self.workspace.config.get(name))
def _require_setting(self, name) -> typing.Any:
result = self._get_setting(name)
if result:
return result
raise CliError("Missing --{} parameter.\n\n"
" Alternatively, set the {} environment variable or create a workspace."
.format(name, ENV_PREFIX + name.upper()))
def _ask_pass(self, url):
# Enter password for {url.scheme}://{url.netloc}/{url.path} (user={url.username})
raise RuntimeError("Asking for password not implemented yet")
def _find_config(start="."):
for path in walk_up(start):
confdir = os.path.join(path, CONFDIR_NAME)
if os.path.isdir(confdir):
return confdir
def _find_workspace(start='.'):
for path in walk_up(os.path.abspath(start)):
if os.path.isfile(os.path.join(path, CONFDIR_NAME, CONFIG_NAME)):
return Workspace(path)
class Workspace:
""" Workspace directory with configuration and more.
Currently a workspace directory is simply a folder which contains a `.cdstar/workspace.conf` file.
"""
def __init__(self, root, config_dir=CONFDIR_NAME, config_file=CONFIG_NAME):
self.root = os.path.abspath(str(root))
self.configdir = os.path.abspath(os.path.join(self.root, config_dir))
self.configfile = os.path.abspath(os.path.join(self.configdir, config_file))
class Config:
""" Configuration read from a workspace directory. """
def validate(self):
for path in (self.configdir, self.configfile):
if not os.path.isdir(path):
raise CliError("Not a valid workspace: {} not found".format(path))
def __init__(self, path):
import configparser
self.path = os.path.abspath(path)
@cached_property
def config(self):
cfg = configparser.ConfigParser()
cfg.read(path)
self.data = dict(cfg["DEFAULT"])
cfg.read(self.configfile)
if CONFIG_SECTION not in cfg:
return {}
return dict(cfg[CONFIG_SECTION])
def __contains__(self, item):
return item in self.config
def __iter__(self):
return iter(self.data)
def get(self, item, default=None):
return self.config.get(item, default)
def __getitem__(self, item):
try:
return self.data[item]
except KeyError:
raise KeyError("Missing config field: " + item)
return self.config[item]
def __contains__(self, item):
return item in self.data
def store_config(self, **settings):
cfg = configparser.ConfigParser()
if os.path.exists(self.configfile):
cfg.read(self.configfile)
cfg[CONFIG_SECTION] = settings
os.makedirs(self.configdir, mode=0o700, exist_ok=True)
with open(self.configfile, 'w') as fp:
fp.write("# This is a pycdstar3 workspace config file. See {}\n".format(help_url))
cfg.write(fp)
self.__dict__.pop("config", None)
"""
Data objects or wrappers used by the api module.
"""
import abc
import datetime
import os
from json import JSONDecodeError
import iso8601
import requests
from requests_toolbelt import MultipartEncoder
from pycdstar3._utils import PATH_TYPES
class JsonObject:
""" A wrapper for a JSON object that allows attribute-access to all its fields and provides convenience methods
for data type conversions. """
__slots__ = ['__data']
def __init__(self, data):
self.__data = data
def get(self, item, default=None):
""" Get a field value by name, or a default value. """
return self.__data.get(item, default)
def get_datetime(self, item):
""" Get a date field as a parsed datetime. """
value = self[item]
if isinstance(value, abc.Number):
return datetime.datetime.fromtimestamp(value / 1000)
else:
return iso8601.parse_date(value)
def __bool__(self):
return bool(self.__data)