mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 09:53:32 +01:00
Add class Config
Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
parent
224a16ec71
commit
84a4053157
2 changed files with 111 additions and 4 deletions
104
tools/python/jwutils/Config.py
Normal file
104
tools/python/jwutils/Config.py
Normal file
|
|
@ -0,0 +1,104 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import glob
|
||||||
|
import sys
|
||||||
|
from pathlib import Path, PosixPath
|
||||||
|
from typing import Optional, Dict, cast
|
||||||
|
|
||||||
|
from jwutils import stree
|
||||||
|
from .stree.StringTree import StringTree
|
||||||
|
from .log import *
|
||||||
|
|
||||||
|
class Config(): # export
|
||||||
|
|
||||||
|
def __init__(self, search_dirs: Optional[list[str]]=None, glob_paths: Optional[list[str]]=None,
|
||||||
|
defaults: Dict[str, str]=None, tree: Optional[StringTree]=None, parent=None,
|
||||||
|
root_section=None) -> None:
|
||||||
|
|
||||||
|
def __is_abs(path):
|
||||||
|
if path is None:
|
||||||
|
return False
|
||||||
|
if len(path) == 0:
|
||||||
|
return False
|
||||||
|
if path[0] != '/':
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
self.__parent = parent
|
||||||
|
exe = Path(os.path.basename(sys.argv[0])).stem
|
||||||
|
if glob_paths is None:
|
||||||
|
glob_paths = [f'.{exe}', f'{exe}.conf']
|
||||||
|
if search_dirs is None:
|
||||||
|
env_key = re.sub('[-.]', '_', exe)
|
||||||
|
search_dirs = os.getenv(env_key)
|
||||||
|
self.__conf = tree if tree else StringTree("", "")
|
||||||
|
for path in glob_paths:
|
||||||
|
dirs = search_dirs
|
||||||
|
if dirs is None:
|
||||||
|
dirs = [''] if __is_abs(path) else [ str(Path.home()), str(Path.cwd()) ]
|
||||||
|
for d in dirs:
|
||||||
|
g = d + '/' + path
|
||||||
|
slog(DEBUG, 'Looking for config "{}"'.format(g))
|
||||||
|
for f in glob.glob(g):
|
||||||
|
slog(DEBUG, 'Reading config "{}"'.format(f))
|
||||||
|
paths_buf = []
|
||||||
|
tree = stree.read(f, paths_buf=paths_buf)
|
||||||
|
assert(len(paths_buf))
|
||||||
|
for p in paths_buf:
|
||||||
|
st = os.stat(p)
|
||||||
|
if st.st_mode & 0o0077:
|
||||||
|
for item in tree.child_list():
|
||||||
|
if item.content is None:
|
||||||
|
continue
|
||||||
|
if not re.search('password|secret', cast(str, item.content), flags=re.IGNORECASE):
|
||||||
|
continue
|
||||||
|
msg = "Config files define secret, but at least one has file permissions open for group or world"
|
||||||
|
slog(ERR, f'{msg}:')
|
||||||
|
for pp in paths_buf:
|
||||||
|
slog(ERR, f' {((os.stat(p).st_mode) & 0o7777):o} {pp}')
|
||||||
|
raise Exception(msg)
|
||||||
|
tree.dump(DEBUG, f)
|
||||||
|
self.__conf.add("", tree)
|
||||||
|
|
||||||
|
if root_section is not None:
|
||||||
|
self.__conf = self.__conf.get(root_section)
|
||||||
|
if self.__conf is None:
|
||||||
|
self.__conf = StringTree("", "")
|
||||||
|
|
||||||
|
if defaults is not None:
|
||||||
|
for key, val in defaults.items():
|
||||||
|
if self.__conf.get(key) is None:
|
||||||
|
self.__conf.set(key, val)
|
||||||
|
|
||||||
|
self.__conf.dump(DEBUG, "superposed configuration")
|
||||||
|
|
||||||
|
def __getitem__(self, key: str) -> Optional[str]:
|
||||||
|
return self.get(key)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def parent(self):
|
||||||
|
return self.__parent
|
||||||
|
|
||||||
|
def set(self, key: str, val):
|
||||||
|
self.__conf[key] = val
|
||||||
|
|
||||||
|
def get(self, key: str, default = None) -> Optional[str]:
|
||||||
|
item = self.__conf.get(key)
|
||||||
|
if item:
|
||||||
|
return item.value()
|
||||||
|
return default
|
||||||
|
|
||||||
|
# This is an alias for get()
|
||||||
|
def value(self, key: str, default = None) -> Optional[str]:
|
||||||
|
return self.get(key, default)
|
||||||
|
|
||||||
|
def branch(self, path: str) -> Optional[StringTree]:
|
||||||
|
if self.__conf:
|
||||||
|
return Config(tree=self.__conf.get(path), parent=self)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def dump(self, prio: int, *args, **kwargs) -> None:
|
||||||
|
caller = get_caller_pos(1, kwargs)
|
||||||
|
self.__conf.dump(prio, caller=caller, *args, **kwargs)
|
||||||
|
|
@ -68,9 +68,12 @@ def parse(s: str, allow_full_lines: bool=True, root_content: str='root') -> Stri
|
||||||
root.add(sec + '.' + cleanup_string(lhs), cleanup_string(rhs), split=split)
|
root.add(sec + '.' + cleanup_string(lhs), cleanup_string(rhs), split=split)
|
||||||
return root
|
return root
|
||||||
|
|
||||||
def _read_lines(path: str, throw=True):
|
def _read_lines(path: str, throw=True, level=0, log_prio=INFO, paths_buf=None):
|
||||||
try:
|
try:
|
||||||
with open(path, 'r') as infile:
|
with open(path, 'r') as infile:
|
||||||
|
slog(log_prio, 'Reading {}"{}"'.format(' ' * level * 2, path))
|
||||||
|
if paths_buf is not None:
|
||||||
|
paths_buf.append(path)
|
||||||
ret = []
|
ret = []
|
||||||
for line in infile: # lines are all trailed by \n
|
for line in infile: # lines are all trailed by \n
|
||||||
m = re.search(r'^\s*(-)*include\s+(\S+)', line)
|
m = re.search(r'^\s*(-)*include\s+(\S+)', line)
|
||||||
|
|
@ -81,7 +84,7 @@ def _read_lines(path: str, throw=True):
|
||||||
dir_name = os.path.dirname(path)
|
dir_name = os.path.dirname(path)
|
||||||
if len(dir_name):
|
if len(dir_name):
|
||||||
include_path = dir_name + '/' + include_path
|
include_path = dir_name + '/' + include_path
|
||||||
include_lines = _read_lines(include_path, throw=(not optional))
|
include_lines = _read_lines(include_path, throw=(not optional), level=level+1, paths_buf=paths_buf)
|
||||||
if include_lines is None:
|
if include_lines is None:
|
||||||
msg = f'{path}: Failed to process "{line}"'
|
msg = f'{path}: Failed to process "{line}"'
|
||||||
slog(DEBUG, line)
|
slog(DEBUG, line)
|
||||||
|
|
@ -97,7 +100,7 @@ def _read_lines(path: str, throw=True):
|
||||||
slog(DEBUG, msg)
|
slog(DEBUG, msg)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def read(path: str, root_content: str='root') -> StringTree: # export
|
def read(path: str, root_content: str='root', log_prio=INFO, paths_buf=None) -> StringTree: # export
|
||||||
lines = _read_lines(path)
|
lines = _read_lines(path, log_prio=log_prio, paths_buf=paths_buf)
|
||||||
s = ''.join(lines)
|
s = ''.join(lines)
|
||||||
return parse(s, root_content=root_content)
|
return parse(s, root_content=root_content)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue