mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 09:53:32 +01:00
Add load_class() to complement load_object(), returning the class as opposed to instantiated object. Signed-off-by: Jan Lindemann <jan@janware.com>
128 lines
4.3 KiB
Python
128 lines
4.3 KiB
Python
import os, errno
|
|
import atexit
|
|
import tempfile
|
|
import filecmp
|
|
import inspect
|
|
import importlib
|
|
import re
|
|
|
|
from typing import Set, Iterable
|
|
|
|
from jwutils import log
|
|
|
|
_tmpfiles: Set[str] = set()
|
|
|
|
def _cleanup():
|
|
for f in _tmpfiles:
|
|
silentremove(f)
|
|
|
|
def silentremove(filename): #export
|
|
try:
|
|
os.remove(filename)
|
|
except OSError as e:
|
|
if e.errno != errno.ENOENT:
|
|
raise # re-raise exception if a different error occurred
|
|
|
|
def pad(token: str, total_size: int, right_align: bool = False) -> str:
|
|
add = total_size - len(token)
|
|
if add <= 0:
|
|
return token
|
|
space = ' ' * add
|
|
if right_align:
|
|
return space + token
|
|
return token + space
|
|
|
|
def atomic_store(contents, path): # export
|
|
if path[0:3] == '/dev':
|
|
with open(path, 'w') as outfile:
|
|
outfile.write(contents)
|
|
return
|
|
outfile = tempfile.NamedTemporaryFile(prefix=os.path.basename(path), delete=False, dir=os.path.dirname(path))
|
|
name = outfile.name
|
|
_tmpfiles.add(name)
|
|
outfile.write(contents)
|
|
outfile.close()
|
|
os.rename(name, path)
|
|
_tmpfiles.remove(name)
|
|
|
|
# see https://stackoverflow.com/questions/2020014
|
|
def object_builtin_name(o, full=True): # export
|
|
#if not full:
|
|
# return o.__class__.__name__
|
|
module = o.__class__.__module__
|
|
if module is None or module == str.__class__.__module__:
|
|
return o.__class__.__name__ # Avoid reporting __builtin__
|
|
return module + '.' + o.__class__.__name__
|
|
|
|
def get_derived_classes(mod, base, flt=None): # export
|
|
members = inspect.getmembers(mod, inspect.isclass)
|
|
r = []
|
|
for name, c in members:
|
|
log.slog(log.DEBUG, "found ", name)
|
|
if inspect.isabstract(c):
|
|
log.slog(log.DEBUG, " is abstract")
|
|
continue
|
|
if not base in inspect.getmro(c):
|
|
log.slog(log.DEBUG, " is not derived from", base, "only", inspect.getmro(c))
|
|
continue
|
|
if flt and not re.match(flt, name):
|
|
log.slog(log.DEBUG, ' "{}.{}" has wrong name'.format(mod, name))
|
|
continue
|
|
r.append(c)
|
|
return r
|
|
|
|
def load_classes(path, baseclass, flt=None): # export
|
|
r = []
|
|
for p in path.split(':'):
|
|
mod = importlib.import_module(path)
|
|
r.extend(get_derived_classes(mod, baseclass, flt))
|
|
return r
|
|
|
|
def load_class(module_path, baseclass, class_name_filter=None): # export
|
|
mod = importlib.import_module(module_path)
|
|
classes = get_derived_classes(mod, baseclass, flt=class_name_filter)
|
|
if len(classes) == 0:
|
|
raise Exception(f'no class matching "{class_name_filter}" of type "{baseclass}" found in module "{module_path}"')
|
|
if len(classes) > 1:
|
|
raise Exception(f'{len(classes)} classes matching "{class_name_filter}" of type "{baseclass}" found in module "{module_path}"')
|
|
return classes[0]
|
|
|
|
def load_class_names(path, baseclass, flt=None, remove_flt=False): # export
|
|
classes = load_classes(path, baseclass, flt)
|
|
r = []
|
|
for c in classes:
|
|
name = c.__name__
|
|
if flt and remove_flt:
|
|
name = re.subst(flt, "", name)
|
|
if name not in r:
|
|
r.append(name)
|
|
else:
|
|
pass
|
|
#log.slog(log.WARNING, "{} is already in in {}".format(name, r))
|
|
return r
|
|
|
|
def load_object(module_path, baseclass, class_name_filter=None, *args, **kwargs): # export
|
|
return load_class(module_path, baseclass, class_name_filter=class_name_filter)(*args, **kwargs)
|
|
|
|
def commit_tmpfile(tmp: str, path: str) -> None: # export
|
|
caller = log.get_caller_pos()
|
|
if os.path.isfile(path) and filecmp.cmp(tmp, path):
|
|
log.slog(log.INFO, "{} is up to date".format(path), caller=caller)
|
|
os.unlink(tmp)
|
|
else:
|
|
log.slog(log.NOTICE, "saving {}".format(path), caller=caller)
|
|
os.rename(path + '.tmp', path)
|
|
|
|
def dump(prio: int, objects: Iterable, *args, **kwargs) -> None: # export
|
|
caller = log.get_caller_pos(kwargs=kwargs)
|
|
log.slog(prio, ",---------- {}".format(' '.join(args)), caller=caller)
|
|
prefix = " | "
|
|
log.append_to_prefix(prefix)
|
|
i = 1
|
|
for o in objects:
|
|
o.dump(prio, "{} ({})".format(i, o.__class__.__name__), caller=caller, **kwargs)
|
|
i += 1
|
|
log.remove_from_prefix(prefix)
|
|
log.slog(prio, "`---------- {}".format(' '.join(args)), caller=caller)
|
|
|
|
atexit.register(_cleanup)
|