123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- import Model.isp_model as isp_model
- from flask import session,request,jsonify
- import Model.model_manager as model_manager
- from werkzeug.security import check_password_hash,generate_password_hash
- import persistence
- import json
- import View.view_privilege as privileges
- import logging
- from utility.app_logging import logger_name
- import jwt
- import time
- logger = logging.getLogger(logger_name + ".VIEW")
- __api_login_url__ = "/api/login"
- __id_counter__ : int = 1
- def define_basic_api(app) :
- def generate_auth_token(user_id, expires_in = 600):
- return jwt.encode({ 'id': user_id, 'exp': time.time() + expires_in },
- key="caca", algorithm='HS256')
- def verify_auth_token(token):
- data = jwt.decode(token, key="caca", algorithms=['HS256'])
- with persistence.get_Session_Instance() as sess :
- user = sess.query(isp_model.user_account).filter(isp_model.user_account.id == data['id']).first()
- return user
- @app.before_request
- def before_request_func():
- global __id_counter__
- logger.debug("before_request processing")
- logger.debug("request from " + request.remote_addr)
- logger.debug("request header" + str(request.headers.__dict__))
- if request.json :
- logger.debug("request json body : " + str(request.json))
- if not request.path == __api_login_url__ and request.method != "OPTIONS":
- auth_header_value = request.headers.get('x-Auth-Token', None)
- if not auth_header_value:
- raise Exception("request does not have a authentification token")
- logger.debug("authorization token : " + auth_header_value)
- user : isp_model.user_account = verify_auth_token(auth_header_value)
- logger.debug("user authenficated by token : " + user.nickname + " with id " + str(user.id))
- session["user_id"] = user.id
- #logger.warning("Unauthorized client with ip " + str(request.origin) + " try to access application")
- #resp = jsonify({'message' : 'Unauthorized access, request is not from and authentificated user.'})
- #resp.status_code = 401
- #return resp
- @app.route(__api_login_url__,methods = ['POST'])
- def login():
- _json = request.json
- _username = _json['username']
- _password = _json['password']
- with persistence.get_Session_Instance() as sess :
- Item = sess.query(isp_model.user_account).filter(isp_model.user_account.nickname == _username).first()
- if not isinstance(Item,isp_model.user_account) :
- logger.warning("user tried to login with unknown account name : " + _username)
- resp = jsonify({'message' : 'Bad Request - user account not found'})
- resp.status_code = 400
- return resp
- if not check_password_hash(Item.password,_password) :
- logger.warning("user with account name '" + _username + "' tried to login with invalid password")
- resp = jsonify({'message' : 'Bad Request - invalid password for this account'})
- resp.status_code = 400
- return resp
- session["username"] = _username
- session["user_id"] = Item.id
- logger.info("account " + _username + " logged IN successfully with origin : " + str(request.origin))
- resp = jsonify({'message' : 'login successful', "token" : generate_auth_token(Item.id) })
- resp.status_code = 200
- return resp
- @app.route('/api/logout',methods = ['DELETE'])
- def logout():
- logger.info("account " + session["username"] + " logged OUT with id : " + session["user_id"])
- session.clear()
- return jsonify('logout')
- @app.route('/api/me',methods = ['GET'])
- def user_description():
- with persistence.get_Session_Instance() as sess :
- item : isp_model.user_account = sess.query(isp_model.user_account).filter(isp_model.user_account.id == session["user_id"]).first()
- json_string = model_manager.ModelObjectToJsonString(item)
- json_dict : dict = json.loads(json_string)
- json_dict.pop("password") # removing the password item for security
- json_string = json.dumps(json_dict)
- return jsonify(json_dict)
- @app.route('/api/password',methods = ['POST'])
- def change_password():
- _json = request.json
- _old_password = _json['old_password']
- _password = _json['new_password']
- with persistence.get_Session_Instance() as sess :
- Item : isp_model.user_account = sess.query(isp_model.user_account).filter(isp_model.user_account.id == session["user_account_id"]).first()
- if not check_password_hash(Item.password,_password) :
- raise Exception("old password is incorrect")
- Item.password = generate_password_hash(_password)
- sess.commit()
- return jsonify('password changed')
- @app.route('/routes',methods = ['GET'])
- def routes():
- routes = []
- for route in app.url_map.iter_rules():
- routes.append('%s' % route)
- return jsonify(routes)
|