from __future__ import annotations import json __id_counter__ = 0 def __generate_id__() : global __id_counter__ ret = __id_counter__ __id_counter__ += 1 return ret class PrivilegeError(Exception): code = 403 description = "Access Denied - this function cannot be executed with your current level of privilege" import logging if __name__ == "__main__" : from app_logging import logger_name else : from utility.app_logging import logger_name logger = logging.getLogger(logger_name + ".PRIVILEGE") class Privilege_Role : def __init__(self, name, description = None, exclusive : bool = False,id = None) : if not id : self.__id__ = __generate_id__() else : self.__id__ self.name = name self.exclusive = exclusive if description : self.description = description self.aggregated_roles = set([self]) @property def id(self): return self.__id__ @property def aggregated_roles_ids(self): ret = list() for item in self.aggregated_roles : ret.append(item.id) return ret class Privilege_Domain : def __init__(self, name, description = None,id = None) : if not id : self.__id__ = __generate_id__() else : self.__id__ self.name = name self.description = description self.__roles__ : set[Privilege_Role] = set() def add_role(self,role : Privilege_Role) : for tmp_role in self.__roles__ : if tmp_role.name == role.name : logger.error("can't have role with the same name in a specific domain") raise BaseException("can't have role with the same name in a specific domain") self.__roles__.add(role) def remove_role(self,role : Privilege_Role) : self.__roles__.remove(role) @property def id(self): return self.__id__ @property def roles(self): return self.__roles__.copy() def get_role_by_name(self,name : str): for tmp_role in self.__roles__ : if tmp_role.name == name : return tmp_role def get_role_by_id(self,id : int): for tmp_role in self.__roles__ : if tmp_role.id == id : return tmp_role #include role 2 in role 1 def include_role(self,role1 : Privilege_Role, role2 : Privilege_Role) : if role1 not in self.__roles__ or role2 not in self.__roles__ : logger.error("roles are not in domains.") raise Exception("roles are not in domains.") if role1.exclusive : logger.error("cannot include role, destination role is exclusive") raise Exception("cannot include role, destination role is exclusive") role1.aggregated_roles.add(role2) #uninclude role 2 in role 1 def uninclude_role(self,role1 : Privilege_Role, role2 : Privilege_Role) : if role1 not in self.__roles__ or role2 not in self.__roles__ : raise BaseException("roles are not in domains.") if role1 == role2 : raise BaseException("cannont include the role of himself") if role2 in role1.aggregated_roles : role1.aggregated_roles.remove(role2) @property def roles(self): return self.__roles__.copy() def check_role_by_id(self,role_id : int,role : Privilege_Role) : found = False for item in self.__roles__ : if item == role : found = True # at least one role is associated with this id if item.id == role_id : return True if not found : raise BaseException("id is not associated with any role !") return False def check_role_by_name(self,role_name : str,role : Privilege_Role) : found = False for item in self.__roles__ : if item == role : found = True # at least one role is associated with this id if item.name == role_name : return True if not found : raise BaseException("id is not associated with any role !") return False class Privilege_Manager : class Privilege_User : def __init__(self,manager : Privilege_Manager, privileges_data_str : str,is_super_admin : bool = False,id : int = None) -> None: if not id : self.__id__ = __generate_id__() else : self.__id__ = id self.is_super_admin : bool = is_super_admin self.manager = manager self.roles : set[Privilege_Role] = set() if privileges_data_str : privileges_data = json.loads(privileges_data_str) for key in privileges_data.keys() : self.roles.add(manager.get_role_by_domain_and_name(key,privileges_data[key])) def get_roles(self) -> set[Privilege_Role]: return self.roles def roles_to_jsonStr(self) -> str : ret_dict = dict() for role in self.roles : domain = self.manager.get_role_domain(role) ret_dict[domain.name] = role.name return json.dumps(ret_dict) @property def id(self): return self.__id__ __domains__ : set[Privilege_Domain] = set() __users__ : set[Privilege_User] = set() def create_domain(self, name : str, description : str = None) : for item in self.__domains__ : if item.name == name : logger.error("cannot have two privilege domain with the same name !") raise BaseException("cannot have two privilege domain with the same name !") domain = Privilege_Domain(name=name,description=description) self.__domains__.add(domain) logger.info("domain with name " + name + " created") def register_domain(self, domain : Privilege_Domain) : for item in self.__domains__ : if item.name == domain.name : logger.error("cannot have two privilege domain with the same name !") raise BaseException("cannot have two privilege domain with the same name !") self.__domains__.add(domain) logger.info("domain with name " + domain.name + " registered") def remove_domain(self, name : str) : for item in list(self.__domains__) : if item.name == name : self.__domains__.remove(item) logger.info("domain with name " + name + "removed") return logger.error("privilege domain remove failed ! item with name " + name + "not found") raise BaseException("privilege domain remove failed ! item not found") @property def domains(self): return self.__domains__.copy() def get_domain_by_name(self, name : str) : for item in self.__domains__ : if item.name == name : return item raise BaseException("domain not found with this name : " + str(name)) def check_role_by_name(self,domain_name : str, role_name : str,role : Privilege_Role) : domain = self.get_domain_by_name(domain_name) return domain.check_role_by_name(role_name,role) def get_roles_by_ids(self,ids : set[int]) -> set[Privilege_Role]: ret_roles = set() for id in ids : for domain in self.domains : try : ret_roles.add(domain.get_role_by_id(id)) except : pass return ret_roles def get_role_by_domain_and_name(self,domain_name : str, role_name : str) : domain = self.get_domain_by_name(domain_name) return domain.get_role_by_name(role_name) def get_roles_by_names(self,roles_names : set[tuple[str,str]]) : ret_roles = set() for item in roles_names : try : ret_roles.add(self.get_role_by_domain_and_name(item[0],item[1])) except : pass return ret_roles def is_role_registered(self,role : Privilege_Role) : for domain in self.domains : if role in domain.roles : return True return False def get_role_domain(self,role : Privilege_Role) : for domain in self.domains : for item in domain.roles : if item == role : return domain logger.error("this role is not associated with any domain") raise BaseException("this role is not associated with any domain") def roles_to_ids(roles : list[Privilege_Role]) : ret = [] for role in roles : ret.append(role.aggregated_roles_ids) return ret #its a DECORATOR def require_authorization(self,required_role : Privilege_Role, get_privilege_func : callable) : def wrapper_of_wrap(f) : def wrap(*args, **kwargs): if not self.is_role_registered(required_role) : raise BaseException("required role is not registered anywhere") logger.debug("checking authorization with roles for function " + f.__name__ + ": " ) logger.debug("required role : " + required_role.name ) privileges : Privilege_Manager.Privilege_User = get_privilege_func() if privileges.is_super_admin : logger.debug("user is super admin, access granted") return f(*args, **kwargs) for role in privileges.roles : #an user can have multiple roles and each role can include other roles. logger.debug("checking role : " + role.name) for aggregated_role in role.aggregated_roles : if aggregated_role == required_role : logger.debug("access granted") return f(*args, **kwargs) def raiser(*args, **kwargs): logger.warning("access denied for function :" + f.__name__) raise PrivilegeError("Access Denied") return raiser(*args, **kwargs) #we change the name of the function, for flash bluerprint, as we cannot register two function with the same name. wrap.__name__ = f.__name__ + "_authorization_wrap" return wrap return wrapper_of_wrap def test() : manager = Privilege_Manager() manager.create_domain(name="inventory",description="privilege domain for inventory") inventory_privilege_domain = manager.get_domain_by_name("inventory") role1 = Privilege_Role(name="simple") role2 = Privilege_Role(name="intermediate") role3 = Privilege_Role(name="speciale") role4 = Privilege_Role(name="admin") inventory_privilege_domain.add_role(role1) inventory_privilege_domain.add_role(role2) inventory_privilege_domain.add_role(role3) inventory_privilege_domain.add_role(role4) user_role_ids = [] def get_roles() : return user_role_ids def hello_simple() : print("hello_simple") def hello_intermediate() : print("hello_intermediate") @manager.require_authorization(role4,get_roles) def hello_admin(arg1,arg2) : print("hello_admin") print(str(arg1)) print(str(arg2)) @manager.require_authorization(role3,get_roles) def hello_speciale() : print("hello_speciale") inventory_privilege_domain.include_role(role4,role1) inventory_privilege_domain.include_role(role4,role2) inventory_privilege_domain.include_role(role4,role4) for item in inventory_privilege_domain.roles : print(item.name) try : hello_admin(23,45) return False except Exception as Ex: print(str(Ex)) user_role_ids.append(role4.id) hello_admin(23,45) hello_intermediate() try : hello_speciale() return False except Exception as Ex: print(str(Ex)) return True if __name__ == "__main__" : test()