lib.ExecContext.run(): Push code up into base class

Take implementation burden from the derived classes _run() callback
by moving the respective code into the run() wrapper methods of the
base class.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-03-18 10:22:21 +01:00
commit 52dd3b8f21
3 changed files with 103 additions and 129 deletions

View file

@ -18,29 +18,16 @@ class Local(Base):
self,
args: list[str],
wd: str|None,
throw: bool,
verbose: bool,
cmd_input: str|None,
env: dict[str, str]|None,
title: str,
output_encoding: str|None, # None => unchanged; "bytes" => return raw bytes
interactive: bool,
log_prefix: str
) -> Result:
want_bytes = (output_encoding == "bytes")
def __log(prio, *args, verbose=verbose):
if verbose:
log(prio, "|", *args)
def __check_exit_code(code: int, stdout=None, stderr=None):
if code == 0:
return
if (throw or verbose):
msg = f'Command exited with status {code}: {pretty_cmd(args, wd)}'
if stderr:
msg += ': ' + stderr.strip()
if throw:
raise RuntimeError(msg)
log(prio, log_prefix, *args)
def __make_pty_reader(collector: list[bytes], enc_for_verbose: str):
def _read(fd):
@ -51,20 +38,6 @@ class Local(Base):
return ret
return _read
interactive = (
cmd_input == "mode:interactive"
or (cmd_input == "mode:auto" and sys.stdin.isatty())
)
if verbose:
delim_len = 120
delim = title if title is not None else f'---- Running {pretty_cmd(args, wd)} -'
if interactive:
log(NOTICE, delim)
else:
delim += '-' * max(0, delim_len - len(delim))
log(NOTICE, ',' + delim + ' >')
cwd: str|None = None
if wd is not None:
cwd = os.getcwd()
@ -78,16 +51,6 @@ class Local(Base):
import pty
stdout_chunks_b: list[bytes] = []
enc_for_verbose = (
(sys.stdout.encoding or "utf-8")
if output_encoding in (None, "bytes")
else output_encoding
)
reader = __make_pty_reader(stdout_chunks_b, enc_for_verbose)
def _spawn():
# Apply env in PTY mode by temporarily updating os.environ around spawn.
if env:
@ -100,24 +63,18 @@ class Local(Base):
os.environ.update(old_env)
return pty.spawn(args, master_read=reader)
stdout_chunks: list[bytes] = []
enc_for_verbose = sys.stdout.encoding or "utf-8"
reader = __make_pty_reader(stdout_chunks, enc_for_verbose)
exit_code = await asyncio.to_thread(_spawn)
__check_exit_code(exit_code)
# PTY merges stdout/stderr
stdout_b = b"".join(stdout_chunks_b) if stdout_chunks_b else None
if want_bytes:
return stdout_b, None, exit_code
stdout_dec_enc = (sys.stdout.encoding or "utf-8") if output_encoding is None else output_encoding
stdout_s = stdout_b.decode(stdout_dec_enc, errors="replace") if stdout_b is not None else None
return stdout_s, None, exit_code
stdout = b"".join(stdout_chunks) if stdout_chunks else None
return stdout, None, exit_code
# -- non-interactive mode
stdin = (
asyncio.subprocess.DEVNULL
if cmd_input is None
else asyncio.subprocess.PIPE
)
stdin = asyncio.subprocess.DEVNULL if cmd_input is None else asyncio.subprocess.PIPE
proc = await asyncio.create_subprocess_exec(
*args,
@ -127,16 +84,12 @@ class Local(Base):
env=env,
)
stdout_parts_b: list[bytes] = []
stderr_parts_b: list[bytes] = []
stdout_parts: list[bytes] = []
stderr_parts: list[bytes] = []
# -- decoding for verbose output in pipe mode
if output_encoding is None or want_bytes:
stdout_log_enc = sys.stdout.encoding or "utf-8"
stderr_log_enc = sys.stderr.encoding or "utf-8"
else:
stdout_log_enc = output_encoding
stderr_log_enc = output_encoding
stdout_log_enc = sys.stdout.encoding or "utf-8"
stderr_log_enc = sys.stderr.encoding or "utf-8"
async def read_stream(stream, prio, collector: list[bytes], log_enc: str):
buf = b""
@ -156,10 +109,10 @@ class Local(Base):
tasks = [
asyncio.create_task(
read_stream(proc.stdout, NOTICE, stdout_parts_b, stdout_log_enc)
read_stream(proc.stdout, NOTICE, stdout_parts, stdout_log_enc)
),
asyncio.create_task(
read_stream(proc.stderr, ERR, stderr_parts_b, stderr_log_enc)
read_stream(proc.stderr, ERR, stderr_parts, stderr_log_enc)
),
]
@ -170,35 +123,15 @@ class Local(Base):
exit_code = await proc.wait()
await asyncio.gather(*tasks)
if want_bytes:
__check_exit_code(exit_code)
stdout_b = b"".join(stdout_parts_b) if stdout_parts_b else None
stderr_b = b"".join(stderr_parts_b) if stderr_parts_b else None
stdout = b"".join(stdout_parts) if stdout_parts else None
stderr = b"".join(stderr_parts) if stderr_parts else None
if want_bytes:
return stdout_b, stderr_b, exit_code
if output_encoding is None:
stdout_dec_enc = sys.stdout.encoding or "utf-8"
stderr_dec_enc = sys.stderr.encoding or "utf-8"
else:
stdout_dec_enc = output_encoding
stderr_dec_enc = output_encoding
stdout_s = stdout_b.decode(stdout_dec_enc, errors="replace") if stdout_b is not None else None
stderr_s = stderr_b.decode(stderr_dec_enc, errors="replace") if stderr_b is not None else None
if not want_bytes:
__check_exit_code(exit_code, stdout=stdout_s, stderr=stderr_s)
return stdout_s, stderr_s, exit_code
return stdout, stderr, exit_code
finally:
if cwd is not None:
os.chdir(cwd)
if verbose and not interactive:
log(NOTICE, '`' + delim + ' <')
async def _sudo(self, cmd: list[str], mod_env: dict[str, str], opts: list[str], verbose: bool) -> Result:
env: dict[str, str]|None = None