lib.ExecContext: Align .sudo() prototype to .run()

ExecContext's .sudo() omits many of run()'s parameters, and this
commit adds them. To avoid redundancy around repeating and massaging
the long parameter list of both functions and their return values, it
also adds some deeper changes:

  - Make run(), _run(), sudo() and _sudo() always return instances of
    Result. Before it was allowed to return a triplet of stdout,
    stderr, and exit status.

  - Have ExecContext stay out of the business of decoding the result
    entirely. Result provides a convenience method .decode()
    operating on stdout and stderr and leaves the decision to the
    caller.

    This entails miniscule adaptations in calling code, namely in
    App.os_release, util.get_profile_env() and CmdListRepos._run().

  - Wrap the _run() and _sudo() callbacks in a context manager object
    of type CallContext to avoid code duplication.

  - Consistently name the first argument to run(), _run(), sudo() and
    _sudo() "cmd", not "args". The latter suggests that the caller is
    omitting the executable, which is not the case.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-03-19 11:38:16 +01:00
commit 02697af568
6 changed files with 165 additions and 100 deletions

View file

@ -301,7 +301,7 @@ class App(Base):
if self.__os_release is None:
result = self.call_async(self.exec_context.run(['/usr/bin/cat', '/etc/os-release'], throw=True))
assert result.status == 0
self.__os_release = result.stdout.strip()
self.__os_release = result.decode().stdout.strip()
return self.__os_release
def os_release_field(self, key: str, throw: bool=False) -> str:

View file

@ -39,8 +39,7 @@ class CmdListRepos(Cmd): # export
if password is not None:
ssh.set_password(password)
cmd = ['/opt/jw-pkg/bin/git-srv-admin.sh', '-u', args.from_owner, '-j', 'list-personal-projects']
stdout, stderr, code = await ssh.run(cmd)
print(stdout)
print((await ssh.run(cmd)).decode().stdout)
return
case 'https':
cmd_input = None

View file

@ -6,7 +6,8 @@ import abc, re, sys
from typing import NamedTuple, TYPE_CHECKING
if TYPE_CHECKING:
from typing import Self
from typing import Self, Type
from types import TracebackType
from .log import *
from .util import pretty_cmd
@ -17,19 +18,110 @@ class Result(NamedTuple):
stderr: str|None
status: int|None
def decode(self, encoding='UTF-8', errors='replace') -> Result:
return Result(
self.stdout.decode(encoding, errors=errors) if self.stdout is not None else None,
self.stderr.decode(encoding, errors=errors) if self.stderr is not None else None,
self.status
)
class ExecContext(abc.ABC):
class CallContext:
def __init__(
self,
parent: ExecContext,
title: str,
cmd: list[str],
cmd_input: str|None,
wd: str|None,
log_prefix: str,
interactive: bool|None,
throw: bool,
verbose: bool
) -> None:
self.__parent = parent
self.__title = title
self.__delim = title if title is not None else f'---- {parent.uri}: Running {pretty_cmd(cmd, wd)} -'
delim_len = 120
self.__delim += '-' * max(0, delim_len - len(self.__delim))
self.__cmd = cmd
self.__cmd_input = cmd_input
self.__wd = wd
self.__log_prefix = log_prefix
self.__interactive = interactive if interactive is not None else (
cmd_input == "mode:interactive"
or (cmd_input == "mode:auto" and sys.stdin.isatty())
)
self.__throw = throw
self.__verbose = verbose if verbose is not None else parent.verbose_default
self.__pretty_cmd: str|None = None
def __enter__(self) -> CallContext:
self.log_delim(start=True)
return self
def __exit__(
self,
exc_type: Type[BaseException]|None,
exc_value: BaseException|None,
traceback: TracebackType|None
) -> bool:
self.log_delim(start=False)
@property
def log_prefix(self) -> str:
return self.__log_prefix
@property
def interactive(self) -> bool:
return self.__interactive
@property
def verbose(self) -> bool:
return self.__verbose
@property
def pretty_cmd(self) -> str:
if self.__pretty_cmd is None:
self.__pretty_cmd = pretty_cmd(self.__cmd, self.__wd)
return self.__pretty_cmd
def log(prio: int, *args, **kwargs) -> None:
log(prio, self.__log_prefix, *args, **kwargs)
def log_delim(self, start: bool) -> None:
if not self.__verbose:
return None
if start and self.__interactive:
log(NOTICE, self.__delim)
return
delim = ',' + self.__delim + ' >' if start else '`' + self.__delim + ' <'
log(NOTICE, delim)
def check_exit_code(self, result: Result) -> None:
if result.status == 0:
return
if (self.__throw or self.__verbose):
msg = f'Command exited with status {result.status}: {self.pretty_cmd}'
if result.stderr:
msg += ': ' + result.decode().stderr.strip()
if self.__throw:
raise RuntimeError(msg)
def exception(self, result: Result, e: Exception) -> Result:
log(ERR, self.__log_prefix, f'Failed to run {self.pretty_cmd}')
if self.__throw:
raise e
return result
def __init__(self, uri: str, interactive: bool=True, verbose_default=False):
self.__uri = uri
self.__interactive = interactive
self.__verbose_default = verbose_default
assert verbose_default is not None
def _verbose(self, verbose: bool|None) -> bool:
if verbose is not None:
return verbose
return self.__verbose_default
@property
def uri(self) -> str:
return self.__uri
@ -48,20 +140,19 @@ class ExecContext(abc.ABC):
async def run(
self,
args: list[str],
cmd: list[str],
wd: str|None = None,
throw: bool = True,
verbose: bool|None = None,
cmd_input: str|None = None,
env: dict[str, str]|None = None,
title: str=None,
output_encoding: str|None = None, # None => unchanged; "bytes" => return raw bytes
title: str=None
) -> Result:
"""
Run a command asynchronously and return its output
Args:
args: Command and arguments
cmd: Command and arguments
wd: Optional working directory
throw: Raise an exception on non-zero exit status if True
verbose: Emit log output while the command runs
@ -70,92 +161,70 @@ class ExecContext(abc.ABC):
- "mode:interactive" -> Inherit terminal stdin
- "mode:auto" -> Inherit terminal stdin if it is a TTY
- otherwise -> String fed to stdin
output_encoding:
- None -> unchanged behavior (decode stdout via sys.stdout.encoding, stderr via sys.stderr.encoding)
- "bytes" -> return raw bytes instead of decoded strings
- otherwise -> decode stdout/stderr using this encoding
env: The environment the command should be run in
Returns:
(stdout, stderr, exit_status):
stdout: stderr each as a string/bytes or None
A Result instance
In PTY mode stderr is always None because PTY merges stdout/stderr.
"""
def __check_exit_code(result: Result) -> None:
if result.status == 0:
return
if (throw or verbose):
msg = f'Command exited with status {code}: {pretty_cmd(args, wd)}'
if result.stderr:
msg += ': ' + result.stderr.strip()
if throw:
raise RuntimeError(msg)
interactive = (
cmd_input == "mode:interactive"
or (cmd_input == "mode:auto" and sys.stdin.isatty())
)
if verbose is None:
verbose = self.__verbose_default
if verbose:
delim_len = 120
delim = title if title is not None else f'---- {self.uri}: Running {pretty_cmd(args, wd)} -'
if interactive:
log(NOTICE, delim)
else:
delim += '-' * max(0, delim_len - len(delim))
log(NOTICE, ',' + delim + ' >')
try:
match output_encoding:
case 'bytes':
output_encoding = None
case None:
output_encoding = sys.stdout.encoding or "utf-8"
ret = Result(None, None, 1)
with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, wd=wd,
log_prefix='|', interactive=None, throw=throw, verbose=verbose) as cc:
try:
ret = Result(*await self._run(
args=args,
ret = await self._run(
cmd=cmd,
wd=wd,
verbose=self._verbose(verbose),
verbose=cc.verbose,
cmd_input=cmd_input,
env=env,
interactive=interactive,
log_prefix = '|'
))
except Exception as e:
log(ERR, f'Failed to run {pretty_cmd(args, wd)} ({str(e)}')
if throw:
raise
return ret
__check_exit_code(ret)
if output_encoding is None:
return ret
return Result(
ret.stdout.decode(output_encoding, errors="replace") if ret.stdout is not None else None,
ret.stderr.decode(output_encoding, errors="replace") if ret.stderr is not None else None,
ret.status
interactive=cc.interactive,
log_prefix = cc.log_prefix
)
except Exception as e:
return cc.exception(ret, e)
cc.check_exit_code(ret)
return ret
finally:
if verbose and not interactive:
log(NOTICE, '`' + delim + ' <')
async def sudo(
self,
cmd: list[str],
mod_env: dict[str, str]|None=None,
opts: list[str]|None=None,
wd: str|None = None,
throw: bool = True,
verbose: bool|None = None,
cmd_input: str|None = None,
env: dict[str, str]|None = None,
title: str=None,
) -> Result:
ret = Result(None, None, 1)
if opts is None:
opts = {}
with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, wd=wd,
log_prefix='|', interactive=None, throw=throw, verbose=verbose) as cc:
try:
ret = await self._sudo(
cmd=cmd,
mod_env=mod_env,
opts=opts,
wd=wd,
verbose=cc.verbose,
cmd_input=cmd_input,
env=env,
interactive=cc.interactive,
log_prefix = cc.log_prefix,
)
except Exception as e:
return cc.exception(ret, e)
cc.check_exit_code(ret)
return ret
@abc.abstractmethod
async def _sudo(self, cmd: list[str], mod_env: dict[str, str], opts: list[str], verbose: bool) -> Result:
async def _sudo(self, *args, **kwargs) -> Result:
pass
async def sudo(self, cmd: list[str], mod_env: dict[str, str] = {}, opts: list[str]=[], verbose: bool|None=None) -> Result:
return await self._sudo(cmd, mod_env, opts, self._verbose(verbose))
@classmethod
def create(cls, uri: str, *args, **kwargs) -> Self:
tokens = re.split(r'://', uri)

View file

@ -28,7 +28,7 @@ class SSHClient(ExecContext):
async def _run(
self,
args: list[str],
cmd: list[str],
wd: str|None,
verbose: bool,
cmd_input: str|None,
@ -52,7 +52,7 @@ class SSHClient(ExecContext):
__log(prio, f'`{delim}')
if wd is not None:
args = ['cd', wd, '&&', *args]
cmd = ['cd', wd, '&&', *cmd]
if interactive:
raise NotImplementedError('Interactive SSH is not yet implemented')
@ -60,7 +60,7 @@ class SSHClient(ExecContext):
if env is not None:
raise NotImplementedError('Passing an environment to SSH commands is not yet implemented')
ret = await self._run_ssh(args, cmd_input=cmd_input)
ret = await self._run_ssh(cmd, cmd_input=cmd_input)
if verbose:
__log_block(NOTICE, 'stdout', ret.stdout)
__log_block(NOTICE, 'stderr', ret.stderr)
@ -107,6 +107,7 @@ class SSHClientInternal(SSHClient): # export
try:
ret.connect(
hostname=self.hostname,
username=self.username,
allow_agent=True
)
except Exception as e:

View file

@ -16,7 +16,7 @@ class Local(Base):
async def _run(
self,
args: list[str],
cmd: list[str],
wd: str|None,
verbose: bool,
cmd_input: str|None,
@ -57,11 +57,11 @@ class Local(Base):
old_env = os.environ.copy()
try:
os.environ.update(env)
return pty.spawn(args, master_read=reader)
return pty.spawn(cmd, master_read=reader)
finally:
os.environ.clear()
os.environ.update(old_env)
return pty.spawn(args, master_read=reader)
return pty.spawn(cmd, master_read=reader)
stdout_chunks: list[bytes] = []
enc_for_verbose = sys.stdout.encoding or "utf-8"
@ -71,13 +71,13 @@ class Local(Base):
# PTY merges stdout/stderr
stdout = b"".join(stdout_chunks) if stdout_chunks else None
return stdout, None, exit_code
return Result(stdout, None, exit_code)
# -- non-interactive mode
stdin = asyncio.subprocess.DEVNULL if cmd_input is None else asyncio.subprocess.PIPE
proc = await asyncio.create_subprocess_exec(
*args,
*cmd,
stdin=stdin,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
@ -127,13 +127,13 @@ class Local(Base):
stdout = b"".join(stdout_parts) if stdout_parts else None
stderr = b"".join(stderr_parts) if stderr_parts else None
return stdout, stderr, exit_code
return Result(stdout, stderr, exit_code)
finally:
if cwd is not None:
os.chdir(cwd)
async def _sudo(self, cmd: list[str], mod_env: dict[str, str], opts: list[str], verbose: bool) -> Result:
async def _sudo(self, cmd: list[str], mod_env: dict[str, str], opts: list[str], *args, **kwargs) -> Result:
env: dict[str, str]|None = None
cmd_input: str|None = None
if mod_env:
@ -146,8 +146,4 @@ class Local(Base):
cmdline.append('--preserve-env=' + ','.join(mod_env.keys()))
cmdline.extend(opts)
cmdline.extend(cmd)
if self.interactive:
cmd_input = "mode:interactive"
# Need to call the base class function, because _run() needs more
# parameters than we have values for
return await self.run(cmdline, throw=True, verbose=verbose, env=env, cmd_input=cmd_input)
return await self._run(cmdline, *args, **kwargs)

View file

@ -139,9 +139,9 @@ async def get_profile_env(throw: bool=True, keep: Iterable[str]|bool=False, ec:
}
# Run bash as a login shell, which sources /etc/profile, then print environment as NUL-separated key=value pairs
cmd = ['/usr/bin/env', '-i', '/bin/bash', '-lc', 'env -0']
stdout, stderr, status = await run_cmd(cmd, throw=throw, output_encoding="bytes", verbose=True, env=env, ec=ec)
result = await run_cmd(cmd, throw=throw, verbose=True, env=env, ec=ec)
ret: dict[str, str] = {}
for entry in stdout.split(b"\0"):
for entry in result.stdout.split(b"\0"):
if not entry:
continue
key, val = entry.split(b"=", 1)