cmds.secrets.lib.DistroContext.install(): Full archive

Make "secrets install" digest archives with a more elaborate layout. It selects secrets from archives containing:

default/path/to/secret.jw-secret com/janware/grautvornix/path/to/secret.jw-secret

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-04-27 17:15:16 +02:00
commit 7dfe733865
Signed by: Jan Lindemann
GPG key ID: 3750640C9E25DD61

View file

@ -75,18 +75,53 @@ class DistroContext(FilesContext):
if missing > 0:
log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again')
async def install(self, src_uri: str, pkg_names: Iterable[str], only_missing: bool=False) -> None:
if only_missing:
raise NotImplementedError('--only-missing is not yet implemented')
secret_paths = await self.list_secret_paths(pkg_names)
async def install(self, src_uri: str, pkg_names: Iterable[str], only_missing: bool=False, verbose: bool=False) -> None:
async def _read_secret_tar_blob(src_uri: str):
ec = self.ctx
from ....lib.ec.Local import Local
from ....lib.FileContext import FileContext
from ....lib.util import get
if not isinstance(ec, Local):
ec = Local() # Security: Use a local exec context for decrypting and filtering secrets
async with TarIo.create(src=src_uri, dst=self.ctx) as tio:
tio.src.add_proc_filter(FileContext.Direction.In, ProcFilterGpg(ec=ec))
extracted_paths = await tio.extract(path_filter=secret_paths)
return (await get(src_uri, content_filter=ProcFilterGpg(ec=ec))).stdout
def _matches_host_prefix(path: str) -> bool:
return re.match(r'^' + root_in_tar, path)
def _crop_host_prefix(path: str) -> bool:
return re.sub(r'^' + root_in_tar, '', path)
def _crop_default_prefix(path: str) -> bool:
return re.sub(r'^' + default, '', path)
def _matches_default_prefix(path: str) -> bool:
return re.match(r'^default', path)
def _is_needed_secret(path: str) -> bool:
return path in secret_paths
from .tar import filter as tar_filter, rewrite as tar_rewrite, extract as tar_extract, merge as tar_merge
if only_missing:
raise NotImplementedError('--only-missing is not yet implemented')
secret_paths = await self.list_secret_paths(pkg_names)
host_root_in_tar = '/'.join(reversed(self.ctx.hostname.split('.')))
host_rx = re.compile(r'^' + host_root_in_tar)
default_rx = re.compile(r'^default')
filtered_paths: list[str] = []
extracted_paths: list[str] = []
blob_all = await _read_secret_tar_blob(src_uri)
blob_host_filtered = tar_filter(blob_all, lambda p: re.match(host_rx, p), filtered_paths)
blob_host_transformed = tar_rewrite(blob_host_filtered, lambda p: re.sub(host_rx, '', p))
blob_default_filtered = tar_filter(blob_all, lambda p: re.match(default_rx, p), filtered_paths)
blob_default_transformed = tar_rewrite(blob_default_filtered, lambda p: re.sub(default_rx, '', p))
blob_secret_material = tar_merge([blob_host_transformed, blob_default_transformed], overwrite=False)
blob_needed = tar_filter(blob_secret_material, _is_needed_secret, extracted_paths)
await tar_extract(self.ctx, blob_needed, root='/', verbose=verbose)
for path in secret_paths:
if not path in extracted_paths:
log(NOTICE, f'not extracted: {path}')