Add some docs using sphinx autodoc

This commit is contained in:
Valentin Samir
2016-07-20 18:28:23 +02:00
parent 28dd67cb32
commit cec0cadb7a
20 changed files with 1223 additions and 234 deletions

View File

@ -9,4 +9,5 @@
#
# (c) 2015-2016 Valentin Samir
"""A django CAS server application"""
#: path the the application configuration class
default_app_config = 'cas_server.apps.CasAppConfig'

View File

@ -15,86 +15,155 @@ from .models import Username, ReplaceAttributName, ReplaceAttributValue, FilterA
from .models import FederatedIendityProvider
from .forms import TicketForm
TICKETS_READONLY_FIELDS = ('validate', 'service', 'service_pattern',
'creation', 'renew', 'single_log_out', 'value')
TICKETS_FIELDS = ('validate', 'service', 'service_pattern',
'creation', 'renew', 'single_log_out')
class BaseInlines(admin.TabularInline):
"""
Bases: :class:`django.contrib.admin.TabularInline`
Base class for inlines in the admin interface.
"""
#: This controls the number of extra forms the formset will display in addition to
#: the initial forms.
extra = 0
class ServiceTicketInline(admin.TabularInline):
"""`ServiceTicket` in admin interface"""
class UserAdminInlines(BaseInlines):
"""
Bases: :class:`BaseInlines`
Base class for inlines in :class:`UserAdmin` interface
"""
#: The form :class:`TicketForm<cas_server.forms.TicketForm>` used to display tickets.
form = TicketForm
#: Fields to display on a object that are read only (not editable).
readonly_fields = (
'validate', 'service', 'service_pattern',
'creation', 'renew', 'single_log_out', 'value'
)
#: Fields to display on a object.
fields = (
'validate', 'service', 'service_pattern',
'creation', 'renew', 'single_log_out'
)
class ServiceTicketInline(UserAdminInlines):
"""
Bases: :class:`UserAdminInlines`
:class:`ServiceTicket<cas_server.models.ServiceTicket>` in admin interface
"""
#: The model which the inline is using.
model = ServiceTicket
extra = 0
form = TicketForm
readonly_fields = TICKETS_READONLY_FIELDS
fields = TICKETS_FIELDS
class ProxyTicketInline(admin.TabularInline):
"""`ProxyTicket` in admin interface"""
class ProxyTicketInline(UserAdminInlines):
"""
Bases: :class:`UserAdminInlines`
:class:`ProxyTicket<cas_server.models.ProxyTicket>` in admin interface
"""
#: The model which the inline is using.
model = ProxyTicket
extra = 0
form = TicketForm
readonly_fields = TICKETS_READONLY_FIELDS
fields = TICKETS_FIELDS
class ProxyGrantingInline(admin.TabularInline):
"""`ProxyGrantingTicket` in admin interface"""
class ProxyGrantingInline(UserAdminInlines):
"""
Bases: :class:`UserAdminInlines`
:class:`ProxyGrantingTicket<cas_server.models.ProxyGrantingTicket>` in admin interface
"""
#: The model which the inline is using.
model = ProxyGrantingTicket
extra = 0
form = TicketForm
readonly_fields = TICKETS_READONLY_FIELDS
fields = TICKETS_FIELDS[1:]
class UserAdmin(admin.ModelAdmin):
"""`User` in admin interface"""
"""
Bases: :class:`django.contrib.admin.ModelAdmin`
:class:`User<cas_server.models.User>` in admin interface
"""
#: See :class:`ServiceTicketInline`, :class:`ProxyTicketInline`, :class:`ProxyGrantingInline`
#: objects below the :class:`UserAdmin` fields.
inlines = (ServiceTicketInline, ProxyTicketInline, ProxyGrantingInline)
#: Fields to display on a object that are read only (not editable).
readonly_fields = ('username', 'date', "session_key")
#: Fields to display on a object.
fields = ('username', 'date', "session_key")
#: Fields to display on the list of class:`UserAdmin` objects.
list_display = ('username', 'date', "session_key")
class UsernamesInline(admin.TabularInline):
"""`Username` in admin interface"""
class UsernamesInline(BaseInlines):
"""
Bases: :class:`BaseInlines`
:class:`Username<cas_server.models.Username>` in admin interface
"""
#: The model which the inline is using.
model = Username
extra = 0
class ReplaceAttributNameInline(admin.TabularInline):
"""`ReplaceAttributName` in admin interface"""
class ReplaceAttributNameInline(BaseInlines):
"""
Bases: :class:`BaseInlines`
:class:`ReplaceAttributName<cas_server.models.ReplaceAttributName>` in admin interface
"""
#: The model which the inline is using.
model = ReplaceAttributName
extra = 0
class ReplaceAttributValueInline(admin.TabularInline):
"""`ReplaceAttributValue` in admin interface"""
class ReplaceAttributValueInline(BaseInlines):
"""
Bases: :class:`BaseInlines`
:class:`ReplaceAttributValue<cas_server.models.ReplaceAttributValue>` in admin interface
"""
#: The model which the inline is using.
model = ReplaceAttributValue
extra = 0
class FilterAttributValueInline(admin.TabularInline):
"""`FilterAttributValue` in admin interface"""
class FilterAttributValueInline(BaseInlines):
"""
Bases: :class:`BaseInlines`
:class:`FilterAttributValue<cas_server.models.FilterAttributValue>` in admin interface
"""
#: The model which the inline is using.
model = FilterAttributValue
extra = 0
class ServicePatternAdmin(admin.ModelAdmin):
"""`ServicePattern` in admin interface"""
"""
Bases: :class:`django.contrib.admin.ModelAdmin`
:class:`ServicePattern<cas_server.models.ServicePattern>` in admin interface
"""
#: See :class:`UsernamesInline`, :class:`ReplaceAttributNameInline`,
#: :class:`ReplaceAttributValueInline`, :class:`FilterAttributValueInline` objects below
#: the :class:`ServicePatternAdmin` fields.
inlines = (
UsernamesInline,
ReplaceAttributNameInline,
ReplaceAttributValueInline,
FilterAttributValueInline
)
#: Fields to display on the list of class:`ServicePatternAdmin` objects.
list_display = ('pos', 'name', 'pattern', 'proxy',
'single_log_out', 'proxy_callback', 'restrict_users')
class FederatedIendityProviderAdmin(admin.ModelAdmin):
"""`FederatedIendityProvider` in admin interface"""
"""
Bases: :class:`django.contrib.admin.ModelAdmin`
:class:`FederatedIendityProvider<cas_server.models.FederatedIendityProvider>` in admin
interface
"""
#: Fields to display on a object.
fields = ('pos', 'suffix', 'server_url', 'cas_protocol_version', 'verbose_name', 'display')
#: Fields to display on the list of class:`FederatedIendityProviderAdmin` objects.
list_display = ('verbose_name', 'suffix', 'display')

View File

@ -14,6 +14,12 @@ from django.apps import AppConfig
class CasAppConfig(AppConfig):
"""django CAS application config class"""
"""
Bases: :class:`django.apps.AppConfig`
django CAS application config class
"""
#: Full Python path to the application. It must be unique across a Django project.
name = 'cas_server'
#: Human-readable name for the application.
verbose_name = _('Central Authentication Service')

View File

@ -26,55 +26,112 @@ from .models import FederatedUser
class AuthUser(object):
"""Authentication base class"""
"""
Authentication base class
:param unicode username: A username, stored in the :attr:`username` class attribute.
"""
#: username used to instanciate the current object
username = None
def __init__(self, username):
self.username = username
def test_password(self, password):
"""test `password` agains the user"""
"""
Tests ``password`` agains the user password.
:raises NotImplementedError: always. The method need to be implemented by subclasses
"""
raise NotImplementedError()
def attributs(self):
"""return a dict of user attributes"""
"""
The user attributes.
raises NotImplementedError: always. The method need to be implemented by subclasses
"""
raise NotImplementedError()
class DummyAuthUser(AuthUser): # pragma: no cover
"""A Dummy authentication class"""
"""
A Dummy authentication class. Authentication always fails
def __init__(self, username):
super(DummyAuthUser, self).__init__(username)
:param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
class attribute. There is no valid value for this attribute here.
"""
def test_password(self, password):
"""test `password` agains the user"""
"""
Tests ``password`` agains the user password.
:param unicode password: a clear text password as submited by the user.
:return: always ``False``
:rtype: bool
"""
return False
def attributs(self):
"""return a dict of user attributes"""
"""
The user attributes.
:return: en empty :class:`dict`.
:rtype: dict
"""
return {}
class TestAuthUser(AuthUser):
"""A test authentication class with one user test having
alose test as password and some attributes"""
"""
A test authentication class only working for one unique user.
def __init__(self, username):
super(TestAuthUser, self).__init__(username)
:param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
class attribute. The uniq valid value is ``settings.CAS_TEST_USER``.
"""
def test_password(self, password):
"""test `password` agains the user"""
"""
Tests ``password`` agains the user password.
:param unicode password: a clear text password as submited by the user.
:return: ``True`` if :attr:`username<AuthUser.username>` is valid and
``password`` is equal to ``settings.CAS_TEST_PASSWORD``, ``False`` otherwise.
:rtype: bool
"""
return self.username == settings.CAS_TEST_USER and password == settings.CAS_TEST_PASSWORD
def attributs(self):
"""return a dict of user attributes"""
return settings.CAS_TEST_ATTRIBUTES
"""
The user attributes.
:return: the ``settings.CAS_TEST_ATTRIBUTES`` :class:`dict` if
:attr:`username<AuthUser.username>` is valid, an empty :class:`dict` otherwise.
:rtype: dict
"""
if self.username == settings.CAS_TEST_USER:
return settings.CAS_TEST_ATTRIBUTES
else:
return {}
class MysqlAuthUser(AuthUser): # pragma: no cover
"""A mysql auth class: authentication user agains a mysql database"""
"""
A mysql authentication class: authentication user agains a mysql database
:param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
class attribute. Valid value are fetched from the MySQL database set with
``settings.CAS_SQL_*`` settings parameters using the query
``settings.CAS_SQL_USER_QUERY``.
"""
#: Mysql user attributes as a :class:`dict` if the username is found in the database.
user = None
def __init__(self, username):
# see the connect function at
# http://mysql-python.sourceforge.net/MySQLdb.html#functions-and-attributes
# for possible mysql config parameters.
mysql_config = {
"user": settings.CAS_SQL_USERNAME,
"passwd": settings.CAS_SQL_PASSWORD,
@ -94,7 +151,14 @@ class MysqlAuthUser(AuthUser): # pragma: no cover
super(MysqlAuthUser, self).__init__(username)
def test_password(self, password):
"""test `password` agains the user"""
"""
Tests ``password`` agains the user password.
:param unicode password: a clear text password as submited by the user.
:return: ``True`` if :attr:`username<AuthUser.username>` is valid and ``password`` is
correct, ``False`` otherwise.
:rtype: bool
"""
if self.user:
return check_password(
settings.CAS_SQL_PASSWORD_CHECK,
@ -106,7 +170,14 @@ class MysqlAuthUser(AuthUser): # pragma: no cover
return False
def attributs(self):
"""return a dict of user attributes"""
"""
The user attributes.
:return: a :class:`dict` with the user attributes. Attributes may be :func:`unicode`
or :class:`list` of :func:`unicode`. If the user do not exists, the returned
:class:`dict` is empty.
:rtype: dict
"""
if self.user:
return self.user
else:
@ -114,7 +185,14 @@ class MysqlAuthUser(AuthUser): # pragma: no cover
class DjangoAuthUser(AuthUser): # pragma: no cover
"""A django auth class: authenticate user agains django internal users"""
"""
A django auth class: authenticate user agains django internal users
:param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
class attribute. Valid value are usernames of django internal users.
"""
#: a django user object if the username is found. The user model is retreived
#: using :func:`django.contrib.auth.get_user_model`.
user = None
def __init__(self, username):
@ -126,14 +204,27 @@ class DjangoAuthUser(AuthUser): # pragma: no cover
super(DjangoAuthUser, self).__init__(username)
def test_password(self, password):
"""test `password` agains the user"""
"""
Tests ``password`` agains the user password.
:param unicode password: a clear text password as submited by the user.
:return: ``True`` if :attr:`user` is valid and ``password`` is
correct, ``False`` otherwise.
:rtype: bool
"""
if self.user:
return self.user.check_password(password)
else:
return False
def attributs(self):
"""return a dict of user attributes"""
"""
The user attributes, defined as the fields on the :attr:`user` object.
:return: a :class:`dict` with the :attr:`user` object fields. Attributes may be
If the user do not exists, the returned :class:`dict` is empty.
:rtype: dict
"""
if self.user:
attr = {}
for field in self.user._meta.fields:
@ -144,7 +235,16 @@ class DjangoAuthUser(AuthUser): # pragma: no cover
class CASFederateAuth(AuthUser):
"""Authentication class used then CAS_FEDERATE is True"""
"""
Authentication class used then CAS_FEDERATE is True
:param unicode username: A username, stored in the :attr:`username<AuthUser.username>`
class attribute. Valid value are usernames of
:class:`FederatedUser<cas_server.models.FederatedUser>` object.
:class:`FederatedUser<cas_server.models.FederatedUser>` object are created on CAS
backends successful ticket validation.
"""
#: a :class`FederatedUser<cas_server.models.FederatedUser>` object if ``username`` is found.
user = None
def __init__(self, username):
@ -157,7 +257,17 @@ class CASFederateAuth(AuthUser):
super(CASFederateAuth, self).__init__(username)
def test_password(self, ticket):
"""test `password` agains the user"""
"""
Tests ``password`` agains the user password.
:param unicode password: The CAS tickets just used to validate the user authentication
against its CAS backend.
:return: ``True`` if :attr:`user` is valid and ``password`` is
a ticket validated less than ``settings.CAS_TICKET_VALIDITY`` secondes and has not
being previously used for authenticated this
:class:`FederatedUser<cas_server.models.FederatedUser>`. ``False`` otherwise.
:rtype: bool
"""
if not self.user or not self.user.ticket:
return False
else:
@ -168,7 +278,13 @@ class CASFederateAuth(AuthUser):
)
def attributs(self):
"""return a dict of user attributes"""
"""
The user attributes, as returned by the CAS backend.
:return: :obj:`FederatedUser.attributs<cas_server.models.FederatedUser.attributs>`.
If the user do not exists, the returned :class:`dict` is empty.
:rtype: dict
"""
if not self.user: # pragma: no cover (should not happen)
return {}
else:

View File

@ -10,25 +10,32 @@
#
# (c) 2016 Valentin Samir
"""federated mode helper classes"""
from .default_settings import settings
from .default_settings import SessionStore
from django.db import IntegrityError
from .cas import CASClient
from .models import FederatedUser, FederateSLO, User
import logging
from importlib import import_module
from six.moves import urllib
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
#: logger facility
logger = logging.getLogger(__name__)
class CASFederateValidateUser(object):
"""Class CAS client used to authenticate the user again a CAS provider"""
"""
Class CAS client used to authenticate the user again a CAS provider
:param cas_server.models.FederatedIendityProvider provider: The provider to use for
authenticate the user.
:param unicode service_url: The service url to transmit to the ``provider``.
"""
#: the provider returned username
username = None
#: the provider returned attributes
attributs = {}
#: the CAS client instance
client = None
def __init__(self, provider, service_url):
@ -41,15 +48,31 @@ class CASFederateValidateUser(object):
)
def get_login_url(self):
"""return the CAS provider login url"""
"""
:return: the CAS provider login url
:rtype: unicode
"""
return self.client.get_login_url()
def get_logout_url(self, redirect_url=None):
"""return the CAS provider logout url"""
"""
:param redirect_url: The url to redirect to after logout from the provider, if provided.
:type redirect_url: :obj:`unicode` or :obj:`NoneType<types.NoneType>`
:return: the CAS provider logout url
:rtype: unicode
"""
return self.client.get_logout_url(redirect_url)
def verify_ticket(self, ticket):
"""test `ticket` agains the CAS provider, if valid, create the local federated user"""
"""
test ``ticket`` agains the CAS provider, if valid, create a
:class:`FederatedUser<cas_server.models.FederatedUser>` matching provider returned
username and attributes.
:param unicode ticket: The ticket to validate against the provider CAS
:return: ``True`` if the validation succeed, else ``False``.
:rtype: bool
"""
try:
username, attributs = self.client.verify_ticket(ticket)[:2]
except urllib.error.URLError:
@ -73,7 +96,15 @@ class CASFederateValidateUser(object):
@staticmethod
def register_slo(username, session_key, ticket):
"""association a ticket with a (username, session) for processing later SLO request"""
"""
association a ``ticket`` with a (``username``, ``session_key``) for processing later SLO
request by creating a :class:`cas_server.models.FederateSLO` object.
:param unicode username: A logged user username, with the ``@`` component.
:param unicode session_key: A logged user session_key matching ``username``.
:param unicode ticket: A ticket used to authentication ``username`` for the session
``session_key``.
"""
try:
FederateSLO.objects.create(
username=username,
@ -84,7 +115,14 @@ class CASFederateValidateUser(object):
pass
def clean_sessions(self, logout_request):
"""process a SLO request"""
"""
process a SLO request: Search for ticket values in ``logout_request``. For each
ticket value matching a :class:`cas_server.models.FederateSLO`, disconnect the
corresponding user.
:param unicode logout_request: An XML document contening one or more Single Log Out
requests.
"""
try:
slos = self.client.get_saml_slos(logout_request) or []
except NameError: # pragma: no cover (should not happen)

View File

@ -19,20 +19,33 @@ import cas_server.models as models
class WarnForm(forms.Form):
"""Form used on warn page before emiting a ticket"""
"""
Bases: :class:`django.forms.Form`
Form used on warn page before emiting a ticket
"""
#: The service url for which the user want a ticket
service = forms.CharField(widget=forms.HiddenInput(), required=False)
#: Is the service asking the authentication renewal ?
renew = forms.BooleanField(widget=forms.HiddenInput(), required=False)
#: Url to redirect to if the authentication fail (user not authenticated or bad service)
gateway = forms.CharField(widget=forms.HiddenInput(), required=False)
method = forms.CharField(widget=forms.HiddenInput(), required=False)
#: ``True`` if the user has been warned of the ticket emission
warned = forms.BooleanField(widget=forms.HiddenInput(), required=False)
#: A valid LoginTicket to prevent POST replay
lt = forms.CharField(widget=forms.HiddenInput(), required=False)
class FederateSelect(forms.Form):
"""
Form used on the login page when CAS_FEDERATE is True
allowing the user to choose a identity provider.
Bases: :class:`django.forms.Form`
Form used on the login page when ``settings.CAS_FEDERATE`` is ``True``
allowing the user to choose an identity provider.
"""
#: The providers the user can choose to be used as authentication backend
provider = forms.ModelChoiceField(
queryset=models.FederatedIendityProvider.objects.filter(display=True).order_by(
"pos",
@ -42,27 +55,49 @@ class FederateSelect(forms.Form):
to_field_name="suffix",
label=_('Identity provider'),
)
#: The service url for which the user want a ticket
service = forms.CharField(label=_('service'), widget=forms.HiddenInput(), required=False)
method = forms.CharField(widget=forms.HiddenInput(), required=False)
#: A checkbox to remember the user choices of :attr:`provider<FederateSelect.provider>`
remember = forms.BooleanField(label=_('Remember the identity provider'), required=False)
#: A checkbox to ask to be warn before emiting a ticket for another service
warn = forms.BooleanField(label=_('warn'), required=False)
#: Is the service asking the authentication renewal ?
renew = forms.BooleanField(widget=forms.HiddenInput(), required=False)
class UserCredential(forms.Form):
"""Form used on the login page to retrive user credentials"""
"""
Bases: :class:`django.forms.Form`
Form used on the login page to retrive user credentials
"""
#: The user username
username = forms.CharField(label=_('login'))
#: The service url for which the user want a ticket
service = forms.CharField(label=_('service'), widget=forms.HiddenInput(), required=False)
#: The user password
password = forms.CharField(label=_('password'), widget=forms.PasswordInput)
#: A valid LoginTicket to prevent POST replay
lt = forms.CharField(widget=forms.HiddenInput(), required=False)
method = forms.CharField(widget=forms.HiddenInput(), required=False)
#: A checkbox to ask to be warn before emiting a ticket for another service
warn = forms.BooleanField(label=_('warn'), required=False)
#: Is the service asking the authentication renewal ?
renew = forms.BooleanField(widget=forms.HiddenInput(), required=False)
def __init__(self, *args, **kwargs):
super(UserCredential, self).__init__(*args, **kwargs)
def clean(self):
"""
Validate that the submited :attr:`username` and :attr:`password` are valid
:raises django.forms.ValidationError: if the :attr:`username` and :attr:`password`
are not valid.
:return: The cleaned POST data
:rtype: dict
"""
cleaned_data = super(UserCredential, self).clean()
auth = utils.import_attr(settings.CAS_AUTH_CLASS)(cleaned_data.get("username"))
if auth.test_password(cleaned_data.get("password")):
@ -73,17 +108,51 @@ class UserCredential(forms.Form):
class FederateUserCredential(UserCredential):
"""Form used on the login page to retrive user credentials"""
"""
Bases: :class:`UserCredential`
Form used on a auto submited page for linking the views
:class:`FederateAuth<cas_server.views.FederateAuth>` and
:class:`LoginView<cas_server.views.LoginView>`.
On successful authentication on a provider, in the view
:class:`FederateAuth<cas_server.views.FederateAuth>` a
:class:`FederatedUser<cas_server.models.FederatedUser>` is created by
:meth:`cas_server.federate.CASFederateValidateUser.verify_ticket` and the user is redirected
to :class:`LoginView<cas_server.views.LoginView>`. This form is then automatically filled
with infos matching the created :class:`FederatedUser<cas_server.models.FederatedUser>`
using the ``ticket`` as one time password and submited using javascript. If javascript is
not enabled, a connect button is displayed.
This stub authentication form, allow to implement the federated mode with very few
modificatons to the :class:`LoginView<cas_server.views.LoginView>` view.
"""
#: the user username with the ``@`` component
username = forms.CharField(widget=forms.HiddenInput())
#: The service url for which the user want a ticket
service = forms.CharField(widget=forms.HiddenInput(), required=False)
#: The ``ticket`` used to authenticate the user against a provider
password = forms.CharField(widget=forms.HiddenInput())
#: alias of :attr:`password`
ticket = forms.CharField(widget=forms.HiddenInput())
#: A valid LoginTicket to prevent POST replay
lt = forms.CharField(widget=forms.HiddenInput(), required=False)
method = forms.CharField(widget=forms.HiddenInput(), required=False)
#: Has the user asked to be warn before emiting a ticket for another service
warn = forms.BooleanField(widget=forms.HiddenInput(), required=False)
#: Is the service asking the authentication renewal ?
renew = forms.BooleanField(widget=forms.HiddenInput(), required=False)
def clean(self):
"""
Validate that the submited :attr:`username` and :attr:`password` are valid using
the :class:`CASFederateAuth<cas_server.auth.CASFederateAuth>` auth class.
:raises django.forms.ValidationError: if the :attr:`username` and :attr:`password`
do not correspond to a :class:`FederatedUser<cas_server.models.FederatedUser>`.
:return: The cleaned POST data
:rtype: dict
"""
cleaned_data = super(FederateUserCredential, self).clean()
try:
user = models.FederatedUser.get_from_federated_username(cleaned_data["username"])
@ -99,7 +168,11 @@ class FederateUserCredential(UserCredential):
class TicketForm(forms.ModelForm):
"""Form for Tickets in the admin interface"""
"""
Bases: :class:`django.forms.ModelForm`
Form for Tickets in the admin interface
"""
class Meta:
model = models.Ticket
exclude = []

View File

@ -10,7 +10,7 @@
#
# (c) 2015-2016 Valentin Samir
"""models for the app"""
from .default_settings import settings
from .default_settings import settings, SessionStore
from django.db import models
from django.db.models import Q
@ -23,36 +23,42 @@ from picklefield.fields import PickledObjectField
import re
import sys
import logging
from importlib import import_module
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor
from requests_futures.sessions import FuturesSession
import cas_server.utils as utils
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
#: logger facility
logger = logging.getLogger(__name__)
@python_2_unicode_compatible
class FederatedIendityProvider(models.Model):
"""
Bases: :class:`django.db.models.Model`
An identity provider for the federated mode
"""
class Meta:
verbose_name = _(u"identity provider")
verbose_name_plural = _(u"identity providers")
#: Suffix append to backend CAS returned username: ``returned_username`` @ ``suffix``.
#: it must be unique.
suffix = models.CharField(
max_length=30,
unique=True,
verbose_name=_(u"suffix"),
help_text=_(
u"Suffix append to backend CAS returner "
u"Suffix append to backend CAS returned "
u"username: ``returned_username`` @ ``suffix``."
)
)
#: URL to the root of the CAS server application. If login page is
#: https://cas.example.net/cas/login then :attr:`server_url` should be
#: https://cas.example.net/cas/
server_url = models.CharField(max_length=255, verbose_name=_(u"server url"))
#: Version of the CAS protocol to use when sending requests the the backend CAS.
cas_protocol_version = models.CharField(
max_length=30,
choices=[
@ -67,11 +73,14 @@ class FederatedIendityProvider(models.Model):
),
default="3"
)
#: Name for this identity provider displayed on the login page.
verbose_name = models.CharField(
max_length=255,
verbose_name=_(u"verbose name"),
help_text=_(u"Name for this identity provider displayed on the login page.")
)
#: Position of the identity provider on the login page. Identity provider are sorted using the
#: (:attr:`pos`, :attr:`verbose_name`, :attr:`suffix`) attributes.
pos = models.IntegerField(
default=100,
verbose_name=_(u"position"),
@ -83,6 +92,9 @@ class FederatedIendityProvider(models.Model):
)
)
)
#: Display the provider on the login page. Beware that this do not disable the identity
#: provider, it just hide it on the login page. User will always be able to log in using this
#: provider by fetching ``/federate/suffix``.
display = models.BooleanField(
default=True,
verbose_name=_(u"display"),
@ -99,23 +111,40 @@ class FederatedIendityProvider(models.Model):
:param unicode username: A CAS backend returned username
:param unicode suffix: A suffix identifying the CAS backend
:return: The federated username: ``username`` @ ``suffix``.
:rtype: unicode
"""
return u'%s@%s' % (username, suffix)
def build_username(self, username):
"""Transform backend username into federated username"""
"""
Transform backend username into federated username
:param unicode username: A CAS backend returned username
:return: The federated username: ``username`` @ :attr:`suffix`.
:rtype: unicode
"""
return u'%s@%s' % (username, self.suffix)
@python_2_unicode_compatible
class FederatedUser(models.Model):
"""A federated user as returner by a CAS provider (username and attributes)"""
"""
Bases: :class:`django.db.models.Model`
A federated user as returner by a CAS provider (username and attributes)
"""
class Meta:
unique_together = ("username", "provider")
#: The user username returned by the CAS backend on successful ticket validation
username = models.CharField(max_length=124)
#: A foreign key to :class:`FederatedIendityProvider`
provider = models.ForeignKey(FederatedIendityProvider, on_delete=models.CASCADE)
#: The user attributes returned by the CAS backend on successful ticket validation
attributs = PickledObjectField()
#: The last ticket used to authenticate :attr:`username` against :attr:`provider`
ticket = models.CharField(max_length=255)
#: Last update timespampt. Usually, the last time :attr:`ticket` has been set.
last_update = models.DateTimeField(auto_now=True)
def __str__(self):
@ -123,12 +152,15 @@ class FederatedUser(models.Model):
@property
def federated_username(self):
"""return the federated username with a suffix"""
"""The federated username with a suffix for the current :class:`FederatedUser`."""
return self.provider.build_username(self.username)
@classmethod
def get_from_federated_username(cls, username):
"""return a FederatedUser object from a federated username"""
"""
:return: A :class:`FederatedUser` object from a federated ``username``
:rtype: :class:`FederatedUser`
"""
if username is None:
raise cls.DoesNotExist()
else:
@ -143,7 +175,7 @@ class FederatedUser(models.Model):
@classmethod
def clean_old_entries(cls):
"""remove old unused federated users"""
"""remove old unused :class:`FederatedUser`"""
federated_users = cls.objects.filter(
last_update__lt=(timezone.now() - timedelta(seconds=settings.CAS_TICKET_TIMEOUT))
)
@ -154,16 +186,23 @@ class FederatedUser(models.Model):
class FederateSLO(models.Model):
"""An association between a CAS provider ticket and a (username, session) for processing SLO"""
"""
Bases: :class:`django.db.models.Model`
An association between a CAS provider ticket and a (username, session) for processing SLO
"""
class Meta:
unique_together = ("username", "session_key", "ticket")
#: the federated username with the ``@``component
username = models.CharField(max_length=30)
#: the session key for the session :attr:`username` has been authenticated using :attr:`ticket`
session_key = models.CharField(max_length=40, blank=True, null=True)
#: The ticket used to authenticate :attr:`username`
ticket = models.CharField(max_length=255, db_index=True)
@classmethod
def clean_deleted_sessions(cls):
"""remove old object for which the session do not exists anymore"""
"""remove old :class:`FederateSLO` object for which the session do not exists anymore"""
for federate_slo in cls.objects.all():
if not SessionStore(session_key=federate_slo.session_key).get('authenticated'):
federate_slo.delete()
@ -171,17 +210,27 @@ class FederateSLO(models.Model):
@python_2_unicode_compatible
class User(models.Model):
"""A user logged into the CAS"""
"""
Bases: :class:`django.db.models.Model`
A user logged into the CAS
"""
class Meta:
unique_together = ("username", "session_key")
verbose_name = _("User")
verbose_name_plural = _("Users")
#: The session key of the current authenticated user
session_key = models.CharField(max_length=40, blank=True, null=True)
#: The username of the current authenticated user
username = models.CharField(max_length=30)
#: Last time the authenticated user has do something (auth, fetch ticket, etc…)
date = models.DateTimeField(auto_now=True)
def delete(self, *args, **kwargs):
"""remove the User"""
"""
Remove the current :class:`User`. If ``settings.CAS_FEDERATE`` is ``True``, also delete
the corresponding :class:`FederateSLO` object.
"""
if settings.CAS_FEDERATE:
FederateSLO.objects.filter(
username=self.username,
@ -191,7 +240,10 @@ class User(models.Model):
@classmethod
def clean_old_entries(cls):
"""Remove users inactive since more that SESSION_COOKIE_AGE"""
"""
Remove :class:`User` objects inactive since more that
:django:setting:`SESSION_COOKIE_AGE` and send corresponding SingleLogOut requests.
"""
users = cls.objects.filter(
date__lt=(timezone.now() - timedelta(seconds=settings.SESSION_COOKIE_AGE))
)
@ -201,7 +253,7 @@ class User(models.Model):
@classmethod
def clean_deleted_sessions(cls):
"""Remove user where the session do not exists anymore"""
"""Remove :class:`User` objects where the corresponding session do not exists anymore."""
for user in cls.objects.all():
if not SessionStore(session_key=user.session_key).get('authenticated'):
user.logout()
@ -209,14 +261,22 @@ class User(models.Model):
@property
def attributs(self):
"""return a fresh dict for the user attributs"""
"""
Property.
A fresh :class:`dict` for the user attributes, using ``settings.CAS_AUTH_CLASS``
"""
return utils.import_attr(settings.CAS_AUTH_CLASS)(self.username).attributs()
def __str__(self):
return u"%s - %s" % (self.username, self.session_key)
def logout(self, request=None):
"""Sending SLO request to all services the user logged in"""
"""
Send SLO requests to all services the user is logged in.
:param request: The current django HttpRequest to display possible failure to the user.
:type request: :class:`django.http.HttpRequest` or :obj:`NoneType<types.NoneType>`
"""
async_list = []
session = FuturesSession(
executor=ThreadPoolExecutor(max_workers=settings.CAS_SLO_MAX_PARALLEL_REQUESTS)
@ -249,9 +309,22 @@ class User(models.Model):
def get_ticket(self, ticket_class, service, service_pattern, renew):
"""
Generate a ticket using `ticket_class` for the service
`service` matching `service_pattern` and asking or not for
authentication renewal with `renew`
Generate a ticket using ``ticket_class`` for the service
``service`` matching ``service_pattern`` and asking or not for
authentication renewal with ``renew``
:param type ticket_class: :class:`ServiceTicket` or :class:`ProxyTicket` or
:class:`ProxyGrantingTicket`.
:param unicode service: The service url for which we want a ticket.
:param ServicePattern service_pattern: The service pattern matching ``service``.
Beware that ``service`` must match :attr:`ServicePattern.pattern` and the current
:class:`User` must pass :meth:`ServicePattern.check_user`. These checks are not done
here and you must perform them before calling this method.
:param bool renew: Should be ``True`` if authentication has been renewed. Must be
``False`` otherwise.
:return: A :class:`Ticket` object.
:rtype: :class:`ServiceTicket` or :class:`ProxyTicket` or
:class:`ProxyGrantingTicket`.
"""
attributs = dict(
(a.name, a.replace if a.replace else a.name) for a in service_pattern.attributs.all()
@ -286,8 +359,20 @@ class User(models.Model):
return ticket
def get_service_url(self, service, service_pattern, renew):
"""Return the url to which the user must be redirected to
after a Service Ticket has been generated"""
"""
Return the url to which the user must be redirected to
after a Service Ticket has been generated
:param unicode service: The service url for which we want a ticket.
:param ServicePattern service_pattern: The service pattern matching ``service``.
Beware that ``service`` must match :attr:`ServicePattern.pattern` and the current
:class:`User` must pass :meth:`ServicePattern.check_user`. These checks are not done
here and you must perform them before calling this method.
:param bool renew: Should be ``True`` if authentication has been renewed. Must be
``False`` otherwise.
:return unicode: The service url with the ticket GET param added.
:rtype: unicode
"""
ticket = self.get_ticket(ServiceTicket, service, service_pattern, renew)
url = utils.update_url(service, {'ticket': ticket.value})
logger.info("Service ticket created for service %s by user %s." % (service, self.username))
@ -295,41 +380,60 @@ class User(models.Model):
class ServicePatternException(Exception):
"""Base exception of exceptions raised in the ServicePattern model"""
"""
Bases: :class:`exceptions.Exception`
Base exception of exceptions raised in the ServicePattern model"""
pass
class BadUsername(ServicePatternException):
"""Exception raised then an non allowed username
try to get a ticket for a service"""
"""
Bases: :class:`ServicePatternException`
Exception raised then an non allowed username try to get a ticket for a service
"""
pass
class BadFilter(ServicePatternException):
""""Exception raised then a user try
to get a ticket for a service and do not reach a condition"""
"""
Bases: :class:`ServicePatternException`
Exception raised then a user try to get a ticket for a service and do not reach a condition
"""
pass
class UserFieldNotDefined(ServicePatternException):
"""Exception raised then a user try to get a ticket for a service
using as username an attribut not present on this user"""
"""
Bases: :class:`ServicePatternException`
Exception raised then a user try to get a ticket for a service using as username
an attribut not present on this user
"""
pass
@python_2_unicode_compatible
class ServicePattern(models.Model):
"""Allowed services pattern agains services are tested to"""
"""
Bases: :class:`django.db.models.Model`
Allowed services pattern agains services are tested to
"""
class Meta:
ordering = ("pos", )
verbose_name = _("Service pattern")
verbose_name_plural = _("Services patterns")
#: service patterns are sorted using the :attr:`pos` attribute
pos = models.IntegerField(
default=100,
verbose_name=_(u"position"),
help_text=_(u"service patterns are sorted using the position attribute")
)
#: A name for the service (this can bedisplayed to the user on the login page)
name = models.CharField(
max_length=255,
unique=True,
@ -338,6 +442,9 @@ class ServicePattern(models.Model):
verbose_name=_(u"name"),
help_text=_(u"A name for the service")
)
#: A regular expression matching services. "Will usually looks like
#: '^https://some\\.server\\.com/path/.*$'. As it is a regular expression, special character
#: must be escaped with a '\\'.
pattern = models.CharField(
max_length=255,
unique=True,
@ -348,6 +455,7 @@ class ServicePattern(models.Model):
"As it is a regular expression, special character must be escaped with a '\\'."
)
)
#: Name of the attribut to transmit as username, if empty the user login is used
user_field = models.CharField(
max_length=255,
default="",
@ -355,27 +463,35 @@ class ServicePattern(models.Model):
verbose_name=_(u"user field"),
help_text=_("Name of the attribut to transmit as username, empty = login")
)
#: A boolean allowing to limit username allowed to connect to :attr:`usernames`.
restrict_users = models.BooleanField(
default=False,
verbose_name=_(u"restrict username"),
help_text=_("Limit username allowed to connect to the list provided bellow")
)
#: A boolean allowing to deliver :class:`ProxyTicket` to the service.
proxy = models.BooleanField(
default=False,
verbose_name=_(u"proxy"),
help_text=_("Proxy tickets can be delivered to the service")
)
#: A boolean allowing the service to be used as a proxy callback (via the pgtUrl GET param)
#: to deliver :class:`ProxyGrantingTicket`.
proxy_callback = models.BooleanField(
default=False,
verbose_name=_(u"proxy callback"),
help_text=_("can be used as a proxy callback to deliver PGT")
)
#: Enable SingleLogOut for the service. Old validaed tickets for the service will be kept
#: until ``settings.CAS_TICKET_TIMEOUT`` after what a SLO request is send to the service and
#: the ticket is purged from database. A SLO can be send earlier if the user log-out.
single_log_out = models.BooleanField(
default=False,
verbose_name=_(u"single log out"),
help_text=_("Enable SLO for the service")
)
#: An URL where the SLO request will be POST. If empty the service url will be used.
#: This is usefull for non HTTP proxied services like smtp or imap.
single_log_out_callback = models.CharField(
max_length=255,
default="",
@ -393,7 +509,15 @@ class ServicePattern(models.Model):
Check if ``user`` if allowed to use theses services. If ``user`` is not allowed,
raises one of :class:`BadFilter`, :class:`UserFieldNotDefined`, :class:`BadUsername`
:param user: a :class:`User` object
:param User user: a :class:`User` object
:raises BadUsername: if :attr:`restrict_users` if ``True`` and :attr:`User.username`
is not within :attr:`usernames`.
:raises BadFilter: if a :class:`FilterAttributValue` condition of :attr:`filters`
connot be verified.
:raises UserFieldNotDefined: if :attr:`user_field` is defined and its value is not
within :attr:`User.attributs`.
:return: ``True``
:rtype: bool
"""
if self.restrict_users and not self.usernames.filter(value=user.username):
logger.warning("Username %s not allowed on service %s" % (user.username, self.name))
@ -434,8 +558,15 @@ class ServicePattern(models.Model):
@classmethod
def validate(cls, service):
"""Check if a Service Patern match `service` and
return it, else raise `ServicePattern.DoesNotExist`"""
"""
Get a :class:`ServicePattern` intance from a service url.
:param unicode service: A service url
:return: A :class:`ServicePattern` instance matching ``service``.
:rtype: :class:`ServicePattern`
:raises ServicePattern.DoesNotExist: if no :class:`ServicePattern` is matching
``service``.
"""
for service_pattern in cls.objects.all().order_by('pos'):
if re.match(service_pattern.pattern, service):
return service_pattern
@ -445,12 +576,20 @@ class ServicePattern(models.Model):
@python_2_unicode_compatible
class Username(models.Model):
"""A list of allowed usernames on a service pattern"""
"""
Bases: :class:`django.db.models.Model`
A list of allowed usernames on a :class:`ServicePattern`
"""
#: username allowed to connect to the service
value = models.CharField(
max_length=255,
verbose_name=_(u"username"),
help_text=_(u"username allowed to connect to the service")
)
#: ForeignKey to a :class:`ServicePattern`. :class:`Username` instances for a
#: :class:`ServicePattern` are accessible thought its :attr:`ServicePattern.usernames`
#: attribute.
service_pattern = models.ForeignKey(ServicePattern, related_name="usernames")
def __str__(self):
@ -459,14 +598,23 @@ class Username(models.Model):
@python_2_unicode_compatible
class ReplaceAttributName(models.Model):
"""A list of replacement of attributs name for a service pattern"""
"""
Bases: :class:`django.db.models.Model`
A replacement of an attribute name for a :class:`ServicePattern`. It also tell to transmit
an attribute of :attr:`User.attributs` to the service. An empty :attr:`replace` mean
to use the original attribute name.
"""
class Meta:
unique_together = ('name', 'replace', 'service_pattern')
#: Name the attribute: a key of :attr:`User.attributs`
name = models.CharField(
max_length=255,
verbose_name=_(u"name"),
help_text=_(u"name of an attribut to send to the service, use * for all attributes")
)
#: The name of the attribute to transmit to the service. If empty, the value of :attr:`name`
#: is used.
replace = models.CharField(
max_length=255,
blank=True,
@ -474,6 +622,9 @@ class ReplaceAttributName(models.Model):
help_text=_(u"name under which the attribut will be show"
u"to the service. empty = default name of the attribut")
)
#: ForeignKey to a :class:`ServicePattern`. :class:`ReplaceAttributName` instances for a
#: :class:`ServicePattern` are accessible thought its :attr:`ServicePattern.attributs`
#: attribute.
service_pattern = models.ForeignKey(ServicePattern, related_name="attributs")
def __str__(self):
@ -485,17 +636,29 @@ class ReplaceAttributName(models.Model):
@python_2_unicode_compatible
class FilterAttributValue(models.Model):
"""A list of filter on attributs for a service pattern"""
"""
Bases: :class:`django.db.models.Model`
A filter on :attr:`User.attributs` for a :class:`ServicePattern`. If a :class:`User` do not
have an attribute :attr:`attribut` or its value do not match :attr:`pattern`, then
:meth:`ServicePattern.check_user` will raises :class:`BadFilter` if called with that user.
"""
#: The name of a user attribute
attribut = models.CharField(
max_length=255,
verbose_name=_(u"attribut"),
help_text=_(u"Name of the attribut which must verify pattern")
)
#: A regular expression the attribute :attr:`attribut` value must verify. If :attr:`attribut`
#: if a list, only one of the list values needs to match.
pattern = models.CharField(
max_length=255,
verbose_name=_(u"pattern"),
help_text=_(u"a regular expression")
)
#: ForeignKey to a :class:`ServicePattern`. :class:`FilterAttributValue` instances for a
#: :class:`ServicePattern` are accessible thought its :attr:`ServicePattern.filters`
#: attribute.
service_pattern = models.ForeignKey(ServicePattern, related_name="filters")
def __str__(self):
@ -504,23 +667,34 @@ class FilterAttributValue(models.Model):
@python_2_unicode_compatible
class ReplaceAttributValue(models.Model):
"""Replacement to apply on attributs values for a service pattern"""
"""
Bases: :class:`django.db.models.Model`
A replacement (using a regular expression) of an attribute value for a
:class:`ServicePattern`.
"""
#: Name the attribute: a key of :attr:`User.attributs`
attribut = models.CharField(
max_length=255,
verbose_name=_(u"attribut"),
help_text=_(u"Name of the attribut for which the value must be replace")
)
#: A regular expression matching the part of the attribute value that need to be changed
pattern = models.CharField(
max_length=255,
verbose_name=_(u"pattern"),
help_text=_(u"An regular expression maching whats need to be replaced")
)
#: The replacement to what is mached by :attr:`pattern`. groups are capture by \\1, \\2 …
replace = models.CharField(
max_length=255,
blank=True,
verbose_name=_(u"replace"),
help_text=_(u"replace expression, groups are capture by \\1, \\2 …")
)
#: ForeignKey to a :class:`ServicePattern`. :class:`ReplaceAttributValue` instances for a
#: :class:`ServicePattern` are accessible thought its :attr:`ServicePattern.replacements`
#: attribute.
service_pattern = models.ForeignKey(ServicePattern, related_name="replacements")
def __str__(self):
@ -529,19 +703,37 @@ class ReplaceAttributValue(models.Model):
@python_2_unicode_compatible
class Ticket(models.Model):
"""Generic class for a Ticket"""
"""
Bases: :class:`django.db.models.Model`
Generic class for a Ticket
"""
class Meta:
abstract = True
#: ForeignKey to a :class:`User`.
user = models.ForeignKey(User, related_name="%(class)s")
#: The user attributes to be transmited to the service on successful validation
attributs = PickledObjectField()
#: A boolean. ``True`` if the ticket has been validated
validate = models.BooleanField(default=False)
#: The service url for the ticket
service = models.TextField()
#: ForeignKey to a :class:`ServicePattern`. The :class:`ServicePattern` corresponding to
#: :attr:`service`. Use :meth:`ServicePattern.validate` to find it.
service_pattern = models.ForeignKey(ServicePattern, related_name="%(class)s")
#: Date of the ticket creation
creation = models.DateTimeField(auto_now_add=True)
#: A boolean. ``True`` if the user has just renew his authentication
renew = models.BooleanField(default=False)
#: A boolean. Set to :attr:`service_pattern` attribute
#: :attr:`ServicePattern.single_log_out` value.
single_log_out = models.BooleanField(default=False)
#: Max duration between ticket creation and its validation. Any validation attempt for the
#: ticket after :attr:`creation` + VALIDITY will fail as if the ticket do not exists.
VALIDITY = settings.CAS_TICKET_VALIDITY
#: Time we keep ticket with :attr:`single_log_out` set to ``True`` before sending SingleLogOut
#: requests.
TIMEOUT = settings.CAS_TICKET_TIMEOUT
def __str__(self):
@ -615,6 +807,14 @@ class Ticket(models.Model):
@staticmethod
def get_class(ticket):
"""
Return the ticket class of ``ticket``
:param unicode ticket: A ticket
:return: The class corresponding to ``ticket`` (:class:`ServiceTicket` or
:class:`ProxyTicket` or :class:`ProxyGrantingTicket`) if found, ``None`` otherwise.
:rtype: :obj:`type` or :obj:`NoneType<types.NoneType>`
"""
for ticket_class in [ServiceTicket, ProxyTicket, ProxyGrantingTicket]:
if ticket.startswith(ticket_class.PREFIX):
return ticket_class
@ -622,8 +822,14 @@ class Ticket(models.Model):
@python_2_unicode_compatible
class ServiceTicket(Ticket):
"""A Service Ticket"""
"""
Bases: :class:`Ticket`
A Service Ticket
"""
#: The ticket prefix used to differentiate it from other tickets types
PREFIX = settings.CAS_SERVICE_TICKET_PREFIX
#: The ticket value
value = models.CharField(max_length=255, default=utils.gen_st, unique=True)
def __str__(self):
@ -632,8 +838,14 @@ class ServiceTicket(Ticket):
@python_2_unicode_compatible
class ProxyTicket(Ticket):
"""A Proxy Ticket"""
"""
Bases: :class:`Ticket`
A Proxy Ticket
"""
#: The ticket prefix used to differentiate it from other tickets types
PREFIX = settings.CAS_PROXY_TICKET_PREFIX
#: The ticket value
value = models.CharField(max_length=255, default=utils.gen_pt, unique=True)
def __str__(self):
@ -642,9 +854,17 @@ class ProxyTicket(Ticket):
@python_2_unicode_compatible
class ProxyGrantingTicket(Ticket):
"""A Proxy Granting Ticket"""
"""
Bases: :class:`Ticket`
A Proxy Granting Ticket
"""
#: The ticket prefix used to differentiate it from other tickets types
PREFIX = settings.CAS_PROXY_GRANTING_TICKET_PREFIX
#: ProxyGranting ticket are never validated. However, they can be used during :attr:`VALIDITY`
#: to get :class:`ProxyTicket` for :attr:`user`
VALIDITY = settings.CAS_PGT_VALIDITY
#: The ticket value
value = models.CharField(max_length=255, default=utils.gen_pgt, unique=True)
def __str__(self):
@ -653,10 +873,18 @@ class ProxyGrantingTicket(Ticket):
@python_2_unicode_compatible
class Proxy(models.Model):
"""A list of proxies on `ProxyTicket`"""
"""
Bases: :class:`django.db.models.Model`
A list of proxies on :class:`ProxyTicket`
"""
class Meta:
ordering = ("-pk", )
#: Service url of the PGT used for getting the associated :class:`ProxyTicket`
url = models.CharField(max_length=255)
#: ForeignKey to a :class:`ProxyTicket`. :class:`Proxy` instances for a
#: :class:`ProxyTicket` are accessible thought its :attr:`ProxyTicket.proxies`
#: attribute.
proxy_ticket = models.ForeignKey(ProxyTicket, related_name="proxies")
def __str__(self):

View File

@ -30,13 +30,27 @@ from six.moves.urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
def context(params):
"""Function that add somes variable to the context before template rendering"""
"""
Function that add somes variable to the context before template rendering
:param dict params: The context dictionary used to render templates.
:return: The ``params`` dictionary with the key ``settings`` set to
:obj:`django.conf.settings`.
:rtype: dict
"""
params["settings"] = settings
return params
def json_response(request, data):
"""Wrapper dumping `data` to a json and sending it to the user with an HttpResponse"""
"""
Wrapper dumping `data` to a json and sending it to the user with an HttpResponse
:param django.http.HttpRequest request: The request object used to generate this response.
:param dict data: The python dictionnary to return as a json
:return: The content of ``data`` serialized in json
:rtype: django.http.HttpResponse
"""
data["messages"] = []
for msg in messages.get_messages(request):
data["messages"].append({'message': msg.message, 'level': msg.level_tag})
@ -44,7 +58,13 @@ def json_response(request, data):
def import_attr(path):
"""transform a python module.attr path to the attr"""
"""
transform a python dotted path to the attr
:param path: A dotted path to a python object or a python object
:type path: :obj:`unicode` or anything
:return: The python object pointed by the dotted path or the python object unchanged
"""
if not isinstance(path, str):
return path
if "." not in path:
@ -59,24 +79,50 @@ def import_attr(path):
def redirect_params(url_name, params=None):
"""Redirect to `url_name` with `params` as querystring"""
"""
Redirect to ``url_name`` with ``params`` as querystring
:param unicode url_name: a URL pattern name
:param params: Some parameter to append to the reversed URL
:type params: :obj:`dict` or :obj:`NoneType<types.NoneType>`
:return: A redirection to the URL with name ``url_name`` with ``params`` as querystring.
:rtype: django.http.HttpResponseRedirect
"""
url = reverse(url_name)
params = urlencode(params if params else {})
return HttpResponseRedirect(url + "?%s" % params)
def reverse_params(url_name, params=None, **kwargs):
"""compule the reverse url or `url_name` and add GET parameters from `params` to it"""
"""
compute the reverse url of ``url_name`` and add to it parameters from ``params``
as querystring
:param unicode url_name: a URL pattern name
:param params: Some parameter to append to the reversed URL
:type params: :obj:`dict` or :obj:`NoneType<types.NoneType>`
:param **kwargs: additional parameters needed to compure the reverse URL
:return: The computed reverse URL of ``url_name`` with possible querystring from ``params``
:rtype: unicode
"""
url = reverse(url_name, **kwargs)
params = urlencode(params if params else {})
if params:
return url + "?%s" % params
return u"%s?%s" % (url, params)
else:
return url
def copy_params(get_or_post_params, ignore=None):
"""copy from a dictionnary like `get_or_post_params` ignoring keys in the set `ignore`"""
"""
copy a :class:`django.http.QueryDict` in a :obj:`dict` ignoring keys in the set ``ignore``
:param django.http.QueryDict get_or_post_params: A GET or POST
:class:`QueryDict<django.http.QueryDict>`
:param set ignore: An optinal set of keys to ignore during the copy
:return: A copy of get_or_post_params
:rtype: dict
"""
if ignore is None:
ignore = set()
params = {}
@ -87,7 +133,14 @@ def copy_params(get_or_post_params, ignore=None):
def set_cookie(response, key, value, max_age):
"""Set the cookie `key` on `response` with value `value` valid for `max_age` secondes"""
"""
Set the cookie ``key`` on ``response`` with value ``value`` valid for ``max_age`` secondes
:param django.http.HttpResponse response: a django response where to set the cookie
:param unicode key: the cookie key
:param unicode value: the cookie value
:param int max_age: the maximum validity age of the cookie
"""
expires = datetime.strftime(
datetime.utcnow() + timedelta(seconds=max_age),
"%a, %d-%b-%Y %H:%M:%S GMT"
@ -103,20 +156,36 @@ def set_cookie(response, key, value, max_age):
def get_current_url(request, ignore_params=None):
"""Giving a django request, return the current http url, possibly ignoring some GET params"""
"""
Giving a django request, return the current http url, possibly ignoring some GET parameters
:param django.http.HttpRequest request: The current request object.
:param set ignore_params: An optional set of GET parameters to ignore
:return: The URL of the current page, possibly omitting some parameters from
``ignore_params`` in the querystring.
:rtype: unicode
"""
if ignore_params is None:
ignore_params = set()
protocol = 'https' if request.is_secure() else "http"
service_url = "%s://%s%s" % (protocol, request.get_host(), request.path)
protocol = u'https' if request.is_secure() else u"http"
service_url = u"%s://%s%s" % (protocol, request.get_host(), request.path)
if request.GET:
params = copy_params(request.GET, ignore_params)
if params:
service_url += "?%s" % urlencode(params)
service_url += u"?%s" % urlencode(params)
return service_url
def update_url(url, params):
"""update params in the `url` query string"""
"""
update parameters using ``params`` in the ``url`` query string
:param url: An URL possibily with a querystring
:type url: :obj:`unicode` or :obj:`str`
:param dict params: A dictionary of parameters for updating the url querystring
:return: The URL with an updated querystring
:rtype: unicode
"""
if not isinstance(url, bytes):
url = url.encode('utf-8')
for key, value in list(params.items()):
@ -140,7 +209,12 @@ def update_url(url, params):
def unpack_nested_exception(error):
"""If exception are stacked, return the first one"""
"""
If exception are stacked, return the first one
:param error: A python exception with possible exception embeded within
:return: A python exception with no exception embeded within
"""
i = 0
while True:
if error.args[i:]:
@ -154,52 +228,97 @@ def unpack_nested_exception(error):
return error
def _gen_ticket(prefix, lg=settings.CAS_TICKET_LEN):
"""Generate a ticket with prefix `prefix`"""
return '%s-%s' % (
prefix,
''.join(
random.choice(
string.ascii_letters + string.digits
) for _ in range(lg - len(prefix) - 1)
)
def _gen_ticket(prefix=None, lg=settings.CAS_TICKET_LEN):
"""
Generate a ticket with prefix ``prefix`` and length ``lg``
:param unicode prefix: An optional prefix (probably ST, PT, PGT or PGTIOU)
:param int lg: The length of the generated ticket (with the prefix)
:return: A randomlly generated ticket of length ``lg``
:rtype: unicode
"""
random_part = u''.join(
random.choice(
string.ascii_letters + string.digits
) for _ in range(lg - len(prefix or "") - 1)
)
if prefix is not None:
return u'%s-%s' % (prefix, random_part)
else:
return random_part
def gen_lt():
"""Generate a Service Ticket"""
"""
Generate a Login Ticket
:return: A ticket with prefix ``settings.CAS_LOGIN_TICKET_PREFIX`` and length
``settings.CAS_LT_LEN``
:rtype: unicode
"""
return _gen_ticket(settings.CAS_LOGIN_TICKET_PREFIX, settings.CAS_LT_LEN)
def gen_st():
"""Generate a Service Ticket"""
"""
Generate a Service Ticket
:return: A ticket with prefix ``settings.CAS_SERVICE_TICKET_PREFIX`` and length
``settings.CAS_ST_LEN``
:rtype: unicode
"""
return _gen_ticket(settings.CAS_SERVICE_TICKET_PREFIX, settings.CAS_ST_LEN)
def gen_pt():
"""Generate a Proxy Ticket"""
"""
Generate a Proxy Ticket
:return: A ticket with prefix ``settings.CAS_PROXY_TICKET_PREFIX`` and length
``settings.CAS_PT_LEN``
:rtype: unicode
"""
return _gen_ticket(settings.CAS_PROXY_TICKET_PREFIX, settings.CAS_PT_LEN)
def gen_pgt():
"""Generate a Proxy Granting Ticket"""
"""
Generate a Proxy Granting Ticket
:return: A ticket with prefix ``settings.CAS_PROXY_GRANTING_TICKET_PREFIX`` and length
``settings.CAS_PGT_LEN``
:rtype: unicode
"""
return _gen_ticket(settings.CAS_PROXY_GRANTING_TICKET_PREFIX, settings.CAS_PGT_LEN)
def gen_pgtiou():
"""Generate a Proxy Granting Ticket IOU"""
"""
Generate a Proxy Granting Ticket IOU
:return: A ticket with prefix ``settings.CAS_PROXY_GRANTING_TICKET_IOU_PREFIX`` and length
``settings.CAS_PGTIOU_LEN``
:rtype: unicode
"""
return _gen_ticket(settings.CAS_PROXY_GRANTING_TICKET_IOU_PREFIX, settings.CAS_PGTIOU_LEN)
def gen_saml_id():
"""Generate an saml id"""
return _gen_ticket('_')
"""
Generate an saml id
:return: A random id of length ``settings.CAS_TICKET_LEN``
:rtype: unicode
"""
return _gen_ticket()
def get_tuple(nuplet, index, default=None):
"""
return the value in index `index` of the tuple `nuplet` if it exists,
else return `default`
:param tuple nuplet: A tuple
:param int index: An index
:param default: An optional default value
:return: ``nuplet[index]`` if defined, else ``default`` (possibly ``None``)
"""
if nuplet is None:
return default
@ -210,7 +329,13 @@ def get_tuple(nuplet, index, default=None):
def crypt_salt_is_valid(salt):
"""Return True is salt is valid has a crypt salt, False otherwise"""
"""
Validate a salt as crypt salt
:param str salt: a password salt
:return: ``True`` if ``salt`` is a valid crypt salt on this system, ``False`` otherwise
:rtype: bool
"""
if len(salt) < 2:
return False
else:
@ -231,11 +356,17 @@ def crypt_salt_is_valid(salt):
class LdapHashUserPassword(object):
"""Please see https://tools.ietf.org/id/draft-stroeder-hashed-userpassword-values-01.html"""
"""
Class to deal with hashed password as defined at
https://tools.ietf.org/id/draft-stroeder-hashed-userpassword-values-01.html
"""
#: valide schemes that require a salt
schemes_salt = {b"{SMD5}", b"{SSHA}", b"{SSHA256}", b"{SSHA384}", b"{SSHA512}", b"{CRYPT}"}
#: valide sschemes that require no slat
schemes_nosalt = {b"{MD5}", b"{SHA}", b"{SHA256}", b"{SHA384}", b"{SHA512}"}
#: map beetween scheme and hash function
_schemes_to_hash = {
b"{SMD5}": hashlib.md5,
b"{MD5}": hashlib.md5,
@ -249,6 +380,7 @@ class LdapHashUserPassword(object):
b"{SHA512}": hashlib.sha512
}
#: map between scheme and hash length
_schemes_to_len = {
b"{SMD5}": 16,
b"{SSHA}": 20,
@ -258,7 +390,10 @@ class LdapHashUserPassword(object):
}
class BadScheme(ValueError):
"""Error raised then the hash scheme is not in schemes_salt + schemes_nosalt"""
"""
Error raised then the hash scheme is not in
:attr:`LdapHashUserPassword.schemes_salt` + :attr:`LdapHashUserPassword.schemes_nosalt`
"""
pass
class BadHash(ValueError):
@ -266,14 +401,19 @@ class LdapHashUserPassword(object):
pass
class BadSalt(ValueError):
"""Error raised then with the scheme {CRYPT} the salt is invalid"""
"""Error raised then, with the scheme ``{CRYPT}``, the salt is invalid"""
pass
@classmethod
def _raise_bad_scheme(cls, scheme, valid, msg):
"""
Raise BadScheme error for `scheme`, possible valid scheme are
in `valid`, the error message is `msg`
Raise :attr:`BadScheme` error for ``scheme``, possible valid scheme are
in ``valid``, the error message is ``msg``
:param bytes scheme: A bad scheme
:param list valid: A list a valid scheme
:param str msg: The error template message
:raises LdapHashUserPassword.BadScheme: always
"""
valid_schemes = [s.decode() for s in valid]
valid_schemes.sort()
@ -281,7 +421,12 @@ class LdapHashUserPassword(object):
@classmethod
def _test_scheme(cls, scheme):
"""Test if a scheme is valide or raise BadScheme"""
"""
Test if a scheme is valide or raise BadScheme
:param bytes scheme: A scheme
:raises BadScheme: if ``scheme`` is not a valid scheme
"""
if scheme not in cls.schemes_salt and scheme not in cls.schemes_nosalt:
cls._raise_bad_scheme(
scheme,
@ -291,7 +436,12 @@ class LdapHashUserPassword(object):
@classmethod
def _test_scheme_salt(cls, scheme):
"""Test if the scheme need a salt or raise BadScheme"""
"""
Test if the scheme need a salt or raise BadScheme
:param bytes scheme: A scheme
:raises BadScheme: if ``scheme` require no salt
"""
if scheme not in cls.schemes_salt:
cls._raise_bad_scheme(
scheme,
@ -301,7 +451,12 @@ class LdapHashUserPassword(object):
@classmethod
def _test_scheme_nosalt(cls, scheme):
"""Test if the scheme need no salt or raise BadScheme"""
"""
Test if the scheme need no salt or raise BadScheme
:param bytes scheme: A scheme
:raises BadScheme: if ``scheme` require a salt
"""
if scheme not in cls.schemes_nosalt:
cls._raise_bad_scheme(
scheme,
@ -312,8 +467,15 @@ class LdapHashUserPassword(object):
@classmethod
def hash(cls, scheme, password, salt=None, charset="utf8"):
"""
Hash `password` with `scheme` using `salt`.
This three variable beeing encoded in `charset`.
Hash ``password`` with ``scheme`` using ``salt``.
This three variable beeing encoded in ``charset``.
:param bytes scheme: A valid scheme
:param bytes password: A byte string to hash using ``scheme``
:param bytes salt: An optional salt to use if ``scheme`` requires any
:param str charset: The encoding of ``scheme``, ``password`` and ``salt``
:return: The hashed password encoded with ``charset``
:rtype: bytes
"""
scheme = scheme.upper()
cls._test_scheme(scheme)
@ -339,7 +501,14 @@ class LdapHashUserPassword(object):
@classmethod
def get_scheme(cls, hashed_passord):
"""Return the scheme of `hashed_passord` or raise BadHash"""
"""
Return the scheme of ``hashed_passord`` or raise :attr:`BadHash`
:param bytes hashed_passord: A hashed password
:return: The scheme used by the hashed password
:rtype: bytes
:raises BadHash: if no valid scheme is found within ``hashed_passord``
"""
if not hashed_passord[0] == b'{'[0] or b'}' not in hashed_passord:
raise cls.BadHash("%r should start with the scheme enclosed with { }" % hashed_passord)
scheme = hashed_passord.split(b'}', 1)[0]
@ -348,7 +517,15 @@ class LdapHashUserPassword(object):
@classmethod
def get_salt(cls, hashed_passord):
"""Return the salt of `hashed_passord` possibly empty"""
"""
Return the salt of ``hashed_passord`` possibly empty
:param bytes hashed_passord: A hashed password
:return: The salt used by the hashed password (empty if no salt is used)
:rtype: bytes
:raises BadHash: if no valid scheme is found within ``hashed_passord`` or if the
hashed password is too short for the scheme found.
"""
scheme = cls.get_scheme(hashed_passord)
cls._test_scheme(scheme)
if scheme in cls.schemes_nosalt:
@ -364,8 +541,20 @@ class LdapHashUserPassword(object):
def check_password(method, password, hashed_password, charset):
"""
Check that `password` match `hashed_password` using `method`,
assuming the encoding is `charset`.
Check that ``password`` match `hashed_password` using ``method``,
assuming the encoding is ``charset``.
:param str method: on of ``"crypt"``, ``"ldap"``, ``"hex_md5"``, ``"hex_sha1"``,
``"hex_sha224"``, ``"hex_sha256"``, ``"hex_sha384"``, ``"hex_sha512"``, ``"plain"``
:param password: The user inputed password
:type password: :obj:`str` or :obj:`unicode`
:param hashed_password: The hashed password as stored in the database
:type hashed_password: :obj:`str` or :obj:`unicode`
:param str charset: The used char encoding (also used internally, so it must be valid for
the charset used by ``password`` even if it is inputed as an :obj:`unicode`)
:return: True if ``password`` match ``hashed_password`` using ``method``,
``False`` otherwise
:rtype: bool
"""
if not isinstance(password, six.binary_type):
password = password.encode(charset)

View File

@ -10,7 +10,7 @@
#
# (c) 2015-2016 Valentin Samir
"""views for the app"""
from .default_settings import settings
from .default_settings import settings, SessionStore
from django.shortcuts import render, redirect
from django.core.urlresolvers import reverse
@ -30,7 +30,6 @@ import pprint
import requests
from lxml import etree
from datetime import timedelta
from importlib import import_module
import cas_server.utils as utils
import cas_server.forms as forms
@ -41,8 +40,6 @@ from .models import ServiceTicket, ProxyTicket, ProxyGrantingTicket
from .models import ServicePattern, FederatedIendityProvider, FederatedUser
from .federate import CASFederateValidateUser
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
logger = logging.getLogger(__name__)