# -*- coding: utf-8 -*-

from typing import Optional, Union, Iterable, Self, Any # TODO: Need any for many things, as I can't figure out how to avoid circular imports from here

import abc
from collections import OrderedDict
from urllib.parse import quote_plus

from ...log import *
from ...misc import load_class
from .ColumnSet import ColumnSet
from .DataType import DataType
from .CompositeForeignKey import CompositeForeignKey
from .Column import Column

class Table(abc.ABC): # export
    """Abstract description of a single table inside a schema.

    Subclasses provide the raw metadata through the underscore-prefixed hook
    methods in the "To be reimplemented" section. The public properties and
    methods in the "To be used" section wrap those hooks and, for most of
    them, cache the result on first access.
    """

    def __init__(self, schema, name: str):
        """Store the owning schema and the table name; all caches start empty.

        Args:
            schema: The schema object this table belongs to.
            name: The table name; must be a ``str``.
        """
        assert isinstance(name, str)
        self.__schema = schema
        self.__name = name
        # Lazily filled caches. ``None`` means "not computed yet".
        self.___columns: Optional[OrderedDict[str, Any]] = None
        self.___foreign_key_parent_tables: Optional[OrderedDict[str, Any]] = None
        self.__primary_keys: Optional[Iterable[str]] = None
        self.__unique_constraints: Optional[list[ColumnSet]] = None
        self.__foreign_key_constraints: Optional[list[CompositeForeignKey]] = None
        self.__nullable_columns: Optional[Iterable[str]] = None
        self.__non_nullable_columns: Optional[Iterable[str]] = None
        self.__null_insertible_columns: Optional[Iterable[str]] = None
        self.__not_null_insertible_columns: Optional[Iterable[str]] = None
        self.__log_columns: Optional[Iterable[str]] = None
        self.__edit_columns: Optional[Iterable[str]] = None
        self.__translate_columns: Optional[Iterable[str]] = None
        self.__display_columns: Optional[Iterable[str]] = None
        self.__default_sort_columns: Optional[Iterable[str]] = None
        self.__column_default: Optional[dict[str, Any]] = None
        self.__base_location_rule: Optional[Iterable[str]] = None
        self.__location_rule: Optional[Iterable[str]] = None
        self.__row_location_rule: Optional[Iterable[str]] = None
        self.__add_row_location_rule: Optional[Iterable[str]] = None
        self.___add_child_row_location_rules: Optional[dict[str, str]] = None
        self.__foreign_keys_to_parent_table: Optional[OrderedDict[str, Any]] = None
        self.__relationships: Optional[list[tuple[str, Self]]] = None
        self.__model_class: Optional[Any] = None
        self.___relationship_by_foreign_table: Optional[dict[str, Self]] = None

    @staticmethod
    def __fill_placeholder(rule: str, token: str, value: Any) -> str:
        """Replace every literal ``<token>`` in ``rule`` by the quoted ``value``.

        The value is converted with ``str()`` and passed through
        ``quote_plus`` twice. The token is matched literally, so names
        containing regular expression metacharacters are handled as-is.
        """
        return rule.replace(f'<{token}>', quote_plus(quote_plus(str(value))))

    @property
    def __columns(self) -> OrderedDict[str, Any]:
        """Column objects keyed by column name, built on first access."""
        if self.___columns is None:
            # Build into a local first and publish only the complete mapping.
            columns: OrderedDict[str, Any] = OrderedDict()
            for name in self._column_names():
                columns[name] = Column(self, name, self._column_data_type(name))
            self.___columns = columns
        return self.___columns

    @property
    def __foreign_key_parent_tables(self) -> OrderedDict[str, Any]:
        """Parent tables of this table's foreign keys, keyed by table name."""
        if self.___foreign_key_parent_tables is None:
            self.___foreign_key_parent_tables = OrderedDict()
            for cfk in self.foreign_key_constraints:
                self.___foreign_key_parent_tables[cfk.parent_table.name] = cfk.parent_table
        return self.___foreign_key_parent_tables

    @property
    def __relationship_by_foreign_table(self) -> dict[str, Self]:
        """Related tables as returned by the schema, keyed by member name.

        NOTE(review): ``self.relationships`` already yields resolved tables,
        so the schema is indexed with a table here, and the keys are member
        names although consumers look entries up by table name. Behaviour is
        kept unchanged; confirm the intended key.
        """
        if self.___relationship_by_foreign_table is None:
            by_member: dict[str, Self] = {}
            for member_name, table in self.relationships:
                by_member[member_name] = self.schema[table]
            self.___relationship_by_foreign_table = by_member
        return self.___relationship_by_foreign_table

    @property
    def __add_child_row_location_rules(self) -> dict[str, str]:
        """Add-child-row rules for related tables this table has foreign keys to."""
        if self.___add_child_row_location_rules is None:
            rules: dict[str, str] = {}
            for foreign_table_name, foreign_table in self.__relationship_by_foreign_table.items():
                # Only tables referenced by at least one foreign key get a rule.
                if self.foreign_keys_to_parent_table(foreign_table):
                    rules[foreign_table_name] = self._add_child_row_location_rule(foreign_table_name)
            self.___add_child_row_location_rules = rules
        return self.___add_child_row_location_rules

    def __len__(self):
        """Return the number of columns."""
        return len(self.__columns)

    def __iter__(self):
        """Iterate over the column objects."""
        yield from self.__columns.values()

    def __repr__(self) -> str:
        """Return the table name."""
        return self.__name

    def __getitem__(self, index):
        """Return the column object stored under ``index`` (a column name)."""
        return self.__columns[index]

    def __eq__(self, rhs) -> bool:
        """Compare by name against another ``Table`` or a plain string.

        Any other operand type is reported through ``throw(ERR, ...)``.
        """
        if isinstance(rhs, Table):
            return self.__name == rhs.__name
        if isinstance(rhs, str):
            return self.__name == rhs
        throw(ERR, f'Tried to compare table {self} to type {type(rhs)}: {rhs}')
        return False # Unreachable but requested by mypy

    def __hash__(self) -> int:
        """Hash by table name, matching the name-based ``__eq__``."""
        return hash(self.name)

    # -- To be reimplemented

    @abc.abstractmethod
    def _column_names(self) -> Iterable[str]:
        """Return the names of all columns."""

    @abc.abstractmethod
    def _column_data_type(self, name) -> DataType:
        """Return the data type of column ``name``."""

    @abc.abstractmethod
    def _primary_keys(self) -> Iterable[str]:
        """Return the names of the primary key columns."""

    @abc.abstractmethod
    def _log_columns(self) -> Iterable[str]:
        """Return the names of the log columns."""

    def _edit_columns(self) -> Iterable[str]:
        """Return the names of the edit columns; defaults to the log columns."""
        return self._log_columns()

    @abc.abstractmethod
    def _display_columns(self) -> Iterable[str]:
        """Return the names of the display columns."""
        return None
        #return self._primary_keys()

    @abc.abstractmethod
    def _default_sort_columns(self) -> Iterable[str]:
        """Return the names of the default sort columns."""
        return None

    @abc.abstractmethod
    def _nullable_columns(self) -> Iterable[str]:
        """Return the names of the nullable columns."""

    @abc.abstractmethod
    def _auto_increment_columns(self) -> Iterable[str]:
        """Return the names of the auto-increment columns."""

    @abc.abstractmethod
    def _translate_columns(self) -> Iterable[str]:
        """Return the names of the translate columns."""

    @abc.abstractmethod
    def _column_default(self, name) -> Any:
        """Return the default value of column ``name``."""

    @abc.abstractmethod
    def _unique_constraints(self) -> list[list[str]]:
        """Return the unique constraints, each as a list of column names."""

    def _model_name(self) -> Optional[str]:
        """Return the model name; this default logs a warning and returns None."""
        slog(WARNING, f'Returning None model name for table {self.name}')
        return None

    def _model_module_search_paths(self) -> list[tuple[str, type]]:
        """Return ``(module_path, base_class)`` pairs searched by ``model_class``."""
        return self.schema.model_module_search_paths # Fall back to Schema-global default

    @abc.abstractmethod
    def _query_name(self) -> str:
        """Return the query name; the base implementation yields ``tbl/<name>``."""
        return 'tbl/' + self.__name

    def _relationships(self) -> list[Self]:
        """Return the relationships of this table; empty by default.

        NOTE(review): ``relationships`` unpacks every entry as a
        ``(member_name, table_name)`` pair, which does not match the
        declared return type. Confirm and adjust the annotation.
        """
        return []

    @abc.abstractmethod
    def _row_query_name(self) -> str:
        """Return the row query name; the base implementation yields ``row/<name>``."""
        return 'row/' + self.__name

    # -- common URL schema for all data

    def _base_location_rule(self) -> Optional[str]:
        """Return ``/<table name>``."""
        return f'/{self.name}'

    def _location_rule(self) -> Optional[str]:
        """Return the table location rule.

        One ``/<col>`` placeholder is emitted for every access defining
        column of the schema that is also a primary key of this table,
        followed by the base location rule.
        """
        ret = ''
        for col in self.__schema.access_defining_columns:
            if col in self.primary_keys:
                ret += f'/<{col}>'
        ret += self.base_location_rule
        return ret

    def _row_location_rule(self) -> Optional[str]:
        """Return the location rule extended by the remaining primary keys.

        Primary keys that are access defining columns are skipped here
        because ``_location_rule`` has already emitted them.
        """
        ret = self._location_rule()
        if ret is not None:
            for col in self.primary_keys:
                if col not in self.__schema.access_defining_columns:
                    ret += f'/<{col}>'
        return ret

    def _add_row_location_rule(self) -> Optional[str]:
        """Return the location rule with ``/new`` appended."""
        return self._location_rule() + '/new'

    def _add_child_row_location_rule(self, parent_table_name: str) -> Optional[str]:
        """Return the add-row rule extended by the foreign keys to a parent.

        Args:
            parent_table_name: Name used to look the parent table up in the
                schema.

        Returns:
            The add-row rule followed by one ``/<child column>`` placeholder
            per foreign key column referencing the parent. Placeholders
            already present in the rule are not repeated.
        """
        parent_table = self.schema[parent_table_name]
        ret = self._add_row_location_rule()
        for cfk in self.foreign_keys_to_parent_table(parent_table):
            for fk in cfk:
                token = f'/<{fk.child_column.name}>'
                if token not in ret:
                    ret += token
        return ret

    # -- To be used

    def column_default(self, name) -> Any:
        """Return the default value of column ``name``.

        The defaults of all columns are collected on first use and cached.

        Raises:
            KeyError: If ``name`` is not a column of this table.
        """
        if self.__column_default is None:
            # The loop variable must not reuse the parameter name, otherwise
            # the lookup below would use the last column instead of ``name``.
            self.__column_default = {
                col_name: self._column_default(col_name)
                for col_name in self.column_names
            }
        return self.__column_default[name]

    @property
    def columns(self):
        """The column objects of this table."""
        return self.__columns.values()

    def column(self, name):
        """Return the column object named ``name``."""
        return self.__columns[name]

    @property
    def column_names(self) -> Iterable[str]:
        """The names of all columns."""
        return self.__columns.keys()

    @property
    def name(self) -> str:
        """The table name."""
        return self.__name

    @property
    def schema(self):
        """The schema object passed to the constructor."""
        return self.__schema

    @property
    def model_name(self) -> Optional[str]:
        """The model name as returned by ``_model_name`` (not cached)."""
        return self._model_name()

    @property
    def model_class(self) -> Any:
        """The model class, loaded on first access and cached.

        Every ``(module_path, base_class)`` pair from
        ``_model_module_search_paths`` is tried with ``load_class`` until one
        yields a class whose name matches the model name exactly. When no
        pair does, the failure is reported through ``throw(ERR, ...)``.

        NOTE(review): ``model_name`` may be None (see the ``_model_name``
        default), in which case building the pattern raises ``TypeError``.
        """
        if self.__model_class is None:
            model_name = self.model_name
            pattern = r'^' + model_name + '$'
            for module_path, base_class in self._model_module_search_paths():
                ret = load_class(module_path, base_class, class_name_filter=pattern)
                if ret is not None:
                    self.__model_class = ret
                    break
            else:
                throw(ERR, f'No model class found for model {model_name}')
        return self.__model_class

    def query_name(self) -> str:
        """Return the query name as provided by ``_query_name``."""
        return self._query_name()

    def row_query_name(self) -> str:
        """Return the row query name as provided by ``_row_query_name``."""
        return self._row_query_name()

    @property
    def base_location_rule(self):
        """The cached result of ``_base_location_rule``."""
        if self.__base_location_rule is None:
            self.__base_location_rule = self._base_location_rule()
        return self.__base_location_rule

    @property
    def location_rule(self):
        """The cached result of ``_location_rule``."""
        if self.__location_rule is None:
            self.__location_rule = self._location_rule()
        return self.__location_rule

    def location(self, *args, **kwargs):
        """Return the location rule with placeholders filled from ``kwargs``.

        Every keyword ``token=value`` replaces the literal ``<token>`` in the
        rule by the doubly ``quote_plus``-quoted string form of ``value``.
        Positional arguments are accepted but not used.

        Returns:
            The filled-in location, or None if there is no location rule.
        """
        ret = self.location_rule
        if ret is None:
            return None
        for token, val in kwargs.items():
            # FIXME: Poor man's row location assembly
            ret = self.__fill_placeholder(ret, token, val)
        return ret

    @property
    def row_location_rule(self):
        """The cached result of ``_row_location_rule``."""
        if self.__row_location_rule is None:
            self.__row_location_rule = self._row_location_rule()
        return self.__row_location_rule

    def row_location(self, *args, **kwargs):
        """Return the row location rule filled with primary key values.

        Only keywords naming a primary key column are used; their values
        are doubly ``quote_plus``-quoted. Positional arguments are accepted
        but not used.

        Returns:
            The filled-in location, or None if there is no row location rule.
        """
        ret = self.row_location_rule
        if ret is None:
            return None
        for col in self.primary_keys:
            if col in kwargs:
                # FIXME: Poor man's row location assembly
                ret = self.__fill_placeholder(ret, col, kwargs[col])
        return ret

    @property
    def add_row_location_rule(self):
        """The cached result of ``_add_row_location_rule``."""
        if self.__add_row_location_rule is None:
            self.__add_row_location_rule = self._add_row_location_rule()
        return self.__add_row_location_rule

    def add_row_location(self, *args, **kwargs) -> Optional[str]:
        """Return the add-row location rule filled with primary key values.

        Only keywords naming a primary key column are used; their values
        are doubly ``quote_plus``-quoted. Positional arguments are accepted
        but not used.

        Returns:
            The filled-in location, or None if there is no add-row rule.
        """
        ret = self.add_row_location_rule
        if ret is None:
            return None
        for col in self.primary_keys:
            if col in kwargs:
                # FIXME: Poor man's row location assembly
                ret = self.__fill_placeholder(ret, col, kwargs[col])
        return ret

    @property
    def add_child_row_location_rules(self) -> Iterable[str]:
        """All add-child-row location rules of this table."""
        return self.__add_child_row_location_rules.values()

    def add_child_row_location_rule(self, child_table: Union[Self, str]) -> Optional[str]:
        """Return the add-child-row rule stored for a table, or None.

        Args:
            child_table: A table or a table name.
        """
        if isinstance(child_table, Table):
            child_table = child_table.name
        return self.__add_child_row_location_rules.get(child_table)

    def add_child_row_location(self, parent_table: Union[Self, str], **kwargs) -> Optional[str]:
        """Return the add-child-row location for a parent, filled from ``kwargs``.

        Keywords are matched against the names of the parent columns of the
        foreign keys to ``parent_table``; a match fills the placeholder of
        the corresponding child column with the doubly
        ``quote_plus``-quoted value.

        Args:
            parent_table: A table or a table name.

        Returns:
            The filled-in location, or None if no rule exists for the table.
        """
        ret = self.add_child_row_location_rule(parent_table)
        if isinstance(parent_table, str):
            parent_table = self.schema[parent_table]
        if ret is None:
            return None
        for cfk in self.foreign_keys_to_parent_table(parent_table):
            for fk in cfk:
                if fk.parent_column.name in kwargs:
                    ret = self.__fill_placeholder(
                        ret, fk.child_column.name, kwargs[fk.parent_column.name])
        return ret

    @property
    def primary_keys(self) -> Iterable[str]:
        """The cached result of ``_primary_keys``."""
        if self.__primary_keys is None:
            self.__primary_keys = self._primary_keys()
        return self.__primary_keys

    @property
    def log_columns(self):
        """The cached result of ``_log_columns``."""
        if self.__log_columns is None:
            self.__log_columns = self._log_columns()
        return self.__log_columns

    @property
    def edit_columns(self):
        """The cached result of ``_edit_columns``."""
        if self.__edit_columns is None:
            self.__edit_columns = self._edit_columns()
        return self.__edit_columns

    @property
    def display_columns(self):
        """The cached result of ``_display_columns``."""
        if self.__display_columns is None:
            self.__display_columns = self._display_columns()
        return self.__display_columns

    @property
    def default_sort_columns(self):
        """The cached result of ``_default_sort_columns``."""
        if self.__default_sort_columns is None:
            self.__default_sort_columns = self._default_sort_columns()
        return self.__default_sort_columns

    @property
    def auto_increment_columns(self) -> Iterable[str]:
        """The result of ``_auto_increment_columns`` (not cached)."""
        return self._auto_increment_columns()

    @property
    def translate_columns(self) -> Iterable[str]:
        """The cached result of ``_translate_columns``."""
        if self.__translate_columns is None:
            self.__translate_columns = self._translate_columns()
        return self.__translate_columns

    @property
    def nullable_columns(self) -> Iterable[str]:
        """The cached result of ``_nullable_columns``."""
        if self.__nullable_columns is None:
            self.__nullable_columns = self._nullable_columns()
        return self.__nullable_columns

    @property
    def non_nullable_columns(self) -> Iterable[str]:
        """Names of all columns not listed in ``nullable_columns`` (cached)."""
        if self.__non_nullable_columns is None:
            all_cols = self.column_names
            nullable = self.nullable_columns
            self.__non_nullable_columns = [col for col in all_cols if col not in nullable]
        return self.__non_nullable_columns

    @property
    def null_insertible_columns(self) -> Iterable[str]:
        """Names of the columns whose ``is_null_insertible`` is true (cached)."""
        if self.__null_insertible_columns is None:
            self.__null_insertible_columns = [
                col.name for col in self.__columns.values() if col.is_null_insertible
            ]
        return self.__null_insertible_columns

    @property
    def not_null_insertible_columns(self) -> Iterable[str]:
        """Names of the columns whose ``is_null_insertible`` is false (cached)."""
        if self.__not_null_insertible_columns is None:
            self.__not_null_insertible_columns = [
                col.name for col in self.__columns.values() if not col.is_null_insertible
            ]
        return self.__not_null_insertible_columns

    @property
    def unique_constraints(self) -> list[ColumnSet]:
        """The unique constraints as ``ColumnSet`` objects (cached).

        A None result from ``_unique_constraints`` yields an empty list.
        """
        if self.__unique_constraints is None:
            constraints: list[ColumnSet] = []
            impl = self._unique_constraints()
            if impl is not None:
                constraints = [ColumnSet(columns=columns) for columns in impl]
            self.__unique_constraints = constraints
        return self.__unique_constraints

    @property
    def foreign_key_constraints(self) -> list[CompositeForeignKey]:
        """The schema's foreign key constraints whose child table is this table (cached)."""
        if self.__foreign_key_constraints is None:
            self.__foreign_key_constraints = [
                composite_key
                for composite_key in self.__schema.foreign_key_constraints
                if composite_key.child_table == self
            ]
        return self.__foreign_key_constraints

    @property
    def foreign_key_parent_tables(self):
        """The parent tables referenced by this table's foreign keys."""
        return self.__foreign_key_parent_tables.values()

    def foreign_keys_to_parent_table(self, parent_table) -> Iterable[CompositeForeignKey]:
        """Return the foreign key constraints referencing ``parent_table``.

        Args:
            parent_table: A table or a table name.

        Returns:
            The matching constraints; an empty list if there are none.
        """
        if self.__foreign_keys_to_parent_table is None:
            # Group the constraints by the name of their parent table.
            self.__foreign_keys_to_parent_table = OrderedDict()
            for cfk in self.foreign_key_constraints:
                self.__foreign_keys_to_parent_table.setdefault(cfk.parent_table.name, []).append(cfk)
        parent_table_name = parent_table if isinstance(parent_table, str) else parent_table.name
        return self.__foreign_keys_to_parent_table.get(parent_table_name, [])

    @property
    def relationships(self) -> list[tuple[str, Self]]:
        """``(member_name, table)`` pairs derived from ``_relationships`` (cached).

        Each table is obtained by indexing the schema with the table name
        supplied by ``_relationships``.
        """
        if self.__relationships is None:
            self.__relationships = [
                (member_name, self.schema[table_name])
                for member_name, table_name in self._relationships()
            ]
        return self.__relationships

    def relationship(self, table: Union[Self, str]) -> Optional[Self]:
        """Return the related table registered under ``table``, or None.

        Args:
            table: A table or a name; a table is reduced to its name before
                the lookup.
        """
        if isinstance(table, Table):
            table = table.name
        return self.__relationship_by_foreign_table.get(table)