jw-pkg/src/python/jw/pkg/lib/ExecContext.py
Jan Lindemann 54aecff8e4 lib.ExecContext.run(), .sudo(): Rename env
The name of the env parameter to ExecContext.run() and .sudo() is not
descriptive enough for which environment is supposed to be modified
and how, so rename and split it up as follows:

  - .run(): env -> mod_env

  - .sudo(): env -> mod_env_sudo and mod_env_cmd

The parameters have the following meaning:

  - "mod_env*" means that the environment is modified, not replaced

  - "mod_env" and "mod_env_cmd" modify the environment "cmd" runs in

  - "mod_env_sudo" modifies the environment sudo runs in

Fix the fallout of the API change all over jw-pkg.

Signed-off-by: Jan Lindemann <jan@janware.com>
2026-04-19 14:36:50 +02:00

583 lines
19 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import annotations
import abc, re, sys, errno
from enum import Enum, auto
from typing import NamedTuple, TYPE_CHECKING
from decimal import Decimal, ROUND_FLOOR
if TYPE_CHECKING:
from typing import Self, Type
from types import TracebackType
from .log import *
from .base import Input, InputMode, Result, StatResult
from .FileContext import FileContext as Base
_US = "\x1f" # unlikely to appear in numeric output
_BILLION = Decimal(1_000_000_000)
def _looks_like_option_error(stderr: str) -> bool:
s = stderr.lower()
return any(
needle in s
for needle in (
"unrecognized option",
"illegal option",
"unknown option",
"invalid option",
"option requires an argument",
)
)
def _raise_stat_error(path: str, stderr: str, returncode: int) -> None:
msg = (stderr or "").strip() or f"stat exited with status {returncode}"
lower = msg.lower()
if "no such file" in lower:
raise FileNotFoundError(errno.ENOENT, msg, path)
if "permission denied" in lower or "operation not permitted" in lower:
raise PermissionError(errno.EACCES, msg, path)
raise OSError(errno.EIO, msg, path)
def _parse_epoch(value: str) -> tuple[int, float, int]:
"""
Convert a decimal epoch timestamp string into:
(integer seconds for tuple slot, float seconds for attribute, ns for *_ns)
"""
dec = Decimal(value.strip())
sec = int(dec.to_integral_value(rounding=ROUND_FLOOR))
ns = int((dec * _BILLION).to_integral_value(rounding=ROUND_FLOOR))
return sec, float(dec), ns
def _build_stat_result(fields: list[str], mode_base: int) -> StatResult:
if len(fields) != 13:
raise ValueError(
f"unexpected stat output: expected 13 fields, got {len(fields)}: {fields!r}"
)
(
mode_s,
ino_s,
dev_s,
nlink_s,
uid_s,
gid_s,
size_s,
atime_s,
mtime_s,
ctime_s,
blksize_s,
blocks_s,
rdev_s,
) = fields
st_mode = int(mode_s, mode_base)
st_ino = int(ino_s)
st_dev = int(dev_s)
st_nlink = int(nlink_s)
st_uid = uid_s
st_gid = gid_s
st_size = int(size_s)
st_atime_i, st_atime_f, st_atime_ns = _parse_epoch(atime_s)
st_mtime_i, st_mtime_f, st_mtime_ns = _parse_epoch(mtime_s)
st_ctime_i, st_ctime_f, st_ctime_ns = _parse_epoch(ctime_s)
st_blksize = int(blksize_s)
st_blocks = int(blocks_s)
st_rdev = int(rdev_s)
return StatResult(
mode = st_mode,
owner = st_uid,
group = st_gid,
size = st_size,
atime = st_atime_i,
mtime = st_mtime_i,
ctime = st_ctime_i,
)
class ExecContext(Base):
class CallContext:
def __init__(
self,
parent: ExecContext,
title: str|None,
cmd: list[str],
cmd_input: Input,
mod_env: dict[str, str]|None,
wd: str|None,
log_prefix: str,
throw: bool,
verbose: bool
) -> None:
self.__cmd = cmd
self.__wd = wd
self.__log_prefix = log_prefix
self.__parent = parent
self.__title = title
self.__pretty_cmd: str|None = None
self.__delim = title if title is not None else f'---- {parent.uri}: Running {self.pretty_cmd} -'
delim_len = 120
self.__delim += '-' * max(0, delim_len - len(self.__delim))
self.__mod_env = {'LC_ALL': 'C'} if mod_env is None else mod_env
# -- At the end of this dance, interactive needs to be either True
# or False
interactive: bool|None = None
if not isinstance(cmd_input, InputMode):
interactive = False
self.__cmd_input = (
cmd_input if isinstance(cmd_input, bytes) else
cmd_input.encode(sys.stdout.encoding or "utf-8")
)
else:
match cmd_input:
case InputMode.Interactive:
interactive = True
case InputMode.NonInteractive:
interactive = False
case InputMode.OptInteractive:
interactive = parent.interactive
case InputMode.Auto:
interactive = sys.stdin.isatty()
if interactive is None:
interactive = parent.interactive
if interactive is None:
interactive = sys.stdin.isatty()
self.__cmd_input = None
assert interactive in [ True, False ]
self.__interactive = interactive
self.__cmd_input = cmd_input if not isinstance(cmd_input, InputMode) else None
self.__throw = throw
self.__verbose = verbose if verbose is not None else parent.verbose_default
def __enter__(self) -> CallContext:
self.log_delim(start=True)
return self
def __exit__(
self,
exc_type: Type[BaseException]|None,
exc_value: BaseException|None,
traceback: TracebackType|None
) -> bool:
self.log_delim(start=False)
@property
def log_prefix(self) -> str:
return self.__log_prefix
@property
def interactive(self) -> bool:
return self.__interactive
@property
def verbose(self) -> bool:
return self.__verbose
@property
def cmd_input(self) -> bytes|None:
return self.__cmd_input
@property
def mod_env(self) -> dict[str, str]:
return self.__mod_env
@property
def throw(self) -> bool:
return self.__throw
@property
def wd(self) -> str|None:
return self.__wd
@property
def cmd(self) -> list[str]:
return self.__cmd
@property
def pretty_cmd(self) -> str:
if self.__pretty_cmd is None:
from .util import pretty_cmd
self.__pretty_cmd = pretty_cmd(self.__cmd, self.__wd)
return self.__pretty_cmd
def log(prio: int, *args, **kwargs) -> None:
log(prio, self.__log_prefix, *args, **kwargs)
def log_delim(self, start: bool) -> None:
if not self.__verbose:
return None
if self.__interactive: # Don't log footer in interative mode
if start:
log(NOTICE, self.__delim)
return
delim = ',' + self.__delim + ' >' if start else '`' + self.__delim + ' <'
log(NOTICE, delim)
def check_exit_code(self, result: Result) -> None:
if result.status == 0:
return
if (self.__throw or self.__verbose):
msg = f'Command exited with status {result.status}: {self.pretty_cmd}'
if result.stderr:
msg += ': ' + result.decode().stderr.strip()
if self.__throw:
raise RuntimeError(msg)
def exception(self, result: Result, e: Exception) -> Result:
log(ERR, self.__log_prefix, f'Failed to run {self.pretty_cmd}')
if self.__throw:
raise e
return result
def __init__(self, uri: str, interactive: bool|None=None, verbose_default=False):
super().__init__(uri=uri, interactive=interactive, verbose_default=verbose_default)
@abc.abstractmethod
def _username(self) -> str:
pass
@property
def username(self) -> str:
return self._username()
async def run(
self,
cmd: list[str],
wd: str|None = None,
throw: bool = True,
verbose: bool|None = None,
cmd_input: Input = InputMode.OptInteractive,
mod_env: dict[str, str]|None = None,
title: str = None
) -> Result:
"""
Run a command asynchronously and return its output
Args:
cmd: Command and arguments
wd: Optional working directory
throw: Raise an exception on non-zero exit status if True
verbose: Emit log output while the command runs
cmd_input:
- "InputMode.OptInteractive" -> Let --interactive govern how to handle interactivity (default)
- "InputMode.Interactive" -> Inherit terminal stdin
- "InputMode.Auto" -> Inherit terminal stdin if it is a TTY
- "InputMode.NonInteractive" -> stdin from /dev/null
- None -> Alias for InputMode.NonInteractive
- otherwise -> Feed cmd_input to stdin
mod_env: Change set to command's environment. key: val adds a variable, key: None removes it
Returns:
A Result instance
In PTY mode stderr is always None because PTY merges stdout/stderr.
"""
# Note that in the calls to the wrapped method, cmd_input == None can
# be returned by CallContext and is very much allowed
assert cmd_input is not None
ret = Result(None, None, 1)
with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input, mod_env=mod_env, wd=wd,
log_prefix='|', throw=throw, verbose=verbose) as cc:
try:
ret = await self._run(
cmd=cc.cmd,
wd=wd,
verbose=cc.verbose,
cmd_input=cc.cmd_input,
mod_env=cc.mod_env,
interactive=cc.interactive,
log_prefix=cc.log_prefix
)
except Exception as e:
return cc.exception(ret, e)
cc.check_exit_code(ret)
return ret
async def _sudo(
self,
cmd: list[str],
opts: list[str]|None,
wd: str|None,
mod_env_sudo: dict[str, str]|None,
mod_env_cmd: dict[str, str]|None,
cmd_input: bytes|None,
verbose: bool,
interactive: bool,
log_prefix: str,
) -> Result:
def __check_equal_values(d1: dict[str, str], d2: dict[str, str]) -> None:
for key, val in d1.items():
if not d2.get(key, None) in [None, val]:
raise ValueError(f'Outer and inner environments differ at least for {key}: "{val}" != "{d2.get(key)}"')
fw_cmd: list[str] = []
fw_env: dict[str, str] = {}
if opts is None:
opts = {}
if mod_env_cmd:
fw_env.update(mod_env_cmd)
if self.username != 'root':
if mod_env_sudo and mod_env_cmd:
__check_equal_values(mod_env_sudo, mod_env_cmd)
__check_equal_values(mod_env_cmd, mod_env_sudo)
fw_cmd.append('/usr/bin/sudo')
if mod_env_sudo:
fw_env.update(mod_env_sudo)
if mod_env_cmd:
fw_cmd.append('--preserve-env=' + ','.join(mod_env_cmd.keys()))
if wd is not None:
opts.extend('-D', wd)
wd = None
fw_cmd.extend(opts)
mod_env = fw_env if fw_env else None
fw_cmd.extend(cmd)
return await self._run(
fw_cmd,
wd = wd,
mod_env = mod_env,
verbose = verbose,
cmd_input = cmd_input,
interactive = interactive,
log_prefix = log_prefix
)
async def sudo(
self,
cmd: list[str],
opts: list[str]|None = None,
wd: str|None = None,
mod_env_sudo: dict[str, str]|None = None,
mod_env_cmd: dict[str, str]|None = None,
throw: bool = True,
verbose: bool|None = None,
cmd_input: Input = InputMode.OptInteractive,
title: str = None
) -> Result:
# Note that in the calls to the wrapped method, cmd_input == None can
# be returned by CallContext and is very much allowed
assert cmd_input is not None
ret = Result(None, None, 1)
with self.CallContext(self, title=title, cmd=cmd, cmd_input=cmd_input,
mod_env=mod_env_cmd, wd=wd,
log_prefix='|', throw=throw, verbose=verbose) as cc:
try:
ret = await self._sudo(
cmd = cc.cmd,
opts = opts,
wd = cc.wd,
mod_env_sudo = mod_env_sudo,
mod_env_cmd = cc.mod_env,
verbose = cc.verbose,
cmd_input = cc.cmd_input,
interactive = cc.interactive,
log_prefix = cc.log_prefix,
)
except Exception as e:
return cc.exception(ret, e)
cc.check_exit_code(ret)
return ret
return await self._sudo(
cmd,
opts = opts,
wd = wd,
mod_env_sudo = mod_env_sudo,
mod_env_cmd = mod_env_cmd,
throw = throw,
verbose = verbose,
cmd_input = cmd_input,
title = title,
)
async def _get(
self,
path: str,
wd: str|None,
throw: bool,
verbose: bool|None,
title: str
) -> Result:
ret = Result(None, None, 1)
if wd is not None:
path = wd + '/' + path
with self.CallContext(self, title=title, cmd=['cat', path],
cmd_input=InputMode.NonInteractive, wd=None, mod_env=None,
log_prefix='|', throw=throw, verbose=verbose) as cc:
try:
ret = await self._run(
cmd=cc.cmd,
wd=wd,
verbose=cc.verbose,
cmd_input=cc.cmd_input,
mod_env=cc.mod_env,
interactive=cc.interactive,
log_prefix=cc.log_prefix
)
except Exception as e:
return cc.exception(ret, e)
if ret.status != 0 and ret.stderr.decode().find('No such file') != -1:
raise FileNotFoundError(ret.stderr)
cc.check_exit_code(ret)
return ret
async def _put(
self,
path: str,
content: bytes,
wd: str|None,
throw: bool,
verbose: bool|None,
title: str,
owner: str|None,
group: str|None,
mode: str|None,
atomic: bool,
) -> Result:
from .util import pretty_cmd
async def __run(cmd: list[str], cmd_input: Input=InputMode.NonInteractive, **kwargs) -> Result:
return await self.run(cmd, cmd_input=cmd_input, **kwargs)
ret = Result(None, None, 1)
try:
if wd is not None:
path = wd + '/' + path
cmds: list[dict[str, str|list[str]|bool]] = []
out = (await __run(['mktemp', path + '.XXXXXX'])).stdout.decode().strip() if atomic else path
cmds.append({'cmd': ['tee', out], 'cmd_input': content})
if owner is not None and group is not None:
cmds.append({'cmd': ['chown', f'{owner}:{group}', out]})
elif owner is not None:
cmds.append({'cmd': ['chown', owner, out]})
elif group is not None:
cmds.append({'cmd': ['chgrp', group, out]})
if mode is not None:
cmds.append({'cmd': ['chmod', mode, out]})
if atomic:
cmds.append({'cmd': ['mv', out, path]})
for cmd in cmds:
log(DEBUG, f'{self.log_name}: Running {pretty_cmd(cmd['cmd'], wd)}')
ret = await __run(**cmd)
return ret
except:
if throw:
raise
return cc.exception(ret, e)
return ret
async def _unlink(self, path: str) -> None:
cmd = ['rm', '-f', path]
await self.run(cmd, cmd_input=InputMode.NonInteractive)
async def _erase(self, path: str) -> None:
cmd = ['rm', '-rf', path]
await self.run(cmd, cmd_input=InputMode.NonInteractive)
async def _rename(self, src: str, dst: str) -> None:
cmd = ['mv', src, dst]
await self.run(cmd, cmd_input=InputMode.NonInteractive)
async def _mktemp(self, tmpl: str, directory: bool) -> str:
cmd = ['mktemp']
if directory:
cmd.append('-d')
cmd.append(tmpl)
result = await self.run(cmd, cmd_input=InputMode.NonInteractive)
return result.stdout.strip().decode()
async def _stat(self, path: str, follow_symlinks: bool) -> StatResult:
async def __stat(opts: list[str]) -> str:
mod_env = {
'LC_ALL': 'C'
}
cmd = ['stat']
if follow_symlinks:
cmd.append('-L')
cmd.extend(opts)
cmd.append(path)
return (await self.run(cmd, mod_env=mod_env, throw=False,
cmd_input=InputMode.NonInteractive)).decode()
# GNU coreutils stat
gnu_format = _US.join([
"%f", # st_mode in hex
"%i", # st_ino
"%d", # st_dev
"%h", # st_nlink
"%U", # st_uid
"%G", # st_gid
"%s", # st_size
"%.9X", # st_atime
"%.9Y", # st_mtime
"%.9Z", # st_ctime
"%o", # st_blksize hint
"%b", # st_blocks
"%r", # st_rdev
])
result = await __stat(['--printf', gnu_format])
if result.status == 0:
return _build_stat_result(result.stdout.split(_US), mode_base=16)
if not _looks_like_option_error(result.stderr.decode()):
# log(DEBUG, f'GNU stat attempt failed on "{path}" ({str(e)})')
_raise_stat_error(path, result.stderr, result.status)
# BSD / macOS / OpenBSD / NetBSD stat
bsd_format = _US.join([
"%p", # st_mode in octal
"%i", # st_ino
"%d", # st_dev
"%l", # st_nlink
"%U", # st_uid
"%G", # st_gid
"%z", # st_size
"%.9Fa", # st_atime
"%.9Fm", # st_mtime
"%.9Fc", # st_ctime
"%k", # st_blksize
"%b", # st_blocks
"%r", # st_rdev
])
result = await __stat(['-n', '-f', bst_format])
if proc.returncode == 0:
return _build_stat_result(proc.stdout.rstrip('\n').split(_US), mode_base=8)
_raise_stat_error(path, result.stderr, result.status)
async def _chown(self, path: str, owner: str|None, group: str|None) -> None:
assert owner is not None or group is not None
if group is None:
ownership = owner
elif owner is None:
ownership = ':' + group
else:
ownership = owner + ':' + group
await self.run(['chown', ownership, path], cmd_input=InputMode.NonInteractive)
async def _chmod(self, path: str, mode: int) -> None:
await self.run(['chmod', oct(mode), path], cmd_input=InputMode.NonInteractive)