# -*- coding: utf-8 -*-

"""Abstract base class for schema-aware table I/O handlers."""

from typing import Any, List, Union, Optional, Dict
from abc import ABC, abstractmethod
import re
import csv
import json

from ..log import *
from ..cast import cast_str
from .schema.Schema import Schema
from .rows import *

# Payload type exchanged with concrete handlers.
TType = Union[Any, Dict[str, Any]]


class TableIoHandler(ABC): # export
    """Base class for objects that load and store the rows of one table.

    The table handled by an instance is looked up in the schema by the
    name of the concrete subclass (``self.__class__.__name__``).
    Subclasses implement :meth:`_load` and :meth:`_store`; callers use
    :meth:`load` and :meth:`store`.
    """

    def __init__(self, schema: Schema):
        """Remember *schema*; the table metadata is resolved lazily."""
        self.__table_meta = None
        self.__schema = schema

    @property
    def _table_meta(self):
        """Table metadata for this handler, resolved on first access.

        The lookup uses the concrete class name as the model name and is
        performed with ``throw=True``; the result is cached.
        """
        if self.__table_meta is None:
            self.__table_meta = self.__schema.table_by_model_name(
                self.__class__.__name__, throw=True)
        return self.__table_meta

    @property
    def _table_name(self):
        """Name of the table, taken from the table metadata."""
        return self._table_meta.name

    @property
    def _primary_keys(self):
        """Primary keys of the table, taken from the table metadata."""
        return self._table_meta.primary_keys

    def _check_non_nullable(self, rows):
        """Verify that *rows* have values in all not-null insertible columns.

        On failure, every offending column is reported separately (to the
        log and to an HTML file under ``/tmp``) and the original exception
        is re-raised.

        Raises:
            Whatever ``rows_check_not_null`` raises for the combined check.
        """
        # Go through the lazy property: the backing field is still None
        # if no metadata-dependent property has been accessed yet.
        non_nullable = self._table_meta.not_null_insertible_columns
        try:
            # The rows collected into ``buf`` are not used here; the
            # per-column pass below recomputes them for each column.
            rows_check_not_null(rows, non_nullable, buf=[])
        except Exception:
            cn = self.__class__.__name__
            tn = self._table_name
            d = '========================================================='
            slog_m(ERR, f'{d} Null values in {cn}\n')
            for key in non_nullable:
                offending = rows_check_not_null(
                    rows, key, log_prio=OFF, throw=False)
                if not offending:
                    continue
                slog_m(ERR, f'\n{d} Null values in {cn} / {tn}: "{key}"\n')
                # Work on a copy so the table metadata's own column list
                # is not extended as a side effect of reporting.
                use_cols = list(self.log_columns)
                if key not in use_cols:
                    use_cols.append(key)
                rows_dump(offending, use_cols=use_cols, log_prio=ERR)
                # NOTE(review): fixed, predictable path under /tmp.
                rows_dump(
                    offending,
                    use_cols=use_cols,
                    out_path=f'/tmp/missing_{key}_in_{tn}.html',
                    heading=f'Missing "{key}" in table {tn}')
            raise

    @property
    def log_columns(self):
        """Columns used when dumping rows, taken from the table metadata."""
        return self._table_meta.log_columns

    @abstractmethod
    def _load(self, uri: str, reference) -> TType:
        """Read the table data from *uri*; implemented by subclasses."""
        pass

    @abstractmethod
    def _store(self, uri: str, data: TType):
        """Write *data* to *uri*; implemented by subclasses."""
        pass

    def load(self, uri: str, reference, check_duplicates=False,
             write_csv=None) -> TType:
        """Load the table from *uri* and validate the result.

        Args:
            uri: Location handed to :meth:`_load`.
            reference: Passed through unchanged to :meth:`_load`.
            check_duplicates: When true, fail if ``rows_duplicates``
                reports any duplicates in the loaded data.
            write_csv: Accepted but not used by this method.

        Returns:
            The data returned by :meth:`_load`.

        Raises:
            Exception: With message ``"Duplicates"`` when duplicate
                checking is enabled and duplicates are found.
        """
        slog(INFO, f'Reading table "{self._table_name}" from "{uri}"')
        ret = self._load(uri, reference)
        if check_duplicates:
            slog(INFO, f'Checking duplicates in {self._table_name}')
            d = rows_duplicates(ret)
            if len(d):
                slog(ERR, f'Duplicates {d}')
                raise Exception("Duplicates")
        self._check_non_nullable(ret)
        # NOTE: foreign-key validation is currently disabled:
        #schema_missing_foreign_key_values(self._table_name, ret, reference)
        return ret

    def store(self, uri: str, data: TType) -> None:
        """Store *data* at *uri* by delegating to :meth:`_store`."""
        return self._store(uri, data)