cmds.secrets.lib.util: Add module

To be able to use secret handling code from other modules, move the
bulk of it from the "secrets"-command centric implementation in
cmds.secrets.Cmd into a new module cmds.secrets.lib.util.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-04-11 10:16:54 +02:00
commit 5ad65f85fd
4 changed files with 297 additions and 249 deletions

View file

@ -2,275 +2,36 @@
from __future__ import annotations
import re, os, stat, pwd, grp, tempfile
from contextlib import suppress
from pathlib import Path
from dataclasses import dataclass
from typing import TYPE_CHECKING
from ..Cmd import Cmd as Base
if TYPE_CHECKING:
from typing import Iterable
from ...lib.Distro import Distro
from ..CmdDistro import CmdDistro
from ...lib.log import *
from ..Cmd import Cmd as Base
from .lib.util import *
class Cmd(Base): # export
@dataclass
class Attrs:
mode: int | None = None
owner: str | None = None
group: str | None = None
conf: str | None = None
def update(self, rhs: Args|None) -> Args:
if rhs is not None:
if rhs.mode:
self.mode = rhs.mode
if rhs.owner:
self.owner = rhs.owner
if rhs.group:
self.group = rhs.group
if rhs.conf:
self.conf = rhs.conf
return self
def emtpy(self) -> bool:
if self.mode is not None:
return False
if self.owner is not None:
return False
if self.group is not None:
return False
return True
def __read_key_value_file(self, path: str) -> dict[str, str]:
ret: dict[str, str] = {}
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, val = line.split("=", 1)
key = key.strip()
val = val.strip()
if key:
ret[key] = val
return ret
def __parse_attributes(self, content: str) -> Attrs:
first_line = content.splitlines()[0] if content else ""
if not re.match(r"^\s*#\s*conf\s*:", first_line):
return None
ret = self.Attrs()
ret.conf = first_line
m = re.match(r"^\s*#\s*conf\s*:\s*(.*?)\s*$", first_line)
if not m:
return ret
for part in m.group(1).split(";"):
part = part.strip()
if not part or "=" not in part:
continue
key, val = part.split("=", 1)
key = key.strip()
val = val.strip()
if key == "owner":
ret.owner = val or None
elif key == "group":
ret.group = val or None
elif key == "mode":
if val:
try:
if re.fullmatch(r"0[0-7]+", val):
ret.mode = int(val, 8)
else:
ret.mode = int(val, 0)
except ValueError:
ret.mode = None
return ret
def __read_attributes(self, paths: Iterable[str]) -> Attrs|None:
for path in paths:
try:
with open(path, 'r') as f:
return self.__parse_attributes(f.read(path))
except:
continue
def __format_metadata(self, uid: int, gid: int, mode: int) -> str:
return f"{pwd.getpwuid(uid).pw_name}:{grp.getgrgid(gid).gr_name} {mode:o}"
def __copy_securely(
self,
src: str,
dst: str,
default_attrs: Attrs | None,
replace: dict[str, str] = {},
) -> None:
owner = "root"
group = "root"
mode = 0o400
with open(src, "r", encoding="utf-8", newline="") as f:
content = f.read()
attrs = self.__parse_attributes(content)
if attrs is None:
attrs = default_attrs
if attrs is not None:
if attrs.owner is not None:
owner = attrs.owner
if attrs.group is not None:
group = attrs.group
if attrs.mode is not None:
mode = attrs.mode
new_uid = pwd.getpwnam(owner).pw_uid
new_gid = grp.getgrnam(group).gr_gid
new_meta = self.__format_metadata(new_uid, new_gid, mode)
for key, val in replace.items():
content = content.replace(key, val)
dst_dir = os.path.dirname(os.path.abspath(dst))
tmp_fd, tmp_path = tempfile.mkstemp(
prefix=f".{os.path.basename(dst)}.",
dir=dst_dir,
)
try:
os.fchown(tmp_fd, new_uid, new_gid)
os.fchmod(tmp_fd, mode)
with os.fdopen(tmp_fd, "w", encoding="utf-8", newline="") as f:
tmp_fd = None
f.write(content)
f.flush()
os.fsync(f.fileno())
content_changed = True
metadata_changed = True
old_meta = "<missing>"
try:
st = os.stat(dst)
except FileNotFoundError:
pass
else:
old_mode = stat.S_IMODE(st.st_mode)
old_meta = self.__format_metadata(st.st_uid, st.st_gid, old_mode)
with open(dst, "r", encoding="utf-8", newline="") as f:
old_content = f.read()
content_changed = old_content != content
metadata_changed = (
st.st_uid != new_uid
or st.st_gid != new_gid
or old_mode != mode
)
changes = []
if content_changed:
changes.append("@content")
if metadata_changed:
changes.append(f"@metadata ({old_meta} -> {new_meta})")
details = ", ".join(changes) if changes else "no changes"
log(NOTICE, f"Applying macros in {src} to {dst}: {details}")
if not changes:
os.unlink(tmp_path)
tmp_path = None
return
os.replace(tmp_path, dst)
tmp_path = None
dir_fd = os.open(dst_dir, os.O_DIRECTORY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
finally:
if tmp_fd is not None:
os.close(tmp_fd)
if tmp_path is not None:
with suppress(FileNotFoundError):
os.unlink(tmp_path)
async def _match_files(self, packages: Iterable[str], pattern: str) -> list[str]:
ret: list[str] = []
for package_name in packages:
for path in await self.distro.pkg_files(package_name):
if re.match(pattern, path):
ret.append(path)
return ret
return await match_files(self.distro, packages, pattern)
async def _list_template_files(self, packages: Iterable[str]) -> list[str]:
return await self._match_files(packages, pattern=r'.*\.jw-tmpl$')
return await list_template_files(self.distro, packages)
async def _list_secret_paths(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
ret = []
for tmpl in await self._list_template_files(packages):
path = str(Path(tmpl).with_suffix(".jw-secret"))
if ignore_missing and not os.path.exists(path):
continue
ret.append(path)
return ret
return await list_secret_paths(self.distro, packages, ignore_missing)
async def _list_compilation_targets(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
ret = []
for tmpl in await self._list_template_files(packages):
path = tmpl.removesuffix('.jw-tmpl')
if ignore_missing and not os.path.exists(path):
continue
ret.append(path)
return ret
return await list_compilation_targets(self.distro, packages, ignore_missing)
async def _remove_compilation_targets(self, packages: Iterable[str]) -> list[str]:
for path in await self._list_compilation_targets(packages):
if os.path.exists(path):
log(NOTICE, f'Removing {path}')
os.unlink(path)
return await remove_compilation_targets(self.distro, packages)
async def _compile_template_files(self, packages: Iterable[str], default_attrs: Attrs) -> list[str]:
missing = 0
if default_attrs is None:
default_attrs = Attrs()
for target in await self._list_compilation_targets(packages):
attrs = default_attrs
attrs.update(self.__read_attributes([target + '.jw-tmpl']))
secret = target + '.jw-secret'
replace = {} if not os.path.exists(secret) else self.__read_key_value_file(secret)
for ext in [ '.jw-secret-file', '.jw-tmpl' ]:
src = target + ext
if os.path.exists(src):
self.__copy_securely(src=src, dst=target, default_attrs=attrs, replace=replace)
break
else:
log(WARNING, f'No secret found for target {target}, not compiling')
missing += 1
if missing > 0:
log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again')
return await compile_template_files(self.distro, packages, default_attrs)
def __init__(self, parent: CmdDistro, name: str, help: str) -> None:
super().__init__(parent, name, help)

View file

@ -3,6 +3,8 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from .lib.util import Attrs
from .Cmd import Cmd
if TYPE_CHECKING:
@ -15,7 +17,7 @@ class CmdCompileTemplates(Cmd): # export
super().__init__(parent, 'compile-templates', help="Compile package template files")
async def _run(self, args: Namespace) -> None:
attrs = Cmd.Attrs(args.mode, args.owner, args.group, None)
attrs = Attrs(args.mode, args.owner, args.group, None)
await self._compile_template_files(args.packages, attrs)
def add_arguments(self, parser: ArgumentParser) -> None:

View file

@ -0,0 +1,4 @@
TOPDIR = ../../../../../../..
include $(TOPDIR)/make/proj.mk
include $(JWBDIR)/make/py-mod.mk

View file

@ -0,0 +1,281 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import re, os, stat, pwd, grp, tempfile, copy
from contextlib import suppress
from pathlib import Path
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Iterable
from ....lib.Distro import Distro
from ...CmdDistro import CmdDistro
from ....lib.log import *
from ....lib.util import run_cmd
@dataclass
class Attrs:
mode: int | None = None
owner: str | None = None
group: str | None = None
conf: str | None = None
def update(self, rhs: Args|None) -> Args:
if rhs is not None:
if rhs.mode:
self.mode = rhs.mode
if rhs.owner:
self.owner = rhs.owner
if rhs.group:
self.group = rhs.group
if rhs.conf:
self.conf = rhs.conf
return self
def emtpy(self) -> bool:
if self.mode is not None:
return False
if self.owner is not None:
return False
if self.group is not None:
return False
return True
def _read_key_value_file(path: str, throw=False) -> dict[str, str]:
ret: dict[str, str] = {}
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, val = line.split("=", 1)
key = key.strip()
val = val.strip()
if key:
ret[key] = val
except FileNotFoundError:
log(DEBUG, f'File not found {path}')
return ret
def _parse_attributes(content: str) -> Attrs:
first_line = content.splitlines()[0] if content else ""
if not re.match(r"^\s*#\s*conf\s*:", first_line):
return None
ret = Attrs()
ret.conf = first_line
m = re.match(r"^\s*#\s*conf\s*:\s*(.*?)\s*$", first_line)
if not m:
return ret
for part in re.split(r'[; ,]', m.group(1)):
part = part.strip()
if not part or "=" not in part:
continue
key, val = part.split("=", 1)
key = key.strip()
val = val.strip()
if key == "owner":
ret.owner = val or None
elif key == "group":
ret.group = val or None
elif key == "mode":
if val:
try:
if re.fullmatch(r"0[0-7]+", val):
ret.mode = int(val, 8)
else:
ret.mode = int(val, 0)
except ValueError:
ret.mode = None
return ret
def _read_attributes(paths: Iterable[str]) -> Attrs|None:
ret = Attrs()
for path in paths:
try:
with open(path, 'r') as f:
ret.update(_parse_attributes(f.read()))
except FileNotFoundError:
log(DEBUG, f'Can\'t parse "{path}" for attributes, file doesn\'t exist (ignored)')
return ret
def _format_metadata(uid: int, gid: int, mode: int) -> str:
return f"{pwd.getpwuid(uid).pw_name}:{grp.getgrgid(gid).gr_name} {mode:o}"
def _copy_securely(
src: str,
dst: str,
default_attrs: Attrs | None,
replace: dict[str, str] = {},
) -> None:
owner = "root"
group = "root"
mode = 0o400
with open(src, "r", encoding="utf-8", newline="") as f:
content = f.read()
attrs = _parse_attributes(content)
if attrs is None:
attrs = default_attrs
if attrs is not None:
if attrs.owner is not None:
owner = attrs.owner
if attrs.group is not None:
group = attrs.group
if attrs.mode is not None:
mode = attrs.mode
new_uid = pwd.getpwnam(owner).pw_uid
new_gid = grp.getgrnam(group).gr_gid
new_meta = _format_metadata(new_uid, new_gid, mode)
for key, val in replace.items():
content = content.replace(key, val)
dst_dir = os.path.dirname(os.path.abspath(dst))
tmp_fd, tmp_path = tempfile.mkstemp(
prefix=f".{os.path.basename(dst)}.",
dir=dst_dir,
)
try:
os.fchown(tmp_fd, new_uid, new_gid)
os.fchmod(tmp_fd, mode)
with os.fdopen(tmp_fd, "w", encoding="utf-8", newline="") as f:
tmp_fd = None
f.write(content)
f.flush()
os.fsync(f.fileno())
content_changed = True
metadata_changed = True
old_meta = "<missing>"
try:
st = os.stat(dst)
except FileNotFoundError:
pass
else:
old_mode = stat.S_IMODE(st.st_mode)
old_meta = _format_metadata(st.st_uid, st.st_gid, old_mode)
with open(dst, "r", encoding="utf-8", newline="") as f:
old_content = f.read()
content_changed = old_content != content
metadata_changed = (
st.st_uid != new_uid
or st.st_gid != new_gid
or old_mode != mode
)
changes = []
if content_changed:
changes.append("@content")
if metadata_changed:
changes.append(f"@metadata ({old_meta} -> {new_meta})")
details = ", ".join(changes) if changes else "no changes"
log(NOTICE, f"Applying macros in {src} to {dst}: {details}")
if not changes:
os.unlink(tmp_path)
tmp_path = None
return
os.replace(tmp_path, dst)
tmp_path = None
dir_fd = os.open(dst_dir, os.O_DIRECTORY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
finally:
if tmp_fd is not None:
os.close(tmp_fd)
if tmp_path is not None:
with suppress(FileNotFoundError):
os.unlink(tmp_path)
async def match_files(distro: Distro, packages: Iterable[str], pattern: str) -> list[str]:
ret: list[str] = []
for package_name in packages:
for path in await distro.pkg_files(package_name):
if re.match(pattern, path):
ret.append(path)
return ret
async def list_template_files(distro: Distro, packages: Iterable[str]) -> list[str]:
return await match_files(distro, packages, pattern=r'.*\.jw-tmpl$')
async def list_secret_paths(distro: Distro, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
ret = []
for tmpl in await list_template_files(distro, packages):
path = str(Path(tmpl).with_suffix(".jw-secret"))
if ignore_missing and not os.path.exists(path):
continue
ret.append(path)
return ret
async def list_compilation_targets(distro: Distro, packages: Iterable[str], ignore_missing: bool=False) -> list[str]:
ret = []
for tmpl in await list_template_files(distro, packages):
path = tmpl.removesuffix('.jw-tmpl')
if ignore_missing and not os.path.exists(path):
continue
ret.append(path)
return ret
async def remove_compilation_targets(distro: Distro, packages: Iterable[str]) -> list[str]:
for path in await list_compilation_targets(distro, packages):
if os.path.exists(path):
log(NOTICE, f'Removing {path}')
os.unlink(path)
async def compile_template_file(target_path: str, default_attrs: Attrs|None=None) -> bool:
path_tmpl = target_path + '.jw-tmpl'
path_secret_file = target_path + '.jw-secret-file'
path_secret = target_path + '.jw-secret'
attrs = copy.deepcopy(default_attrs if default_attrs is not None else Attrs())
attrs.update(_read_attributes([
path_tmpl,
path_secret_file,
path_secret
]))
replace = _read_key_value_file(path_secret)
for src in [ path_secret_file, path_tmpl ]:
if os.path.exists(src):
_copy_securely(src=src, dst=target_path, default_attrs=attrs, replace=replace)
return True
log(WARNING, f'No secret found for target {target_path}, not compiling')
return False
async def compile_template_files(distro: Distro, packages: Iterable[str], default_attrs: Attrs) -> list[str]:
missing = 0
for target in await list_compilation_targets(distro, packages):
if not await compile_template_file(target, default_attrs):
missing += 1
if missing > 0:
log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again')