from __future__ import annotations import functools __id_counter__ = 0 def __generate_id__() : global __id_counter__ ret = __id_counter__ __id_counter__ += 1 return ret class APIAuthError(Exception): code = 403 description = "Authentication Error" import logging if __name__ == "__main__" : from app_logging import logger_name else : from utility.app_logging import logger_name logger = logging.getLogger(logger_name + ".PRIVILEGE") class Privilege_Role : def __init__(self, name, description = None, exclusive : bool = False,id = None) : if not id : self.__id__ = __generate_id__() else : self.__id__ self.name = name self.exclusive = exclusive if description : self.description = description self.aggregated_roles = set([self]) @property def id(self): return self.__id__ @property def aggregated_roles_ids(self): ret = list() for item in self.aggregated_roles : ret.append(item.id) return ret class Privilege_Domain : def __init__(self, name, description = None,id = None) : if not id : self.__id__ = __generate_id__() else : self.__id__ self.name = name self.description = description self.__roles__ : set[Privilege_Role] = set() def add_role(self,role : Privilege_Role) : for tmp_role in self.__roles__ : if tmp_role.name == role.name : raise BaseException("can't have role with the same name in a specific domain") self.__roles__.add(role) def remove_role(self,role : Privilege_Role) : self.__roles__.remove(role) @property def id(self): return self.__id__ @property def roles(self): return self.__roles__.copy() def get_role_by_name(self,name : str): for tmp_role in self.__roles__ : if tmp_role.name == name : return tmp_role def get_role_by_id(self,id : int): for tmp_role in self.__roles__ : if tmp_role.id == id : return tmp_role #include role 2 in role 1 def include_role(self,role1 : Privilege_Role, role2 : Privilege_Role) : if role1 not in self.__roles__ or role2 not in self.__roles__ : raise BaseException("roles are not in domains.") if role1.exclusive : raise "cannot include role, destination role is exclusive" role1.aggregated_roles.add(role2) #uninclude role 2 in role 1 def uninclude_role(self,role1 : Privilege_Role, role2 : Privilege_Role) : if role1 not in self.__roles__ or role2 not in self.__roles__ : raise BaseException("roles are not in domains.") if role1 == role2 : raise BaseException("cannont include the role of himself") if role2 in role1.aggregated_roles : role1.aggregated_roles.remove(role2) @property def roles(self): return self.__roles__.copy() def check_role_by_id(self,role_id : int,role : Privilege_Role) : found = False for item in self.__roles__ : if item == role : found = True # at least one role is associated with this id if item.id == role_id : return True if not found : raise BaseException("id is not associated with any role !") return False def check_role_by_name(self,role_name : str,role : Privilege_Role) : found = False for item in self.__roles__ : if item == role : found = True # at least one role is associated with this id if item.name == role_name : return True if not found : raise BaseException("id is not associated with any role !") return False class Privilege_Manager : __domains__ : set[Privilege_Domain] = set() def create_domain(self, name : str, description : str = None) : for item in self.__domains__ : if item.name == name : raise BaseException("cannot have two privilege domain with the same name !") domain = Privilege_Domain(name=name,description=description) self.__domains__.add(domain) def remove_domain(self, name : str) : for item in list(self.__domains__) : if item.name == name : self.__domains__.remove(item) return raise BaseException("privilege domain remove failed ! item not found") @property def domains(self): return self.__domains__.copy() def get_domain_by_name(self, name : str) : for item in self.__domains__ : if item.name == name : return item raise BaseException("domain not found with this name : " + str(name)) def check_role_by_name(self,domain_name : str, role_name : str,role : Privilege_Role) : domain = self.get_domain_by_name(domain_name) return domain.check_role_by_name(role_name,role) def get_roles_by_ids(self,ids : set[int]) -> set[Privilege_Role]: ret_roles = set() for id in ids : for domain in self.domains : try : ret_roles.add(domain.get_role_by_id(id)) except : pass return ret_roles def get_role_by_domain_and_name(self,domain_name : str, role_name : str) : domain = self.get_domain_by_name(domain_name) return domain.get_role_by_name(role_name) def get_roles_by_names(self,roles_names : set[tuple[str,str]]) : ret_roles = set() for item in roles_names : try : ret_roles.add(self.get_role_by_domain_and_name(item[0],item[1])) except : pass return ret_roles def is_role_registered(self,role : Privilege_Role) : for domain in self.domains : if role in domain.roles : return True return False def get_role_domain(self,role : Privilege_Role) : for domain in self.domains : for item in domain.roles : if item == role : return domain raise BaseException("this role is not associated with any domain") #its a DECORATOR def require_authorization(self,required_role : Privilege_Role, ids_getter : callable) : def wrapper_of_wrap(f) : def wrap(*args, **kwargs): if not self.is_role_registered(required_role) : raise BaseException("role is not registered everywhere") logger.debug("checking authorization with roles for function " + f.__name__ + ": " ) logger.debug("required role : " + required_role.name ) for role in self.get_roles_by_ids(ids_getter()) : #an user can have multiple roles and each role can include other roles. logger.debug("checking role : " + role.name) for aggregated_role in role.aggregated_roles : if aggregated_role == required_role : return f(*args, **kwargs) def raiser(*args, **kwargs): logger.warning("access denied for function :" + f.__name__) raise APIAuthError("Access Denied") return raiser(*args, **kwargs) return wrap return wrapper_of_wrap def test() : manager = Privilege_Manager() manager.create_domain(name="inventory",description="privilege domain for inventory") inventory_privilege_domain = manager.get_domain_by_name("inventory") role1 = Privilege_Role(name="simple") role2 = Privilege_Role(name="intermediate") role3 = Privilege_Role(name="speciale") role4 = Privilege_Role(name="admin") inventory_privilege_domain.add_role(role1) inventory_privilege_domain.add_role(role2) inventory_privilege_domain.add_role(role3) inventory_privilege_domain.add_role(role4) user_role_ids = [] def get_roles() : return user_role_ids def hello_simple() : print("hello_simple") def hello_intermediate() : print("hello_intermediate") @manager.require_authorization(role4,get_roles) def hello_admin(arg1,arg2) : print("hello_admin") print(str(arg1)) print(str(arg2)) @manager.require_authorization(role3,get_roles) def hello_speciale() : print("hello_speciale") inventory_privilege_domain.include_role(role4,role1) inventory_privilege_domain.include_role(role4,role2) inventory_privilege_domain.include_role(role4,role4) for item in inventory_privilege_domain.roles : print(item.name) try : hello_admin(23,45) except Exception as Ex: print(str(Ex)) user_role_ids.append(role4.id) hello_admin(23,45) hello_intermediate() hello_speciale() if __name__ == "__main__" : test()