# -*- coding: utf-8 -*- from __future__ import annotations import abc, re, sys from enum import Enum, auto from typing import NamedTuple, TYPE_CHECKING if TYPE_CHECKING: from typing import Self, Type from types import TracebackType from .log import * from .base import Input, InputMode, Result from .FileContext import FileContext as Base class ExecContext(Base): class CallContext: def __init__( self, parent: ExecContext, title: str, cmd: list[str], cmd_input: Input, wd: str|None, log_prefix: str, throw: bool, verbose: bool ) -> None: self.__cmd = cmd self.__wd = wd self.__log_prefix = log_prefix self.__parent = parent self.__title = title self.__pretty_cmd: str|None = None self.__delim = title if title is not None else f'---- {parent.uri}: Running {self.pretty_cmd} -' delim_len = 120 self.__delim += '-' * max(0, delim_len - len(self.__delim)) # -- At the end of this dance, interactive needs to be either True # or False interactive: bool|None = None if not isinstance(cmd_input, InputMode): interactive = False self.__cmd_input = ( cmd_input if isinstance(cmd_input, bytes) else cmd_input.encode(sys.stdout.encoding or "utf-8") ) else: match cmd_input: case InputMode.Interactive: interactive = True case InputMode.NonInteractive: interactive = False case InputMode.OptInteractive: interactive = parent.interactive case InputMode.Auto: interactive = sys.stdin.isatty() if interactive is None: interactive = parent.interactive if interactive is None: interactive = sys.stdin.isatty() self.__cmd_input = None assert interactive in [ True, False ] self.__interactive = interactive self.__cmd_input = cmd_input if not isinstance(cmd_input, InputMode) else None self.__throw = throw self.__verbose = verbose if verbose is not None else parent.verbose_default def __enter__(self) -> CallContext: self.log_delim(start=True) return self def __exit__( self, exc_type: Type[BaseException]|None, exc_value: BaseException|None, traceback: TracebackType|None ) -> bool: self.log_delim(start=False) @property def log_prefix(self) -> str: return self.__log_prefix @property def interactive(self) -> bool: return self.__interactive @property def verbose(self) -> bool: return self.__verbose @property def cmd_input(self) -> bytes|None: return self.__cmd_input @property def throw(self) -> bool: return self.__throw @property def wd(self) -> str|None: return self.__wd @property def cmd(self) -> list[str]: return self.__cmd @property def pretty_cmd(self) -> str: if self.__pretty_cmd is None: from .util import pretty_cmd self.__pretty_cmd = pretty_cmd(self.__cmd, self.__wd) return self.__pretty_cmd def log(prio: int, *args, **kwargs) -> None: log(prio, self.__log_prefix, *args, **kwargs) def log_delim(self, start: bool) -> None: if not self.__verbose: return None if self.__interactive: # Don't log footer in interative mode if start: log(NOTICE, self.__delim) return delim = ',' + self.__delim + ' >' if start else '`' + self.__delim + ' <' log(NOTICE, delim) def check_exit_code(self, result: Result) -> None: if result.status == 0: return if (self.__throw or self.__verbose): msg = f'Command exited with status {result.status}: {self.pretty_cmd}' if result.stderr: msg += ': ' + result.decode().stderr.strip() if self.__throw: raise RuntimeError(msg) def exception(self, result: Result, e: Exception) -> Result: log(ERR, self.__log_prefix, f'Failed to run {self.pretty_cmd}') if self.__throw: raise e return result def __init__(self, uri: str, interactive: bool|None=None, verbose_default=False): super().__init__(uri=uri, interactive=interactive, verbose_default=verbose_default) @abc.abstractmethod async def _run(self, *args, **kwargs) -> Result: pass async def run( self, cmd: list[str], wd: str|None = None, throw: bool = True, verbose: bool|None = None, cmd_input: Input = InputMode.OptInteractive, env: dict[str, str]|None = None, title: str=None ) -> Result: """ Run a command asynchronously and return its output Args: cmd: Command and arguments wd: Optional working directory throw: Raise an exception on non-zero exit status if True verbose: Emit log output while the command runs cmd_input: - "InputMode.OptInteractive" -> Let --interactive govern how to handle interactivity (default) - "InputMode.Interactive" -> Inherit terminal stdin - "InputMode.Auto" -> Inherit terminal stdin if it is a TTY - "InputMode.NonInteractive" -> stdin from /dev/null - None -> Alias for InputMode.NonInteractive - otherwise -> Feed cmd_input to stdin env: The environment the command should be run in Returns: A Result instance In PTY mode stderr is always None because PTY merges stdout/stderr. """ # Note that in the calls to the wrapped method, cmd_input == None can # be returned by CallContext and is very much allowed assert cmd_input is not None ret = Result(None, None, 1) with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, wd=wd, log_prefix='|', throw=throw, verbose=verbose) as cc: try: ret = await self._run( cmd=cc.cmd, wd=wd, verbose=cc.verbose, cmd_input=cc.cmd_input, env=env, interactive=cc.interactive, log_prefix=cc.log_prefix ) except Exception as e: return cc.exception(ret, e) cc.check_exit_code(ret) return ret @abc.abstractmethod async def _sudo(self, *args, **kwargs) -> Result: pass async def sudo( self, cmd: list[str], mod_env: dict[str, str]|None=None, opts: list[str]|None=None, wd: str|None = None, throw: bool = True, verbose: bool|None = None, cmd_input: Input = InputMode.OptInteractive, env: dict[str, str]|None = None, title: str=None, ) -> Result: # Note that in the calls to the wrapped method, cmd_input == None can # be returned by CallContext and is very much allowed assert cmd_input is not None ret = Result(None, None, 1) if opts is None: opts = {} if mod_env is None: mod_env = {} with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, wd=wd, log_prefix='|', throw=throw, verbose=verbose) as cc: try: ret = await self._sudo( cmd=cc.cmd, mod_env=mod_env, opts=opts, wd=wd, verbose=cc.verbose, cmd_input=cc.cmd_input, env=env, interactive=cc.interactive, log_prefix=cc.log_prefix, ) except Exception as e: return cc.exception(ret, e) cc.check_exit_code(ret) return ret async def _get( self, path: str, wd: str|None, throw: bool, verbose: bool|None, title: str ) -> Result: ret = Result(None, None, 1) if wd is not None: path = wd + '/' + path with self.CallContext(self, title=title, cmd=['cat', path], cmd_input=InputMode.NonInteractive, wd=None, log_prefix='|', throw=throw, verbose=verbose) as cc: try: ret = await self._run( cmd=cc.cmd, wd=wd, verbose=cc.verbose, cmd_input=cc.cmd_input, env=None, interactive=cc.interactive, log_prefix=cc.log_prefix ) except Exception as e: return cc.exception(ret, e) cc.check_exit_code(ret) return ret async def _put( self, content: bytes, path: str, wd: str|None, throw: bool, verbose: bool|None, title: str, owner: str|None, group: str|None, mode: str|None, atomic: bool, ) -> Result: from .util import pretty_cmd async def __run(cmd: list[str], cmd_input: Input=InputMode.NonInteractive, **kwargs) -> Result: return await self.run(cmd, cmd_input=cmd_input, **kwargs) ret = Result(None, None, 1) try: if wd is not None: path = wd + '/' + path cmds: list[dict[str, str|list[str]|bool]] = [] out = (await __run(['mktemp', path + '.XXXXXX'])).stdout.decode().strip() if atomic else path cmds.append({'cmd': ['tee', out], 'cmd_input': content}) if owner is not None and group is not None: cmds.append({'cmd': ['chown', f'{owner}:{group}', out]}) elif owner is not None: cmds.append({'cmd': ['chown', owner, out]}) elif group is not None: cmds.append({'cmd': ['chgrp', group, out]}) if mode is not None: cmds.append({'cmd': ['chmod', mode, out]}) if atomic: cmds.append({'cmd': ['mv', out, path]}) for cmd in cmds: log(DEBUG, f'{self.log_name}: Running {pretty_cmd(cmd['cmd'], wd)}') ret = await __run(**cmd) return ret except: if throw: raise return cc.exception(ret, e) return ret