privilege_manager.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. from __future__ import annotations
  2. import functools
  3. __id_counter__ = 0
  4. def __generate_id__() :
  5. global __id_counter__
  6. ret = __id_counter__
  7. __id_counter__ += 1
  8. return ret
  9. class APIAuthError(Exception):
  10. code = 403
  11. description = "Authentication Error"
  12. import logging
  13. if __name__ == "__main__" :
  14. from app_logging import logger_name
  15. else :
  16. from utility.app_logging import logger_name
  17. logger = logging.getLogger(logger_name + ".PRIVILEGE")
  18. class Privilege_Role :
  19. def __init__(self, name, description = None, exclusive : bool = False,id = None) :
  20. if not id :
  21. self.__id__ = __generate_id__()
  22. else :
  23. self.__id__
  24. self.name = name
  25. self.exclusive = exclusive
  26. if description :
  27. self.description = description
  28. self.aggregated_roles = set([self])
  29. @property
  30. def id(self):
  31. return self.__id__
  32. @property
  33. def aggregated_roles_ids(self):
  34. ret = list()
  35. for item in self.aggregated_roles :
  36. ret.append(item.id)
  37. return ret
  38. class Privilege_Domain :
  39. def __init__(self, name, description = None,id = None) :
  40. if not id :
  41. self.__id__ = __generate_id__()
  42. else :
  43. self.__id__
  44. self.name = name
  45. self.description = description
  46. self.__roles__ : set[Privilege_Role] = set()
  47. def add_role(self,role : Privilege_Role) :
  48. for tmp_role in self.__roles__ :
  49. if tmp_role.name == role.name :
  50. raise BaseException("can't have role with the same name in a specific domain")
  51. self.__roles__.add(role)
  52. def remove_role(self,role : Privilege_Role) :
  53. self.__roles__.remove(role)
  54. @property
  55. def id(self):
  56. return self.__id__
  57. @property
  58. def roles(self):
  59. return self.__roles__.copy()
  60. def get_role_by_name(self,name : str):
  61. for tmp_role in self.__roles__ :
  62. if tmp_role.name == name :
  63. return tmp_role
  64. def get_role_by_id(self,id : int):
  65. for tmp_role in self.__roles__ :
  66. if tmp_role.id == id :
  67. return tmp_role
  68. #include role 2 in role 1
  69. def include_role(self,role1 : Privilege_Role, role2 : Privilege_Role) :
  70. if role1 not in self.__roles__ or role2 not in self.__roles__ :
  71. raise BaseException("roles are not in domains.")
  72. if role1.exclusive :
  73. raise "cannot include role, destination role is exclusive"
  74. role1.aggregated_roles.add(role2)
  75. #uninclude role 2 in role 1
  76. def uninclude_role(self,role1 : Privilege_Role, role2 : Privilege_Role) :
  77. if role1 not in self.__roles__ or role2 not in self.__roles__ :
  78. raise BaseException("roles are not in domains.")
  79. if role1 == role2 :
  80. raise BaseException("cannont include the role of himself")
  81. if role2 in role1.aggregated_roles :
  82. role1.aggregated_roles.remove(role2)
  83. @property
  84. def roles(self):
  85. return self.__roles__.copy()
  86. def check_role_by_id(self,role_id : int,role : Privilege_Role) :
  87. found = False
  88. for item in self.__roles__ :
  89. if item == role :
  90. found = True # at least one role is associated with this id
  91. if item.id == role_id :
  92. return True
  93. if not found :
  94. raise BaseException("id is not associated with any role !")
  95. return False
  96. def check_role_by_name(self,role_name : str,role : Privilege_Role) :
  97. found = False
  98. for item in self.__roles__ :
  99. if item == role :
  100. found = True # at least one role is associated with this id
  101. if item.name == role_name :
  102. return True
  103. if not found :
  104. raise BaseException("id is not associated with any role !")
  105. return False
  106. class Privilege_Manager :
  107. __domains__ : set[Privilege_Domain] = set()
  108. def create_domain(self, name : str, description : str = None) :
  109. for item in self.__domains__ :
  110. if item.name == name :
  111. raise BaseException("cannot have two privilege domain with the same name !")
  112. domain = Privilege_Domain(name=name,description=description)
  113. self.__domains__.add(domain)
  114. def remove_domain(self, name : str) :
  115. for item in list(self.__domains__) :
  116. if item.name == name :
  117. self.__domains__.remove(item)
  118. return
  119. raise BaseException("privilege domain remove failed ! item not found")
  120. @property
  121. def domains(self):
  122. return self.__domains__.copy()
  123. def get_domain_by_name(self, name : str) :
  124. for item in self.__domains__ :
  125. if item.name == name :
  126. return item
  127. raise BaseException("domain not found with this name : " + str(name))
  128. def check_role_by_name(self,domain_name : str, role_name : str,role : Privilege_Role) :
  129. domain = self.get_domain_by_name(domain_name)
  130. return domain.check_role_by_name(role_name,role)
  131. def get_roles_by_ids(self,ids : set[int]) -> set[Privilege_Role]:
  132. ret_roles = set()
  133. for id in ids :
  134. for domain in self.domains :
  135. try :
  136. ret_roles.add(domain.get_role_by_id(id))
  137. except :
  138. pass
  139. return ret_roles
  140. def get_role_by_domain_and_name(self,domain_name : str, role_name : str) :
  141. domain = self.get_domain_by_name(domain_name)
  142. return domain.get_role_by_name(role_name)
  143. def get_roles_by_names(self,roles_names : set[tuple[str,str]]) :
  144. ret_roles = set()
  145. for item in roles_names :
  146. try :
  147. ret_roles.add(self.get_role_by_domain_and_name(item[0],item[1]))
  148. except :
  149. pass
  150. return ret_roles
  151. def is_role_registered(self,role : Privilege_Role) :
  152. for domain in self.domains :
  153. if role in domain.roles :
  154. return True
  155. return False
  156. def get_role_domain(self,role : Privilege_Role) :
  157. for domain in self.domains :
  158. for item in domain.roles :
  159. if item == role :
  160. return domain
  161. raise BaseException("this role is not associated with any domain")
  162. #its a DECORATOR
  163. def require_authorization(self,required_role : Privilege_Role, ids_getter : callable) :
  164. def wrapper_of_wrap(f) :
  165. def wrap(*args, **kwargs):
  166. if not self.is_role_registered(required_role) :
  167. raise BaseException("role is not registered everywhere")
  168. logger.debug("checking authorization with roles for function " + f.__name__ + ": " )
  169. logger.debug("required role : " + required_role.name )
  170. for role in self.get_roles_by_ids(ids_getter()) : #an user can have multiple roles and each role can include other roles.
  171. logger.debug("checking role : " + role.name)
  172. for aggregated_role in role.aggregated_roles :
  173. if aggregated_role == required_role :
  174. return f(*args, **kwargs)
  175. def raiser(*args, **kwargs):
  176. logger.warning("access denied for function :" + f.__name__)
  177. raise APIAuthError("Access Denied")
  178. return raiser(*args, **kwargs)
  179. return wrap
  180. return wrapper_of_wrap
  181. def test() :
  182. manager = Privilege_Manager()
  183. manager.create_domain(name="inventory",description="privilege domain for inventory")
  184. inventory_privilege_domain = manager.get_domain_by_name("inventory")
  185. role1 = Privilege_Role(name="simple")
  186. role2 = Privilege_Role(name="intermediate")
  187. role3 = Privilege_Role(name="speciale")
  188. role4 = Privilege_Role(name="admin")
  189. inventory_privilege_domain.add_role(role1)
  190. inventory_privilege_domain.add_role(role2)
  191. inventory_privilege_domain.add_role(role3)
  192. inventory_privilege_domain.add_role(role4)
  193. user_role_ids = []
  194. def get_roles() :
  195. return user_role_ids
  196. def hello_simple() :
  197. print("hello_simple")
  198. def hello_intermediate() :
  199. print("hello_intermediate")
  200. @manager.require_authorization(role4,get_roles)
  201. def hello_admin(arg1,arg2) :
  202. print("hello_admin")
  203. print(str(arg1))
  204. print(str(arg2))
  205. @manager.require_authorization(role3,get_roles)
  206. def hello_speciale() :
  207. print("hello_speciale")
  208. inventory_privilege_domain.include_role(role4,role1)
  209. inventory_privilege_domain.include_role(role4,role2)
  210. inventory_privilege_domain.include_role(role4,role4)
  211. for item in inventory_privilege_domain.roles :
  212. print(item.name)
  213. try :
  214. hello_admin(23,45)
  215. except Exception as Ex:
  216. print(str(Ex))
  217. user_role_ids.append(role4.id)
  218. hello_admin(23,45)
  219. hello_intermediate()
  220. hello_speciale()
  221. if __name__ == "__main__" :
  222. test()