From 4274a71c6242752058c85fba80054b596efac01c Mon Sep 17 00:00:00 2001 From: Jan Lindemann Date: Tue, 27 Jan 2026 16:20:57 +0100 Subject: [PATCH] lib.util.run_cmd(): Rewrite it to be async run_cmd() is synchronous. Now that all commands are asynchronous, we can await it, so rewrite it to be asynchronous, too. Other changes: - Make it return stderr as well in case its needed - Drop into a pseuto-tty if - cmd_input == "mode:interactive" or - cmd_input == "mode:auto" and stdin is a TTY - Add argument env, defaulting to None. If it's a dict, it will be the environment the command is run in This entails making all functions using run_cmd() async, too, including run_curl(), get_username() and get_password(). Signed-off-by: Jan Lindemann --- .../jw/pkg/cmds/projects/CmdGetAuthInfo.py | 2 +- .../jw/pkg/cmds/projects/CmdListRepos.py | 10 +- src/python/jw/pkg/lib/SSHClient.py | 9 +- src/python/jw/pkg/lib/util.py | 189 ++++++++++++++---- 4 files changed, 164 insertions(+), 46 deletions(-) diff --git a/src/python/jw/pkg/cmds/projects/CmdGetAuthInfo.py b/src/python/jw/pkg/cmds/projects/CmdGetAuthInfo.py index c68d346c..be54b524 100644 --- a/src/python/jw/pkg/cmds/projects/CmdGetAuthInfo.py +++ b/src/python/jw/pkg/cmds/projects/CmdGetAuthInfo.py @@ -29,7 +29,7 @@ class CmdGetAuthInfo(Cmd): # export if not os.path.isdir(jw_pkg_dir + '/.git'): log(DEBUG, f'jw-pkg directory is not a Git repo: {jw_pkg_dir}') return - remotes = run_cmd(['git', '-C', jw_pkg_dir, 'remote', '-v']) + remotes, stderr = await run_cmd(['git', '-C', jw_pkg_dir, 'remote', '-v']) result: dict[str, str] = {} for line in remotes.split('\n'): if re.match(r'^\s*$', line): diff --git a/src/python/jw/pkg/cmds/projects/CmdListRepos.py b/src/python/jw/pkg/cmds/projects/CmdListRepos.py index c037dd93..6cc5c6ff 100644 --- a/src/python/jw/pkg/cmds/projects/CmdListRepos.py +++ b/src/python/jw/pkg/cmds/projects/CmdListRepos.py @@ -25,10 +25,10 @@ class CmdListRepos(Cmd): # export from urllib.parse import urlparse url = urlparse(args.base_url) askpass_env=['GIT_ASKPASS', 'SSH_ASKPASS'] - username = get_username(args=args, url=args.base_url, askpass_env=askpass_env) + username = await get_username(args=args, url=args.base_url, askpass_env=askpass_env) password = None if username is not None: - password = get_password(args=args, url=args.base_url, askpass_env=askpass_env) + password = await get_password(args=args, url=args.base_url, askpass_env=askpass_env) match url.scheme: case 'ssh': if re.match(r'ssh://.*git\.janware\.com/', args.base_url): @@ -39,7 +39,7 @@ class CmdListRepos(Cmd): # export if password is not None: ssh.set_password(password) cmd = f'/opt/jw-pkg/bin/git-srv-admin.sh -u {args.from_user} -j list-personal-projects' - out = ssh.run_cmd(cmd) + out = await ssh.run_cmd(cmd) print(out) return case 'https': @@ -55,7 +55,7 @@ class CmdListRepos(Cmd): # export cmd_input = (f'-u {username}:{password}').encode('utf-8') curl_args.extend(['-K-']) curl_args.append(f'https://api.github.com/users/{args.from_user}/repos') - repos = run_curl(curl_args, cmd_input=cmd_input) + repos = await run_curl(curl_args, cmd_input=cmd_input) for repo in repos: print(repo['name']) return @@ -71,7 +71,7 @@ class CmdListRepos(Cmd): # export curl_args.extend([ f'https://{url.hostname}/code/api/v1/orgs/{args.from_user}/repos' ]) - repos = run_curl(curl_args, cmd_input=cmd_input) + repos = await run_curl(curl_args, cmd_input=cmd_input) for repo in repos: print(repo['name']) return diff --git a/src/python/jw/pkg/lib/SSHClient.py b/src/python/jw/pkg/lib/SSHClient.py index c46c2b63..233fee67 100644 --- a/src/python/jw/pkg/lib/SSHClient.py +++ b/src/python/jw/pkg/lib/SSHClient.py @@ -32,7 +32,7 @@ class SSHClient(abc.ABC): return self.__username @abc.abstractmethod - def run_cmd(self, cmd: str): + async def run_cmd(self, cmd: str): pass class SSHClientInternal(SSHClient): # export @@ -61,7 +61,7 @@ class SSHClientInternal(SSHClient): # export def __scp(self): return SCPClient(self.__ssh.get_transport()) - def run_cmd(self, cmd: str): + async def run_cmd(self, cmd: str): return self.__ssh.exec_command(find_cmd) class SSHClientCmd(SSHClient): # export @@ -93,8 +93,9 @@ class SSHClientCmd(SSHClient): # export self.__askpass_orig[key] = os.getenv(key) os.environ[key] = val - def run_cmd(self, cmd: str): + async def run_cmd(self, cmd: str): self.__init_askpass() cmd_arr = ['ssh'] cmd_arr.append(self.hostname) - return run_cmd(['ssh', self.hostname, cmd]) + stdout, stderr = await run_cmd('ssh', self.hostname, cmd) + return stdout diff --git a/src/python/jw/pkg/lib/util.py b/src/python/jw/pkg/lib/util.py index 14ea4e6b..8b74f665 100644 --- a/src/python/jw/pkg/lib/util.py +++ b/src/python/jw/pkg/lib/util.py @@ -1,10 +1,15 @@ # -*- coding: utf-8 -*- -import os, sys, subprocess, json, time +from typing import Sequence + +import os, sys, subprocess, json, time, asyncio + from argparse import Namespace from urllib.parse import urlparse from enum import Enum, auto +from .log import * + class AskpassKey(Enum): Username = auto() Password = auto() @@ -20,58 +25,170 @@ def pretty_cmd(cmd: list[str], wd=None): ret += f' in {wd}' return ret -def run_cmd(cmd: list[str], wd=None, throw=True, verbose=False, cmd_input=None) -> str|None: # export +async def run_cmd( + *args: str, + wd: str | None = None, + throw: bool = True, + verbose: bool = False, + cmd_input: str|None = None, + env: dict[str, str] | None = None +) -> tuple[str|None, str|None]: + """ + Run a command asynchronously and return its output + + Args: + *args: Command and arguments + wd: Optional working directory + throw: Raise an exception on non-zero exit status if True + verbose: Emit log output while the command runs + cmd_input: + - None -> stdin from /dev/null + - "mode:interactive" -> Inherit terminal stdin + - "mode:auto" -> Inherit terminal stdin if it is a TTY + - otherwise -> String fed to stdin + + Returns: + (stdout, stderr), each as a string or None + """ + + def __log(prio, *args): + if verbose: + log(prio, "|", *args) + + def __check_exit_code(code): + if code != 0 and (throw or verbose): + msg = ( + time.strftime("%Y-%m-%d %H:%M") + + f': Command returned error {code}: "{pretty_cmd(args, wd)}"' + ) + if verbose: + __log(ERR, msg) + if throw: + raise RuntimeError(msg) + + def __make_pty_reader(collector: list[str], encoding: str, verbose: bool): + def _read(fd): + data = os.read(fd, 1024) + if not data: + return data + text = data.decode(encoding, errors="replace") + collector.append(text) + return data + return _read if verbose: delim_len = 120 - delim = f'---- running {pretty_cmd(cmd, wd)} -' - delim = delim + '-' * (delim_len - len(delim)) - print(',' + delim + ' >') + delim = f'---- running {pretty_cmd(args, wd)} -' + delim += '-' * max(0, delim_len - len(delim)) + log(NOTICE, ',' + delim + ' >') cwd: str|None = None if wd is not None: cwd = os.getcwd() os.chdir(wd) - ret = '' try: - stdin = None - if cmd_input is not None: - stdin = subprocess.PIPE - p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=None, close_fds=True, stdin=stdin) - if cmd_input is not None: - ret = p.communicate(input=cmd_input)[0] - else: - for line in iter(p.stdout.readline, b''): - line = line.decode(sys.stdout.encoding) - ret += line - p.wait() - if verbose: - print('`' + delim + ' <') - if p.returncode: - if verbose: - print(' '.join(cmd) + ' failed') - raise Exception(time.strftime('%Y-%m-%d %H:%M') + f': Command returned an error: "{pretty_cmd(cmd, wd)}"') - finally: - if cwd: - os.chdir(cwd) - return ret -def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> dict|str: # export + # -- interactive mode + if cmd_input == "mode:auto" and sys.stdin.isatty(): + cmd_input == "mode:interactive" + + if cmd_input == "mode:interactive": + + import pty + + stdout_chunks: list[str] = [] + + reader = __make_pty_reader( + stdout_chunks, + sys.stdout.encoding or "utf-8", + verbose, + ) + + def _spawn(): + return pty.spawn(args, master_read=reader) + + __check_exit_code(await asyncio.to_thread(_spawn)) + + # PTY merges stdout/stderr + return "".join(stdout_chunks), None + + # -- non-interactive mode + stdin = ( + asyncio.subprocess.DEVNULL + if cmd_input is None + else asyncio.subprocess.PIPE + ) + + proc = await asyncio.create_subprocess_exec( + *args, + stdin=stdin, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + + stdout_chunks: list[str] = [] + stderr_chunks: list[str] = [] + + async def read_stream(stream, prio, collector, encoding): + while True: + line = await stream.readline() + if not line: + break + text = line.decode(encoding, errors="replace") + collector.append(text) + __log(prio, text.rstrip("\n")) + + tasks = [ + asyncio.create_task( + read_stream(proc.stdout, NOTICE, stdout_chunks, sys.stdout.encoding or "utf-8") + ), + asyncio.create_task( + read_stream(proc.stderr, ERR, stderr_chunks, sys.stderr.encoding or "utf-8") + ), + ] + + if stdin is asyncio.subprocess.PIPE: + proc.stdin.write(cmd_input.encode(sys.stdout.encoding or "utf-8")) + await proc.stdin.drain() + proc.stdin.close() + + exit_code = await proc.wait() + await asyncio.gather(*tasks) + __check_exit_code(exit_code) + + return ( + "".join(stdout_chunks) if stdout_chunks else None, + "".join(stderr_chunks) if stderr_chunks else None, + ) + + finally: + if cwd is not None: + os.chdir(cwd) + if verbose: + log(NOTICE, '`' + delim + ' <') + +async def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> dict|str: # export cmd = ['curl'] if not verbose: cmd.append('-s') cmd.extend(args) - ret = run_cmd(cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input) + ret, stderr = await run_cmd(*cmd, wd=wd, throw=throw, verbose=verbose, cmd_input=cmd_input) if parse_json: try: return json.loads(ret) except Exception as e: - print(f'Failed to parse {len(ret)} bytes output of command >{pretty_cmd(cmd, wd)}< ({e})', file=sys.stderr) + size = 'None' + try: + size = len(ret) + except: + pass + print(f'Failed to parse {size} bytes output of command >{pretty_cmd(cmd, wd)}< ({e})', file=sys.stderr) raise return ret -def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None): +async def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None): assert host is None # Currently unsupported for var in askpass_env: exe = os.getenv(var) @@ -91,12 +208,12 @@ def run_askpass(askpass_env: list[str], key: AskpassKey, host: str|None=None): continue # Can't get user name from SSH_ASKPASS case AskpassKey.Password: exe_arg += 'Password' - ret = run_cmd([exe, exe_arg], throw=False) + ret, stderr = await run_cmd(exe, exe_arg, throw=False) if ret is not None: return ret return None -def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export +async def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export url_user = None if url is None else urlparse(url).username if args is not None: if args.username is not None: @@ -105,9 +222,9 @@ def get_username(args: Namespace|None=None, url: str|None=None, askpass_env: lis return args.username if url_user is not None: return url_user - return run_askpass(askpass_env, AskpassKey.Username) + return await run_askpass(askpass_env, AskpassKey.Username) -def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export +async def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: list[str]=[]) -> str: # export if args is None and url is None and not askpass_env: raise Exception(f'Neither URL nor command-line arguments nor askpass environment variable available, can\'t get password') if args is not None and hasattr(args, 'password'): # use getattr(), because we don't necessarily want to have insecure --password among options @@ -118,4 +235,4 @@ def get_password(args: Namespace|None=None, url: str|None=None, askpass_env: lis parsed = urlparse(url) if parsed.password is not None: return parsed.password - return run_askpass(askpass_env, AskpassKey.Password) + return await run_askpass(askpass_env, AskpassKey.Password)