# -*- coding: utf-8 -*- from typing import Optional, Union import ldap from ...log import * from ...ldap import bind from ...Config import Config from .. import Access from .. import Auth as AuthBase from .. import Group as GroupBase from .. import User as UserBase from .. import ProjectFlags class Group(GroupBase): # export def __init__(self, auth: AuthBase, name: str): self.__name = name self.__auth = auth def _name(self) -> str: return self.__name class User(UserBase): def __init__( self, auth: AuthBase, name: str, cn: str, email: str ): self.__auth = auth self.__name = name self.__cn = cn self.__email = email self.__groups: Optional[list[GroupBase]] = None def _name(self) -> str: return self.__name def _groups(self) -> list[GroupBase]: raise NotImplementedError def _email(self) -> str: return self.__email def _display_name(self) -> str: return self.__cn class Auth(AuthBase): # export def __init__(self, conf: Config): super().__init__(conf) self.___users: Optional[dict[str, UserBase]] = None self.___user_by_email: Optional[dict[str, User]] = None self.__groups = None self.__current_user: User|None = None self.__user_base_dn = conf['user_base_dn'] self.__conn = self.__bind() self.__dummy = self.load(conf, 'dummy') def __bind(self): return bind(self.conf) @property def __users(self) -> dict[str, UserBase]: if self.___users is None: ret: dict[str, UserBase] = {} ret_by_email: dict[str, User] = {} for res in self.__conn.find( self.__user_base_dn, ldap.SCOPE_SUBTREE, "objectClass=inetOrgPerson", ('uid', 'cn', 'uidNumber', 'mail', 'maildrop') ): try: display_name = None if 'displayName' in res[1]: cn = res[1]['displayName'][0].decode('utf-8') else: cn = res[1]['cn'][0].decode('utf-8') uid = res[1]['uid'][0].decode('utf-8') uidNumber = res[1]['uidNumber'][0].decode('utf-8') emails = [] #for attr in ['mail', 'maildrop']: for attr in ['mail']: if attr in res[1]: for entry in res[1][attr]: emails.append(entry.decode('utf-8')) if not emails: slog(DEBUG, f'No email for user "{uid}", skipping') continue user = User(self, name=uid, cn=cn, email=emails[0]) ret[uid] = user for email in emails: ret_by_email[email] = user except Exception as e: slog(WARNING, f'Exception {e}') raise continue for dummy_user in self.__dummy.users.values(): ret[dummy_user.name] = dummy_user self.___users = ret self.___user_by_email = ret_by_email return self.___users @property def __user_by_email(self) -> dict[str, UserBase]: if self.___user_by_email is None: self.__users return self.___user_by_email # type: ignore # We are sure that ___user_by_email is not None at this point def _access(self, what: str, access_type: Optional[Access], who: User|GroupBase|None) -> bool: # type: ignore slog(WARNING, f'Returning False for {access_type} access to resource {what} by {who}') return False def _user(self, name) -> UserBase: try: return self.__users[name] except: slog(ERR, f'No such user: "{name}"') raise def _user_by_email(self, email: str) -> UserBase: return self.__user_by_email[email] def _current_user(self) -> User: raise NotImplementedError def _users(self) -> dict[str, UserBase]: return self.__users def _projects(self, name, flags: ProjectFlags) -> list[str]: if flags & ProjectFlags.Contributing: # TODO: Ask LDAP slog(WARNING, f'Querying LDAP for projects a user contributes to is not implemented, ignoring') return []