jw-python/tools/python/jwutils/db/schema/Table.py

438 lines
16 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
from typing import Optional, Union, Iterable, Self, Any # TODO: Need any for many things, as I can't figure out how to avoid circular imports from here
import abc
from collections import OrderedDict
from urllib.parse import quote_plus
from ...log import *
from ...misc import load_class
from .ColumnSet import ColumnSet
from .DataType import DataType
from .CompositeForeignKey import CompositeForeignKey
from .Column import Column
class Table(abc.ABC): # export
def __init__(self, schema, name: str):
assert(isinstance(name, str))
self.__schema = schema
self.__name = name
self.___columns: Optional[OrderedDict[str, Any]] = None
self.___foreign_key_parent_tables: Optional[OrderedDict[str, Any]] = None
self.__primary_keys: Optional[Iterable[str]] = None
self.__unique_constraints: Optional[list[ColumnSet]] = None
self.__foreign_key_constraints: Optional[list[CompositeForeignKey]] = None
self.__nullable_columns: Optional[Iterable[str]] = None
self.__non_nullable_columns: Optional[Iterable[str]] = None
self.__null_insertible_columns: Optional[Iterable[str]] = None
self.__not_null_insertible_columns: Optional[Iterable[str]] = None
self.__log_columns: Optional[Iterable[str]] = None
self.__translate_columns: Optional[Iterable[str]] = None
self.__display_columns: Optional[Iterable[str]] = None
self.__column_default: Optional[dict[str, Any]] = None
self.__base_location_rule: Optional[Iterable[str]] = None
self.__location_rule: Optional[Iterable[str]] = None
self.__row_location_rule: Optional[Iterable[str]] = None
self.__add_row_location_rule: Optional[Iterable[str]] = None
self.___add_child_row_location_rules: Optional[dict[str, str]] = None
self.__foreign_keys_to_parent_table: Optional[OrderedDict[str, Any]] = None
self.__relationships: Optional[list[tuple[str, Self]]] = None
self.__model_class: Optional[Any] = None
self.___relationship_by_foreign_table: Optional[dict[str, Self]] = None
@property
def __columns(self) -> OrderedDict[str, Any]:
if self.___columns is None:
ret: OrderedDict[str, Any] = OrderedDict()
for name in self._column_names():
ret[name] = Column(self, name, self._column_data_type(name))
self.___columns = ret
return self.___columns
@property
def __foreign_key_parent_tables(self) -> OrderedDict[str, Any]:
if self.___foreign_key_parent_tables is None:
self.___foreign_key_parent_tables = OrderedDict()
for cfk in self.foreign_key_constraints:
self.___foreign_key_parent_tables[cfk.parent_table.name] = cfk.parent_table
return self.___foreign_key_parent_tables
@property
def __relationship_by_foreign_table(self) -> dict[str, Self]:
if self.___relationship_by_foreign_table is None:
ret: dict[str, Self] = dict()
for member_name, table_name in self.relationships:
ret[member_name] = self.schema[table_name]
self.___relationship_by_foreign_table = ret
return self.___relationship_by_foreign_table
@property
def __add_child_row_location_rules(self) -> dict[str, str]:
if self.___add_child_row_location_rules is None:
ret: dict[str, str] = {}
for foreign_table_name, foreign_table in self.__relationship_by_foreign_table.items():
if len(self.foreign_keys_to_parent_table(foreign_table)):
rule = self._add_child_row_location_rule(foreign_table_name)
ret[foreign_table_name] = rule
self.___add_child_row_location_rules = ret
return self.___add_child_row_location_rules
def __len__(self):
return len(self.__columns)
def __iter__(self):
yield from self.__columns.values()
def __repr__(self) -> str:
return self.__name
def __getitem__(self, index):
return self.__columns[index]
def __eq__(self, rhs) -> bool:
if isinstance(rhs, Table):
if self.__name != rhs.__name:
return False
return True
elif isinstance(rhs, str):
if self.__name != rhs:
return False
return True
throw(ERR, f'Tried to compare table {self} to type {type(rhs)}: {rhs}')
return False # Unreachable but requested by mypy
def __hash__(self) -> int:
return hash(self.name)
# -- To be reimplemented
@abc.abstractmethod
def _column_names(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _column_data_type(self, name) -> DataType:
pass
@abc.abstractmethod
def _primary_keys(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _log_columns(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _display_columns(self) -> Iterable[str]:
return None
#return self._primary_keys()
@abc.abstractmethod
def _nullable_columns(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _auto_increment_columns(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _translate_columns(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _column_default(self, name) -> Any:
pass
@abc.abstractmethod
def _unique_constraints(self) -> list[list[str]]:
pass
def _model_name(self) -> Optional[str]:
slog(WARNING, f'Returning None model name for table {self.name}')
return None
def _model_module_search_paths(self) -> list[tuple[str, type]]:
return self.schema.model_module_search_paths # Fall back to Schema-global default
@abc.abstractmethod
def _query_name(self) -> str:
return 'tbl/' + self.__name
def _relationships(self) -> list[Self]:
return []
@abc.abstractmethod
def _row_query_name(self) -> str:
return 'row/' + self.__name
# -- common URL schema for all data
def _base_location_rule(self) -> Optional[str]:
return f'/{self.name}'
def _location_rule(self) -> Optional[str]:
ret = ''
for col in self.__schema.access_defining_columns:
if col in self.primary_keys:
ret += f'/<{col}>'
ret += self.base_location_rule
return ret
def _row_location_rule(self) -> Optional[str]:
ret = self._location_rule()
if ret is not None:
for col in self.primary_keys:
if col not in self.__schema.access_defining_columns:
ret += f'/<{col}>'
return ret
def _add_row_location_rule(self) -> Optional[str]:
return self._location_rule() + '/new'
def _add_child_row_location_rule(self, parent_table_name: str) -> Optional[str]:
parent_table = self.schema[parent_table_name]
ret = self._add_row_location_rule()
for cfk in self.foreign_keys_to_parent_table(parent_table):
for fk in cfk:
token = f'/<{fk.child_column.name}>'
if ret.find(token) != -1:
continue
ret += f'/<{fk.child_column.name}>'
return ret
# -- To be used
def column_default(self, name) -> Any:
if self.__column_default is None:
ret: dict[str, Any] = dict()
for name in self.column_names:
ret[name] = self._column_default(name)
self.__column_default = ret
return self.__column_default[name]
@property
def columns(self):
return self.__columns.values()
def column(self, name):
return self.__columns[name]
@property
def column_names(self) -> Iterable[str]:
return self.__columns.keys()
@property
def name(self) -> str:
return self.__name
@property
def schema(self):
return self.__schema
@property
def model_name(self) -> Optional[str]:
return self._model_name()
@property
def model_class(self) -> Any:
if self.__model_class is None:
pattern = r'^' + self.model_name + '$'
for module_path, base_class in self._model_module_search_paths():
ret = load_class(module_path, base_class, class_name_filter=pattern)
if ret is not None:
self.__model_class = ret
break
else:
throw(ERR, f'No model class found for model {self.model_name}')
return self.__model_class
def query_name(self) -> str:
return self._query_name()
def row_query_name(self) -> str:
return self._row_query_name()
@property
def base_location_rule(self):
if self.__base_location_rule is None:
self.__base_location_rule = self._base_location_rule()
return self.__base_location_rule
@property
def location_rule(self):
if self.__location_rule is None:
self.__location_rule = self._location_rule()
return self.__location_rule
def location(self, *args, **kwargs):
ret = self.location_rule
for token, val in kwargs.items(): # FIXME: Poor man's row location assembly
ret = re.sub(f'<{token}>', quote_plus(quote_plus(str(val))), ret)
return ret
@property
def row_location_rule(self):
if self.__row_location_rule is None:
self.__row_location_rule = self._row_location_rule()
return self.__row_location_rule
def row_location(self, *args, **kwargs):
ret = self.row_location_rule
for col in self.primary_keys:
if col in kwargs: # FIXME: Poor man's row location assembly
ret = re.sub(f'<{col}>', quote_plus(quote_plus(str(kwargs[col]))), ret)
return ret
@property
def add_row_location_rule(self):
if self.__add_row_location_rule is None:
self.__add_row_location_rule = self._add_row_location_rule()
return self.__add_row_location_rule
def add_row_location(self, *args, **kwargs) -> Optional[str]:
ret = self.add_row_location_rule
for col in self.primary_keys:
if col in kwargs: # FIXME: Poor man's row location assembly
ret = re.sub(f'<{col}>', quote_plus(quote_plus(str(kwargs[col]))), ret)
return ret
@property
def add_child_row_location_rules(self) -> Iterable[str]:
return self.__add_child_row_location_rules.values()
def add_child_row_location_rule(self, child_table: Union[Self, str]) -> Optional[str]:
if isinstance(child_table, Table):
child_table = child_table.name
return self.__add_child_row_location_rules.get(child_table)
def add_child_row_location(self, parent_table: Union[Self, str], **kwargs) -> Optional[str]:
ret = self.add_child_row_location_rule(parent_table)
if isinstance(parent_table, str):
parent_table = self.schema[parent_table]
if ret is None:
return None
for cfk in self.foreign_keys_to_parent_table(parent_table):
for fk in cfk:
if fk.parent_column.name in kwargs:
ret = re.sub(f'<{fk.child_column.name}>', quote_plus(quote_plus(str(kwargs[fk.parent_column.name]))), ret)
return ret
@property
def primary_keys(self) -> Iterable[str]:
if self.__primary_keys is None:
self.__primary_keys = self._primary_keys()
return self.__primary_keys
@property
def log_columns(self):
if self.__log_columns is None:
self.__log_columns = self._log_columns()
return self.__log_columns
@property
def display_columns(self):
if self.__display_columns is None:
self.__display_columns = self._display_columns()
return self.__display_columns
@property
def auto_increment_columns(self) -> Iterable[str]:
return self._auto_increment_columns()
@property
def translate_columns(self) -> Iterable[str]:
if self.__translate_columns is None:
self.__translate_columns = self._translate_columns()
return self.__translate_columns
@property
def nullable_columns(self) -> Iterable[str]:
if self.__nullable_columns is None:
self.__nullable_columns = self._nullable_columns()
return self.__nullable_columns
@property
def non_nullable_columns(self) -> Iterable[str]:
if self.__non_nullable_columns is None:
ret = []
all_cols = self.column_names
nullable_columns = self.nullable_columns
for col in all_cols:
if col not in nullable_columns:
ret.append(col)
self.__non_nullable_columns = ret
return self.__non_nullable_columns
@property
def null_insertible_columns(self) -> Iterable[str]:
if self.__null_insertible_columns is None:
ret: list[str] = []
for col in self.__columns.values():
if col.is_null_insertible:
ret.append(col.name)
self.__null_insertible_columns = ret
return self.__null_insertible_columns
@property
def not_null_insertible_columns(self) -> Iterable[str]:
if self.__not_null_insertible_columns is None:
ret: list[str] = []
for col in self.__columns.values():
if not col.is_null_insertible:
ret.append(col.name)
self.__not_null_insertible_columns = ret
return self.__not_null_insertible_columns
@property
def unique_constraints(self) -> list[ColumnSet]:
if self.__unique_constraints is None:
ret: list[ColumnSet] = []
impl = self._unique_constraints()
if impl is not None:
for columns in impl:
ret.append(ColumnSet(columns=columns))
self.__unique_constraints = ret
return self.__unique_constraints
@property
def foreign_key_constraints(self) -> list[CompositeForeignKey]:
if self.__foreign_key_constraints is None:
ret: list[Any] = []
for composite_key in self.__schema.foreign_key_constraints:
if composite_key.child_table == self:
ret.append(composite_key)
self.__foreign_key_constraints = ret
return self.__foreign_key_constraints
@property
def foreign_key_parent_tables(self):
return self.__foreign_key_parent_tables.values()
def foreign_keys_to_parent_table(self, parent_table) -> Iterable[CompositeForeignKey]:
if self.__foreign_keys_to_parent_table is None:
self.__foreign_keys_to_parent_table = OrderedDict()
for cfk in self.foreign_key_constraints:
pt = cfk.parent_table.name
if pt not in self.__foreign_keys_to_parent_table:
self.__foreign_keys_to_parent_table[pt] = []
self.__foreign_keys_to_parent_table[pt].append(cfk)
parent_table_name = parent_table if isinstance(parent_table, str) else parent_table.name
return self.__foreign_keys_to_parent_table[parent_table_name] if parent_table_name in self.__foreign_keys_to_parent_table else []
@property
def relationships(self) -> list[tuple[str, Self]]:
if self.__relationships is None:
ret = []
for member_name, table_name in self._relationships():
ret.append((member_name, self.schema[table_name]))
self.__relationships = ret
return self.__relationships
def relationship(self, table: Union[Self, str]) -> bool:
if isinstance(table, Table):
table = table.name
return self.__relationship_by_foreign_table.get(table)