123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435 |
- from __future__ import annotations
- import json
- from itsdangerous import json
# Next id to hand out; advanced by every call to __generate_id__().
__id_counter__ = 0


def __generate_id__():
    """Return the current counter value as a new id and advance the counter.

    Ids start at 0 and grow by one per call; the counter is module-level
    state shared by every caller in this process.
    """
    global __id_counter__
    issued = __id_counter__
    __id_counter__ = issued + 1
    return issued
class PrivilegeError(Exception):
    """Raised when the caller lacks the role required to run a function.

    ``code`` and ``description`` are class-level attributes.
    NOTE(review): they look like they are meant for a web error handler
    (403 is HTTP "Forbidden") -- confirm against the code that catches this.
    """

    description = "Access Denied - this function cannot be executed with your current level of privilege"
    code = 403
import logging

# The logging helper is imported from a different path depending on whether
# this file is executed directly as a script or imported as a module.
if __name__ != "__main__":
    from utility.app_logging import logger_name
else:
    from app_logging import logger_name

# Child logger used by all privilege-related code in this module.
logger = logging.getLogger(logger_name + ".PRIVILEGE")
class Privilege_Role:
    """A named role that can be granted to users.

    Every role aggregates at least itself; other roles can be added to
    ``aggregated_roles`` so that holding this role also grants them.
    """

    def __init__(self, name, description=None, exclusive: bool = False, id=None):
        """Create a role.

        Args:
            name: Role name.
            description: Optional free-text description (``None`` when omitted).
            exclusive: Flag stored on the role; a domain refuses to include
                other roles into a role whose flag is set.
            id: Explicit id to reuse; a new one is generated when ``None``.
        """
        # ``is None`` rather than truthiness: 0 is a legitimate id (it is the
        # first value the module-level generator hands out).
        self.__id__ = __generate_id__() if id is None else id
        self.name = name
        self.exclusive = exclusive
        # Always set, so reading ``role.description`` never raises.
        self.description = description
        # A role always grants itself.
        self.aggregated_roles = {self}

    @property
    def id(self):
        """Identifier of this role."""
        return self.__id__

    @property
    def aggregated_roles_ids(self):
        """List of the ids of every aggregated role (this role included)."""
        return [role.id for role in self.aggregated_roles]
class Privilege_Domain:
    """A named group of roles; role names are unique within one domain.

    Besides storing roles, the domain controls which of its roles include
    (aggregate) which other roles.
    """

    def __init__(self, name, description=None, id=None):
        """Create an empty domain.

        Args:
            name: Domain name.
            description: Optional free-text description.
            id: Explicit id to reuse; a new one is generated when ``None``.
        """
        # ``is None`` rather than truthiness so an explicit id of 0 is kept.
        self.__id__ = __generate_id__() if id is None else id
        self.name = name
        self.description = description
        self.__roles__: set[Privilege_Role] = set()

    def add_role(self, role: Privilege_Role):
        """Add ``role`` to the domain.

        Raises:
            ValueError: a role with the same name is already in the domain.
        """
        if any(existing.name == role.name for existing in self.__roles__):
            logger.error("can't have role with the same name in a specific domain")
            raise ValueError("can't have role with the same name in a specific domain")
        self.__roles__.add(role)

    def remove_role(self, role: Privilege_Role):
        """Remove ``role`` from the domain.

        Raises:
            KeyError: ``role`` is not in the domain.
        """
        self.__roles__.remove(role)

    @property
    def id(self):
        """Identifier of this domain."""
        return self.__id__

    @property
    def roles(self):
        """A copy of the role set; mutating it does not affect the domain."""
        return self.__roles__.copy()

    def get_role_by_name(self, name: str):
        """Return the role called ``name``, or ``None`` when there is none."""
        for candidate in self.__roles__:
            if candidate.name == name:
                return candidate
        return None

    def get_role_by_id(self, id: int):
        """Return the role whose id is ``id``, or ``None`` when there is none."""
        for candidate in self.__roles__:
            if candidate.id == id:
                return candidate
        return None

    def include_role(self, role1: Privilege_Role, role2: Privilege_Role):
        """Include ``role2`` in ``role1``: holders of ``role1`` also get ``role2``.

        Raises:
            ValueError: either role is not in this domain, or ``role1`` is
                exclusive.
        """
        if role1 not in self.__roles__ or role2 not in self.__roles__:
            logger.error("roles are not in domains.")
            raise ValueError("roles are not in domains.")
        if role1.exclusive:
            logger.error("cannot include role, destination role is exclusive")
            raise ValueError("cannot include role, destination role is exclusive")
        role1.aggregated_roles.add(role2)

    def uninclude_role(self, role1: Privilege_Role, role2: Privilege_Role):
        """Undo ``include_role``: remove ``role2`` from ``role1``.

        Does nothing when ``role2`` is not currently included in ``role1``.

        Raises:
            ValueError: either role is not in this domain, or both arguments
                are the same role (a role always aggregates itself).
        """
        if role1 not in self.__roles__ or role2 not in self.__roles__:
            raise ValueError("roles are not in domains.")
        if role1 == role2:
            raise ValueError("cannot uninclude a role from itself")
        # discard() is a no-op when role2 was never included.
        role1.aggregated_roles.discard(role2)

    def check_role_by_id(self, role_id: int, role: Privilege_Role):
        """Return True when some role of this domain has id ``role_id``.

        NOTE(review): the result does not depend on whether it is ``role``
        itself that carries ``role_id``; ``role`` only decides between
        returning False (it is in the domain) and raising (it is not).
        Confirm that this is the intended contract.

        Raises:
            ValueError: no role has ``role_id`` and ``role`` is not in the
                domain.
        """
        if any(item.id == role_id for item in self.__roles__):
            return True
        if role not in self.__roles__:
            raise ValueError("id is not associated with any role !")
        return False

    def check_role_by_name(self, role_name: str, role: Privilege_Role):
        """Return True when some role of this domain is called ``role_name``.

        NOTE(review): same caveat as ``check_role_by_id`` -- ``role`` only
        decides between returning False and raising. Confirm intent.

        Raises:
            ValueError: no role is called ``role_name`` and ``role`` is not in
                the domain.
        """
        if any(item.name == role_name for item in self.__roles__):
            return True
        if role not in self.__roles__:
            raise ValueError("id is not associated with any role !")
        return False
class Privilege_Manager:
    """Registry of privilege domains plus the authorization decorator built on it.

    NOTE(review): ``__domains__`` and ``__users__`` are class attributes, so
    every ``Privilege_Manager`` instance shares one registry. This is kept
    unchanged because callers may rely on it; confirm whether per-instance
    state was intended.
    """

    class Privilege_User:
        """The set of roles held by one user, as seen by a manager."""

        def __init__(self, manager: Privilege_Manager, privileges_data_str: str,
                     is_super_admin: bool = False, id: int = None) -> None:
            """Build a user from its serialized privileges.

            Args:
                manager: Manager used to resolve domain/role names.
                privileges_data_str: JSON object mapping a domain name to a
                    role name (the format produced by ``roles_to_jsonStr``);
                    empty or ``None`` means "no roles".
                is_super_admin: When True the authorization decorator grants
                    access without looking at roles.
                id: Explicit id to reuse; a new one is generated when ``None``.

            Raises:
                LookupError: a domain named in the data is not registered.
            """
            # ``is None`` rather than truthiness so an explicit id of 0 is kept.
            self.__id__ = __generate_id__() if id is None else id
            self.is_super_admin: bool = is_super_admin
            self.manager = manager
            self.roles: set[Privilege_Role] = set()
            if privileges_data_str:
                privileges_data = json.loads(privileges_data_str)
                for domain_name, role_name in privileges_data.items():
                    role = manager.get_role_by_domain_and_name(domain_name, role_name)
                    if role is None:
                        # An unknown role must not end up in the set as None:
                        # later role checks dereference every entry.
                        logger.warning("role %s not found in domain %s, ignored",
                                       role_name, domain_name)
                        continue
                    self.roles.add(role)

        def get_roles(self) -> set[Privilege_Role]:
            """Return the (live, mutable) set of roles held by the user."""
            return self.roles

        def roles_to_jsonStr(self) -> str:
            """Serialize the roles as a JSON object ``{domain name: role name}``.

            Because the mapping is keyed by domain name, only one role per
            domain survives serialization.

            Raises:
                LookupError: a role is not registered in any domain.
            """
            ret_dict = {}
            for role in self.roles:
                domain = self.manager.get_role_domain(role)
                ret_dict[domain.name] = role.name
            return json.dumps(ret_dict)

        @property
        def id(self):
            """Identifier of this user."""
            return self.__id__

    __domains__: set[Privilege_Domain] = set()
    __users__: set[Privilege_User] = set()

    def _ensure_domain_name_is_free(self, name: str):
        """Raise ValueError when a domain called ``name`` is already registered."""
        if any(item.name == name for item in self.__domains__):
            logger.error("cannot have two privilege domain with the same name !")
            raise ValueError("cannot have two privilege domain with the same name !")

    def create_domain(self, name: str, description: str = None):
        """Create a domain called ``name``, register it and return it.

        Raises:
            ValueError: a domain with that name already exists.
        """
        self._ensure_domain_name_is_free(name)
        domain = Privilege_Domain(name=name, description=description)
        self.__domains__.add(domain)
        logger.info("domain with name %s created", name)
        return domain

    def register_domain(self, domain: Privilege_Domain):
        """Register an already-built domain.

        Raises:
            ValueError: a domain with the same name already exists.
        """
        self._ensure_domain_name_is_free(domain.name)
        self.__domains__.add(domain)
        logger.info("domain with name %s created", domain.name)

    def remove_domain(self, name: str):
        """Unregister the domain called ``name``.

        Raises:
            LookupError: no such domain is registered.
        """
        # Iterate over a snapshot because the set is mutated inside the loop.
        for item in list(self.__domains__):
            if item.name == name:
                self.__domains__.remove(item)
                logger.info("domain with name %s removed", name)
                return
        logger.error("privilege domain remove failed ! item with name %s not found", name)
        raise LookupError("privilege domain remove failed ! item not found")

    @property
    def domains(self):
        """A copy of the registered domains; mutating it has no effect."""
        return self.__domains__.copy()

    def get_domain_by_name(self, name: str):
        """Return the domain called ``name``.

        Raises:
            LookupError: no such domain is registered.
        """
        for item in self.__domains__:
            if item.name == name:
                return item
        raise LookupError("domain not found with this name : " + str(name))

    def check_role_by_name(self, domain_name: str, role_name: str, role: Privilege_Role):
        """Delegate to ``check_role_by_name`` of the domain called ``domain_name``.

        Raises:
            LookupError: the domain is not registered.
        """
        domain = self.get_domain_by_name(domain_name)
        return domain.check_role_by_name(role_name, role)

    def get_roles_by_ids(self, ids: set[int]) -> set[Privilege_Role]:
        """Return every registered role whose id is in ``ids``.

        Ids that match no role are ignored.
        """
        domains = self.domains
        found = set()
        for role_id in ids:
            for domain in domains:
                role = domain.get_role_by_id(role_id)
                # A domain answers None when it does not own the id; never
                # let that placeholder leak into the result.
                if role is not None:
                    found.add(role)
        return found

    def get_role_by_domain_and_name(self, domain_name: str, role_name: str):
        """Return the role ``role_name`` of domain ``domain_name``.

        Returns whatever the domain's ``get_role_by_name`` returns for an
        unknown role name.

        Raises:
            LookupError: the domain is not registered.
        """
        domain = self.get_domain_by_name(domain_name)
        return domain.get_role_by_name(role_name)

    def get_roles_by_names(self, roles_names: set[tuple[str, str]]):
        """Resolve ``(domain name, role name)`` pairs to roles, best effort.

        Pairs naming an unknown domain or an unknown role are skipped.
        """
        found = set()
        for item in roles_names:
            try:
                role = self.get_role_by_domain_and_name(item[0], item[1])
            except LookupError:
                # Unknown domain (or malformed pair): skip it.
                continue
            if role is not None:
                found.add(role)
        return found

    def is_role_registered(self, role: Privilege_Role):
        """Return True when ``role`` belongs to at least one registered domain."""
        return any(role in domain.roles for domain in self.domains)

    def get_role_domain(self, role: Privilege_Role):
        """Return a registered domain that contains ``role``.

        Raises:
            LookupError: no registered domain contains ``role``.
        """
        for domain in self.domains:
            if role in domain.roles:
                return domain
        logger.error("this role is not associated with any domain")
        raise LookupError("this role is not associated with any domain")

    @staticmethod
    def roles_to_ids(roles: list[Privilege_Role]):
        """Return, for each role, the list of its aggregated role ids.

        The result is a list of lists (one inner list per input role).
        Declared static so it can be called on the class or on an instance.
        """
        return [role.aggregated_roles_ids for role in roles]

    def require_authorization(self, required_role: Privilege_Role, get_privilege_func: callable):
        """Decorator factory: only run the function when the user holds ``required_role``.

        Args:
            required_role: Role the caller must hold, directly or through a
                role that aggregates it.
            get_privilege_func: Zero-argument callable, invoked on every call
                of the decorated function, returning the current
                ``Privilege_User``.

        The wrapper raises ``LookupError`` when ``required_role`` is not
        registered in any domain and ``PrivilegeError`` when access is denied.
        """
        def wrapper_of_wrap(f):
            def wrap(*args, **kwargs):
                if not self.is_role_registered(required_role):
                    raise LookupError("required role is not registered anywhere")
                logger.debug("checking authorization with roles for function %s: ", f.__name__)
                logger.debug("required role : %s", required_role.name)
                privileges: Privilege_Manager.Privilege_User = get_privilege_func()
                # A missing privilege object is treated as "no privileges".
                if privileges is not None:
                    if privileges.is_super_admin:
                        logger.debug("user is super admin, access granted")
                        return f(*args, **kwargs)
                    # A user can hold several roles and each role can
                    # include other roles.
                    for role in privileges.roles:
                        logger.debug("checking role : %s", role.name)
                        if required_role in role.aggregated_roles:
                            logger.debug("access granted")
                            return f(*args, **kwargs)
                logger.warning("access denied for function :%s", f.__name__)
                raise PrivilegeError("Access Denied")
            # Rename the wrapper: Flask blueprints cannot register two
            # functions with the same name.
            wrap.__name__ = f.__name__ + "_authorization_wrap"
            return wrap
        return wrapper_of_wrap
def test():
    """Smoke-test role inclusion and the ``require_authorization`` decorator.

    Builds an "inventory" domain with four roles, makes "admin" include
    "simple" and "intermediate", and checks which decorated functions a user
    may call before and after being granted "admin".

    Returns:
        True when every access decision matches the expectation, False
        otherwise.
    """
    manager = Privilege_Manager()
    manager.create_domain(name="inventory", description="privilege domain for inventory")
    try:
        inventory_privilege_domain = manager.get_domain_by_name("inventory")
        role1 = Privilege_Role(name="simple")
        role2 = Privilege_Role(name="intermediate")
        role3 = Privilege_Role(name="speciale")
        role4 = Privilege_Role(name="admin")
        for role in (role1, role2, role3, role4):
            inventory_privilege_domain.add_role(role)

        # The decorator expects its callback to return a Privilege_User
        # (it reads .is_super_admin and .roles), not a bare list of ids.
        user = Privilege_Manager.Privilege_User(manager, None)

        def get_privileges():
            return user

        @manager.require_authorization(role1, get_privileges)
        def hello_simple():
            print("hello_simple")

        @manager.require_authorization(role2, get_privileges)
        def hello_intermediate():
            print("hello_intermediate")

        @manager.require_authorization(role4, get_privileges)
        def hello_admin(arg1, arg2):
            print("hello_admin")
            print(str(arg1))
            print(str(arg2))

        @manager.require_authorization(role3, get_privileges)
        def hello_speciale():
            print("hello_speciale")

        inventory_privilege_domain.include_role(role4, role1)
        inventory_privilege_domain.include_role(role4, role2)
        inventory_privilege_domain.include_role(role4, role4)
        for item in inventory_privilege_domain.roles:
            print(item.name)

        # Without any role the user must be rejected.
        try:
            hello_admin(23, 45)
            return False
        except PrivilegeError as Ex:
            print(str(Ex))

        # "admin" grants itself plus the two roles it includes ...
        user.roles.add(role4)
        hello_admin(23, 45)
        hello_simple()
        hello_intermediate()

        # ... but not "speciale", which was never included.
        try:
            hello_speciale()
            return False
        except PrivilegeError as Ex:
            print(str(Ex))
        return True
    finally:
        # The domain registry is a class attribute shared by all managers;
        # remove our domain so the test can be run again in the same process.
        manager.remove_domain("inventory")
if __name__ == "__main__":
    # Report the smoke-test outcome through the process exit status
    # (0 on success, 1 on failure) instead of discarding it.
    raise SystemExit(0 if test() else 1)
|