mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-python
synced 2026-01-15 01:52:56 +01:00
160 lines
5.2 KiB
Python
160 lines
5.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Optional, Union
|
|
|
|
import ldap
|
|
|
|
from ...log import *
|
|
from ... import Config
|
|
from .. import Access
|
|
from .. import Auth as AuthBase
|
|
from .. import Group as GroupBase
|
|
from .. import User as UserBase
|
|
from .. import ProjectFlags
|
|
|
|
class Group(GroupBase): # export
    """Group object handed out by the LDAP auth backend.

    Keeps the group name together with the auth object it was created with.
    """

    def __init__(self, auth: AuthBase, name: str):
        """Remember the auth object and the group name passed in."""
        self.__auth, self.__name = auth, name

    def _name(self) -> str:
        """Return the name this group was constructed with."""
        return self.__name
|
|
|
|
class User(UserBase):
    """User record built by the LDAP auth backend.

    Stores the login name, the common name (used as display name) and one
    e-mail address, plus the auth object the user was created with.
    """

    def __init__(self, auth: AuthBase, name: str, cn: str, email: str):
        """Store the constructor arguments; the group list starts unset."""
        self.__auth, self.__name = auth, name
        self.__cn, self.__email = cn, email
        # Never filled in by this class; _groups() is not implemented yet.
        self.__groups: Optional[list[GroupBase]] = None

    def _name(self) -> str:
        """Return the login name."""
        return self.__name

    def _groups(self) -> list[GroupBase]:
        """Group lookup is not implemented; always raises NotImplementedError."""
        raise NotImplementedError

    def _email(self) -> str:
        """Return the e-mail address given at construction."""
        return self.__email

    def _display_name(self) -> str:
        """Return the common name given at construction."""
        return self.__cn
|
|
|
|
class Auth(AuthBase): # export
    """Auth backend that reads its users from an LDAP directory.

    Configuration keys read by this class: ``ldap_uri``, ``bind_dn``,
    ``user_base_dn`` and, optionally, ``password``. Users found in LDAP are
    combined with the users of a second backend obtained through
    ``self.load('dummy', conf)``.
    """

    def __init__(self, conf: Config):
        """Bind to the LDAP server and load the secondary 'dummy' backend.

        The user tables are not fetched here; they are loaded on first use.
        """
        super().__init__(conf)
        # Lazily filled caches; both are populated together by __users.
        self.___users: Optional[dict[str, User]] = None
        self.___user_by_email: Optional[dict[str, User]] = None
        self.__groups = None
        self.__current_user: User|None = None
        self.__user_base_dn = conf['user_base_dn']
        self.__conn = self.__bind()
        self.__dummy = self.load('dummy', conf)

    def __bind(self):
        """Open a StartTLS-protected connection and bind with the configured DN.

        Returns:
            The bound python-ldap connection object.

        Raises:
            Exception: if the bind fails; the original error is chained as
                the cause and its text is included in the message.
        """
        ldap_uri = self.conf['ldap_uri']
        bind_dn = self.conf['bind_dn']
        bind_pw = self.conf.get('password')
        if bind_pw is None:
            # NOTE(review): ldap_secret_file is not defined anywhere in this
            # file. Unless the star import from ...log provides it, this
            # branch raises NameError -- confirm where the path should come
            # from (config key? module constant?).
            with open(ldap_secret_file, 'r') as file:
                bind_pw = file.read()
        # Drop surrounding whitespace such as a trailing newline.
        # NOTE(review): applied to both password sources -- confirm this
        # matches the intended behaviour for a configured 'password'.
        bind_pw = bind_pw.strip()
        ret = ldap.initialize(ldap_uri)
        ret.start_tls_s()
        try:
            ret.bind_s(bind_dn, bind_pw)
        except Exception as e:
            # The password is deliberately kept out of the message.
            raise Exception(f'Failed to bind to {ldap_uri} with dn {bind_dn} ({e})') from e
        return ret

    def __load_users(self) -> tuple[dict[str, User], dict[str, User]]:
        """Fetch all inetOrgPerson entries and build the user lookup tables.

        Returns:
            A pair ``(by_name, by_email)``. ``by_name`` maps uid to User and
            additionally contains the users of the 'dummy' backend keyed by
            their ``name``; ``by_email`` maps every ``mail`` value of an LDAP
            entry to its User (dummy users are not added to it).

        Entries that cannot be parsed are logged and skipped.
        """
        by_name: dict[str, User] = {}
        by_email: dict[str, User] = {}
        msg_id = self.__conn.search(
            self.__user_base_dn,
            ldap.SCOPE_SUBTREE,
            "objectClass=inetOrgPerson",
            # 'displayName' has to be requested explicitly, otherwise the
            # server never returns it and the preference below is dead code.
            ('uid', 'cn', 'displayName', 'uidNumber', 'mail', 'maildrop'),
        )
        while True:
            # Second argument 0: take the results one message at a time.
            result_type, result_data = self.__conn.result(msg_id, 0)
            if result_data == []:
                break
            if result_type != ldap.RES_SEARCH_ENTRY:
                continue
            for res in result_data:
                try:
                    attrs = res[1]
                    # Prefer displayName over cn when the entry has one.
                    name_attr = 'displayName' if 'displayName' in attrs else 'cn'
                    cn = attrs[name_attr][0].decode('utf-8')
                    uid = attrs['uid'][0].decode('utf-8')
                    # The value is unused, but the lookup makes entries
                    # without a uidNumber fail and get skipped below; kept
                    # so the set of accepted entries does not change.
                    attrs['uidNumber'][0].decode('utf-8')
                    # Only 'mail' is consulted; 'maildrop' is fetched but
                    # currently ignored.
                    emails = []
                    if 'mail' in attrs:
                        emails = [entry.decode('utf-8') for entry in attrs['mail']]
                    if not emails:
                        slog(DEBUG, f'No email for user "{uid}", skipping')
                        continue
                    user = User(self, name=uid, cn=cn, email=emails[0])
                    by_name[uid] = user
                    for email in emails:
                        by_email[email] = user
                except Exception as e:
                    # Best effort: one malformed entry must not abort the
                    # whole load.
                    slog(WARNING, f'Exception {e}')
        # Users from the dummy backend override LDAP users of the same name.
        for user in self.__dummy.users.values():
            by_name[user.name] = user
        return by_name, by_email

    @property
    def __users(self) -> dict[str, User]:
        """Users keyed by name, loaded from LDAP on first access."""
        if self.___users is None:
            by_name, by_email = self.__load_users()
            self.___users = by_name
            self.___user_by_email = by_email
        return self.___users

    @property
    def __user_by_email(self) -> dict[str, User]:
        """Users keyed by e-mail address, loaded from LDAP on first access."""
        if self.___user_by_email is None:
            # Accessing the property populates both caches.
            self.__users
        return self.___user_by_email

    def _access(self, what: str, access_type: Optional[Access], who: User|GroupBase|None) -> bool: # type: ignore
        """Access checks are not implemented: log a warning and deny."""
        slog(WARNING, f'Returning False for {access_type} access to resource {what} by {who}')
        return False

    def _user(self, name) -> User:
        """Return the user called ``name``.

        Raises:
            KeyError: if no such user exists (logged before re-raising).
        """
        users = self.__users
        try:
            return users[name]
        except KeyError:
            # Only a missing key means "no such user"; other failures (for
            # example while loading from LDAP) propagate without this log.
            slog(ERR, f'No such user: "{name}"')
            raise

    def _user_by_email(self, email: str) -> User:
        """Return the user owning ``email``; raises KeyError if unknown."""
        return self.__user_by_email[email]

    def _current_user(self) -> User:
        """Not implemented; always raises NotImplementedError."""
        raise NotImplementedError

    def _users(self) -> dict[str, User]:
        """Return the name -> User mapping.

        NOTE(review): the previous annotation said ``list[User]``, but the
        value returned has always been the dict; this class itself calls
        ``.users.values()`` on the dummy backend, which suggests callers
        expect a mapping -- confirm against the base class.
        """
        return self.__users

    def _projects(self, name, flags: ProjectFlags) -> Optional[list[str]]:
        """Project lookup is not implemented yet; always returns None."""
        if flags & ProjectFlags.Contributing:
            # TODO: Ask LDAP
            pass
        return None
|