import Model.isp_model as isp_model from flask import session,request,jsonify import Model.model_manager as model_manager from werkzeug.security import check_password_hash,generate_password_hash import persistence import json import View.view_privilege as privileges import logging from utility.app_logging import logger_name import jwt import time logger = logging.getLogger(logger_name + ".VIEW") __api_login_url__ = "/api/login" __id_counter__ : int = 1 def define_basic_api(app) : def generate_auth_token(user_id, expires_in = 600): return jwt.encode({ 'id': user_id, 'exp': time.time() + expires_in }, key="caca", algorithm='HS256') def verify_auth_token(token): data = jwt.decode(token, key="caca", algorithms=['HS256']) with persistence.get_Session_Instance() as sess : user = sess.query(isp_model.user_account).filter(isp_model.user_account.id == data['id']).first() return user @app.before_request def before_request_func(): global __id_counter__ logger.debug("before_request processing") logger.debug("request from " + request.remote_addr) logger.debug("request header" + str(request.headers.__dict__)) if request.json : logger.debug("request json body : " + str(request.json)) if not request.path == __api_login_url__ and request.method != "OPTIONS": auth_header_value = request.headers.get('x-Auth-Token', None) if not auth_header_value: raise Exception("request does not have a authentification token") logger.debug("authorization token : " + auth_header_value) user : isp_model.user_account = verify_auth_token(auth_header_value) logger.debug("user authenficated by token : " + user.nickname + " with id " + str(user.id)) session["user_id"] = user.id #logger.warning("Unauthorized client with ip " + str(request.origin) + " try to access application") #resp = jsonify({'message' : 'Unauthorized access, request is not from and authentificated user.'}) #resp.status_code = 401 #return resp @app.route(__api_login_url__,methods = ['POST']) def login(): _json = request.json _username = _json['username'] _password = _json['password'] with persistence.get_Session_Instance() as sess : Item = sess.query(isp_model.user_account).filter(isp_model.user_account.nickname == _username).first() if not isinstance(Item,isp_model.user_account) : logger.warning("user tried to login with unknown account name : " + _username) resp = jsonify({'message' : 'Bad Request - user account not found'}) resp.status_code = 400 return resp if not check_password_hash(Item.password,_password) : logger.warning("user with account name '" + _username + "' tried to login with invalid password") resp = jsonify({'message' : 'Bad Request - invalid password for this account'}) resp.status_code = 400 return resp session["username"] = _username session["user_account_id"] = Item.id logger.info("account " + _username + " logged IN successfully with origin : " + str(request.origin)) resp = jsonify({'message' : 'login successful', "token" : generate_auth_token(Item.id) }) resp.status_code = 200 return resp @app.route('/api/logout',methods = ['DELETE']) def logout(): logger.info("account " + session["username"] + " logged OUT with id : " + session["user_id"]) session.clear() return jsonify('logout') @app.route('/api/me',methods = ['GET']) def user_description(): with persistence.get_Session_Instance() as sess : item : isp_model.user_account = sess.query(isp_model.user_account).filter(isp_model.user_account.id == session["client_id"]).first() json_string = model_manager.ModelObjectToJsonString(item) json_dict : dict = json.loads(json_string) json_dict.pop("password") # removing the password item for security json_string = json.dumps(json_dict) return jsonify(json_dict) @app.route('/api/password',methods = ['POST']) def change_password(): _json = request.json _old_password = _json['old_password'] _password = _json['new_password'] with persistence.get_Session_Instance() as sess : Item : isp_model.user_account = sess.query(isp_model.user_account).filter(isp_model.user_account.id == session["user_account_id"]).first() if not check_password_hash(Item.password,_password) : raise Exception("old password is incorrect") Item.password = generate_password_hash(_password) sess.commit() return jsonify('password changed') @app.route('/routes',methods = ['GET']) def routes(): routes = [] for route in app.url_map.iter_rules(): routes.append('%s' % route) return jsonify(routes)