mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 01:52:56 +01:00
Fix missing raw string qualifier missing from regex with backslashes. Signed-off-by: Jan Lindemann <jan@janware.com>
332 lines
9.1 KiB
Python
332 lines
9.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import print_function
|
|
|
|
from typing import List, Tuple, Optional, Any
|
|
|
|
import sys, re, io, syslog, inspect, unicodedata
|
|
|
|
from os.path import basename
|
|
from datetime import datetime
|
|
|
|
from . import misc
|
|
|
|
# --- python 2 / 3 compatibility stuff
|
|
try:
|
|
basestring # type: ignore
|
|
except NameError:
|
|
basestring = str
|
|
|
|
_special_chars = {
|
|
'\a' : '\\a',
|
|
'\b' : '\\b',
|
|
'\t' : '\\t',
|
|
'\n' : '\\n',
|
|
'\v' : '\\v',
|
|
'\f' : '\\f',
|
|
'\r' : '\\r',
|
|
}
|
|
|
|
_special_char_regex = re.compile("(%s)" % "|".join(map(re.escape, _special_chars.keys())))
|
|
|
|
_all_control_chars = ''.join(chr(c) for c in range(sys.maxunicode) if unicodedata.category(chr(c)) in {'Cc'})
|
|
_clean_str_regex = re.compile(r'(\033\[[0-9]*m|[%s])' % re.escape(_all_control_chars))
|
|
|
|
EMERG = int(syslog.LOG_EMERG)
|
|
ALERT = int(syslog.LOG_ALERT)
|
|
CRIT = int(syslog.LOG_CRIT)
|
|
ERR = int(syslog.LOG_ERR)
|
|
WARNING = int(syslog.LOG_WARNING)
|
|
NOTICE = int(syslog.LOG_NOTICE)
|
|
INFO = int(syslog.LOG_INFO)
|
|
DEBUG = int(syslog.LOG_DEBUG)
|
|
DEVEL = int(syslog.LOG_DEBUG + 1)
|
|
OFF = DEVEL + 1
|
|
|
|
_level = NOTICE
|
|
|
|
CONSOLE_FONT_BOLD = '\033[1m'
|
|
CONSOLE_FONT_RED = '\033[31m'
|
|
CONSOLE_FONT_GREEN = '\033[32m'
|
|
CONSOLE_FONT_YELLOW = '\033[33m'
|
|
CONSOLE_FONT_BLUE = '\033[34m'
|
|
|
|
CONSOLE_FONT_MAGENTA = '\033[35m'
|
|
CONSOLE_FONT_CYAN = '\033[36m'
|
|
CONSOLE_FONT_WHITE = '\033[37m'
|
|
|
|
CONSOLE_FONT_BLINK = '\033[5m'
|
|
CONSOLE_FONT_OFF = '\033[m'
|
|
|
|
f_position = 'position'
|
|
f_module = 'module'
|
|
f_date = 'date'
|
|
f_stderr = 'stderr'
|
|
f_stdout = 'stdout'
|
|
f_prio = 'prio'
|
|
f_color = 'color'
|
|
f_default = [ f_position, f_stderr, f_prio, f_color ]
|
|
|
|
_flags = set(f_default)
|
|
_log_prefix = ''
|
|
_clean_log_prefix = ''
|
|
_file_name_len = 20
|
|
_module_name_len = 50
|
|
_log_file_streams: list[io.TextIOWrapper] = []
|
|
|
|
_short_prio_str = {
|
|
EMERG : '<Y>',
|
|
ALERT : '<A>',
|
|
CRIT : '<C>',
|
|
ERR : '<E>',
|
|
WARNING : '<W>',
|
|
NOTICE : '<N>',
|
|
INFO : '<I>',
|
|
DEBUG : '<D>',
|
|
DEVEL : '<V>',
|
|
}
|
|
|
|
_prio_colors = {
|
|
DEVEL : [ "", "" ],
|
|
DEBUG : [ "", "" ],
|
|
INFO : [ CONSOLE_FONT_BLUE, CONSOLE_FONT_OFF ],
|
|
NOTICE : [ CONSOLE_FONT_GREEN, CONSOLE_FONT_OFF ],
|
|
WARNING : [ CONSOLE_FONT_YELLOW, CONSOLE_FONT_OFF ],
|
|
ERR : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_RED, CONSOLE_FONT_OFF ],
|
|
CRIT : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ],
|
|
ALERT : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ],
|
|
EMERG : [ CONSOLE_FONT_BOLD + CONSOLE_FONT_MAGENTA, CONSOLE_FONT_OFF ],
|
|
}
|
|
|
|
class Stream:
|
|
def __init__(self, stream, flags):
|
|
self.stream = stream
|
|
self.flags = flags
|
|
|
|
_streams: dict[int, Stream] = dict()
|
|
_stream_descriptors = [reversed(range(1, 16))]
|
|
|
|
def add_capture_stream(stream, flags=0x0):
|
|
ret = _stream_descriptors.pop()
|
|
_streams[ret] = Stream(stream=stream, flags=flags)
|
|
return ret
|
|
|
|
def rm_capture_stream(sd):
|
|
del _streams[sd]
|
|
_stream_descriptors.append(sd)
|
|
|
|
def prio_gets_logged(prio: int) -> bool: # export
|
|
if prio > _level:
|
|
return False
|
|
return True
|
|
|
|
def log_level(s: Optional[str]=None) -> int: # export
|
|
if s is None:
|
|
return _level
|
|
return parse_log_prio_str(s)
|
|
|
|
def get_caller_pos(up: int = 1, kwargs: Optional[dict[str, Any]] = None) -> Tuple[str, str, int]:
|
|
if kwargs and 'caller' in kwargs:
|
|
r = kwargs['caller']
|
|
del kwargs['caller']
|
|
return r
|
|
caller = inspect.stack()[up+1]
|
|
mod = inspect.getmodule(caller[0])
|
|
mod_name = '' if mod is None else mod.__name__
|
|
return (mod_name, basename(caller.filename), caller.lineno)
|
|
|
|
def slog_m(prio: int, *args, **kwargs) -> None: # export
|
|
if prio > _level:
|
|
return
|
|
if len(args):
|
|
margs = ''
|
|
for a in args:
|
|
if isinstance(a, list):
|
|
margs += '\n'.join([str(elem) for elem in a])
|
|
continue
|
|
margs += ' ' + str(a)
|
|
if 'caller' not in kwargs:
|
|
caller = get_caller_pos(1)
|
|
else:
|
|
caller = kwargs['caller']
|
|
del kwargs['caller']
|
|
for line in margs[1:].split('\n'):
|
|
slog(prio, line, **kwargs, caller=caller)
|
|
|
|
def slog(prio: int, *args, only_printable: bool=False, **kwargs) -> None: # export
|
|
|
|
if prio > _level:
|
|
return
|
|
|
|
msg = ''
|
|
color_on = ''
|
|
color_off = ''
|
|
|
|
if f_date in _flags:
|
|
msg += datetime.now().strftime("%b %d %H:%M:%S.%f ")
|
|
|
|
if f_prio in _flags:
|
|
msg += _short_prio_str[prio] + ' '
|
|
|
|
if f_position in _flags:
|
|
|
|
if 'caller' in kwargs:
|
|
mod, name, line = kwargs['caller']
|
|
else:
|
|
mod, name, line = get_caller_pos(1)
|
|
|
|
if f_module in _flags:
|
|
msg += misc.pad(mod, _module_name_len)
|
|
|
|
msg += misc.pad(name, _file_name_len) + '[' + misc.pad(str(line), 4, True) + ']'
|
|
|
|
if f_color in _flags:
|
|
color_on, color_off = console_color_chars(prio)
|
|
|
|
margs = ''
|
|
if len(args):
|
|
for a in args:
|
|
margs += ' ' + str(a)
|
|
if only_printable:
|
|
margs = _special_char_regex.sub(lambda mo: _special_chars[mo.string[mo.start():mo.end()]], margs)
|
|
margs = re.sub('[\x01-\x1f]', '.', margs)
|
|
|
|
for file in _log_file_streams:
|
|
print(msg + _clean_log_prefix + margs, file=file)
|
|
|
|
msg += _log_prefix
|
|
|
|
if not len(msg):
|
|
return
|
|
|
|
if len(margs):
|
|
msg += color_on + margs + color_off
|
|
|
|
files = []
|
|
if 'capture' in kwargs:
|
|
files.append(kwargs['capture'])
|
|
elif _streams:
|
|
files = [s.stream for s in _streams.values()]
|
|
else:
|
|
if f_stdout in _flags:
|
|
files.append(sys.stdout)
|
|
|
|
if f_stderr in _flags:
|
|
files.append(sys.stderr)
|
|
|
|
if not len(files):
|
|
files = [ sys.stdout ]
|
|
|
|
for file in files:
|
|
print(msg, file=file)
|
|
|
|
def throw(*args, prio=ERR, caller=None, **kwargs) -> None:
|
|
if caller is None:
|
|
caller = get_caller_pos(1)
|
|
msg = ' '.join([str(arg) for arg in args])
|
|
slog(prio, msg, caller=caller)
|
|
raise Exception(msg)
|
|
|
|
def parse_log_prio_str(prio: str) -> int: # export
|
|
try:
|
|
r = int(prio)
|
|
if r < 0 or r > DEVEL:
|
|
raise Exception("Invalid log priority ", prio)
|
|
except ValueError:
|
|
map_prio_str_to_val = {
|
|
"EMERG" : EMERG,
|
|
"emerg" : EMERG,
|
|
"ALERT" : ALERT,
|
|
"alert" : ALERT,
|
|
"CRIT" : CRIT,
|
|
"crit" : CRIT,
|
|
"ERR" : ERR,
|
|
"err" : ERR,
|
|
"WARNING" : WARNING,
|
|
"warning" : WARNING,
|
|
"NOTICE" : NOTICE,
|
|
"notice" : NOTICE,
|
|
"INFO" : INFO,
|
|
"info" : INFO,
|
|
"DEBUG" : DEBUG,
|
|
"debug" : DEBUG,
|
|
"DEVEL" : DEVEL,
|
|
"devel" : DEVEL,
|
|
"OFF" : OFF,
|
|
"off" : OFF,
|
|
}
|
|
if prio in map_prio_str_to_val:
|
|
return map_prio_str_to_val[prio]
|
|
raise Exception("Unknown priority string \"", prio, "\"")
|
|
|
|
def console_color_chars(prio: int) -> List[str]: # export
|
|
if not sys.stdout.isatty():
|
|
return [ '', '' ]
|
|
return _prio_colors[prio]
|
|
|
|
def set_level(level_: str) -> None: # export
|
|
global _level
|
|
if isinstance(level_, basestring):
|
|
_level = parse_log_prio_str(level_)
|
|
return
|
|
_level = level_
|
|
|
|
def set_flags(flags: str|None) -> str: # export
|
|
global _flags
|
|
ret = ','.join(_flags)
|
|
if flags is not None:
|
|
_flags = set(flags.split(','))
|
|
return ret
|
|
|
|
#syslog
|
|
#console
|
|
#color
|
|
#prio
|
|
#position
|
|
#ide
|
|
#trace_rename_thread_to_shorter
|
|
#trace_rename_thread_to_longer
|
|
#trace_inout
|
|
#skip_openlog
|
|
#id
|
|
#date
|
|
#pid
|
|
#highlight_first_error
|
|
|
|
def append_to_prefix(prefix: str) -> str: # export
|
|
global _log_prefix
|
|
global _clean_log_prefix
|
|
r = _log_prefix
|
|
if prefix:
|
|
_log_prefix += prefix
|
|
_clean_log_prefix = _clean_str_regex.sub('', _log_prefix)
|
|
return r
|
|
|
|
def remove_from_prefix(count) -> str: # export
|
|
if isinstance(count, str):
|
|
count = len(count)
|
|
global _log_prefix
|
|
global _clean_log_prefix
|
|
r = _log_prefix
|
|
_log_prefix = _log_prefix[:-count]
|
|
_clean_log_prefix = _clean_str_regex.sub('', _log_prefix)
|
|
return r
|
|
|
|
def set_filename_length(l: int) -> int: # export
|
|
global _file_name_len
|
|
r = _file_name_len
|
|
if l:
|
|
_file_name_len = l
|
|
return r
|
|
|
|
def set_module_name_length(l: int) -> int: # export
|
|
global _module_name_len
|
|
r = _module_name_len
|
|
if l:
|
|
_module_name_len = l
|
|
return r
|
|
|
|
def add_log_file(path: str) -> None: # export
|
|
global _log_file_streams
|
|
fd = open(path, 'w', buffering=1)
|
|
_log_file_streams.append(fd)
|