import atexit
import errno
import filecmp
import importlib
import inspect
import os
import re
import tempfile
from typing import Iterable, Set

from jwutils import log

# Temporary files created by atomic_store() that have not been moved into
# place yet. Whatever is still listed here is removed at interpreter exit.
_tmpfiles: Set[str] = set()


def _cleanup():
    """Remove every temporary file still registered in ``_tmpfiles``."""
    # Iterate over a copy so the set itself is never walked while it could
    # be changed.
    for f in list(_tmpfiles):
        silentremove(f)


def silentremove(filename): #export
    """Remove ``filename``, ignoring the case that it does not exist.

    Raises:
        OSError: for every failure other than ``ENOENT``.
    """
    try:
        os.remove(filename)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise  # re-raise exception if a different error occurred


def pad(token: str, total_size: int, right_align: bool = False) -> str:
    """Pad ``token`` with spaces to a width of ``total_size`` characters.

    The token is returned unchanged if it is already at least that wide.
    Padding goes on the right unless ``right_align`` is set, in which case
    it goes on the left.
    """
    if right_align:
        return token.rjust(total_size)
    return token.ljust(total_size)


def atomic_store(contents, path): # export
    """Write ``contents`` to ``path`` by way of a temporary file.

    The data is written to a temporary file in the directory of ``path``
    and then moved over ``path``, so ``path`` is only replaced once the
    write has completed. Paths below ``/dev/`` are written to directly.

    ``contents`` may be ``str`` (written in text mode) or a bytes-like
    object (written in binary mode).

    If writing or moving fails, the temporary file stays registered in
    ``_tmpfiles`` and is removed at interpreter exit.
    """
    binary = isinstance(contents, (bytes, bytearray, memoryview))
    mode = 'wb' if binary else 'w'

    # Device nodes must be written in place; they cannot be replaced by a
    # renamed regular file.
    if path.startswith('/dev/'):
        with open(path, mode) as outfile:
            outfile.write(contents)
        return

    outfile = tempfile.NamedTemporaryFile(
        mode=mode,
        prefix=os.path.basename(path),
        delete=False,
        # Same directory as the target, so the final move does not cross
        # file systems. An empty dirname means the current directory.
        dir=os.path.dirname(path) or '.',
    )
    name = outfile.name
    _tmpfiles.add(name)
    with outfile:
        outfile.write(contents)
    # os.replace() overwrites an existing target on every platform.
    os.replace(name, path)
    _tmpfiles.discard(name)


# see https://stackoverflow.com/questions/2020014
def object_builtin_name(o, full=True): # export
    """Return the class name of ``o``, qualified with its module name.

    Classes from the builtins module are reported by their bare name.

    NOTE: ``full`` is accepted but currently ignored; the module-qualified
    name is returned regardless of its value.
    """
    cls = o.__class__
    module = cls.__module__
    if module is None or module == str.__module__:
        return cls.__name__  # Avoid reporting __builtin__
    return module + '.' + cls.__name__


def get_derived_classes(mod, base, flt=None): # export
    """Return the non-abstract classes in ``mod`` that derive from ``base``.

    Args:
        mod: Module (or other object) whose class members are inspected.
        base: Class that must appear in the MRO of a candidate.
        flt: Optional regular expression. If given, only classes whose name
            matches it at the start (``re.match``) are returned.

    Returns:
        List of matching class objects, in ``inspect.getmembers`` order.
    """
    r = []
    for name, c in inspect.getmembers(mod, inspect.isclass):
        log.slog(log.DEBUG, "found ", name)
        if inspect.isabstract(c):
            log.slog(log.DEBUG, " is abstract")
            continue
        mro = inspect.getmro(c)
        if base not in mro:
            log.slog(log.DEBUG, " is not derived from", base, "only", mro)
            continue
        if flt and not re.match(flt, name):
            log.slog(log.DEBUG, ' "{}.{}" has wrong name'.format(mod, name))
            continue
        r.append(c)
    return r


def load_classes(path, baseclass, flt=None): # export
    """Import the modules named in ``path`` and collect derived classes.

    Args:
        path: One or more module names, separated by ``:``.
        baseclass: Passed to ``get_derived_classes`` as ``base``.
        flt: Passed to ``get_derived_classes`` as ``flt``.

    Returns:
        Concatenated results of ``get_derived_classes`` for every module.
    """
    r = []
    for p in path.split(':'):
        # Import the individual component, not the whole ':'-joined string.
        mod = importlib.import_module(p)
        r.extend(get_derived_classes(mod, baseclass, flt))
    return r


def load_class_names(path, baseclass, flt=None, remove_flt=False): # export
    """Return the de-duplicated names of the classes ``load_classes`` finds.

    If both ``flt`` and ``remove_flt`` are set, every match of ``flt`` is
    stripped from each name before it is recorded. The order of first
    occurrence is kept.
    """
    r = []
    for c in load_classes(path, baseclass, flt):
        name = c.__name__
        if flt and remove_flt:
            name = re.sub(flt, "", name)
        if name not in r:
            r.append(name)
    return r


# TODO: handling of "name" / "flt" is awkward
def load_object(path, baseclass, name = None, *args, **kwargs): # export
    """Import module ``path`` and instantiate its single derived class.

    The module must contain exactly one non-abstract class deriving from
    ``baseclass``. That class is called with ``*args`` and ``**kwargs``.

    NOTE: ``name`` only appears in the error messages; it is not used to
    select a class.

    Raises:
        Exception: if no class or more than one class is found.
    """
    mod = importlib.import_module(path)
    classes = get_derived_classes(mod, baseclass, flt=None)
    if not classes:
        raise Exception("no class {} found".format(name))
    if len(classes) > 1:
        raise Exception("{} classes matching {} found".format(len(classes), name))
    return classes[0](*args, **kwargs)


def commit_tmpfile(tmp: str, path: str) -> None: # export
    """Move ``tmp`` to ``path`` unless ``path`` already has the same contents.

    If ``path`` is an existing file that compares equal to ``tmp``
    (``filecmp.cmp``), ``tmp`` is deleted and ``path`` is left untouched.
    Otherwise ``tmp`` is renamed to ``path``.
    """
    caller = log.get_caller_pos()
    if os.path.isfile(path) and filecmp.cmp(tmp, path):
        log.slog(log.INFO, "{} is up to date".format(path), caller=caller)
        os.unlink(tmp)
    else:
        log.slog(log.NOTICE, "saving {}".format(path), caller=caller)
        # Rename the file that was actually handed in, not a guessed name.
        os.rename(tmp, path)


def dump(prio: int, objects: Iterable, *args, **kwargs) -> None: # export
    """Log a framed dump of ``objects`` at priority ``prio``.

    A header and a footer line are logged, each containing ``args`` joined
    by spaces. Between them, ``dump()`` is called on every object with a
    1-based counter and the object's class name as title. The log prefix is
    extended for the duration of the object dumps.

    Args:
        prio: Priority passed to ``log.slog`` and to each ``o.dump()``.
        objects: Iterable of objects that provide a ``dump()`` method.
        *args: Strings forming the header / footer title.
        **kwargs: Passed to ``log.get_caller_pos`` and on to each
            ``o.dump()`` call.
    """
    caller = log.get_caller_pos(kwargs=kwargs)
    title = ' '.join(args)
    log.slog(prio, ",---------- {}".format(title), caller=caller)
    prefix = " | "
    log.append_to_prefix(prefix)
    try:
        for i, o in enumerate(objects, 1):
            o.dump(prio, "{} ({})".format(i, o.__class__.__name__), caller=caller, **kwargs)
    finally:
        # Restore the prefix even if one of the objects fails to dump.
        log.remove_from_prefix(prefix)
    log.slog(prio, "`---------- {}".format(title), caller=caller)


atexit.register(_cleanup)