jw-python/tools/python/jwutils/misc.py

133 lines
4.4 KiB
Python
Raw Normal View History

import os, errno
import atexit
import tempfile
import filecmp
import inspect
import importlib
import re
from typing import Set, Iterable
from jwutils import log
_tmpfiles: Set[str] = set()
def _cleanup():
for f in _tmpfiles:
silentremove(f)
def silentremove(filename): #export
try:
os.remove(filename)
except OSError as e:
if e.errno != errno.ENOENT:
raise # re-raise exception if a different error occurred
def pad(token: str, total_size: int, right_align: bool = False) -> str:
add = total_size - len(token)
if add <= 0:
return token
space = ' ' * add
if right_align:
return space + token
return token + space
def atomic_store(contents, path): # export
if path[0:3] == '/dev':
with open(path, 'w') as outfile:
outfile.write(contents)
return
outfile = tempfile.NamedTemporaryFile(prefix=os.path.basename(path), delete=False, dir=os.path.dirname(path))
name = outfile.name
_tmpfiles.add(name)
outfile.write(contents)
outfile.close()
os.rename(name, path)
_tmpfiles.remove(name)
# see https://stackoverflow.com/questions/2020014
def object_builtin_name(o, full=True): # export
#if not full:
# return o.__class__.__name__
module = o.__class__.__module__
if module is None or module == str.__class__.__module__:
return o.__class__.__name__ # Avoid reporting __builtin__
return module + '.' + o.__class__.__name__
def get_derived_classes(mod, base, flt=None): # export
members = inspect.getmembers(mod, inspect.isclass)
r = []
for name, c in members:
log.slog(log.DEBUG, "found ", name)
if inspect.isabstract(c):
log.slog(log.DEBUG, " is abstract")
continue
if not base in inspect.getmro(c):
log.slog(log.DEBUG, " is not derived from", base, "only", inspect.getmro(c))
continue
if flt and not re.match(flt, name):
log.slog(log.DEBUG, ' "{}.{}" has wrong name'.format(mod, name))
continue
r.append(c)
return r
def load_classes(path, baseclass, flt=None): # export
r = []
for p in path.split(':'):
mod = importlib.import_module(path)
log.slog(log.DEBUG, "importing ", path)
r.extend(get_derived_classes(mod, baseclass, flt))
return r
def load_class(module_path, baseclass, class_name_filter=None): # export
mod = importlib.import_module(module_path)
classes = get_derived_classes(mod, baseclass, flt=class_name_filter)
if len(classes) == 0:
raise Exception(f'no class matching "{class_name_filter}" of type "{baseclass}" found in module "{module_path}"')
if len(classes) > 1:
raise Exception(f'{len(classes)} classes matching "{class_name_filter}" of type "{baseclass}" found in module "{module_path}"')
return classes[0]
def load_class_names(path, baseclass, flt=None, remove_flt=False): # export
classes = load_classes(path, baseclass, flt)
r = []
for c in classes:
name = c.__name__
if flt and remove_flt:
name = re.subst(flt, "", name)
if name not in r:
r.append(name)
else:
pass
#log.slog(log.WARNING, "{} is already in in {}".format(name, r))
return r
def load_object(module_path, baseclass, class_name_filter=None, *args, **kwargs): # export
return load_class(module_path, baseclass, class_name_filter=class_name_filter)(*args, **kwargs)
def load_function(module_path, name): # export
mod = importlib.import_module(module_path)
return getattr(mod, name)
def commit_tmpfile(tmp: str, path: str) -> None: # export
caller = log.get_caller_pos()
if os.path.isfile(path) and filecmp.cmp(tmp, path):
log.slog(log.INFO, "{} is up to date".format(path), caller=caller)
os.unlink(tmp)
else:
log.slog(log.NOTICE, "saving {}".format(path), caller=caller)
os.rename(path + '.tmp', path)
def dump(prio: int, objects: Iterable, *args, **kwargs) -> None: # export
caller = log.get_caller_pos(kwargs=kwargs)
log.slog(prio, ",---------- {}".format(' '.join(args)), caller=caller)
prefix = " | "
log.append_to_prefix(prefix)
i = 1
for o in objects:
o.dump(prio, "{} ({})".format(i, o.__class__.__name__), caller=caller, **kwargs)
i += 1
log.remove_from_prefix(prefix)
log.slog(prio, "`---------- {}".format(' '.join(args)), caller=caller)
atexit.register(_cleanup)