mirror of
ssh://git.janware.com/janware/proj/jw-pkg
synced 2026-04-24 17:23:36 +02:00
cmds.CmdSecrets: Add command class + subcommands
jw-pkg.py secrets [sub-command] [packages] is a set of utility
commands designed to manage configuration files containing secrets.
To keep secrets from leaking via version control or packages, a
_template_ should be packaged for every sensitive configuration file.
Then, during post-install, configuration files can be generated from
packaged templates via
jw-pkg.py secrets compile-templates <package> <package> ...
During post-uninstall
jw-pkg.py secrets rm-compilation-output <package> <package> ...
removes them.
Not specifying any packages will compile or remove all templates on
the system.
To identify which files to consider and generate or remove, the
compilation scans <package> for files ending in .jw-tmpl. For each
match, e.g.
/path/to/some.conf.jw-tmpl
it will read key-value pairs from
/path/to/some.conf.jw-secret
and generate
/path/to/some.conf
from it, replacing all keys by their respective values. The file
attributes of the generated file can be determined by the first line:
of some.conf.jw-tmpl or some.conf.jw-secret:
# conf: owner=mysql; group=mysql; mode=0640
There are other commands for managing all secrets on the system at
once, see jw-pkg.py secrets --help:
compile-templates Compile package template files
list-compilation-output
List package compilation output files
list-secrets List package secret files
list-templates List package template files
rm-compilation-output
Remove package compilation output files
Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
parent
18c16917b2
commit
18de6f2cf2
9 changed files with 352 additions and 0 deletions
15
src/python/jw/pkg/cmds/CmdSecrets.py
Normal file
15
src/python/jw/pkg/cmds/CmdSecrets.py
Normal file
|
|
@ -0,0 +1,15 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
|
||||||
|
from ..App import App
|
||||||
|
from .Cmd import Cmd as CmdBase
|
||||||
|
|
||||||
|
class CmdSecrets(CmdBase): # export
|
||||||
|
|
||||||
|
def __init__(self, parent: App) -> None:
|
||||||
|
super().__init__(parent, 'secrets', help="Manage package secrets")
|
||||||
|
self._add_subcommands()
|
||||||
|
|
||||||
|
def add_arguments(self, p: ArgumentParser) -> None:
|
||||||
|
super().add_arguments(p)
|
||||||
|
|
@ -1,2 +1,4 @@
|
||||||
from .CmdProjects import CmdProjects
|
from .CmdProjects import CmdProjects
|
||||||
from .CmdDistro import CmdDistro
|
from .CmdDistro import CmdDistro
|
||||||
|
from .CmdSecrets import CmdSecrets
|
||||||
|
from .CmdDistro import CmdDistro
|
||||||
|
|
|
||||||
241
src/python/jw/pkg/cmds/secrets/Cmd.py
Normal file
241
src/python/jw/pkg/cmds/secrets/Cmd.py
Normal file
|
|
@ -0,0 +1,241 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re, os, stat, pwd, grp, tempfile
|
||||||
|
|
||||||
|
from contextlib import suppress
|
||||||
|
from pathlib import Path
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from typing import Iterable
|
||||||
|
from ...lib.Distro import Distro
|
||||||
|
from ..CmdDistro import CmdDistro
|
||||||
|
|
||||||
|
from ...lib.log import *
|
||||||
|
from ..Cmd import Cmd as Base
|
||||||
|
|
||||||
|
class Cmd(Base): # export
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Attrs:
|
||||||
|
mode: int | None = None
|
||||||
|
owner: str | None = None
|
||||||
|
group: str | None = None
|
||||||
|
conf: str | None = None
|
||||||
|
|
||||||
|
def __read_key_value_file(self, path: str) -> dict[str, str]:
|
||||||
|
ret: dict[str, str] = {}
|
||||||
|
with open(path, "r", encoding="utf-8") as f:
|
||||||
|
for line in f:
|
||||||
|
line = line.strip()
|
||||||
|
if not line or line.startswith("#"):
|
||||||
|
continue
|
||||||
|
if "=" not in line:
|
||||||
|
continue
|
||||||
|
key, val = line.split("=", 1)
|
||||||
|
key = key.strip()
|
||||||
|
val = val.strip()
|
||||||
|
if key:
|
||||||
|
ret[key] = val
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def __parse_attributes(self, content: str) -> Attrs:
|
||||||
|
|
||||||
|
first_line = content.splitlines()[0] if content else ""
|
||||||
|
if not re.match(r"^\s*#\s*conf\s*:", first_line):
|
||||||
|
return None
|
||||||
|
|
||||||
|
ret = Attrs()
|
||||||
|
|
||||||
|
ret.conf = first_line
|
||||||
|
|
||||||
|
m = re.match(r"^\s*#\s*conf\s*:\s*(.*?)\s*$", first_line)
|
||||||
|
if not m:
|
||||||
|
return ret
|
||||||
|
|
||||||
|
for part in m.group(1).split(";"):
|
||||||
|
part = part.strip()
|
||||||
|
if not part or "=" not in part:
|
||||||
|
continue
|
||||||
|
|
||||||
|
key, val = part.split("=", 1)
|
||||||
|
key = key.strip()
|
||||||
|
val = val.strip()
|
||||||
|
|
||||||
|
if key == "owner":
|
||||||
|
ret.owner = val or None
|
||||||
|
elif key == "group":
|
||||||
|
ret.group = val or None
|
||||||
|
elif key == "mode":
|
||||||
|
if val:
|
||||||
|
try:
|
||||||
|
if re.fullmatch(r"0[0-7]+", val):
|
||||||
|
ret.mode = int(val, 8)
|
||||||
|
else:
|
||||||
|
ret.mode = int(val, 0)
|
||||||
|
except ValueError:
|
||||||
|
ret.mode = None
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def __read_attributes(self, paths: Iterable[str]) -> Attrs|None:
|
||||||
|
for path in paths:
|
||||||
|
try:
|
||||||
|
with open(path, 'r') as f:
|
||||||
|
return self.__parse_attributes(f.read(path))
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
|
||||||
|
def __format_metadata(self, uid: int, gid: int, mode: int) -> str:
|
||||||
|
return f"{pwd.getpwuid(uid).pw_name}:{grp.getgrgid(gid).gr_name} {mode:o}"
|
||||||
|
|
||||||
|
def __copy_securely(
|
||||||
|
self,
|
||||||
|
src: str,
|
||||||
|
dst: str,
|
||||||
|
default_attrs: Attrs | None,
|
||||||
|
replace: dict[str, str] = [],
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
owner = "root"
|
||||||
|
group = "root"
|
||||||
|
mode = 0o400
|
||||||
|
|
||||||
|
with open(src, "r", encoding="utf-8", newline="") as f:
|
||||||
|
content = f.read()
|
||||||
|
|
||||||
|
attrs = self.__parse_attributes(content)
|
||||||
|
if attrs is None:
|
||||||
|
attrs = default_attrs
|
||||||
|
|
||||||
|
if attrs is not None:
|
||||||
|
if attrs.owner is not None:
|
||||||
|
owner = attrs.owner
|
||||||
|
if attrs.group is not None:
|
||||||
|
group = attrs.group
|
||||||
|
if attrs.mode is not None:
|
||||||
|
mode = attrs.mode
|
||||||
|
|
||||||
|
new_uid = pwd.getpwnam(owner).pw_uid
|
||||||
|
new_gid = grp.getgrnam(group).gr_gid
|
||||||
|
new_meta = self.__format_metadata(new_uid, new_gid, mode)
|
||||||
|
|
||||||
|
for key, val in replace.items():
|
||||||
|
content = content.replace(key, val)
|
||||||
|
|
||||||
|
dst_dir = os.path.dirname(os.path.abspath(dst))
|
||||||
|
tmp_fd, tmp_path = tempfile.mkstemp(
|
||||||
|
prefix=f".{os.path.basename(dst)}.",
|
||||||
|
dir=dst_dir,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.fchown(tmp_fd, new_uid, new_gid)
|
||||||
|
os.fchmod(tmp_fd, mode)
|
||||||
|
|
||||||
|
with os.fdopen(tmp_fd, "w", encoding="utf-8", newline="") as f:
|
||||||
|
tmp_fd = None
|
||||||
|
f.write(content)
|
||||||
|
f.flush()
|
||||||
|
os.fsync(f.fileno())
|
||||||
|
|
||||||
|
content_changed = True
|
||||||
|
metadata_changed = True
|
||||||
|
old_meta = "<missing>"
|
||||||
|
|
||||||
|
try:
|
||||||
|
st = os.stat(dst)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
old_mode = stat.S_IMODE(st.st_mode)
|
||||||
|
old_meta = self.__format_metadata(st.st_uid, st.st_gid, old_mode)
|
||||||
|
|
||||||
|
with open(dst, "r", encoding="utf-8", newline="") as f:
|
||||||
|
old_content = f.read()
|
||||||
|
|
||||||
|
content_changed = old_content != content
|
||||||
|
metadata_changed = (
|
||||||
|
st.st_uid != new_uid
|
||||||
|
or st.st_gid != new_gid
|
||||||
|
or old_mode != mode
|
||||||
|
)
|
||||||
|
|
||||||
|
changes = []
|
||||||
|
if content_changed:
|
||||||
|
changes.append("@content")
|
||||||
|
if metadata_changed:
|
||||||
|
changes.append(f"@metadata ({old_meta} -> {new_meta})")
|
||||||
|
|
||||||
|
details = ", ".join(changes) if changes else "no changes"
|
||||||
|
log(NOTICE, f"Applying macros in {src} to {dst}: {details}")
|
||||||
|
|
||||||
|
if not changes:
|
||||||
|
os.unlink(tmp_path)
|
||||||
|
tmp_path = None
|
||||||
|
return
|
||||||
|
|
||||||
|
os.replace(tmp_path, dst)
|
||||||
|
tmp_path = None
|
||||||
|
|
||||||
|
dir_fd = os.open(dst_dir, os.O_DIRECTORY)
|
||||||
|
try:
|
||||||
|
os.fsync(dir_fd)
|
||||||
|
finally:
|
||||||
|
os.close(dir_fd)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if tmp_fd is not None:
|
||||||
|
os.close(tmp_fd)
|
||||||
|
if tmp_path is not None:
|
||||||
|
with suppress(FileNotFoundError):
|
||||||
|
os.unlink(tmp_path)
|
||||||
|
|
||||||
|
async def _match_files(self, packages: Iterable[str], pattern: str) -> list[str]:
|
||||||
|
ret: list[str] = []
|
||||||
|
for package_name in packages:
|
||||||
|
for path in await self.distro.pkg_files(package_name):
|
||||||
|
if re.match(pattern, path):
|
||||||
|
ret.append(path)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
async def _list_template_files(self, packages: Iterable[str]) -> list[str]:
|
||||||
|
return await self._match_files(packages, pattern=r'.*\.jw-tmpl$')
|
||||||
|
|
||||||
|
async def _list_secret_paths(self, packages: Iterable[str]) -> list[str]:
|
||||||
|
return [str(Path(f).with_suffix(".jw-secret")) for f in await self._list_template_files(packages)]
|
||||||
|
|
||||||
|
async def _list_compilation_targets(self, packages: Iterable[str]) -> list[str]:
|
||||||
|
return [f.removesuffix('.jw-tmpl') for f in await self._list_template_files(packages)]
|
||||||
|
|
||||||
|
async def _remove_compilation_targets(self, packages: Iterable[str]) -> list[str]:
|
||||||
|
for path in await self._list_compilation_targets(packages):
|
||||||
|
if os.path.exists(path):
|
||||||
|
log(NOTICE, f'Removing {path}')
|
||||||
|
os.unlink(path)
|
||||||
|
|
||||||
|
async def _compile_template_files(self, packages: Iterable[str], attrs: str|Attrs=None) -> list[str]:
|
||||||
|
for target in await self._list_compilation_targets(packages):
|
||||||
|
default_attrs = self.__read_attributes([target + '.jw-tmpl'])
|
||||||
|
if default_attrs is None:
|
||||||
|
default_attrs = attrs
|
||||||
|
secret = target + '.jw-secret'
|
||||||
|
replace = [] if not os.path.exists(secret) else self.__read_key_value_file(secret)
|
||||||
|
for ext in [ '.jw-secret-file', '.jw-tmpl' ]:
|
||||||
|
src = target + ext
|
||||||
|
if os.path.exists(src):
|
||||||
|
self.__copy_securely(src=src, dst=target, default_attrs=default_attrs, replace=replace)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
log(WARNING, f'No secret found for target {target}, not compiling')
|
||||||
|
|
||||||
|
def __init__(self, parent: CmdDistro, name: str, help: str) -> None:
|
||||||
|
super().__init__(parent, name, help)
|
||||||
|
|
||||||
|
def add_arguments(self, parser: ArgumentParser) -> None:
|
||||||
|
super().add_arguments(parser)
|
||||||
|
parser.add_argument("packages", nargs='*', help="Package names")
|
||||||
18
src/python/jw/pkg/cmds/secrets/CmdCompileTemplates.py
Normal file
18
src/python/jw/pkg/cmds/secrets/CmdCompileTemplates.py
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from .Cmd import Cmd
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ..CmdSecrets import CmdSecrets
|
||||||
|
from argparse import Namespace, ArgumentParser
|
||||||
|
|
||||||
|
class CmdCompileTemplates(Cmd): # export
|
||||||
|
|
||||||
|
def __init__(self, parent: CmdSecrets) -> None:
|
||||||
|
super().__init__(parent, 'compile-templates', help="Compile package template files")
|
||||||
|
|
||||||
|
async def _run(self, args: Namespace) -> None:
|
||||||
|
await self._compile_template_files(args.packages)
|
||||||
18
src/python/jw/pkg/cmds/secrets/CmdListCompilationOutput.py
Normal file
18
src/python/jw/pkg/cmds/secrets/CmdListCompilationOutput.py
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from .Cmd import Cmd
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ..CmdSecrets import CmdSecrets
|
||||||
|
from argparse import Namespace, ArgumentParser
|
||||||
|
|
||||||
|
class CmdListCompilationOutput(Cmd): # export
|
||||||
|
|
||||||
|
def __init__(self, parent: CmdSecrets) -> None:
|
||||||
|
super().__init__(parent, 'list-compilation-output', help="List package compilation output files")
|
||||||
|
|
||||||
|
async def _run(self, args: Namespace) -> None:
|
||||||
|
await _remove_compilation_targets(args.packages)
|
||||||
18
src/python/jw/pkg/cmds/secrets/CmdListSecrets.py
Normal file
18
src/python/jw/pkg/cmds/secrets/CmdListSecrets.py
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from .Cmd import Cmd
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ..CmdSecrets import CmdSecrets
|
||||||
|
from argparse import Namespace, ArgumentParser
|
||||||
|
|
||||||
|
class CmdListSecrets(Cmd): # export
|
||||||
|
|
||||||
|
def __init__(self, parent: CmdSecrets) -> None:
|
||||||
|
super().__init__(parent, 'list-secrets', help="List package secret files")
|
||||||
|
|
||||||
|
async def _run(self, args: Namespace) -> None:
|
||||||
|
print('\n'.join(await self._list_secret_paths(args.packages)))
|
||||||
18
src/python/jw/pkg/cmds/secrets/CmdListTemplates.py
Normal file
18
src/python/jw/pkg/cmds/secrets/CmdListTemplates.py
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from .Cmd import Cmd
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ..CmdSecrets import CmdSecrets
|
||||||
|
from argparse import Namespace, ArgumentParser
|
||||||
|
|
||||||
|
class CmdListTemplates(Cmd): # export
|
||||||
|
|
||||||
|
def __init__(self, parent: CmdSecrets) -> None:
|
||||||
|
super().__init__(parent, 'list-templates', help="List package template files")
|
||||||
|
|
||||||
|
async def _run(self, args: Namespace) -> None:
|
||||||
|
print('\n'.join(await self._list_template_files(args.packages)))
|
||||||
18
src/python/jw/pkg/cmds/secrets/CmdRmCompilationOutput.py
Normal file
18
src/python/jw/pkg/cmds/secrets/CmdRmCompilationOutput.py
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from .Cmd import Cmd
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ..CmdSecrets import CmdSecrets
|
||||||
|
from argparse import Namespace, ArgumentParser
|
||||||
|
|
||||||
|
class CmdRmCompilationOutput(Cmd): # export
|
||||||
|
|
||||||
|
def __init__(self, parent: CmdSecrets) -> None:
|
||||||
|
super().__init__(parent, 'rm-compilation-output', help="Remove package compilation output files")
|
||||||
|
|
||||||
|
async def _run(self, args: Namespace) -> None:
|
||||||
|
await self._remove_compilation_targets(args.packages)
|
||||||
4
src/python/jw/pkg/cmds/secrets/Makefile
Normal file
4
src/python/jw/pkg/cmds/secrets/Makefile
Normal file
|
|
@ -0,0 +1,4 @@
|
||||||
|
TOPDIR = ../../../../../..
|
||||||
|
|
||||||
|
include $(TOPDIR)/make/proj.mk
|
||||||
|
include $(JWBDIR)/make/py-mod.mk
|
||||||
Loading…
Add table
Add a link
Reference in a new issue