cmds.secrets: Make commands work remotely

The "secrets" class of commands currently only works on the host it's
invoked on. Use the current FileContext to allow using the existing
commands on a target host.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-04-17 15:49:25 +02:00
commit 1b821f3b3f
5 changed files with 299 additions and 287 deletions

View file

@ -3,35 +3,41 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from functools import cached_property
from ..Cmd import Cmd as Base from ..Cmd import Cmd as Base
if TYPE_CHECKING: if TYPE_CHECKING:
from typing import Iterable from typing import Iterable
from ...lib.Distro import Distro from ...lib.Distro import Distro
from ...lib.ExecContext import ExecContext
from ..CmdDistro import CmdDistro from ..CmdDistro import CmdDistro
from .lib.util import * from .lib.DistroContext import DistroContext
class Cmd(Base): # export class Cmd(Base): # export
@cached_property
def ctx(self) -> ExecContext:
return DistroContext(self.app.distro)
async def _match_files(self, packages: Iterable[str], pattern: str) -> list[str]: async def _match_files(self, packages: Iterable[str], pattern: str) -> list[str]:
return await match_files(self.distro, packages, pattern) return await self.ctx.match_files(packages, pattern)
async def _list_template_files(self, packages: Iterable[str]) -> list[str]: async def _list_template_files(self, packages: Iterable[str]) -> list[str]:
return await list_template_files(self.distro, packages) return await self.ctx.list_template_files(packages)
async def _list_secret_paths(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: async def _list_secret_paths(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
return await list_secret_paths(self.distro, packages, ignore_missing) return await self.ctx.list_secret_paths(packages, ignore_missing)
async def _list_compilation_targets(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: async def _list_compilation_targets(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
return await list_compilation_targets(self.distro, packages, ignore_missing) return await self.ctx.list_compilation_targets(packages, ignore_missing)
async def _remove_compilation_targets(self, packages: Iterable[str]) -> list[str]: async def _remove_compilation_targets(self, packages: Iterable[str]) -> list[str]:
return await remove_compilation_targets(self.distro, packages) return await self.ctx.remove_compilation_targets(packages)
async def _compile_template_files(self, packages: Iterable[str], default_attrs: Attrs) -> list[str]: async def _compile_template_files(self, packages: Iterable[str], default_attrs: Attrs) -> list[str]:
return await compile_template_files(self.distro, packages, default_attrs) return await self.ctx.compile_template_files(packages, default_attrs)
def __init__(self, parent: CmdDistro, name: str, help: str) -> None: def __init__(self, parent: CmdDistro, name: str, help: str) -> None:
super().__init__(parent, name, help) super().__init__(parent, name, help)

View file

@ -3,7 +3,7 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from .lib.util import Attrs from .lib.base import Attrs
from .Cmd import Cmd from .Cmd import Cmd

View file

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import re
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Iterable
from ....lib.FileContext import FileContext
from ...CmdDistro import CmdDistro
from ....lib.log import *
from ....lib.util import run_cmd
from .base import Attrs
from .FilesContext import FilesContext
class DistroContext(FilesContext):
def __init__(self, distro: Distro) -> None:
super().__init__(distro.ctx)
self.__distro = distro
async def match_files(self, packages: Iterable[str], pattern: str) -> list[str]:
ret: list[str] = []
for package_name in packages:
for path in await self.__distro.pkg_files(package_name):
if re.match(pattern, path):
ret.append(path)
return ret
async def list_template_files(self, packages: Iterable[str]) -> list[str]:
return await self.match_files(packages, pattern=r'.*\.jw-tmpl$')
async def list_secret_paths(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
ret = []
for tmpl in await self.list_template_files(packages):
path = str(Path(tmpl).with_suffix(".jw-secret"))
if ignore_missing and not self.ctx.file_exists(path):
continue
ret.append(path)
return ret
async def list_compilation_targets(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
ret = []
for tmpl in await self.list_template_files(packages):
path = tmpl.removesuffix('.jw-tmpl')
if ignore_missing and not self.ctx.file_exists(path):
continue
ret.append(path)
return ret
async def remove_compilation_targets(self, packages: Iterable[str]) -> list[str]:
for path in await self.list_compilation_targets(packages):
try:
self.ctx.stat(path)
log(NOTICE, f'Removing {path}')
await self.ctx.unlink(path)
except FileNotFoundError as e:
log(DEBUG, f'Compilation target {path} doesn\'t exist (ignored)')
continue
async def compile_template_files(self, packages: Iterable[str], default_attrs: Attrs) -> list[str]:
missing = 0
for target in await self.list_compilation_targets(packages):
if not await self.compile_template_file(target, default_attrs):
missing += 1
if missing > 0:
log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again')

View file

@ -0,0 +1,204 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import re, stat, copy
from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Iterable
from ....lib.FileContext import FileContext
from ....lib.log import *
from ....lib.util import run_cmd
from .base import Attrs
class FilesContext:
def __init__(self, ctx: FileContext) -> None:
self.__ctx = ctx
@property
def ctx(self) -> FileContext:
return self.__ctx
async def _read_key_value_file(self, path: str, throw=False) -> dict[str, str]:
ret: dict[str, str] = {}
try:
result = await self.ctx.get(path)
for line in result.stdout.decode().splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, val = line.split("=", 1)
key = key.strip()
val = val.strip()
if key:
ret[key] = val
except FileNotFoundError:
log(DEBUG, f'File not found {path}')
return ret
def _parse_attributes(self, content: str) -> Attrs:
if not content:
return None
first_line = content.splitlines()[0]
if not re.match(r"^\s*#\s*conf\s*:", first_line):
return None
ret = Attrs()
ret.conf = first_line
m = re.match(r"^\s*#\s*conf\s*:\s*(.*?)\s*$", first_line)
if not m:
return ret
for part in re.split(r'[; ,]', m.group(1)):
part = part.strip()
if not part or "=" not in part:
continue
key, val = part.split("=", 1)
key = key.strip()
val = val.strip()
if key == "owner":
ret.owner = val or None
elif key == "group":
ret.group = val or None
elif key == "mode":
if val:
try:
if re.fullmatch(r"0[0-7]+", val):
ret.mode = int(val, 8)
else:
ret.mode = int(val, 0)
except ValueError:
ret.mode = None
return ret
async def _read_attributes(self, paths: Iterable[str]) -> Attrs|None:
ret = Attrs()
for path in paths:
try:
result = await self.ctx.get(path)
lines = result.stdout.decode().splitlines()
if lines:
ret.update(self._parse_attributes(lines[0]))
except FileNotFoundError:
log(DEBUG, f'Can\'t parse "{path}" for attributes, file doesn\'t exist (ignored)')
return ret
def _format_metadata(self, owner: str, group: str, mode: int) -> str:
return f"{owner}:{group} {mode:o}"
async def _compile_one_template_file(
self,
src: str,
dst: str,
default_attrs: Attrs | None,
replace: dict[str, str] = {},
) -> None:
owner = "root"
group = "root"
mode = 0o400
new_content = (await self.ctx.get(src)).stdout.decode()
attrs = self._parse_attributes(new_content)
if attrs is None:
attrs = default_attrs
if attrs is not None:
if attrs.owner is not None:
owner = attrs.owner
if attrs.group is not None:
group = attrs.group
if attrs.mode is not None:
mode = attrs.mode
new_meta = self._format_metadata(owner, group, mode)
for key, val in replace.items():
new_content = new_content.replace(key, val)
tmp_path: str|None = None
try:
tmp_path = await self.ctx.mktemp(dst + '.jw-pkg.XXXXX')
await self.ctx.put(tmp_path, new_content.encode('utf-8'), owner=owner, group=group, mode=mode)
content_changed = True
metadata_changed = True
old_meta = "<missing>"
try:
st = await self.ctx.stat(dst)
except FileNotFoundError:
pass
else:
old_mode = stat.S_IMODE(st.mode)
old_meta = self._format_metadata(st.owner, st.group, old_mode)
old_content = (await self.ctx.get(dst)).stdout.decode()
content_changed = old_content != new_content
metadata_changed = (
st.owner != owner
or st.group != group
or old_mode != mode
)
changes = []
if content_changed:
changes.append("@content")
if metadata_changed:
changes.append(f"@metadata ({old_meta} -> {new_meta})")
details = ", ".join(changes) if changes else "no changes"
log(NOTICE, f"Applying macros in {src} to {dst}: {details}")
if not changes:
await self.ctx.unlink(tmp_path)
tmp_path = None
return
await self.ctx.rename(tmp_path, dst)
tmp_path = None
finally:
if tmp_path is not None:
with suppress(FileNotFoundError):
await self.ctx.unlink(tmp_path)
async def compile_template_file(self, target_path: str, default_attrs: Attrs|None=None) -> bool:
path_tmpl = target_path + '.jw-tmpl'
path_secret_file = target_path + '.jw-secret-file'
path_secret = target_path + '.jw-secret'
attrs = copy.deepcopy(default_attrs if default_attrs is not None else Attrs())
attrs.update(await self._read_attributes([
path_tmpl,
path_secret_file,
path_secret
]))
replace = await self._read_key_value_file(path_secret)
for src in [ path_secret_file, path_tmpl ]:
try:
await self._compile_one_template_file(src=src, dst=target_path, default_attrs=attrs, replace=replace)
return True
except FileNotFoundError as e:
log(DEBUG, f'Compilation source {src} doesn\'t exist (ignored)')
continue
log(WARNING, f'No secret found for target {target_path}, not compiling')
return False

View file

@ -1,281 +1,10 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import annotations from ....lib.ec.FileContext import FileContext
from ....lib.ec.Local import Local
from .FilesContext import FilesContext
import re, os, stat, pwd, grp, tempfile, copy async def compile_template_file(target_path: str, default_attrs: Attrs|None=None, ctx: FileContext|None=None) -> bool:
if ctx is None:
from contextlib import suppress ctx = Local()
from pathlib import Path FilesContext().compile_template_file(target_path, default_attrs=default_attrs)
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Iterable
from ....lib.Distro import Distro
from ...CmdDistro import CmdDistro
from ....lib.log import *
from ....lib.util import run_cmd
@dataclass
class Attrs:
mode: int | None = None
owner: str | None = None
group: str | None = None
conf: str | None = None
def update(self, rhs: Args|None) -> Args:
if rhs is not None:
if rhs.mode:
self.mode = rhs.mode
if rhs.owner:
self.owner = rhs.owner
if rhs.group:
self.group = rhs.group
if rhs.conf:
self.conf = rhs.conf
return self
def emtpy(self) -> bool:
if self.mode is not None:
return False
if self.owner is not None:
return False
if self.group is not None:
return False
return True
def _read_key_value_file(path: str, throw=False) -> dict[str, str]:
ret: dict[str, str] = {}
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, val = line.split("=", 1)
key = key.strip()
val = val.strip()
if key:
ret[key] = val
except FileNotFoundError:
log(DEBUG, f'File not found {path}')
return ret
def _parse_attributes(content: str) -> Attrs:
first_line = content.splitlines()[0] if content else ""
if not re.match(r"^\s*#\s*conf\s*:", first_line):
return None
ret = Attrs()
ret.conf = first_line
m = re.match(r"^\s*#\s*conf\s*:\s*(.*?)\s*$", first_line)
if not m:
return ret
for part in re.split(r'[; ,]', m.group(1)):
part = part.strip()
if not part or "=" not in part:
continue
key, val = part.split("=", 1)
key = key.strip()
val = val.strip()
if key == "owner":
ret.owner = val or None
elif key == "group":
ret.group = val or None
elif key == "mode":
if val:
try:
if re.fullmatch(r"0[0-7]+", val):
ret.mode = int(val, 8)
else:
ret.mode = int(val, 0)
except ValueError:
ret.mode = None
return ret
def _read_attributes(paths: Iterable[str]) -> Attrs|None:
ret = Attrs()
for path in paths:
try:
with open(path, 'r') as f:
ret.update(_parse_attributes(f.read()))
except FileNotFoundError:
log(DEBUG, f'Can\'t parse "{path}" for attributes, file doesn\'t exist (ignored)')
return ret
def _format_metadata(uid: int, gid: int, mode: int) -> str:
return f"{pwd.getpwuid(uid).pw_name}:{grp.getgrgid(gid).gr_name} {mode:o}"
def _copy_securely(
src: str,
dst: str,
default_attrs: Attrs | None,
replace: dict[str, str] = {},
) -> None:
owner = "root"
group = "root"
mode = 0o400
with open(src, "r", encoding="utf-8", newline="") as f:
content = f.read()
attrs = _parse_attributes(content)
if attrs is None:
attrs = default_attrs
if attrs is not None:
if attrs.owner is not None:
owner = attrs.owner
if attrs.group is not None:
group = attrs.group
if attrs.mode is not None:
mode = attrs.mode
new_uid = pwd.getpwnam(owner).pw_uid
new_gid = grp.getgrnam(group).gr_gid
new_meta = _format_metadata(new_uid, new_gid, mode)
for key, val in replace.items():
content = content.replace(key, val)
dst_dir = os.path.dirname(os.path.abspath(dst))
tmp_fd, tmp_path = tempfile.mkstemp(
prefix=f".{os.path.basename(dst)}.",
dir=dst_dir,
)
try:
os.fchown(tmp_fd, new_uid, new_gid)
os.fchmod(tmp_fd, mode)
with os.fdopen(tmp_fd, "w", encoding="utf-8", newline="") as f:
tmp_fd = None
f.write(content)
f.flush()
os.fsync(f.fileno())
content_changed = True
metadata_changed = True
old_meta = "<missing>"
try:
st = os.stat(dst)
except FileNotFoundError:
pass
else:
old_mode = stat.S_IMODE(st.st_mode)
old_meta = _format_metadata(st.st_uid, st.st_gid, old_mode)
with open(dst, "r", encoding="utf-8", newline="") as f:
old_content = f.read()
content_changed = old_content != content
metadata_changed = (
st.st_uid != new_uid
or st.st_gid != new_gid
or old_mode != mode
)
changes = []
if content_changed:
changes.append("@content")
if metadata_changed:
changes.append(f"@metadata ({old_meta} -> {new_meta})")
details = ", ".join(changes) if changes else "no changes"
log(NOTICE, f"Applying macros in {src} to {dst}: {details}")
if not changes:
os.unlink(tmp_path)
tmp_path = None
return
os.replace(tmp_path, dst)
tmp_path = None
dir_fd = os.open(dst_dir, os.O_DIRECTORY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
finally:
if tmp_fd is not None:
os.close(tmp_fd)
if tmp_path is not None:
with suppress(FileNotFoundError):
os.unlink(tmp_path)
async def match_files(distro: Distro, packages: Iterable[str], pattern: str) -> list[str]:
ret: list[str] = []
for package_name in packages:
for path in await distro.pkg_files(package_name):
if re.match(pattern, path):
ret.append(path)
return ret
async def list_template_files(distro: Distro, packages: Iterable[str]) -> list[str]:
return await match_files(distro, packages, pattern=r'.*\.jw-tmpl$')
async def list_secret_paths(distro: Distro, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
ret = []
for tmpl in await list_template_files(distro, packages):
path = str(Path(tmpl).with_suffix(".jw-secret"))
if ignore_missing and not os.path.exists(path):
continue
ret.append(path)
return ret
async def list_compilation_targets(distro: Distro, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
ret = []
for tmpl in await list_template_files(distro, packages):
path = tmpl.removesuffix('.jw-tmpl')
if ignore_missing and not os.path.exists(path):
continue
ret.append(path)
return ret
async def remove_compilation_targets(distro: Distro, packages: Iterable[str]) -> list[str]:
for path in await list_compilation_targets(distro, packages):
if os.path.exists(path):
log(NOTICE, f'Removing {path}')
os.unlink(path)
async def compile_template_file(target_path: str, default_attrs: Attrs|None=None) -> bool:
path_tmpl = target_path + '.jw-tmpl'
path_secret_file = target_path + '.jw-secret-file'
path_secret = target_path + '.jw-secret'
attrs = copy.deepcopy(default_attrs if default_attrs is not None else Attrs())
attrs.update(_read_attributes([
path_tmpl,
path_secret_file,
path_secret
]))
replace = _read_key_value_file(path_secret)
for src in [ path_secret_file, path_tmpl ]:
if os.path.exists(src):
_copy_securely(src=src, dst=target_path, default_attrs=attrs, replace=replace)
return True
log(WARNING, f'No secret found for target {target_path}, not compiling')
return False
async def compile_template_files(distro: Distro, packages: Iterable[str], default_attrs: Attrs) -> list[str]:
missing = 0
for target in await list_compilation_targets(distro, packages):
if not await compile_template_file(target, default_attrs):
missing += 1
if missing > 0:
log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again')