diff --git a/src/python/jw/build/cmds/CmdListRepos.py b/src/python/jw/build/cmds/CmdListRepos.py index 8334ca01..a63d9dec 100644 --- a/src/python/jw/build/cmds/CmdListRepos.py +++ b/src/python/jw/build/cmds/CmdListRepos.py @@ -23,10 +23,13 @@ class CmdListRepos(Cmd): # export from urllib.parse import urlparse url = urlparse(args.base_url) - username = get_username(args=args, url=args.base_url) + askpass_env=['GIT_ASKPASS', 'SSH_ASKPASS'] + username = get_username(args=args, url=args.base_url, askpass_env=askpass_env) + password = None + if username is not None: + password = get_password(args=args, url=args.base_url, askpass_env=askpass_env) match url.scheme: case 'ssh': - password = get_password(args=args, url=args.base_url, askpass_env=['GIT_ASKPASS', 'SSH_ASKPASS']) if re.match(r'ssh://.*git\.janware\.com/', args.base_url): from jw.build.lib.SSHClient import SSHClientCmd as SSHClient ssh = SSHClient(hostname=url.hostname) @@ -39,16 +42,19 @@ class CmdListRepos(Cmd): # export print(out) return case 'https': + cmd_input = None if re.match(r'https://github.com', args.base_url): curl_args = [ '-f', - '-H', - 'Accept: application/vnd.github+json', - '-H', - 'X-GitHub-Api-Version: 2022-11-28', - f'https://api.github.com/users/{args.from_user}/repos' + '-H', 'Accept: application/vnd.github+json', + '-H', 'X-GitHub-Api-Version: 2022-11-28', ] - repos = run_curl(curl_args) + if password is not None: + assert username is not None + cmd_input = (f'-u {username}:{password}').encode('utf-8') + curl_args.extend(['-K-']) + curl_args.append(f'https://api.github.com/users/{args.from_user}/repos') + repos = run_curl(curl_args, cmd_input=cmd_input) for repo in repos: print(repo['name']) return @@ -57,10 +63,14 @@ class CmdListRepos(Cmd): # export curl_args = ['-f'] if re.match(r'https://janware.test', args.base_url): curl_args.append('--insecure') + if password is not None: + assert username is not None + cmd_input = (f'-u {username}:{password}').encode('utf-8') + curl_args.extend(['-K-']) curl_args.extend([ f'https://{url.hostname}/code/api/v1/orgs/{args.from_user}/repos' ]) - repos = run_curl(curl_args) + repos = run_curl(curl_args, cmd_input=cmd_input) for repo in repos: print(repo['name']) return diff --git a/src/python/jw/build/lib/util.py b/src/python/jw/build/lib/util.py index 04814bb5..14ea4e6b 100644 --- a/src/python/jw/build/lib/util.py +++ b/src/python/jw/build/lib/util.py @@ -3,6 +3,11 @@ import os, sys, subprocess, json, time from argparse import Namespace from urllib.parse import urlparse +from enum import Enum, auto + +class AskpassKey(Enum): + Username = auto() + Password = auto() def pretty_cmd(cmd: list[str], wd=None): tokens = [cmd[0]] @@ -15,7 +20,7 @@ def pretty_cmd(cmd: list[str], wd=None): ret += f' in {wd}' return ret -def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False) -> str|None: # export +def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False, cmd_input=None) -> str|None: # export if verbose: delim_len = 120 @@ -30,11 +35,17 @@ def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False) -> str|None: # e ret = '' try: - p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=None, close_fds=True) - for line in iter(p.stdout.readline, b''): - line = line.decode(sys.stdout.encoding) - ret += line - p.wait() + stdin = None + if cmd_input is not None: + stdin = subprocess.PIPE + p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=None, close_fds=True, stdin=stdin) + if cmd_input is not None: + ret = p.communicate(input=cmd_input)[0] + else: + for line in iter(p.stdout.readline, b''): + line = line.decode(sys.stdout.encoding) + ret += line + p.wait() if verbose: print('`' + delim + ' <') if p.returncode: @@ -46,12 +57,12 @@ def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False) -> str|None: # e os.chdir(cwd) return ret -def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False) -> dict|str: # export +def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> dict|str: # export cmd = ['curl'] if not verbose: cmd.append('-s') cmd.extend(args) - ret = run_cmd(cmd, wd=wd, throw=throw, verbose=verbose) + ret = run_cmd(cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input) if parse_json: try: return json.loads(ret) @@ -60,16 +71,41 @@ def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbos raise return ret -def get_username(args: Namespace|None=None, url: str|None=None) -> str: # export +def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None): + assert host is None # Currently unsupported + for var in askpass_env: + exe = os.getenv(var) + if exe is None: + continue + exe_arg = '' + match var: + case 'GIT_ASKPASS': + match key: + case AskpassKey.Username: + exe_arg += 'Username' + case AskpassKey.Password: + exe_arg += 'Password' + case 'SSH_ASKPASS': + match key: + case AskpassKey.Username: + continue # Can't get user name from SSH_ASKPASS + case AskpassKey.Password: + exe_arg += 'Password' + ret = run_cmd([exe, exe_arg], throw=False) + if ret is not None: + return ret + return None + +def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export url_user = None if url is None else urlparse(url).username if args is not None: if args.username is not None: if url_user is not None and url_user != args.username: raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"') return args.username - if url is not None: + if url_user is not None: return url_user - raise Exception(f'Neither URL nor command-line arguments available, can\'t get user name') + return run_askpass(askpass_env, AskpassKey.Username) def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export if args is None and url is None and not askpass_env: @@ -82,15 +118,4 @@ def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: lis parsed = urlparse(url) if parsed.password is not None: return parsed.password - exes = [] - if args is not None: - exes.append(getattr(args, 'askpass')) - for var in askpass_env: - exes.append(os.getenv(var)) - for exe in exes: - if exe is None: - continue - ret = run_cmd(exe, throw=False) - if ret is not None: - return ret - return None + return run_askpass(askpass_env, AskpassKey.Password)