build.cmds.CmdListRepos: Support username/password authentication

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2025-11-20 11:29:50 +01:00
commit 27e9b23849
2 changed files with 67 additions and 32 deletions

View file

@ -23,10 +23,13 @@ class CmdListRepos(Cmd): # export
from urllib.parse import urlparse from urllib.parse import urlparse
url = urlparse(args.base_url) url = urlparse(args.base_url)
username = get_username(args=args, url=args.base_url) askpass_env=['GIT_ASKPASS', 'SSH_ASKPASS']
username = get_username(args=args, url=args.base_url, askpass_env=askpass_env)
password = None
if username is not None:
password = get_password(args=args, url=args.base_url, askpass_env=askpass_env)
match url.scheme: match url.scheme:
case 'ssh': case 'ssh':
password = get_password(args=args, url=args.base_url, askpass_env=['GIT_ASKPASS', 'SSH_ASKPASS'])
if re.match(r'ssh://.*git\.janware\.com/', args.base_url): if re.match(r'ssh://.*git\.janware\.com/', args.base_url):
from jw.build.lib.SSHClient import SSHClientCmd as SSHClient from jw.build.lib.SSHClient import SSHClientCmd as SSHClient
ssh = SSHClient(hostname=url.hostname) ssh = SSHClient(hostname=url.hostname)
@ -39,16 +42,19 @@ class CmdListRepos(Cmd): # export
print(out) print(out)
return return
case 'https': case 'https':
cmd_input = None
if re.match(r'https://github.com', args.base_url): if re.match(r'https://github.com', args.base_url):
curl_args = [ curl_args = [
'-f', '-f',
'-H', '-H', 'Accept: application/vnd.github+json',
'Accept: application/vnd.github+json', '-H', 'X-GitHub-Api-Version: 2022-11-28',
'-H',
'X-GitHub-Api-Version: 2022-11-28',
f'https://api.github.com/users/{args.from_user}/repos'
] ]
repos = run_curl(curl_args) if password is not None:
assert username is not None
cmd_input = (f'-u {username}:{password}').encode('utf-8')
curl_args.extend(['-K-'])
curl_args.append(f'https://api.github.com/users/{args.from_user}/repos')
repos = run_curl(curl_args, cmd_input=cmd_input)
for repo in repos: for repo in repos:
print(repo['name']) print(repo['name'])
return return
@ -57,10 +63,14 @@ class CmdListRepos(Cmd): # export
curl_args = ['-f'] curl_args = ['-f']
if re.match(r'https://janware.test', args.base_url): if re.match(r'https://janware.test', args.base_url):
curl_args.append('--insecure') curl_args.append('--insecure')
if password is not None:
assert username is not None
cmd_input = (f'-u {username}:{password}').encode('utf-8')
curl_args.extend(['-K-'])
curl_args.extend([ curl_args.extend([
f'https://{url.hostname}/code/api/v1/orgs/{args.from_user}/repos' f'https://{url.hostname}/code/api/v1/orgs/{args.from_user}/repos'
]) ])
repos = run_curl(curl_args) repos = run_curl(curl_args, cmd_input=cmd_input)
for repo in repos: for repo in repos:
print(repo['name']) print(repo['name'])
return return

View file

@ -3,6 +3,11 @@
import os, sys, subprocess, json, time import os, sys, subprocess, json, time
from argparse import Namespace from argparse import Namespace
from urllib.parse import urlparse from urllib.parse import urlparse
from enum import Enum, auto
class AskpassKey(Enum):
Username = auto()
Password = auto()
def pretty_cmd(cmd: list[str], wd=None): def pretty_cmd(cmd: list[str], wd=None):
tokens = [cmd[0]] tokens = [cmd[0]]
@ -15,7 +20,7 @@ def pretty_cmd(cmd: list[str], wd=None):
ret += f' in {wd}' ret += f' in {wd}'
return ret return ret
def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False) -> str|None: # export def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False, cmd_input=None) -> str|None: # export
if verbose: if verbose:
delim_len = 120 delim_len = 120
@ -30,11 +35,17 @@ def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False) -> str|None: # e
ret = '' ret = ''
try: try:
p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=None, close_fds=True) stdin = None
for line in iter(p.stdout.readline, b''): if cmd_input is not None:
line = line.decode(sys.stdout.encoding) stdin = subprocess.PIPE
ret += line p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=None, close_fds=True, stdin=stdin)
p.wait() if cmd_input is not None:
ret = p.communicate(input=cmd_input)[0]
else:
for line in iter(p.stdout.readline, b''):
line = line.decode(sys.stdout.encoding)
ret += line
p.wait()
if verbose: if verbose:
print('`' + delim + ' <') print('`' + delim + ' <')
if p.returncode: if p.returncode:
@ -46,12 +57,12 @@ def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False) -> str|None: # e
os.chdir(cwd) os.chdir(cwd)
return ret return ret
def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False) -> dict|str: # export def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> dict|str: # export
cmd = ['curl'] cmd = ['curl']
if not verbose: if not verbose:
cmd.append('-s') cmd.append('-s')
cmd.extend(args) cmd.extend(args)
ret = run_cmd(cmd, wd=wd, throw=throw, verbose=verbose) ret = run_cmd(cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input)
if parse_json: if parse_json:
try: try:
return json.loads(ret) return json.loads(ret)
@ -60,16 +71,41 @@ def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbos
raise raise
return ret return ret
def get_username(args: Namespace|None=None, url: str|None=None) -> str: # export def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None):
assert host is None # Currently unsupported
for var in askpass_env:
exe = os.getenv(var)
if exe is None:
continue
exe_arg = ''
match var:
case 'GIT_ASKPASS':
match key:
case AskpassKey.Username:
exe_arg += 'Username'
case AskpassKey.Password:
exe_arg += 'Password'
case 'SSH_ASKPASS':
match key:
case AskpassKey.Username:
continue # Can't get user name from SSH_ASKPASS
case AskpassKey.Password:
exe_arg += 'Password'
ret = run_cmd([exe, exe_arg], throw=False)
if ret is not None:
return ret
return None
def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
url_user = None if url is None else urlparse(url).username url_user = None if url is None else urlparse(url).username
if args is not None: if args is not None:
if args.username is not None: if args.username is not None:
if url_user is not None and url_user != args.username: if url_user is not None and url_user != args.username:
raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"') raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"')
return args.username return args.username
if url is not None: if url_user is not None:
return url_user return url_user
raise Exception(f'Neither URL nor command-line arguments available, can\'t get user name') return run_askpass(askpass_env, AskpassKey.Username)
def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
if args is None and url is None and not askpass_env: if args is None and url is None and not askpass_env:
@ -82,15 +118,4 @@ def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: lis
parsed = urlparse(url) parsed = urlparse(url)
if parsed.password is not None: if parsed.password is not None:
return parsed.password return parsed.password
exes = [] return run_askpass(askpass_env, AskpassKey.Password)
if args is not None:
exes.append(getattr(args, 'askpass'))
for var in askpass_env:
exes.append(os.getenv(var))
for exe in exes:
if exe is None:
continue
ret = run_cmd(exe, throw=False)
if ret is not None:
return ret
return None