|
- #
- # ---- persistence global module ----
- #
- # This module manages the general SQLite database machinery (engine creation, session access, etc.).
- #
- import logging
- from utility.app_logging import logger_name
- from sqlalchemy import create_engine
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm.session import sessionmaker
- from sqlalchemy.orm import scoped_session
- from sqlalchemy.pool import SingletonThreadPool
- from sqlalchemy_utils import database_exists, create_database
# Logger for this module, namespaced under the application logger name.
logger = logging.getLogger(logger_name + ".PERSISTENCE")

# SQLite file location and the SQLAlchemy URL built from it.
__sqlite_db_path = "OpenISP.db"
__sqlite_db_pattern = 'sqlite:///' + __sqlite_db_path

# Declarative base handed out by get_db_base() and used by init() to
# create the tables.
__db_Base = declarative_base()

# Mutable module state; all three are assigned by init().
__db_engine = None
__db_session = None
__initialized = False
def get_session():
    """Return the module-wide session registry.

    Returns:
        The ``scoped_session`` object assigned by ``init()``.

    Raises:
        RuntimeError: if the persistence engine has not been initialized.
    """
    if not __initialized:
        message = "can't get session, persistence engine is not initialized"
        logger.error(message)
        # Raising a plain string is a TypeError on Python 3, which hid the
        # real message; raise a proper exception carrying it instead.
        raise RuntimeError(message)
    return __db_session
def get_Session_Instance():
    """Return a session obtained by calling the session registry.

    Returns:
        The object produced by calling the ``scoped_session`` registry
        assigned by ``init()``.

    Raises:
        RuntimeError: if the persistence engine has not been initialized.
    """
    if not __initialized:
        message = "can't get session, persistence engine is not initialized"
        logger.error(message)
        # Raising a plain string is a TypeError on Python 3, which hid the
        # real message; raise a proper exception carrying it instead.
        raise RuntimeError(message)
    return __db_session()
def get_db_base():
    """Return the declarative base, which is only available before ``init()``.

    Returns:
        The module's declarative base object.

    Raises:
        RuntimeError: if the persistence engine is already initialized.
    """
    if __initialized:
        message = ("can't get declarative Base, "
                   "persistence engine is already initialized")
        logger.error(message)
        # Raising a plain string is a TypeError on Python 3, which hid the
        # real message; raise a proper exception carrying it instead.
        raise RuntimeError(message)
    return __db_Base
def is_init():
    """Report the current value of the module's initialization flag."""
    initialized = __initialized
    return initialized
def init():
    """Create the engine, build the model and open the session registry.

    In order: creates the SQLite engine, registers a connect listener that
    turns foreign-key enforcement on, declares the model, creates the
    database if ``database_exists`` reports it missing, creates the tables,
    checks the database against the model, then stores a ``scoped_session``
    registry and marks the module as initialized.

    Raises:
        RuntimeError: if the database does not match the declared model.
    """
    global __db_engine, __db_session, __initialized
    from sqlalchemy import event, text

    logger.debug("creating db engine....")
    # Warning: disabling check_same_thread is not so safe.
    __db_engine = create_engine(
        __sqlite_db_pattern,
        connect_args={'check_same_thread': False},
    )

    # from https://stackoverflow.com/questions/2614984/sqlite-sqlalchemy-how-to-enforce-foreign-keys
    def _fk_pragma_on_connect(dbapi_con, con_record):
        dbapi_con.execute('pragma foreign_keys=ON')

    event.listen(__db_engine, 'connect', _fk_pragma_on_connect)

    # Enforce the sqlite foreign keys constraint option. Engine.execute()
    # no longer exists in SQLAlchemy 2.0, so go through an explicit
    # connection and a text() construct, which work on both 1.4 and 2.0.
    with __db_engine.connect() as connection:
        connection.execute(text('pragma foreign_keys=on'))
    logger.info("Database Engine Created.")

    logger.debug("building declarative model...")
    import Model.model_manager
    Model.model_manager.declare_model()
    logger.info("declarative model built.")

    if not database_exists(__db_engine.url):
        logger.info("Database is not created, creating one...")
        create_database(__db_engine.url)
        logger.info("Done !")

    __db_Base.metadata.create_all(__db_engine)

    if not is_sane_database(__db_Base, __db_engine):
        logger.error("database is NOT compliant to the model")
        # RuntimeError is a subclass of Exception, so existing
        # ``except Exception`` handlers keep working.
        raise RuntimeError("database is not compliant to the model !")
    logger.info("database is compliant to the model")

    __db_session = scoped_session(sessionmaker(bind=__db_engine))
    __initialized = True
    logger.info("Persistence Session started.")
def is_sane_database(Base, engine):
    """Check that every model table and column exists in the database.

    Only names are compared; column types and other attributes are not
    checked yet.

    Args:
        Base: object whose ``metadata.sorted_tables`` lists the model tables.
        engine: object passed to ``sqlalchemy.inspect`` to read the
            database's table and column names.

    Returns:
        True when every model table and each of its columns is found by
        name in the database; False at the first missing table or column.
    """
    from sqlalchemy import inspect

    inspector = inspect(engine)
    db_table_names = set(inspector.get_table_names())

    for model_table in Base.metadata.sorted_tables:
        logger.debug("table '" + model_table.name +
                     "' in model, checking in database...")
        if model_table.name not in db_table_names:
            logger.error("model table not found in db : " + model_table.name)
            return False
        logger.debug("model table found in db : " + model_table.name)

        logger.debug(
            "now checking columns informations between model and database for table : " + model_table.name)
        # Fetch the column list once per table rather than once per
        # model column.
        db_column_names = {
            db_col["name"] for db_col in inspector.get_columns(model_table.name)
        }
        for model_col in model_table.columns:
            logger.debug("checking model column : " + model_col.name)
            # TODO type checking and other attributes. Model types are
            # formatted like "VARCHAR(500)" while db types are formatted
            # like "VARCHAR(length=500)", so they need normalizing before
            # they can be compared.
            if model_col.name not in db_column_names:
                logger.error("column '" + model_col.name +
                             "' in table '" + model_table.name + "'not found in db ")
                return False
            logger.debug(
                "table column : '" + model_col.name + "' found and compliant to the model table informations")
    return True
def wipeout_database():
    """Delete the SQLite database file from disk."""
    import os

    db_file = __sqlite_db_path
    os.remove(db_file)
- # TODO adapt to program, find what is an inspector.
def drop_all_tables(engine, inspector, schema=None, include_names=None):
    """Drop tables and stand-alone foreign-key constraints.

    Walks the result of ``inspector.get_sorted_table_and_fkc_names`` in
    reverse. Entries with a table name issue a ``DropTable``; entries
    without one issue a ``DropConstraint`` per listed constraint, provided
    the dialect reports ``supports_alter``.

    Args:
        engine: engine used to open the connection that runs the DDL.
        inspector: object providing ``get_sorted_table_and_fkc_names``.
        schema: optional schema name forwarded to the inspector and tables.
        include_names: optional iterable of table names; when given, any
            table (or constraint owner) not in it is left alone.
    """
    from sqlalchemy import Column, ForeignKeyConstraint, Integer, \
        MetaData, Table
    from sqlalchemy.schema import DropConstraint, DropTable

    wanted = None if include_names is None else set(include_names)

    def _skipped(table_name):
        # A name is skipped only when a filter was given and excludes it.
        return wanted is not None and table_name not in wanted

    with engine.connect() as conn:
        entries = inspector.get_sorted_table_and_fkc_names(schema=schema)
        for table_name, constraints in reversed(entries):
            if table_name:
                if not _skipped(table_name):
                    target = Table(table_name, MetaData(), schema=schema)
                    conn.execute(DropTable(target))
                continue

            if not constraints or not engine.dialect.supports_alter:
                continue

            for fk_table, fk_name in constraints:
                if _skipped(fk_table):
                    continue
                # Stub table with two placeholder columns, presumably just
                # enough to build a named constraint to drop.
                stub = Table(
                    fk_table, MetaData(),
                    Column('x', Integer),
                    Column('y', Integer),
                    schema=schema
                )
                constraint = ForeignKeyConstraint(
                    [stub.c.x], [stub.c.y], name=fk_name)
                conn.execute(DropConstraint(constraint))
|