privilege_manager.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. from __future__ import annotations
  2. import functools
  3. __id_counter__ = 0
  4. def __generate_id__() :
  5. global __id_counter__
  6. ret = __id_counter__
  7. __id_counter__ += 1
  8. return ret
  9. class PrivilegeError(Exception):
  10. code = 403
  11. description = "Access Denied - this function cannot be executed with your current level of privilege"
  12. import logging
  13. if __name__ == "__main__" :
  14. from app_logging import logger_name
  15. else :
  16. from utility.app_logging import logger_name
  17. logger = logging.getLogger(logger_name + ".PRIVILEGE")
  18. class Privilege_Role :
  19. def __init__(self, name, description = None, exclusive : bool = False,id = None) :
  20. if not id :
  21. self.__id__ = __generate_id__()
  22. else :
  23. self.__id__
  24. self.name = name
  25. self.exclusive = exclusive
  26. if description :
  27. self.description = description
  28. self.aggregated_roles = set([self])
  29. @property
  30. def id(self):
  31. return self.__id__
  32. @property
  33. def aggregated_roles_ids(self):
  34. ret = list()
  35. for item in self.aggregated_roles :
  36. ret.append(item.id)
  37. return ret
  38. class Privilege_Domain :
  39. def __init__(self, name, description = None,id = None) :
  40. if not id :
  41. self.__id__ = __generate_id__()
  42. else :
  43. self.__id__
  44. self.name = name
  45. self.description = description
  46. self.__roles__ : set[Privilege_Role] = set()
  47. def add_role(self,role : Privilege_Role) :
  48. for tmp_role in self.__roles__ :
  49. if tmp_role.name == role.name :
  50. logger.error("can't have role with the same name in a specific domain")
  51. raise BaseException("can't have role with the same name in a specific domain")
  52. self.__roles__.add(role)
  53. def remove_role(self,role : Privilege_Role) :
  54. self.__roles__.remove(role)
  55. @property
  56. def id(self):
  57. return self.__id__
  58. @property
  59. def roles(self):
  60. return self.__roles__.copy()
  61. def get_role_by_name(self,name : str):
  62. for tmp_role in self.__roles__ :
  63. if tmp_role.name == name :
  64. return tmp_role
  65. def get_role_by_id(self,id : int):
  66. for tmp_role in self.__roles__ :
  67. if tmp_role.id == id :
  68. return tmp_role
  69. #include role 2 in role 1
  70. def include_role(self,role1 : Privilege_Role, role2 : Privilege_Role) :
  71. if role1 not in self.__roles__ or role2 not in self.__roles__ :
  72. logger.error("roles are not in domains.")
  73. raise Exception("roles are not in domains.")
  74. if role1.exclusive :
  75. logger.error("cannot include role, destination role is exclusive")
  76. raise Exception("cannot include role, destination role is exclusive")
  77. role1.aggregated_roles.add(role2)
  78. #uninclude role 2 in role 1
  79. def uninclude_role(self,role1 : Privilege_Role, role2 : Privilege_Role) :
  80. if role1 not in self.__roles__ or role2 not in self.__roles__ :
  81. raise BaseException("roles are not in domains.")
  82. if role1 == role2 :
  83. raise BaseException("cannont include the role of himself")
  84. if role2 in role1.aggregated_roles :
  85. role1.aggregated_roles.remove(role2)
  86. @property
  87. def roles(self):
  88. return self.__roles__.copy()
  89. def check_role_by_id(self,role_id : int,role : Privilege_Role) :
  90. found = False
  91. for item in self.__roles__ :
  92. if item == role :
  93. found = True # at least one role is associated with this id
  94. if item.id == role_id :
  95. return True
  96. if not found :
  97. raise BaseException("id is not associated with any role !")
  98. return False
  99. def check_role_by_name(self,role_name : str,role : Privilege_Role) :
  100. found = False
  101. for item in self.__roles__ :
  102. if item == role :
  103. found = True # at least one role is associated with this id
  104. if item.name == role_name :
  105. return True
  106. if not found :
  107. raise BaseException("id is not associated with any role !")
  108. return False
  109. class Privilege_Manager :
  110. __domains__ : set[Privilege_Domain] = set()
  111. def create_domain(self, name : str, description : str = None) :
  112. for item in self.__domains__ :
  113. if item.name == name :
  114. logger.error("cannot have two privilege domain with the same name !")
  115. raise BaseException("cannot have two privilege domain with the same name !")
  116. domain = Privilege_Domain(name=name,description=description)
  117. self.__domains__.add(domain)
  118. logger.info("domain with name " + name + "created")
  119. def register_domain(self, domain : Privilege_Domain) :
  120. for item in self.__domains__ :
  121. if item.name == domain.name :
  122. logger.error("cannot have two privilege domain with the same name !")
  123. raise BaseException("cannot have two privilege domain with the same name !")
  124. self.__domains__.add(domain)
  125. logger.info("domain with name " + domain.name + "created")
  126. def remove_domain(self, name : str) :
  127. for item in list(self.__domains__) :
  128. if item.name == name :
  129. self.__domains__.remove(item)
  130. logger.info("domain with name " + name + "removed")
  131. return
  132. logger.error("privilege domain remove failed ! item with name " + name + "not found")
  133. raise BaseException("privilege domain remove failed ! item not found")
  134. @property
  135. def domains(self):
  136. return self.__domains__.copy()
  137. def get_domain_by_name(self, name : str) :
  138. for item in self.__domains__ :
  139. if item.name == name :
  140. return item
  141. raise BaseException("domain not found with this name : " + str(name))
  142. def check_role_by_name(self,domain_name : str, role_name : str,role : Privilege_Role) :
  143. domain = self.get_domain_by_name(domain_name)
  144. return domain.check_role_by_name(role_name,role)
  145. def get_roles_by_ids(self,ids : set[int]) -> set[Privilege_Role]:
  146. ret_roles = set()
  147. for id in ids :
  148. for domain in self.domains :
  149. try :
  150. ret_roles.add(domain.get_role_by_id(id))
  151. except :
  152. pass
  153. return ret_roles
  154. def get_role_by_domain_and_name(self,domain_name : str, role_name : str) :
  155. domain = self.get_domain_by_name(domain_name)
  156. return domain.get_role_by_name(role_name)
  157. def get_roles_by_names(self,roles_names : set[tuple[str,str]]) :
  158. ret_roles = set()
  159. for item in roles_names :
  160. try :
  161. ret_roles.add(self.get_role_by_domain_and_name(item[0],item[1]))
  162. except :
  163. pass
  164. return ret_roles
  165. def is_role_registered(self,role : Privilege_Role) :
  166. for domain in self.domains :
  167. if role in domain.roles :
  168. return True
  169. return False
  170. def get_role_domain(self,role : Privilege_Role) :
  171. for domain in self.domains :
  172. for item in domain.roles :
  173. if item == role :
  174. return domain
  175. logger.error("this role is not associated with any domain")
  176. raise BaseException("this role is not associated with any domain")
  177. def roles_to_ids(roles : list[Privilege_Role]) :
  178. ret = []
  179. for role in roles :
  180. ret.append(role.aggregated_roles_ids)
  181. return ret
  182. #its a DECORATOR
  183. def require_authorization(self,required_role : Privilege_Role, ids_getter : callable) :
  184. def wrapper_of_wrap(f) :
  185. def wrap(*args, **kwargs):
  186. if not self.is_role_registered(required_role) :
  187. raise BaseException("role is not registered anywhere")
  188. logger.debug("checking authorization with roles for function " + f.__name__ + ": " )
  189. logger.debug("required role : " + required_role.name )
  190. for role in self.get_roles_by_ids(ids_getter()) : #an user can have multiple roles and each role can include other roles.
  191. logger.debug("checking role : " + role.name)
  192. for aggregated_role in role.aggregated_roles :
  193. if aggregated_role == required_role :
  194. return f(*args, **kwargs)
  195. def raiser(*args, **kwargs):
  196. logger.warning("access denied for function :" + f.__name__)
  197. raise PrivilegeError("Access Denied")
  198. return raiser(*args, **kwargs)
  199. return wrap
  200. return wrapper_of_wrap
  201. def test() :
  202. manager = Privilege_Manager()
  203. manager.create_domain(name="inventory",description="privilege domain for inventory")
  204. inventory_privilege_domain = manager.get_domain_by_name("inventory")
  205. role1 = Privilege_Role(name="simple")
  206. role2 = Privilege_Role(name="intermediate")
  207. role3 = Privilege_Role(name="speciale")
  208. role4 = Privilege_Role(name="admin")
  209. inventory_privilege_domain.add_role(role1)
  210. inventory_privilege_domain.add_role(role2)
  211. inventory_privilege_domain.add_role(role3)
  212. inventory_privilege_domain.add_role(role4)
  213. user_role_ids = []
  214. def get_roles() :
  215. return user_role_ids
  216. def hello_simple() :
  217. print("hello_simple")
  218. def hello_intermediate() :
  219. print("hello_intermediate")
  220. @manager.require_authorization(role4,get_roles)
  221. def hello_admin(arg1,arg2) :
  222. print("hello_admin")
  223. print(str(arg1))
  224. print(str(arg2))
  225. @manager.require_authorization(role3,get_roles)
  226. def hello_speciale() :
  227. print("hello_speciale")
  228. inventory_privilege_domain.include_role(role4,role1)
  229. inventory_privilege_domain.include_role(role4,role2)
  230. inventory_privilege_domain.include_role(role4,role4)
  231. for item in inventory_privilege_domain.roles :
  232. print(item.name)
  233. try :
  234. hello_admin(23,45)
  235. except Exception as Ex:
  236. print(str(Ex))
  237. user_role_ids.append(role4.id)
  238. hello_admin(23,45)
  239. hello_intermediate()
  240. hello_speciale()
  241. if __name__ == "__main__" :
  242. test()