mirror of
ssh://git.janware.com/janware/proj/jw-pkg
synced 2026-04-24 17:23:36 +02:00
run_curl() has no clear API of whether or not the return values should be decoded. It has parse_json, which should imply decoding, but there's no way to specify that explicitly. Moreover, when it tries to decode, it decodes on the coroutine returned from run_cmd(), not the awaited coroutine return value. Add a decode parameter, defaulting to False, change the parse_json parameter's default from True to False, and fix the run_cmd() return value evaluation. Signed-off-by: Jan Lindemann <jan@janware.com>
159 lines
5.9 KiB
Python
159 lines
5.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Iterable
|
|
|
|
if TYPE_CHECKING:
|
|
from typing import Sequence
|
|
from ExecContext import ExecContext
|
|
|
|
import os, sys, json
|
|
|
|
from argparse import Namespace
|
|
from urllib.parse import urlparse
|
|
from enum import Enum, auto
|
|
|
|
from .log import *
|
|
|
|
class AskpassKey(Enum):
|
|
Username = auto()
|
|
Password = auto()
|
|
|
|
def pretty_cmd(cmd: list[str], wd=None):
|
|
tokens = [cmd[0]]
|
|
for token in cmd[1:]:
|
|
if token.find(' ') != -1:
|
|
token = '"' + token + '"'
|
|
tokens.append(token)
|
|
ret = ' '.join(tokens)
|
|
if wd is not None:
|
|
ret += f' in {wd}'
|
|
return ret
|
|
|
|
# See ExecContext.run() for what this function does
|
|
async def run_cmd(*args, ec: ExecContext|None=None, verbose: bool|None=None, **kwargs) -> Result:
|
|
if verbose is None:
|
|
verbose = False if ec is None else ec.verbose_default
|
|
if ec is None:
|
|
from .ec.Local import Local
|
|
ec = Local(verbose_default=verbose)
|
|
return await ec.run(verbose=verbose, *args, **kwargs)
|
|
|
|
async def run_curl(args: list[str], parse_json: bool=False, wd=None, throw=None, verbose=None, cmd_input=None, ec: ExecContext|None=None, decode=False) -> dict|str: # export
|
|
if verbose is None:
|
|
verbose = False if ec is None else ec.verbose_default
|
|
cmd = ['curl']
|
|
if not verbose:
|
|
cmd.append('-s')
|
|
cmd.extend(args)
|
|
if parse_json:
|
|
decode = True
|
|
output = await run_cmd(cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input, ec=ec)
|
|
stdout, stderr, status = output.decode() if decode else output
|
|
if not parse_json:
|
|
ret = stdout
|
|
else:
|
|
try:
|
|
ret = json.loads(stdout)
|
|
except Exception as e:
|
|
size = 'unknown number of'
|
|
try:
|
|
size = len(stdout)
|
|
except:
|
|
pass
|
|
log(ERR, f'Failed to parse {size} bytes output of command '
|
|
+ f'>{pretty_cmd(cmd, wd)}< ({str(e)}): "{stdout}"', file=sys.stderr)
|
|
raise
|
|
return ret, stderr, status
|
|
|
|
async def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None, ec: ExecContext|None=None):
|
|
assert host is None # Currently unsupported
|
|
for var in askpass_env:
|
|
exe = os.getenv(var)
|
|
if exe is None:
|
|
continue
|
|
exe_arg = ''
|
|
match var:
|
|
case 'GIT_ASKPASS':
|
|
match key:
|
|
case AskpassKey.Username:
|
|
exe_arg += 'Username'
|
|
case AskpassKey.Password:
|
|
exe_arg += 'Password'
|
|
case 'SSH_ASKPASS':
|
|
match key:
|
|
case AskpassKey.Username:
|
|
continue # Can't get user name from SSH_ASKPASS
|
|
case AskpassKey.Password:
|
|
exe_arg += 'Password'
|
|
ret, stderr, status = await run_cmd([exe, exe_arg], throw=False, ec=ec).decode()
|
|
if ret is not None:
|
|
return ret
|
|
return None
|
|
|
|
async def run_sudo(cmd: list[str], *args, interactive: bool=True, ec: ExecContext|None=None, **kwargs):
|
|
if ec is None:
|
|
from .ec.Local import Local
|
|
ec = Local(interactive=interactive)
|
|
return await ec.sudo(cmd, *args, **kwargs)
|
|
|
|
async def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[], ec: ExecContext|None=None) -> str: # export
|
|
url_user = None if url is None else urlparse(url).username
|
|
if args is not None:
|
|
if args.username is not None:
|
|
if url_user is not None and url_user != args.username:
|
|
raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"')
|
|
return args.username
|
|
if url_user is not None:
|
|
return url_user
|
|
return await run_askpass(askpass_env, AskpassKey.Username, ec=ec)
|
|
|
|
async def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[], ec: ExecContext|None=None) -> str: # export
|
|
if args is None and url is None and not askpass_env:
|
|
raise Exception(f'Neither URL nor command-line arguments nor askpass environment variable available, can\'t get password')
|
|
if args is not None and hasattr(args, 'password'): # use getattr(), because we don't necessarily want to have insecure --password among options
|
|
ret = getattr(args, 'password')
|
|
if ret is not None:
|
|
return ret
|
|
if url is not None:
|
|
parsed = urlparse(url)
|
|
if parsed.password is not None:
|
|
return parsed.password
|
|
return await run_askpass(askpass_env, AskpassKey.Password, ec=ec)
|
|
|
|
async def get_profile_env(throw: bool=True, keep: Iterable[str]|bool=False, ec: ExecContext|None=None) -> dict[str, str]: # export
|
|
"""
|
|
Get a fresh environment from /etc/profile
|
|
|
|
Args:
|
|
keep:
|
|
- False -> Don't keep anything
|
|
- True -> Keep what's in the current environment
|
|
- List of strings -> Keep those variables
|
|
|
|
Returns:
|
|
Dictionary with fresh environment
|
|
"""
|
|
env: dict[str,str]|None = None
|
|
if keep == False or isinstance(keep, Iterable):
|
|
env = {
|
|
'HOME': os.environ.get('HOME', '/'),
|
|
'USER': os.environ.get('USER', ''),
|
|
'PATH': '/usr/bin:/bin',
|
|
}
|
|
# Run bash as a login shell, which sources /etc/profile, then print environment as NUL-separated key=value pairs
|
|
cmd = ['/usr/bin/env', '-i', '/bin/bash', '-lc', 'env -0']
|
|
result = await run_cmd(cmd, throw=throw, verbose=True, env=env, ec=ec)
|
|
ret: dict[str, str] = {}
|
|
for entry in result.stdout.rstrip(b"\0").split(b"\0"):
|
|
if not entry:
|
|
continue
|
|
key, val = entry.split(b"=", 1)
|
|
ret[key.decode()] = val.decode()
|
|
if isinstance(keep, Iterable):
|
|
for key in keep:
|
|
val = os.getenv(key)
|
|
if val is not None:
|
|
ret[key] = val
|
|
return ret
|