mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 18:03:31 +01:00
160 lines
5.6 KiB
Python
160 lines
5.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import re
|
|
import glob
|
|
import sys
|
|
from pathlib import Path, PosixPath
|
|
from typing import Optional, Dict, cast
|
|
|
|
from jwutils import stree
|
|
from .stree.StringTree import StringTree
|
|
from .log import *
|
|
|
|
class Config(): # export
|
|
|
|
def __load(self, search_dirs, glob_paths, refuse_mode_mask):
|
|
|
|
def __is_abs(path):
|
|
if path is None:
|
|
return False
|
|
if len(path) == 0:
|
|
return False
|
|
if path[0] != '/':
|
|
return False
|
|
return True
|
|
|
|
ret = StringTree("", "")
|
|
exe = Path(os.path.basename(sys.argv[0])).stem
|
|
if glob_paths is None:
|
|
glob_paths = [f'.{exe}', f'{exe}.conf']
|
|
elif isinstance(glob_paths, str):
|
|
glob_paths = [glob_paths]
|
|
if search_dirs is None:
|
|
env_key = re.sub('[-.]', '_', exe)
|
|
search_dirs = os.getenv(env_key)
|
|
for path in glob_paths:
|
|
dirs = search_dirs
|
|
if dirs is None:
|
|
dirs = [''] if __is_abs(path) else [ str(Path.home()), str(Path.cwd()) ]
|
|
for d in dirs:
|
|
g = d + '/' + path if len(d) else path
|
|
slog(DEBUG, 'Looking for config "{}"'.format(g))
|
|
for f in glob.glob(g):
|
|
slog(DEBUG, 'Reading config "{}"'.format(f))
|
|
paths_buf = []
|
|
tree = stree.read(f, paths_buf=paths_buf)
|
|
assert(len(paths_buf))
|
|
if refuse_mode_mask is not None:
|
|
for p in paths_buf:
|
|
st = os.stat(p)
|
|
if st.st_mode & refuse_mode_mask:
|
|
for item in tree.child_list():
|
|
if item.content is None:
|
|
continue
|
|
if not re.search('password|secret', cast(str, item.content), flags=re.IGNORECASE):
|
|
continue
|
|
msg = "Config files define secret, but at least one has file permissions open for world"
|
|
slog(ERR, f'{msg}:')
|
|
for pp in paths_buf:
|
|
slog(ERR, f' {((os.stat(pp).st_mode) & 0o7777):o} {pp}')
|
|
raise Exception(msg)
|
|
tree.dump(DEBUG, f)
|
|
ret.add("", tree)
|
|
return ret
|
|
|
|
def __init__(self,
|
|
search_dirs: Optional[list[str]]=None,
|
|
glob_paths: Optional[list[str]]=None,
|
|
glob_paths_env_key: Optional[str]=None,
|
|
defaults: Optional[Dict[str, str]]=None,
|
|
tree: Optional[StringTree]=None,
|
|
parent=None,
|
|
root_section=None,
|
|
refuse_mode_mask=0o0027
|
|
) -> None:
|
|
|
|
self.__parent = parent
|
|
|
|
if tree is not None:
|
|
assert(search_dirs is None)
|
|
assert(glob_paths is None)
|
|
assert(glob_paths_env_key is None)
|
|
self.__conf = tree
|
|
else:
|
|
assert(tree is None)
|
|
if glob_paths_env_key is not None:
|
|
glob_paths_env = os.getenv(glob_paths_env_key)
|
|
if glob_paths_env is not None:
|
|
if glob_paths is None:
|
|
glob_paths = []
|
|
glob_paths.extend(glob_paths_env.split(':'))
|
|
|
|
self.__conf = self.__load(search_dirs=search_dirs, glob_paths=glob_paths,
|
|
refuse_mode_mask=refuse_mode_mask)
|
|
|
|
if root_section is not None:
|
|
tmp = self.__conf.get(root_section)
|
|
if tmp is None:
|
|
tmp = StringTree("", "")
|
|
self.__conf = tmp
|
|
|
|
if defaults is not None:
|
|
for key, val in defaults.items():
|
|
if self.__conf.get(key) is None:
|
|
self.__conf[key] = val
|
|
|
|
self.__conf.dump(DEBUG, "superposed configuration")
|
|
|
|
def __getitem__(self, key: str) -> Optional[str]:
|
|
return self.get(key)
|
|
|
|
def __setitem__(self, key: str, value: str):
|
|
return self.set(key, value)
|
|
|
|
@property
|
|
def parent(self):
|
|
return self.__parent
|
|
|
|
@property
|
|
def root(self):
|
|
if self.__parent is None:
|
|
return self
|
|
return self.__parent.root
|
|
|
|
def set(self, key: str, val):
|
|
self.__conf[key] = val
|
|
|
|
def get(self, key: str, default = None) -> Optional[str]:
|
|
item = self.__conf.get(key)
|
|
if item:
|
|
return item.value()
|
|
return default
|
|
|
|
def entries(self, key: str) -> list[str]:
|
|
item = self.__conf.get(key)
|
|
if item is None:
|
|
return []
|
|
return [name for name, child in item.children.items()]
|
|
|
|
# This is an alias for get()
|
|
def value(self, key: str, default = None) -> Optional[str]:
|
|
return self.get(key, default)
|
|
|
|
def branch(self, path: str): # type: ignore # Optional[Config]: FIXME: Don't know how to get hold of this type here
|
|
if self.__conf:
|
|
tree = self.__conf.get(path)
|
|
if tree is None:
|
|
msg = f'Tried to get non-existent branch "{path}" from config'
|
|
self.dump(ERR, msg)
|
|
throw(msg)
|
|
return Config(tree=tree, parent=self) # type: ignore
|
|
return None
|
|
|
|
def dump(self, prio: int, *args, **kwargs) -> None:
|
|
caller = get_caller_pos(1, kwargs)
|
|
self.__conf.dump(prio, caller=caller, *args, **kwargs)
|
|
|
|
@property
|
|
def name(self):
|
|
return self.__conf.content
|