# -*- coding: utf-8 -*- from __future__ import annotations from typing import TYPE_CHECKING, Iterable if TYPE_CHECKING: from typing import Sequence from ExecContext import ExecContext import os, sys, json from argparse import Namespace from urllib.parse import urlparse from enum import Enum, auto from .log import * class AskpassKey(Enum): Username = auto() Password = auto() def pretty_cmd(cmd: list[str], wd=None): tokens = [cmd[0]] for token in cmd[1:]: if token.find(' ') != -1: token = '"' + token + '"' tokens.append(token) ret = ' '.join(tokens) if wd is not None: ret += f' in {wd}' return ret # See ExecContext.run() for what this function does async def run_cmd(*args, ec: ExecContext|None=None, verbose: bool|None=None, **kwargs) -> Result: if verbose is None: verbose = False if ec is None else ec.verbose_default if ec is None: from .ec.Local import Local ec = Local(verbose_default=verbose) return await ec.run(verbose=verbose, *args, **kwargs) async def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=None, cmd_input=None, ec: ExecContext|None=None) -> dict|str: # export if verbose is None: verbose = False if ec is None else ec.verbose_default cmd = ['curl'] if not verbose: cmd.append('-s') cmd.extend(args) ret, stderr, status = await run_cmd(cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input, ec=ec).decode() if parse_json: try: ret = json.loads(ret) except Exception as e: size = 'unknown number of' try: size = len(ret) except: pass print(f'Failed to parse {size} bytes output of command ' + f'>{pretty_cmd(cmd, wd)}< ({str(e)}): "{ret}"', file=sys.stderr) raise return ret, stderr, status async def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None, ec: ExecContext|None=None): assert host is None # Currently unsupported for var in askpass_env: exe = os.getenv(var) if exe is None: continue exe_arg = '' match var: case 'GIT_ASKPASS': match key: case AskpassKey.Username: exe_arg += 'Username' case AskpassKey.Password: exe_arg += 'Password' case 'SSH_ASKPASS': match key: case AskpassKey.Username: continue # Can't get user name from SSH_ASKPASS case AskpassKey.Password: exe_arg += 'Password' ret, stderr, status = await run_cmd([exe, exe_arg], throw=False, ec=ec).decode() if ret is not None: return ret return None async def run_sudo(cmd: list[str], *args, interactive: bool=True, ec: ExecContext|None=None, **kwargs): if ec is None: from .ec.Local import Local ec = Local(interactive=interactive) return await ec.sudo(cmd, *args, **kwargs) async def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[], ec: ExecContext|None=None) -> str: # export url_user = None if url is None else urlparse(url).username if args is not None: if args.username is not None: if url_user is not None and url_user != args.username: raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"') return args.username if url_user is not None: return url_user return await run_askpass(askpass_env, AskpassKey.Username, ec=ec) async def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[], ec: ExecContext|None=None) -> str: # export if args is None and url is None and not askpass_env: raise Exception(f'Neither URL nor command-line arguments nor askpass environment variable available, can\'t get password') if args is not None and hasattr(args, 'password'): # use getattr(), because we don't necessarily want to have insecure --password among options ret = getattr(args, 'password') if ret is not None: return ret if url is not None: parsed = urlparse(url) if parsed.password is not None: return parsed.password return await run_askpass(askpass_env, AskpassKey.Password, ec=ec) async def get_profile_env(throw: bool=True, keep: Iterable[str]|bool=False, ec: ExecContext|None=None) -> dict[str, str]: # export """ Get a fresh environment from /etc/profile Args: keep: - False -> Don't keep anything - True -> Keep what's in the current environment - List of strings -> Keep those variables Returns: Dictionary with fresh environment """ env: dict[str,str]|None = None if keep == False or isinstance(keep, Iterable): env = { 'HOME': os.environ.get('HOME', '/'), 'USER': os.environ.get('USER', ''), 'PATH': '/usr/bin:/bin', } # Run bash as a login shell, which sources /etc/profile, then print environment as NUL-separated key=value pairs cmd = ['/usr/bin/env', '-i', '/bin/bash', '-lc', 'env -0'] result = await run_cmd(cmd, throw=throw, verbose=True, env=env, ec=ec) ret: dict[str, str] = {} for entry in result.stdout.rstrip(b"\0").split(b"\0"): if not entry: continue key, val = entry.split(b"=", 1) ret[key.decode()] = val.decode() if isinstance(keep, Iterable): for key in keep: val = os.getenv(key) if val is not None: ret[key] = val return ret