From 5ad65f85fd8ad3ba35732d9105e48a0ccf04718e Mon Sep 17 00:00:00 2001 From: Jan Lindemann Date: Sat, 11 Apr 2026 10:16:54 +0200 Subject: [PATCH] cmds.secrets.lib.util: Add module To be able to use secret handling code from other modules, move the bulk of it from the "secrets"-command centric implementation in cmds.secrets.Cmd into a new module cmds.secrets.lib.util. Signed-off-by: Jan Lindemann --- src/python/jw/pkg/cmds/secrets/Cmd.py | 257 +--------------- .../pkg/cmds/secrets/CmdCompileTemplates.py | 4 +- src/python/jw/pkg/cmds/secrets/lib/Makefile | 4 + src/python/jw/pkg/cmds/secrets/lib/util.py | 281 ++++++++++++++++++ 4 files changed, 297 insertions(+), 249 deletions(-) create mode 100644 src/python/jw/pkg/cmds/secrets/lib/Makefile create mode 100644 src/python/jw/pkg/cmds/secrets/lib/util.py diff --git a/src/python/jw/pkg/cmds/secrets/Cmd.py b/src/python/jw/pkg/cmds/secrets/Cmd.py index ed1797ee..f00b3012 100644 --- a/src/python/jw/pkg/cmds/secrets/Cmd.py +++ b/src/python/jw/pkg/cmds/secrets/Cmd.py @@ -2,275 +2,36 @@ from __future__ import annotations -import re, os, stat, pwd, grp, tempfile - -from contextlib import suppress -from pathlib import Path -from dataclasses import dataclass - from typing import TYPE_CHECKING +from ..Cmd import Cmd as Base + if TYPE_CHECKING: from typing import Iterable from ...lib.Distro import Distro from ..CmdDistro import CmdDistro -from ...lib.log import * -from ..Cmd import Cmd as Base +from .lib.util import * class Cmd(Base): # export - @dataclass - class Attrs: - - mode: int | None = None - owner: str | None = None - group: str | None = None - conf: str | None = None - - def update(self, rhs: Args|None) -> Args: - if rhs is not None: - if rhs.mode: - self.mode = rhs.mode - if rhs.owner: - self.owner = rhs.owner - if rhs.group: - self.group = rhs.group - if rhs.conf: - self.conf = rhs.conf - return self - - def emtpy(self) -> bool: - if self.mode is not None: - return False - if self.owner is not None: - return False - if self.group is not None: - return False - return True - - def __read_key_value_file(self, path: str) -> dict[str, str]: - ret: dict[str, str] = {} - with open(path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if not line or line.startswith("#"): - continue - if "=" not in line: - continue - key, val = line.split("=", 1) - key = key.strip() - val = val.strip() - if key: - ret[key] = val - return ret - - def __parse_attributes(self, content: str) -> Attrs: - - first_line = content.splitlines()[0] if content else "" - if not re.match(r"^\s*#\s*conf\s*:", first_line): - return None - - ret = self.Attrs() - - ret.conf = first_line - - m = re.match(r"^\s*#\s*conf\s*:\s*(.*?)\s*$", first_line) - if not m: - return ret - - for part in m.group(1).split(";"): - part = part.strip() - if not part or "=" not in part: - continue - - key, val = part.split("=", 1) - key = key.strip() - val = val.strip() - - if key == "owner": - ret.owner = val or None - elif key == "group": - ret.group = val or None - elif key == "mode": - if val: - try: - if re.fullmatch(r"0[0-7]+", val): - ret.mode = int(val, 8) - else: - ret.mode = int(val, 0) - except ValueError: - ret.mode = None - - return ret - - def __read_attributes(self, paths: Iterable[str]) -> Attrs|None: - for path in paths: - try: - with open(path, 'r') as f: - return self.__parse_attributes(f.read(path)) - except: - continue - - def __format_metadata(self, uid: int, gid: int, mode: int) -> str: - return f"{pwd.getpwuid(uid).pw_name}:{grp.getgrgid(gid).gr_name} {mode:o}" - - def __copy_securely( - self, - src: str, - dst: str, - default_attrs: Attrs | None, - replace: dict[str, str] = {}, - ) -> None: - - owner = "root" - group = "root" - mode = 0o400 - - with open(src, "r", encoding="utf-8", newline="") as f: - content = f.read() - - attrs = self.__parse_attributes(content) - if attrs is None: - attrs = default_attrs - - if attrs is not None: - if attrs.owner is not None: - owner = attrs.owner - if attrs.group is not None: - group = attrs.group - if attrs.mode is not None: - mode = attrs.mode - - new_uid = pwd.getpwnam(owner).pw_uid - new_gid = grp.getgrnam(group).gr_gid - new_meta = self.__format_metadata(new_uid, new_gid, mode) - - for key, val in replace.items(): - content = content.replace(key, val) - - dst_dir = os.path.dirname(os.path.abspath(dst)) - tmp_fd, tmp_path = tempfile.mkstemp( - prefix=f".{os.path.basename(dst)}.", - dir=dst_dir, - ) - - try: - os.fchown(tmp_fd, new_uid, new_gid) - os.fchmod(tmp_fd, mode) - - with os.fdopen(tmp_fd, "w", encoding="utf-8", newline="") as f: - tmp_fd = None - f.write(content) - f.flush() - os.fsync(f.fileno()) - - content_changed = True - metadata_changed = True - old_meta = "" - - try: - st = os.stat(dst) - except FileNotFoundError: - pass - else: - old_mode = stat.S_IMODE(st.st_mode) - old_meta = self.__format_metadata(st.st_uid, st.st_gid, old_mode) - - with open(dst, "r", encoding="utf-8", newline="") as f: - old_content = f.read() - - content_changed = old_content != content - metadata_changed = ( - st.st_uid != new_uid - or st.st_gid != new_gid - or old_mode != mode - ) - - changes = [] - if content_changed: - changes.append("@content") - if metadata_changed: - changes.append(f"@metadata ({old_meta} -> {new_meta})") - - details = ", ".join(changes) if changes else "no changes" - log(NOTICE, f"Applying macros in {src} to {dst}: {details}") - - if not changes: - os.unlink(tmp_path) - tmp_path = None - return - - os.replace(tmp_path, dst) - tmp_path = None - - dir_fd = os.open(dst_dir, os.O_DIRECTORY) - try: - os.fsync(dir_fd) - finally: - os.close(dir_fd) - - finally: - if tmp_fd is not None: - os.close(tmp_fd) - if tmp_path is not None: - with suppress(FileNotFoundError): - os.unlink(tmp_path) - async def _match_files(self, packages: Iterable[str], pattern: str) -> list[str]: - ret: list[str] = [] - for package_name in packages: - for path in await self.distro.pkg_files(package_name): - if re.match(pattern, path): - ret.append(path) - return ret + return await match_files(self.distro, packages, pattern) async def _list_template_files(self, packages: Iterable[str]) -> list[str]: - return await self._match_files(packages, pattern=r'.*\.jw-tmpl$') + return await list_template_files(self.distro, packages) async def _list_secret_paths(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: - ret = [] - for tmpl in await self._list_template_files(packages): - path = str(Path(tmpl).with_suffix(".jw-secret")) - if ignore_missing and not os.path.exists(path): - continue - ret.append(path) - return ret + return await list_secret_paths(self.distro, packages, ignore_missing) async def _list_compilation_targets(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: - ret = [] - for tmpl in await self._list_template_files(packages): - path = tmpl.removesuffix('.jw-tmpl') - if ignore_missing and not os.path.exists(path): - continue - ret.append(path) - return ret + return await list_compilation_targets(self.distro, packages, ignore_missing) async def _remove_compilation_targets(self, packages: Iterable[str]) -> list[str]: - for path in await self._list_compilation_targets(packages): - if os.path.exists(path): - log(NOTICE, f'Removing {path}') - os.unlink(path) + return await remove_compilation_targets(self.distro, packages) async def _compile_template_files(self, packages: Iterable[str], default_attrs: Attrs) -> list[str]: - missing = 0 - if default_attrs is None: - default_attrs = Attrs() - for target in await self._list_compilation_targets(packages): - attrs = default_attrs - attrs.update(self.__read_attributes([target + '.jw-tmpl'])) - secret = target + '.jw-secret' - replace = {} if not os.path.exists(secret) else self.__read_key_value_file(secret) - for ext in [ '.jw-secret-file', '.jw-tmpl' ]: - src = target + ext - if os.path.exists(src): - self.__copy_securely(src=src, dst=target, default_attrs=attrs, replace=replace) - break - else: - log(WARNING, f'No secret found for target {target}, not compiling') - missing += 1 - if missing > 0: - log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again') + return await compile_template_files(self.distro, packages, default_attrs) def __init__(self, parent: CmdDistro, name: str, help: str) -> None: super().__init__(parent, name, help) diff --git a/src/python/jw/pkg/cmds/secrets/CmdCompileTemplates.py b/src/python/jw/pkg/cmds/secrets/CmdCompileTemplates.py index 25a42e9f..c6a1b0bc 100644 --- a/src/python/jw/pkg/cmds/secrets/CmdCompileTemplates.py +++ b/src/python/jw/pkg/cmds/secrets/CmdCompileTemplates.py @@ -3,6 +3,8 @@ from __future__ import annotations from typing import TYPE_CHECKING +from .lib.util import Attrs + from .Cmd import Cmd if TYPE_CHECKING: @@ -15,7 +17,7 @@ class CmdCompileTemplates(Cmd): # export super().__init__(parent, 'compile-templates', help="Compile package template files") async def _run(self, args: Namespace) -> None: - attrs = Cmd.Attrs(args.mode, args.owner, args.group, None) + attrs = Attrs(args.mode, args.owner, args.group, None) await self._compile_template_files(args.packages, attrs) def add_arguments(self, parser: ArgumentParser) -> None: diff --git a/src/python/jw/pkg/cmds/secrets/lib/Makefile b/src/python/jw/pkg/cmds/secrets/lib/Makefile new file mode 100644 index 00000000..19fedac9 --- /dev/null +++ b/src/python/jw/pkg/cmds/secrets/lib/Makefile @@ -0,0 +1,4 @@ +TOPDIR = ../../../../../../.. + +include $(TOPDIR)/make/proj.mk +include $(JWBDIR)/make/py-mod.mk diff --git a/src/python/jw/pkg/cmds/secrets/lib/util.py b/src/python/jw/pkg/cmds/secrets/lib/util.py new file mode 100644 index 00000000..5fd4eac8 --- /dev/null +++ b/src/python/jw/pkg/cmds/secrets/lib/util.py @@ -0,0 +1,281 @@ +# -*- coding: utf-8 -*- + +from __future__ import annotations + +import re, os, stat, pwd, grp, tempfile, copy + +from contextlib import suppress +from pathlib import Path +from dataclasses import dataclass + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Iterable + from ....lib.Distro import Distro + from ...CmdDistro import CmdDistro + +from ....lib.log import * +from ....lib.util import run_cmd + +@dataclass +class Attrs: + + mode: int | None = None + owner: str | None = None + group: str | None = None + conf: str | None = None + + def update(self, rhs: Args|None) -> Args: + if rhs is not None: + if rhs.mode: + self.mode = rhs.mode + if rhs.owner: + self.owner = rhs.owner + if rhs.group: + self.group = rhs.group + if rhs.conf: + self.conf = rhs.conf + return self + + def emtpy(self) -> bool: + if self.mode is not None: + return False + if self.owner is not None: + return False + if self.group is not None: + return False + return True + +def _read_key_value_file(path: str, throw=False) -> dict[str, str]: + ret: dict[str, str] = {} + try: + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + continue + key, val = line.split("=", 1) + key = key.strip() + val = val.strip() + if key: + ret[key] = val + except FileNotFoundError: + log(DEBUG, f'File not found {path}') + return ret + +def _parse_attributes(content: str) -> Attrs: + + first_line = content.splitlines()[0] if content else "" + if not re.match(r"^\s*#\s*conf\s*:", first_line): + return None + + ret = Attrs() + + ret.conf = first_line + + m = re.match(r"^\s*#\s*conf\s*:\s*(.*?)\s*$", first_line) + if not m: + return ret + + for part in re.split(r'[; ,]', m.group(1)): + part = part.strip() + if not part or "=" not in part: + continue + + key, val = part.split("=", 1) + key = key.strip() + val = val.strip() + + if key == "owner": + ret.owner = val or None + elif key == "group": + ret.group = val or None + elif key == "mode": + if val: + try: + if re.fullmatch(r"0[0-7]+", val): + ret.mode = int(val, 8) + else: + ret.mode = int(val, 0) + except ValueError: + ret.mode = None + + return ret + +def _read_attributes(paths: Iterable[str]) -> Attrs|None: + ret = Attrs() + for path in paths: + try: + with open(path, 'r') as f: + ret.update(_parse_attributes(f.read())) + except FileNotFoundError: + log(DEBUG, f'Can\'t parse "{path}" for attributes, file doesn\'t exist (ignored)') + return ret + +def _format_metadata(uid: int, gid: int, mode: int) -> str: + return f"{pwd.getpwuid(uid).pw_name}:{grp.getgrgid(gid).gr_name} {mode:o}" + +def _copy_securely( + src: str, + dst: str, + default_attrs: Attrs | None, + replace: dict[str, str] = {}, +) -> None: + + owner = "root" + group = "root" + mode = 0o400 + + with open(src, "r", encoding="utf-8", newline="") as f: + content = f.read() + + attrs = _parse_attributes(content) + if attrs is None: + attrs = default_attrs + + if attrs is not None: + if attrs.owner is not None: + owner = attrs.owner + if attrs.group is not None: + group = attrs.group + if attrs.mode is not None: + mode = attrs.mode + + new_uid = pwd.getpwnam(owner).pw_uid + new_gid = grp.getgrnam(group).gr_gid + new_meta = _format_metadata(new_uid, new_gid, mode) + + for key, val in replace.items(): + content = content.replace(key, val) + + dst_dir = os.path.dirname(os.path.abspath(dst)) + tmp_fd, tmp_path = tempfile.mkstemp( + prefix=f".{os.path.basename(dst)}.", + dir=dst_dir, + ) + + try: + os.fchown(tmp_fd, new_uid, new_gid) + os.fchmod(tmp_fd, mode) + + with os.fdopen(tmp_fd, "w", encoding="utf-8", newline="") as f: + tmp_fd = None + f.write(content) + f.flush() + os.fsync(f.fileno()) + + content_changed = True + metadata_changed = True + old_meta = "" + + try: + st = os.stat(dst) + except FileNotFoundError: + pass + else: + old_mode = stat.S_IMODE(st.st_mode) + old_meta = _format_metadata(st.st_uid, st.st_gid, old_mode) + + with open(dst, "r", encoding="utf-8", newline="") as f: + old_content = f.read() + + content_changed = old_content != content + metadata_changed = ( + st.st_uid != new_uid + or st.st_gid != new_gid + or old_mode != mode + ) + + changes = [] + if content_changed: + changes.append("@content") + if metadata_changed: + changes.append(f"@metadata ({old_meta} -> {new_meta})") + + details = ", ".join(changes) if changes else "no changes" + log(NOTICE, f"Applying macros in {src} to {dst}: {details}") + + if not changes: + os.unlink(tmp_path) + tmp_path = None + return + + os.replace(tmp_path, dst) + tmp_path = None + + dir_fd = os.open(dst_dir, os.O_DIRECTORY) + try: + os.fsync(dir_fd) + finally: + os.close(dir_fd) + + finally: + if tmp_fd is not None: + os.close(tmp_fd) + if tmp_path is not None: + with suppress(FileNotFoundError): + os.unlink(tmp_path) + +async def match_files(distro: Distro, packages: Iterable[str], pattern: str) -> list[str]: + ret: list[str] = [] + for package_name in packages: + for path in await distro.pkg_files(package_name): + if re.match(pattern, path): + ret.append(path) + return ret + +async def list_template_files(distro: Distro, packages: Iterable[str]) -> list[str]: + return await match_files(distro, packages, pattern=r'.*\.jw-tmpl$') + +async def list_secret_paths(distro: Distro, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: + ret = [] + for tmpl in await list_template_files(distro, packages): + path = str(Path(tmpl).with_suffix(".jw-secret")) + if ignore_missing and not os.path.exists(path): + continue + ret.append(path) + return ret + +async def list_compilation_targets(distro: Distro, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: + ret = [] + for tmpl in await list_template_files(distro, packages): + path = tmpl.removesuffix('.jw-tmpl') + if ignore_missing and not os.path.exists(path): + continue + ret.append(path) + return ret + +async def remove_compilation_targets(distro: Distro, packages: Iterable[str]) -> list[str]: + for path in await list_compilation_targets(distro, packages): + if os.path.exists(path): + log(NOTICE, f'Removing {path}') + os.unlink(path) + +async def compile_template_file(target_path: str, default_attrs: Attrs|None=None) -> bool: + path_tmpl = target_path + '.jw-tmpl' + path_secret_file = target_path + '.jw-secret-file' + path_secret = target_path + '.jw-secret' + attrs = copy.deepcopy(default_attrs if default_attrs is not None else Attrs()) + attrs.update(_read_attributes([ + path_tmpl, + path_secret_file, + path_secret + ])) + replace = _read_key_value_file(path_secret) + for src in [ path_secret_file, path_tmpl ]: + if os.path.exists(src): + _copy_securely(src=src, dst=target_path, default_attrs=attrs, replace=replace) + return True + log(WARNING, f'No secret found for target {target_path}, not compiling') + return False + +async def compile_template_files(distro: Distro, packages: Iterable[str], default_attrs: Attrs) -> list[str]: + missing = 0 + for target in await list_compilation_targets(distro, packages): + if not await compile_template_file(target, default_attrs): + missing += 1 + if missing > 0: + log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again')