Add db.schema framework

jw.db.schema is a set of classes meant as an interface to describe a database schema.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2025-01-28 10:18:57 +01:00
commit 17ab47e96a
8 changed files with 556 additions and 0 deletions

View file

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
from typing import Optional, Any
import abc
from ...log import *
from .DataType import DataType
class Column(abc.ABC): # export
def __init__(self, table, name, data_type: DataType):
self.__name: str = name
self.__table: Any = table
self.__is_nullable: Optional[bool] = None
self.__is_null_insertible: Optional[bool] = None
self.__is_primary_key: Optional[bool] = None
self.__default_value: Optional[Any] = None
self.__default_value_cached: bool = False
self.__is_auto_increment: Optional[bool] = None
self.__data_type: DataType = data_type
def __repr__(self):
return f'{self.__table.name}.{self.__name}: {self.__data_type}'
@property
def name(self) -> str:
return self.__name
@property
def data_type(self):
return self.__data_type
@property
def table(self) -> str:
return self.__table
@property
def is_nullable(self) -> bool:
if self.__is_nullable is None:
self.__is_nullable = self.__name in self.__table.nullable_columns
return self.__is_nullable
@property
def is_null_insertible(self):
if self.__is_null_insertible is None:
ret = False
if self.is_nullable:
ret = True
elif self.is_auto_increment:
ret = True
elif self.default_value is not None:
ret = True
self.__is_null_insertible = ret
return self.__is_null_insertible
@property
def is_primary_key(self) -> bool:
if self.__is_primary_key is None:
self.__is_primary_key = self.__name in self.__table.primary_keys
return self.__is_primary_key
@property
def is_auto_increment(self) -> bool:
if self.__is_auto_increment is None:
self.__is_auto_increment = self.__name in self.__table.auto_increment_columns
return self.__is_auto_increment
@property
def default_value(self) -> Optional[Any]:
if self.__default_value_cached is False:
self.__default_value = self.__table.column_default(self.name)
self.__default_value_cached = True
return self.__default_value

View file

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
from typing import Optional, List, Iterable, Any
class ColumnSet: # export
def __init__(self, *args: List[Any], columns: List[Any]=[], table: Optional[Any]=None, names: Optional[List[str]]=None):
self.__columns: List[Any] = [*args]
self.__columns.extend(columns)
self.__table = table
if names is not None:
assert(table is not None)
for name in names:
self.__columns.append(table.column(name))
if self.__table is not None:
for col in columns:
assert(col.table == self.__table)
def __len__(self):
return len(self.__columns)
def __iter__(self):
yield from self.__columns
def __repr__(self):
return '|'.join([col.name for col in self.__columns])
def __getitem__(self, index):
return self.__columns[index]
def __eq__(self, rhs):
if self.__table != rhs.__table:
return False
if len(self.__columns) != len(rhs.__columns):
return False
for i in range(0, len(self.__columns)):
if self.__columns[i].name != rhs.__columns[i].name:
return False
return True
@property
def columns(self) -> List[Any]:
return self.__columns

View file

@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
from typing import Optional, List, Any
from .ColumnSet import ColumnSet
from .SingleForeignKey import SingleForeignKey
class CompositeForeignKey: # export
def __init__(self, child_col_set: ColumnSet, parent_col_set: ColumnSet): # TODO: Implement alternative ways to construct
def __table(s):
ret = None
for c in s:
if ret is None:
ret = c.table
else:
assert(ret == c.table)
assert(ret is not None)
return ret
self.__child_col_set = child_col_set
self.__parent_col_set = parent_col_set
self.__child_table = __table(child_col_set)
self.__parent_table = __table(parent_col_set)
assert(len(self.__child_col_set) == len(self.__parent_col_set))
self.__len = len(self.__child_col_set)
self.__column_relations: Optional[List[SingleForeignKey]] = None
def __table_rel_str(self):
return f'{self.__child_table.name} => {self.__parent_table.name}'
def __cols_rel_str(self, child, parent):
return f'{child.name} -> {parent.name}'
def __len__(self):
return self.__len
def __iter__(self):
yield from self.column_relations
def __repr__(self):
ret = self.__table_rel_str()
ret += '|| ' + ' | '.join([self.__cols_rel_str(rel.child_col, rel.parent_col) for rel in self.column_relations])
return ret
def __eq__(self, rhs):
if rhs.__child_col_set != self.__child_col_set:
return False
if rhs.__parent_col_set != self.__parent_col_set:
return False
return True
@property
def child_table(self) -> Any:
return self.__child_table
@property
def parent_table(self) -> Any:
return self.__parent_table
@property
def child_columns(self) -> ColumnSet:
return self.__child_col_set
@property
def parent_columns(self) -> ColumnSet:
return self.__parent_col_set
@property
def column_relations(self) -> List[Any]:
ret = []
if self.__column_relations is None:
for i in range(0, self.__len):
ret.append(SingleForeignKey(self.__child_col_set[i], self.__parent_col_set[i]))
self.__column_relations = ret
return self.__column_relations

View file

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
from typing import Optional
from ...log import *
from enum import Enum, auto
from datetime import datetime
class Id(Enum):
Integer = auto()
SmallInteger = auto()
Currency = auto()
Single = auto()
DateTime = auto()
String = auto()
Text = auto()
Invalid = auto()
def py_type(type_id: Id) -> type: # export
match type_id:
case Id.Integer:
return int
case Id.SmallInteger:
return int
case Id.Currency:
return int
case Id.Single:
return int
case Id.DateTime:
return datetime
case Id.String:
return str
case Id.Text:
return str
case Id.DateTime:
return datetime
raise Exception(f'Unknown column type-id "{type_id}"')
class DataType: # export
def __init__(self, type_id: Id, size: Optional[int]=None):
if not isinstance(type_id, Id):
throw(ERR, f'Passed type id "{type_id}" with unsupported data type {type(type_id)}')
if size is not None:
assert(isinstance(size, int))
assert(size > 0)
self.__id = type_id
self.__size = size
def __repr__(self):
ret = f'{self.__id}'
if self.__size is not None:
ret += f'({self.__size})'
return ret
@property
def type_id(self) -> Id:
return self.__id
@property
def size(self) -> Optional[int]:
return self.__size
@property
def py_type(self) -> type:
return py_type(self.__id)
@property
def py_type_str(self) -> str:
return self.py_type.__name__
@property
def py_type_annotation(self) -> str:
return self.py_type_str # FIXME: This is not always correct

View file

@ -0,0 +1,4 @@
TOPDIR = ../../../../..
include $(TOPDIR)/make/proj.mk
include $(JWBDIR)/make/py-mod.mk

View file

@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
from typing import Optional, List, Iterable
import abc
from ...log import *
from .Table import Table
from .Column import Column
from .DataType import DataType
from .CompositeForeignKey import CompositeForeignKey
class Schema(abc.ABC): # export
def __init__(self) -> None:
self.___tables: Optional[List[Table]] = None
self.__foreign_keys: Optional[List[CompositeForeignKey]] = None
@property
def __tables(self):
if self.___tables is None:
ret = dict()
for name in self._table_names():
slog(WARNING, f'Caching metadata for table "{name}"')
assert(isinstance(name, str))
ret[name] = self._table(name)
self.___tables = ret
return self.___tables
# ------ API to be implemented
@abc.abstractmethod
def _table_names(self) -> Iterable[str]:
throw(ERR, "Called pure virtual base class method")
return []
@abc.abstractmethod
def _table(self, name: str) -> Table:
throw(ERR, "Called pure virtual base class method")
return None # type: ignore
@abc.abstractmethod
def _foreign_keys(self) -> List[CompositeForeignKey]:
pass
# ------ API to be called
@property
def table_names(self) -> Iterable[str]:
return self.__tables.keys()
@property
def tables(self) -> Iterable[Table]:
return self.__tables.values()
@property
def foreign_key_constraints(self) -> List[CompositeForeignKey]:
if self.__foreign_keys is None:
self.__foreign_keys = self._foreign_keys()
return self.__foreign_keys
def table(self, name: str) -> Table:
return self.__tables[name]
def table_by_model_name(self, name: str, throw=False) -> Table:
for table in self.__tables.values():
if table.model_name == name:
return table
if throw:
raise Exception(f'Table "{name}" not found in database metadata')
return None # type: ignore
def primary_keys(self, table_name: str) -> Iterable[str]:
return self.__tables[table_name].primary_keys
def columns(self, table_name: str) -> Iterable[Column]:
return self.__tables[table_name].columns

View file

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from typing import Optional, List, Any
from .ColumnSet import ColumnSet
class SingleForeignKey:
def __init__(self, child_col, parent_col):
self.__child_col = child_col
self.__parent_col = parent_col
@property
def child_col(self):
return self.__child_col
@property
def parent_col(self):
return self.__parent_col

View file

@ -0,0 +1,184 @@
# -*- coding: utf-8 -*-
from typing import Optional, Iterable, List, Any # TODO: Need any for many things, as I can't figure out how to avoid circular imports from here
import abc
from collections import OrderedDict
from ...log import *
from .ColumnSet import ColumnSet
from .DataType import DataType
from .CompositeForeignKey import CompositeForeignKey
from .Column import Column
class Table(abc.ABC): # export
def __init__(self, schema, name: str):
assert(isinstance(name, str))
self.__schema = schema
self.__name = name
self.__primary_keys: Optional[Iterable[str]] = None
self.__unique_constraints: Optional[List[ColumnSet]] = None
self.__foreign_key_constraints: Optional[List[CompositeForeignKey]] = None
self.___columns: Optional[OrderedDict[str, Any]] = None
self.__nullable_columns: Optional[Iterable[str]] = None
self.__non_nullable_columns: Optional[Iterable[str]] = None
self.__null_insertible_columns: Optional[Iterable[str]] = None
self.__not_null_insertible_columns: Optional[Iterable[str]] = None
self.__log_columns: Optional[Iterable[str]] = None
self.__column_default: Optional[dict[str, Any]] = None
def __repr__(self) -> str:
return self.__name
@property
def __columns(self):
if self.___columns is None:
ret: OrderedDict[str, Any] = OrderedDict()
for name in self._column_names():
ret[name] = Column(self, name, self._column_data_type(name))
self.___columns = ret
return self.___columns
# -- To be reimplemented
@abc.abstractmethod
def _column_names(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _log_columns(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _column_data_type(self, name) -> DataType:
pass
@abc.abstractmethod
def _primary_keys(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _auto_increment_columns(self) -> Iterable[str]:
pass
@abc.abstractmethod
def _column_default(self, name) -> Any:
pass
@abc.abstractmethod
def _unique_constraints(self) -> List[List[str]]:
pass
def _model_name(self) -> Optional[str]:
slog(WARNING, f'Returning None model name for table {self.name}')
return None
# -- To be used
def column_default(self, name) -> Any:
if self.__column_default is None:
ret: dict[str, Any] = dict()
for name in self.column_names:
ret[name] = self._column_default(name)
self.__column_default = ret
return self.__column_default[name]
@property
def columns(self):
return self.__columns.values()
def column(self, name):
return self.__columns[name]
@property
def column_names(self) -> Iterable[str]:
return self.__columns.keys()
@property
def name(self) -> str:
return self.__name
@property
def schema(self):
return self.__schema
@property
def model_name(self) -> Optional[str]:
return self._model_name()
@property
def primary_keys(self) -> Iterable[str]:
if self.__primary_keys is None:
self.__primary_keys = self._primary_keys()
return self.__primary_keys
@property
def nullable_columns(self) -> Iterable[str]:
if self.__nullable_columns is None:
self.__nullable_columns = self._nullable_columns()
return self.__nullable_columns
@property
def non_nullable_columns(self) -> Iterable[str]:
if self.__non_nullable_columns is None:
ret = []
all_cols = self.column_names
nullable_columns = self.nullable_columns
for col in all_cols:
if col not in nullable_columns:
ret.append(col)
self.__non_nullable_columns = ret
return self.__non_nullable_columns
@property
def null_insertible_columns(self) -> Iterable[str]:
if self.__null_insertible_columns is None:
ret: list[str] = []
for col in self.__columns.values():
if col.is_null_insertible:
ret.append(col.name)
self.__null_insertible_columns = ret
return self.__null_insertible_columns
@property
def not_null_insertible_columns(self) -> Iterable[str]:
if self.__not_null_insertible_columns is None:
ret: list[str] = []
for col in self.__columns.values():
if not col.is_null_insertible:
ret.append(col.name)
self.__not_null_insertible_columns = ret
return self.__not_null_insertible_columns
@property
def log_columns(self):
if self.__log_columns is None:
self.__log_columns = self._log_columns()
return self.__log_columns
@property
def auto_increment_columns(self) -> Iterable[str]:
return self._auto_increment_columns()
@property
def unique_constraints(self) -> List[ColumnSet]:
if self.__unique_constraints is None:
ret: List[ColumnSet] = []
impl = self._unique_constraints()
if impl is not None:
for columns in impl:
ret.append(ColumnSet(columns=columns))
self.__unique_constraints = ret
return self.__unique_constraints
@property
def foreign_key_constraints(self) -> List[CompositeForeignKey]:
if self.__foreign_key_constraints is None:
ret: List[Any] = []
for composite_key in self.__schema.foreign_key_constraints:
if composite_key.child_table == self:
ret.append(composite_key)
self.__foreign_key_constraints = ret
return self.__foreign_key_constraints