diff --git a/tools/python/jwutils/Config.py b/tools/python/jwutils/Config.py new file mode 100644 index 0000000..6bd7669 --- /dev/null +++ b/tools/python/jwutils/Config.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- + +import os +import re +import glob +import sys +from pathlib import Path, PosixPath +from typing import Optional, Dict, cast + +from jwutils import stree +from .stree.StringTree import StringTree +from .log import * + +class Config(): # export + + def __init__(self, search_dirs: Optional[list[str]]=None, glob_paths: Optional[list[str]]=None, + defaults: Dict[str, str]=None, tree: Optional[StringTree]=None, parent=None, + root_section=None) -> None: + + def __is_abs(path): + if path is None: + return False + if len(path) == 0: + return False + if path[0] != '/': + return False + return True + + self.__parent = parent + exe = Path(os.path.basename(sys.argv[0])).stem + if glob_paths is None: + glob_paths = [f'.{exe}', f'{exe}.conf'] + if search_dirs is None: + env_key = re.sub('[-.]', '_', exe) + search_dirs = os.getenv(env_key) + self.__conf = tree if tree else StringTree("", "") + for path in glob_paths: + dirs = search_dirs + if dirs is None: + dirs = [''] if __is_abs(path) else [ str(Path.home()), str(Path.cwd()) ] + for d in dirs: + g = d + '/' + path + slog(DEBUG, 'Looking for config "{}"'.format(g)) + for f in glob.glob(g): + slog(DEBUG, 'Reading config "{}"'.format(f)) + paths_buf = [] + tree = stree.read(f, paths_buf=paths_buf) + assert(len(paths_buf)) + for p in paths_buf: + st = os.stat(p) + if st.st_mode & 0o0077: + for item in tree.child_list(): + if item.content is None: + continue + if not re.search('password|secret', cast(str, item.content), flags=re.IGNORECASE): + continue + msg = "Config files define secret, but at least one has file permissions open for group or world" + slog(ERR, f'{msg}:') + for pp in paths_buf: + slog(ERR, f' {((os.stat(p).st_mode) & 0o7777):o} {pp}') + raise Exception(msg) + tree.dump(DEBUG, f) + self.__conf.add("", tree) + + if root_section is not None: + self.__conf = self.__conf.get(root_section) + if self.__conf is None: + self.__conf = StringTree("", "") + + if defaults is not None: + for key, val in defaults.items(): + if self.__conf.get(key) is None: + self.__conf.set(key, val) + + self.__conf.dump(DEBUG, "superposed configuration") + + def __getitem__(self, key: str) -> Optional[str]: + return self.get(key) + + @property + def parent(self): + return self.__parent + + def set(self, key: str, val): + self.__conf[key] = val + + def get(self, key: str, default = None) -> Optional[str]: + item = self.__conf.get(key) + if item: + return item.value() + return default + + # This is an alias for get() + def value(self, key: str, default = None) -> Optional[str]: + return self.get(key, default) + + def branch(self, path: str) -> Optional[StringTree]: + if self.__conf: + return Config(tree=self.__conf.get(path), parent=self) + return None + + def dump(self, prio: int, *args, **kwargs) -> None: + caller = get_caller_pos(1, kwargs) + self.__conf.dump(prio, caller=caller, *args, **kwargs) diff --git a/tools/python/jwutils/stree/serdes.py b/tools/python/jwutils/stree/serdes.py index f2ad994..777a008 100644 --- a/tools/python/jwutils/stree/serdes.py +++ b/tools/python/jwutils/stree/serdes.py @@ -68,9 +68,12 @@ def parse(s: str, allow_full_lines: bool=True, root_content: str='root') -> Stri root.add(sec + '.' + cleanup_string(lhs), cleanup_string(rhs), split=split) return root -def _read_lines(path: str, throw=True): +def _read_lines(path: str, throw=True, level=0, log_prio=INFO, paths_buf=None): try: with open(path, 'r') as infile: + slog(log_prio, 'Reading {}"{}"'.format(' ' * level * 2, path)) + if paths_buf is not None: + paths_buf.append(path) ret = [] for line in infile: # lines are all trailed by \n m = re.search(r'^\s*(-)*include\s+(\S+)', line) @@ -81,7 +84,7 @@ def _read_lines(path: str, throw=True): dir_name = os.path.dirname(path) if len(dir_name): include_path = dir_name + '/' + include_path - include_lines = _read_lines(include_path, throw=(not optional)) + include_lines = _read_lines(include_path, throw=(not optional), level=level+1, paths_buf=paths_buf) if include_lines is None: msg = f'{path}: Failed to process "{line}"' slog(DEBUG, line) @@ -97,7 +100,7 @@ def _read_lines(path: str, throw=True): slog(DEBUG, msg) return None -def read(path: str, root_content: str='root') -> StringTree: # export - lines = _read_lines(path) +def read(path: str, root_content: str='root', log_prio=INFO, paths_buf=None) -> StringTree: # export + lines = _read_lines(path, log_prio=log_prio, paths_buf=paths_buf) s = ''.join(lines) return parse(s, root_content=root_content)