mirror of
ssh://git.janware.com/janware/proj/jw-pkg
synced 2026-04-25 17:45:55 +02:00
lib.FileContext: Add file methods
Add the following methods, meant to do the obvious: unlink(self, path: str) -> None erase(self, path: str) -> None rename(self, src: str, dst: str) -> None mktemp(self, tmpl: str, directory: bool=False) -> None chown(self, path: str, owner: str|None=None, group: str|None=None) -> None chmod(self, path: str, mode: int) -> None stat(self, path: str, follow_symlinks: bool=True) -> StatResult file_exists(self, path: str) -> bool is_dir(self, path: str) -> bool All methods are async and call their protected counterpart, which is designed to be overridden. If possible, default implementations do something meaningful, if not, they just raise plain NotImplementedError. Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
parent
bc3ed1737f
commit
3cf5b2264e
5 changed files with 295 additions and 10 deletions
|
|
@ -2,18 +2,102 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import abc, re, sys
|
||||
import abc, re, sys, errno
|
||||
from enum import Enum, auto
|
||||
from typing import NamedTuple, TYPE_CHECKING
|
||||
from decimal import Decimal, ROUND_FLOOR
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Self, Type
|
||||
from types import TracebackType
|
||||
|
||||
from .log import *
|
||||
from .base import Input, InputMode, Result
|
||||
from .base import Input, InputMode, Result, StatResult
|
||||
from .FileContext import FileContext as Base
|
||||
|
||||
_US = "\x1f" # unlikely to appear in numeric output
|
||||
_BILLION = Decimal(1_000_000_000)
|
||||
|
||||
def _looks_like_option_error(stderr: str) -> bool:
|
||||
s = stderr.lower()
|
||||
return any(
|
||||
needle in s
|
||||
for needle in (
|
||||
"unrecognized option",
|
||||
"illegal option",
|
||||
"unknown option",
|
||||
"invalid option",
|
||||
"option requires an argument",
|
||||
)
|
||||
)
|
||||
|
||||
def _raise_stat_error(path: str, stderr: str, returncode: int) -> None:
|
||||
msg = (stderr or "").strip() or f"stat exited with status {returncode}"
|
||||
lower = msg.lower()
|
||||
if "no such file" in lower:
|
||||
raise FileNotFoundError(errno.ENOENT, msg, path)
|
||||
if "permission denied" in lower or "operation not permitted" in lower:
|
||||
raise PermissionError(errno.EACCES, msg, path)
|
||||
raise OSError(errno.EIO, msg, path)
|
||||
|
||||
def _parse_epoch(value: str) -> tuple[int, float, int]:
|
||||
"""
|
||||
Convert a decimal epoch timestamp string into:
|
||||
(integer seconds for tuple slot, float seconds for attribute, ns for *_ns)
|
||||
"""
|
||||
dec = Decimal(value.strip())
|
||||
sec = int(dec.to_integral_value(rounding=ROUND_FLOOR))
|
||||
ns = int((dec * _BILLION).to_integral_value(rounding=ROUND_FLOOR))
|
||||
return sec, float(dec), ns
|
||||
|
||||
def _build_stat_result(fields: list[str], mode_base: int) -> StatResult:
|
||||
if len(fields) != 13:
|
||||
raise ValueError(
|
||||
f"unexpected stat output: expected 13 fields, got {len(fields)}: {fields!r}"
|
||||
)
|
||||
|
||||
(
|
||||
mode_s,
|
||||
ino_s,
|
||||
dev_s,
|
||||
nlink_s,
|
||||
uid_s,
|
||||
gid_s,
|
||||
size_s,
|
||||
atime_s,
|
||||
mtime_s,
|
||||
ctime_s,
|
||||
blksize_s,
|
||||
blocks_s,
|
||||
rdev_s,
|
||||
) = fields
|
||||
|
||||
st_mode = int(mode_s, mode_base)
|
||||
st_ino = int(ino_s)
|
||||
st_dev = int(dev_s)
|
||||
st_nlink = int(nlink_s)
|
||||
st_uid = uid_s
|
||||
st_gid = gid_s
|
||||
st_size = int(size_s)
|
||||
|
||||
st_atime_i, st_atime_f, st_atime_ns = _parse_epoch(atime_s)
|
||||
st_mtime_i, st_mtime_f, st_mtime_ns = _parse_epoch(mtime_s)
|
||||
st_ctime_i, st_ctime_f, st_ctime_ns = _parse_epoch(ctime_s)
|
||||
|
||||
st_blksize = int(blksize_s)
|
||||
st_blocks = int(blocks_s)
|
||||
st_rdev = int(rdev_s)
|
||||
|
||||
return StatResult(
|
||||
mode = st_mode,
|
||||
owner = st_uid,
|
||||
group = st_gid,
|
||||
size = st_size,
|
||||
atime = st_atime_i,
|
||||
mtime = st_mtime_i,
|
||||
ctime = st_ctime_i,
|
||||
)
|
||||
|
||||
class ExecContext(Base):
|
||||
|
||||
class CallContext:
|
||||
|
|
@ -332,3 +416,97 @@ class ExecContext(Base):
|
|||
raise
|
||||
return cc.exception(ret, e)
|
||||
return ret
|
||||
|
||||
async def _unlink(self, path: str) -> None:
|
||||
cmd = ['rm', '-f', path]
|
||||
await self.run(cmd, cmd_input=InputMode.NonInteractive)
|
||||
|
||||
async def _erase(self, path: str) -> None:
|
||||
cmd = ['rm', '-rf', path]
|
||||
await self.run(cmd, cmd_input=InputMode.NonInteractive)
|
||||
|
||||
async def _rename(self, src: str, dst: str) -> None:
|
||||
cmd = ['mv', src, dst]
|
||||
await self.run(cmd, cmd_input=InputMode.NonInteractive)
|
||||
|
||||
async def _mktemp(self, tmpl: str, directory: bool) -> str:
|
||||
cmd = ['mktemp']
|
||||
if directory:
|
||||
cmd.append('-d')
|
||||
cmd.append(tmpl)
|
||||
result = await self.run(cmd, cmd_input=InputMode.NonInteractive)
|
||||
return result.stdout.strip().decode()
|
||||
|
||||
async def _stat(self, path: str, follow_symlinks: bool) -> StatResult:
|
||||
|
||||
async def __stat(opts: list[str]) -> str:
|
||||
env = {
|
||||
'LC_ALL': 'C'
|
||||
}
|
||||
cmd = ['stat']
|
||||
if follow_symlinks:
|
||||
cmd.append('-L')
|
||||
cmd.extend(opts)
|
||||
cmd.append(path)
|
||||
return (await self.run(cmd, env=env, throw=False,
|
||||
cmd_input=InputMode.NonInteractive)).decode()
|
||||
|
||||
# GNU coreutils stat
|
||||
gnu_format = _US.join([
|
||||
"%f", # st_mode in hex
|
||||
"%i", # st_ino
|
||||
"%d", # st_dev
|
||||
"%h", # st_nlink
|
||||
"%U", # st_uid
|
||||
"%G", # st_gid
|
||||
"%s", # st_size
|
||||
"%.9X", # st_atime
|
||||
"%.9Y", # st_mtime
|
||||
"%.9Z", # st_ctime
|
||||
"%o", # st_blksize hint
|
||||
"%b", # st_blocks
|
||||
"%r", # st_rdev
|
||||
])
|
||||
|
||||
result = await __stat(['--printf', gnu_format])
|
||||
if result.status == 0:
|
||||
return _build_stat_result(result.stdout.split(_US), mode_base=16)
|
||||
|
||||
if not _looks_like_option_error(result.stderr.decode()):
|
||||
# log(DEBUG, f'GNU stat attempt failed on "{path}" ({str(e)})')
|
||||
_raise_stat_error(path, result.stderr, result.status)
|
||||
|
||||
# BSD / macOS / OpenBSD / NetBSD stat
|
||||
bsd_format = _US.join([
|
||||
"%p", # st_mode in octal
|
||||
"%i", # st_ino
|
||||
"%d", # st_dev
|
||||
"%l", # st_nlink
|
||||
"%U", # st_uid
|
||||
"%G", # st_gid
|
||||
"%z", # st_size
|
||||
"%.9Fa", # st_atime
|
||||
"%.9Fm", # st_mtime
|
||||
"%.9Fc", # st_ctime
|
||||
"%k", # st_blksize
|
||||
"%b", # st_blocks
|
||||
"%r", # st_rdev
|
||||
])
|
||||
|
||||
result = await __stat(['-n', '-f', bst_format])
|
||||
if proc.returncode == 0:
|
||||
return _build_stat_result(proc.stdout.rstrip('\n').split(_US), mode_base=8)
|
||||
_raise_stat_error(path, result.stderr, result.status)
|
||||
|
||||
async def _chown(self, path: str, owner: str|None, group: str|None) -> None:
|
||||
assert owner is not None or group is not None
|
||||
if group is None:
|
||||
ownership = owner
|
||||
elif owner is None:
|
||||
ownership = ':' + group
|
||||
else:
|
||||
ownership = owner + ':' + group
|
||||
await self.run(['chown', ownership, path], cmd_input=InputMode.NonInteractive)
|
||||
|
||||
async def _chmod(self, path: str, mode: int) -> None:
|
||||
await self.run(['chmod', oct(mode), path], cmd_input=InputMode.NonInteractive)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue