diff --git a/src/python/jw/pkg/cmds/secrets/Cmd.py b/src/python/jw/pkg/cmds/secrets/Cmd.py index f00b3012..e13091be 100644 --- a/src/python/jw/pkg/cmds/secrets/Cmd.py +++ b/src/python/jw/pkg/cmds/secrets/Cmd.py @@ -3,35 +3,41 @@ from __future__ import annotations from typing import TYPE_CHECKING +from functools import cached_property from ..Cmd import Cmd as Base if TYPE_CHECKING: from typing import Iterable from ...lib.Distro import Distro + from ...lib.ExecContext import ExecContext from ..CmdDistro import CmdDistro -from .lib.util import * +from .lib.DistroContext import DistroContext class Cmd(Base): # export + @cached_property + def ctx(self) -> ExecContext: + return DistroContext(self.app.distro) + async def _match_files(self, packages: Iterable[str], pattern: str) -> list[str]: - return await match_files(self.distro, packages, pattern) + return await self.ctx.match_files(packages, pattern) async def _list_template_files(self, packages: Iterable[str]) -> list[str]: - return await list_template_files(self.distro, packages) + return await self.ctx.list_template_files(packages) async def _list_secret_paths(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: - return await list_secret_paths(self.distro, packages, ignore_missing) + return await self.ctx.list_secret_paths(packages, ignore_missing) async def _list_compilation_targets(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: - return await list_compilation_targets(self.distro, packages, ignore_missing) + return await self.ctx.list_compilation_targets(packages, ignore_missing) async def _remove_compilation_targets(self, packages: Iterable[str]) -> list[str]: - return await remove_compilation_targets(self.distro, packages) + return await self.ctx.remove_compilation_targets(packages) async def _compile_template_files(self, packages: Iterable[str], default_attrs: Attrs) -> list[str]: - return await compile_template_files(self.distro, packages, default_attrs) + return await self.ctx.compile_template_files(packages, default_attrs) def __init__(self, parent: CmdDistro, name: str, help: str) -> None: super().__init__(parent, name, help) diff --git a/src/python/jw/pkg/cmds/secrets/CmdCompileTemplates.py b/src/python/jw/pkg/cmds/secrets/CmdCompileTemplates.py index c6a1b0bc..c5172816 100644 --- a/src/python/jw/pkg/cmds/secrets/CmdCompileTemplates.py +++ b/src/python/jw/pkg/cmds/secrets/CmdCompileTemplates.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import TYPE_CHECKING -from .lib.util import Attrs +from .lib.base import Attrs from .Cmd import Cmd diff --git a/src/python/jw/pkg/cmds/secrets/lib/DistroContext.py b/src/python/jw/pkg/cmds/secrets/lib/DistroContext.py new file mode 100644 index 00000000..ea2ce814 --- /dev/null +++ b/src/python/jw/pkg/cmds/secrets/lib/DistroContext.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- + +from __future__ import annotations + +import re + +from pathlib import Path + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Iterable + from ....lib.FileContext import FileContext + from ...CmdDistro import CmdDistro + +from ....lib.log import * +from ....lib.util import run_cmd + +from .base import Attrs +from .FilesContext import FilesContext + +class DistroContext(FilesContext): + + def __init__(self, distro: Distro) -> None: + super().__init__(distro.ctx) + self.__distro = distro + + async def match_files(self, packages: Iterable[str], pattern: str) -> list[str]: + ret: list[str] = [] + for package_name in packages: + for path in await self.__distro.pkg_files(package_name): + if re.match(pattern, path): + ret.append(path) + return ret + + async def list_template_files(self, packages: Iterable[str]) -> list[str]: + return await self.match_files(packages, pattern=r'.*\.jw-tmpl$') + + async def list_secret_paths(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: + ret = [] + for tmpl in await self.list_template_files(packages): + path = str(Path(tmpl).with_suffix(".jw-secret")) + if ignore_missing and not self.ctx.file_exists(path): + continue + ret.append(path) + return ret + + async def list_compilation_targets(self, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: + ret = [] + for tmpl in await self.list_template_files(packages): + path = tmpl.removesuffix('.jw-tmpl') + if ignore_missing and not self.ctx.file_exists(path): + continue + ret.append(path) + return ret + + async def remove_compilation_targets(self, packages: Iterable[str]) -> list[str]: + for path in await self.list_compilation_targets(packages): + try: + self.ctx.stat(path) + log(NOTICE, f'Removing {path}') + await self.ctx.unlink(path) + except FileNotFoundError as e: + log(DEBUG, f'Compilation target {path} doesn\'t exist (ignored)') + continue + + async def compile_template_files(self, packages: Iterable[str], default_attrs: Attrs) -> list[str]: + missing = 0 + for target in await self.list_compilation_targets(packages): + if not await self.compile_template_file(target, default_attrs): + missing += 1 + if missing > 0: + log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again') diff --git a/src/python/jw/pkg/cmds/secrets/lib/FilesContext.py b/src/python/jw/pkg/cmds/secrets/lib/FilesContext.py new file mode 100644 index 00000000..28bc2cc1 --- /dev/null +++ b/src/python/jw/pkg/cmds/secrets/lib/FilesContext.py @@ -0,0 +1,204 @@ +# -*- coding: utf-8 -*- + +from __future__ import annotations + +import re, stat, copy + +from contextlib import suppress +from pathlib import Path + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Iterable + from ....lib.FileContext import FileContext + +from ....lib.log import * +from ....lib.util import run_cmd + +from .base import Attrs + +class FilesContext: + + def __init__(self, ctx: FileContext) -> None: + self.__ctx = ctx + + @property + def ctx(self) -> FileContext: + return self.__ctx + + async def _read_key_value_file(self, path: str, throw=False) -> dict[str, str]: + ret: dict[str, str] = {} + try: + result = await self.ctx.get(path) + for line in result.stdout.decode().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + continue + key, val = line.split("=", 1) + key = key.strip() + val = val.strip() + if key: + ret[key] = val + except FileNotFoundError: + log(DEBUG, f'File not found {path}') + return ret + + def _parse_attributes(self, content: str) -> Attrs: + + if not content: + return None + + first_line = content.splitlines()[0] + if not re.match(r"^\s*#\s*conf\s*:", first_line): + return None + + ret = Attrs() + ret.conf = first_line + + m = re.match(r"^\s*#\s*conf\s*:\s*(.*?)\s*$", first_line) + if not m: + return ret + + for part in re.split(r'[; ,]', m.group(1)): + part = part.strip() + if not part or "=" not in part: + continue + + key, val = part.split("=", 1) + key = key.strip() + val = val.strip() + + if key == "owner": + ret.owner = val or None + elif key == "group": + ret.group = val or None + elif key == "mode": + if val: + try: + if re.fullmatch(r"0[0-7]+", val): + ret.mode = int(val, 8) + else: + ret.mode = int(val, 0) + except ValueError: + ret.mode = None + + return ret + + async def _read_attributes(self, paths: Iterable[str]) -> Attrs|None: + ret = Attrs() + for path in paths: + try: + result = await self.ctx.get(path) + lines = result.stdout.decode().splitlines() + if lines: + ret.update(self._parse_attributes(lines[0])) + except FileNotFoundError: + log(DEBUG, f'Can\'t parse "{path}" for attributes, file doesn\'t exist (ignored)') + return ret + + def _format_metadata(self, owner: str, group: str, mode: int) -> str: + return f"{owner}:{group} {mode:o}" + + async def _compile_one_template_file( + self, + src: str, + dst: str, + default_attrs: Attrs | None, + replace: dict[str, str] = {}, + ) -> None: + + owner = "root" + group = "root" + mode = 0o400 + + new_content = (await self.ctx.get(src)).stdout.decode() + + attrs = self._parse_attributes(new_content) + if attrs is None: + attrs = default_attrs + + if attrs is not None: + if attrs.owner is not None: + owner = attrs.owner + if attrs.group is not None: + group = attrs.group + if attrs.mode is not None: + mode = attrs.mode + + new_meta = self._format_metadata(owner, group, mode) + + for key, val in replace.items(): + new_content = new_content.replace(key, val) + + tmp_path: str|None = None + + try: + tmp_path = await self.ctx.mktemp(dst + '.jw-pkg.XXXXX') + await self.ctx.put(tmp_path, new_content.encode('utf-8'), owner=owner, group=group, mode=mode) + + content_changed = True + metadata_changed = True + old_meta = "" + + try: + st = await self.ctx.stat(dst) + except FileNotFoundError: + pass + else: + old_mode = stat.S_IMODE(st.mode) + old_meta = self._format_metadata(st.owner, st.group, old_mode) + old_content = (await self.ctx.get(dst)).stdout.decode() + + content_changed = old_content != new_content + metadata_changed = ( + st.owner != owner + or st.group != group + or old_mode != mode + ) + + changes = [] + if content_changed: + changes.append("@content") + if metadata_changed: + changes.append(f"@metadata ({old_meta} -> {new_meta})") + + details = ", ".join(changes) if changes else "no changes" + log(NOTICE, f"Applying macros in {src} to {dst}: {details}") + + if not changes: + await self.ctx.unlink(tmp_path) + tmp_path = None + return + + await self.ctx.rename(tmp_path, dst) + tmp_path = None + + finally: + if tmp_path is not None: + with suppress(FileNotFoundError): + await self.ctx.unlink(tmp_path) + + async def compile_template_file(self, target_path: str, default_attrs: Attrs|None=None) -> bool: + path_tmpl = target_path + '.jw-tmpl' + path_secret_file = target_path + '.jw-secret-file' + path_secret = target_path + '.jw-secret' + attrs = copy.deepcopy(default_attrs if default_attrs is not None else Attrs()) + attrs.update(await self._read_attributes([ + path_tmpl, + path_secret_file, + path_secret + ])) + replace = await self._read_key_value_file(path_secret) + for src in [ path_secret_file, path_tmpl ]: + try: + await self._compile_one_template_file(src=src, dst=target_path, default_attrs=attrs, replace=replace) + return True + except FileNotFoundError as e: + log(DEBUG, f'Compilation source {src} doesn\'t exist (ignored)') + continue + + log(WARNING, f'No secret found for target {target_path}, not compiling') + return False diff --git a/src/python/jw/pkg/cmds/secrets/lib/util.py b/src/python/jw/pkg/cmds/secrets/lib/util.py index 5fd4eac8..353838fe 100644 --- a/src/python/jw/pkg/cmds/secrets/lib/util.py +++ b/src/python/jw/pkg/cmds/secrets/lib/util.py @@ -1,281 +1,10 @@ # -*- coding: utf-8 -*- -from __future__ import annotations +from ....lib.ec.FileContext import FileContext +from ....lib.ec.Local import Local +from .FilesContext import FilesContext -import re, os, stat, pwd, grp, tempfile, copy - -from contextlib import suppress -from pathlib import Path -from dataclasses import dataclass - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from typing import Iterable - from ....lib.Distro import Distro - from ...CmdDistro import CmdDistro - -from ....lib.log import * -from ....lib.util import run_cmd - -@dataclass -class Attrs: - - mode: int | None = None - owner: str | None = None - group: str | None = None - conf: str | None = None - - def update(self, rhs: Args|None) -> Args: - if rhs is not None: - if rhs.mode: - self.mode = rhs.mode - if rhs.owner: - self.owner = rhs.owner - if rhs.group: - self.group = rhs.group - if rhs.conf: - self.conf = rhs.conf - return self - - def emtpy(self) -> bool: - if self.mode is not None: - return False - if self.owner is not None: - return False - if self.group is not None: - return False - return True - -def _read_key_value_file(path: str, throw=False) -> dict[str, str]: - ret: dict[str, str] = {} - try: - with open(path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if not line or line.startswith("#"): - continue - if "=" not in line: - continue - key, val = line.split("=", 1) - key = key.strip() - val = val.strip() - if key: - ret[key] = val - except FileNotFoundError: - log(DEBUG, f'File not found {path}') - return ret - -def _parse_attributes(content: str) -> Attrs: - - first_line = content.splitlines()[0] if content else "" - if not re.match(r"^\s*#\s*conf\s*:", first_line): - return None - - ret = Attrs() - - ret.conf = first_line - - m = re.match(r"^\s*#\s*conf\s*:\s*(.*?)\s*$", first_line) - if not m: - return ret - - for part in re.split(r'[; ,]', m.group(1)): - part = part.strip() - if not part or "=" not in part: - continue - - key, val = part.split("=", 1) - key = key.strip() - val = val.strip() - - if key == "owner": - ret.owner = val or None - elif key == "group": - ret.group = val or None - elif key == "mode": - if val: - try: - if re.fullmatch(r"0[0-7]+", val): - ret.mode = int(val, 8) - else: - ret.mode = int(val, 0) - except ValueError: - ret.mode = None - - return ret - -def _read_attributes(paths: Iterable[str]) -> Attrs|None: - ret = Attrs() - for path in paths: - try: - with open(path, 'r') as f: - ret.update(_parse_attributes(f.read())) - except FileNotFoundError: - log(DEBUG, f'Can\'t parse "{path}" for attributes, file doesn\'t exist (ignored)') - return ret - -def _format_metadata(uid: int, gid: int, mode: int) -> str: - return f"{pwd.getpwuid(uid).pw_name}:{grp.getgrgid(gid).gr_name} {mode:o}" - -def _copy_securely( - src: str, - dst: str, - default_attrs: Attrs | None, - replace: dict[str, str] = {}, -) -> None: - - owner = "root" - group = "root" - mode = 0o400 - - with open(src, "r", encoding="utf-8", newline="") as f: - content = f.read() - - attrs = _parse_attributes(content) - if attrs is None: - attrs = default_attrs - - if attrs is not None: - if attrs.owner is not None: - owner = attrs.owner - if attrs.group is not None: - group = attrs.group - if attrs.mode is not None: - mode = attrs.mode - - new_uid = pwd.getpwnam(owner).pw_uid - new_gid = grp.getgrnam(group).gr_gid - new_meta = _format_metadata(new_uid, new_gid, mode) - - for key, val in replace.items(): - content = content.replace(key, val) - - dst_dir = os.path.dirname(os.path.abspath(dst)) - tmp_fd, tmp_path = tempfile.mkstemp( - prefix=f".{os.path.basename(dst)}.", - dir=dst_dir, - ) - - try: - os.fchown(tmp_fd, new_uid, new_gid) - os.fchmod(tmp_fd, mode) - - with os.fdopen(tmp_fd, "w", encoding="utf-8", newline="") as f: - tmp_fd = None - f.write(content) - f.flush() - os.fsync(f.fileno()) - - content_changed = True - metadata_changed = True - old_meta = "" - - try: - st = os.stat(dst) - except FileNotFoundError: - pass - else: - old_mode = stat.S_IMODE(st.st_mode) - old_meta = _format_metadata(st.st_uid, st.st_gid, old_mode) - - with open(dst, "r", encoding="utf-8", newline="") as f: - old_content = f.read() - - content_changed = old_content != content - metadata_changed = ( - st.st_uid != new_uid - or st.st_gid != new_gid - or old_mode != mode - ) - - changes = [] - if content_changed: - changes.append("@content") - if metadata_changed: - changes.append(f"@metadata ({old_meta} -> {new_meta})") - - details = ", ".join(changes) if changes else "no changes" - log(NOTICE, f"Applying macros in {src} to {dst}: {details}") - - if not changes: - os.unlink(tmp_path) - tmp_path = None - return - - os.replace(tmp_path, dst) - tmp_path = None - - dir_fd = os.open(dst_dir, os.O_DIRECTORY) - try: - os.fsync(dir_fd) - finally: - os.close(dir_fd) - - finally: - if tmp_fd is not None: - os.close(tmp_fd) - if tmp_path is not None: - with suppress(FileNotFoundError): - os.unlink(tmp_path) - -async def match_files(distro: Distro, packages: Iterable[str], pattern: str) -> list[str]: - ret: list[str] = [] - for package_name in packages: - for path in await distro.pkg_files(package_name): - if re.match(pattern, path): - ret.append(path) - return ret - -async def list_template_files(distro: Distro, packages: Iterable[str]) -> list[str]: - return await match_files(distro, packages, pattern=r'.*\.jw-tmpl$') - -async def list_secret_paths(distro: Distro, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: - ret = [] - for tmpl in await list_template_files(distro, packages): - path = str(Path(tmpl).with_suffix(".jw-secret")) - if ignore_missing and not os.path.exists(path): - continue - ret.append(path) - return ret - -async def list_compilation_targets(distro: Distro, packages: Iterable[str], ignore_missing: bool=False) -> list[str]: - ret = [] - for tmpl in await list_template_files(distro, packages): - path = tmpl.removesuffix('.jw-tmpl') - if ignore_missing and not os.path.exists(path): - continue - ret.append(path) - return ret - -async def remove_compilation_targets(distro: Distro, packages: Iterable[str]) -> list[str]: - for path in await list_compilation_targets(distro, packages): - if os.path.exists(path): - log(NOTICE, f'Removing {path}') - os.unlink(path) - -async def compile_template_file(target_path: str, default_attrs: Attrs|None=None) -> bool: - path_tmpl = target_path + '.jw-tmpl' - path_secret_file = target_path + '.jw-secret-file' - path_secret = target_path + '.jw-secret' - attrs = copy.deepcopy(default_attrs if default_attrs is not None else Attrs()) - attrs.update(_read_attributes([ - path_tmpl, - path_secret_file, - path_secret - ])) - replace = _read_key_value_file(path_secret) - for src in [ path_secret_file, path_tmpl ]: - if os.path.exists(src): - _copy_securely(src=src, dst=target_path, default_attrs=attrs, replace=replace) - return True - log(WARNING, f'No secret found for target {target_path}, not compiling') - return False - -async def compile_template_files(distro: Distro, packages: Iterable[str], default_attrs: Attrs) -> list[str]: - missing = 0 - for target in await list_compilation_targets(distro, packages): - if not await compile_template_file(target, default_attrs): - missing += 1 - if missing > 0: - log(WARNING, f'{missing} missing secrets found. You might want to add them and run sudo {app.cmdline} again') +async def compile_template_file(target_path: str, default_attrs: Attrs|None=None, ctx: FileContext|None=None) -> bool: + if ctx is None: + ctx = Local() + FilesContext().compile_template_file(target_path, default_attrs=default_attrs)