privilege_manager.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. from __future__ import annotations
  2. import json
  3. from itsdangerous import json
  4. __id_counter__ = 0
  5. def __generate_id__() :
  6. global __id_counter__
  7. ret = __id_counter__
  8. __id_counter__ += 1
  9. return ret
  10. class PrivilegeError(Exception):
  11. code = 403
  12. description = "Access Denied - this function cannot be executed with your current level of privilege"
  13. import logging
  14. if __name__ == "__main__" :
  15. from app_logging import logger_name
  16. else :
  17. from utility.app_logging import logger_name
  18. logger = logging.getLogger(logger_name + ".PRIVILEGE")
  19. class Privilege_Role :
  20. def __init__(self, name, description = None, exclusive : bool = False,id = None) :
  21. if not id :
  22. self.__id__ = __generate_id__()
  23. else :
  24. self.__id__
  25. self.name = name
  26. self.exclusive = exclusive
  27. if description :
  28. self.description = description
  29. self.aggregated_roles = set([self])
  30. @property
  31. def id(self):
  32. return self.__id__
  33. @property
  34. def aggregated_roles_ids(self):
  35. ret = list()
  36. for item in self.aggregated_roles :
  37. ret.append(item.id)
  38. return ret
  39. class Privilege_Domain :
  40. def __init__(self, name, description = None,id = None) :
  41. if not id :
  42. self.__id__ = __generate_id__()
  43. else :
  44. self.__id__
  45. self.name = name
  46. self.description = description
  47. self.__roles__ : set[Privilege_Role] = set()
  48. def add_role(self,role : Privilege_Role) :
  49. for tmp_role in self.__roles__ :
  50. if tmp_role.name == role.name :
  51. logger.error("can't have role with the same name in a specific domain")
  52. raise BaseException("can't have role with the same name in a specific domain")
  53. self.__roles__.add(role)
  54. def remove_role(self,role : Privilege_Role) :
  55. self.__roles__.remove(role)
  56. @property
  57. def id(self):
  58. return self.__id__
  59. @property
  60. def roles(self):
  61. return self.__roles__.copy()
  62. def get_role_by_name(self,name : str):
  63. for tmp_role in self.__roles__ :
  64. if tmp_role.name == name :
  65. return tmp_role
  66. def get_role_by_id(self,id : int):
  67. for tmp_role in self.__roles__ :
  68. if tmp_role.id == id :
  69. return tmp_role
  70. #include role 2 in role 1
  71. def include_role(self,role1 : Privilege_Role, role2 : Privilege_Role) :
  72. if role1 not in self.__roles__ or role2 not in self.__roles__ :
  73. logger.error("roles are not in domains.")
  74. raise Exception("roles are not in domains.")
  75. if role1.exclusive :
  76. logger.error("cannot include role, destination role is exclusive")
  77. raise Exception("cannot include role, destination role is exclusive")
  78. role1.aggregated_roles.add(role2)
  79. #uninclude role 2 in role 1
  80. def uninclude_role(self,role1 : Privilege_Role, role2 : Privilege_Role) :
  81. if role1 not in self.__roles__ or role2 not in self.__roles__ :
  82. raise BaseException("roles are not in domains.")
  83. if role1 == role2 :
  84. raise BaseException("cannont include the role of himself")
  85. if role2 in role1.aggregated_roles :
  86. role1.aggregated_roles.remove(role2)
  87. @property
  88. def roles(self):
  89. return self.__roles__.copy()
  90. def check_role_by_id(self,role_id : int,role : Privilege_Role) :
  91. found = False
  92. for item in self.__roles__ :
  93. if item == role :
  94. found = True # at least one role is associated with this id
  95. if item.id == role_id :
  96. return True
  97. if not found :
  98. raise BaseException("id is not associated with any role !")
  99. return False
  100. def check_role_by_name(self,role_name : str,role : Privilege_Role) :
  101. found = False
  102. for item in self.__roles__ :
  103. if item == role :
  104. found = True # at least one role is associated with this id
  105. if item.name == role_name :
  106. return True
  107. if not found :
  108. raise BaseException("id is not associated with any role !")
  109. return False
  110. class Privilege_Manager :
  111. class Privilege_User :
  112. def __init__(self,manager : Privilege_Manager, privileges_data_str : str,is_super_admin : bool = False,id : int = None) -> None:
  113. if not id :
  114. self.__id__ = __generate_id__()
  115. else :
  116. self.__id__ = id
  117. self.is_super_admin : bool = is_super_admin
  118. self.manager = manager
  119. self.roles : set[Privilege_Role] = set()
  120. if privileges_data_str :
  121. privileges_data : dict = json.load(privileges_data_str)
  122. for key in privileges_data.keys() :
  123. self.roles.add(manager.get_role_by_domain_and_name(key,privileges_data[key]))
  124. def get_roles(self) -> set[Privilege_Role]:
  125. return self.roles
  126. def roles_to_jsonStr(self) -> str :
  127. ret_dict = dict()
  128. for role in self.roles :
  129. domain = self.manager.get_role_domain(role)
  130. ret_dict[domain.name] = role.name
  131. return json.dump(json.load(ret_dict))
  132. @property
  133. def id(self):
  134. return self.__id__
  135. __domains__ : set[Privilege_Domain] = set()
  136. __users__ : set[Privilege_User] = set()
  137. def create_domain(self, name : str, description : str = None) :
  138. for item in self.__domains__ :
  139. if item.name == name :
  140. logger.error("cannot have two privilege domain with the same name !")
  141. raise BaseException("cannot have two privilege domain with the same name !")
  142. domain = Privilege_Domain(name=name,description=description)
  143. self.__domains__.add(domain)
  144. logger.info("domain with name " + name + "created")
  145. def register_domain(self, domain : Privilege_Domain) :
  146. for item in self.__domains__ :
  147. if item.name == domain.name :
  148. logger.error("cannot have two privilege domain with the same name !")
  149. raise BaseException("cannot have two privilege domain with the same name !")
  150. self.__domains__.add(domain)
  151. logger.info("domain with name " + domain.name + "created")
  152. def remove_domain(self, name : str) :
  153. for item in list(self.__domains__) :
  154. if item.name == name :
  155. self.__domains__.remove(item)
  156. logger.info("domain with name " + name + "removed")
  157. return
  158. logger.error("privilege domain remove failed ! item with name " + name + "not found")
  159. raise BaseException("privilege domain remove failed ! item not found")
  160. @property
  161. def domains(self):
  162. return self.__domains__.copy()
  163. def get_domain_by_name(self, name : str) :
  164. for item in self.__domains__ :
  165. if item.name == name :
  166. return item
  167. raise BaseException("domain not found with this name : " + str(name))
  168. def check_role_by_name(self,domain_name : str, role_name : str,role : Privilege_Role) :
  169. domain = self.get_domain_by_name(domain_name)
  170. return domain.check_role_by_name(role_name,role)
  171. def get_roles_by_ids(self,ids : set[int]) -> set[Privilege_Role]:
  172. ret_roles = set()
  173. for id in ids :
  174. for domain in self.domains :
  175. try :
  176. ret_roles.add(domain.get_role_by_id(id))
  177. except :
  178. pass
  179. return ret_roles
  180. def get_role_by_domain_and_name(self,domain_name : str, role_name : str) :
  181. domain = self.get_domain_by_name(domain_name)
  182. return domain.get_role_by_name(role_name)
  183. def get_roles_by_names(self,roles_names : set[tuple[str,str]]) :
  184. ret_roles = set()
  185. for item in roles_names :
  186. try :
  187. ret_roles.add(self.get_role_by_domain_and_name(item[0],item[1]))
  188. except :
  189. pass
  190. return ret_roles
  191. def is_role_registered(self,role : Privilege_Role) :
  192. for domain in self.domains :
  193. if role in domain.roles :
  194. return True
  195. return False
  196. def get_role_domain(self,role : Privilege_Role) :
  197. for domain in self.domains :
  198. for item in domain.roles :
  199. if item == role :
  200. return domain
  201. logger.error("this role is not associated with any domain")
  202. raise BaseException("this role is not associated with any domain")
  203. def roles_to_ids(roles : list[Privilege_Role]) :
  204. ret = []
  205. for role in roles :
  206. ret.append(role.aggregated_roles_ids)
  207. return ret
  208. #its a DECORATOR
  209. def require_authorization(self,required_role : Privilege_Role, get_privilege_func : callable) :
  210. def wrapper_of_wrap(f) :
  211. def wrap(*args, **kwargs):
  212. if not self.is_role_registered(required_role) :
  213. raise BaseException("role is not registered anywhere")
  214. logger.debug("checking authorization with roles for function " + f.__name__ + ": " )
  215. logger.debug("required role : " + required_role.name )
  216. privileges : Privilege_Manager.Privilege_User = get_privilege_func()
  217. if privileges.is_super_admin :
  218. logger.debug("user is super admin, access granted")
  219. return f(*args, **kwargs)
  220. for role in privileges.roles : #an user can have multiple roles and each role can include other roles.
  221. logger.debug("checking role : " + role.name)
  222. for aggregated_role in role.aggregated_roles :
  223. if aggregated_role == required_role :
  224. logger.debug("access granted")
  225. return f(*args, **kwargs)
  226. def raiser(*args, **kwargs):
  227. logger.warning("access denied for function :" + f.__name__)
  228. raise PrivilegeError("Access Denied")
  229. return raiser(*args, **kwargs)
  230. return wrap
  231. return wrapper_of_wrap
  232. def test() :
  233. manager = Privilege_Manager()
  234. manager.create_domain(name="inventory",description="privilege domain for inventory")
  235. inventory_privilege_domain = manager.get_domain_by_name("inventory")
  236. role1 = Privilege_Role(name="simple")
  237. role2 = Privilege_Role(name="intermediate")
  238. role3 = Privilege_Role(name="speciale")
  239. role4 = Privilege_Role(name="admin")
  240. inventory_privilege_domain.add_role(role1)
  241. inventory_privilege_domain.add_role(role2)
  242. inventory_privilege_domain.add_role(role3)
  243. inventory_privilege_domain.add_role(role4)
  244. user_role_ids = []
  245. def get_roles() :
  246. return user_role_ids
  247. def hello_simple() :
  248. print("hello_simple")
  249. def hello_intermediate() :
  250. print("hello_intermediate")
  251. @manager.require_authorization(role4,get_roles)
  252. def hello_admin(arg1,arg2) :
  253. print("hello_admin")
  254. print(str(arg1))
  255. print(str(arg2))
  256. @manager.require_authorization(role3,get_roles)
  257. def hello_speciale() :
  258. print("hello_speciale")
  259. inventory_privilege_domain.include_role(role4,role1)
  260. inventory_privilege_domain.include_role(role4,role2)
  261. inventory_privilege_domain.include_role(role4,role4)
  262. for item in inventory_privilege_domain.roles :
  263. print(item.name)
  264. try :
  265. hello_admin(23,45)
  266. return False
  267. except Exception as Ex:
  268. print(str(Ex))
  269. user_role_ids.append(role4.id)
  270. hello_admin(23,45)
  271. hello_intermediate()
  272. try :
  273. hello_speciale()
  274. return False
  275. except Exception as Ex:
  276. print(str(Ex))
  277. return True
  278. if __name__ == "__main__" :
  279. test()