jw-pkg/src/python/jw/pkg/lib/ExecContext.py

172 lines
5.6 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
from __future__ import annotations
import abc, re, sys
from typing import NamedTuple, TYPE_CHECKING
if TYPE_CHECKING:
from typing import Self
from .log import *
from .util import pretty_cmd
class Result(NamedTuple):
    """Outcome of one executed command, unpackable as (stdout, stderr, status)."""

    # Captured standard output, or None if there is none.
    stdout: str | None
    # Captured standard error, or None if there is none.
    stderr: str | None
    # Exit status of the command, or None if there is none.
    status: int | None
class ExecContext(abc.ABC):
def __init__(self, uri: str, interactive: bool=True, verbose_default=False):
self.__uri = uri
self.__interactive = interactive
self.__verbose_default = verbose_default
assert verbose_default is not None
def _verbose(self, verbose: bool|None) -> bool:
if verbose is not None:
return verbose
return self.__verbose_default
@property
def uri(self) -> str:
return self.__uri
@property
def interactive(self) -> bool:
return self.__interactive
@property
def verbose_default(self) -> bool:
return self.__verbose_default
@abc.abstractmethod
async def _run(self, *args, **kwargs) -> Result:
pass
async def run(
self,
args: list[str],
wd: str|None = None,
throw: bool = True,
verbose: bool|None = None,
cmd_input: str|None = None,
env: dict[str, str]|None = None,
title: str=None,
output_encoding: str|None = None, # None => unchanged; "bytes" => return raw bytes
) -> Result:
"""
Run a command asynchronously and return its output
Args:
args: Command and arguments
wd: Optional working directory
throw: Raise an exception on non-zero exit status if True
verbose: Emit log output while the command runs
cmd_input:
- None -> stdin from /dev/null
- "mode:interactive" -> Inherit terminal stdin
- "mode:auto" -> Inherit terminal stdin if it is a TTY
- otherwise -> String fed to stdin
output_encoding:
- None -> unchanged behavior (decode stdout via sys.stdout.encoding, stderr via sys.stderr.encoding)
- "bytes" -> return raw bytes instead of decoded strings
- otherwise -> decode stdout/stderr using this encoding
Returns:
(stdout, stderr, exit_status):
stdout: stderr each as a string/bytes or None
In PTY mode stderr is always None because PTY merges stdout/stderr.
"""
def __check_exit_code(result: Result) -> None:
if result.status == 0:
return
if (throw or verbose):
msg = f'Command exited with status {code}: {pretty_cmd(args, wd)}'
if result.stderr:
msg += ': ' + result.stderr.strip()
if throw:
raise RuntimeError(msg)
interactive = (
cmd_input == "mode:interactive"
or (cmd_input == "mode:auto" and sys.stdin.isatty())
)
if verbose is None:
verbose = self.__verbose_default
if verbose:
delim_len = 120
delim = title if title is not None else f'---- {self.uri}: Running {pretty_cmd(args, wd)} -'
if interactive:
log(NOTICE, delim)
else:
delim += '-' * max(0, delim_len - len(delim))
log(NOTICE, ',' + delim + ' >')
try:
match output_encoding:
case 'bytes':
output_encoding = None
case None:
output_encoding = sys.stdout.encoding or "utf-8"
ret = Result(None, None, 1)
try:
ret = Result(*await self._run(
args=args,
wd=wd,
verbose=self._verbose(verbose),
cmd_input=cmd_input,
env=env,
interactive=interactive,
log_prefix = '|'
))
except Exception as e:
log(ERR, f'Failed to run {pretty_cmd(args, wd)} ({str(e)}')
if throw:
raise
return ret
__check_exit_code(ret)
if output_encoding is None:
return ret
return Result(
ret.stdout.decode(output_encoding, errors="replace") if ret.stdout is not None else None,
ret.stderr.decode(output_encoding, errors="replace") if ret.stderr is not None else None,
ret.status
)
finally:
if verbose and not interactive:
log(NOTICE, '`' + delim + ' <')
@abc.abstractmethod
async def _sudo(self, cmd: list[str], mod_env: dict[str, str], opts: list[str], verbose: bool) -> Result:
pass
async def sudo(self, cmd: list[str], mod_env: dict[str, str] = {}, opts: list[str]=[], verbose: bool|None=None) -> Result:
return await self._sudo(cmd, mod_env, opts, self._verbose(verbose))
@classmethod
def create(cls, uri: str, *args, **kwargs) -> Self:
tokens = re.split(r'://', uri)
schema = tokens[0]
match schema:
case 'local':
from .ec.Local import Local
return Local(uri, *args, **kwargs)
case 'ssh':
from .SSHClient import ssh_client
return ssh_client(uri, *args, **kwargs)
case _:
pass
raise Exception(f'Can\'t create execution context for {uri} with unknown schema "{schema}"')