Use django admin application to add/modif identty providers when CAS_FEDERATE is True
This commit is contained in:
@ -23,27 +23,28 @@ from cas_server.tests.utils import get_auth_client
|
||||
|
||||
class BaseServicePattern(object):
|
||||
"""Mixing for setting up service pattern for testing"""
|
||||
def setup_service_patterns(self, proxy=False):
|
||||
@classmethod
|
||||
def setup_service_patterns(cls, proxy=False):
|
||||
"""setting up service pattern"""
|
||||
# For general purpose testing
|
||||
self.service = "https://www.example.com"
|
||||
self.service_pattern = models.ServicePattern.objects.create(
|
||||
cls.service = "https://www.example.com"
|
||||
cls.service_pattern = models.ServicePattern.objects.create(
|
||||
name="example",
|
||||
pattern="^https://www\.example\.com(/.*)?$",
|
||||
proxy=proxy,
|
||||
)
|
||||
models.ReplaceAttributName.objects.create(name="*", service_pattern=self.service_pattern)
|
||||
models.ReplaceAttributName.objects.create(name="*", service_pattern=cls.service_pattern)
|
||||
|
||||
# For testing the restrict_users attributes
|
||||
self.service_restrict_user_fail = "https://restrict_user_fail.example.com"
|
||||
self.service_pattern_restrict_user_fail = models.ServicePattern.objects.create(
|
||||
cls.service_restrict_user_fail = "https://restrict_user_fail.example.com"
|
||||
cls.service_pattern_restrict_user_fail = models.ServicePattern.objects.create(
|
||||
name="restrict_user_fail",
|
||||
pattern="^https://restrict_user_fail\.example\.com(/.*)?$",
|
||||
restrict_users=True,
|
||||
proxy=proxy,
|
||||
)
|
||||
self.service_restrict_user_success = "https://restrict_user_success.example.com"
|
||||
self.service_pattern_restrict_user_success = models.ServicePattern.objects.create(
|
||||
cls.service_restrict_user_success = "https://restrict_user_success.example.com"
|
||||
cls.service_pattern_restrict_user_success = models.ServicePattern.objects.create(
|
||||
name="restrict_user_success",
|
||||
pattern="^https://restrict_user_success\.example\.com(/.*)?$",
|
||||
restrict_users=True,
|
||||
@ -51,12 +52,12 @@ class BaseServicePattern(object):
|
||||
)
|
||||
models.Username.objects.create(
|
||||
value=settings.CAS_TEST_USER,
|
||||
service_pattern=self.service_pattern_restrict_user_success
|
||||
service_pattern=cls.service_pattern_restrict_user_success
|
||||
)
|
||||
|
||||
# For testing the user attributes filtering conditions
|
||||
self.service_filter_fail = "https://filter_fail.example.com"
|
||||
self.service_pattern_filter_fail = models.ServicePattern.objects.create(
|
||||
cls.service_filter_fail = "https://filter_fail.example.com"
|
||||
cls.service_pattern_filter_fail = models.ServicePattern.objects.create(
|
||||
name="filter_fail",
|
||||
pattern="^https://filter_fail\.example\.com(/.*)?$",
|
||||
proxy=proxy,
|
||||
@ -64,10 +65,10 @@ class BaseServicePattern(object):
|
||||
models.FilterAttributValue.objects.create(
|
||||
attribut="right",
|
||||
pattern="^admin$",
|
||||
service_pattern=self.service_pattern_filter_fail
|
||||
service_pattern=cls.service_pattern_filter_fail
|
||||
)
|
||||
self.service_filter_fail_alt = "https://filter_fail_alt.example.com"
|
||||
self.service_pattern_filter_fail_alt = models.ServicePattern.objects.create(
|
||||
cls.service_filter_fail_alt = "https://filter_fail_alt.example.com"
|
||||
cls.service_pattern_filter_fail_alt = models.ServicePattern.objects.create(
|
||||
name="filter_fail_alt",
|
||||
pattern="^https://filter_fail_alt\.example\.com(/.*)?$",
|
||||
proxy=proxy,
|
||||
@ -75,10 +76,10 @@ class BaseServicePattern(object):
|
||||
models.FilterAttributValue.objects.create(
|
||||
attribut="nom",
|
||||
pattern="^toto$",
|
||||
service_pattern=self.service_pattern_filter_fail_alt
|
||||
service_pattern=cls.service_pattern_filter_fail_alt
|
||||
)
|
||||
self.service_filter_success = "https://filter_success.example.com"
|
||||
self.service_pattern_filter_success = models.ServicePattern.objects.create(
|
||||
cls.service_filter_success = "https://filter_success.example.com"
|
||||
cls.service_pattern_filter_success = models.ServicePattern.objects.create(
|
||||
name="filter_success",
|
||||
pattern="^https://filter_success\.example\.com(/.*)?$",
|
||||
proxy=proxy,
|
||||
@ -86,26 +87,26 @@ class BaseServicePattern(object):
|
||||
models.FilterAttributValue.objects.create(
|
||||
attribut="email",
|
||||
pattern="^%s$" % re.escape(settings.CAS_TEST_ATTRIBUTES['email']),
|
||||
service_pattern=self.service_pattern_filter_success
|
||||
service_pattern=cls.service_pattern_filter_success
|
||||
)
|
||||
|
||||
# For testing the user_field attributes
|
||||
self.service_field_needed_fail = "https://field_needed_fail.example.com"
|
||||
self.service_pattern_field_needed_fail = models.ServicePattern.objects.create(
|
||||
cls.service_field_needed_fail = "https://field_needed_fail.example.com"
|
||||
cls.service_pattern_field_needed_fail = models.ServicePattern.objects.create(
|
||||
name="field_needed_fail",
|
||||
pattern="^https://field_needed_fail\.example\.com(/.*)?$",
|
||||
user_field="uid",
|
||||
proxy=proxy,
|
||||
)
|
||||
self.service_field_needed_success = "https://field_needed_success.example.com"
|
||||
self.service_pattern_field_needed_success = models.ServicePattern.objects.create(
|
||||
cls.service_field_needed_success = "https://field_needed_success.example.com"
|
||||
cls.service_pattern_field_needed_success = models.ServicePattern.objects.create(
|
||||
name="field_needed_success",
|
||||
pattern="^https://field_needed_success\.example\.com(/.*)?$",
|
||||
user_field="alias",
|
||||
proxy=proxy,
|
||||
)
|
||||
self.service_field_needed_success_alt = "https://field_needed_success_alt.example.com"
|
||||
self.service_pattern_field_needed_success = models.ServicePattern.objects.create(
|
||||
cls.service_field_needed_success_alt = "https://field_needed_success_alt.example.com"
|
||||
cls.service_pattern_field_needed_success = models.ServicePattern.objects.create(
|
||||
name="field_needed_success_alt",
|
||||
pattern="^https://field_needed_success_alt\.example\.com(/.*)?$",
|
||||
user_field="nom",
|
||||
@ -238,3 +239,17 @@ class CanLogin(object):
|
||||
self.assertTrue(client.session.get("username") is None)
|
||||
self.assertTrue(client.session.get("warn") is None)
|
||||
self.assertTrue(client.session.get("authenticated") is None)
|
||||
|
||||
|
||||
class FederatedIendityProviderModel(object):
|
||||
"""Mixin for test classes using the FederatedIendityProvider model"""
|
||||
@staticmethod
|
||||
def setup_federated_identity_provider(providers):
|
||||
"""setting up federated identity providers"""
|
||||
for suffix, (server_url, cas_protocol_version, verbose_name) in providers.items():
|
||||
models.FederatedIendityProvider.objects.create(
|
||||
suffix=suffix,
|
||||
server_url=server_url,
|
||||
cas_protocol_version=cas_protocol_version,
|
||||
verbose_name=verbose_name
|
||||
)
|
||||
|
@ -19,43 +19,37 @@ from django.test.utils import override_settings
|
||||
|
||||
from six.moves import reload_module
|
||||
|
||||
from cas_server import utils, forms
|
||||
from cas_server.tests.mixin import BaseServicePattern, CanLogin
|
||||
from cas_server import utils, models
|
||||
from cas_server.tests.mixin import BaseServicePattern, CanLogin, FederatedIendityProviderModel
|
||||
from cas_server.tests import utils as tests_utils
|
||||
|
||||
PROVIDERS = {
|
||||
"example.com": ("http://127.0.0.1:8080", 1, "Example dot com"),
|
||||
"example.org": ("http://127.0.0.1:8081", 2, "Example dot org"),
|
||||
"example.net": ("http://127.0.0.1:8082", 3, "Example dot net"),
|
||||
"example.test": ("http://127.0.0.1:8083", 'CAS_2_SAML_1_0'),
|
||||
"example.com": ("http://127.0.0.1:8080", '1', "Example dot com"),
|
||||
"example.org": ("http://127.0.0.1:8081", '2', "Example dot org"),
|
||||
"example.net": ("http://127.0.0.1:8082", '3', "Example dot net"),
|
||||
"example.test": ("http://127.0.0.1:8083", 'CAS_2_SAML_1_0', 'Example fot test'),
|
||||
}
|
||||
|
||||
PROVIDERS_LIST = list(PROVIDERS.keys())
|
||||
PROVIDERS_LIST.sort()
|
||||
|
||||
|
||||
@override_settings(
|
||||
CAS_FEDERATE=True,
|
||||
CAS_FEDERATE_PROVIDERS=PROVIDERS,
|
||||
CAS_FEDERATE_PROVIDERS_LIST=PROVIDERS_LIST,
|
||||
CAS_AUTH_CLASS="cas_server.auth.CASFederateAuth",
|
||||
# test with a non ascii username
|
||||
CAS_TEST_USER=u"dédé"
|
||||
)
|
||||
class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
class FederateAuthLoginLogoutTestCase(
|
||||
TestCase, BaseServicePattern, CanLogin, FederatedIendityProviderModel
|
||||
):
|
||||
"""tests for the views login logout and federate then the federated mode is enabled"""
|
||||
def setUp(self):
|
||||
"""Prepare the test context"""
|
||||
self.setup_service_patterns()
|
||||
reload_module(forms)
|
||||
self.setup_federated_identity_provider(PROVIDERS)
|
||||
|
||||
def test_default_settings(self):
|
||||
"""default settings should populated some default variable then CAS_FEDERATE is True"""
|
||||
provider_list = settings.CAS_FEDERATE_PROVIDERS_LIST
|
||||
del settings.CAS_FEDERATE_PROVIDERS_LIST
|
||||
del settings.CAS_AUTH_CLASS
|
||||
reload_module(default_settings)
|
||||
self.assertEqual(settings.CAS_FEDERATE_PROVIDERS_LIST, provider_list)
|
||||
self.assertEqual(settings.CAS_AUTH_CLASS, "cas_server.auth.CASFederateAuth")
|
||||
|
||||
def test_login_get_provider(self):
|
||||
@ -63,10 +57,10 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
client = Client()
|
||||
response = client.get("/login")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
for key, value in settings.CAS_FEDERATE_PROVIDERS.items():
|
||||
for provider in models.FederatedIendityProvider.objects.all():
|
||||
self.assertTrue('<option value="%s">%s</option>' % (
|
||||
key,
|
||||
utils.get_tuple(value, 2, key)
|
||||
provider.suffix,
|
||||
provider.verbose_name
|
||||
) in response.content.decode("utf-8"))
|
||||
self.assertEqual(response.context['post_url'], '/federate')
|
||||
|
||||
@ -74,10 +68,11 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
"""test a successful login wrokflow"""
|
||||
tickets = []
|
||||
# choose the example.com provider
|
||||
for (provider, cas_port) in [
|
||||
for (suffix, cas_port) in [
|
||||
("example.com", 8080), ("example.org", 8081),
|
||||
("example.net", 8082), ("example.test", 8083)
|
||||
]:
|
||||
provider = models.FederatedIendityProvider.objects.get(suffix=suffix)
|
||||
# get a bare client
|
||||
client = Client()
|
||||
# fetch the login page
|
||||
@ -86,7 +81,7 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
self.assertEqual(response.context['post_url'], '/federate')
|
||||
# get current form parameter
|
||||
params = tests_utils.copy_form(response.context["form"])
|
||||
params['provider'] = provider
|
||||
params['provider'] = provider.suffix
|
||||
if remember:
|
||||
params['remember'] = 'on'
|
||||
# post the choosed provider
|
||||
@ -96,22 +91,22 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
if remember:
|
||||
self.assertEqual(response["Location"], '%s/federate/%s?remember=on' % (
|
||||
'http://testserver' if django.VERSION < (1, 9) else "",
|
||||
provider
|
||||
provider.suffix
|
||||
))
|
||||
else:
|
||||
self.assertEqual(response["Location"], '%s/federate/%s' % (
|
||||
'http://testserver' if django.VERSION < (1, 9) else "",
|
||||
provider
|
||||
provider.suffix
|
||||
))
|
||||
# let's follow the redirect
|
||||
response = client.get('/federate/%s' % provider)
|
||||
response = client.get('/federate/%s' % provider.suffix)
|
||||
# we are redirected to the provider CAS for authentication
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(
|
||||
response["Location"],
|
||||
"%s/login?service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s" % (
|
||||
settings.CAS_FEDERATE_PROVIDERS[provider][0],
|
||||
provider
|
||||
provider.server_url,
|
||||
provider.suffix
|
||||
)
|
||||
)
|
||||
# let's generate a ticket
|
||||
@ -119,7 +114,7 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
# we lauch a dummy CAS server that only validate once for the service
|
||||
# http://testserver/federate/example.com with `ticket`
|
||||
tests_utils.DummyCAS.run(
|
||||
("http://testserver/federate/%s" % provider).encode("ascii"),
|
||||
("http://testserver/federate/%s" % provider.suffix).encode("ascii"),
|
||||
ticket.encode("ascii"),
|
||||
settings.CAS_TEST_USER.encode("utf8"),
|
||||
[],
|
||||
@ -127,7 +122,7 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
)
|
||||
# we normally provide a good ticket and should be redirected to /login as the ticket
|
||||
# get successfully validated again the dummy CAS
|
||||
response = client.get('/federate/%s' % provider, {'ticket': ticket})
|
||||
response = client.get('/federate/%s' % provider.suffix, {'ticket': ticket})
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response["Location"], "%s/login" % (
|
||||
'http://testserver' if django.VERSION < (1, 9) else ""
|
||||
@ -143,7 +138,7 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
response = client.post("/login", params)
|
||||
# the user should now being authenticated using username test@`provider`
|
||||
self.assert_logged(
|
||||
client, response, username='%s@%s' % (settings.CAS_TEST_USER, provider)
|
||||
client, response, username=provider.build_username(settings.CAS_TEST_USER)
|
||||
)
|
||||
tickets.append((provider, ticket, client))
|
||||
|
||||
@ -198,7 +193,7 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
self.assertEqual(
|
||||
response["Location"],
|
||||
"%s/login?service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s" % (
|
||||
settings.CAS_FEDERATE_PROVIDERS[good_provider][0],
|
||||
models.FederatedIendityProvider.objects.get(suffix=good_provider).server_url,
|
||||
good_provider
|
||||
)
|
||||
)
|
||||
@ -216,7 +211,7 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
self.assertEqual(
|
||||
response["Location"],
|
||||
"%s/login?service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s" % (
|
||||
settings.CAS_FEDERATE_PROVIDERS[good_provider][0],
|
||||
models.FederatedIendityProvider.objects.get(suffix=good_provider).server_url,
|
||||
good_provider
|
||||
)
|
||||
)
|
||||
@ -234,45 +229,45 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
for (provider, ticket, client) in tickets:
|
||||
# SLO for an unkown ticket should do nothing
|
||||
response = client.post(
|
||||
"/federate/%s" % provider,
|
||||
"/federate/%s" % provider.suffix,
|
||||
{'logoutRequest': tests_utils.logout_request(utils.gen_st())}
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.content, b"ok")
|
||||
# Bad SLO format should do nothing
|
||||
response = client.post(
|
||||
"/federate/%s" % provider,
|
||||
"/federate/%s" % provider.suffix,
|
||||
{'logoutRequest': ""}
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.content, b"ok")
|
||||
# Bad SLO format should do nothing
|
||||
response = client.post(
|
||||
"/federate/%s" % provider,
|
||||
"/federate/%s" % provider.suffix,
|
||||
{'logoutRequest': "<root></root>"}
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.content, b"ok")
|
||||
response = client.get("/login")
|
||||
self.assert_logged(
|
||||
client, response, username='%s@%s' % (settings.CAS_TEST_USER, provider)
|
||||
client, response, username=provider.build_username(settings.CAS_TEST_USER)
|
||||
)
|
||||
|
||||
# SLO for a previously logged ticket should log out the user if CAS version is
|
||||
# 3 or 'CAS_2_SAML_1_0'
|
||||
response = client.post(
|
||||
"/federate/%s" % provider,
|
||||
"/federate/%s" % provider.suffix,
|
||||
{'logoutRequest': tests_utils.logout_request(ticket)}
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.content, b"ok")
|
||||
|
||||
response = client.get("/login")
|
||||
if settings.CAS_FEDERATE_PROVIDERS[provider][1] in {3, 'CAS_2_SAML_1_0'}: # support SLO
|
||||
if provider.cas_protocol_version in {'3', 'CAS_2_SAML_1_0'}: # support SLO
|
||||
self.assert_login_failed(client, response)
|
||||
else:
|
||||
self.assert_logged(
|
||||
client, response, username='%s@%s' % (settings.CAS_TEST_USER, provider)
|
||||
client, response, username=provider.build_username(settings.CAS_TEST_USER)
|
||||
)
|
||||
|
||||
def test_federate_logout(self):
|
||||
@ -287,7 +282,7 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(
|
||||
response["Location"],
|
||||
"%s/logout" % settings.CAS_FEDERATE_PROVIDERS[provider][0]
|
||||
"%s/logout" % provider.server_url,
|
||||
)
|
||||
response = client.get("/login")
|
||||
self.assert_login_failed(client, response)
|
||||
@ -326,7 +321,7 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
self.assertEqual(response.status_code, 302)
|
||||
self.assertEqual(response["Location"], "%s/federate/%s" % (
|
||||
'http://testserver' if django.VERSION < (1, 9) else "",
|
||||
provider
|
||||
provider.suffix
|
||||
))
|
||||
|
||||
def test_login_bad_ticket(self):
|
||||
@ -338,7 +333,10 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
# get a bare client
|
||||
client = Client()
|
||||
session = client.session
|
||||
session["federate_username"] = '%s@%s' % (settings.CAS_TEST_USER, provider)
|
||||
session["federate_username"] = models.FederatedIendityProvider.build_username_from_suffix(
|
||||
settings.CAS_TEST_USER,
|
||||
provider
|
||||
)
|
||||
session["federate_ticket"] = utils.gen_st()
|
||||
if django.VERSION >= (1, 8):
|
||||
session.save()
|
||||
@ -351,9 +349,12 @@ class FederateAuthLoginLogoutTestCase(TestCase, BaseServicePattern, CanLogin):
|
||||
# POST, as (username, ticket) are not valid, we should get the federate login page
|
||||
response = client.post("/login", params)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
for key, value in settings.CAS_FEDERATE_PROVIDERS.items():
|
||||
self.assertTrue('<option value="%s">%s</option>' % (
|
||||
key,
|
||||
utils.get_tuple(value, 2, key)
|
||||
) in response.content.decode("utf-8"))
|
||||
for provider in models.FederatedIendityProvider.objects.all():
|
||||
self.assertIn(
|
||||
'<option value="%s">%s</option>' % (
|
||||
provider.suffix,
|
||||
provider.verbose_name
|
||||
),
|
||||
response.content.decode("utf-8")
|
||||
)
|
||||
self.assertEqual(response.context['post_url'], '/federate')
|
||||
|
@ -22,32 +22,39 @@ from importlib import import_module
|
||||
|
||||
from cas_server import models, utils
|
||||
from cas_server.tests.utils import get_auth_client, HttpParamsHandler
|
||||
from cas_server.tests.mixin import UserModels, BaseServicePattern
|
||||
from cas_server.tests.mixin import UserModels, BaseServicePattern, FederatedIendityProviderModel
|
||||
from cas_server.tests.test_federate import PROVIDERS
|
||||
|
||||
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
|
||||
|
||||
|
||||
class FederatedUserTestCase(TestCase, UserModels):
|
||||
class FederatedUserTestCase(TestCase, UserModels, FederatedIendityProviderModel):
|
||||
"""test for the federated user model"""
|
||||
def setUp(self):
|
||||
"""Prepare the test context"""
|
||||
self.setup_federated_identity_provider(PROVIDERS)
|
||||
|
||||
def test_clean_old_entries(self):
|
||||
"""tests for clean_old_entries that should delete federated user no longer used"""
|
||||
client = Client()
|
||||
client.get("/login")
|
||||
provider = models.FederatedIendityProvider.objects.get(suffix="example.com")
|
||||
models.FederatedUser.objects.create(
|
||||
username="test1", provider="example.com", attributs={}, ticket=""
|
||||
username="test1", provider=provider, attributs={}, ticket=""
|
||||
)
|
||||
models.FederatedUser.objects.create(
|
||||
username="test2", provider="example.com", attributs={}, ticket=""
|
||||
username="test2", provider=provider, attributs={}, ticket=""
|
||||
)
|
||||
models.FederatedUser.objects.all().update(
|
||||
last_update=(timezone.now() - timedelta(seconds=settings.CAS_TICKET_TIMEOUT + 10))
|
||||
)
|
||||
models.FederatedUser.objects.create(
|
||||
username="test3", provider="example.com", attributs={}, ticket=""
|
||||
username="test3", provider=provider, attributs={}, ticket=""
|
||||
)
|
||||
models.User.objects.create(
|
||||
username="test1@example.com", session_key=client.session.session_key
|
||||
)
|
||||
self.assertEqual(len(models.FederatedUser.objects.all()), 3)
|
||||
models.FederatedUser.clean_old_entries()
|
||||
self.assertEqual(len(models.FederatedUser.objects.all()), 2)
|
||||
with self.assertRaises(models.FederatedUser.DoesNotExist):
|
||||
|
Reference in New Issue
Block a user