Add ldap bind auth method and CAS_TGT_VALIDITY parameter. Fix #18
This commit is contained in:
@ -30,7 +30,7 @@ try: # pragma: no cover
|
||||
except ImportError:
|
||||
ldap3 = None
|
||||
|
||||
from .models import FederatedUser
|
||||
from .models import FederatedUser, UserAttributes
|
||||
from .utils import check_password, dictfetchall
|
||||
|
||||
|
||||
@ -284,6 +284,10 @@ class LdapAuthUser(DBAuthUser): # pragma: no cover
|
||||
def __init__(self, username):
|
||||
if not ldap3:
|
||||
raise RuntimeError("Please install ldap3 before using the LdapAuthUser backend")
|
||||
if not settings.CAS_LDAP_BASE_DN:
|
||||
raise ValueError(
|
||||
"You must define CAS_LDAP_BASE_DN for using the ldap authentication backend"
|
||||
)
|
||||
# in case we got deconnected from the database, retry to connect 2 times
|
||||
for retry_nb in range(3):
|
||||
try:
|
||||
@ -294,6 +298,8 @@ class LdapAuthUser(DBAuthUser): # pragma: no cover
|
||||
attributes=ldap3.ALL_ATTRIBUTES
|
||||
) and len(conn.entries) == 1:
|
||||
user = conn.entries[0].entry_get_attributes_dict()
|
||||
# store the user dn
|
||||
user["dn"] = conn.entries[0].entry_get_dn()
|
||||
if user.get(settings.CAS_LDAP_USERNAME_ATTR):
|
||||
self.user = user
|
||||
super(LdapAuthUser, self).__init__(user[settings.CAS_LDAP_USERNAME_ATTR][0])
|
||||
@ -315,7 +321,34 @@ class LdapAuthUser(DBAuthUser): # pragma: no cover
|
||||
correct, ``False`` otherwise.
|
||||
:rtype: bool
|
||||
"""
|
||||
if self.user and self.user.get(settings.CAS_LDAP_PASSWORD_ATTR):
|
||||
if settings.CAS_LDAP_PASSWORD_CHECK == "bind":
|
||||
try:
|
||||
conn = ldap3.Connection(
|
||||
settings.CAS_LDAP_SERVER,
|
||||
self.user["dn"],
|
||||
password,
|
||||
auto_bind=True
|
||||
)
|
||||
try:
|
||||
# fetch the user attribute
|
||||
if conn.search(
|
||||
settings.CAS_LDAP_BASE_DN,
|
||||
settings.CAS_LDAP_USER_QUERY % ldap3.utils.conv.escape_bytes(self.username),
|
||||
attributes=ldap3.ALL_ATTRIBUTES
|
||||
) and len(conn.entries) == 1:
|
||||
attributes = conn.entries[0].entry_get_attributes_dict()
|
||||
attributes["dn"] = conn.entries[0].entry_get_dn()
|
||||
# cache the attributes locally as we wont have access to the user password
|
||||
# later.
|
||||
user = UserAttributes.objects.get_or_create(username=self.username)[0]
|
||||
user.attributs = attributes
|
||||
user.save()
|
||||
finally:
|
||||
conn.unbind()
|
||||
return True
|
||||
except (ldap3.LDAPBindError, ldap3.LDAPCommunicationError):
|
||||
return False
|
||||
elif self.user and self.user.get(settings.CAS_LDAP_PASSWORD_ATTR):
|
||||
return check_password(
|
||||
settings.CAS_LDAP_PASSWORD_CHECK,
|
||||
password,
|
||||
@ -325,6 +358,22 @@ class LdapAuthUser(DBAuthUser): # pragma: no cover
|
||||
else:
|
||||
return False
|
||||
|
||||
def attributs(self):
|
||||
"""
|
||||
The user attributes.
|
||||
|
||||
:return: a :class:`dict` with the user attributes. Attributes may be :func:`unicode`
|
||||
or :class:`list` of :func:`unicode`. If the user do not exists, the returned
|
||||
:class:`dict` is empty.
|
||||
:rtype: dict
|
||||
:raises NotImplementedError: if the password check method in `CAS_LDAP_PASSWORD_CHECK`
|
||||
do not allow to fetch the attributes without the user credentials.
|
||||
"""
|
||||
if settings.CAS_LDAP_PASSWORD_CHECK == "bind":
|
||||
raise NotImplementedError()
|
||||
else:
|
||||
return super(LdapAuthUser, self).attributs()
|
||||
|
||||
|
||||
class DjangoAuthUser(AuthUser): # pragma: no cover
|
||||
"""
|
||||
|
Reference in New Issue
Block a user