jw-pkg/src/python/jw/pkg/lib/ExecContext.py

516 lines
17 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
from __future__ import annotations
import abc, re, sys, errno
from enum import Enum, auto
from typing import NamedTuple, TYPE_CHECKING
from decimal import Decimal, ROUND_FLOOR
if TYPE_CHECKING:
from typing import Self, Type
from types import TracebackType
from .log import *
from .base import Input, InputMode, Result, StatResult
from .FileContext import FileContext as Base
_US = "\x1f" # unlikely to appear in numeric output
_BILLION = Decimal(1_000_000_000)
def _looks_like_option_error(stderr: str) -> bool:
s = stderr.lower()
return any(
needle in s
for needle in (
"unrecognized option",
"illegal option",
"unknown option",
"invalid option",
"option requires an argument",
)
)
def _raise_stat_error(path: str, stderr: str, returncode: int) -> None:
msg = (stderr or "").strip() or f"stat exited with status {returncode}"
lower = msg.lower()
if "no such file" in lower:
raise FileNotFoundError(errno.ENOENT, msg, path)
if "permission denied" in lower or "operation not permitted" in lower:
raise PermissionError(errno.EACCES, msg, path)
raise OSError(errno.EIO, msg, path)
def _parse_epoch(value: str) -> tuple[int, float, int]:
"""
Convert a decimal epoch timestamp string into:
(integer seconds for tuple slot, float seconds for attribute, ns for *_ns)
"""
dec = Decimal(value.strip())
sec = int(dec.to_integral_value(rounding=ROUND_FLOOR))
ns = int((dec * _BILLION).to_integral_value(rounding=ROUND_FLOOR))
return sec, float(dec), ns
def _build_stat_result(fields: list[str], mode_base: int) -> StatResult:
if len(fields) != 13:
raise ValueError(
f"unexpected stat output: expected 13 fields, got {len(fields)}: {fields!r}"
)
(
mode_s,
ino_s,
dev_s,
nlink_s,
uid_s,
gid_s,
size_s,
atime_s,
mtime_s,
ctime_s,
blksize_s,
blocks_s,
rdev_s,
) = fields
st_mode = int(mode_s, mode_base)
st_ino = int(ino_s)
st_dev = int(dev_s)
st_nlink = int(nlink_s)
st_uid = uid_s
st_gid = gid_s
st_size = int(size_s)
st_atime_i, st_atime_f, st_atime_ns = _parse_epoch(atime_s)
st_mtime_i, st_mtime_f, st_mtime_ns = _parse_epoch(mtime_s)
st_ctime_i, st_ctime_f, st_ctime_ns = _parse_epoch(ctime_s)
st_blksize = int(blksize_s)
st_blocks = int(blocks_s)
st_rdev = int(rdev_s)
return StatResult(
mode = st_mode,
owner = st_uid,
group = st_gid,
size = st_size,
atime = st_atime_i,
mtime = st_mtime_i,
ctime = st_ctime_i,
)
class ExecContext(Base):
class CallContext:
def __init__(
self,
parent: ExecContext,
title: str|None,
cmd: list[str],
cmd_input: Input,
env: dict[str, str]|None,
wd: str|None,
log_prefix: str,
throw: bool,
verbose: bool
) -> None:
self.__cmd = cmd
self.__wd = wd
self.__log_prefix = log_prefix
self.__parent = parent
self.__title = title
self.__pretty_cmd: str|None = None
self.__delim = title if title is not None else f'---- {parent.uri}: Running {self.pretty_cmd} -'
delim_len = 120
self.__delim += '-' * max(0, delim_len - len(self.__delim))
self.__env = {'LC_ALL': 'C'} if env is None else env
# -- At the end of this dance, interactive needs to be either True
# or False
interactive: bool|None = None
if not isinstance(cmd_input, InputMode):
interactive = False
self.__cmd_input = (
cmd_input if isinstance(cmd_input, bytes) else
cmd_input.encode(sys.stdout.encoding or "utf-8")
)
else:
match cmd_input:
case InputMode.Interactive:
interactive = True
case InputMode.NonInteractive:
interactive = False
case InputMode.OptInteractive:
interactive = parent.interactive
case InputMode.Auto:
interactive = sys.stdin.isatty()
if interactive is None:
interactive = parent.interactive
if interactive is None:
interactive = sys.stdin.isatty()
self.__cmd_input = None
assert interactive in [ True, False ]
self.__interactive = interactive
self.__cmd_input = cmd_input if not isinstance(cmd_input, InputMode) else None
self.__throw = throw
self.__verbose = verbose if verbose is not None else parent.verbose_default
def __enter__(self) -> CallContext:
self.log_delim(start=True)
return self
def __exit__(
self,
exc_type: Type[BaseException]|None,
exc_value: BaseException|None,
traceback: TracebackType|None
) -> bool:
self.log_delim(start=False)
@property
def log_prefix(self) -> str:
return self.__log_prefix
@property
def interactive(self) -> bool:
return self.__interactive
@property
def verbose(self) -> bool:
return self.__verbose
@property
def cmd_input(self) -> bytes|None:
return self.__cmd_input
@property
def env(self) -> dict[str, str]:
return self.__env
@property
def throw(self) -> bool:
return self.__throw
@property
def wd(self) -> str|None:
return self.__wd
@property
def cmd(self) -> list[str]:
return self.__cmd
@property
def pretty_cmd(self) -> str:
if self.__pretty_cmd is None:
from .util import pretty_cmd
self.__pretty_cmd = pretty_cmd(self.__cmd, self.__wd)
return self.__pretty_cmd
def log(prio: int, *args, **kwargs) -> None:
log(prio, self.__log_prefix, *args, **kwargs)
def log_delim(self, start: bool) -> None:
if not self.__verbose:
return None
if self.__interactive: # Don't log footer in interative mode
if start:
log(NOTICE, self.__delim)
return
delim = ',' + self.__delim + ' >' if start else '`' + self.__delim + ' <'
log(NOTICE, delim)
def check_exit_code(self, result: Result) -> None:
if result.status == 0:
return
if (self.__throw or self.__verbose):
msg = f'Command exited with status {result.status}: {self.pretty_cmd}'
if result.stderr:
msg += ': ' + result.decode().stderr.strip()
if self.__throw:
raise RuntimeError(msg)
def exception(self, result: Result, e: Exception) -> Result:
log(ERR, self.__log_prefix, f'Failed to run {self.pretty_cmd}')
if self.__throw:
raise e
return result
def __init__(self, uri: str, interactive: bool|None=None, verbose_default=False):
super().__init__(uri=uri, interactive=interactive, verbose_default=verbose_default)
@abc.abstractmethod
def _username(self) -> str:
pass
@property
def username(self) -> str:
return self._username()
async def run(
self,
cmd: list[str],
wd: str|None = None,
throw: bool = True,
verbose: bool|None = None,
cmd_input: Input = InputMode.OptInteractive,
env: dict[str, str]|None = None,
title: str=None
) -> Result:
"""
Run a command asynchronously and return its output
Args:
cmd: Command and arguments
wd: Optional working directory
throw: Raise an exception on non-zero exit status if True
verbose: Emit log output while the command runs
cmd_input:
- "InputMode.OptInteractive" -> Let --interactive govern how to handle interactivity (default)
- "InputMode.Interactive" -> Inherit terminal stdin
- "InputMode.Auto" -> Inherit terminal stdin if it is a TTY
- "InputMode.NonInteractive" -> stdin from /dev/null
- None -> Alias for InputMode.NonInteractive
- otherwise -> Feed cmd_input to stdin
env: The environment the command should be run in
Returns:
A Result instance
In PTY mode stderr is always None because PTY merges stdout/stderr.
"""
# Note that in the calls to the wrapped method, cmd_input == None can
# be returned by CallContext and is very much allowed
assert cmd_input is not None
ret = Result(None, None, 1)
with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, env=env, wd=wd,
log_prefix='|', throw=throw, verbose=verbose) as cc:
try:
ret = await self._run(
cmd=cc.cmd,
wd=wd,
verbose=cc.verbose,
cmd_input=cc.cmd_input,
env=cc.env,
interactive=cc.interactive,
log_prefix=cc.log_prefix
)
except Exception as e:
return cc.exception(ret, e)
cc.check_exit_code(ret)
return ret
@abc.abstractmethod
async def _sudo(self, *args, **kwargs) -> Result:
pass
async def sudo(
self,
cmd: list[str],
mod_env: dict[str, str]|None=None,
opts: list[str]|None=None,
wd: str|None = None,
throw: bool = True,
verbose: bool|None = None,
cmd_input: Input = InputMode.OptInteractive,
env: dict[str, str]|None = None,
title: str=None,
) -> Result:
# Note that in the calls to the wrapped method, cmd_input == None can
# be returned by CallContext and is very much allowed
assert cmd_input is not None
ret = Result(None, None, 1)
if opts is None:
opts = {}
with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, env=env, wd=wd,
log_prefix='|', throw=throw, verbose=verbose) as cc:
try:
ret = await self._sudo(
cmd=cc.cmd,
mod_env=mod_env,
opts=opts,
wd=wd,
verbose=cc.verbose,
cmd_input=cc.cmd_input,
env=cc.env,
interactive=cc.interactive,
log_prefix=cc.log_prefix,
)
except Exception as e:
return cc.exception(ret, e)
cc.check_exit_code(ret)
return ret
async def _get(
self,
path: str,
wd: str|None,
throw: bool,
verbose: bool|None,
title: str
) -> Result:
ret = Result(None, None, 1)
if wd is not None:
path = wd + '/' + path
with self.CallContext(self, title=title, cmd=['cat', path],
cmd_input=InputMode.NonInteractive, wd=None, env=None,
log_prefix='|', throw=throw, verbose=verbose) as cc:
try:
ret = await self._run(
cmd=cc.cmd,
wd=wd,
verbose=cc.verbose,
cmd_input=cc.cmd_input,
env=cc.env,
interactive=cc.interactive,
log_prefix=cc.log_prefix
)
except Exception as e:
return cc.exception(ret, e)
if ret.status != 0 and ret.stderr.decode().find('No such file') != -1:
raise FileNotFoundError(ret.stderr)
cc.check_exit_code(ret)
return ret
async def _put(
self,
path: str,
content: bytes,
wd: str|None,
throw: bool,
verbose: bool|None,
title: str,
owner: str|None,
group: str|None,
mode: str|None,
atomic: bool,
) -> Result:
from .util import pretty_cmd
async def __run(cmd: list[str], cmd_input: Input=InputMode.NonInteractive, **kwargs) -> Result:
return await self.run(cmd, cmd_input=cmd_input, **kwargs)
ret = Result(None, None, 1)
try:
if wd is not None:
path = wd + '/' + path
cmds: list[dict[str, str|list[str]|bool]] = []
out = (await __run(['mktemp', path + '.XXXXXX'])).stdout.decode().strip() if atomic else path
cmds.append({'cmd': ['tee', out], 'cmd_input': content})
if owner is not None and group is not None:
cmds.append({'cmd': ['chown', f'{owner}:{group}', out]})
elif owner is not None:
cmds.append({'cmd': ['chown', owner, out]})
elif group is not None:
cmds.append({'cmd': ['chgrp', group, out]})
if mode is not None:
cmds.append({'cmd': ['chmod', mode, out]})
if atomic:
cmds.append({'cmd': ['mv', out, path]})
for cmd in cmds:
log(DEBUG, f'{self.log_name}: Running {pretty_cmd(cmd['cmd'], wd)}')
ret = await __run(**cmd)
return ret
except:
if throw:
raise
return cc.exception(ret, e)
return ret
async def _unlink(self, path: str) -> None:
cmd = ['rm', '-f', path]
await self.run(cmd, cmd_input=InputMode.NonInteractive)
async def _erase(self, path: str) -> None:
cmd = ['rm', '-rf', path]
await self.run(cmd, cmd_input=InputMode.NonInteractive)
async def _rename(self, src: str, dst: str) -> None:
cmd = ['mv', src, dst]
await self.run(cmd, cmd_input=InputMode.NonInteractive)
async def _mktemp(self, tmpl: str, directory: bool) -> str:
cmd = ['mktemp']
if directory:
cmd.append('-d')
cmd.append(tmpl)
result = await self.run(cmd, cmd_input=InputMode.NonInteractive)
return result.stdout.strip().decode()
async def _stat(self, path: str, follow_symlinks: bool) -> StatResult:
async def __stat(opts: list[str]) -> str:
env = {
'LC_ALL': 'C'
}
cmd = ['stat']
if follow_symlinks:
cmd.append('-L')
cmd.extend(opts)
cmd.append(path)
return (await self.run(cmd, env=env, throw=False,
cmd_input=InputMode.NonInteractive)).decode()
# GNU coreutils stat
gnu_format = _US.join([
"%f", # st_mode in hex
"%i", # st_ino
"%d", # st_dev
"%h", # st_nlink
"%U", # st_uid
"%G", # st_gid
"%s", # st_size
"%.9X", # st_atime
"%.9Y", # st_mtime
"%.9Z", # st_ctime
"%o", # st_blksize hint
"%b", # st_blocks
"%r", # st_rdev
])
result = await __stat(['--printf', gnu_format])
if result.status == 0:
return _build_stat_result(result.stdout.split(_US), mode_base=16)
if not _looks_like_option_error(result.stderr.decode()):
# log(DEBUG, f'GNU stat attempt failed on "{path}" ({str(e)})')
_raise_stat_error(path, result.stderr, result.status)
# BSD / macOS / OpenBSD / NetBSD stat
bsd_format = _US.join([
"%p", # st_mode in octal
"%i", # st_ino
"%d", # st_dev
"%l", # st_nlink
"%U", # st_uid
"%G", # st_gid
"%z", # st_size
"%.9Fa", # st_atime
"%.9Fm", # st_mtime
"%.9Fc", # st_ctime
"%k", # st_blksize
"%b", # st_blocks
"%r", # st_rdev
])
result = await __stat(['-n', '-f', bst_format])
if proc.returncode == 0:
return _build_stat_result(proc.stdout.rstrip('\n').split(_US), mode_base=8)
_raise_stat_error(path, result.stderr, result.status)
async def _chown(self, path: str, owner: str|None, group: str|None) -> None:
assert owner is not None or group is not None
if group is None:
ownership = owner
elif owner is None:
ownership = ':' + group
else:
ownership = owner + ':' + group
await self.run(['chown', ownership, path], cmd_input=InputMode.NonInteractive)
async def _chmod(self, path: str, mode: int) -> None:
await self.run(['chmod', oct(mode), path], cmd_input=InputMode.NonInteractive)