jw-python/tools/python/jwutils/misc.py

161 lines
5.4 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
import os, errno, atexit, tempfile, filecmp, inspect, importlib, re
from typing import Iterable
from . import log
# Names of temporary files created by this module that have not yet been
# renamed into place; _cleanup() walks this set.
_tmpfiles: set[str] = set()


def _cleanup():
    """Delete every file whose name is still recorded in ``_tmpfiles``."""
    for leftover in _tmpfiles:
        silentremove(leftover)
def silentremove(filename): #export
    """Remove *filename*, tolerating the case where it does not exist.

    Any ``OSError`` other than ``ENOENT`` raised by ``os.remove`` is
    propagated to the caller unchanged.
    """
    try:
        os.remove(filename)
    except OSError as err:
        if err.errno == errno.ENOENT:
            return  # the file is already gone, nothing to do
        raise
def pad(token: str, total_size: int, right_align: bool = False) -> str:
    """Pad *token* with spaces up to *total_size* characters.

    The padding goes after the token by default and before it when
    *right_align* is true. A token that is already at least *total_size*
    characters long is returned unchanged.
    """
    missing = total_size - len(token)
    if missing <= 0:
        return token
    filler = ' ' * missing
    return filler + token if right_align else token + filler
def atomic_store(contents, path): # export
    """Write *contents* to *path* by way of a temporary file and a rename.

    The data is first written to a temporary file created in the same
    directory as *path* and then renamed over *path*, so readers never see
    a partially written file (``os.rename`` replaces the destination in one
    step on POSIX). Paths below ``/dev/`` are written to directly, because
    a device node must not be replaced by a regular file.

    *contents* may be ``str`` (written in text mode) or a bytes-like object
    (written in binary mode).
    """
    text = isinstance(contents, str)

    # The previous check compared a 3-character slice with the 4-character
    # literal '/dev' and therefore never matched.
    if path[:5] == '/dev/':
        with open(path, 'w' if text else 'wb') as outfile:
            outfile.write(contents)
        return

    outfile = tempfile.NamedTemporaryFile(
        mode='w' if text else 'wb',
        prefix=os.path.basename(path),
        delete=False,
        dir=os.path.dirname(path),
    )
    name = outfile.name
    # Register the temporary file so it is removed at exit should anything
    # below fail before the rename has happened.
    _tmpfiles.add(name)
    with outfile:  # closes the handle even when write() raises
        outfile.write(contents)
    os.rename(name, path)
    _tmpfiles.remove(name)
# see https://stackoverflow.com/questions/2020014
def object_builtin_name(o, full=True): # export
    """Return the class name of *o*, prefixed with its module unless builtin.

    Objects whose class lives in the builtins module (or has no module)
    yield the bare class name, everything else ``"<module>.<class>"``.
    *full* is accepted but currently has no effect.
    """
    cls = o.__class__
    owner = cls.__module__
    # str.__class__ is ``type``, whose module is the builtins module.
    builtin_module = str.__class__.__module__
    if owner is None or owner == builtin_module:
        return cls.__name__  # avoid reporting the builtins module
    return owner + '.' + cls.__name__
def get_derived_classes(mod, base, flt=None): # export
    """Collect the concrete classes in module *mod* that derive from *base*.

    A class is kept when it is not abstract, has *base* in its method
    resolution order (which includes *base* itself), and, if *flt* is
    given, has a name that ``re.match`` accepts against *flt*. Every
    rejection is reported at debug level.

    Returns the list of matching class objects in ``inspect.getmembers``
    order.
    """
    derived = []
    for name, cls in inspect.getmembers(mod, inspect.isclass):
        log.slog(log.DEBUG, "found ", name)
        if inspect.isabstract(cls):
            log.slog(log.DEBUG, " is abstract")
            continue
        lineage = inspect.getmro(cls)
        if base not in lineage:
            log.slog(log.DEBUG, " is not derived from", base, "only", lineage)
            continue
        if flt and not re.match(flt, name):
            log.slog(log.DEBUG, ' "{}.{}" has wrong name'.format(mod, name))
            continue
        derived.append(cls)
    return derived
def load_classes(path, baseclass, flt=None): # export
    """Import every module named in *path* and collect matching classes.

    *path* is a colon-separated list of importable module names. Each
    module is imported and searched with ``get_derived_classes`` for
    classes derived from *baseclass* whose name matches *flt* (if given).

    Returns the concatenated list of classes from all modules.
    """
    r = []
    for module_name in path.split(':'):
        # Import the individual component; the whole colon-separated
        # string used to be passed here, which broke multi-module paths.
        mod = importlib.import_module(module_name)
        log.slog(log.DEBUG, "importing ", module_name)
        r.extend(get_derived_classes(mod, baseclass, flt))
    return r
def load_class(module_path, baseclass, class_name_filter=None): # export
    """Return the single class in *module_path* derived from *baseclass*.

    The module is imported and searched with ``get_derived_classes``,
    using *class_name_filter* as the name filter.

    Raises ``Exception`` when no class or more than one class qualifies.
    """
    mod = importlib.import_module(module_path)
    candidates = get_derived_classes(mod, baseclass, flt=class_name_filter)
    count = len(candidates)
    if count == 1:
        return candidates[0]
    if count == 0:
        raise Exception(f'no class matching "{class_name_filter}" of type "{baseclass}" found in module "{module_path}"')
    raise Exception(f'{count} classes matching "{class_name_filter}" of type "{baseclass}" found in module "{module_path}"')
def load_class_names(path, baseclass, flt=None, remove_flt=False): # export
    """Return the unique names of the classes found by ``load_classes``.

    *path*, *baseclass* and *flt* are passed on to ``load_classes``. When
    both *flt* and *remove_flt* are set, every match of the regular
    expression *flt* is stripped from each class name. Names are returned
    in discovery order and duplicates are dropped.
    """
    r = []
    for c in load_classes(path, baseclass, flt):
        name = c.__name__
        if flt and remove_flt:
            # re.subst() does not exist; re.sub() is the substitution call.
            name = re.sub(flt, "", name)
        if name not in r:
            r.append(name)
    return r
def load_object(module_path, baseclass, class_name_filter=None, *args, **kwargs): # export
    """Instantiate the class that ``load_class`` finds in *module_path*.

    Extra positional and keyword arguments are handed to the constructor.
    """
    cls = load_class(module_path, baseclass, class_name_filter=class_name_filter)
    return cls(*args, **kwargs)
def load_function(module_path, name): # export
    """Import *module_path* and return its attribute called *name*."""
    return getattr(importlib.import_module(module_path), name)
def commit_tmpfile(tmp: str, path: str) -> None: # export
    """Move the freshly written file *tmp* to *path* unless nothing changed.

    If *path* already exists as a regular file and compares equal to *tmp*
    according to ``filecmp.cmp``, *tmp* is deleted and *path* is left
    untouched. Otherwise *tmp* is renamed to *path*.
    """
    caller = log.get_caller_pos()
    if os.path.isfile(path) and filecmp.cmp(tmp, path):
        log.slog(log.INFO, "{} is up to date".format(path), caller=caller)
        os.unlink(tmp)
    else:
        log.slog(log.NOTICE, "saving {}".format(path), caller=caller)
        # Rename the file that was actually passed in; the name used to be
        # hard-coded as path + '.tmp', which ignored the tmp argument.
        os.rename(tmp, path)
def multi_regex_edit(spec, strings): # export
    """Apply a sequence of regex commands to a list of strings.

    Each entry of *spec* is a sequence whose first element names the
    operation and whose second element is a regular expression:

    * ``('sub', regex, replacement)`` substitutes every match in every
      string,
    * ``('del', regex)`` drops the strings in which the regex is found,
    * ``('match', regex)`` keeps only the strings in which it is found.

    Commands are applied in order, each to the result of the previous one.
    Returns the resulting list, or *strings* itself when *spec* is empty.

    Raises ``Exception`` for an unknown operation or a command with too
    few elements.
    """
    for cmd in spec:
        # 'sub' needs a replacement in addition to the pattern; checking it
        # here avoids a bare IndexError further down.
        op = cmd[0] if len(cmd) >= 2 else None
        if op not in ('sub', 'del', 'match') or (op == 'sub' and len(cmd) < 3):
            raise Exception('Invalid command in multi_regex_edit(): {}'.format(str(cmd)))
        rx = re.compile(cmd[1])
        if op == 'sub':
            replacement = cmd[2]
            strings = [rx.sub(replacement, s) for s in strings]
        elif op == 'del':
            strings = [s for s in strings if rx.search(s) is None]
        else:  # 'match'
            strings = [s for s in strings if rx.search(s) is not None]
    return strings
def dump(prio: int, objects: Iterable, *args, **kwargs) -> None: # export
    """Log a framed dump of *objects* at priority *prio*.

    A header and a footer line carrying the space-joined *args* are logged
    around the output of each object's own ``dump()`` method. While the
    objects are dumped, " | " is appended to the log prefix. Objects are
    numbered starting at 1, and *kwargs* are forwarded to every
    ``dump()`` call.
    """
    caller = log.get_caller_pos(kwargs=kwargs)
    title = ' '.join(args)
    log.slog(prio, ",---------- {}".format(title), caller=caller)
    prefix = " | "
    log.append_to_prefix(prefix)
    for number, item in enumerate(objects, start=1):
        label = "{} ({})".format(number, item.__class__.__name__)
        item.dump(prio, label, caller=caller, **kwargs)
    log.remove_from_prefix(prefix)
    log.slog(prio, "`---------- {}".format(title), caller=caller)
# At interpreter exit, run _cleanup() to delete the files still listed in _tmpfiles.
atexit.register(_cleanup)