diff --git a/src/python/jw/pkg/lib/ExecContext.py b/src/python/jw/pkg/lib/ExecContext.py index f021b1c2..2177e3b3 100644 --- a/src/python/jw/pkg/lib/ExecContext.py +++ b/src/python/jw/pkg/lib/ExecContext.py @@ -22,8 +22,50 @@ class ExecContext(abc.ABC): async def _run(self, *args, **kwargs) -> Result: pass - async def run(self, *args, **kwargs) -> Result: - return await self._run(*args, **kwargs) + async def run( + self, + args: list[str], + wd: str|None = None, + throw: bool = True, + verbose: bool = False, + cmd_input: str|None = None, + env: dict[str, str]|None = None, + title: str=None, + output_encoding: str|None = None, # None => unchanged; "bytes" => return raw bytes + ) -> Result: + """ + Run a command asynchronously and return its output + + Args: + args: Command and arguments + wd: Optional working directory + throw: Raise an exception on non-zero exit status if True + verbose: Emit log output while the command runs + cmd_input: + - None -> stdin from /dev/null + - "mode:interactive" -> Inherit terminal stdin + - "mode:auto" -> Inherit terminal stdin if it is a TTY + - otherwise -> String fed to stdin + output_encoding: + - None -> unchanged behavior (decode stdout via sys.stdout.encoding, stderr via sys.stderr.encoding) + - "bytes" -> return raw bytes instead of decoded strings + - otherwise -> decode stdout/stderr using this encoding + + Returns: + (stdout, stderr, exit_status): + stdout: stderr each as a string/bytes or None + In PTY mode stderr is always None because PTY merges stdout/stderr. + """ + return await self._run( + args=args, + wd=wd, + throw=throw, + verbose=verbose, + cmd_input=cmd_input, + env=env, + title=title, + output_encoding=output_encoding + ) @abc.abstractmethod async def _sudo(self, cmd: list[str], mod_env: dict[str, str] = {}, opts: list[str]=[], verbose=True) -> Result: diff --git a/src/python/jw/pkg/lib/ec/Local.py b/src/python/jw/pkg/lib/ec/Local.py index 0ef46a93..dde5a04c 100644 --- a/src/python/jw/pkg/lib/ec/Local.py +++ b/src/python/jw/pkg/lib/ec/Local.py @@ -1,15 +1,201 @@ # -*- coding: utf-8 -*- -import os +import os, sys, subprocess, asyncio from ..util import run_cmd from ..ExecContext import ExecContext as Base from ..ExecContext import Result +from ..log import * +from ..util import pretty_cmd + class Local(Base): - async def _run(self, *args, **kwargs) -> Result: - return await run_cmd(*args, **kwargs) + async def _run( + self, + args: list[str], + wd: str|None = None, + throw: bool = True, + verbose: bool = False, + cmd_input: str|None = None, + env: dict[str, str]|None = None, + title: str=None, + output_encoding: str|None = None, # None => unchanged; "bytes" => return raw bytes + ) -> tuple[str|bytes|None, str|bytes|None]: + + want_bytes = (output_encoding == "bytes") + + def __log(prio, *args, verbose=verbose): + if verbose: + log(prio, "|", *args) + + def __check_exit_code(code: int, stdout=None, stderr=None): + if code == 0: + return + if (throw or verbose): + msg = f'Command returned error {code}: {pretty_cmd(args, wd)}' + if stderr: + msg += ': ' + stderr.strip() + if throw: + raise RuntimeError(msg) + + def __make_pty_reader(collector: list[bytes], enc_for_verbose: str): + def _read(fd): + ret = os.read(fd, 1024) + if not ret: + return ret + collector.append(ret) + return ret + return _read + + interactive = ( + cmd_input == "mode:interactive" + or (cmd_input == "mode:auto" and sys.stdin.isatty()) + ) + + if verbose: + delim_len = 120 + delim = title if title is not None else f'---- Running {pretty_cmd(args, wd)} -' + if interactive: + log(NOTICE, delim) + else: + delim += '-' * max(0, delim_len - len(delim)) + log(NOTICE, ',' + delim + ' >') + + cwd: str|None = None + if wd is not None: + cwd = os.getcwd() + os.chdir(wd) + + try: + + # -- interactive mode + + if interactive: + + import pty + + stdout_chunks_b: list[bytes] = [] + + enc_for_verbose = ( + (sys.stdout.encoding or "utf-8") + if output_encoding in (None, "bytes") + else output_encoding + ) + + reader = __make_pty_reader(stdout_chunks_b, enc_for_verbose) + + def _spawn(): + # Apply env in PTY mode by temporarily updating os.environ around spawn. + if env: + old_env = os.environ.copy() + try: + os.environ.update(env) + return pty.spawn(args, master_read=reader) + finally: + os.environ.clear() + os.environ.update(old_env) + return pty.spawn(args, master_read=reader) + + exit_code = await asyncio.to_thread(_spawn) + __check_exit_code(exit_code) + + # PTY merges stdout/stderr + stdout_b = b"".join(stdout_chunks_b) if stdout_chunks_b else None + if want_bytes: + return stdout_b, None, exit_code + + stdout_dec_enc = (sys.stdout.encoding or "utf-8") if output_encoding is None else output_encoding + stdout_s = stdout_b.decode(stdout_dec_enc, errors="replace") if stdout_b is not None else None + return stdout_s, None, exit_code + + # -- non-interactive mode + stdin = ( + asyncio.subprocess.DEVNULL + if cmd_input is None + else asyncio.subprocess.PIPE + ) + + proc = await asyncio.create_subprocess_exec( + *args, + stdin=stdin, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + + stdout_parts_b: list[bytes] = [] + stderr_parts_b: list[bytes] = [] + + # -- decoding for verbose output in pipe mode + if output_encoding is None or want_bytes: + stdout_log_enc = sys.stdout.encoding or "utf-8" + stderr_log_enc = sys.stderr.encoding or "utf-8" + else: + stdout_log_enc = output_encoding + stderr_log_enc = output_encoding + + async def read_stream(stream, prio, collector: list[bytes], log_enc: str): + buf = b"" + while True: + chunk = await stream.read(4096) + if not chunk: + break + collector.append(chunk) + if verbose: + buf += chunk + while b"\n" in buf: + line, buf = buf.split(b"\n", 1) + __log(prio, line.decode(log_enc, errors="replace")) + if verbose and buf: + # flush trailing partial line (no newline) + __log(prio, buf.decode(log_enc, errors="replace")) + + tasks = [ + asyncio.create_task( + read_stream(proc.stdout, NOTICE, stdout_parts_b, stdout_log_enc) + ), + asyncio.create_task( + read_stream(proc.stderr, ERR, stderr_parts_b, stderr_log_enc) + ), + ] + + if stdin is asyncio.subprocess.PIPE: + proc.stdin.write(cmd_input.encode(sys.stdout.encoding or "utf-8")) + await proc.stdin.drain() + proc.stdin.close() + + exit_code = await proc.wait() + await asyncio.gather(*tasks) + if want_bytes: + __check_exit_code(exit_code) + + stdout_b = b"".join(stdout_parts_b) if stdout_parts_b else None + stderr_b = b"".join(stderr_parts_b) if stderr_parts_b else None + + if want_bytes: + return stdout_b, stderr_b, exit_code + + if output_encoding is None: + stdout_dec_enc = sys.stdout.encoding or "utf-8" + stderr_dec_enc = sys.stderr.encoding or "utf-8" + else: + stdout_dec_enc = output_encoding + stderr_dec_enc = output_encoding + + stdout_s = stdout_b.decode(stdout_dec_enc, errors="replace") if stdout_b is not None else None + stderr_s = stderr_b.decode(stderr_dec_enc, errors="replace") if stderr_b is not None else None + + if not want_bytes: + __check_exit_code(exit_code, stdout=stdout_s, stderr=stderr_s) + + return stdout_s, stderr_s, exit_code + + finally: + if cwd is not None: + os.chdir(cwd) + if verbose and not interactive: + log(NOTICE, '`' + delim + ' <') async def _sudo(self, cmd: list[str], mod_env: dict[str, str] = {}, opts: list[str]=[], verbose=True) -> Result: env: dict[str, str]|None = None diff --git a/src/python/jw/pkg/lib/util.py b/src/python/jw/pkg/lib/util.py index e164c3dc..ab294a78 100644 --- a/src/python/jw/pkg/lib/util.py +++ b/src/python/jw/pkg/lib/util.py @@ -1,8 +1,14 @@ # -*- coding: utf-8 -*- -from typing import Sequence, Iterable +from __future__ import annotations -import os, sys, subprocess, json, time, asyncio +from typing import TYPE_CHECKING, Iterable + +if TYPE_CHECKING: + from typing import Sequence + from ExecContext import ExecContext + +import os, sys, json from argparse import Namespace from urllib.parse import urlparse @@ -25,213 +31,11 @@ def pretty_cmd(cmd: list[str], wd=None): ret += f' in {wd}' return ret -async def run_cmd( - args: list[str], - wd: str|None = None, - throw: bool = True, - verbose: bool = False, - cmd_input: str|None = None, - env: dict[str, str]|None = None, - title: str=None, - output_encoding: str|None = None, # None => unchanged; "bytes" => return raw bytes -) -> tuple[str|bytes|None, str|bytes|None]: - """ - Run a command asynchronously and return its output - - Args: - args: Command and arguments - wd: Optional working directory - throw: Raise an exception on non-zero exit status if True - verbose: Emit log output while the command runs - cmd_input: - - None -> stdin from /dev/null - - "mode:interactive" -> Inherit terminal stdin - - "mode:auto" -> Inherit terminal stdin if it is a TTY - - otherwise -> String fed to stdin - output_encoding: - - None -> unchanged behavior (decode stdout via sys.stdout.encoding, stderr via sys.stderr.encoding) - - "bytes" -> return raw bytes instead of decoded strings - - otherwise -> decode stdout/stderr using this encoding - - Returns: - (stdout, stderr, exit_status): - stdout: stderr each as a string/bytes or None - In PTY mode stderr is always None because PTY merges stdout/stderr. - """ - - want_bytes = (output_encoding == "bytes") - - def __log(prio, *args, verbose=verbose): - if verbose: - log(prio, "|", *args) - - def __check_exit_code(code: int, stdout=None, stderr=None): - if code == 0: - return - if (throw or verbose): - msg = f'Command returned error {code}: {pretty_cmd(args, wd)}' - if stderr: - msg += ': ' + stderr.strip() - if throw: - raise RuntimeError(msg) - - def __make_pty_reader(collector: list[bytes], enc_for_verbose: str): - def _read(fd): - ret = os.read(fd, 1024) - if not ret: - return ret - collector.append(ret) - return ret - return _read - - interactive = ( - cmd_input == "mode:interactive" - or (cmd_input == "mode:auto" and sys.stdin.isatty()) - ) - - if verbose: - delim_len = 120 - delim = title if title is not None else f'---- Running {pretty_cmd(args, wd)} -' - if interactive: - log(NOTICE, delim) - else: - delim += '-' * max(0, delim_len - len(delim)) - log(NOTICE, ',' + delim + ' >') - - cwd: str|None = None - if wd is not None: - cwd = os.getcwd() - os.chdir(wd) - - try: - - # -- interactive mode - - if interactive: - - import pty - - stdout_chunks_b: list[bytes] = [] - - enc_for_verbose = ( - (sys.stdout.encoding or "utf-8") - if output_encoding in (None, "bytes") - else output_encoding - ) - - reader = __make_pty_reader(stdout_chunks_b, enc_for_verbose) - - def _spawn(): - # Apply env in PTY mode by temporarily updating os.environ around spawn. - if env: - old_env = os.environ.copy() - try: - os.environ.update(env) - return pty.spawn(args, master_read=reader) - finally: - os.environ.clear() - os.environ.update(old_env) - return pty.spawn(args, master_read=reader) - - exit_code = await asyncio.to_thread(_spawn) - __check_exit_code(exit_code) - - # PTY merges stdout/stderr - stdout_b = b"".join(stdout_chunks_b) if stdout_chunks_b else None - if want_bytes: - return stdout_b, None, exit_code - - stdout_dec_enc = (sys.stdout.encoding or "utf-8") if output_encoding is None else output_encoding - stdout_s = stdout_b.decode(stdout_dec_enc, errors="replace") if stdout_b is not None else None - return stdout_s, None, exit_code - - # -- non-interactive mode - stdin = ( - asyncio.subprocess.DEVNULL - if cmd_input is None - else asyncio.subprocess.PIPE - ) - - proc = await asyncio.create_subprocess_exec( - *args, - stdin=stdin, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=env, - ) - - stdout_parts_b: list[bytes] = [] - stderr_parts_b: list[bytes] = [] - - # -- decoding for verbose output in pipe mode - if output_encoding is None or want_bytes: - stdout_log_enc = sys.stdout.encoding or "utf-8" - stderr_log_enc = sys.stderr.encoding or "utf-8" - else: - stdout_log_enc = output_encoding - stderr_log_enc = output_encoding - - async def read_stream(stream, prio, collector: list[bytes], log_enc: str): - buf = b"" - while True: - chunk = await stream.read(4096) - if not chunk: - break - collector.append(chunk) - if verbose: - buf += chunk - while b"\n" in buf: - line, buf = buf.split(b"\n", 1) - __log(prio, line.decode(log_enc, errors="replace")) - if verbose and buf: - # flush trailing partial line (no newline) - __log(prio, buf.decode(log_enc, errors="replace")) - - tasks = [ - asyncio.create_task( - read_stream(proc.stdout, NOTICE, stdout_parts_b, stdout_log_enc) - ), - asyncio.create_task( - read_stream(proc.stderr, ERR, stderr_parts_b, stderr_log_enc) - ), - ] - - if stdin is asyncio.subprocess.PIPE: - proc.stdin.write(cmd_input.encode(sys.stdout.encoding or "utf-8")) - await proc.stdin.drain() - proc.stdin.close() - - exit_code = await proc.wait() - await asyncio.gather(*tasks) - if want_bytes: - __check_exit_code(exit_code) - - stdout_b = b"".join(stdout_parts_b) if stdout_parts_b else None - stderr_b = b"".join(stderr_parts_b) if stderr_parts_b else None - - if want_bytes: - return stdout_b, stderr_b, exit_code - - if output_encoding is None: - stdout_dec_enc = sys.stdout.encoding or "utf-8" - stderr_dec_enc = sys.stderr.encoding or "utf-8" - else: - stdout_dec_enc = output_encoding - stderr_dec_enc = output_encoding - - stdout_s = stdout_b.decode(stdout_dec_enc, errors="replace") if stdout_b is not None else None - stderr_s = stderr_b.decode(stderr_dec_enc, errors="replace") if stderr_b is not None else None - - if not want_bytes: - __check_exit_code(exit_code, stdout=stdout_s, stderr=stderr_s) - - return stdout_s, stderr_s, exit_code - - finally: - if cwd is not None: - os.chdir(cwd) - if verbose and not interactive: - log(NOTICE, '`' + delim + ' <') +# See ec.Local.run() for what this function does +async def run_cmd(*args, ec: ExecContext|None=None, **kwargs) -> tuple[str|bytes|None, str|bytes|None]: + if ec is not None: + ec = Local() + return await ec.run(*args, **kwargs) async def run_curl(args: list[str], parse_json: bool=True, wd=None, throw=None, verbose=False, cmd_input=None) -> dict|str: # export cmd = ['curl']