schema: Continue

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2025-01-30 20:34:44 +01:00
commit c50c614f13
7 changed files with 223 additions and 52 deletions

View file

@ -20,10 +20,27 @@ class Column(abc.ABC): # export
self.__default_value_cached: bool = False
self.__is_auto_increment: Optional[bool] = None
self.__data_type: DataType = data_type
self.__foreign_keys: Optional[Any] = None
self.__foreign_keys_cached: bool = False
self.__foreign_keys_by_table: Optional[dict[str, Any]] = None
def __repr__(self):
return f'{self.__table.name}.{self.__name}: {self.__data_type}'
def __eq__(self, rhs) -> bool:
if isinstance(rhs, Column):
if self.__table != rhs.__table:
return False
if self.__name != rhs.__name:
return False
return True
elif isinstance(rhs, str):
if self.__name != rhs:
return False
return True
throw(ERR, f'Tried to compare column {self} to type {type(rhs)}: {rhs}')
return False # Unreachable but requested by mypy
@property
def name(self) -> str:
return self.__name
@ -73,3 +90,25 @@ class Column(abc.ABC): # export
self.__default_value = self.__table.column_default(self.name)
self.__default_value_cached = True
return self.__default_value
# Returns Column object on parent table
@property
def foreign_keys(self) -> Optional[Any]:
if not self.__foreign_keys_cached:
fks: list[Any] = []
for cfk in self.__table.foreign_key_constraints:
for fk in cfk:
if fk.child_column == self:
fks.append(fk.parent_column)
self.__foreign_keys_cached = True
self.__foreign_keys = fks if fks else None
return self.__foreign_keys
def foreign_key(self, table) -> Optional[Any]:
if self.__foreign_keys_by_table is None:
self.__foreign_keys_by_table = dict()
for col in self.foreign_keys: # type: ignore # Any not iterable
assert(col.table.name not in self.__foreign_keys_by_table)
self.__foreign_keys_by_table[col.table.name] = col
table_name = table if isinstance(table, str) else table.name
return self.__foreign_keys_by_table.get(table_name)

View file

@ -1,11 +1,11 @@
# -*- coding: utf-8 -*-
from typing import Optional, List, Iterable, Any
from typing import Optional, Iterable, Any
class ColumnSet: # export
def __init__(self, *args: List[Any], columns: List[Any]=[], table: Optional[Any]=None, names: Optional[List[str]]=None):
self.__columns: List[Any] = [*args]
def __init__(self, *args: list[Any], columns: list[Any]=[], table: Optional[Any]=None, names: Optional[list[str]]=None):
self.__columns: list[Any] = [*args]
self.__columns.extend(columns)
self.__table = table
if names is not None:
@ -39,5 +39,5 @@ class ColumnSet: # export
return True
@property
def columns(self) -> List[Any]:
def columns(self) -> list[Any]:
return self.__columns

View file

@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
from typing import Optional, List, Any
from typing import Optional, Any
from jwutils.log import *
from .ColumnSet import ColumnSet
from .SingleForeignKey import SingleForeignKey
@ -26,7 +28,9 @@ class CompositeForeignKey: # export
assert(len(self.__child_col_set) == len(self.__parent_col_set))
self.__len = len(self.__child_col_set)
self.__column_relations: Optional[List[SingleForeignKey]] = None
self.__column_relations: Optional[list[SingleForeignKey]] = None
self.__parent_columns_by_child_column: Optional[dict[str, Column]] = None
self.__child_columns_by_parent_column: Optional[dict[str, Column]] = None
def __table_rel_str(self):
return f'{self.__child_table.name} => {self.__parent_table.name}'
@ -42,7 +46,7 @@ class CompositeForeignKey: # export
def __repr__(self):
ret = self.__table_rel_str()
ret += '|| ' + ' | '.join([self.__cols_rel_str(rel.child_col, rel.parent_col) for rel in self.column_relations])
ret += ': ' + ', '.join([self.__cols_rel_str(rel.child_column, rel.parent_column) for rel in self.column_relations])
return ret
def __eq__(self, rhs):
@ -68,8 +72,29 @@ class CompositeForeignKey: # export
def parent_columns(self) -> ColumnSet:
return self.__parent_col_set
def parent_column(self, child_column):
child_column_name = child_column if isinstance(child_column, str) else child_column.name
if self.__parent_columns_by_child_column is None:
d: dict[str, Column] = {}
assert(len(self.__child_col_set) == len(self.__parent_col_set))
for i in range(0, len(self.__child_col_set)):
d[self.__child_col_set[i].name] = self.__parent_col_set[i]
self.__parent_columns_by_child_column = d
return self.__parent_columns_by_child_column[child_column]
def child_column(self, parent_column):
slog(WARNING, f'{self}: Looking for child column belonging to parent column "{parent_column}"')
parent_column_name = parent_column if isinstance(parent_column, str) else parent_column.name
if self.__child_columns_by_parent_column is None:
d: dict[str, Column] = {}
assert(len(self.__parent_col_set) == len(self.__child_col_set))
for i in range(0, len(self.__parent_col_set)):
d[self.__parent_col_set[i].name] = self.__child_col_set[i]
self.__child_columns_by_parent_column = d
return self.__child_columns_by_parent_column[parent_column]
@property
def column_relations(self) -> List[Any]:
def column_relations(self) -> list[Any]:
ret = []
if self.__column_relations is None:
for i in range(0, self.__len):

View file

@ -48,12 +48,20 @@ class DataType: # export
assert(size > 0)
self.__id = type_id
self.__size = size
def __repr__(self):
ret = f'{self.__id}'
ret = f'{self.__id.name}'
if self.__size is not None:
ret += f'({self.__size})'
return ret
def __eq__(self, rhs):
if self.__id != rhs.__id:
return False
if self.__size != rhs.__size:
return False
return True
@property
def type_id(self) -> Id:
return self.__id

View file

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
from typing import Optional, List, Iterable
from typing import Optional, Iterable
import abc
@ -14,16 +14,16 @@ from .CompositeForeignKey import CompositeForeignKey
class Schema(abc.ABC): # export
def __init__(self) -> None:
self.___tables: Optional[List[Table]] = None
self.__foreign_keys: Optional[List[CompositeForeignKey]] = None
self.__access_defining_columns: Optional[List[str]] = None
self.___tables: Optional[list[Table]] = None
self.__foreign_keys: Optional[list[CompositeForeignKey]] = None
self.__access_defining_columns: Optional[list[str]] = None
@property
def __tables(self):
if self.___tables is None:
ret = dict()
for name in self._table_names():
slog(WARNING, f'Caching metadata for table "{name}"')
slog(DEBUG, f'Caching metadata for table "{name}"')
assert(isinstance(name, str))
ret[name] = self._table(name)
self.___tables = ret
@ -42,7 +42,7 @@ class Schema(abc.ABC): # export
return None # type: ignore
@abc.abstractmethod
def _foreign_keys(self) -> List[CompositeForeignKey]:
def _foreign_keys(self) -> list[CompositeForeignKey]:
pass
@abc.abstractmethod
@ -51,6 +51,18 @@ class Schema(abc.ABC): # export
# ------ API to be called
def __len__(self):
return len(self.__tables)
def __iter__(self):
yield from self.__tables.values()
def __repr__(self):
return '|'.join([table.name for table in self.__tables])
def __getitem__(self, index):
return self.__tables[index]
@property
def table_names(self) -> Iterable[str]:
return self.__tables.keys()
@ -66,7 +78,7 @@ class Schema(abc.ABC): # export
return self.__access_defining_columns
@property
def foreign_key_constraints(self) -> List[CompositeForeignKey]:
def foreign_key_constraints(self) -> list[CompositeForeignKey]:
if self.__foreign_keys is None:
self.__foreign_keys = self._foreign_keys()
return self.__foreign_keys

View file

@ -1,19 +1,33 @@
# -*- coding: utf-8 -*-
from typing import Optional, List, Any
from typing import Optional, Any
from .Column import Column
from .ColumnSet import ColumnSet
class SingleForeignKey:
def __init__(self, child_col, parent_col):
def __init__(self, child_col: Column, parent_col: Column):
self.__child_col = child_col
self.__parent_col = parent_col
self.__iterable = (self.__child_col, self.__parent_col)
def __len__(self):
return 2
def __iter__(self):
yield from self.__iterable
def __repr__(self):
return f'{self.__child_col.table.name}.{self.__child_col.name} -> {self.__parent_col.table.name}.{self.__parent_col.name}'
def __getitem__(self, index):
return self.__iterable[index]
@property
def child_col(self):
def child_column(self):
return self.__child_col
@property
def parent_col(self):
def parent_column(self):
return self.__parent_col

View file

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
from typing import Optional, Iterable, List, Any # TODO: Need any for many things, as I can't figure out how to avoid circular imports from here
from typing import Optional, Iterable, Any # TODO: Need any for many things, as I can't figure out how to avoid circular imports from here
import abc
from collections import OrderedDict
@ -18,26 +18,27 @@ class Table(abc.ABC): # export
assert(isinstance(name, str))
self.__schema = schema
self.__name = name
self.__primary_keys: Optional[Iterable[str]] = None
self.__unique_constraints: Optional[List[ColumnSet]] = None
self.__foreign_key_constraints: Optional[List[CompositeForeignKey]] = None
self.___columns: Optional[OrderedDict[str, Any]] = None
self.___foreign_key_parent_tables: Optional[OrderedDict[str, Any]] = None
self.__primary_keys: Optional[Iterable[str]] = None
self.__unique_constraints: Optional[list[ColumnSet]] = None
self.__foreign_key_constraints: Optional[list[CompositeForeignKey]] = None
self.__nullable_columns: Optional[Iterable[str]] = None
self.__non_nullable_columns: Optional[Iterable[str]] = None
self.__null_insertible_columns: Optional[Iterable[str]] = None
self.__not_null_insertible_columns: Optional[Iterable[str]] = None
self.__log_columns: Optional[Iterable[str]] = None
self.__display_columns: Optional[Iterable[str]] = None
self.__column_default: Optional[dict[str, Any]] = None
self.__base_location: Optional[Iterable[str]] = None
self.__location: Optional[Iterable[str]] = None
self.__row_location: Optional[Iterable[str]] = None
def __repr__(self) -> str:
return self.__name
self.__foreign_keys_to_parent_table: Optional[OrderedDict[str, Any]] = None
@property
def __columns(self):
def __columns(self) -> OrderedDict[str, Any]:
if self.___columns is None:
ret: OrderedDict[str, Any] = OrderedDict()
for name in self._column_names():
@ -45,16 +46,47 @@ class Table(abc.ABC): # export
self.___columns = ret
return self.___columns
@property
def __foreign_key_parent_tables(self) -> OrderedDict[str, Any]:
if self.___foreign_key_parent_tables is None:
self.___foreign_key_parent_tables = OrderedDict()
for cfk in self.foreign_key_constraints:
self.___foreign_key_parent_tables[cfk.parent_table.name] = cfk.parent_table
return self.___foreign_key_parent_tables
def __len__(self):
return len(self.__columns)
def __iter__(self):
yield from self.__columns.values()
def __repr__(self) -> str:
return self.__name
def __getitem__(self, index):
return self.__columns[index]
def __eq__(self, rhs) -> bool:
if isinstance(rhs, Table):
if self.__name != rhs.__name:
return False
return True
elif isinstance(rhs, str):
if self.__name != rhs:
return False
return True
throw(ERR, f'Tried to compare table {self} to type {type(rhs)}: {rhs}')
return False # Unreachable but requested by mypy
def __hash__(self) -> int:
return hash(self.name)
# -- To be reimplemented
@abc.abstractmethod
def _column_names(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _log_columns(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _column_data_type(self, name) -> DataType:
pass
@ -63,6 +95,18 @@ class Table(abc.ABC): # export
def _primary_keys(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _log_columns(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _display_columns(self) -> Iterable[str]:
return self._primary_keys()
@abc.abstractmethod
def _nullable_columns(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _auto_increment_columns(self) -> Iterable[str]:
pass
@ -72,7 +116,7 @@ class Table(abc.ABC): # export
pass
@abc.abstractmethod
def _unique_constraints(self) -> List[List[str]]:
def _unique_constraints(self) -> list[list[str]]:
pass
def _model_name(self) -> Optional[str]:
@ -84,9 +128,13 @@ class Table(abc.ABC): # export
throw(ERR, "Not implemented")
return None
@abc.abstractmethod
def _query_name(self) -> str:
return self.__name
# -- common URL schema for all data
def _base_location(self) -> Optional[str]:
return f'/self.name'
return f'/{self.name}'
def _location(self) -> Optional[str]:
ret = ''
@ -98,6 +146,7 @@ class Table(abc.ABC): # export
def _row_location(self) -> Optional[str]:
ret = self._location()
if ret is not None:
for col in self.primary_keys:
if col not in self.__schema.access_defining_columns:
ret += f'/<{col}>'
@ -136,6 +185,9 @@ class Table(abc.ABC): # export
def model_name(self) -> Optional[str]:
return self._model_name()
def query_name(self) -> str:
return self._query_name()
@property
def base_location(self):
if self.__base_location is None:
@ -160,6 +212,22 @@ class Table(abc.ABC): # export
self.__primary_keys = self._primary_keys()
return self.__primary_keys
@property
def log_columns(self):
if self.__log_columns is None:
self.__log_columns = self._log_columns()
return self.__log_columns
@property
def display_columns(self):
if self.__display_columns is None:
self.__display_columns = self._display_columns()
return self.__display_columns
@property
def auto_increment_columns(self) -> Iterable[str]:
return self._auto_increment_columns()
@property
def nullable_columns(self) -> Iterable[str]:
if self.__nullable_columns is None:
@ -199,19 +267,9 @@ class Table(abc.ABC): # export
return self.__not_null_insertible_columns
@property
def log_columns(self):
if self.__log_columns is None:
self.__log_columns = self._log_columns()
return self.__log_columns
@property
def auto_increment_columns(self) -> Iterable[str]:
return self._auto_increment_columns()
@property
def unique_constraints(self) -> List[ColumnSet]:
def unique_constraints(self) -> list[ColumnSet]:
if self.__unique_constraints is None:
ret: List[ColumnSet] = []
ret: list[ColumnSet] = []
impl = self._unique_constraints()
if impl is not None:
for columns in impl:
@ -220,11 +278,26 @@ class Table(abc.ABC): # export
return self.__unique_constraints
@property
def foreign_key_constraints(self) -> List[CompositeForeignKey]:
def foreign_key_constraints(self) -> list[CompositeForeignKey]:
if self.__foreign_key_constraints is None:
ret: List[Any] = []
ret: list[Any] = []
for composite_key in self.__schema.foreign_key_constraints:
if composite_key.child_table == self:
ret.append(composite_key)
self.__foreign_key_constraints = ret
return self.__foreign_key_constraints
@property
def foreign_key_parent_tables(self):
return self.__foreign_key_parent_tables.values()
def foreign_keys_to_parent_table(self, parent_table) -> Iterable[CompositeForeignKey]:
if self.__foreign_keys_to_parent_table is None:
self.__foreign_keys_to_parent_table = OrderedDict()
for cfk in self.foreign_key_constraints:
pt = cfk.parent_table.name
if pt not in self.__foreign_keys_to_parent_table:
self.__foreign_keys_to_parent_table[pt] = []
self.__foreign_keys_to_parent_table[pt].append(cfk)
parent_table_name = parent_table if isinstance(parent_table, str) else parent_table.name
return self.__foreign_keys_to_parent_table[parent_table_name]