jw-python/tools/python/jwutils/db/rows.py
Jan Lindemann 428692ea3a Streamline Python file headers somewhat
- Add coding statement
  - Import all modules in one line where possible
  - Order: __future__, typing, plain imports, from imports,
    janware modules

Signed-off-by: Jan Lindemann <jan@janware.com>
2025-07-10 05:14:06 +02:00

250 lines
8.2 KiB
Python

# -*- coding: utf-8 -*-
import io, os, re, textwrap, json, csv
from tabulate import tabulate # type: ignore
from jwutils.log import *
def rows_pretty(rows): # export
    """Render rows as pretty-printed JSON, one document per row.

    Args:
        rows: Either a single row (a ``dict`` or any ``dict`` subclass) or an
            iterable of rows.

    Returns:
        The JSON documents (sorted keys, 4-space indent) joined by newlines.
        Values the json module cannot serialize are converted with ``str()``.
    """
    # isinstance() rather than an exact type comparison, so that a single row
    # given as a dict subclass is wrapped too instead of being iterated key
    # by key.
    if isinstance(rows, dict):
        rows = [rows]
    return '\n'.join(
        json.dumps(row, sort_keys=True, indent=4, default=str) for row in rows
    )
def rows_duplicates(rows, log_prio=INFO, caller=None): # export
    """Return the indices of all rows that repeat an earlier row.

    Two rows count as equal when they have the same set of columns and every
    column holds an equal value.

    Args:
        rows: Indexable sequence of mapping-like rows (``keys()`` and item
            access are used).
        log_prio: Not used by this function; kept for interface compatibility.
        caller: Not used by this function; kept for interface compatibility.

    Returns:
        A list of indices in descending order. An index is included when an
        equal row exists at a lower index, so the first occurrence of each
        row is never reported.
    """
    def __equal(r1, r2):
        cols = set(r1.keys())
        if cols != set(r2.keys()):
            return False
        return not any(r1[col] != r2[col] for col in cols)

    ret = []
    # Walk from the last row down to index 1. Each row is compared against
    # every earlier row, including its direct predecessor, and the outer index
    # moves exactly one step per iteration so no row is skipped after a hit.
    for last in range(len(rows) - 1, 0, -1):
        if any(__equal(rows[last], rows[i]) for i in reversed(range(last))):
            ret.append(last)
    return ret
def rows_remove(rows, callback=None, candidates=None, log_prio=INFO, caller=None): # export
    """Delete rows from ``rows`` in place.

    Args:
        rows: Mutable sequence of rows; matching entries are deleted from it.
        callback: Predicate called with each row; a truthy result marks the
            row for removal. Takes precedence over ``candidates``.
        candidates: Iterable of mapping-like rows, used only when ``callback``
            is None. A row is removed when, for at least one candidate, every
            column of the row is present in the candidate with an equal value.
        log_prio: Priority passed to ``slog`` for each removed row.
        caller: Caller position passed to ``slog``; obtained from
            ``get_caller_pos()`` when None.

    Raises:
        Exception: If neither ``callback`` nor ``candidates`` is given.
    """
    def __matches_candidate(row):
        return any(
            all(
                col in candidate.keys() and not (val != candidate[col])
                for col, val in row.items()
            )
            for candidate in candidates
        )

    if caller is None:
        caller = get_caller_pos()
    if callback is None:
        if candidates is None:
            raise Exception('No criterion to remove rows')
        callback = __matches_candidate

    # Collect positions first, then delete from the back so that the
    # positions still to be processed stay valid.
    doomed = [pos for pos, row in enumerate(rows) if callback(row)]
    while doomed:
        pos = doomed.pop()
        slog(log_prio, f'Removing row {rows[pos]}', caller=caller)
        del rows[pos]
def rows_select(rows, rules): # export
    """Return the rows that satisfy at least one rule.

    Args:
        rows: Iterable of mapping-like rows.
        rules: Iterable of rules. A rule is either a ``{column: regex}``
            mapping or a tuple whose first element is such a mapping. A rule
            matches a row when ``re.search`` succeeds for every listed column.

    Returns:
        A new list with each matching row once, in input order.

    Raises:
        KeyError: If a rule names a column the row does not have.
        TypeError: If a matched column value is not a string or bytes object.
    """
    ret = []
    for row in rows:
        for rule in rules:
            # Unwrap tuple-shaped rules. The check must be against the tuple
            # type itself; comparing with an empty tuple instance never matches.
            search_rule = rule[0] if isinstance(rule, tuple) else rule
            if all(re.search(expr, row[col_name]) for col_name, expr in search_rule.items()):
                ret.append(row)
                # The first matching rule is enough; add the row only once.
                break
    return ret
def rows_rewrite_regex(rows, rules): # export
    """Apply rewrite rules to rows in place.

    Args:
        rows: Iterable of mutable mapping-like rows.
        rules: Iterable of indexable rules. ``rule[0]`` maps column names to
            regular expressions; ``rule[1]`` maps column names to the values
            to assign. When ``re.search`` succeeds for every column in
            ``rule[0]``, each assignment of ``rule[1]`` is logged and applied.

    Raises:
        Exception: Any error raised while evaluating a rule is logged with
            ``slog(ERR, ...)`` and re-raised unchanged.
    """
    for row in rows:
        for rule in rules:
            try:
                conditions_hold = all(
                    re.search(pattern, row[column])
                    for column, pattern in rule[0].items()
                )
                if not conditions_hold:
                    continue
                # rule[1] is only looked at once the rule has matched.
                for target, value in rule[1].items():
                    slog(INFO, f'Rewriting {row} {row.get(target)} -> {value}')
                    row[target] = value
            except Exception as e:
                slog(ERR, f'Failed to run rule {rule} against {row} ({e})')
                raise
def rows_check_not_null(rows, keys, log_prio=WARNING, buf=None, stat_key=None, throw=True, caller=None): # export
    """Check that the given keys are present and not None in every row.

    Args:
        rows: Iterable of mapping-like rows (``get()`` is used).
        keys: A single key (string) or an iterable of keys to check.
        log_prio: Priority passed to ``slog`` for each offending row.
        buf: Optional list that receives the offending rows. It is cleared
            first; a new list is created when None.
        stat_key: Optional column name. When given, offending rows are counted
            per value of that column and the counts are logged with priority
            ``ERR``, highest count first.
        throw: Raise an exception when at least one row is offending.
        caller: Caller position passed to ``slog``; obtained from
            ``get_caller_pos()`` when None.

    Returns:
        The list of offending rows (``buf`` if it was passed in).

    Raises:
        Exception: If ``throw`` is true and at least one row is offending.
    """
    if isinstance(keys, str):
        keys = [keys]
    if caller is None:
        caller = get_caller_pos()
    if buf is None:
        buf = []
    else:
        buf.clear()
    count = 0
    stats = dict()
    for row in rows:
        for key in keys:
            if row.get(key) is None:
                slog(log_prio, f'{key} is missing in row {row}', caller=caller)
                buf.append(row)
                if stat_key is not None:
                    # get() rather than indexing: a row that is being reported
                    # for missing data may well lack the statistics column
                    # too, and that must not abort the check.
                    stat_val = row.get(stat_key)
                    stats[stat_val] = stats.get(stat_val, 0) + 1
                count += 1
                # One report per row is enough, skip the remaining keys.
                break
    if count > 0:
        if stat_key is not None:
            ranking = reversed(sorted(stats.items(), key=lambda item: item[1]))
            for i, (k, v) in enumerate(ranking, start=1):
                # str() first: None and other non-string values do not accept
                # the '<23' alignment spec directly.
                slog(ERR, f'{i:>3}. {str(k):<23}: {v}', caller=caller)
        if throw:
            raise Exception(f'Found {count} rows violating null-constraint for keys {keys}')
    return buf
def rows_dumps(rows, log_prio=INFO, caller=None, use_cols=None, skip_cols=None, table_name=None, out_path='log', heading=None, lead=None, tablefmt=None): # export
    """Format rows as a table string using ``tabulate``.

    Args:
        rows: Iterable of mapping-like rows.
        use_cols: If given, only these columns are kept, in this order;
            columns a row lacks are filled via ``row.get()``.
        skip_cols: If given, these columns are dropped (applied after
            ``use_cols``).
        heading: Optional heading placed before the table.
        lead: Optional introduction placed between heading and table, either
            a string or a list of items rendered as a bullet list.
        tablefmt: Format name passed on to ``tabulate``. The value ``'html'``
            additionally wraps the output in an HTML page and marks up
            heading and lead.
        log_prio, caller, table_name, out_path: Not used by this function.

    Returns:
        The formatted text.
    """
    table_rows = rows
    if use_cols is not None:
        table_rows = [{col: row.get(col) for col in use_cols} for row in table_rows]
    if skip_cols is not None:
        table_rows = [
            {col: val for col, val in row.items() if col not in skip_cols}
            for row in table_rows
        ]

    prologue = epilogue = ""
    if tablefmt == 'html':
        if heading is not None:
            heading = f'<h1>{heading}</h1>\n'
        if type(lead) is str:
            lead = f'<div class="lead">\n {lead}\n</div>\n'
        elif type(lead) is list:
            lead = '<ul>\n' + ''.join(f'<li>{li}</li>\n' for li in lead) + '</ul>\n'
        prologue = textwrap.dedent('''\
            <html>
            <head>
            <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
            <script>
            $(document).ready(function()
            {
            $("tr:odd").css({ "background-color":"#ffa"});
            });
            </script>
            <style>
            tbody td {
            /* padding: 30px; */
            }
            tbody tr:nth-child(odd) {
            background-color: #4C8BF5;
            }
            </style>
            </head>
            <body>
            ''')
        epilogue = textwrap.dedent('''
            </body>
            </html>
            ''')
    else:
        if type(heading) is str:
            heading = '\n' + heading
        # A string lead is used as it is; only a list is turned into bullets.
        if type(lead) is list:
            lead = '\n\n' + ''.join(f' - {li}\n' for li in lead) + '\n'

    if heading is None:
        heading = ''
    if lead is None:
        lead = ''
    return prologue + heading + lead + tabulate(table_rows, headers='keys', tablefmt=tablefmt) + epilogue
def rows_dump(rows, log_prio=INFO, caller=None, use_cols=None, skip_cols=None, table_name=None, out_path='log', heading=None, lead=None, tablefmt=None): # export
    """Format rows with ``rows_dumps()`` and emit them to the log or a file.

    Nothing happens when ``prio_gets_logged(log_prio)`` is false; this applies
    to file output as well.

    Args:
        rows: Rows to dump.
        log_prio: Priority checked with ``prio_gets_logged`` and used for log
            output.
        caller: Caller position for log output; obtained from
            ``get_caller_pos()`` when None.
        use_cols, skip_cols, table_name, heading, lead: Passed on to
            ``rows_dumps()``.
        out_path: ``'log'`` writes through ``slog_m``; any other value is
            opened as a file path and overwritten.
        tablefmt: Table format. When None and ``out_path`` is set, the
            extension of ``out_path`` (without the dot) is used.
    """
    if not prio_gets_logged(log_prio):
        return
    if caller is None:
        caller = get_caller_pos()
    if tablefmt is None and out_path:
        _, extension = os.path.splitext(out_path)
        tablefmt = extension[1:]
    rendered = rows_dumps(
        rows,
        log_prio=log_prio,
        caller=caller,
        use_cols=use_cols,
        skip_cols=skip_cols,
        table_name=table_name,
        heading=heading,
        lead=lead,
        tablefmt=tablefmt,
    )
    if out_path == 'log':
        slog_m(log_prio, rendered, caller=caller)
        return
    with open(out_path, 'w') as fp:
        fp.write(rendered)
def rows_to_csv(rows, use_tmpfile=False): # export
    """Serialize rows to CSV text.

    The output uses ``;`` as delimiter, ``"`` as quote character and quotes
    every non-numeric field. The header consists of all column names found in
    any row, in order of first appearance; columns a row lacks are written as
    empty fields.

    Args:
        rows: Re-iterable collection of dict rows (it is traversed twice).
        use_tmpfile: Build the output in a temporary file instead of an
            in-memory buffer. The returned text is the same either way.

    Returns:
        The CSV document as a string.
    """
    def __write(rows, out):
        writer = csv.DictWriter(out, fieldnames=field_names, delimiter=';', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
        writer.writeheader()
        writer.writerows(rows)

    # dict.fromkeys() de-duplicates while keeping first-seen order, without
    # the linear list lookup per column.
    field_names = list(dict.fromkeys(col for row in rows for col in row.keys()))

    if not use_tmpfile:
        out = io.StringIO()
        __write(rows, out)
        return out.getvalue()

    import tempfile
    # 'w+' because the file is read back after writing. newline='' keeps the
    # line endings produced by the csv writer untouched in both directions.
    with tempfile.TemporaryFile(mode='w+', newline='', encoding='utf-8') as out:
        __write(rows, out)
        out.seek(0)
        return out.read()