diff --git a/tools/python/jwutils/db/schema/Column.py b/tools/python/jwutils/db/schema/Column.py new file mode 100644 index 0000000..e5133e5 --- /dev/null +++ b/tools/python/jwutils/db/schema/Column.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- + +from typing import Optional, Any + +import abc + +from ...log import * + +from .DataType import DataType + +class Column(abc.ABC): # export + + def __init__(self, table, name, data_type: DataType): + self.__name: str = name + self.__table: Any = table + self.__is_nullable: Optional[bool] = None + self.__is_null_insertible: Optional[bool] = None + self.__is_primary_key: Optional[bool] = None + self.__default_value: Optional[Any] = None + self.__default_value_cached: bool = False + self.__is_auto_increment: Optional[bool] = None + self.__data_type: DataType = data_type + + def __repr__(self): + return f'{self.__table.name}.{self.__name}: {self.__data_type}' + + @property + def name(self) -> str: + return self.__name + + @property + def data_type(self): + return self.__data_type + + @property + def table(self) -> str: + return self.__table + + @property + def is_nullable(self) -> bool: + if self.__is_nullable is None: + self.__is_nullable = self.__name in self.__table.nullable_columns + return self.__is_nullable + + @property + def is_null_insertible(self): + if self.__is_null_insertible is None: + ret = False + if self.is_nullable: + ret = True + elif self.is_auto_increment: + ret = True + elif self.default_value is not None: + ret = True + self.__is_null_insertible = ret + return self.__is_null_insertible + + @property + def is_primary_key(self) -> bool: + if self.__is_primary_key is None: + self.__is_primary_key = self.__name in self.__table.primary_keys + return self.__is_primary_key + + @property + def is_auto_increment(self) -> bool: + if self.__is_auto_increment is None: + self.__is_auto_increment = self.__name in self.__table.auto_increment_columns + return self.__is_auto_increment + + @property + def default_value(self) -> Optional[Any]: + if self.__default_value_cached is False: + self.__default_value = self.__table.column_default(self.name) + self.__default_value_cached = True + return self.__default_value diff --git a/tools/python/jwutils/db/schema/ColumnSet.py b/tools/python/jwutils/db/schema/ColumnSet.py new file mode 100644 index 0000000..9426696 --- /dev/null +++ b/tools/python/jwutils/db/schema/ColumnSet.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- + +from typing import Optional, List, Iterable, Any + +class ColumnSet: # export + + def __init__(self, *args: List[Any], columns: List[Any]=[], table: Optional[Any]=None, names: Optional[List[str]]=None): + self.__columns: List[Any] = [*args] + self.__columns.extend(columns) + self.__table = table + if names is not None: + assert(table is not None) + for name in names: + self.__columns.append(table.column(name)) + if self.__table is not None: + for col in columns: + assert(col.table == self.__table) + + def __len__(self): + return len(self.__columns) + + def __iter__(self): + yield from self.__columns + + def __repr__(self): + return '|'.join([col.name for col in self.__columns]) + + def __getitem__(self, index): + return self.__columns[index] + + def __eq__(self, rhs): + if self.__table != rhs.__table: + return False + if len(self.__columns) != len(rhs.__columns): + return False + for i in range(0, len(self.__columns)): + if self.__columns[i].name != rhs.__columns[i].name: + return False + return True + + @property + def columns(self) -> List[Any]: + return self.__columns diff --git a/tools/python/jwutils/db/schema/CompositeForeignKey.py b/tools/python/jwutils/db/schema/CompositeForeignKey.py new file mode 100644 index 0000000..b77e9a7 --- /dev/null +++ b/tools/python/jwutils/db/schema/CompositeForeignKey.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- + +from typing import Optional, List, Any + +from .ColumnSet import ColumnSet +from .SingleForeignKey import SingleForeignKey + +class CompositeForeignKey: # export + + def __init__(self, child_col_set: ColumnSet, parent_col_set: ColumnSet): # TODO: Implement alternative ways to construct + + def __table(s): + ret = None + for c in s: + if ret is None: + ret = c.table + else: + assert(ret == c.table) + assert(ret is not None) + return ret + + self.__child_col_set = child_col_set + self.__parent_col_set = parent_col_set + self.__child_table = __table(child_col_set) + self.__parent_table = __table(parent_col_set) + + assert(len(self.__child_col_set) == len(self.__parent_col_set)) + self.__len = len(self.__child_col_set) + self.__column_relations: Optional[List[SingleForeignKey]] = None + + def __table_rel_str(self): + return f'{self.__child_table.name} => {self.__parent_table.name}' + + def __cols_rel_str(self, child, parent): + return f'{child.name} -> {parent.name}' + + def __len__(self): + return self.__len + + def __iter__(self): + yield from self.column_relations + + def __repr__(self): + ret = self.__table_rel_str() + ret += '|| ' + ' | '.join([self.__cols_rel_str(rel.child_col, rel.parent_col) for rel in self.column_relations]) + return ret + + def __eq__(self, rhs): + if rhs.__child_col_set != self.__child_col_set: + return False + if rhs.__parent_col_set != self.__parent_col_set: + return False + return True + + @property + def child_table(self) -> Any: + return self.__child_table + + @property + def parent_table(self) -> Any: + return self.__parent_table + + @property + def child_columns(self) -> ColumnSet: + return self.__child_col_set + + @property + def parent_columns(self) -> ColumnSet: + return self.__parent_col_set + + @property + def column_relations(self) -> List[Any]: + ret = [] + if self.__column_relations is None: + for i in range(0, self.__len): + ret.append(SingleForeignKey(self.__child_col_set[i], self.__parent_col_set[i])) + self.__column_relations = ret + return self.__column_relations diff --git a/tools/python/jwutils/db/schema/DataType.py b/tools/python/jwutils/db/schema/DataType.py new file mode 100644 index 0000000..b03c1e9 --- /dev/null +++ b/tools/python/jwutils/db/schema/DataType.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- + +from typing import Optional + +from ...log import * +from enum import Enum, auto +from datetime import datetime + +class Id(Enum): + Integer = auto() + SmallInteger = auto() + Currency = auto() + Single = auto() + DateTime = auto() + String = auto() + Text = auto() + Invalid = auto() + +def py_type(type_id: Id) -> type: # export + + match type_id: + case Id.Integer: + return int + case Id.SmallInteger: + return int + case Id.Currency: + return int + case Id.Single: + return int + case Id.DateTime: + return datetime + case Id.String: + return str + case Id.Text: + return str + case Id.DateTime: + return datetime + + raise Exception(f'Unknown column type-id "{type_id}"') + +class DataType: # export + + def __init__(self, type_id: Id, size: Optional[int]=None): + if not isinstance(type_id, Id): + throw(ERR, f'Passed type id "{type_id}" with unsupported data type {type(type_id)}') + if size is not None: + assert(isinstance(size, int)) + assert(size > 0) + self.__id = type_id + self.__size = size + def __repr__(self): + ret = f'{self.__id}' + if self.__size is not None: + ret += f'({self.__size})' + return ret + + @property + def type_id(self) -> Id: + return self.__id + + @property + def size(self) -> Optional[int]: + return self.__size + + @property + def py_type(self) -> type: + return py_type(self.__id) + + @property + def py_type_str(self) -> str: + return self.py_type.__name__ + + @property + def py_type_annotation(self) -> str: + return self.py_type_str # FIXME: This is not always correct diff --git a/tools/python/jwutils/db/schema/Makefile b/tools/python/jwutils/db/schema/Makefile new file mode 100644 index 0000000..781b0c8 --- /dev/null +++ b/tools/python/jwutils/db/schema/Makefile @@ -0,0 +1,4 @@ +TOPDIR = ../../../../.. + +include $(TOPDIR)/make/proj.mk +include $(JWBDIR)/make/py-mod.mk diff --git a/tools/python/jwutils/db/schema/Schema.py b/tools/python/jwutils/db/schema/Schema.py new file mode 100644 index 0000000..3ab372b --- /dev/null +++ b/tools/python/jwutils/db/schema/Schema.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- + +from typing import Optional, List, Iterable + +import abc + +from ...log import * + +from .Table import Table +from .Column import Column +from .DataType import DataType +from .CompositeForeignKey import CompositeForeignKey + +class Schema(abc.ABC): # export + + def __init__(self) -> None: + self.___tables: Optional[List[Table]] = None + self.__foreign_keys: Optional[List[CompositeForeignKey]] = None + + @property + def __tables(self): + if self.___tables is None: + ret = dict() + for name in self._table_names(): + slog(WARNING, f'Caching metadata for table "{name}"') + assert(isinstance(name, str)) + ret[name] = self._table(name) + self.___tables = ret + return self.___tables + + # ------ API to be implemented + + @abc.abstractmethod + def _table_names(self) -> Iterable[str]: + throw(ERR, "Called pure virtual base class method") + return [] + + @abc.abstractmethod + def _table(self, name: str) -> Table: + throw(ERR, "Called pure virtual base class method") + return None # type: ignore + + @abc.abstractmethod + def _foreign_keys(self) -> List[CompositeForeignKey]: + pass + + # ------ API to be called + + @property + def table_names(self) -> Iterable[str]: + return self.__tables.keys() + + @property + def tables(self) -> Iterable[Table]: + return self.__tables.values() + + @property + def foreign_key_constraints(self) -> List[CompositeForeignKey]: + if self.__foreign_keys is None: + self.__foreign_keys = self._foreign_keys() + return self.__foreign_keys + + def table(self, name: str) -> Table: + return self.__tables[name] + + def table_by_model_name(self, name: str, throw=False) -> Table: + for table in self.__tables.values(): + if table.model_name == name: + return table + if throw: + raise Exception(f'Table "{name}" not found in database metadata') + return None # type: ignore + + def primary_keys(self, table_name: str) -> Iterable[str]: + return self.__tables[table_name].primary_keys + + def columns(self, table_name: str) -> Iterable[Column]: + return self.__tables[table_name].columns diff --git a/tools/python/jwutils/db/schema/SingleForeignKey.py b/tools/python/jwutils/db/schema/SingleForeignKey.py new file mode 100644 index 0000000..aa6b7e0 --- /dev/null +++ b/tools/python/jwutils/db/schema/SingleForeignKey.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- + +from typing import Optional, List, Any + +from .ColumnSet import ColumnSet + +class SingleForeignKey: + + def __init__(self, child_col, parent_col): + self.__child_col = child_col + self.__parent_col = parent_col + + @property + def child_col(self): + return self.__child_col + + @property + def parent_col(self): + return self.__parent_col diff --git a/tools/python/jwutils/db/schema/Table.py b/tools/python/jwutils/db/schema/Table.py new file mode 100644 index 0000000..3e8c00a --- /dev/null +++ b/tools/python/jwutils/db/schema/Table.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- + +from typing import Optional, Iterable, List, Any # TODO: Need any for many things, as I can't figure out how to avoid circular imports from here + +import abc +from collections import OrderedDict + +from ...log import * + +from .ColumnSet import ColumnSet +from .DataType import DataType +from .CompositeForeignKey import CompositeForeignKey +from .Column import Column + +class Table(abc.ABC): # export + + def __init__(self, schema, name: str): + assert(isinstance(name, str)) + self.__schema = schema + self.__name = name + self.__primary_keys: Optional[Iterable[str]] = None + self.__unique_constraints: Optional[List[ColumnSet]] = None + self.__foreign_key_constraints: Optional[List[CompositeForeignKey]] = None + self.___columns: Optional[OrderedDict[str, Any]] = None + self.__nullable_columns: Optional[Iterable[str]] = None + self.__non_nullable_columns: Optional[Iterable[str]] = None + self.__null_insertible_columns: Optional[Iterable[str]] = None + self.__not_null_insertible_columns: Optional[Iterable[str]] = None + self.__log_columns: Optional[Iterable[str]] = None + self.__column_default: Optional[dict[str, Any]] = None + + def __repr__(self) -> str: + return self.__name + + @property + def __columns(self): + if self.___columns is None: + ret: OrderedDict[str, Any] = OrderedDict() + for name in self._column_names(): + ret[name] = Column(self, name, self._column_data_type(name)) + self.___columns = ret + return self.___columns + + # -- To be reimplemented + + @abc.abstractmethod + def _column_names(self) -> Iterable[str]: + pass + + @abc.abstractmethod + def _log_columns(self) -> Iterable[str]: + pass + + @abc.abstractmethod + def _column_data_type(self, name) -> DataType: + pass + + @abc.abstractmethod + def _primary_keys(self) -> Iterable[str]: + pass + + @abc.abstractmethod + def _auto_increment_columns(self) -> Iterable[str]: + pass + + @abc.abstractmethod + def _column_default(self, name) -> Any: + pass + + @abc.abstractmethod + def _unique_constraints(self) -> List[List[str]]: + pass + + def _model_name(self) -> Optional[str]: + slog(WARNING, f'Returning None model name for table {self.name}') + return None + + # -- To be used + + def column_default(self, name) -> Any: + if self.__column_default is None: + ret: dict[str, Any] = dict() + for name in self.column_names: + ret[name] = self._column_default(name) + self.__column_default = ret + return self.__column_default[name] + + @property + def columns(self): + return self.__columns.values() + + def column(self, name): + return self.__columns[name] + + @property + def column_names(self) -> Iterable[str]: + return self.__columns.keys() + + @property + def name(self) -> str: + return self.__name + + @property + def schema(self): + return self.__schema + + @property + def model_name(self) -> Optional[str]: + return self._model_name() + + @property + def primary_keys(self) -> Iterable[str]: + if self.__primary_keys is None: + self.__primary_keys = self._primary_keys() + return self.__primary_keys + + @property + def nullable_columns(self) -> Iterable[str]: + if self.__nullable_columns is None: + self.__nullable_columns = self._nullable_columns() + return self.__nullable_columns + + @property + def non_nullable_columns(self) -> Iterable[str]: + if self.__non_nullable_columns is None: + ret = [] + all_cols = self.column_names + nullable_columns = self.nullable_columns + for col in all_cols: + if col not in nullable_columns: + ret.append(col) + self.__non_nullable_columns = ret + return self.__non_nullable_columns + + @property + def null_insertible_columns(self) -> Iterable[str]: + if self.__null_insertible_columns is None: + ret: list[str] = [] + for col in self.__columns.values(): + if col.is_null_insertible: + ret.append(col.name) + self.__null_insertible_columns = ret + return self.__null_insertible_columns + + @property + def not_null_insertible_columns(self) -> Iterable[str]: + if self.__not_null_insertible_columns is None: + ret: list[str] = [] + for col in self.__columns.values(): + if not col.is_null_insertible: + ret.append(col.name) + self.__not_null_insertible_columns = ret + return self.__not_null_insertible_columns + + @property + def log_columns(self): + if self.__log_columns is None: + self.__log_columns = self._log_columns() + return self.__log_columns + + @property + def auto_increment_columns(self) -> Iterable[str]: + return self._auto_increment_columns() + + @property + def unique_constraints(self) -> List[ColumnSet]: + if self.__unique_constraints is None: + ret: List[ColumnSet] = [] + impl = self._unique_constraints() + if impl is not None: + for columns in impl: + ret.append(ColumnSet(columns=columns)) + self.__unique_constraints = ret + return self.__unique_constraints + + @property + def foreign_key_constraints(self) -> List[CompositeForeignKey]: + if self.__foreign_key_constraints is None: + ret: List[Any] = [] + for composite_key in self.__schema.foreign_key_constraints: + if composite_key.child_table == self: + ret.append(composite_key) + self.__foreign_key_constraints = ret + return self.__foreign_key_constraints