# -*- coding: utf-8 -*- import os, sys, subprocess, json, time from argparse import Namespace from urllib.parse import urlparse from enum import Enum, auto class AskpassKey(Enum): Username = auto() Password = auto() def pretty_cmd(cmd: list[str], wd=None): tokens = [cmd[0]] for token in cmd[1:]: if token.find(' ') != -1: token = '"' + token + '"' tokens.append(token) ret = ' '.join(tokens) if wd is not None: ret += f' in {wd}' return ret def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False, cmd_input=None) -> str|None: # export if verbose: delim_len = 120 delim = f'---- running {pretty_cmd(cmd, wd)} -' delim = delim + '-' * (delim_len - len(delim)) print(',' + delim + ' >') cwd: str|None = None if wd is not None: cwd = os.getcwd() os.chdir(wd) ret = '' try: stdin = None if cmd_input is not None: stdin = subprocess.PIPE p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=None, close_fds=True, stdin=stdin) if cmd_input is not None: ret = p.communicate(input=cmd_input)[0] else: for line in iter(p.stdout.readline, b''): line = line.decode(sys.stdout.encoding) ret += line p.wait() if verbose: print('`' + delim + ' <') if p.returncode: if verbose: print(' '.join(cmd) + ' failed') raise Exception(time.strftime('%Y-%m-%d %H:%M') + f': Command returned an error: "{pretty_cmd(cmd, wd)}"') finally: if cwd: os.chdir(cwd) return ret def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> dict|str: # export cmd = ['curl'] if not verbose: cmd.append('-s') cmd.extend(args) ret = run_cmd(cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input) if parse_json: try: return json.loads(ret) except Exception as e: print(f'Failed to parse {len(ret)} bytes output of command >{pretty_cmd(cmd, wd)}< ({e})', file=sys.stderr) raise return ret def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None): assert host is None # Currently unsupported for var in askpass_env: exe = os.getenv(var) if exe is None: continue exe_arg = '' match var: case 'GIT_ASKPASS': match key: case AskpassKey.Username: exe_arg += 'Username' case AskpassKey.Password: exe_arg += 'Password' case 'SSH_ASKPASS': match key: case AskpassKey.Username: continue # Can't get user name from SSH_ASKPASS case AskpassKey.Password: exe_arg += 'Password' ret = run_cmd([exe, exe_arg], throw=False) if ret is not None: return ret return None def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export url_user = None if url is None else urlparse(url).username if args is not None: if args.username is not None: if url_user is not None and url_user != args.username: raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"') return args.username if url_user is not None: return url_user return run_askpass(askpass_env, AskpassKey.Username) def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export if args is None and url is None and not askpass_env: raise Exception(f'Neither URL nor command-line arguments nor askpass environment variable available, can\'t get password') if args is not None and hasattr(args, 'password'): # use getattr(), because we don't necessarily want to have insecure --password among options ret = getattr(args, 'password') if ret is not None: return ret if url is not None: parsed = urlparse(url) if parsed.password is not None: return parsed.password return run_askpass(askpass_env, AskpassKey.Password)