mirror of
ssh://git.janware.com/janware/proj/jw-pkg
synced 2026-04-25 17:45:55 +02:00
Make some incomprensible parser error messages if run_curl() returns nothing slightly less incomprehensible. Signed-off-by: Jan Lindemann <jan@janware.com>
344 lines
12 KiB
Python
344 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Sequence, Iterable
|
|
|
|
import os, sys, subprocess, json, time, asyncio
|
|
|
|
from argparse import Namespace
|
|
from urllib.parse import urlparse
|
|
from enum import Enum, auto
|
|
|
|
from .log import *
|
|
|
|
class AskpassKey(Enum):
|
|
Username = auto()
|
|
Password = auto()
|
|
|
|
def pretty_cmd(cmd: list[str], wd=None):
|
|
tokens = [cmd[0]]
|
|
for token in cmd[1:]:
|
|
if token.find(' ') != -1:
|
|
token = '"' + token + '"'
|
|
tokens.append(token)
|
|
ret = ' '.join(tokens)
|
|
if wd is not None:
|
|
ret += f' in {wd}'
|
|
return ret
|
|
|
|
async def run_cmd(
|
|
args: list[str],
|
|
wd: str|None = None,
|
|
throw: bool = True,
|
|
verbose: bool = False,
|
|
cmd_input: str|None = None,
|
|
env: dict[str, str]|None = None,
|
|
title: str=None,
|
|
output_encoding: str|None = None, # None => unchanged; "bytes" => return raw bytes
|
|
) -> tuple[str|bytes|None, str|bytes|None]:
|
|
"""
|
|
Run a command asynchronously and return its output
|
|
|
|
Args:
|
|
args: Command and arguments
|
|
wd: Optional working directory
|
|
throw: Raise an exception on non-zero exit status if True
|
|
verbose: Emit log output while the command runs
|
|
cmd_input:
|
|
- None -> stdin from /dev/null
|
|
- "mode:interactive" -> Inherit terminal stdin
|
|
- "mode:auto" -> Inherit terminal stdin if it is a TTY
|
|
- otherwise -> String fed to stdin
|
|
output_encoding:
|
|
- None -> unchanged behavior (decode stdout via sys.stdout.encoding, stderr via sys.stderr.encoding)
|
|
- "bytes" -> return raw bytes instead of decoded strings
|
|
- otherwise -> decode stdout/stderr using this encoding
|
|
|
|
Returns:
|
|
(stdout, stderr), each as a string/bytes or None
|
|
In PTY mode stderr is always None because PTY merges stdout/stderr.
|
|
"""
|
|
|
|
want_bytes = (output_encoding == "bytes")
|
|
|
|
def __log(prio, *args, verbose=verbose):
|
|
if verbose:
|
|
log(prio, "|", *args)
|
|
|
|
def __check_exit_code(code: int, stdout=None, stderr=None):
|
|
if code == 0:
|
|
return
|
|
if (throw or verbose):
|
|
msg = f'Command returned error {code}: {pretty_cmd(args, wd)}: '
|
|
if stderr:
|
|
msg += stderr.strip()
|
|
if throw:
|
|
raise RuntimeError(msg)
|
|
|
|
def __make_pty_reader(collector: list[bytes], enc_for_verbose: str):
|
|
def _read(fd):
|
|
data = os.read(fd, 1024)
|
|
if not data:
|
|
return data
|
|
collector.append(data)
|
|
if verbose:
|
|
__log(NOTICE, data.decode(enc_for_verbose, errors="replace").rstrip("\n"))
|
|
return data
|
|
return _read
|
|
|
|
if verbose:
|
|
delim_len = 120
|
|
delim = title if title is not None else f'---- Running {pretty_cmd(args, wd)} -'
|
|
delim += '-' * max(0, delim_len - len(delim))
|
|
log(NOTICE, ',' + delim + ' >')
|
|
|
|
cwd: str|None = None
|
|
if wd is not None:
|
|
cwd = os.getcwd()
|
|
os.chdir(wd)
|
|
|
|
try:
|
|
|
|
# -- interactive mode
|
|
if cmd_input == "mode:auto" and sys.stdin.isatty():
|
|
cmd_input = "mode:interactive"
|
|
|
|
if cmd_input == "mode:interactive":
|
|
|
|
import pty
|
|
|
|
stdout_chunks_b: list[bytes] = []
|
|
|
|
enc_for_verbose = (
|
|
(sys.stdout.encoding or "utf-8")
|
|
if output_encoding in (None, "bytes")
|
|
else output_encoding
|
|
)
|
|
|
|
reader = __make_pty_reader(stdout_chunks_b, enc_for_verbose)
|
|
|
|
def _spawn():
|
|
# Apply env in PTY mode by temporarily updating os.environ around spawn.
|
|
if env:
|
|
old_env = os.environ.copy()
|
|
try:
|
|
os.environ.update(env)
|
|
return pty.spawn(args, master_read=reader)
|
|
finally:
|
|
os.environ.clear()
|
|
os.environ.update(old_env)
|
|
return pty.spawn(args, master_read=reader)
|
|
|
|
__check_exit_code(await asyncio.to_thread(_spawn))
|
|
|
|
# PTY merges stdout/stderr
|
|
stdout_b = b"".join(stdout_chunks_b) if stdout_chunks_b else None
|
|
if want_bytes:
|
|
return stdout_b, None
|
|
|
|
stdout_dec_enc = (sys.stdout.encoding or "utf-8") if output_encoding is None else output_encoding
|
|
stdout_s = stdout_b.decode(stdout_dec_enc, errors="replace") if stdout_b is not None else None
|
|
return stdout_s, None
|
|
|
|
# -- non-interactive mode
|
|
stdin = (
|
|
asyncio.subprocess.DEVNULL
|
|
if cmd_input is None
|
|
else asyncio.subprocess.PIPE
|
|
)
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*args,
|
|
stdin=stdin,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
env=env,
|
|
)
|
|
|
|
stdout_parts_b: list[bytes] = []
|
|
stderr_parts_b: list[bytes] = []
|
|
|
|
# -- decoding for verbose output in pipe mode
|
|
if output_encoding is None or want_bytes:
|
|
stdout_log_enc = sys.stdout.encoding or "utf-8"
|
|
stderr_log_enc = sys.stderr.encoding or "utf-8"
|
|
else:
|
|
stdout_log_enc = output_encoding
|
|
stderr_log_enc = output_encoding
|
|
|
|
async def read_stream(stream, prio, collector: list[bytes], log_enc: str):
|
|
while True:
|
|
line = await stream.readline()
|
|
if not line:
|
|
break
|
|
collector.append(line)
|
|
if verbose:
|
|
__log(prio, line.decode(log_enc, errors="replace").rstrip("\n"))
|
|
|
|
tasks = [
|
|
asyncio.create_task(
|
|
read_stream(proc.stdout, NOTICE, stdout_parts_b, stdout_log_enc)
|
|
),
|
|
asyncio.create_task(
|
|
read_stream(proc.stderr, ERR, stderr_parts_b, stderr_log_enc)
|
|
),
|
|
]
|
|
|
|
if stdin is asyncio.subprocess.PIPE:
|
|
proc.stdin.write(cmd_input.encode(sys.stdout.encoding or "utf-8"))
|
|
await proc.stdin.drain()
|
|
proc.stdin.close()
|
|
|
|
exit_code = await proc.wait()
|
|
await asyncio.gather(*tasks)
|
|
if want_bytes:
|
|
__check_exit_code(exit_code)
|
|
|
|
stdout_b = b"".join(stdout_parts_b) if stdout_parts_b else None
|
|
stderr_b = b"".join(stderr_parts_b) if stderr_parts_b else None
|
|
|
|
if want_bytes:
|
|
return stdout_b, stderr_b
|
|
|
|
if output_encoding is None:
|
|
stdout_dec_enc = sys.stdout.encoding or "utf-8"
|
|
stderr_dec_enc = sys.stderr.encoding or "utf-8"
|
|
else:
|
|
stdout_dec_enc = output_encoding
|
|
stderr_dec_enc = output_encoding
|
|
|
|
stdout_s = stdout_b.decode(stdout_dec_enc, errors="replace") if stdout_b is not None else None
|
|
stderr_s = stderr_b.decode(stderr_dec_enc, errors="replace") if stderr_b is not None else None
|
|
|
|
if not want_bytes:
|
|
__check_exit_code(exit_code, stdout=stdout_s, stderr=stderr_s)
|
|
|
|
return stdout_s, stderr_s
|
|
|
|
finally:
|
|
if cwd is not None:
|
|
os.chdir(cwd)
|
|
if verbose:
|
|
log(NOTICE, '`' + delim + ' <')
|
|
|
|
async def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> dict|str: # export
|
|
cmd = ['curl']
|
|
if not verbose:
|
|
cmd.append('-s')
|
|
cmd.extend(args)
|
|
ret, stderr = await run_cmd(cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input)
|
|
if parse_json:
|
|
try:
|
|
return json.loads(ret)
|
|
except Exception as e:
|
|
size = 'unknown number of'
|
|
try:
|
|
size = len(ret)
|
|
except:
|
|
pass
|
|
print(f'Failed to parse {size} bytes output of command '
|
|
+ f'>{pretty_cmd(cmd, wd)}< ({str(e)}): "{ret}"', file=sys.stderr)
|
|
raise
|
|
return ret
|
|
|
|
async def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None):
|
|
assert host is None # Currently unsupported
|
|
for var in askpass_env:
|
|
exe = os.getenv(var)
|
|
if exe is None:
|
|
continue
|
|
exe_arg = ''
|
|
match var:
|
|
case 'GIT_ASKPASS':
|
|
match key:
|
|
case AskpassKey.Username:
|
|
exe_arg += 'Username'
|
|
case AskpassKey.Password:
|
|
exe_arg += 'Password'
|
|
case 'SSH_ASKPASS':
|
|
match key:
|
|
case AskpassKey.Username:
|
|
continue # Can't get user name from SSH_ASKPASS
|
|
case AskpassKey.Password:
|
|
exe_arg += 'Password'
|
|
ret, stderr = await run_cmd([exe, exe_arg], throw=False)
|
|
if ret is not None:
|
|
return ret
|
|
return None
|
|
|
|
async def run_sudo(cmd: list[str], mod_env: dict[str, str] = {}, opts: list[str]=[], interactive: bool=True, verbose=True):
|
|
env: dict[str, str]|None = None
|
|
cmd_input: str|None = None
|
|
if mod_env:
|
|
env = os.environ.copy()
|
|
env.update(mod_env)
|
|
cmdline = []
|
|
if os.getuid() != 0:
|
|
cmdline.append('/usr/bin/sudo')
|
|
if env is not None:
|
|
cmdline.append('--preserve-env=' + ','.join(mod_env.keys()))
|
|
cmdline.extend(opts)
|
|
cmdline.extend(cmd)
|
|
if interactive:
|
|
cmd_input = "mode:interactive"
|
|
stdout, stderr = await run_cmd(cmdline, throw=True, verbose=verbose, env=env, cmd_input=cmd_input)
|
|
return stdout, stderr
|
|
|
|
async def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
|
|
url_user = None if url is None else urlparse(url).username
|
|
if args is not None:
|
|
if args.username is not None:
|
|
if url_user is not None and url_user != args.username:
|
|
raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"')
|
|
return args.username
|
|
if url_user is not None:
|
|
return url_user
|
|
return await run_askpass(askpass_env, AskpassKey.Username)
|
|
|
|
async def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
|
|
if args is None and url is None and not askpass_env:
|
|
raise Exception(f'Neither URL nor command-line arguments nor askpass environment variable available, can\'t get password')
|
|
if args is not None and hasattr(args, 'password'): # use getattr(), because we don't necessarily want to have insecure --password among options
|
|
ret = getattr(args, 'password')
|
|
if ret is not None:
|
|
return ret
|
|
if url is not None:
|
|
parsed = urlparse(url)
|
|
if parsed.password is not None:
|
|
return parsed.password
|
|
return await run_askpass(askpass_env, AskpassKey.Password)
|
|
|
|
async def get_profile_env(throw: bool=True, keep: Iterable[str]|bool=False) -> dict[str, str]: # export
|
|
"""
|
|
Get a fresh environment from /etc/profile
|
|
|
|
Args:
|
|
keep:
|
|
- False -> Don't keep anything
|
|
- True -> Keep what's in the current environment
|
|
- List of strings -> Keep those variables
|
|
|
|
Returns:
|
|
Dictionary with fresh environment
|
|
"""
|
|
env: dict[str,str]|None = None
|
|
if keep == False or isinstance(keep, Iterable):
|
|
env = {
|
|
'HOME': os.environ.get('HOME', '/'),
|
|
'USER': os.environ.get('USER', ''),
|
|
'PATH': '/usr/bin:/bin',
|
|
}
|
|
# Run bash as a login shell, which sources /etc/profile, then print environment as NUL-separated key=value pairs
|
|
cmd = ['/usr/bin/env', '-i', '/bin/bash', '-lc', 'env -0']
|
|
stdout, stderr = await run_cmd(cmd, throw=throw, output_encoding="bytes", verbose=True, env=env)
|
|
ret: dict[str, str] = {}
|
|
for entry in stdout.split(b"\0"):
|
|
if not entry:
|
|
continue
|
|
key, val = entry.split(b"=", 1)
|
|
ret[key.decode()] = val.decode()
|
|
if isinstance(keep, Iterable):
|
|
for key in keep:
|
|
val = os.getenv(key)
|
|
if val is not None:
|
|
ret[key] = val
|
|
return ret
|