# -*- coding: utf-8 -*-

import io
import os, re, textwrap, json, csv
from tabulate import tabulate # type: ignore

from jwutils.log import *


def rows_pretty(rows): # export
    """Return rows as pretty-printed JSON, one document per row.

    Args:
        rows: A list of dicts, or a single dict (treated as a one-row list).

    Returns:
        The JSON documents joined by newlines. Keys are sorted, indent is 4,
        and values json cannot serialize are passed through ``str``.
    """
    if isinstance(rows, dict):
        rows = [rows]
    return '\n'.join(
        json.dumps(row, sort_keys=True, indent=4, default=str) for row in rows
    )


def rows_duplicates(rows, log_prio=INFO, caller=None): # export
    """Return the indices of rows that repeat an earlier row.

    Two rows are equal when they have the same set of columns and no column
    value compares unequal.

    Args:
        rows: A list of dicts.
        log_prio: Accepted for signature compatibility; not used here.
        caller: Accepted for signature compatibility; not used here.

    Returns:
        Indices in descending order, so that callers can ``del rows[i]``
        while walking the result without shifting later indices.
    """

    def _same(r1, r2):
        if r1.keys() != r2.keys():
            return False
        return not any(r1[col] != r2[col] for col in r1)

    # Every row except the first is compared against ALL of its
    # predecessors, including the one directly before it.
    return [
        idx
        for idx in range(len(rows) - 1, 0, -1)
        if any(_same(rows[idx], rows[prev]) for prev in range(idx))
    ]


def rows_remove(rows, callback=None, candidates=None, log_prio=INFO, caller=None): # export
    """Delete rows from ``rows`` in place.

    Args:
        rows: A list of dicts; modified in place.
        callback: Predicate called with each row; truthy means "remove".
        candidates: Used only when ``callback`` is None. A row is removed if,
            for some candidate, every column of the row is present in the
            candidate with a value that does not compare unequal.
        log_prio: Priority passed to ``slog`` for each removed row.
        caller: Passed to ``slog``; defaults to ``get_caller_pos()``.

    Raises:
        Exception: If neither ``callback`` nor ``candidates`` is given.
    """

    def _matches_candidate(row):
        for candidate in candidates:
            mismatch = any(
                col not in candidate or val != candidate[col]
                for col, val in row.items()
            )
            if not mismatch:
                return True
        return False

    if caller is None:
        caller = get_caller_pos()
    if callback is None:
        if candidates is None:
            raise Exception('No criterion to remove rows')
        callback = _matches_candidate

    doomed = [idx for idx, row in enumerate(rows) if callback(row)]
    # Delete back to front so the remaining indices stay valid.
    for idx in reversed(doomed):
        slog(log_prio, f'Removing row {rows[idx]}', caller=caller)
        del rows[idx]


def rows_select(rows, rules): # export
    """Return the rows matched by at least one rule.

    Args:
        rows: A list of dicts.
        rules: An iterable of rules. A rule is a dict mapping column name to
            a regular expression, or a tuple whose first element is such a
            dict. A rule matches a row when ``re.search`` succeeds for every
            listed column.

    Returns:
        A new list with the matching rows, each at most once, in input order.

    Raises:
        KeyError: If a rule names a column the row does not have.
    """
    selected = []
    for row in rows:
        for rule in rules:
            search_rule = rule[0] if isinstance(rule, tuple) else rule
            if all(
                re.search(expr, row[col_name])
                for col_name, expr in search_rule.items()
            ):
                selected.append(row)
                break  # one matching rule is enough for this row
    return selected


def rows_rewrite_regex(rows, rules): # export
    """Overwrite column values in rows that match a rule, in place.

    Args:
        rows: A list of dicts; modified in place.
        rules: An iterable of indexable pairs. ``rule[0]`` maps column name
            to a regular expression that must all match (``re.search``);
            ``rule[1]`` maps column name to the value to assign on a match.

    Raises:
        Exception: Whatever applying a rule raised; it is logged at ERR and
            re-raised unchanged.
    """
    for row in rows:
        for rule in rules:
            try:
                matched = all(
                    re.search(expr, row[col_name])
                    for col_name, expr in rule[0].items()
                )
                if not matched:
                    continue
                for exec_col_name, exec_val in rule[1].items():
                    # NOTE(review): this literal is split across two physical
                    # lines in the reviewed copy; the break is kept as '\n'.
                    # Confirm against the upstream file.
                    slog(INFO, f'Rewriting {row} {row.get(exec_col_name)} -> \n{exec_val}')
                    row[exec_col_name] = exec_val
            except Exception as e:
                slog(ERR, f'Failed to run rule {rule} against {row} ({e})')
                raise


def rows_check_not_null(rows, keys, log_prio=WARNING, buf=None, stat_key=None, throw=True, caller=None): # export
    """Collect rows in which any of the given keys is missing or None.

    Args:
        rows: A list of dicts.
        keys: A column name or a list of column names to check.
        log_prio: Priority passed to ``slog`` for each offending row.
        buf: Optional list that receives the offending rows. It is cleared
            first; a new list is used when None.
        stat_key: If given, offending rows are counted per ``row[stat_key]``
            and the counts are logged at ERR, highest count first.
        throw: If true, raise when at least one offending row was found.
        caller: Passed to ``slog``; defaults to ``get_caller_pos()``.

    Returns:
        The list of offending rows (``buf`` if it was supplied).

    Raises:
        Exception: If ``throw`` is true and offending rows were found.
        KeyError: If ``stat_key`` is missing from an offending row.
    """
    if isinstance(keys, str):
        keys = [keys]
    if caller is None:
        caller = get_caller_pos()
    if buf is None:
        buf = []
    else:
        buf.clear()

    count = 0
    stats = {}
    for row in rows:
        for key in keys:
            if row.get(key) is not None:
                continue
            slog(log_prio, f'{key} is missing in row {row}', caller=caller)
            buf.append(row)
            if stat_key is not None:
                stat_val = row[stat_key]
                stats[stat_val] = stats.get(stat_val, 0) + 1
            count += 1
            break  # report each row once, for its first missing key

    if count > 0:
        if stat_key is not None:
            ranked = reversed(sorted(stats.items(), key=lambda item: item[1]))
            for rank, (k, v) in enumerate(ranked, start=1):
                slog(ERR, f'{rank:>3}. {k:<23}: {v}', caller=caller)
        if throw:
            raise Exception(f'Found {count} rows violating null-constraint for keys {keys}')
    return buf


def rows_dumps(rows, log_prio=INFO, caller=None, use_cols=None, skip_cols=None, table_name=None, out_path='log', heading=None, lead=None, tablefmt=None): # export
    """Format rows as a table string using ``tabulate``.

    Args:
        rows: A list of dicts.
        use_cols: If given, only these columns are kept, in this order;
            columns a row lacks are filled with None.
        skip_cols: If given, these columns are dropped (applied after
            ``use_cols``).
        heading: Optional text placed before the table.
        lead: Optional introduction placed after the heading; a string, or
            (outside the 'html' format) a list rendered as a dash list.
        tablefmt: Passed to ``tabulate``; 'html' selects the HTML decoration
            branch below.
        log_prio, caller, table_name, out_path: Accepted for signature
            compatibility with ``rows_dump``; not used here.

    Returns:
        header + heading + lead + table + footer as one string.
    """
    headers = 'keys'
    dump_rows = rows
    if use_cols is not None:
        dump_rows = [{col: row.get(col) for col in use_cols} for row in dump_rows]
    if skip_cols is not None:
        dump_rows = [
            {col: val for col, val in row.items() if col not in skip_cols}
            for row in dump_rows
        ]

    header = footer = ""
    if tablefmt == 'html':
        # NOTE(review): the literals in this branch look as if their HTML
        # markup was stripped from the reviewed copy (only whitespace is
        # left). They reproduce what is visible; restore the real markup
        # from the upstream file before relying on this branch.
        if heading is not None:
            heading = f'\n\n{heading}\n\n\n'
        if isinstance(lead, str):
            lead = f'\n\n {lead}\n\n\n'
        elif isinstance(lead, list):
            lead = '\n'
        header = textwrap.dedent('''\
        ''')
        footer = textwrap.dedent('''
        ''')
    else:
        if isinstance(heading, str):
            heading = '\n' + heading
        if isinstance(lead, list):
            lead = '\n\n' + ''.join(f' - {li}\n' for li in lead) + '\n'

    if heading is None:
        heading = ''
    if lead is None:
        lead = ''
    return header + heading + lead + tabulate(dump_rows, headers=headers, tablefmt=tablefmt) + footer


def rows_dump(rows, log_prio=INFO, caller=None, use_cols=None, skip_cols=None, table_name=None, out_path='log', heading=None, lead=None, tablefmt=None): # export
    """Format rows with ``rows_dumps`` and write the result to the log or a file.

    Does nothing when ``prio_gets_logged(log_prio)`` is false.

    Args:
        rows: A list of dicts.
        log_prio: Priority used for the gate above and for log output.
        caller: Passed to the logging calls; defaults to ``get_caller_pos()``.
        out_path: 'log' sends the table to ``slog_m``; any other value is
            opened as a file path and overwritten.
        tablefmt: Table format; when None and ``out_path`` is set, the file
            extension of ``out_path`` (without the dot) is used.
        use_cols, skip_cols, table_name, heading, lead: Forwarded to
            ``rows_dumps``.
    """
    if not prio_gets_logged(log_prio):
        return
    if caller is None:
        caller = get_caller_pos()
    if tablefmt is None and out_path:
        tablefmt = os.path.splitext(out_path)[1][1:]
    out = rows_dumps(rows, log_prio=log_prio, caller=caller, use_cols=use_cols, skip_cols=skip_cols,
                     table_name=table_name, heading=heading, lead=lead, tablefmt=tablefmt)
    if out_path == 'log':
        slog_m(log_prio, out, caller=caller)
    else:
        with open(out_path, 'w') as fp:
            fp.write(out)


def rows_to_csv(rows, use_tmpfile=False): # export
    """Serialize rows to a CSV string.

    The header is the union of all row keys in first-seen order. Fields are
    separated by ';' and every non-numeric field is quoted.

    Args:
        rows: A list of dicts.
        use_tmpfile: If true, build the output in a temporary file instead
            of in memory. The returned text is the same either way.

    Returns:
        The CSV text, header line included.
    """
    # dict.fromkeys keeps first-seen order and drops repeats in one pass.
    field_names = list(dict.fromkeys(col for row in rows for col in row))

    def _write(out):
        writer = csv.DictWriter(out, fieldnames=field_names, delimiter=';',
                                quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
        writer.writeheader()
        writer.writerows(rows)

    if not use_tmpfile:
        out = io.StringIO()
        _write(out)
        return out.getvalue()

    import tempfile
    # 'w+' because the file is read back after writing; newline='' leaves the
    # csv module's own line terminators untouched in both directions.
    with tempfile.TemporaryFile(mode='w+', newline='', encoding='utf-8') as out:
        _write(out)
        out.seek(0)
        return out.read()