# -*- coding: utf-8 -*- from __future__ import annotations import abc, re, sys from enum import Enum, auto from typing import NamedTuple, TypeAlias, TYPE_CHECKING if TYPE_CHECKING: from typing import Self, Type from types import TracebackType from .log import * from .util import pretty_cmd class InputMode(Enum): Interactive = auto() NonInteractive = auto() OptInteractive = auto() Auto = auto() Input: TypeAlias = InputMode | None | str class Result(NamedTuple): stdout: str|None stderr: str|None status: int|None def decode(self, encoding='UTF-8', errors='replace') -> Result: return Result( self.stdout.decode(encoding, errors=errors) if self.stdout is not None else None, self.stderr.decode(encoding, errors=errors) if self.stderr is not None else None, self.status ) class ExecContext(abc.ABC): class CallContext: def __init__( self, parent: ExecContext, title: str, cmd: list[str], cmd_input: Input, wd: str|None, log_prefix: str, throw: bool, verbose: bool ) -> None: self.__parent = parent self.__title = title self.__delim = title if title is not None else f'---- {parent.uri}: Running {pretty_cmd(cmd, wd)} -' delim_len = 120 self.__delim += '-' * max(0, delim_len - len(self.__delim)) self.__cmd = cmd self.__wd = wd self.__log_prefix = log_prefix # -- At the end of this dance, interactive needs to be either True # or False interactive: bool|None = None match cmd_input: case InputMode.Interactive: interactive = True case InputMode.NonInteractive: interactive = False case InputMode.OptInteractive: interactive = parent.interactive case InputMode.Auto: interactive = sys.stdin.isatty() if interactive is None: interactive = parent.interactive if interactive is None: interactive = sys.stdin.isatty() assert interactive in [ True, False ] self.__interactive = interactive self.__cmd_input = cmd_input if not isinstance(cmd_input, InputMode) else None self.__throw = throw self.__verbose = verbose if verbose is not None else parent.verbose_default self.__pretty_cmd: str|None = None def __enter__(self) -> CallContext: self.log_delim(start=True) return self def __exit__( self, exc_type: Type[BaseException]|None, exc_value: BaseException|None, traceback: TracebackType|None ) -> bool: self.log_delim(start=False) @property def log_prefix(self) -> str: return self.__log_prefix @property def interactive(self) -> bool: return self.__interactive @property def verbose(self) -> bool: return self.__verbose @property def cmd_input(self) -> bool: return self.__cmd_input @property def pretty_cmd(self) -> str: if self.__pretty_cmd is None: self.__pretty_cmd = pretty_cmd(self.__cmd, self.__wd) return self.__pretty_cmd def log(prio: int, *args, **kwargs) -> None: log(prio, self.__log_prefix, *args, **kwargs) def log_delim(self, start: bool) -> None: if not self.__verbose: return None if start and self.__interactive: log(NOTICE, self.__delim) return delim = ',' + self.__delim + ' >' if start else '`' + self.__delim + ' <' log(NOTICE, delim) def check_exit_code(self, result: Result) -> None: if result.status == 0: return if (self.__throw or self.__verbose): msg = f'Command exited with status {result.status}: {self.pretty_cmd}' if result.stderr: msg += ': ' + result.decode().stderr.strip() if self.__throw: raise RuntimeError(msg) def exception(self, result: Result, e: Exception) -> Result: log(ERR, self.__log_prefix, f'Failed to run {self.pretty_cmd}') if self.__throw: raise e return result def __init__(self, uri: str, interactive: bool|None=None, verbose_default=False): self.__uri = uri self.__interactive = interactive self.__verbose_default = verbose_default assert verbose_default is not None @property def uri(self) -> str: return self.__uri @property def interactive(self) -> bool|None: return self.__interactive @property def verbose_default(self) -> bool: return self.__verbose_default @abc.abstractmethod async def _run(self, *args, **kwargs) -> Result: pass async def run( self, cmd: list[str], wd: str|None = None, throw: bool = True, verbose: bool|None = None, cmd_input: Input = InputMode.OptInteractive, env: dict[str, str]|None = None, title: str=None ) -> Result: """ Run a command asynchronously and return its output Args: cmd: Command and arguments wd: Optional working directory throw: Raise an exception on non-zero exit status if True verbose: Emit log output while the command runs cmd_input: - "InputMode.OptInteractive" -> Let --interactive govern how to handle interactivity (default) - "InputMode.Interactive" -> Inherit terminal stdin - "InputMode.Auto" -> Inherit terminal stdin if it is a TTY - "InputMode.NonInteractive" -> stdin from /dev/null - None -> Alias for InputMode.NonInteractive - otherwise -> Feed cmd_input to stdin env: The environment the command should be run in Returns: A Result instance In PTY mode stderr is always None because PTY merges stdout/stderr. """ ret = Result(None, None, 1) with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, wd=wd, log_prefix='|', throw=throw, verbose=verbose) as cc: try: ret = await self._run( cmd=cmd, wd=wd, verbose=cc.verbose, cmd_input=cc.cmd_input, env=env, interactive=cc.interactive, log_prefix = cc.log_prefix ) except Exception as e: return cc.exception(ret, e) cc.check_exit_code(ret) return ret async def sudo( self, cmd: list[str], mod_env: dict[str, str]|None=None, opts: list[str]|None=None, wd: str|None = None, throw: bool = True, verbose: bool|None = None, cmd_input: str|None = None, env: dict[str, str]|None = None, title: str=None, ) -> Result: ret = Result(None, None, 1) if opts is None: opts = {} if mod_env is None: mod_env = {} with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, wd=wd, log_prefix='|', throw=throw, verbose=verbose) as cc: try: ret = await self._sudo( cmd=cmd, mod_env=mod_env, opts=opts, wd=wd, verbose=cc.verbose, cmd_input=cc.cmd_input, env=env, interactive=cc.interactive, log_prefix = cc.log_prefix, ) except Exception as e: return cc.exception(ret, e) cc.check_exit_code(ret) return ret @abc.abstractmethod async def _sudo(self, *args, **kwargs) -> Result: pass @classmethod def create(cls, uri: str, *args, **kwargs) -> Self: tokens = re.split(r'://', uri) schema = tokens[0] match schema: case 'local': from .ec.Local import Local return Local(uri, *args, **kwargs) case 'ssh': from .ec.SSHClient import ssh_client return ssh_client(uri, *args, **kwargs) case _: pass raise Exception(f'Can\'t create execution context for {uri} with unknown schema "{schema}"')