jw-pkg/src/python/jw/pkg/lib/ExecContext.py

243 lines
7.8 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
from __future__ import annotations
import abc, re, sys
from typing import NamedTuple, TYPE_CHECKING
if TYPE_CHECKING:
from typing import Self, Type
from types import TracebackType
from .log import *
from .util import pretty_cmd
class Result(NamedTuple):
stdout: str|None
stderr: str|None
status: int|None
def decode(self, encoding='UTF-8', errors='replace') -> Result:
return Result(
self.stdout.decode(encoding, errors=errors) if self.stdout is not None else None,
self.stderr.decode(encoding, errors=errors) if self.stderr is not None else None,
self.status
)
class ExecContext(abc.ABC):
class CallContext:
def __init__(
self,
parent: ExecContext,
title: str,
cmd: list[str],
cmd_input: str|None,
wd: str|None,
log_prefix: str,
interactive: bool|None,
throw: bool,
verbose: bool
) -> None:
self.__parent = parent
self.__title = title
self.__delim = title if title is not None else f'---- {parent.uri}: Running {pretty_cmd(cmd, wd)} -'
delim_len = 120
self.__delim += '-' * max(0, delim_len - len(self.__delim))
self.__cmd = cmd
self.__cmd_input = cmd_input
self.__wd = wd
self.__log_prefix = log_prefix
self.__interactive = interactive if interactive is not None else (
cmd_input == "mode:interactive"
or (cmd_input == "mode:auto" and sys.stdin.isatty())
)
self.__throw = throw
self.__verbose = verbose if verbose is not None else parent.verbose_default
self.__pretty_cmd: str|None = None
def __enter__(self) -> CallContext:
self.log_delim(start=True)
return self
def __exit__(
self,
exc_type: Type[BaseException]|None,
exc_value: BaseException|None,
traceback: TracebackType|None
) -> bool:
self.log_delim(start=False)
@property
def log_prefix(self) -> str:
return self.__log_prefix
@property
def interactive(self) -> bool:
return self.__interactive
@property
def verbose(self) -> bool:
return self.__verbose
@property
def pretty_cmd(self) -> str:
if self.__pretty_cmd is None:
self.__pretty_cmd = pretty_cmd(self.__cmd, self.__wd)
return self.__pretty_cmd
def log(prio: int, *args, **kwargs) -> None:
log(prio, self.__log_prefix, *args, **kwargs)
def log_delim(self, start: bool) -> None:
if not self.__verbose:
return None
if start and self.__interactive:
log(NOTICE, self.__delim)
return
delim = ',' + self.__delim + ' >' if start else '`' + self.__delim + ' <'
log(NOTICE, delim)
def check_exit_code(self, result: Result) -> None:
if result.status == 0:
return
if (self.__throw or self.__verbose):
msg = f'Command exited with status {result.status}: {self.pretty_cmd}'
if result.stderr:
msg += ': ' + result.decode().stderr.strip()
if self.__throw:
raise RuntimeError(msg)
def exception(self, result: Result, e: Exception) -> Result:
log(ERR, self.__log_prefix, f'Failed to run {self.pretty_cmd}')
if self.__throw:
raise e
return result
def __init__(self, uri: str, interactive: bool=True, verbose_default=False):
self.__uri = uri
self.__interactive = interactive
self.__verbose_default = verbose_default
assert verbose_default is not None
@property
def uri(self) -> str:
return self.__uri
@property
def interactive(self) -> bool:
return self.__interactive
@property
def verbose_default(self) -> bool:
return self.__verbose_default
@abc.abstractmethod
async def _run(self, *args, **kwargs) -> Result:
pass
async def run(
self,
cmd: list[str],
wd: str|None = None,
throw: bool = True,
verbose: bool|None = None,
cmd_input: str|None = None,
env: dict[str, str]|None = None,
title: str=None
) -> Result:
"""
Run a command asynchronously and return its output
Args:
cmd: Command and arguments
wd: Optional working directory
throw: Raise an exception on non-zero exit status if True
verbose: Emit log output while the command runs
cmd_input:
- None -> stdin from /dev/null
- "mode:interactive" -> Inherit terminal stdin
- "mode:auto" -> Inherit terminal stdin if it is a TTY
- otherwise -> String fed to stdin
env: The environment the command should be run in
Returns:
A Result instance
In PTY mode stderr is always None because PTY merges stdout/stderr.
"""
ret = Result(None, None, 1)
with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, wd=wd,
log_prefix='|', interactive=None, throw=throw, verbose=verbose) as cc:
try:
ret = await self._run(
cmd=cmd,
wd=wd,
verbose=cc.verbose,
cmd_input=cmd_input,
env=env,
interactive=cc.interactive,
log_prefix = cc.log_prefix
)
except Exception as e:
return cc.exception(ret, e)
cc.check_exit_code(ret)
return ret
async def sudo(
self,
cmd: list[str],
mod_env: dict[str, str]|None=None,
opts: list[str]|None=None,
wd: str|None = None,
throw: bool = True,
verbose: bool|None = None,
cmd_input: str|None = None,
env: dict[str, str]|None = None,
title: str=None,
) -> Result:
ret = Result(None, None, 1)
if opts is None:
opts = {}
if mod_env is None:
mod_env = {}
with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, wd=wd,
log_prefix='|', interactive=None, throw=throw, verbose=verbose) as cc:
try:
ret = await self._sudo(
cmd=cmd,
mod_env=mod_env,
opts=opts,
wd=wd,
verbose=cc.verbose,
cmd_input=cmd_input,
env=env,
interactive=cc.interactive,
log_prefix = cc.log_prefix,
)
except Exception as e:
return cc.exception(ret, e)
cc.check_exit_code(ret)
return ret
@abc.abstractmethod
async def _sudo(self, *args, **kwargs) -> Result:
pass
@classmethod
def create(cls, uri: str, *args, **kwargs) -> Self:
tokens = re.split(r'://', uri)
schema = tokens[0]
match schema:
case 'local':
from .ec.Local import Local
return Local(uri, *args, **kwargs)
case 'ssh':
from .ec.SSHClient import ssh_client
return ssh_client(uri, *args, **kwargs)
case _:
pass
raise Exception(f'Can\'t create execution context for {uri} with unknown schema "{schema}"')