mirror of
				https://gitlab.crans.org/bde/nk20
				synced 2025-11-04 01:12:08 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			352 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			352 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Copyright (C) 2018-2025 by BDE ENS Paris-Saclay
 | 
						||
# SPDX-License-Identifier: GPL-3.0-or-later
 | 
						||
 | 
						||
import functools
 | 
						||
import json
 | 
						||
import operator
 | 
						||
from copy import copy
 | 
						||
 | 
						||
from django.contrib.contenttypes.models import ContentType
 | 
						||
from django.core.exceptions import ValidationError
 | 
						||
from django.core.mail import mail_admins
 | 
						||
from django.db import models, transaction
 | 
						||
from django.db.models import F, Q, Model
 | 
						||
from django.forms import model_to_dict
 | 
						||
from django.utils.translation import gettext_lazy as _
 | 
						||
 | 
						||
 | 
						||
class InstancedPermission:
 | 
						||
 | 
						||
    def __init__(self, model, query, type, field, mask, **kwargs):
 | 
						||
        self.model = model
 | 
						||
        self.raw_query = query
 | 
						||
        self.query = None
 | 
						||
        self.type = type
 | 
						||
        self.field = field
 | 
						||
        self.mask = mask
 | 
						||
        self.kwargs = kwargs
 | 
						||
 | 
						||
    def __repr__(self):
 | 
						||
        if self.field:
 | 
						||
            return _("Can {type} {model}.{field} in {query}").format(type=self.type, model=self.model, field=self.field, query=self.query)
 | 
						||
        else:
 | 
						||
            return _("Can {type} {model} in {query}").format(type=self.type, model=self.model, query=self.query)
 | 
						||
 | 
						||
    def __str__(self):
 | 
						||
        return self.__repr__()
 | 
						||
 | 
						||
    def applies(self, obj, permission_type, field_name=None):
 | 
						||
        """
 | 
						||
        Returns True if the permission applies to
 | 
						||
        the field `field_name` object `obj`
 | 
						||
        """
 | 
						||
 | 
						||
        if not isinstance(obj, self.model.model_class()):
 | 
						||
            # The permission does not apply to the model
 | 
						||
            return False
 | 
						||
 | 
						||
        if self.type == 'add':
 | 
						||
            if permission_type == self.type:
 | 
						||
                self.update_query()
 | 
						||
 | 
						||
                obj = copy(obj)
 | 
						||
                obj.pk = 0
 | 
						||
                with transaction.atomic():
 | 
						||
                    sid = transaction.savepoint()
 | 
						||
                    for o in self.model.model_class().objects.filter(pk=0).all():
 | 
						||
                        o._no_signal = True
 | 
						||
                        o._force_delete = True
 | 
						||
                        Model.delete(o)
 | 
						||
                        # An object with pk 0 wouldn't deleted. That's not normal, we alert admins.
 | 
						||
                        msg = "Lors de la vérification d'une permission d'ajout, un objet de clé primaire nulle était "\
 | 
						||
                              "encore présent.\n"\
 | 
						||
                              "Type de permission : " + self.type + "\n"\
 | 
						||
                              "Modèle : " + str(self.model) + "\n"\
 | 
						||
                              "Objet trouvé : " + str(model_to_dict(o)) + "\n\n"\
 | 
						||
                              "--\nLe BDE"
 | 
						||
                        mail_admins("[Note Kfet] Un objet a été supprimé de force", msg)
 | 
						||
 | 
						||
                    # Force insertion, no data verification, no trigger
 | 
						||
                    obj._force_save = True
 | 
						||
                    # We don't want to trigger any signal (log, ...)
 | 
						||
                    obj._no_signal = True
 | 
						||
                    Model.save(obj, force_insert=True)
 | 
						||
                    ret = self.model.model_class().objects.filter(self.query & Q(pk=0)).exists()
 | 
						||
                    transaction.savepoint_rollback(sid)
 | 
						||
 | 
						||
                return ret
 | 
						||
 | 
						||
        if permission_type == self.type:
 | 
						||
            if self.field and field_name != self.field:
 | 
						||
                return False
 | 
						||
            self.update_query()
 | 
						||
            return self.model.model_class().objects.filter(self.query & Q(pk=obj.pk)).exists()
 | 
						||
        else:
 | 
						||
            return False
 | 
						||
 | 
						||
    def update_query(self):
 | 
						||
        """
 | 
						||
        The query is not analysed in a first time. It is analysed at most once if needed.
 | 
						||
        :return:
 | 
						||
        """
 | 
						||
        if not self.query:
 | 
						||
            # noinspection PyProtectedMember
 | 
						||
            self.query = Permission._about(self.raw_query, **self.kwargs)
 | 
						||
 | 
						||
 | 
						||
class PermissionMask(models.Model):
 | 
						||
    """
 | 
						||
    Permissions that are hidden behind a mask
 | 
						||
    """
 | 
						||
    rank = models.PositiveSmallIntegerField(
 | 
						||
        unique=True,
 | 
						||
        verbose_name=_('rank'),
 | 
						||
    )
 | 
						||
 | 
						||
    description = models.CharField(
 | 
						||
        max_length=255,
 | 
						||
        unique=True,
 | 
						||
        verbose_name=_('description'),
 | 
						||
    )
 | 
						||
 | 
						||
    class Meta:
 | 
						||
        verbose_name = _("permission mask")
 | 
						||
        verbose_name_plural = _("permission masks")
 | 
						||
 | 
						||
    def __str__(self):
 | 
						||
        return self.description
 | 
						||
 | 
						||
 | 
						||
class Permission(models.Model):
 | 
						||
 | 
						||
    PERMISSION_TYPES = [
 | 
						||
        ('add', _('add')),
 | 
						||
        ('view', _('view')),
 | 
						||
        ('change', _('change')),
 | 
						||
        ('delete', _('delete'))
 | 
						||
    ]
 | 
						||
 | 
						||
    model = models.ForeignKey(
 | 
						||
        ContentType,
 | 
						||
        on_delete=models.CASCADE,
 | 
						||
        related_name='+',
 | 
						||
        verbose_name=_("model"),
 | 
						||
    )
 | 
						||
 | 
						||
    # A json encoded Q object with the following grammar
 | 
						||
    #  query -> [] | {}  (the empty query representing all objects)
 | 
						||
    #  query -> ["AND", query, ...]          AND multiple queries
 | 
						||
    #         | ["OR", query, ...]           OR multiple queries
 | 
						||
    #         | ["NOT", query]               Opposite of query
 | 
						||
    #  query -> {key: value, ...}            A list of fields and values of a Q object
 | 
						||
    #  key   -> string                       A field name
 | 
						||
    #  value -> int | string | bool | null   Literal values
 | 
						||
    #         | [parameter, ...]             A parameter. See compute_param for more details.
 | 
						||
    #         | {"F": oper}                  An F object
 | 
						||
    #  oper  -> [string, ...]                A parameter. See compute_param for more details.
 | 
						||
    #         | ["ADD", oper, ...]           Sum multiple F objects or literal
 | 
						||
    #         | ["SUB", oper, oper]          Substract two F objects or literal
 | 
						||
    #         | ["MUL", oper, ...]           Multiply F objects or literals
 | 
						||
    #         | int | string | bool | null   Literal values
 | 
						||
    #         | ["F", string]                A field
 | 
						||
    #
 | 
						||
    # Examples:
 | 
						||
    #  Q(is_superuser=True)  := {"is_superuser": true}
 | 
						||
    #  ~Q(is_superuser=True) := ["NOT", {"is_superuser": true}]
 | 
						||
    query = models.TextField(
 | 
						||
        verbose_name=_("query"),
 | 
						||
    )
 | 
						||
 | 
						||
    type = models.CharField(
 | 
						||
        max_length=15,
 | 
						||
        choices=PERMISSION_TYPES,
 | 
						||
        verbose_name=_("type"),
 | 
						||
    )
 | 
						||
 | 
						||
    mask = models.ForeignKey(
 | 
						||
        PermissionMask,
 | 
						||
        on_delete=models.PROTECT,
 | 
						||
        related_name="permissions",
 | 
						||
        verbose_name=_("mask"),
 | 
						||
    )
 | 
						||
 | 
						||
    field = models.CharField(
 | 
						||
        max_length=255,
 | 
						||
        blank=True,
 | 
						||
        verbose_name=_("field"),
 | 
						||
    )
 | 
						||
 | 
						||
    permanent = models.BooleanField(
 | 
						||
        default=False,
 | 
						||
        help_text=_("Tells if the permission should be granted even if the membership of the user is expired."),
 | 
						||
        verbose_name=_("permanent"),
 | 
						||
    )
 | 
						||
 | 
						||
    description = models.CharField(
 | 
						||
        max_length=255,
 | 
						||
        blank=True,
 | 
						||
        verbose_name=_("description"),
 | 
						||
    )
 | 
						||
 | 
						||
    class Meta:
 | 
						||
        unique_together = ('model', 'query', 'type', 'field')
 | 
						||
        verbose_name = _("permission")
 | 
						||
        verbose_name_plural = _("permissions")
 | 
						||
 | 
						||
    def __str__(self):
 | 
						||
        return self.description
 | 
						||
 | 
						||
    @transaction.atomic
 | 
						||
    def save(self, **kwargs):
 | 
						||
        self.full_clean()
 | 
						||
        super().save()
 | 
						||
 | 
						||
    def clean(self):
 | 
						||
        self.query = json.dumps(json.loads(self.query))
 | 
						||
        if self.field and self.type not in {'view', 'change'}:
 | 
						||
            raise ValidationError(_("Specifying field applies only to view and change permission types."))
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def compute_f(oper, **kwargs):
 | 
						||
        if isinstance(oper, list):
 | 
						||
            if oper[0] == 'ADD':
 | 
						||
                return functools.reduce(operator.add, [Permission.compute_f(oper, **kwargs) for oper in oper[1:]])
 | 
						||
            elif oper[0] == 'SUB':
 | 
						||
                return Permission.compute_f(oper[1], **kwargs) - Permission.compute_f(oper[2], **kwargs)
 | 
						||
            elif oper[0] == 'MUL':
 | 
						||
                return functools.reduce(operator.mul, [Permission.compute_f(oper, **kwargs) for oper in oper[1:]])
 | 
						||
            elif oper[0] == 'F':
 | 
						||
                return F(oper[1])
 | 
						||
            else:
 | 
						||
                field = kwargs[oper[0]]
 | 
						||
                for i in range(1, len(oper)):
 | 
						||
                    field = getattr(field, oper[i])
 | 
						||
                return field
 | 
						||
        else:
 | 
						||
            return oper
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def compute_param(value, **kwargs):
 | 
						||
        """
 | 
						||
        A parameter is given by a list. The first argument is the name of the parameter.
 | 
						||
        The parameters are the user, the club, and some classes (Note, ...)
 | 
						||
        If there are more arguments in the list, then attributes are queried.
 | 
						||
        For example, ["user", "note", "balance"] will return the balance of the note of the user.
 | 
						||
        If an argument is a list, then this is interpreted with a function call:
 | 
						||
            First argument is the name of the function, next arguments are parameters, and if there is a dict,
 | 
						||
            then the dict is given as kwargs.
 | 
						||
            For example: NoteUser.objects.filter(user__memberships__club__name="Kfet").all() is translated by:
 | 
						||
            ["NoteUser", "objects", ["filter", {"user__memberships__club__name": "Kfet"}], ["all"]]
 | 
						||
        """
 | 
						||
 | 
						||
        if not isinstance(value, list):
 | 
						||
            return value
 | 
						||
 | 
						||
        field = kwargs[value[0]]
 | 
						||
        for i in range(1, len(value)):
 | 
						||
            if isinstance(value[i], list):
 | 
						||
                if value[i][0] in kwargs:
 | 
						||
                    field = Permission.compute_param(value[i], **kwargs)
 | 
						||
                    continue
 | 
						||
 | 
						||
                if not hasattr(field, value[i][0]):
 | 
						||
                    return False
 | 
						||
 | 
						||
                field = getattr(field, value[i][0])
 | 
						||
                params = []
 | 
						||
                call_kwargs = {}
 | 
						||
                for j in range(1, len(value[i])):
 | 
						||
                    param = Permission.compute_param(value[i][j], **kwargs)
 | 
						||
                    if isinstance(param, dict):
 | 
						||
                        for key in param:
 | 
						||
                            val = Permission.compute_param(param[key], **kwargs)
 | 
						||
                            call_kwargs[key] = val
 | 
						||
                    else:
 | 
						||
                        params.append(param)
 | 
						||
                field = field(*params, **call_kwargs)
 | 
						||
            else:
 | 
						||
                if not hasattr(field, value[i]):
 | 
						||
                    return False
 | 
						||
 | 
						||
                field = getattr(field, value[i])
 | 
						||
        return field
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def _about(query, **kwargs):
 | 
						||
        """
 | 
						||
        Translate JSON query into a Q query.
 | 
						||
        :param query: The JSON query
 | 
						||
        :param kwargs: Additional params
 | 
						||
        :return: A Q object
 | 
						||
        """
 | 
						||
        if len(query) == 0:
 | 
						||
            # The query is either [] or {} and
 | 
						||
            # applies to all objects of the model
 | 
						||
            # to represent this we return a trivial request
 | 
						||
            return Q(pk=F("pk"))
 | 
						||
        if isinstance(query, list):
 | 
						||
            if query[0] == 'AND':
 | 
						||
                return functools.reduce(operator.and_, [Permission._about(query, **kwargs) for query in query[1:]])
 | 
						||
            elif query[0] == 'OR':
 | 
						||
                return functools.reduce(operator.or_, [Permission._about(query, **kwargs) for query in query[1:]])
 | 
						||
            elif query[0] == 'NOT':
 | 
						||
                return ~Permission._about(query[1], **kwargs)
 | 
						||
            else:
 | 
						||
                return Q(pk=F("pk")) if Permission.compute_param(query, **kwargs) else ~Q(pk=F("pk"))
 | 
						||
        elif isinstance(query, dict):
 | 
						||
            q_kwargs = {}
 | 
						||
            for key in query:
 | 
						||
                value = query[key]
 | 
						||
                if isinstance(value, list):
 | 
						||
                    # It is a parameter we query its return value
 | 
						||
                    q_kwargs[key] = Permission.compute_param(value, **kwargs)
 | 
						||
                elif isinstance(value, dict):
 | 
						||
                    # It is an F object
 | 
						||
                    q_kwargs[key] = Permission.compute_f(value['F'], **kwargs)
 | 
						||
                else:
 | 
						||
                    q_kwargs[key] = value
 | 
						||
            return Q(**q_kwargs)
 | 
						||
        else:
 | 
						||
            # TODO: find a better way to crash here
 | 
						||
            raise Exception("query {} is wrong".format(query))
 | 
						||
 | 
						||
    def about(self, **kwargs):
 | 
						||
        """
 | 
						||
        Return an InstancedPermission with the parameters
 | 
						||
        replaced by their values and the query interpreted
 | 
						||
        """
 | 
						||
        query = json.loads(self.query)
 | 
						||
        # query = self._about(query, **kwargs)
 | 
						||
        return InstancedPermission(self.model, query, self.type, self.field, self.mask, **kwargs)
 | 
						||
 | 
						||
 | 
						||
class Role(models.Model):
 | 
						||
    """
 | 
						||
    Permissions associated with a Role
 | 
						||
    """
 | 
						||
    name = models.CharField(
 | 
						||
        max_length=255,
 | 
						||
        verbose_name=_("name"),
 | 
						||
    )
 | 
						||
 | 
						||
    permissions = models.ManyToManyField(
 | 
						||
        Permission,
 | 
						||
        verbose_name=_("permissions"),
 | 
						||
    )
 | 
						||
 | 
						||
    for_club = models.ForeignKey(
 | 
						||
        "member.Club",
 | 
						||
        verbose_name=_("for club"),
 | 
						||
        on_delete=models.PROTECT,
 | 
						||
        blank=True,
 | 
						||
        null=True,
 | 
						||
        default=None,
 | 
						||
    )
 | 
						||
 | 
						||
    class Meta:
 | 
						||
        verbose_name = _("role permissions")
 | 
						||
        verbose_name_plural = _("role permissions")
 | 
						||
 | 
						||
    def __str__(self):
 | 
						||
        return self.name
 |