lib.util.get(): Add function

Add get(), which does pretty much what FileContext.get() does, but with auto-instantiating a FileContext instance. Input processing filters can be passed, too, all *args and **kwargs are passed unchanged to the FileContext's constructor.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-04-27 14:35:53 +02:00
commit 5bb7fe96db
Signed by: Jan Lindemann
GPG key ID: 3750640C9E25DD61

View file

@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Iterable
if TYPE_CHECKING:
from typing import Sequence
from .ExecContext import ExecContext
from .ProcFilter import ProcFilter, ProcPipeline
import os, sys, json
@ -101,6 +102,20 @@ async def run_sudo(cmd: list[str], *args, interactive: bool=True, ec: ExecContex
ec = Local(interactive=interactive)
return await ec.sudo(cmd, *args, **kwargs)
async def get(
uri: str|Uri,
*args,
ctx: FileContext|None=None,
content_filter: ProcFilter|list[ProcFilter]|ProcPipeline|None = None,
**kwargs
) -> Result:
uri = Uri.pimp(uri)
if ctx is None or uri.id != ctx.uri.id:
from .FileContext import FileContext
ctx = FileContext.create(uri)
from .ProcFilter import run as run_pipeline
return await run_pipeline(await ctx.get(uri.path, *args, **kwargs), content_filter)
async def copy(src_uri: str|Iterable[str], dst: str|FileContext, owner: str|None=None, group: str|None=None, mode: int|None=None, throw=True) -> Exception|str|list[str]:
if not isinstance(src_uri, str):
ret: list[str] = []