privilege_manager.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. from __future__ import annotations
  2. import json
  3. __id_counter__ = 0
  4. def __generate_id__() :
  5. global __id_counter__
  6. ret = __id_counter__
  7. __id_counter__ += 1
  8. return ret
  9. class PrivilegeError(Exception):
  10. code = 403
  11. description = "Access Denied - this function cannot be executed with your current level of privilege"
  12. import logging
  13. if __name__ == "__main__" :
  14. from app_logging import logger_name
  15. else :
  16. from utility.app_logging import logger_name
  17. logger = logging.getLogger(logger_name + ".PRIVILEGE")
  18. class Privilege_Role :
  19. def __init__(self, name, description = None, exclusive : bool = False,id = None) :
  20. if not id :
  21. self.__id__ = __generate_id__()
  22. else :
  23. self.__id__
  24. self.name = name
  25. self.exclusive = exclusive
  26. if description :
  27. self.description = description
  28. self.aggregated_roles = set([self])
  29. @property
  30. def id(self):
  31. return self.__id__
  32. @property
  33. def aggregated_roles_ids(self):
  34. ret = list()
  35. for item in self.aggregated_roles :
  36. ret.append(item.id)
  37. return ret
  38. class Privilege_Domain :
  39. def __init__(self, name, description = None,id = None) :
  40. if not id :
  41. self.__id__ = __generate_id__()
  42. else :
  43. self.__id__
  44. self.name = name
  45. self.description = description
  46. self.__roles__ : set[Privilege_Role] = set()
  47. def add_role(self,role : Privilege_Role) :
  48. for tmp_role in self.__roles__ :
  49. if tmp_role.name == role.name :
  50. logger.error("can't have role with the same name in a specific domain")
  51. raise BaseException("can't have role with the same name in a specific domain")
  52. self.__roles__.add(role)
  53. def remove_role(self,role : Privilege_Role) :
  54. self.__roles__.remove(role)
  55. @property
  56. def id(self):
  57. return self.__id__
  58. @property
  59. def roles(self):
  60. return self.__roles__.copy()
  61. def get_role_by_name(self,name : str):
  62. for tmp_role in self.__roles__ :
  63. if tmp_role.name == name :
  64. return tmp_role
  65. def get_role_by_id(self,id : int):
  66. for tmp_role in self.__roles__ :
  67. if tmp_role.id == id :
  68. return tmp_role
  69. #include role 2 in role 1
  70. def include_role(self,role1 : Privilege_Role, role2 : Privilege_Role) :
  71. if role1 not in self.__roles__ or role2 not in self.__roles__ :
  72. logger.error("roles are not in domains.")
  73. raise Exception("roles are not in domains.")
  74. if role1.exclusive :
  75. logger.error("cannot include role, destination role is exclusive")
  76. raise Exception("cannot include role, destination role is exclusive")
  77. role1.aggregated_roles.add(role2)
  78. #uninclude role 2 in role 1
  79. def uninclude_role(self,role1 : Privilege_Role, role2 : Privilege_Role) :
  80. if role1 not in self.__roles__ or role2 not in self.__roles__ :
  81. raise BaseException("roles are not in domains.")
  82. if role1 == role2 :
  83. raise BaseException("cannont include the role of himself")
  84. if role2 in role1.aggregated_roles :
  85. role1.aggregated_roles.remove(role2)
  86. @property
  87. def roles(self):
  88. return self.__roles__.copy()
  89. def check_role_by_id(self,role_id : int,role : Privilege_Role) :
  90. found = False
  91. for item in self.__roles__ :
  92. if item == role :
  93. found = True # at least one role is associated with this id
  94. if item.id == role_id :
  95. return True
  96. if not found :
  97. raise BaseException("id is not associated with any role !")
  98. return False
  99. def check_role_by_name(self,role_name : str,role : Privilege_Role) :
  100. found = False
  101. for item in self.__roles__ :
  102. if item == role :
  103. found = True # at least one role is associated with this id
  104. if item.name == role_name :
  105. return True
  106. if not found :
  107. raise BaseException("id is not associated with any role !")
  108. return False
  109. class Privilege_Manager :
  110. class Privilege_User :
  111. def __init__(self,manager : Privilege_Manager, privileges_data_str : str,is_super_admin : bool = False,id : int = None) -> None:
  112. if not id :
  113. self.__id__ = __generate_id__()
  114. else :
  115. self.__id__ = id
  116. self.is_super_admin : bool = is_super_admin
  117. self.manager = manager
  118. self.roles : set[Privilege_Role] = set()
  119. if privileges_data_str :
  120. privileges_data = json.loads(privileges_data_str)
  121. for key in privileges_data.keys() :
  122. self.roles.add(manager.get_role_by_domain_and_name(key,privileges_data[key]))
  123. def get_roles(self) -> set[Privilege_Role]:
  124. return self.roles
  125. def roles_to_jsonStr(self) -> str :
  126. ret_dict = dict()
  127. for role in self.roles :
  128. domain = self.manager.get_role_domain(role)
  129. ret_dict[domain.name] = role.name
  130. return json.dumps(ret_dict)
  131. @property
  132. def id(self):
  133. return self.__id__
  134. __domains__ : set[Privilege_Domain] = set()
  135. __users__ : set[Privilege_User] = set()
  136. def create_domain(self, name : str, description : str = None) :
  137. for item in self.__domains__ :
  138. if item.name == name :
  139. logger.error("cannot have two privilege domain with the same name !")
  140. raise BaseException("cannot have two privilege domain with the same name !")
  141. domain = Privilege_Domain(name=name,description=description)
  142. self.__domains__.add(domain)
  143. logger.info("domain with name " + name + " created")
  144. def register_domain(self, domain : Privilege_Domain) :
  145. for item in self.__domains__ :
  146. if item.name == domain.name :
  147. logger.error("cannot have two privilege domain with the same name !")
  148. raise BaseException("cannot have two privilege domain with the same name !")
  149. self.__domains__.add(domain)
  150. logger.info("domain with name " + domain.name + " registered")
  151. def remove_domain(self, name : str) :
  152. for item in list(self.__domains__) :
  153. if item.name == name :
  154. self.__domains__.remove(item)
  155. logger.info("domain with name " + name + "removed")
  156. return
  157. logger.error("privilege domain remove failed ! item with name " + name + "not found")
  158. raise BaseException("privilege domain remove failed ! item not found")
  159. @property
  160. def domains(self):
  161. return self.__domains__.copy()
  162. def get_domain_by_name(self, name : str) :
  163. for item in self.__domains__ :
  164. if item.name == name :
  165. return item
  166. raise BaseException("domain not found with this name : " + str(name))
  167. def check_role_by_name(self,domain_name : str, role_name : str,role : Privilege_Role) :
  168. domain = self.get_domain_by_name(domain_name)
  169. return domain.check_role_by_name(role_name,role)
  170. def get_roles_by_ids(self,ids : set[int]) -> set[Privilege_Role]:
  171. ret_roles = set()
  172. for id in ids :
  173. for domain in self.domains :
  174. try :
  175. ret_roles.add(domain.get_role_by_id(id))
  176. except :
  177. pass
  178. return ret_roles
  179. def get_role_by_domain_and_name(self,domain_name : str, role_name : str) :
  180. domain = self.get_domain_by_name(domain_name)
  181. return domain.get_role_by_name(role_name)
  182. def get_roles_by_names(self,roles_names : set[tuple[str,str]]) :
  183. ret_roles = set()
  184. for item in roles_names :
  185. try :
  186. ret_roles.add(self.get_role_by_domain_and_name(item[0],item[1]))
  187. except :
  188. pass
  189. return ret_roles
  190. def is_role_registered(self,role : Privilege_Role) :
  191. for domain in self.domains :
  192. if role in domain.roles :
  193. return True
  194. return False
  195. def get_role_domain(self,role : Privilege_Role) :
  196. for domain in self.domains :
  197. for item in domain.roles :
  198. if item == role :
  199. return domain
  200. logger.error("this role is not associated with any domain")
  201. raise BaseException("this role is not associated with any domain")
  202. def roles_to_ids(roles : list[Privilege_Role]) :
  203. ret = []
  204. for role in roles :
  205. ret.append(role.aggregated_roles_ids)
  206. return ret
  207. #its a DECORATOR
  208. def require_authorization(self,required_role : Privilege_Role, get_privilege_func : callable) :
  209. def wrapper_of_wrap(f) :
  210. def wrap(*args, **kwargs):
  211. if not self.is_role_registered(required_role) :
  212. raise BaseException("required role is not registered anywhere")
  213. logger.debug("checking authorization with roles for function " + f.__name__ + ": " )
  214. logger.debug("required role : " + required_role.name )
  215. privileges : Privilege_Manager.Privilege_User = get_privilege_func()
  216. if privileges.is_super_admin :
  217. logger.debug("user is super admin, access granted")
  218. return f(*args, **kwargs)
  219. for role in privileges.roles : #an user can have multiple roles and each role can include other roles.
  220. logger.debug("checking role : " + role.name)
  221. for aggregated_role in role.aggregated_roles :
  222. if aggregated_role == required_role :
  223. logger.debug("access granted")
  224. return f(*args, **kwargs)
  225. def raiser(*args, **kwargs):
  226. logger.warning("access denied for function :" + f.__name__)
  227. raise PrivilegeError("Access Denied")
  228. return raiser(*args, **kwargs)
  229. #we change the name of the function, for flash bluerprint, as we cannot register two function with the same name.
  230. wrap.__name__ = f.__name__ + "_authorization_wrap"
  231. return wrap
  232. return wrapper_of_wrap
  233. def test() :
  234. manager = Privilege_Manager()
  235. manager.create_domain(name="inventory",description="privilege domain for inventory")
  236. inventory_privilege_domain = manager.get_domain_by_name("inventory")
  237. role1 = Privilege_Role(name="simple")
  238. role2 = Privilege_Role(name="intermediate")
  239. role3 = Privilege_Role(name="speciale")
  240. role4 = Privilege_Role(name="admin")
  241. inventory_privilege_domain.add_role(role1)
  242. inventory_privilege_domain.add_role(role2)
  243. inventory_privilege_domain.add_role(role3)
  244. inventory_privilege_domain.add_role(role4)
  245. user_role_ids = []
  246. def get_roles() :
  247. return user_role_ids
  248. def hello_simple() :
  249. print("hello_simple")
  250. def hello_intermediate() :
  251. print("hello_intermediate")
  252. @manager.require_authorization(role4,get_roles)
  253. def hello_admin(arg1,arg2) :
  254. print("hello_admin")
  255. print(str(arg1))
  256. print(str(arg2))
  257. @manager.require_authorization(role3,get_roles)
  258. def hello_speciale() :
  259. print("hello_speciale")
  260. inventory_privilege_domain.include_role(role4,role1)
  261. inventory_privilege_domain.include_role(role4,role2)
  262. inventory_privilege_domain.include_role(role4,role4)
  263. for item in inventory_privilege_domain.roles :
  264. print(item.name)
  265. try :
  266. hello_admin(23,45)
  267. return False
  268. except Exception as Ex:
  269. print(str(Ex))
  270. user_role_ids.append(role4.id)
  271. hello_admin(23,45)
  272. hello_intermediate()
  273. try :
  274. hello_speciale()
  275. return False
  276. except Exception as Ex:
  277. print(str(Ex))
  278. return True
  279. if __name__ == "__main__" :
  280. test()