# -*- coding: utf-8 -*-
"""LDAP-backed implementations of the Auth, User and Group interfaces."""

from typing import Optional, Union

import ldap

from ...log import *
from ...ldap import bind
from .. import Access
from .. import Auth as AuthBase
from .. import Group as GroupBase
from .. import User as UserBase
from .. import ProjectFlags


class Group(GroupBase): # export
    """A group identified by name, tied to the Auth object that created it."""

    def __init__(self, auth: AuthBase, name: str):
        self.__name = name
        self.__auth = auth

    def _name(self) -> str:
        """Return the group name passed to the constructor."""
        return self.__name


class User(UserBase):
    """A user record holding the login name, display name and email."""

    def __init__(
        self,
        auth: AuthBase,
        name: str,
        cn: str,
        email: str
    ):
        self.__auth = auth
        self.__name = name
        self.__cn = cn
        self.__email = email
        # Placeholder for a group cache; nothing fills it yet.
        self.__groups: Optional[list[GroupBase]] = None

    def _name(self) -> str:
        """Return the login name."""
        return self.__name

    def _groups(self) -> list[GroupBase]:
        """Group lookup is not implemented for this backend."""
        raise NotImplementedError

    def _email(self) -> str:
        """Return the email address passed to the constructor."""
        return self.__email

    def _display_name(self) -> str:
        """Return the display name (the ``cn`` constructor argument)."""
        return self.__cn


class Auth(AuthBase): # export
    """Auth backend that reads users from an LDAP directory.

    Users are fetched lazily on first access and cached for the lifetime
    of the object. Users of the auxiliary 'dummy' backend are merged into
    the name index and override LDAP users with the same name.
    """

    def __init__(self, conf: Config):
        super().__init__(conf)
        # Both caches are filled together by __load_users().
        self.___users: Optional[dict[str, User]] = None
        self.___user_by_email: Optional[dict[str, User]] = None
        self.__groups = None
        self.__current_user: User|None = None
        self.__user_base_dn = conf['user_base_dn']
        self.__conn = self.__bind()
        self.__dummy = self.load('dummy', conf)

    def __bind(self):
        """Open the LDAP connection described by this object's config."""
        return bind(self.conf)

    def __load_users(self) -> None:
        """Query LDAP for inetOrgPerson entries and fill both user caches.

        Entries that raise while being parsed (for instance because 'cn',
        'uid' or 'uidNumber' is missing) are logged and skipped. Entries
        without a 'mail' value are skipped as well.
        """
        by_name: dict[str, User] = {}
        by_email: dict[str, User] = {}
        msg_id = self.__conn.search(
            self.__user_base_dn,
            ldap.SCOPE_SUBTREE,
            "objectClass=inetOrgPerson",
            # 'displayName' has to be requested explicitly; otherwise the
            # server does not return it and the lookup below can never
            # prefer it over 'cn'.
            ('uid', 'cn', 'displayName', 'uidNumber', 'mail', 'maildrop')
        )
        while True:
            # all=0: fetch the results of the search one message at a time.
            result_type, result_data = self.__conn.result(msg_id, 0)
            if result_data == []:
                break
            if result_type != ldap.RES_SEARCH_ENTRY:
                continue
            for res in result_data:
                try:
                    attrs = res[1]
                    name_attr = 'displayName' if 'displayName' in attrs else 'cn'
                    cn = attrs[name_attr][0].decode('utf-8')
                    uid = attrs['uid'][0].decode('utf-8')
                    # The value is not used, but the lookup is kept on
                    # purpose: an entry without 'uidNumber' raises here
                    # and is skipped by the handler below.
                    attrs['uidNumber'][0].decode('utf-8')
                    # Only 'mail' is consulted; 'maildrop' is requested
                    # above but deliberately not used as an address source.
                    emails = [
                        value.decode('utf-8')
                        for attr in ('mail',)
                        if attr in attrs
                        for value in attrs[attr]
                    ]
                    if not emails:
                        slog(DEBUG, f'No email for user "{uid}", skipping')
                        continue
                    user = User(self, name=uid, cn=cn, email=emails[0])
                    by_name[uid] = user
                    for email in emails:
                        by_email[email] = user
                except Exception as e:
                    # Best effort: one malformed entry must not abort the
                    # whole listing.
                    slog(WARNING, f'Exception {e}')
                    continue
        # Dummy users are only added to the name index, not the email index.
        for user in self.__dummy.users.values():
            by_name[user.name] = user
        self.___users = by_name
        self.___user_by_email = by_email

    @property
    def __users(self) -> dict[str, User]:
        """Mapping of user name to user, loaded from LDAP on first access."""
        if self.___users is None:
            self.__load_users()
        return self.___users

    @property
    def __user_by_email(self) -> dict[str, User]:
        """Mapping of email address to user, loaded on first access."""
        if self.___user_by_email is None:
            self.__load_users()
        return self.___user_by_email

    def _access(self, what: str, access_type: Optional[Access], who: User|GroupBase|None) -> bool: # type: ignore
        """Deny every request; access checks are not implemented here."""
        slog(WARNING, f'Returning False for {access_type} access to resource {what} by {who}')
        return False

    def _user(self, name) -> User:
        """Return the user called ``name``.

        Raises:
            KeyError: if no such user is known. The miss is logged first.
        """
        try:
            return self.__users[name]
        except KeyError:
            slog(ERR, f'No such user: "{name}"')
            raise

    def _user_by_email(self, email: str) -> User:
        """Return the user owning ``email``; raises KeyError if unknown."""
        return self.__user_by_email[email]

    def _current_user(self) -> User:
        """Current-user lookup is not implemented for this backend."""
        raise NotImplementedError

    def _users(self) -> list[User]:
        """Return all known users.

        NOTE(review): this returns the name-to-user dict, not a list as
        annotated. Left unchanged because callers may rely on the mapping;
        confirm against the base class contract.
        """
        return self.__users

    def _projects(self, name, flags: ProjectFlags) -> list[str]:
        """Stub: project lookup is not implemented and always yields None.

        NOTE(review): the annotation promises a list; confirm whether
        callers expect an empty list instead.
        """
        if flags & ProjectFlags.Contributing:
            # TODO: Ask LDAP
            pass
        return None