# -*- coding: utf-8 -*- from __future__ import annotations import abc, re, sys from typing import NamedTuple, TYPE_CHECKING if TYPE_CHECKING: from typing import Self, Type from types import TracebackType from .log import * from .util import pretty_cmd class Result(NamedTuple): stdout: str|None stderr: str|None status: int|None def decode(self, encoding='UTF-8', errors='replace') -> Result: return Result( self.stdout.decode(encoding, errors=errors) if self.stdout is not None else None, self.stderr.decode(encoding, errors=errors) if self.stderr is not None else None, self.status ) class ExecContext(abc.ABC): class CallContext: def __init__( self, parent: ExecContext, title: str, cmd: list[str], cmd_input: str|None, wd: str|None, log_prefix: str, interactive: bool|None, throw: bool, verbose: bool ) -> None: self.__parent = parent self.__title = title self.__delim = title if title is not None else f'---- {parent.uri}: Running {pretty_cmd(cmd, wd)} -' delim_len = 120 self.__delim += '-' * max(0, delim_len - len(self.__delim)) self.__cmd = cmd self.__cmd_input = cmd_input self.__wd = wd self.__log_prefix = log_prefix self.__interactive = interactive if interactive is not None else ( cmd_input == "mode:interactive" or (cmd_input == "mode:auto" and sys.stdin.isatty()) ) self.__throw = throw self.__verbose = verbose if verbose is not None else parent.verbose_default self.__pretty_cmd: str|None = None def __enter__(self) -> CallContext: self.log_delim(start=True) return self def __exit__( self, exc_type: Type[BaseException]|None, exc_value: BaseException|None, traceback: TracebackType|None ) -> bool: self.log_delim(start=False) @property def log_prefix(self) -> str: return self.__log_prefix @property def interactive(self) -> bool: return self.__interactive @property def verbose(self) -> bool: return self.__verbose @property def pretty_cmd(self) -> str: if self.__pretty_cmd is None: self.__pretty_cmd = pretty_cmd(self.__cmd, self.__wd) return self.__pretty_cmd def log(prio: int, *args, **kwargs) -> None: log(prio, self.__log_prefix, *args, **kwargs) def log_delim(self, start: bool) -> None: if not self.__verbose: return None if start and self.__interactive: log(NOTICE, self.__delim) return delim = ',' + self.__delim + ' >' if start else '`' + self.__delim + ' <' log(NOTICE, delim) def check_exit_code(self, result: Result) -> None: if result.status == 0: return if (self.__throw or self.__verbose): msg = f'Command exited with status {result.status}: {self.pretty_cmd}' if result.stderr: msg += ': ' + result.decode().stderr.strip() if self.__throw: raise RuntimeError(msg) def exception(self, result: Result, e: Exception) -> Result: log(ERR, self.__log_prefix, f'Failed to run {self.pretty_cmd}') if self.__throw: raise e return result def __init__(self, uri: str, interactive: bool=True, verbose_default=False): self.__uri = uri self.__interactive = interactive self.__verbose_default = verbose_default assert verbose_default is not None @property def uri(self) -> str: return self.__uri @property def interactive(self) -> bool: return self.__interactive @property def verbose_default(self) -> bool: return self.__verbose_default @abc.abstractmethod async def _run(self, *args, **kwargs) -> Result: pass async def run( self, cmd: list[str], wd: str|None = None, throw: bool = True, verbose: bool|None = None, cmd_input: str|None = None, env: dict[str, str]|None = None, title: str=None ) -> Result: """ Run a command asynchronously and return its output Args: cmd: Command and arguments wd: Optional working directory throw: Raise an exception on non-zero exit status if True verbose: Emit log output while the command runs cmd_input: - None -> stdin from /dev/null - "mode:interactive" -> Inherit terminal stdin - "mode:auto" -> Inherit terminal stdin if it is a TTY - otherwise -> String fed to stdin env: The environment the command should be run in Returns: A Result instance In PTY mode stderr is always None because PTY merges stdout/stderr. """ ret = Result(None, None, 1) with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, wd=wd, log_prefix='|', interactive=None, throw=throw, verbose=verbose) as cc: try: ret = await self._run( cmd=cmd, wd=wd, verbose=cc.verbose, cmd_input=cmd_input, env=env, interactive=cc.interactive, log_prefix = cc.log_prefix ) except Exception as e: return cc.exception(ret, e) cc.check_exit_code(ret) return ret async def sudo( self, cmd: list[str], mod_env: dict[str, str]|None=None, opts: list[str]|None=None, wd: str|None = None, throw: bool = True, verbose: bool|None = None, cmd_input: str|None = None, env: dict[str, str]|None = None, title: str=None, ) -> Result: ret = Result(None, None, 1) if opts is None: opts = {} with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, wd=wd, log_prefix='|', interactive=None, throw=throw, verbose=verbose) as cc: try: ret = await self._sudo( cmd=cmd, mod_env=mod_env, opts=opts, wd=wd, verbose=cc.verbose, cmd_input=cmd_input, env=env, interactive=cc.interactive, log_prefix = cc.log_prefix, ) except Exception as e: return cc.exception(ret, e) cc.check_exit_code(ret) return ret @abc.abstractmethod async def _sudo(self, *args, **kwargs) -> Result: pass @classmethod def create(cls, uri: str, *args, **kwargs) -> Self: tokens = re.split(r'://', uri) schema = tokens[0] match schema: case 'local': from .ec.Local import Local return Local(uri, *args, **kwargs) case 'ssh': from .SSHClient import ssh_client return ssh_client(uri, *args, **kwargs) case _: pass raise Exception(f'Can\'t create execution context for {uri} with unknown schema "{schema}"')