2025-06-05 20:48:14 +02:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
|
|
from typing import Optional, Union
|
|
|
|
|
|
|
|
|
|
import ldap
|
|
|
|
|
|
|
|
|
|
from ...log import *
|
2025-07-26 13:51:42 +02:00
|
|
|
from ...ldap import bind
|
2025-09-11 21:35:01 +02:00
|
|
|
from ...Config import Config
|
2025-06-05 20:48:14 +02:00
|
|
|
from .. import Access
|
|
|
|
|
from .. import Auth as AuthBase
|
|
|
|
|
from .. import Group as GroupBase
|
|
|
|
|
from .. import User as UserBase
|
|
|
|
|
from .. import ProjectFlags
|
|
|
|
|
|
|
|
|
|
class Group(GroupBase): # export
|
|
|
|
|
|
|
|
|
|
def __init__(self, auth: AuthBase, name: str):
|
|
|
|
|
self.__name = name
|
|
|
|
|
self.__auth = auth
|
|
|
|
|
|
|
|
|
|
def _name(self) -> str:
|
|
|
|
|
return self.__name
|
|
|
|
|
|
|
|
|
|
class User(UserBase):
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
auth: AuthBase,
|
|
|
|
|
name: str,
|
|
|
|
|
cn: str,
|
|
|
|
|
email: str
|
|
|
|
|
):
|
|
|
|
|
|
|
|
|
|
self.__auth = auth
|
|
|
|
|
self.__name = name
|
|
|
|
|
self.__cn = cn
|
|
|
|
|
self.__email = email
|
|
|
|
|
self.__groups: Optional[list[GroupBase]] = None
|
|
|
|
|
|
|
|
|
|
def _name(self) -> str:
|
|
|
|
|
return self.__name
|
|
|
|
|
|
|
|
|
|
def _groups(self) -> list[GroupBase]:
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
def _email(self) -> str:
|
|
|
|
|
return self.__email
|
|
|
|
|
|
|
|
|
|
def _display_name(self) -> str:
|
|
|
|
|
return self.__cn
|
|
|
|
|
|
|
|
|
|
class Auth(AuthBase): # export
|
|
|
|
|
|
|
|
|
|
def __init__(self, conf: Config):
|
|
|
|
|
super().__init__(conf)
|
2025-10-13 12:45:51 +02:00
|
|
|
self.___users: Optional[dict[str, UserBase]] = None
|
2025-06-05 20:48:14 +02:00
|
|
|
self.___user_by_email: Optional[dict[str, User]] = None
|
|
|
|
|
self.__groups = None
|
|
|
|
|
self.__current_user: User|None = None
|
|
|
|
|
self.__user_base_dn = conf['user_base_dn']
|
|
|
|
|
self.__conn = self.__bind()
|
2025-09-18 23:11:22 +02:00
|
|
|
self.__dummy = self.load(conf, 'dummy')
|
2025-06-05 20:48:14 +02:00
|
|
|
|
|
|
|
|
def __bind(self):
|
2025-07-26 13:51:42 +02:00
|
|
|
return bind(self.conf)
|
2025-06-05 20:48:14 +02:00
|
|
|
|
|
|
|
|
@property
|
2025-10-13 12:45:51 +02:00
|
|
|
def __users(self) -> dict[str, UserBase]:
|
2025-06-05 20:48:14 +02:00
|
|
|
if self.___users is None:
|
2025-10-13 12:45:51 +02:00
|
|
|
ret: dict[str, UserBase] = {}
|
2025-06-05 20:48:14 +02:00
|
|
|
ret_by_email: dict[str, User] = {}
|
2025-09-11 21:35:01 +02:00
|
|
|
for res in self.__conn.find(
|
2025-06-05 20:48:14 +02:00
|
|
|
self.__user_base_dn,
|
|
|
|
|
ldap.SCOPE_SUBTREE,
|
|
|
|
|
"objectClass=inetOrgPerson",
|
|
|
|
|
('uid', 'cn', 'uidNumber', 'mail', 'maildrop')
|
2025-09-11 21:35:01 +02:00
|
|
|
):
|
|
|
|
|
try:
|
|
|
|
|
display_name = None
|
|
|
|
|
if 'displayName' in res[1]:
|
|
|
|
|
cn = res[1]['displayName'][0].decode('utf-8')
|
|
|
|
|
else:
|
|
|
|
|
cn = res[1]['cn'][0].decode('utf-8')
|
|
|
|
|
uid = res[1]['uid'][0].decode('utf-8')
|
|
|
|
|
uidNumber = res[1]['uidNumber'][0].decode('utf-8')
|
|
|
|
|
emails = []
|
|
|
|
|
#for attr in ['mail', 'maildrop']:
|
|
|
|
|
for attr in ['mail']:
|
|
|
|
|
if attr in res[1]:
|
|
|
|
|
for entry in res[1][attr]:
|
|
|
|
|
emails.append(entry.decode('utf-8'))
|
|
|
|
|
if not emails:
|
|
|
|
|
slog(DEBUG, f'No email for user "{uid}", skipping')
|
2025-06-05 20:48:14 +02:00
|
|
|
continue
|
2025-09-11 21:35:01 +02:00
|
|
|
user = User(self, name=uid, cn=cn, email=emails[0])
|
|
|
|
|
ret[uid] = user
|
|
|
|
|
for email in emails:
|
|
|
|
|
ret_by_email[email] = user
|
|
|
|
|
except Exception as e:
|
|
|
|
|
slog(WARNING, f'Exception {e}')
|
|
|
|
|
raise
|
|
|
|
|
continue
|
2025-10-13 12:45:51 +02:00
|
|
|
for dummy_user in self.__dummy.users.values():
|
|
|
|
|
ret[dummy_user.name] = dummy_user
|
2025-06-05 20:48:14 +02:00
|
|
|
self.___users = ret
|
|
|
|
|
self.___user_by_email = ret_by_email
|
|
|
|
|
return self.___users
|
|
|
|
|
|
|
|
|
|
@property
|
2025-10-13 12:45:51 +02:00
|
|
|
def __user_by_email(self) -> dict[str, UserBase]:
|
2025-06-05 20:48:14 +02:00
|
|
|
if self.___user_by_email is None:
|
|
|
|
|
self.__users
|
2025-10-13 12:45:51 +02:00
|
|
|
return self.___user_by_email # type: ignore # We are sure that ___user_by_email is not None at this point
|
2025-06-05 20:48:14 +02:00
|
|
|
|
|
|
|
|
def _access(self, what: str, access_type: Optional[Access], who: User|GroupBase|None) -> bool: # type: ignore
|
|
|
|
|
slog(WARNING, f'Returning False for {access_type} access to resource {what} by {who}')
|
|
|
|
|
return False
|
|
|
|
|
|
2025-10-13 12:45:51 +02:00
|
|
|
def _user(self, name) -> UserBase:
|
2025-06-05 20:48:14 +02:00
|
|
|
try:
|
|
|
|
|
return self.__users[name]
|
|
|
|
|
except:
|
|
|
|
|
slog(ERR, f'No such user: "{name}"')
|
|
|
|
|
raise
|
|
|
|
|
|
2025-10-13 12:45:51 +02:00
|
|
|
def _user_by_email(self, email: str) -> UserBase:
|
2025-06-05 20:48:14 +02:00
|
|
|
return self.__user_by_email[email]
|
|
|
|
|
|
|
|
|
|
def _current_user(self) -> User:
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
2025-10-13 12:45:51 +02:00
|
|
|
def _users(self) -> dict[str, UserBase]:
|
2025-06-05 20:48:14 +02:00
|
|
|
return self.__users
|
|
|
|
|
|
|
|
|
|
def _projects(self, name, flags: ProjectFlags) -> list[str]:
|
|
|
|
|
if flags & ProjectFlags.Contributing:
|
|
|
|
|
# TODO: Ask LDAP
|
2025-10-13 12:45:51 +02:00
|
|
|
slog(WARNING, f'Querying LDAP for projects a user contributes to is not implemented, ignoring')
|
|
|
|
|
return []
|