mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 18:03:31 +01:00
103 lines
3.7 KiB
Python
103 lines
3.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Optional, Any
|
|
|
|
from jwutils.log import *
|
|
|
|
from .ColumnSet import ColumnSet
|
|
from .SingleForeignKey import SingleForeignKey
|
|
|
|
class CompositeForeignKey: # export
    """Foreign key that spans one or more column pairs between two tables.

    The child and parent column sets are paired by position: the column at
    index ``i`` of the child set references the column at index ``i`` of the
    parent set. Both sets must have the same length, be non-empty, and each
    set's columns must all report the same ``table``.
    """

    def __init__(self, child_col_set: ColumnSet, parent_col_set: ColumnSet): # TODO: Implement alternative ways to construct
        """Store both column sets and derive the table each one belongs to.

        Raises:
            AssertionError: if either set is empty, mixes columns of
                different tables, or the two sets differ in length.
        """
        self.__child_col_set = child_col_set
        self.__parent_col_set = parent_col_set
        self.__child_table = self.__common_table(child_col_set)
        self.__parent_table = self.__common_table(parent_col_set)

        assert len(self.__child_col_set) == len(self.__parent_col_set), \
            'child and parent column sets must have the same length'
        self.__len = len(self.__child_col_set)

        # Lazily built caches, filled on first use by the accessors below.
        self.__column_relations: Optional[list[SingleForeignKey]] = None
        self.__parent_columns_by_child_column: Optional[dict[str, Any]] = None
        self.__child_columns_by_parent_column: Optional[dict[str, Any]] = None

    @staticmethod
    def __common_table(col_set) -> Any:
        """Return the one table shared by every column in ``col_set``."""
        ret = None
        for c in col_set:
            if ret is None:
                ret = c.table
            else:
                assert ret == c.table, \
                    'all columns of a column set must belong to the same table'
        assert ret is not None, 'column set yields no table'
        return ret

    def __column_pairs(self):
        """Yield ``(child column, parent column)`` tuples in positional order."""
        assert len(self.__child_col_set) == len(self.__parent_col_set)
        # Index access is kept on purpose: positional indexing is the only
        # pairing contract this class relies on.
        for i in range(len(self.__child_col_set)):
            yield self.__child_col_set[i], self.__parent_col_set[i]

    def __table_rel_str(self) -> str:
        """Format the table-level relation as ``child => parent``."""
        return f'{self.__child_table.name} => {self.__parent_table.name}'

    def __cols_rel_str(self, child, parent) -> str:
        """Format a single column pair as ``child -> parent``."""
        return f'{child.name} -> {parent.name}'

    def __len__(self) -> int:
        """Return the number of column pairs in this key."""
        return self.__len

    def __iter__(self):
        """Iterate over the per-column relations (see ``column_relations``)."""
        yield from self.column_relations

    def __repr__(self) -> str:
        """Return ``child => parent: c1 -> p1, c2 -> p2, ...``."""
        ret = self.__table_rel_str()
        ret += ': ' + ', '.join([self.__cols_rel_str(rel.child_column, rel.parent_column) for rel in self.column_relations])
        return ret

    def __eq__(self, rhs):
        """Compare by child and parent column sets.

        Returns ``NotImplemented`` for objects that are not a
        ``CompositeForeignKey``, so ``key == other`` evaluates to ``False``
        instead of raising ``AttributeError`` on the missing private
        attributes.
        """
        if not isinstance(rhs, CompositeForeignKey):
            return NotImplemented
        if rhs.__child_col_set != self.__child_col_set:
            return False
        if rhs.__parent_col_set != self.__parent_col_set:
            return False
        return True

    @property
    def child_table(self) -> Any:
        """Table that the child (referencing) columns belong to."""
        return self.__child_table

    @property
    def parent_table(self) -> Any:
        """Table that the parent (referenced) columns belong to."""
        return self.__parent_table

    @property
    def child_columns(self) -> ColumnSet:
        """The child column set exactly as passed to the constructor."""
        return self.__child_col_set

    @property
    def parent_columns(self) -> ColumnSet:
        """The parent column set exactly as passed to the constructor."""
        return self.__parent_col_set

    def parent_column(self, child_column) -> Any:
        """Return the parent column referenced by ``child_column``.

        Args:
            child_column: a column name, or an object with a ``name``
                attribute.

        Raises:
            KeyError: if this key has no child column of that name.
        """
        child_column_name = child_column if isinstance(child_column, str) else child_column.name
        if self.__parent_columns_by_child_column is None:
            self.__parent_columns_by_child_column = {
                child.name: parent for child, parent in self.__column_pairs()
            }
        # The cache is keyed by name, so the lookup must use the resolved
        # name rather than the raw argument (which may be a column object).
        return self.__parent_columns_by_child_column[child_column_name]

    def child_column(self, parent_column) -> Any:
        """Return the child column that references ``parent_column``.

        Args:
            parent_column: a column name, or an object with a ``name``
                attribute.

        Raises:
            KeyError: if this key has no parent column of that name.
        """
        # NOTE(review): this logs at WARNING on every call, which looks like
        # leftover debugging output -- confirm the intended level.
        slog(WARNING, f'{self}: Looking for child column belonging to parent column "{parent_column}"')
        parent_column_name = parent_column if isinstance(parent_column, str) else parent_column.name
        if self.__child_columns_by_parent_column is None:
            self.__child_columns_by_parent_column = {
                parent.name: child for child, parent in self.__column_pairs()
            }
        # Keyed by name; see parent_column().
        return self.__child_columns_by_parent_column[parent_column_name]

    @property
    def column_relations(self) -> list[Any]:
        """List of ``SingleForeignKey`` objects, one per column pair.

        Built on first access and cached; later calls return the same list.
        """
        if self.__column_relations is None:
            self.__column_relations = [
                SingleForeignKey(child, parent)
                for child, parent in self.__column_pairs()
            ]
        return self.__column_relations
|