1
0
mirror of https://gitlab.crans.org/bde/nk20 synced 2025-11-18 04:17:48 +01:00

Compare commits

...

15 Commits

Author SHA1 Message Date
quark
9998189dbf token access 2025-11-09 14:48:29 +01:00
quark
08593700fc implicit flow #137 2025-11-09 11:18:11 +01:00
quark
54d28b30e5 Authorization Code Flow #137 2025-11-08 23:12:42 +01:00
quark
c09f133652 ropb implementation #137 2025-11-07 18:46:55 +01:00
quark
bfd50e3cd5 Client Credential Flow implementation 2025-11-07 15:49:01 +01:00
quark
68341a2a7e Add test for oauth2 flow, add temporary ROPB for NoteApp #137 2025-11-07 10:41:10 +01:00
quark
d2cc1b902d allows mask for Oauth2 2025-10-17 17:45:41 +02:00
ehouarn
4c40566513 Merge branch 'small_features' into 'main'
Small features

See merge request bde/nk20!355
2025-10-16 20:25:02 +02:00
Ehouarn
7c45b59298 Fixed treasury test 2025-10-16 20:05:27 +02:00
ehouarn
418268db27 Merge branch 'update_invoice_template' into 'main'
Replace Diolistos_bg.jpg

See merge request bde/nk20!354
2025-10-12 18:49:43 +02:00
ehouarn
73045586a3 Replace Diolistos_bg.jpg 2025-10-12 18:26:39 +02:00
quark
22d668a75c membership date end 2025-10-02 19:11:26 +02:00
quark
5dfa12fad2 update django_polymorphic (3.1 to 3.2) 2025-10-02 18:58:59 +02:00
Ehouarn
5af69f719d First step to re-write logic of SogeCredit validity 2025-09-28 22:13:52 +02:00
Ehouarn
4f6b1d5b6c More open food 2025-09-28 21:51:54 +02:00
14 changed files with 641 additions and 24 deletions

View File

@@ -74,11 +74,15 @@ class FoodListView(ProtectQuerysetMixin, LoginRequiredMixin, MultiTableMixin, Li
search_table = qs.filter(PermissionBackend.filter_queryset(self.request, Food, 'view')) search_table = qs.filter(PermissionBackend.filter_queryset(self.request, Food, 'view'))
# table open # table open
open_table = self.get_queryset().order_by('expiry_date').filter( open_table = self.get_queryset().filter(
Q(polymorphic_ctype__model='transformedfood') Q(polymorphic_ctype__model='transformedfood')
| Q(polymorphic_ctype__model='basicfood', basicfood__date_type='DLC')).filter( | Q(polymorphic_ctype__model='basicfood', basicfood__date_type='DLC')).filter(
expiry_date__lt=timezone.now(), end_of_life='').filter( expiry_date__lt=timezone.now(), end_of_life='').filter(
PermissionBackend.filter_queryset(self.request, Food, 'view')) PermissionBackend.filter_queryset(self.request, Food, 'view'))
open_table = open_table.union(self.get_queryset().filter(
Q(end_of_life='', order__iexact='open')
).filter(
PermissionBackend.filter_queryset(self.request, Food, 'view'))).order_by('expiry_date')
# table served # table served
served_table = self.get_queryset().order_by('-pk').filter( served_table = self.get_queryset().order_by('-pk').filter(
end_of_life='', is_ready=True).exclude( end_of_life='', is_ready=True).exclude(

View File

@@ -417,7 +417,7 @@ class Membership(models.Model):
A membership is valid if today is between the start and the end date. A membership is valid if today is between the start and the end date.
""" """
if self.date_end is not None: if self.date_end is not None:
return self.date_start.toordinal() <= datetime.datetime.now().toordinal() < self.date_end.toordinal() return self.date_start.toordinal() <= datetime.datetime.now().toordinal() <= self.date_end.toordinal()
else: else:
return self.date_start.toordinal() <= datetime.datetime.now().toordinal() return self.date_start.toordinal() <= datetime.datetime.now().toordinal()

View File

@@ -228,7 +228,7 @@ function consume (source, source_alias, dest, quantity, amount, reason, type, ca
addMsg(interpolate(gettext('Warning, the transaction from the note %s succeed, ' + addMsg(interpolate(gettext('Warning, the transaction from the note %s succeed, ' +
'but the emitter note %s is negative.'), [source_alias, source_alias]), 'warning', 30000) 'but the emitter note %s is negative.'), [source_alias, source_alias]), 'warning', 30000)
} }
if (source.membership && source.membership.date_end < new Date().toISOString()) { if (source.membership && source.membership.date_end <= new Date().toISOString()) {
addMsg(interpolate(gettext('Warning, the emitter note %s is no more a BDE member.'), [source_alias]), addMsg(interpolate(gettext('Warning, the emitter note %s is no more a BDE member.'), [source_alias]),
'danger', 30000) 'danger', 30000)
} }

View File

@@ -310,10 +310,10 @@ $('#btn_transfer').click(function () {
destination: dest.note.id, destination: dest.note.id,
destination_alias: dest.name destination_alias: dest.name
}).done(function () { }).done(function () {
if (source.note.membership && source.note.membership.date_end < new Date().toISOString()) { if (source.note.membership && source.note.membership.date_end <= new Date().toISOString()) {
addMsg(interpolate(gettext('Warning, the emitter note %s is no more a BDE member.'), [source.name]), 'danger', 30000) addMsg(interpolate(gettext('Warning, the emitter note %s is no more a BDE member.'), [source.name]), 'danger', 30000)
} }
if (dest.note.membership && dest.note.membership.date_end < new Date().toISOString()) { if (dest.note.membership && dest.note.membership.date_end <= new Date().toISOString()) {
addMsg(interpolate(gettext('Warning, the destination note %s is no more a BDE member.'), [dest.name]), 'danger', 30000) addMsg(interpolate(gettext('Warning, the destination note %s is no more a BDE member.'), [dest.name]), 'danger', 30000)
} }
@@ -414,7 +414,7 @@ $('#btn_transfer').click(function () {
bank: $('#bank').val() bank: $('#bank').val()
}).done(function () { }).done(function () {
addMsg(gettext('Credit/debit succeed!'), 'success', 10000) addMsg(gettext('Credit/debit succeed!'), 'success', 10000)
if (user_note.membership && user_note.membership.date_end < new Date().toISOString()) { addMsg(gettext('Warning, the emitter note %s is no more a BDE member.'), 'danger', 10000) } if (user_note.membership && user_note.membership.date_end <= new Date().toISOString()) { addMsg(gettext('Warning, the emitter note %s is no more a BDE member.'), 'danger', 10000) }
reset() reset()
}).fail(function (err) { }).fail(function (err) {
const errObj = JSON.parse(err.responseText) const errObj = JSON.parse(err.responseText)

View File

@@ -26,24 +26,42 @@ class PermissionBackend(ModelBackend):
@staticmethod @staticmethod
@memoize @memoize
def get_raw_permissions(request, t): def get_raw_permissions(request, t): # noqa: C901
""" """
Query permissions of a certain type for a user, then memoize it. Query permissions of a certain type for a user, then memoize it.
:param request: The current request :param request: The current request
:param t: The type of the permissions: view, change, add or delete :param t: The type of the permissions: view, change, add or delete
:return: The queryset of the permissions of the user (memoized) grouped by clubs :return: The queryset of the permissions of the user (memoized) grouped by clubs
""" """
if hasattr(request, 'auth') and request.auth is not None and hasattr(request.auth, 'scope'): # Permission for auth
if hasattr(request, 'oauth2') and request.oauth2 is not None and 'scope' in request.oauth2:
# OAuth2 Authentication # OAuth2 Authentication
user = request.oauth2['user']
def permission_filter(membership_obj):
query = Q(pk=-1)
for scope in request.oauth2['scope']:
if scope == "openid":
continue
permission_id, club_id = scope.split('_')
if int(club_id) == membership_obj.club_id:
query |= Q(pk=permission_id, mask__rank__lte=request.oauth2['mask'])
return query
# Restreint token permission to his scope
elif hasattr(request, 'auth') and request.auth is not None and hasattr(request.auth, 'scope'):
user = request.auth.user user = request.auth.user
def permission_filter(membership_obj): def permission_filter(membership_obj):
query = Q(pk=-1) query = Q(pk=-1)
for scope in request.auth.scope.split(' '): for scope in request.auth.scope.split(' '):
if scope == "openid" or scope == "0_0":
continue
permission_id, club_id = scope.split('_') permission_id, club_id = scope.split('_')
if int(club_id) == membership_obj.club_id: if int(club_id) == membership_obj.club_id:
query |= Q(pk=permission_id) query |= Q(pk=permission_id)
return query return query
else: else:
user = request.user user = request.user
@@ -77,7 +95,6 @@ class PermissionBackend(ModelBackend):
:param type: The type of the permissions: view, change, add or delete :param type: The type of the permissions: view, change, add or delete
:return: A generator of the requested permissions :return: A generator of the requested permissions
""" """
if hasattr(request, 'auth') and request.auth is not None and hasattr(request.auth, 'scope'): if hasattr(request, 'auth') and request.auth is not None and hasattr(request.auth, 'scope'):
# OAuth2 Authentication # OAuth2 Authentication
user = request.auth.user user = request.auth.user

View File

@@ -10,6 +10,8 @@ from note_kfet.middlewares import get_current_request
from .backends import PermissionBackend from .backends import PermissionBackend
from .models import Permission from .models import Permission
from django.utils.translation import gettext_lazy as _
class PermissionScopes(BaseScopes): class PermissionScopes(BaseScopes):
""" """
@@ -23,7 +25,9 @@ class PermissionScopes(BaseScopes):
if 'scopes' in kwargs: if 'scopes' in kwargs:
for scope in kwargs['scopes']: for scope in kwargs['scopes']:
if scope == 'openid': if scope == 'openid':
scopes['openid'] = "OpenID Connect" scopes['openid'] = _("OpenID Connect (username and email)")
elif scope == '0_0':
scopes['0_0'] = _("Useless scope which do nothing")
else: else:
p = Permission.objects.get(id=scope.split('_')[0]) p = Permission.objects.get(id=scope.split('_')[0])
club = Club.objects.get(id=scope.split('_')[1]) club = Club.objects.get(id=scope.split('_')[1])
@@ -32,7 +36,8 @@ class PermissionScopes(BaseScopes):
scopes = {f"{p.id}_{club.id}": f"{p.description} (club {club.name})" scopes = {f"{p.id}_{club.id}": f"{p.description} (club {club.name})"
for p in Permission.objects.all() for club in Club.objects.all()} for p in Permission.objects.all() for club in Club.objects.all()}
scopes['openid'] = "OpenID Connect" scopes['openid'] = _("OpenID Connect (username and email)")
scopes['0_0'] = _("Useless scope which do nothing")
return scopes return scopes
def get_available_scopes(self, application=None, request=None, *args, **kwargs): def get_available_scopes(self, application=None, request=None, *args, **kwargs):
@@ -41,7 +46,7 @@ class PermissionScopes(BaseScopes):
scopes = [f"{p.id}_{p.membership.club.id}" scopes = [f"{p.id}_{p.membership.club.id}"
for t in Permission.PERMISSION_TYPES for t in Permission.PERMISSION_TYPES
for p in PermissionBackend.get_raw_permissions(get_current_request(), t[0])] for p in PermissionBackend.get_raw_permissions(get_current_request(), t[0])]
scopes.append('openid') scopes.append('0_0') # always available
return scopes return scopes
def get_default_scopes(self, application=None, request=None, *args, **kwargs): def get_default_scopes(self, application=None, request=None, *args, **kwargs):
@@ -49,7 +54,7 @@ class PermissionScopes(BaseScopes):
return [] return []
scopes = [f"{p.id}_{p.membership.club.id}" scopes = [f"{p.id}_{p.membership.club.id}"
for p in PermissionBackend.get_raw_permissions(get_current_request(), 'view')] for p in PermissionBackend.get_raw_permissions(get_current_request(), 'view')]
scopes.append('openid') scopes = ['0_0'] # always default
return scopes return scopes
@@ -67,10 +72,77 @@ class PermissionOAuth2Validator(OAuth2Validator):
"email": request.user.email, "email": request.user.email,
} }
def get_userinfo_claims(self, request):
claims = super().get_userinfo_claims(request)
claims['is_active'] = request.user.is_active
return claims
def get_discovery_claims(self, request): def get_discovery_claims(self, request):
claims = super().get_discovery_claims(self) claims = super().get_discovery_claims(self)
return claims + ["name", "normalized_name", "email"] return claims + ["name", "normalized_name", "email"]
def validate_client_credentials_scopes(self, client_id, scopes, client, request, *args, **kwargs):
"""
For client credentials valid scopes are scope of the app owner
"""
valid_scopes = set()
request.oauth2 = {}
request.oauth2['user'] = client.user
request.oauth2['user'].is_anomymous = False
request.oauth2['scope'] = scopes
# mask implementation
if hasattr(request.decoded_body, 'mask'):
try:
request.oauth2['mask'] = int(request.decoded_body['mask'])
except ValueError:
request.oauth2['mask'] = 42
else:
request.oauth2['mask'] = 42
for t in Permission.PERMISSION_TYPES:
for p in PermissionBackend.get_raw_permissions(request, t[0]):
scope = f"{p.id}_{p.membership.club.id}"
if scope in scopes:
valid_scopes.add(scope)
# Always give one scope to generate token
if not valid_scopes:
valid_scopes.add('0_0')
request.scopes = valid_scopes
return valid_scopes
def validate_ropb_scopes(self, client_id, scopes, client, request, *args, **kwargs):
"""
For ROPB valid scopes are scope of the user
"""
valid_scopes = set()
request.oauth2 = {}
request.oauth2['user'] = request.user
request.oauth2['user'].is_anomymous = False
request.oauth2['scope'] = scopes
# mask implementation
if hasattr(request.decoded_body, 'mask'):
try:
request.oauth2['mask'] = int(request.decoded_body['mask'])
except ValueError:
request.oauth2['mask'] = 42
else:
request.oauth2['mask'] = 42
for t in Permission.PERMISSION_TYPES:
for p in PermissionBackend.get_raw_permissions(request, t[0]):
scope = f"{p.id}_{p.membership.club.id}"
if scope in scopes:
valid_scopes.add(scope)
# Always give one scope to generate token
if not valid_scopes:
valid_scopes.add('0_0')
request.scopes = valid_scopes
return valid_scopes
def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs): def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
""" """
User can request as many scope as he wants, including invalid scopes, User can request as many scope as he wants, including invalid scopes,
@@ -79,17 +151,35 @@ class PermissionOAuth2Validator(OAuth2Validator):
This allows clients to request more permission to get finally a This allows clients to request more permission to get finally a
subset of permissions. subset of permissions.
""" """
valid_scopes = set()
if hasattr(request, 'grant_type') and request.grant_type == 'client_credentials':
return self.validate_client_credentials_scopes(client_id, scopes, client, request, args, kwargs)
if hasattr(request, 'grant_type') and request.grant_type == 'password':
return self.validate_ropb_scopes(client_id, scopes, client, request, args, kwargs)
# Authorization code and Implicit are the same for scope, OIDC it's only a layer
valid_scopes = set() valid_scopes = set()
req = get_current_request()
request.oauth2 = {}
request.oauth2['user'] = req.user
request.oauth2['scope'] = scopes
# mask implementation
request.oauth2['mask'] = req.session.load()['permission_mask']
for t in Permission.PERMISSION_TYPES: for t in Permission.PERMISSION_TYPES:
for p in PermissionBackend.get_raw_permissions(get_current_request(), t[0]): for p in PermissionBackend.get_raw_permissions(request, t[0]):
scope = f"{p.id}_{p.membership.club.id}" scope = f"{p.id}_{p.membership.club.id}"
if scope in scopes: if scope in scopes:
valid_scopes.add(scope) valid_scopes.add(scope)
if 'openid' in scopes: # We grant openid scope if user is active
if 'openid' in scopes and req.user.is_active:
valid_scopes.add('openid') valid_scopes.add('openid')
# Always give one scope to generate token
if not valid_scopes:
valid_scopes.add('0_0')
request.scopes = valid_scopes request.scopes = valid_scopes
return valid_scopes return valid_scopes

View File

@@ -21,6 +21,7 @@ class OAuth2TestCase(TestCase):
def setUp(self): def setUp(self):
self.user = User.objects.create( self.user = User.objects.create(
username="toto", username="toto",
password="toto1234",
) )
self.application = Application.objects.create( self.application = Application.objects.create(
name="Test", name="Test",
@@ -92,3 +93,40 @@ class OAuth2TestCase(TestCase):
self.assertEqual(resp.status_code, 200) self.assertEqual(resp.status_code, 200)
self.assertIn(self.application, resp.context['scopes']) self.assertIn(self.application, resp.context['scopes'])
self.assertIn('1_1', resp.context['scopes'][self.application]) # Now the user has this permission self.assertIn('1_1', resp.context['scopes'][self.application]) # Now the user has this permission
def test_oidc(self):
"""
Ensure OIDC work
"""
# Create access token that has access to our own user detail
token = AccessToken.objects.create(
user=self.user,
application=self.application,
scope="openid",
token=get_random_string(64),
expires=timezone.now() + timedelta(days=365),
)
# No access without token
resp = self.client.get('/o/userinfo/') # userinfo endpoint
self.assertEqual(resp.status_code, 401)
# Valid token
resp = self.client.get('/o/userinfo/', **{'Authorization': f'Bearer {token.token}'})
self.assertEqual(resp.status_code, 200)
# Create membership to test api
NoteUser.objects.create(user=self.user)
membership = Membership.objects.create(user=self.user, club_id=1)
membership.roles.add(Role.objects.get(name="Adhérent⋅e BDE"))
membership.save()
# Token can always be use to see yourself
resp = self.client.get('/api/me/',
**{'Authorization': f'Bearer {token.token}'})
# Token is not granted to see other api
resp = self.client.get(f'/api/members/profile/{self.user.profile.pk}/',
**{'Authorization': f'Bearer {token.token}'})
self.assertEqual(resp.status_code, 404)

View File

@@ -0,0 +1,444 @@
# Copyright (C) 2018-2025 by BDE ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later
import base64
import hashlib
from django.contrib.auth.hashers import PBKDF2PasswordHasher
from django.contrib.auth.models import User
from django.utils.crypto import get_random_string
from django.test import TestCase
from member.models import Membership, Club
from note.models import NoteUser
from oauth2_provider.models import Application, AccessToken, Grant
from ..models import Role, Permission
class OAuth2FlowTestCase(TestCase):
fixtures = ('initial', )
def setUp(self):
self.user_password = "toto1234"
hasher = PBKDF2PasswordHasher()
self.user = User.objects.create(
username="toto",
password=hasher.encode(self.user_password, hasher.salt()),
)
NoteUser.objects.create(user=self.user)
membership = Membership.objects.create(user=self.user, club_id=1)
membership.roles.add(Role.objects.get(name="Adhérent⋅e BDE"))
membership.save()
bde = Club.objects.get(name="BDE")
view_user_perm = Permission.objects.get(pk=1) # View own user detail
self.base_scope = f'{view_user_perm.pk}_{bde.pk}'
def test_oauth2_authorization_code_flow(self):
"""
Ensure OAuth2 Authorization Code Flow work
"""
app = Application.objects.create(
name="Test Authorization Code",
client_type=Application.CLIENT_CONFIDENTIAL,
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
user=self.user,
hash_client_secret=False,
redirect_uris='http://127.0.0.1:8000/noexist/callback',
algorithm=Application.NO_ALGORITHM,
)
credential = base64.b64encode(f'{app.client_id}:{app.client_secret}'.encode('utf-8')).decode()
############################
# Minimal RFC6749 requests #
############################
resp = self.client.get('/o/authorize/',
data={"response_type": "code", # REQUIRED
"client_id": app.client_id}, # REQUIRED
**{"Content-Type": 'application/x-www-form-urlencoded'})
# Get user authorization
##################################################################################
url = resp.url
csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0]
resp = self.client.post(url,
data={"username": self.user.username,
"password": self.user_password,
"permission_mask": 1,
"csrfmiddlewaretoken": csrf_token})
url = resp.url
resp = self.client.get(url)
csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0]
resp = self.client.post(url,
follow=True,
data={"allow": "Authorize",
"scope": "0_0",
"csrfmiddlewaretoken": csrf_token,
"response_type": "code",
"client_id": app.client_id,
"redirect_uri": app.redirect_uris})
keys = resp.request['QUERY_STRING'].split("&")
for key in keys:
if len(key.split('code=')) == 2:
code = key.split('code=')[1]
##################################################################################
grant = Grant.objects.get(code=code)
self.assertEqual(grant.scope, '0_0')
# Now we can ask an Access Token
resp = self.client.post('/o/token/',
data={"grant_type": 'authorization_code', # REQUIRED
"code": code}, # REQUIRED
**{"Content-Type": 'application/x-www-form-urlencoded',
"HTTP_Authorization": f'Basic {credential}'})
# We should have refresh token
self.assertEqual('refresh_token' in resp.json(), True)
token = AccessToken.objects.get(token=resp.json()['access_token'])
# Token do nothing, it should be have the useless scope
self.assertEqual(token.scope, '0_0')
# Logout user
self.client.logout()
#############################################
# Maximal RFC6749 + RFC7636 (PKCE) requests #
#############################################
state = get_random_string(32)
# PKCE
code_verifier = get_random_string(100) # 43-128 characters [A-Z,a-z,0-9,"-",".","_","~"]
code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8').replace('=', '')
cc_method = "S256"
resp = self.client.get('/o/authorize/',
data={"response_type": "code", # REQUIRED
"code_challenge": code_challenge, # PKCE REQUIRED
"code_challenge_method": cc_method, # PKCE REQUIRED
"client_id": app.client_id, # REQUIRED
"redirect_uri": app.redirect_uris, # OPTIONAL
"scope": self.base_scope, # OPTIONAL
"state": state}, # RECOMMENDED
**{"Content-Type": 'application/x-www-form-urlencoded'})
# Get user authorization
##################################################################################
url = resp.url
csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0]
resp = self.client.post(url,
data={"username": self.user.username,
"password": self.user_password,
"permission_mask": 1,
"csrfmiddlewaretoken": csrf_token})
url = resp.url
resp = self.client.get(url)
csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0]
resp = self.client.post(url,
follow=True,
data={"allow": "Authorize",
"scope": self.base_scope,
"csrfmiddlewaretoken": csrf_token,
"response_type": "code",
"code_challenge": code_challenge,
"code_challenge_method": cc_method,
"client_id": app.client_id,
"state": state,
"redirect_uri": app.redirect_uris})
keys = resp.request['QUERY_STRING'].split("&")
for key in keys:
if len(key.split('code=')) == 2:
code = key.split('code=')[1]
if len(key.split('state=')) == 2:
resp_state = key.split('state=')[1]
##################################################################################
grant = Grant.objects.get(code=code)
self.assertEqual(grant.scope, self.base_scope)
self.assertEqual(state, resp_state)
# Now we can ask an Access Token
resp = self.client.post('/o/token/',
data={"grant_type": 'authorization_code', # REQUIRED
"code": code, # REQUIRED
"code_verifier": code_verifier, # PKCE REQUIRED
"redirect_uri": app.redirect_uris}, # REQUIRED
**{"Content-Type": 'application/x-www-form-urlencoded',
"HTTP_Authorization": f'Basic {credential}'})
# We should have refresh token
self.assertEqual('refresh_token' in resp.json(), True)
token = AccessToken.objects.get(token=resp.json()['access_token'])
# Token can have access, it shouldn't have the useless scope
self.assertEqual(token.scope, self.base_scope)
def test_oauth2_implicit_flow(self):
"""
Ensure OAuth2 Implicit Flow work
"""
app = Application.objects.create(
name="Test Implicit Flow",
client_type=Application.CLIENT_CONFIDENTIAL,
authorization_grant_type=Application.GRANT_IMPLICIT,
user=self.user,
hash_client_secret=False,
algorithm=Application.NO_ALGORITHM,
redirect_uris='http://127.0.0.1:8000/noexist/callback/',
)
############################
# Minimal RFC6749 requests #
############################
resp = self.client.get('/o/authorize/',
data={'response_type': 'token', # REQUIRED
'client_id': app.client_id}, # REQUIRED
**{"Content-Type": 'application/x-www-form-urlencoded'}
)
# Get user authorization
##################################################################################
url = resp.url
csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0]
resp = self.client.post(url,
data={"username": self.user.username,
"password": self.user_password,
"permission_mask": 1,
"csrfmiddlewaretoken": csrf_token})
url = resp.url
resp = self.client.get(url)
csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0]
resp = self.client.post(url,
follow=True,
data={"allow": "Authorize",
"scope": '0_0',
"csrfmiddlewaretoken": csrf_token,
"response_type": "token",
"client_id": app.client_id,
"redirect_uri": app.redirect_uris})
url = resp.redirect_chain[0][0]
keys = url.split('#')[1]
refresh_token = ''
for couple in keys.split('&'):
if couple.split('=')[0] == 'access_token':
token = couple.split('=')[1]
if couple.split('=')[0] == 'refresh_token':
refresh_token = couple.split('=')[1]
##################################################################################
self.assertEqual(refresh_token, '')
access_token = AccessToken.objects.get(token=token)
# Token do nothing, it should be have the useless scope
self.assertEqual(access_token.scope, '0_0')
# Logout user
self.client.logout()
############################
# Maximal RFC6749 requests #
############################
state = get_random_string(32)
resp = self.client.get('/o/authorize/',
data={'response_type': 'token', # REQUIRED
'client_id': app.client_id, # REQUIRED
'redirect_uri': app.redirect_uris, # OPTIONAL
'scope': self.base_scope, # OPTIONAL
'state': state}, # RECOMMENDED
**{"Content-Type": 'application/x-www-form-urlencoded'}
)
# Get user authorization
##################################################################################
url = resp.url
csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0]
resp = self.client.post(url,
data={"username": self.user.username,
"password": self.user_password,
"permission_mask": 1,
"csrfmiddlewaretoken": csrf_token})
url = resp.url
resp = self.client.get(url)
csrf_token = resp.text.split('CSRF_TOKEN = "')[0].split('"')[0]
resp = self.client.post(url,
follow=True,
data={"allow": "Authorize",
"scope": self.base_scope,
"state": state,
"csrfmiddlewaretoken": csrf_token,
"response_type": "token",
"client_id": app.client_id,
"redirect_uri": app.redirect_uris})
url = resp.redirect_chain[0][0]
keys = url.split('#')[1]
refresh_token = ''
for couple in keys.split('&'):
if couple.split('=')[0] == 'access_token':
token = couple.split('=')[1]
if couple.split('=')[0] == 'refresh_token':
refresh_token = couple.split('=')[1]
if couple.split('=')[0] == 'state':
resp_state = couple.split('=')[1]
##################################################################################
self.assertEqual(refresh_token, '')
access_token = AccessToken.objects.get(token=token)
# Token can have access, it shouldn't have the useless scope
self.assertEqual(access_token.scope, self.base_scope)
self.assertEqual(state, resp_state)
def test_oauth2_resource_owner_password_credentials_flow(self):
"""
Ensure OAuth2 Resource Owner Password Credentials Flow work
"""
app = Application.objects.create(
name="Test ROPB",
client_type=Application.CLIENT_CONFIDENTIAL,
authorization_grant_type=Application.GRANT_PASSWORD,
user=self.user,
hash_client_secret=False,
algorithm=Application.NO_ALGORITHM,
)
credential = base64.b64encode(f'{app.client_id}:{app.client_secret}'.encode('utf-8')).decode()
# No token without real password
resp = self.client.post('/o/token/',
data={"grant_type": "password", # REQUIRED
"username": self.user, # REQUIRED
"password": "password"}, # REQUIRED
**{"Content-Type": 'application/x-www-form-urlencoded',
"Http_Authorization": f'Basic {credential}'}
)
self.assertEqual(resp.status_code, 400)
resp = self.client.post('/o/token/',
data={"grant_type": "password", # REQUIRED
"username": self.user, # REQUIRED
"password": self.user_password}, # REQUIRED
**{"Content-Type": 'application/x-www-form-urlencoded',
"HTTP_Authorization": f'Basic {credential}'}
)
self.assertEqual(resp.status_code, 200)
access_token = AccessToken.objects.get(token=resp.json()['access_token'])
self.assertEqual('refresh_token' in resp.json(), True)
self.assertEqual(access_token.scope, '0_0') # token do nothing
# RFC6749 4.3.2 allows use of scope in ROPB token access request
resp = self.client.post('/o/token/',
data={"grant_type": "password", # REQUIRED
"username": self.user, # REQUIRED
"password": self.user_password, # REQUIRED
"scope": self.base_scope}, # OPTIONAL
**{"Content-Type": 'application/x-www-form-urlencoded',
"HTTP_Authorization": f'Basic {credential}'}
)
token = AccessToken.objects.get(token=resp.json()['access_token'])
self.assertEqual(token.scope, self.base_scope) # token do nothing more than base_scope
def test_oauth2_client_credentials(self):
"""
Ensure OAuth2 Client Credentials work
"""
app = Application.objects.create(
name="Test client_credentials",
client_type=Application.CLIENT_CONFIDENTIAL,
authorization_grant_type=Application.GRANT_CLIENT_CREDENTIALS,
user=self.user,
hash_client_secret=False,
algorithm=Application.NO_ALGORITHM,
)
# No token without credential
resp = self.client.post('/o/token/',
data={"grant_type": "client_credentials"}, # REQUIRED
**{"Content-Type": 'application/x-www-form-urlencoded'}
)
self.assertEqual(resp.status_code, 401)
# Access with credential
credential = base64.b64encode(f'{app.client_id}:{app.client_secret}'.encode('utf-8')).decode()
resp = self.client.post('/o/token/',
data={"grant_type": "client_credentials"}, # REQUIRED
**{'HTTP_Authorization': f'Basic {credential}',
"Content-Type": 'application/x-www-form-urlencoded'}
)
self.assertEqual(resp.status_code, 200)
token = AccessToken.objects.get(token=resp.json()['access_token'])
# Token do nothing, it should be have the useless scope
self.assertEqual(token.scope, '0_0')
# RFC6749 4.4.2 allows use of scope in client credential flow
resp = self.client.post('/o/token/',
data={"grant_type": "client_credentials", # REQUIRED
"scope": self.base_scope}, # OPTIONAL
**{'http_Authorization': f'Basic {credential}',
"Content-Type": 'application/x-www-form-urlencoded'}
)
self.assertEqual(resp.status_code, 200)
token = AccessToken.objects.get(token=resp.json()['access_token'])
# Token can have access, it shouldn't have the useless scope
self.assertEqual(token.scope, self.base_scope)

View File

@@ -0,0 +1,18 @@
# Generated by Django 5.2.6 on 2025-09-28 20:12
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('treasury', '0010_alter_invoice_bde'),
]
operations = [
migrations.AddField(
model_name='sogecredit',
name='valid',
field=models.BooleanField(blank=True, default=False, verbose_name='Valid'),
),
]

View File

@@ -308,6 +308,12 @@ class SogeCredit(models.Model):
null=True, null=True,
) )
valid = models.BooleanField(
default=False,
verbose_name=_("Valid"),
blank=True,
)
class Meta: class Meta:
verbose_name = _("Credit from the Société générale") verbose_name = _("Credit from the Société générale")
verbose_name_plural = _("Credits from the Société générale") verbose_name_plural = _("Credits from the Société générale")
@@ -338,7 +344,7 @@ class SogeCredit(models.Model):
credit_transaction.save() credit_transaction.save()
credit_transaction.refresh_from_db() credit_transaction.refresh_from_db()
self.credit_transaction = credit_transaction self.credit_transaction = credit_transaction
elif not self.valid: elif not self.valid_legacy:
self.credit_transaction.amount = self.amount self.credit_transaction.amount = self.amount
self.credit_transaction._force_save = True self.credit_transaction._force_save = True
self.credit_transaction.save() self.credit_transaction.save()
@@ -346,12 +352,12 @@ class SogeCredit(models.Model):
return super().save(*args, **kwargs) return super().save(*args, **kwargs)
@property @property
def valid(self): def valid_legacy(self):
return self.credit_transaction and self.credit_transaction.valid return self.credit_transaction and self.credit_transaction.valid
@property @property
def amount(self): def amount(self):
if self.valid: if self.valid_legacy:
return self.credit_transaction.total return self.credit_transaction.total
amount = 0 amount = 0
transactions_wei = self.transactions.filter(membership__club__weiclub__isnull=False) transactions_wei = self.transactions.filter(membership__club__weiclub__isnull=False)
@@ -365,7 +371,7 @@ class SogeCredit(models.Model):
The Sogé credit may be created after the user already paid its memberships. The Sogé credit may be created after the user already paid its memberships.
We query transactions and update the credit, if it is unvalid. We query transactions and update the credit, if it is unvalid.
""" """
if self.valid or not self.pk: if self.valid_legacy or not self.pk:
return return
# Soge do not pay BDE and kfet memberships since 2022 # Soge do not pay BDE and kfet memberships since 2022
@@ -405,7 +411,7 @@ class SogeCredit(models.Model):
Invalidating a Société générale delete the transaction of the bank if it was already created. Invalidating a Société générale delete the transaction of the bank if it was already created.
Treasurers must know what they do, With Great Power Comes Great Responsibility... Treasurers must know what they do, With Great Power Comes Great Responsibility...
""" """
if self.valid: if self.valid_legacy:
self.credit_transaction.valid = False self.credit_transaction.valid = False
self.credit_transaction.save() self.credit_transaction.save()
for tr in self.transactions.all(): for tr in self.transactions.all():
@@ -414,7 +420,7 @@ class SogeCredit(models.Model):
tr.save() tr.save()
def validate(self, force=False): def validate(self, force=False):
if self.valid and not force: if self.valid_legacy and not force:
# The credit is already done # The credit is already done
return return

Binary file not shown.

Before

Width:  |  Height:  |  Size: 284 KiB

After

Width:  |  Height:  |  Size: 104 KiB

View File

@@ -359,7 +359,7 @@ class TestSogeCredits(TestCase):
)) ))
self.assertRedirects(response, reverse("treasury:manage_soge_credit", args=(soge_credit.pk,)), 302, 200) self.assertRedirects(response, reverse("treasury:manage_soge_credit", args=(soge_credit.pk,)), 302, 200)
soge_credit.refresh_from_db() soge_credit.refresh_from_db()
self.assertTrue(soge_credit.valid) self.assertTrue(soge_credit.valid_legacy)
self.user.note.refresh_from_db() self.user.note.refresh_from_db()
self.assertEqual( self.assertEqual(
Transaction.objects.filter(Q(source=self.user.note) | Q(destination=self.user.note)).count(), 3) Transaction.objects.filter(Q(source=self.user.note) | Q(destination=self.user.note)).count(), 3)

View File

@@ -273,9 +273,9 @@ OAUTH2_PROVIDER = {
'REFRESH_TOKEN_EXPIRE_SECONDS': timedelta(days=14), 'REFRESH_TOKEN_EXPIRE_SECONDS': timedelta(days=14),
'PKCE_REQUIRED': False, # PKCE (fix a breaking change of django-oauth-toolkit 2.0.0) 'PKCE_REQUIRED': False, # PKCE (fix a breaking change of django-oauth-toolkit 2.0.0)
'OIDC_ENABLED': True, 'OIDC_ENABLED': True,
'OIDC_RP_INITIATED_LOGOUT_ENABLED': False,
'OIDC_RSA_PRIVATE_KEY': 'OIDC_RSA_PRIVATE_KEY':
os.getenv('OIDC_RSA_PRIVATE_KEY', 'CHANGE_ME_IN_ENV_SETTINGS').replace('\\n', '\n'), # for multilines os.getenv('OIDC_RSA_PRIVATE_KEY', 'CHANGE_ME_IN_ENV_SETTINGS').replace('\\n', '\n'), # for multilines
'SCOPES': { 'openid': "OpenID Connect scope" },
} }
# Take control on how widget templates are sourced # Take control on how widget templates are sourced

View File

@@ -12,7 +12,7 @@ django-filter~=25.1
django-mailer~=2.3.2 django-mailer~=2.3.2
django-oauth-toolkit~=3.0.1 django-oauth-toolkit~=3.0.1
django-phonenumber-field~=8.1.0 django-phonenumber-field~=8.1.0
django-polymorphic~=3.1.0 django-polymorphic~=4.1.0
djangorestframework~=3.16.0 djangorestframework~=3.16.0
django-rest-polymorphic~=0.1.10 django-rest-polymorphic~=0.1.10
django-tables2~=2.7.5 django-tables2~=2.7.5