mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 09:53:32 +01:00
469 lines
17 KiB
Python
469 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Optional, Union, Iterable, Self, Any # TODO: Need any for many things, as I can't figure out how to avoid circular imports from here
|
|
|
|
import abc
|
|
from collections import OrderedDict
|
|
from urllib.parse import quote_plus
|
|
|
|
from ...log import *
|
|
from ...misc import load_class
|
|
|
|
from .ColumnSet import ColumnSet
|
|
from .DataType import DataType
|
|
from .CompositeForeignKey import CompositeForeignKey
|
|
from .Column import Column
|
|
|
|
class Table(abc.ABC): # export
|
|
|
|
def __init__(self, schema, name: str):
|
|
assert(isinstance(name, str))
|
|
self.__schema = schema
|
|
self.__name = name
|
|
|
|
self.___columns: Optional[OrderedDict[str, Any]] = None
|
|
self.___foreign_key_parent_tables: Optional[OrderedDict[str, Any]] = None
|
|
|
|
self.__primary_keys: Optional[Iterable[str]] = None
|
|
self.__unique_constraints: Optional[list[ColumnSet]] = None
|
|
self.__foreign_key_constraints: Optional[list[CompositeForeignKey]] = None
|
|
self.__nullable_columns: Optional[Iterable[str]] = None
|
|
self.__non_nullable_columns: Optional[Iterable[str]] = None
|
|
self.__null_insertible_columns: Optional[Iterable[str]] = None
|
|
self.__not_null_insertible_columns: Optional[Iterable[str]] = None
|
|
self.__log_columns: Optional[Iterable[str]] = None
|
|
self.__edit_columns: Optional[Iterable[str]] = None
|
|
self.__translate_columns: Optional[Iterable[str]] = None
|
|
self.__display_columns: Optional[Iterable[str]] = None
|
|
self.__default_sort_columns: Optional[Iterable[str]] = None
|
|
self.__column_default: Optional[dict[str, Any]] = None
|
|
self.__base_location_rule: Optional[Iterable[str]] = None
|
|
self.__location_rule: Optional[Iterable[str]] = None
|
|
self.__row_location_rule: Optional[Iterable[str]] = None
|
|
self.__add_row_location_rule: Optional[Iterable[str]] = None
|
|
self.___add_child_row_location_rules: Optional[dict[str, str]] = None
|
|
self.__foreign_keys_to_parent_table: Optional[OrderedDict[str, Any]] = None
|
|
self.__relationships: Optional[list[tuple[str, Self]]] = None
|
|
self.__model_class: Optional[Any] = None
|
|
self.___relationship_by_foreign_table: Optional[dict[str, Self]] = None
|
|
|
|
@property
|
|
def __columns(self) -> OrderedDict[str, Any]:
|
|
if self.___columns is None:
|
|
ret: OrderedDict[str, Any] = OrderedDict()
|
|
for name in self._column_names():
|
|
ret[name] = Column(self, name, self._column_data_type(name))
|
|
self.___columns = ret
|
|
return self.___columns
|
|
|
|
@property
|
|
def __foreign_key_parent_tables(self) -> OrderedDict[str, Any]:
|
|
if self.___foreign_key_parent_tables is None:
|
|
self.___foreign_key_parent_tables = OrderedDict()
|
|
for cfk in self.foreign_key_constraints:
|
|
self.___foreign_key_parent_tables[cfk.parent_table.name] = cfk.parent_table
|
|
return self.___foreign_key_parent_tables
|
|
|
|
@property
|
|
def __relationship_by_foreign_table(self) -> dict[str, Self]:
|
|
if self.___relationship_by_foreign_table is None:
|
|
ret: dict[str, Self] = dict()
|
|
for member_name, table_name in self.relationships:
|
|
ret[member_name] = self.schema[table_name]
|
|
self.___relationship_by_foreign_table = ret
|
|
return self.___relationship_by_foreign_table
|
|
|
|
@property
|
|
def __add_child_row_location_rules(self) -> dict[str, str]:
|
|
if self.___add_child_row_location_rules is None:
|
|
ret: dict[str, str] = {}
|
|
for foreign_table_name, foreign_table in self.__relationship_by_foreign_table.items():
|
|
if len([self.foreign_keys_to_parent_table(foreign_table)]):
|
|
rule = self._add_child_row_location_rule(foreign_table_name)
|
|
if rule is None:
|
|
continue
|
|
ret[foreign_table_name] = rule
|
|
self.___add_child_row_location_rules = ret
|
|
return self.___add_child_row_location_rules
|
|
|
|
def __len__(self):
|
|
return len(self.__columns)
|
|
|
|
def __iter__(self):
|
|
yield from self.__columns.values()
|
|
|
|
def __repr__(self) -> str:
|
|
return self.__name
|
|
|
|
def __getitem__(self, index):
|
|
return self.__columns[index]
|
|
|
|
def __eq__(self, rhs) -> bool:
|
|
if isinstance(rhs, Table):
|
|
if self.__name != rhs.__name:
|
|
return False
|
|
return True
|
|
elif isinstance(rhs, str):
|
|
if self.__name != rhs:
|
|
return False
|
|
return True
|
|
throw(ERR, f'Tried to compare table {self} to type {type(rhs)}: {rhs}')
|
|
return False # Unreachable but requested by mypy
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self.name)
|
|
|
|
# -- To be reimplemented
|
|
|
|
@abc.abstractmethod
|
|
def _column_names(self) -> Iterable[str]:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _column_data_type(self, name) -> DataType:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _primary_keys(self) -> Iterable[str]:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _log_columns(self) -> Iterable[str]:
|
|
pass
|
|
|
|
def _edit_columns(self) -> Iterable[str]:
|
|
return self._log_columns()
|
|
|
|
@abc.abstractmethod
|
|
def _display_columns(self) -> Optional[Iterable[str]]:
|
|
return None
|
|
#return self._primary_keys()
|
|
|
|
@abc.abstractmethod
|
|
def _default_sort_columns(self) -> Optional[Iterable[str]]:
|
|
return None
|
|
|
|
@abc.abstractmethod
|
|
def _nullable_columns(self) -> Iterable[str]:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _auto_increment_columns(self) -> Iterable[str]:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _translate_columns(self) -> Iterable[str]:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _column_default(self, name) -> Any:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def _unique_constraints(self) -> list[list[str]]:
|
|
pass
|
|
|
|
def _model_name(self) -> Optional[str]:
|
|
slog(WARNING, f'Returning None model name for table {self.name}')
|
|
return None
|
|
|
|
def _model_module_search_paths(self) -> list[tuple[str, type]]:
|
|
return self.schema.model_module_search_paths # Fall back to Schema-global default
|
|
|
|
@abc.abstractmethod
|
|
def _query_name(self) -> str:
|
|
return 'tbl/' + self.__name
|
|
|
|
def _relationships(self) -> list[Self]:
|
|
return []
|
|
|
|
@abc.abstractmethod
|
|
def _row_query_name(self) -> str:
|
|
return 'row/' + self.__name
|
|
|
|
# -- common URL schema for all data
|
|
def _base_location_rule(self) -> Optional[str]:
|
|
return f'/{self.name}'
|
|
|
|
def _location_rule(self) -> Optional[str]:
|
|
ret = ''
|
|
for col in self.__schema.access_defining_columns:
|
|
if col in self.primary_keys:
|
|
ret += f'/<{col}>'
|
|
ret += self.base_location_rule
|
|
return ret
|
|
|
|
def _row_location_rule(self) -> Optional[str]:
|
|
ret = self._location_rule()
|
|
if ret is not None:
|
|
for col in self.primary_keys:
|
|
if col not in self.__schema.access_defining_columns:
|
|
ret += f'/<{col}>'
|
|
return ret
|
|
|
|
def _add_row_location_rule(self) -> Optional[str]:
|
|
rule = self._location_rule()
|
|
if rule is None:
|
|
return None
|
|
return rule + '/new'
|
|
|
|
def _add_child_row_location_rule(self, parent_table_name: str) -> Optional[str]:
|
|
parent_table = self.schema[parent_table_name]
|
|
ret = self._add_row_location_rule()
|
|
if ret is None:
|
|
return None
|
|
for cfk in self.foreign_keys_to_parent_table(parent_table):
|
|
for fk in cfk:
|
|
token = f'/<{fk.child_column.name}>'
|
|
if ret.find(token) != -1:
|
|
continue
|
|
ret += f'/<{fk.child_column.name}>'
|
|
return ret
|
|
|
|
# -- To be used
|
|
|
|
def column_default(self, name) -> Any:
|
|
if self.__column_default is None:
|
|
ret: dict[str, Any] = dict()
|
|
for name in self.column_names:
|
|
ret[name] = self._column_default(name)
|
|
self.__column_default = ret
|
|
return self.__column_default[name]
|
|
|
|
@property
|
|
def columns(self):
|
|
return self.__columns.values()
|
|
|
|
def column(self, name):
|
|
return self.__columns[name]
|
|
|
|
@property
|
|
def column_names(self) -> Iterable[str]:
|
|
return self.__columns.keys()
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return self.__name
|
|
|
|
@property
|
|
def schema(self):
|
|
return self.__schema
|
|
|
|
@property
|
|
def model_name(self) -> Optional[str]:
|
|
return self._model_name()
|
|
|
|
@property
|
|
def model_class(self) -> Any:
|
|
if self.__model_class is None:
|
|
model_name = self.model_name
|
|
if model_name is None:
|
|
return None
|
|
pattern = r'^' + model_name + '$'
|
|
for module_path, base_class in self._model_module_search_paths():
|
|
ret = load_class(module_path, base_class, class_name_filter=pattern)
|
|
if ret is not None:
|
|
self.__model_class = ret
|
|
break
|
|
else:
|
|
throw(ERR, f'No model class found for model {self.model_name}')
|
|
return self.__model_class
|
|
|
|
def query_name(self) -> str:
|
|
return self._query_name()
|
|
|
|
def row_query_name(self) -> str:
|
|
return self._row_query_name()
|
|
|
|
@property
|
|
def base_location_rule(self):
|
|
if self.__base_location_rule is None:
|
|
self.__base_location_rule = self._base_location_rule()
|
|
return self.__base_location_rule
|
|
|
|
@property
|
|
def location_rule(self):
|
|
if self.__location_rule is None:
|
|
self.__location_rule = self._location_rule()
|
|
return self.__location_rule
|
|
|
|
def location(self, *args, **kwargs):
|
|
ret = self.location_rule
|
|
for token, val in kwargs.items(): # FIXME: Poor man's row location assembly
|
|
ret = re.sub(f'<{token}>', quote_plus(quote_plus(str(val))), ret)
|
|
return ret
|
|
|
|
@property
|
|
def row_location_rule(self):
|
|
if self.__row_location_rule is None:
|
|
self.__row_location_rule = self._row_location_rule()
|
|
return self.__row_location_rule
|
|
|
|
def row_location(self, *args, **kwargs):
|
|
ret = self.row_location_rule
|
|
for col in self.primary_keys:
|
|
if col in kwargs: # FIXME: Poor man's row location assembly
|
|
ret = re.sub(f'<{col}>', quote_plus(quote_plus(str(kwargs[col]))), ret)
|
|
return ret
|
|
|
|
@property
|
|
def add_row_location_rule(self):
|
|
if self.__add_row_location_rule is None:
|
|
self.__add_row_location_rule = self._add_row_location_rule()
|
|
return self.__add_row_location_rule
|
|
|
|
def add_row_location(self, *args, **kwargs) -> Optional[str]:
|
|
ret = self.add_row_location_rule
|
|
for col in self.primary_keys:
|
|
if col in kwargs: # FIXME: Poor man's row location assembly
|
|
ret = re.sub(f'<{col}>', quote_plus(quote_plus(str(kwargs[col]))), ret)
|
|
return ret
|
|
|
|
@property
|
|
def add_child_row_location_rules(self) -> Iterable[str]:
|
|
return self.__add_child_row_location_rules.values()
|
|
|
|
def add_child_row_location_rule(self, child_table: Union[Self, str]) -> Optional[str]:
|
|
if isinstance(child_table, Table):
|
|
child_table = child_table.name
|
|
return self.__add_child_row_location_rules.get(child_table)
|
|
|
|
def add_child_row_location(self, parent_table: Union[Self, str], **kwargs) -> Optional[str]:
|
|
ret = self.add_child_row_location_rule(parent_table)
|
|
if isinstance(parent_table, str):
|
|
parent_table = self.schema[parent_table]
|
|
if ret is None:
|
|
return None
|
|
for cfk in self.foreign_keys_to_parent_table(parent_table):
|
|
for fk in cfk:
|
|
if fk.parent_column.name in kwargs:
|
|
ret = re.sub(f'<{fk.child_column.name}>', quote_plus(quote_plus(str(kwargs[fk.parent_column.name]))), ret)
|
|
return ret
|
|
|
|
@property
|
|
def primary_keys(self) -> Iterable[str]:
|
|
if self.__primary_keys is None:
|
|
self.__primary_keys = self._primary_keys()
|
|
return self.__primary_keys
|
|
|
|
@property
|
|
def log_columns(self):
|
|
if self.__log_columns is None:
|
|
self.__log_columns = self._log_columns()
|
|
return self.__log_columns
|
|
|
|
@property
|
|
def edit_columns(self):
|
|
if self.__edit_columns is None:
|
|
self.__edit_columns = self._edit_columns()
|
|
return self.__edit_columns
|
|
|
|
@property
|
|
def display_columns(self):
|
|
if self.__display_columns is None:
|
|
self.__display_columns = self._display_columns()
|
|
return self.__display_columns
|
|
|
|
@property
|
|
def default_sort_columns(self):
|
|
if self.__default_sort_columns is None:
|
|
self.__default_sort_columns = self._default_sort_columns()
|
|
return self.__default_sort_columns
|
|
|
|
@property
|
|
def auto_increment_columns(self) -> Iterable[str]:
|
|
return self._auto_increment_columns()
|
|
|
|
@property
|
|
def translate_columns(self) -> Iterable[str]:
|
|
if self.__translate_columns is None:
|
|
self.__translate_columns = self._translate_columns()
|
|
return self.__translate_columns
|
|
|
|
@property
|
|
def nullable_columns(self) -> Iterable[str]:
|
|
if self.__nullable_columns is None:
|
|
self.__nullable_columns = self._nullable_columns()
|
|
return self.__nullable_columns
|
|
|
|
@property
|
|
def non_nullable_columns(self) -> Iterable[str]:
|
|
if self.__non_nullable_columns is None:
|
|
ret = []
|
|
all_cols = self.column_names
|
|
nullable_columns = self.nullable_columns
|
|
for col in all_cols:
|
|
if col not in nullable_columns:
|
|
ret.append(col)
|
|
self.__non_nullable_columns = ret
|
|
return self.__non_nullable_columns
|
|
|
|
@property
|
|
def null_insertible_columns(self) -> Iterable[str]:
|
|
if self.__null_insertible_columns is None:
|
|
ret: list[str] = []
|
|
for col in self.__columns.values():
|
|
if col.is_null_insertible:
|
|
ret.append(col.name)
|
|
self.__null_insertible_columns = ret
|
|
return self.__null_insertible_columns
|
|
|
|
@property
|
|
def not_null_insertible_columns(self) -> Iterable[str]:
|
|
if self.__not_null_insertible_columns is None:
|
|
ret: list[str] = []
|
|
for col in self.__columns.values():
|
|
if not col.is_null_insertible:
|
|
ret.append(col.name)
|
|
self.__not_null_insertible_columns = ret
|
|
return self.__not_null_insertible_columns
|
|
|
|
@property
|
|
def unique_constraints(self) -> list[ColumnSet]:
|
|
if self.__unique_constraints is None:
|
|
ret: list[ColumnSet] = []
|
|
impl = self._unique_constraints()
|
|
if impl is not None:
|
|
for columns in impl:
|
|
ret.append(ColumnSet(columns=columns))
|
|
self.__unique_constraints = ret
|
|
return self.__unique_constraints
|
|
|
|
@property
|
|
def foreign_key_constraints(self) -> list[CompositeForeignKey]:
|
|
if self.__foreign_key_constraints is None:
|
|
ret: list[Any] = []
|
|
for composite_key in self.__schema.foreign_key_constraints:
|
|
if composite_key.child_table == self:
|
|
ret.append(composite_key)
|
|
self.__foreign_key_constraints = ret
|
|
return self.__foreign_key_constraints
|
|
|
|
@property
|
|
def foreign_key_parent_tables(self):
|
|
return self.__foreign_key_parent_tables.values()
|
|
|
|
def foreign_keys_to_parent_table(self, parent_table) -> Iterable[CompositeForeignKey]:
|
|
if self.__foreign_keys_to_parent_table is None:
|
|
self.__foreign_keys_to_parent_table = OrderedDict()
|
|
for cfk in self.foreign_key_constraints:
|
|
pt = cfk.parent_table.name
|
|
if pt not in self.__foreign_keys_to_parent_table:
|
|
self.__foreign_keys_to_parent_table[pt] = []
|
|
self.__foreign_keys_to_parent_table[pt].append(cfk)
|
|
parent_table_name = parent_table if isinstance(parent_table, str) else parent_table.name
|
|
return self.__foreign_keys_to_parent_table[parent_table_name] if parent_table_name in self.__foreign_keys_to_parent_table else []
|
|
|
|
@property
|
|
def relationships(self) -> list[tuple[str, Self]]:
|
|
if self.__relationships is None:
|
|
ret = []
|
|
for member_name, table_name in self._relationships():
|
|
ret.append((member_name, self.schema[table_name]))
|
|
self.__relationships = ret
|
|
return self.__relationships
|
|
|
|
def relationship(self, table: Union[Self, str]) -> Optional[Self]:
|
|
if isinstance(table, Table):
|
|
table = table.name
|
|
return self.__relationship_by_foreign_table.get(table)
|