jw-pkg/src/python/jw/pkg/lib/util.py
Jan Lindemann 4274a71c62 lib.util.run_cmd(): Rewrite it to be async
run_cmd() is synchronous. Now that all commands are asynchronous, we
can await it, so rewrite it to be asynchronous, too.

Other changes:

  - Make it return stderr as well in case its needed

  - Drop into a pseuto-tty if
    - cmd_input == "mode:interactive" or
    - cmd_input == "mode:auto" and stdin is a TTY

  - Add argument env, defaulting to None. If it's a dict, it will be
    the environment the command is run in

This entails making all functions using run_cmd() async, too,
including run_curl(), get_username() and get_password().

Signed-off-by: Jan Lindemann <jan@janware.com>
2026-01-28 17:41:40 +01:00

238 lines
7.6 KiB
Python

# -*- coding: utf-8 -*-
from typing import Sequence
import os, sys, subprocess, json, time, asyncio
from argparse import Namespace
from urllib.parse import urlparse
from enum import Enum, auto
from .log import *
class AskpassKey(Enum):
Username = auto()
Password = auto()
def pretty_cmd(cmd: list[str], wd=None):
tokens = [cmd[0]]
for token in cmd[1:]:
if token.find(' ') != -1:
token = '"' + token + '"'
tokens.append(token)
ret = ' '.join(tokens)
if wd is not None:
ret += f' in {wd}'
return ret
async def run_cmd(
*args: str,
wd: str | None = None,
throw: bool = True,
verbose: bool = False,
cmd_input: str|None = None,
env: dict[str, str] | None = None
) -> tuple[str|None, str|None]:
"""
Run a command asynchronously and return its output
Args:
*args: Command and arguments
wd: Optional working directory
throw: Raise an exception on non-zero exit status if True
verbose: Emit log output while the command runs
cmd_input:
- None -> stdin from /dev/null
- "mode:interactive" -> Inherit terminal stdin
- "mode:auto" -> Inherit terminal stdin if it is a TTY
- otherwise -> String fed to stdin
Returns:
(stdout, stderr), each as a string or None
"""
def __log(prio, *args):
if verbose:
log(prio, "|", *args)
def __check_exit_code(code):
if code != 0 and (throw or verbose):
msg = (
time.strftime("%Y-%m-%d %H:%M")
+ f': Command returned error {code}: "{pretty_cmd(args, wd)}"'
)
if verbose:
__log(ERR, msg)
if throw:
raise RuntimeError(msg)
def __make_pty_reader(collector: list[str], encoding: str, verbose: bool):
def _read(fd):
data = os.read(fd, 1024)
if not data:
return data
text = data.decode(encoding, errors="replace")
collector.append(text)
return data
return _read
if verbose:
delim_len = 120
delim = f'---- running {pretty_cmd(args, wd)} -'
delim += '-' * max(0, delim_len - len(delim))
log(NOTICE, ',' + delim + ' >')
cwd: str|None = None
if wd is not None:
cwd = os.getcwd()
os.chdir(wd)
try:
# -- interactive mode
if cmd_input == "mode:auto" and sys.stdin.isatty():
cmd_input == "mode:interactive"
if cmd_input == "mode:interactive":
import pty
stdout_chunks: list[str] = []
reader = __make_pty_reader(
stdout_chunks,
sys.stdout.encoding or "utf-8",
verbose,
)
def _spawn():
return pty.spawn(args, master_read=reader)
__check_exit_code(await asyncio.to_thread(_spawn))
# PTY merges stdout/stderr
return "".join(stdout_chunks), None
# -- non-interactive mode
stdin = (
asyncio.subprocess.DEVNULL
if cmd_input is None
else asyncio.subprocess.PIPE
)
proc = await asyncio.create_subprocess_exec(
*args,
stdin=stdin,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout_chunks: list[str] = []
stderr_chunks: list[str] = []
async def read_stream(stream, prio, collector, encoding):
while True:
line = await stream.readline()
if not line:
break
text = line.decode(encoding, errors="replace")
collector.append(text)
__log(prio, text.rstrip("\n"))
tasks = [
asyncio.create_task(
read_stream(proc.stdout, NOTICE, stdout_chunks, sys.stdout.encoding or "utf-8")
),
asyncio.create_task(
read_stream(proc.stderr, ERR, stderr_chunks, sys.stderr.encoding or "utf-8")
),
]
if stdin is asyncio.subprocess.PIPE:
proc.stdin.write(cmd_input.encode(sys.stdout.encoding or "utf-8"))
await proc.stdin.drain()
proc.stdin.close()
exit_code = await proc.wait()
await asyncio.gather(*tasks)
__check_exit_code(exit_code)
return (
"".join(stdout_chunks) if stdout_chunks else None,
"".join(stderr_chunks) if stderr_chunks else None,
)
finally:
if cwd is not None:
os.chdir(cwd)
if verbose:
log(NOTICE, '`' + delim + ' <')
async def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> dict|str: # export
cmd = ['curl']
if not verbose:
cmd.append('-s')
cmd.extend(args)
ret, stderr = await run_cmd(*cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input)
if parse_json:
try:
return json.loads(ret)
except Exception as e:
size = 'None'
try:
size = len(ret)
except:
pass
print(f'Failed to parse {size} bytes output of command >{pretty_cmd(cmd, wd)}< ({e})', file=sys.stderr)
raise
return ret
async def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None):
assert host is None # Currently unsupported
for var in askpass_env:
exe = os.getenv(var)
if exe is None:
continue
exe_arg = ''
match var:
case 'GIT_ASKPASS':
match key:
case AskpassKey.Username:
exe_arg += 'Username'
case AskpassKey.Password:
exe_arg += 'Password'
case 'SSH_ASKPASS':
match key:
case AskpassKey.Username:
continue # Can't get user name from SSH_ASKPASS
case AskpassKey.Password:
exe_arg += 'Password'
ret, stderr = await run_cmd(exe, exe_arg, throw=False)
if ret is not None:
return ret
return None
async def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
url_user = None if url is None else urlparse(url).username
if args is not None:
if args.username is not None:
if url_user is not None and url_user != args.username:
raise Exception(f'Username mismatch: called with --username="{args.username}", URL has user name "{url_user}"')
return args.username
if url_user is not None:
return url_user
return await run_askpass(askpass_env, AskpassKey.Username)
async def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export
if args is None and url is None and not askpass_env:
raise Exception(f'Neither URL nor command-line arguments nor askpass environment variable available, can\'t get password')
if args is not None and hasattr(args, 'password'): # use getattr(), because we don't necessarily want to have insecure --password among options
ret = getattr(args, 'password')
if ret is not None:
return ret
if url is not None:
parsed = urlparse(url)
if parsed.password is not None:
return parsed.password
return await run_askpass(askpass_env, AskpassKey.Password)