From c50c614f131578906bd833a67acc78ef0fde3405 Mon Sep 17 00:00:00 2001 From: Jan Lindemann Date: Thu, 30 Jan 2025 20:34:44 +0100 Subject: [PATCH] schema: Continue Signed-off-by: Jan Lindemann --- tools/python/jwutils/db/schema/Column.py | 39 +++++ tools/python/jwutils/db/schema/ColumnSet.py | 8 +- .../jwutils/db/schema/CompositeForeignKey.py | 33 ++++- tools/python/jwutils/db/schema/DataType.py | 10 +- tools/python/jwutils/db/schema/Schema.py | 26 +++- .../jwutils/db/schema/SingleForeignKey.py | 22 ++- tools/python/jwutils/db/schema/Table.py | 137 ++++++++++++++---- 7 files changed, 223 insertions(+), 52 deletions(-) diff --git a/tools/python/jwutils/db/schema/Column.py b/tools/python/jwutils/db/schema/Column.py index e5133e5..ab8e235 100644 --- a/tools/python/jwutils/db/schema/Column.py +++ b/tools/python/jwutils/db/schema/Column.py @@ -20,10 +20,27 @@ class Column(abc.ABC): # export self.__default_value_cached: bool = False self.__is_auto_increment: Optional[bool] = None self.__data_type: DataType = data_type + self.__foreign_keys: Optional[Any] = None + self.__foreign_keys_cached: bool = False + self.__foreign_keys_by_table: Optional[dict[str, Any]] = None def __repr__(self): return f'{self.__table.name}.{self.__name}: {self.__data_type}' + def __eq__(self, rhs) -> bool: + if isinstance(rhs, Column): + if self.__table != rhs.__table: + return False + if self.__name != rhs.__name: + return False + return True + elif isinstance(rhs, str): + if self.__name != rhs: + return False + return True + throw(ERR, f'Tried to compare column {self} to type {type(rhs)}: {rhs}') + return False # Unreachable but requested by mypy + @property def name(self) -> str: return self.__name @@ -73,3 +90,25 @@ class Column(abc.ABC): # export self.__default_value = self.__table.column_default(self.name) self.__default_value_cached = True return self.__default_value + + # Returns Column object on parent table + @property + def foreign_keys(self) -> Optional[Any]: + if not self.__foreign_keys_cached: + fks: list[Any] = [] + for cfk in self.__table.foreign_key_constraints: + for fk in cfk: + if fk.child_column == self: + fks.append(fk.parent_column) + self.__foreign_keys_cached = True + self.__foreign_keys = fks if fks else None + return self.__foreign_keys + + def foreign_key(self, table) -> Optional[Any]: + if self.__foreign_keys_by_table is None: + self.__foreign_keys_by_table = dict() + for col in self.foreign_keys: # type: ignore # Any not iterable + assert(col.table.name not in self.__foreign_keys_by_table) + self.__foreign_keys_by_table[col.table.name] = col + table_name = table if isinstance(table, str) else table.name + return self.__foreign_keys_by_table.get(table_name) diff --git a/tools/python/jwutils/db/schema/ColumnSet.py b/tools/python/jwutils/db/schema/ColumnSet.py index 9426696..8a98cc8 100644 --- a/tools/python/jwutils/db/schema/ColumnSet.py +++ b/tools/python/jwutils/db/schema/ColumnSet.py @@ -1,11 +1,11 @@ # -*- coding: utf-8 -*- -from typing import Optional, List, Iterable, Any +from typing import Optional, Iterable, Any class ColumnSet: # export - def __init__(self, *args: List[Any], columns: List[Any]=[], table: Optional[Any]=None, names: Optional[List[str]]=None): - self.__columns: List[Any] = [*args] + def __init__(self, *args: list[Any], columns: list[Any]=[], table: Optional[Any]=None, names: Optional[list[str]]=None): + self.__columns: list[Any] = [*args] self.__columns.extend(columns) self.__table = table if names is not None: @@ -39,5 +39,5 @@ class ColumnSet: # export return True @property - def columns(self) -> List[Any]: + def columns(self) -> list[Any]: return self.__columns diff --git a/tools/python/jwutils/db/schema/CompositeForeignKey.py b/tools/python/jwutils/db/schema/CompositeForeignKey.py index b77e9a7..4e80ff0 100644 --- a/tools/python/jwutils/db/schema/CompositeForeignKey.py +++ b/tools/python/jwutils/db/schema/CompositeForeignKey.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- -from typing import Optional, List, Any +from typing import Optional, Any + +from jwutils.log import * from .ColumnSet import ColumnSet from .SingleForeignKey import SingleForeignKey @@ -26,7 +28,9 @@ class CompositeForeignKey: # export assert(len(self.__child_col_set) == len(self.__parent_col_set)) self.__len = len(self.__child_col_set) - self.__column_relations: Optional[List[SingleForeignKey]] = None + self.__column_relations: Optional[list[SingleForeignKey]] = None + self.__parent_columns_by_child_column: Optional[dict[str, Column]] = None + self.__child_columns_by_parent_column: Optional[dict[str, Column]] = None def __table_rel_str(self): return f'{self.__child_table.name} => {self.__parent_table.name}' @@ -42,7 +46,7 @@ class CompositeForeignKey: # export def __repr__(self): ret = self.__table_rel_str() - ret += '|| ' + ' | '.join([self.__cols_rel_str(rel.child_col, rel.parent_col) for rel in self.column_relations]) + ret += ': ' + ', '.join([self.__cols_rel_str(rel.child_column, rel.parent_column) for rel in self.column_relations]) return ret def __eq__(self, rhs): @@ -68,8 +72,29 @@ class CompositeForeignKey: # export def parent_columns(self) -> ColumnSet: return self.__parent_col_set + def parent_column(self, child_column): + child_column_name = child_column if isinstance(child_column, str) else child_column.name + if self.__parent_columns_by_child_column is None: + d: dict[str, Column] = {} + assert(len(self.__child_col_set) == len(self.__parent_col_set)) + for i in range(0, len(self.__child_col_set)): + d[self.__child_col_set[i].name] = self.__parent_col_set[i] + self.__parent_columns_by_child_column = d + return self.__parent_columns_by_child_column[child_column] + + def child_column(self, parent_column): + slog(WARNING, f'{self}: Looking for child column belonging to parent column "{parent_column}"') + parent_column_name = parent_column if isinstance(parent_column, str) else parent_column.name + if self.__child_columns_by_parent_column is None: + d: dict[str, Column] = {} + assert(len(self.__parent_col_set) == len(self.__child_col_set)) + for i in range(0, len(self.__parent_col_set)): + d[self.__parent_col_set[i].name] = self.__child_col_set[i] + self.__child_columns_by_parent_column = d + return self.__child_columns_by_parent_column[parent_column] + @property - def column_relations(self) -> List[Any]: + def column_relations(self) -> list[Any]: ret = [] if self.__column_relations is None: for i in range(0, self.__len): diff --git a/tools/python/jwutils/db/schema/DataType.py b/tools/python/jwutils/db/schema/DataType.py index b03c1e9..6118267 100644 --- a/tools/python/jwutils/db/schema/DataType.py +++ b/tools/python/jwutils/db/schema/DataType.py @@ -48,12 +48,20 @@ class DataType: # export assert(size > 0) self.__id = type_id self.__size = size + def __repr__(self): - ret = f'{self.__id}' + ret = f'{self.__id.name}' if self.__size is not None: ret += f'({self.__size})' return ret + def __eq__(self, rhs): + if self.__id != rhs.__id: + return False + if self.__size != rhs.__size: + return False + return True + @property def type_id(self) -> Id: return self.__id diff --git a/tools/python/jwutils/db/schema/Schema.py b/tools/python/jwutils/db/schema/Schema.py index c14428a..61382b7 100644 --- a/tools/python/jwutils/db/schema/Schema.py +++ b/tools/python/jwutils/db/schema/Schema.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -from typing import Optional, List, Iterable +from typing import Optional, Iterable import abc @@ -14,16 +14,16 @@ from .CompositeForeignKey import CompositeForeignKey class Schema(abc.ABC): # export def __init__(self) -> None: - self.___tables: Optional[List[Table]] = None - self.__foreign_keys: Optional[List[CompositeForeignKey]] = None - self.__access_defining_columns: Optional[List[str]] = None + self.___tables: Optional[list[Table]] = None + self.__foreign_keys: Optional[list[CompositeForeignKey]] = None + self.__access_defining_columns: Optional[list[str]] = None @property def __tables(self): if self.___tables is None: ret = dict() for name in self._table_names(): - slog(WARNING, f'Caching metadata for table "{name}"') + slog(DEBUG, f'Caching metadata for table "{name}"') assert(isinstance(name, str)) ret[name] = self._table(name) self.___tables = ret @@ -42,7 +42,7 @@ class Schema(abc.ABC): # export return None # type: ignore @abc.abstractmethod - def _foreign_keys(self) -> List[CompositeForeignKey]: + def _foreign_keys(self) -> list[CompositeForeignKey]: pass @abc.abstractmethod @@ -51,6 +51,18 @@ class Schema(abc.ABC): # export # ------ API to be called + def __len__(self): + return len(self.__tables) + + def __iter__(self): + yield from self.__tables.values() + + def __repr__(self): + return '|'.join([table.name for table in self.__tables]) + + def __getitem__(self, index): + return self.__tables[index] + @property def table_names(self) -> Iterable[str]: return self.__tables.keys() @@ -66,7 +78,7 @@ class Schema(abc.ABC): # export return self.__access_defining_columns @property - def foreign_key_constraints(self) -> List[CompositeForeignKey]: + def foreign_key_constraints(self) -> list[CompositeForeignKey]: if self.__foreign_keys is None: self.__foreign_keys = self._foreign_keys() return self.__foreign_keys diff --git a/tools/python/jwutils/db/schema/SingleForeignKey.py b/tools/python/jwutils/db/schema/SingleForeignKey.py index aa6b7e0..0045fdf 100644 --- a/tools/python/jwutils/db/schema/SingleForeignKey.py +++ b/tools/python/jwutils/db/schema/SingleForeignKey.py @@ -1,19 +1,33 @@ # -*- coding: utf-8 -*- -from typing import Optional, List, Any +from typing import Optional, Any +from .Column import Column from .ColumnSet import ColumnSet class SingleForeignKey: - def __init__(self, child_col, parent_col): + def __init__(self, child_col: Column, parent_col: Column): self.__child_col = child_col self.__parent_col = parent_col + self.__iterable = (self.__child_col, self.__parent_col) + + def __len__(self): + return 2 + + def __iter__(self): + yield from self.__iterable + + def __repr__(self): + return f'{self.__child_col.table.name}.{self.__child_col.name} -> {self.__parent_col.table.name}.{self.__parent_col.name}' + + def __getitem__(self, index): + return self.__iterable[index] @property - def child_col(self): + def child_column(self): return self.__child_col @property - def parent_col(self): + def parent_column(self): return self.__parent_col diff --git a/tools/python/jwutils/db/schema/Table.py b/tools/python/jwutils/db/schema/Table.py index 8111357..b3bceb7 100644 --- a/tools/python/jwutils/db/schema/Table.py +++ b/tools/python/jwutils/db/schema/Table.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -from typing import Optional, Iterable, List, Any # TODO: Need any for many things, as I can't figure out how to avoid circular imports from here +from typing import Optional, Iterable, Any # TODO: Need any for many things, as I can't figure out how to avoid circular imports from here import abc from collections import OrderedDict @@ -18,26 +18,27 @@ class Table(abc.ABC): # export assert(isinstance(name, str)) self.__schema = schema self.__name = name - self.__primary_keys: Optional[Iterable[str]] = None - self.__unique_constraints: Optional[List[ColumnSet]] = None - self.__foreign_key_constraints: Optional[List[CompositeForeignKey]] = None + self.___columns: Optional[OrderedDict[str, Any]] = None + self.___foreign_key_parent_tables: Optional[OrderedDict[str, Any]] = None + + self.__primary_keys: Optional[Iterable[str]] = None + self.__unique_constraints: Optional[list[ColumnSet]] = None + self.__foreign_key_constraints: Optional[list[CompositeForeignKey]] = None self.__nullable_columns: Optional[Iterable[str]] = None self.__non_nullable_columns: Optional[Iterable[str]] = None self.__null_insertible_columns: Optional[Iterable[str]] = None self.__not_null_insertible_columns: Optional[Iterable[str]] = None self.__log_columns: Optional[Iterable[str]] = None + self.__display_columns: Optional[Iterable[str]] = None self.__column_default: Optional[dict[str, Any]] = None - self.__base_location: Optional[Iterable[str]] = None self.__location: Optional[Iterable[str]] = None self.__row_location: Optional[Iterable[str]] = None - - def __repr__(self) -> str: - return self.__name + self.__foreign_keys_to_parent_table: Optional[OrderedDict[str, Any]] = None @property - def __columns(self): + def __columns(self) -> OrderedDict[str, Any]: if self.___columns is None: ret: OrderedDict[str, Any] = OrderedDict() for name in self._column_names(): @@ -45,16 +46,47 @@ class Table(abc.ABC): # export self.___columns = ret return self.___columns + @property + def __foreign_key_parent_tables(self) -> OrderedDict[str, Any]: + if self.___foreign_key_parent_tables is None: + self.___foreign_key_parent_tables = OrderedDict() + for cfk in self.foreign_key_constraints: + self.___foreign_key_parent_tables[cfk.parent_table.name] = cfk.parent_table + return self.___foreign_key_parent_tables + + def __len__(self): + return len(self.__columns) + + def __iter__(self): + yield from self.__columns.values() + + def __repr__(self) -> str: + return self.__name + + def __getitem__(self, index): + return self.__columns[index] + + def __eq__(self, rhs) -> bool: + if isinstance(rhs, Table): + if self.__name != rhs.__name: + return False + return True + elif isinstance(rhs, str): + if self.__name != rhs: + return False + return True + throw(ERR, f'Tried to compare table {self} to type {type(rhs)}: {rhs}') + return False # Unreachable but requested by mypy + + def __hash__(self) -> int: + return hash(self.name) + # -- To be reimplemented @abc.abstractmethod def _column_names(self) -> Iterable[str]: pass - @abc.abstractmethod - def _log_columns(self) -> Iterable[str]: - pass - @abc.abstractmethod def _column_data_type(self, name) -> DataType: pass @@ -63,6 +95,18 @@ class Table(abc.ABC): # export def _primary_keys(self) -> Iterable[str]: pass + @abc.abstractmethod + def _log_columns(self) -> Iterable[str]: + pass + + @abc.abstractmethod + def _display_columns(self) -> Iterable[str]: + return self._primary_keys() + + @abc.abstractmethod + def _nullable_columns(self) -> Iterable[str]: + pass + @abc.abstractmethod def _auto_increment_columns(self) -> Iterable[str]: pass @@ -72,7 +116,7 @@ class Table(abc.ABC): # export pass @abc.abstractmethod - def _unique_constraints(self) -> List[List[str]]: + def _unique_constraints(self) -> list[list[str]]: pass def _model_name(self) -> Optional[str]: @@ -84,9 +128,13 @@ class Table(abc.ABC): # export throw(ERR, "Not implemented") return None + @abc.abstractmethod + def _query_name(self) -> str: + return self.__name + # -- common URL schema for all data def _base_location(self) -> Optional[str]: - return f'/self.name' + return f'/{self.name}' def _location(self) -> Optional[str]: ret = '' @@ -98,9 +146,10 @@ class Table(abc.ABC): # export def _row_location(self) -> Optional[str]: ret = self._location() - for col in self.primary_keys: - if col not in self.__schema.access_defining_columns: - ret += f'/<{col}>' + if ret is not None: + for col in self.primary_keys: + if col not in self.__schema.access_defining_columns: + ret += f'/<{col}>' return ret # -- To be used @@ -136,6 +185,9 @@ class Table(abc.ABC): # export def model_name(self) -> Optional[str]: return self._model_name() + def query_name(self) -> str: + return self._query_name() + @property def base_location(self): if self.__base_location is None: @@ -160,6 +212,22 @@ class Table(abc.ABC): # export self.__primary_keys = self._primary_keys() return self.__primary_keys + @property + def log_columns(self): + if self.__log_columns is None: + self.__log_columns = self._log_columns() + return self.__log_columns + + @property + def display_columns(self): + if self.__display_columns is None: + self.__display_columns = self._display_columns() + return self.__display_columns + + @property + def auto_increment_columns(self) -> Iterable[str]: + return self._auto_increment_columns() + @property def nullable_columns(self) -> Iterable[str]: if self.__nullable_columns is None: @@ -199,19 +267,9 @@ class Table(abc.ABC): # export return self.__not_null_insertible_columns @property - def log_columns(self): - if self.__log_columns is None: - self.__log_columns = self._log_columns() - return self.__log_columns - - @property - def auto_increment_columns(self) -> Iterable[str]: - return self._auto_increment_columns() - - @property - def unique_constraints(self) -> List[ColumnSet]: + def unique_constraints(self) -> list[ColumnSet]: if self.__unique_constraints is None: - ret: List[ColumnSet] = [] + ret: list[ColumnSet] = [] impl = self._unique_constraints() if impl is not None: for columns in impl: @@ -220,11 +278,26 @@ class Table(abc.ABC): # export return self.__unique_constraints @property - def foreign_key_constraints(self) -> List[CompositeForeignKey]: + def foreign_key_constraints(self) -> list[CompositeForeignKey]: if self.__foreign_key_constraints is None: - ret: List[Any] = [] + ret: list[Any] = [] for composite_key in self.__schema.foreign_key_constraints: if composite_key.child_table == self: ret.append(composite_key) self.__foreign_key_constraints = ret return self.__foreign_key_constraints + + @property + def foreign_key_parent_tables(self): + return self.__foreign_key_parent_tables.values() + + def foreign_keys_to_parent_table(self, parent_table) -> Iterable[CompositeForeignKey]: + if self.__foreign_keys_to_parent_table is None: + self.__foreign_keys_to_parent_table = OrderedDict() + for cfk in self.foreign_key_constraints: + pt = cfk.parent_table.name + if pt not in self.__foreign_keys_to_parent_table: + self.__foreign_keys_to_parent_table[pt] = [] + self.__foreign_keys_to_parent_table[pt].append(cfk) + parent_table_name = parent_table if isinstance(parent_table, str) else parent_table.name + return self.__foreign_keys_to_parent_table[parent_table_name]