jw-python/tools/python/jwutils/auth/ldap/Auth.py

145 lines
4.6 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
from typing import Optional, Union
import ldap
from ...log import *
from ...ldap import bind
from .. import Access
from .. import Auth as AuthBase
from .. import Group as GroupBase
from .. import User as UserBase
from .. import ProjectFlags
class Group(GroupBase): # export
    """Group object handed out by the LDAP auth backend.

    It only stores the group name and the auth backend it was created for.
    """

    def __init__(self, auth: AuthBase, name: str):
        """Remember the owning auth backend and the group's name.

        Args:
            auth: The auth backend this group belongs to.
            name: The group's name, returned unchanged by ``_name()``.
        """
        self.__auth = auth
        self.__name = name

    def _name(self) -> str:
        """Return the name passed to the constructor."""
        return self.__name
class User(UserBase):
    """User object handed out by the LDAP auth backend.

    Holds the login name, the common name (used as display name) and one
    email address. Group membership is not implemented.
    """

    def __init__(self, auth: AuthBase, name: str, cn: str, email: str):
        """Store the user's attributes.

        Args:
            auth: The auth backend this user belongs to.
            name: Login name, returned by ``_name()``.
            cn: Common name, returned by ``_display_name()``.
            email: Email address, returned by ``_email()``.
        """
        self.__name = name
        self.__cn = cn
        self.__email = email
        self.__auth = auth
        # Placeholder for a group cache; nothing in this class fills it.
        self.__groups: Optional[list[GroupBase]] = None

    def _name(self) -> str:
        """Return the login name."""
        return self.__name

    def _display_name(self) -> str:
        """Return the common name given at construction."""
        return self.__cn

    def _email(self) -> str:
        """Return the email address given at construction."""
        return self.__email

    def _groups(self) -> list[GroupBase]:
        """Group lookup is not implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
class Auth(AuthBase): # export
    """Auth backend that reads its users from an LDAP directory.

    The user list is fetched on first access and cached on the instance.
    Users of an additional ``dummy`` backend (obtained through
    ``self.load('dummy', conf)``) are merged into the by-name cache.
    """

    def __init__(self, conf: Config):
        """Bind to the directory and load the ``dummy`` backend.

        Args:
            conf: Configuration object. ``conf['user_base_dn']`` is used as
                the base DN of the user search.
        """
        super().__init__(conf)
        # Both caches are filled together by the __users property.
        self.___users: Optional[dict[str, User]] = None
        self.___user_by_email: Optional[dict[str, User]] = None
        self.__groups = None
        self.__current_user: User|None = None
        self.__user_base_dn = conf['user_base_dn']
        self.__conn = self.__bind()
        self.__dummy = self.load('dummy', conf)

    def __bind(self):
        """Return the connection object produced by ``bind(self.conf)``."""
        return bind(self.conf)

    @property
    def __users(self) -> dict[str, User]:
        """Mapping of user name to ``User``, loaded lazily and cached.

        On first access, searches the subtree below the configured user base
        DN for ``inetOrgPerson`` entries. For every entry:

        - the display name is ``displayName`` if present, otherwise ``cn``;
        - entries without a ``mail`` value are skipped (DEBUG log);
        - entries that raise while being parsed are skipped (WARNING log);
        - the first ``mail`` value becomes the user's email, and every
          ``mail`` value is indexed in the by-email cache.

        Afterwards the users of the ``dummy`` backend are added by name
        (they are not added to the by-email cache).
        """
        if self.___users is None:
            by_name: dict[str, User] = {}
            by_email: dict[str, User] = {}
            msg_id = self.__conn.search(
                self.__user_base_dn,
                ldap.SCOPE_SUBTREE,
                "objectClass=inetOrgPerson",
                # 'displayName' has to be requested explicitly, otherwise the
                # server does not return it and the displayName preference
                # below can never take effect.
                ('uid', 'cn', 'displayName', 'uidNumber', 'mail', 'maildrop')
            )
            while True:
                # all=0: fetch the results of the search one message at a time
                result_type, result_data = self.__conn.result(msg_id, 0)
                if not result_data:
                    break
                if result_type != ldap.RES_SEARCH_ENTRY:
                    continue
                for res in result_data:
                    try:
                        attrs = res[1]
                        if 'displayName' in attrs:
                            cn = attrs['displayName'][0].decode('utf-8')
                        else:
                            cn = attrs['cn'][0].decode('utf-8')
                        uid = attrs['uid'][0].decode('utf-8')
                        # The value is not used further, but this lookup makes
                        # entries without a uidNumber end up in the except
                        # branch below and get skipped.
                        # NOTE(review): confirm that this filtering is wanted.
                        uidNumber = attrs['uidNumber'][0].decode('utf-8')
                        emails = []
                        #for attr in ['mail', 'maildrop']:
                        for attr in ['mail']:
                            if attr in attrs:
                                for entry in attrs[attr]:
                                    emails.append(entry.decode('utf-8'))
                        if not emails:
                            slog(DEBUG, f'No email for user "{uid}", skipping')
                            continue
                        user = User(self, name=uid, cn=cn, email=emails[0])
                        by_name[uid] = user
                        for email in emails:
                            by_email[email] = user
                    except Exception as e:
                        # Best effort: one malformed entry must not prevent
                        # the remaining users from being loaded.
                        slog(WARNING, f'Exception {e}')
                        continue
            for user in self.__dummy.users.values():
                by_name[user.name] = user
            self.___users = by_name
            self.___user_by_email = by_email
        return self.___users

    @property
    def __user_by_email(self) -> dict[str, User]:
        """Mapping of email address to ``User``, loaded together with the users."""
        if self.___user_by_email is None:
            # Accessing the property fills both caches as a side effect.
            self.__users
        return self.___user_by_email

    def _access(self, what: str, access_type: Optional[Access], who: User|GroupBase|None) -> bool: # type: ignore
        """Deny every access request.

        Logs a warning naming the request and returns ``False``.
        """
        slog(WARNING, f'Returning False for {access_type} access to resource {what} by {who}')
        return False

    def _user(self, name) -> User:
        """Return the user called ``name``.

        Raises:
            KeyError: If no such user is known; the miss is logged first.
        """
        try:
            return self.__users[name]
        except KeyError:
            # Only a failed lookup means "no such user"; errors raised while
            # loading the user list propagate without this message.
            slog(ERR, f'No such user: "{name}"')
            raise

    def _user_by_email(self, email: str) -> User:
        """Return the user owning ``email``.

        Raises:
            KeyError: If no loaded user has that address.
        """
        return self.__user_by_email[email]

    def _current_user(self) -> User:
        """Not implemented for this backend.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def _users(self) -> dict[str, User]:
        """Return the cached mapping of user name to ``User``.

        The value is the same dict object the cache holds, not a copy.
        """
        return self.__users

    def _projects(self, name, flags: ProjectFlags) -> Optional[list[str]]:
        """Return the projects of user ``name`` -- not implemented yet.

        Currently always returns ``None``, whatever ``flags`` contains.
        """
        if flags & ProjectFlags.Contributing:
            # TODO: Ask LDAP
            pass
        return None