Add DistroContext.install(). It takes a tar file containing secrets, decrypts it, and installs all secrets needed on target and present in that file. For every file that should be extracted, it logs whether it actually did something or didn't.
It also features an only_missing argument, which is just a stub for "allow to define some extraction policy with respect to replacing all / some / not replace / whatever". Not thought through.
Signed-off-by: Jan Lindemann <jan@janware.com>
94 lines
3.7 KiB
Python
94 lines
3.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
|
|
from pathlib import Path
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from typing import Iterable
|
|
from ....lib.FileContext import FileContext
|
|
|
|
from ....lib.log import *
|
|
from ....lib.util import run_cmd
|
|
from ....lib.TarIo import TarIo
|
|
from ....lib.ProcFilterGpg import ProcFilterGpg
|
|
|
|
from .base import Attrs
|
|
from .FilesContext import FilesContext
|
|
|
|
class DistroContext(FilesContext):
    """Template and secret file operations for the packages of one distro.

    Package file lists are queried from the wrapped distro object; file
    operations go through ``self.ctx`` (presumably the ``distro.ctx`` that is
    handed to the base class constructor -- confirm in ``FilesContext``).
    """

    # NOTE(review): ``Distro`` is only used as an annotation and is not
    # imported in the visible part of this module. This is harmless at run
    # time because of ``from __future__ import annotations``, but a type
    # checker will need a TYPE_CHECKING import for it.
    def __init__(self, distro: Distro) -> None:
        """Bind this context to *distro* and pass its ``ctx`` to the base class."""
        super().__init__(distro.ctx)
        self.__distro = distro

    async def match_files(self, pkg_names: Iterable[str], pattern: str) -> list[str]:
        """Return the files of the given packages whose path matches *pattern*.

        Args:
            pkg_names: Names of the packages whose file lists are scanned.
            pattern: Regular expression, applied with ``match`` semantics,
                i.e. anchored at the beginning of each path.

        Returns:
            The matching paths, in package order, then file-list order.
        """
        # Compile once instead of handing the pattern to re.match() per path.
        matches = re.compile(pattern).match
        ret: list[str] = []
        for pkg_name in pkg_names:
            pkg_files = await self.__distro.pkg_files(pkg_name)
            ret.extend(path for path in pkg_files if matches(path))
        return ret

    async def list_template_files(self, pkg_names: Iterable[str]) -> list[str]:
        """Return all ``*.jw-tmpl`` files shipped by the given packages.

        If *pkg_names* is empty (or otherwise falsy), the names of the
        packages returned by the distro's ``select()`` are used instead.
        """
        if not pkg_names:
            pkg_names = [p.name for p in await self.__distro.select()]
        return await self.match_files(pkg_names, pattern=r'.*\.jw-tmpl$')

    async def list_secret_paths(self, pkg_names: Iterable[str], ignore_missing: bool=False) -> list[str]:
        """Return the secret file path belonging to each template file.

        The secret path is the template path with its last suffix
        (``.jw-tmpl``) replaced by ``.jw-secret``.

        Args:
            pkg_names: Packages to inspect, see ``list_template_files()``.
            ignore_missing: If true, leave out paths for which
                ``self.ctx.file_exists()`` reports false.
        """
        ret = []
        for tmpl in await self.list_template_files(pkg_names):
            path = str(Path(tmpl).with_suffix(".jw-secret"))
            if ignore_missing and not await self.ctx.file_exists(path):
                continue
            ret.append(path)
        return ret

    async def list_compilation_targets(self, pkg_names: Iterable[str], ignore_missing: bool=False) -> list[str]:
        """Return the compilation target path belonging to each template file.

        The target path is the template path without its ``.jw-tmpl`` suffix.

        Args:
            pkg_names: Packages to inspect, see ``list_template_files()``.
            ignore_missing: If true, leave out paths for which
                ``self.ctx.file_exists()`` reports false.
        """
        ret = []
        for tmpl in await self.list_template_files(pkg_names):
            path = tmpl.removesuffix('.jw-tmpl')
            if ignore_missing and not await self.ctx.file_exists(path):
                continue
            ret.append(path)
        return ret

    async def remove_compilation_targets(self, pkg_names: Iterable[str]) -> None:
        """Delete the compilation targets of the given packages.

        A ``FileNotFoundError`` for a target is logged at debug level and
        otherwise ignored; any other error propagates to the caller.
        """
        for path in await self.list_compilation_targets(pkg_names):
            try:
                # NOTE(review): every other self.ctx call in this class is
                # awaited. If stat() is a coroutine function too, this call
                # never actually runs and the FileNotFoundError handled below
                # can only come from unlink(). Confirm against the context
                # API before adding an ``await`` here.
                self.ctx.stat(path)
                log(NOTICE, f'Removing {path}')
                await self.ctx.unlink(path)
            except FileNotFoundError:
                log(DEBUG, f'Compilation target {path} doesn\'t exist (ignored)')

    async def compile_template_files(self, pkg_names: Iterable[str], default_attrs: Attrs) -> None:
        """Compile every template of the given packages into its target.

        Calls ``self.compile_template_file()`` (not defined in this class,
        presumably inherited) once per compilation target. Every falsy result
        is counted, and a single warning about missing secrets is logged at
        the end if the count is non-zero.

        Args:
            pkg_names: Packages to process, see ``list_template_files()``.
            default_attrs: Passed through unchanged to
                ``compile_template_file()``.
        """
        missing = 0
        for target in await self.list_compilation_targets(pkg_names):
            if not await self.compile_template_file(target, default_attrs):
                missing += 1
        if missing > 0:
            # NOTE(review): ``app`` is not imported explicitly in the visible
            # part of this module; presumably it arrives via the star import
            # from lib.log -- verify, otherwise this line raises NameError.
            log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again')

    async def install(self, src_uri: str, pkg_names: Iterable[str], only_missing: bool=False) -> None:
        """Extract the secrets needed by the given packages from a tar source.

        The archive at *src_uri* is read through a GPG process filter and
        extracted into ``self.ctx``, restricted to the paths returned by
        ``list_secret_paths()``. Afterwards one notice per wanted path is
        logged, telling whether it was extracted or not.

        Args:
            src_uri: Location of the tar archive holding the secrets.
            pkg_names: Packages whose secrets should be installed.
            only_missing: Placeholder for a future extraction policy.

        Raises:
            NotImplementedError: If *only_missing* is true.
        """
        if only_missing:
            raise NotImplementedError('--only-missing is not yet implemented')
        secret_paths = await self.list_secret_paths(pkg_names)

        # FileContext is imported at module level for type checking only, so
        # the run-time import has to happen here.
        from ....lib.ec.Local import Local
        from ....lib.FileContext import FileContext

        ec = self.ctx
        if not isinstance(ec, Local):
            ec = Local() # Security: Use a local exec context for decrypting and filtering secrets

        async with TarIo.create(src=src_uri, dst=self.ctx) as tio:
            tio.src.add_proc_filter(FileContext.Direction.In, ProcFilterGpg(ec=ec))
            extracted_paths = await tio.extract(path_filter=secret_paths)

        # Report on every path that was asked for, not only on the extracted ones.
        for path in secret_paths:
            if path in extracted_paths:
                log(NOTICE, f'extracted: {path}')
            else:
                log(NOTICE, f'not extracted: {path}')