Add SqlAuthUser and LdapAuthUser auth classes. Deprecate the usage of SqlAuthUser in favor of SqlAuthUser.
SqlAuthUser use django databases management, and thus is compatible with all SQL databases supported by django: postgresql, mysql, sqlite3 and oracle. LdapAuthUser use the full pythonic ldap3 module
This commit is contained in:
@ -13,16 +13,25 @@
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.utils import timezone
|
||||
from django.db import connections, DatabaseError
|
||||
|
||||
import warnings
|
||||
from datetime import timedelta
|
||||
from six.moves import range
|
||||
try: # pragma: no cover
|
||||
import MySQLdb
|
||||
import MySQLdb.cursors
|
||||
from utils import check_password
|
||||
except ImportError:
|
||||
MySQLdb = None
|
||||
|
||||
|
||||
try: # pragma: no cover
|
||||
import ldap3
|
||||
except ImportError:
|
||||
ldap3 = None
|
||||
|
||||
from .models import FederatedUser
|
||||
from .utils import check_password, dictfetchall
|
||||
|
||||
|
||||
class AuthUser(object):
|
||||
@ -116,19 +125,46 @@ class TestAuthUser(AuthUser):
|
||||
return {}
|
||||
|
||||
|
||||
class MysqlAuthUser(AuthUser): # pragma: no cover
|
||||
class DBAuthUser(AuthUser): # pragma: no cover
|
||||
"""base class for databate based auth classes"""
|
||||
#: DB user attributes as a :class:`dict` if the username is found in the database.
|
||||
user = None
|
||||
|
||||
def attributs(self):
|
||||
"""
|
||||
The user attributes.
|
||||
|
||||
:return: a :class:`dict` with the user attributes. Attributes may be :func:`unicode`
|
||||
or :class:`list` of :func:`unicode`. If the user do not exists, the returned
|
||||
:class:`dict` is empty.
|
||||
:rtype: dict
|
||||
"""
|
||||
if self.user:
|
||||
return self.user
|
||||
else:
|
||||
return {}
|
||||
|
||||
|
||||
class MysqlAuthUser(DBAuthUser): # pragma: no cover
|
||||
"""
|
||||
A mysql authentication class: authentication user agains a mysql database
|
||||
DEPRECATED, use :class:`SqlAuthUser` instead.
|
||||
|
||||
A mysql authentication class: authenticate user agains a mysql database
|
||||
|
||||
:param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
|
||||
class attribute. Valid value are fetched from the MySQL database set with
|
||||
``settings.CAS_SQL_*`` settings parameters using the query
|
||||
``settings.CAS_SQL_USER_QUERY``.
|
||||
"""
|
||||
#: Mysql user attributes as a :class:`dict` if the username is found in the database.
|
||||
user = None
|
||||
|
||||
def __init__(self, username):
|
||||
warnings.warn(
|
||||
(
|
||||
"MysqlAuthUser authentication class is deprecated: "
|
||||
"use cas_server.auth.SqlAuthUser instead"
|
||||
),
|
||||
UserWarning
|
||||
)
|
||||
# see the connect function at
|
||||
# http://mysql-python.sourceforge.net/MySQLdb.html#functions-and-attributes
|
||||
# for possible mysql config parameters.
|
||||
@ -169,24 +205,130 @@ class MysqlAuthUser(AuthUser): # pragma: no cover
|
||||
else:
|
||||
return False
|
||||
|
||||
def attributs(self):
|
||||
"""
|
||||
The user attributes.
|
||||
|
||||
:return: a :class:`dict` with the user attributes. Attributes may be :func:`unicode`
|
||||
or :class:`list` of :func:`unicode`. If the user do not exists, the returned
|
||||
:class:`dict` is empty.
|
||||
:rtype: dict
|
||||
class SqlAuthUser(DBAuthUser): # pragma: no cover
|
||||
"""
|
||||
A SQL authentication class: authenticate user agains a SQL database. The SQL database
|
||||
must be configures in settings.py as ``settings.DATABASES['cas_server']``.
|
||||
|
||||
:param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
|
||||
class attribute. Valid value are fetched from the MySQL database set with
|
||||
``settings.CAS_SQL_*`` settings parameters using the query
|
||||
``settings.CAS_SQL_USER_QUERY``.
|
||||
"""
|
||||
|
||||
def __init__(self, username):
|
||||
if "cas_server" not in connections:
|
||||
raise RuntimeError("Please configure the 'cas_server' database in settings.DATABASES")
|
||||
for retry_nb in range(3):
|
||||
try:
|
||||
with connections["cas_server"].cursor() as curs:
|
||||
curs.execute(settings.CAS_SQL_USER_QUERY, (username,))
|
||||
results = dictfetchall(curs)
|
||||
if len(results) == 1:
|
||||
self.user = results[0]
|
||||
super(SqlAuthUser, self).__init__(self.user['username'])
|
||||
else:
|
||||
super(SqlAuthUser, self).__init__(username)
|
||||
break
|
||||
except DatabaseError:
|
||||
connections["cas_server"].close()
|
||||
if retry_nb == 2:
|
||||
raise
|
||||
|
||||
def test_password(self, password):
|
||||
"""
|
||||
Tests ``password`` agains the user password.
|
||||
|
||||
:param unicode password: a clear text password as submited by the user.
|
||||
:return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
|
||||
correct, ``False`` otherwise.
|
||||
:rtype: bool
|
||||
"""
|
||||
if self.user:
|
||||
return self.user
|
||||
return check_password(
|
||||
settings.CAS_SQL_PASSWORD_CHECK,
|
||||
password,
|
||||
self.user["password"],
|
||||
settings.CAS_SQL_PASSWORD_CHARSET
|
||||
)
|
||||
else:
|
||||
return {}
|
||||
return False
|
||||
|
||||
|
||||
class LdapAuthUser(DBAuthUser): # pragma: no cover
|
||||
"""
|
||||
A ldap authentication class: authenticate user against a ldap database
|
||||
|
||||
:param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
|
||||
class attribute. Valid value are fetched from the ldap database set with
|
||||
``settings.CAS_LDAP_*`` settings parameters.
|
||||
"""
|
||||
|
||||
_conn = None
|
||||
|
||||
@classmethod
|
||||
def get_conn(cls):
|
||||
"""Return a connection object to the ldap database"""
|
||||
conn = cls._conn
|
||||
if conn is None or conn.closed:
|
||||
conn = ldap3.Connection(
|
||||
settings.CAS_LDAP_SERVER,
|
||||
settings.CAS_LDAP_USER,
|
||||
settings.CAS_LDAP_PASSWORD,
|
||||
auto_bind=True
|
||||
)
|
||||
cls._conn = conn
|
||||
return conn
|
||||
|
||||
def __init__(self, username):
|
||||
if not ldap3:
|
||||
raise RuntimeError("Please install ldap3 before using the LdapAuthUser backend")
|
||||
# in case we got deconnected from the database, retry to connect 2 times
|
||||
for retry_nb in range(3):
|
||||
try:
|
||||
conn = self.get_conn()
|
||||
if conn.search(
|
||||
settings.CAS_LDAP_BASE_DN,
|
||||
settings.CAS_LDAP_USER_QUERY % ldap3.utils.conv.escape_bytes(username),
|
||||
attributes=ldap3.ALL_ATTRIBUTES
|
||||
) and len(conn.entries) == 1:
|
||||
user = conn.entries[0].entry_get_attributes_dict()
|
||||
if user.get(settings.CAS_LDAP_USERNAME_ATTR):
|
||||
self.user = user
|
||||
super(LdapAuthUser, self).__init__(user[settings.CAS_LDAP_USERNAME_ATTR][0])
|
||||
else:
|
||||
super(LdapAuthUser, self).__init__(username)
|
||||
else:
|
||||
super(LdapAuthUser, self).__init__(username)
|
||||
break
|
||||
except ldap3.LDAPCommunicationError:
|
||||
if retry_nb == 2:
|
||||
raise
|
||||
|
||||
def test_password(self, password):
|
||||
"""
|
||||
Tests ``password`` agains the user password.
|
||||
|
||||
:param unicode password: a clear text password as submited by the user.
|
||||
:return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
|
||||
correct, ``False`` otherwise.
|
||||
:rtype: bool
|
||||
"""
|
||||
if self.user and self.user.get(settings.CAS_LDAP_PASSWORD_ATTR):
|
||||
return check_password(
|
||||
settings.CAS_LDAP_PASSWORD_CHECK,
|
||||
password,
|
||||
self.user[settings.CAS_LDAP_PASSWORD_ATTR][0],
|
||||
settings.CAS_LDAP_PASSWORD_CHARSET
|
||||
)
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
class DjangoAuthUser(AuthUser): # pragma: no cover
|
||||
"""
|
||||
A django auth class: authenticate user agains django internal users
|
||||
A django auth class: authenticate user against django internal users
|
||||
|
||||
:param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
|
||||
class attribute. Valid value are usernames of django internal users.
|
||||
|
@ -112,12 +112,39 @@ CAS_SQL_PASSWORD = ''
|
||||
CAS_SQL_DBNAME = ''
|
||||
#: Database charset.
|
||||
CAS_SQL_DBCHARSET = 'utf8'
|
||||
|
||||
#: The query performed upon user authentication.
|
||||
CAS_SQL_USER_QUERY = 'SELECT user AS usersame, pass AS password, users.* FROM users WHERE user = %s'
|
||||
CAS_SQL_USER_QUERY = 'SELECT user AS username, pass AS password, users.* FROM users WHERE user = %s'
|
||||
#: The method used to check the user password. Must be one of ``"crypt"``, ``"ldap"``,
|
||||
#: ``"hex_md5"``, ``"hex_sha1"``, ``"hex_sha224"``, ``"hex_sha256"``, ``"hex_sha384"``,
|
||||
#: ``"hex_sha512"``, ``"plain"``.
|
||||
CAS_SQL_PASSWORD_CHECK = 'crypt' # crypt or plain
|
||||
CAS_SQL_PASSWORD_CHECK = 'crypt'
|
||||
#: charset the SQL users passwords was hash with
|
||||
CAS_SQL_PASSWORD_CHARSET = "utf-8"
|
||||
|
||||
|
||||
#: Address of the LDAP server
|
||||
CAS_LDAP_SERVER = 'localhost'
|
||||
#: LDAP user bind address, for example ``"cn=admin,dc=crans,dc=org"`` for connecting to the LDAP
|
||||
#: server.
|
||||
CAS_LDAP_USER = None
|
||||
#: LDAP connection password
|
||||
CAS_LDAP_PASSWORD = None
|
||||
#: LDAP seach base DN, for example ``"ou=data,dc=crans,dc=org"``.
|
||||
CAS_LDAP_BASE_DN = None
|
||||
#: LDAP search filter for searching user by username. User inputed usernames are escaped using
|
||||
#: :func:`ldap3.utils.conv.escape_bytes`.
|
||||
CAS_LDAP_USER_QUERY = "(uid=%s)"
|
||||
#: LDAP attribute used for users usernames
|
||||
CAS_LDAP_USERNAME_ATTR = "uid"
|
||||
#: LDAP attribute used for users passwords
|
||||
CAS_LDAP_PASSWORD_ATTR = "userPassword"
|
||||
#: The method used to check the user password. Must be one of ``"crypt"``, ``"ldap"``,
|
||||
#: ``"hex_md5"``, ``"hex_sha1"``, ``"hex_sha224"``, ``"hex_sha256"``, ``"hex_sha384"``,
|
||||
#: ``"hex_sha512"``, ``"plain"``.
|
||||
CAS_LDAP_PASSWORD_CHECK = "ldap"
|
||||
#: charset the LDAP users passwords was hash with
|
||||
CAS_LDAP_PASSWORD_CHARSET = "utf-8"
|
||||
|
||||
|
||||
#: Username of the test user.
|
||||
|
@ -582,7 +582,7 @@ def check_password(method, password, hashed_password, charset):
|
||||
:param hashed_password: The hashed password as stored in the database
|
||||
:type hashed_password: :obj:`str` or :obj:`unicode`
|
||||
:param str charset: The used char encoding (also used internally, so it must be valid for
|
||||
the charset used by ``password`` even if it is inputed as an :obj:`unicode`)
|
||||
the charset used by ``password`` when it was initially )
|
||||
:return: True if ``password`` match ``hashed_password`` using ``method``,
|
||||
``False`` otherwise
|
||||
:rtype: bool
|
||||
@ -670,3 +670,12 @@ def last_version():
|
||||
"Unable to fetch %s: %s" % (settings.CAS_NEW_VERSION_JSON_URL, error)
|
||||
)
|
||||
last_version._cache = (time.time(), version, False)
|
||||
|
||||
|
||||
def dictfetchall(cursor):
|
||||
"Return all rows from a django cursor as a dict"
|
||||
columns = [col[0] for col in cursor.description]
|
||||
return [
|
||||
dict(zip(columns, row))
|
||||
for row in cursor.fetchall()
|
||||
]
|
||||
|
Reference in New Issue
Block a user