"""Persistence global module.

Manages the general (SQLite) database machinery for the application:
creating the engine, building the declarative model, checking that the
database matches the model, and handing out sessions.
"""
import logging

from utility.app_logging import logger_name
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy_utils import database_exists, create_database

logger = logging.getLogger(logger_name + ".PERSISTENCE")

# Module-level state, populated by init().
__db_engine = None
__db_session = None
__sqlite_db_path = "OpenISP.db"
__sqlite_db_pattern = 'sqlite:///' + __sqlite_db_path
__initialized = False
__db_Base = declarative_base()


def _log_and_raise(message):
    """Log *message* as an error, then raise it as a RuntimeError."""
    logger.error(message)
    # Raising a bare string is a TypeError in Python 3; raise a real exception.
    raise RuntimeError(message)


def get_session():
    """Return the module's scoped session registry.

    Raises:
        RuntimeError: if init() has not completed yet.
    """
    if not __initialized:
        _log_and_raise(
            "can't get session, persistence engine is not initialized")
    return __db_session


def get_Session_Instance():
    """Return a session obtained by calling the scoped session registry.

    Raises:
        RuntimeError: if init() has not completed yet.
    """
    if not __initialized:
        _log_and_raise(
            "can't get session, persistence engine is not initialized")
    return __db_session()


def get_db_base():
    """Return the declarative Base that model classes must derive from.

    The Base is only handed out *before* init() has completed, since init()
    builds the model and creates the tables from it.

    Raises:
        RuntimeError: if the persistence engine is already initialized.
    """
    if __initialized:
        _log_and_raise(
            "can't get declarative Base, persistence engine is already initialized")
    return __db_Base


def is_init():
    """Return True once init() has completed successfully."""
    return __initialized


def init():
    """Create the engine, build the model, create/verify the database and
    start the scoped session registry.

    Raises:
        Exception: if the existing database does not match the model.
    """
    global __db_engine, __db_session, __initialized

    logger.debug("creating db engine....")
    # Warning: disabling check_same_thread is not so safe.
    __db_engine = create_engine(__sqlite_db_pattern,
                                connect_args={'check_same_thread': False})

    # from https://stackoverflow.com/questions/2614984/sqlite-sqlalchemy-how-to-enforce-foreign-keys
    def _fk_pragma_on_connect(dbapi_con, con_record):
        dbapi_con.execute('pragma foreign_keys=ON')

    from sqlalchemy import event
    event.listen(__db_engine, 'connect', _fk_pragma_on_connect)

    # enforce sqlite foreign keys constraint option
    # NOTE(review): Engine.execute() only exists on SQLAlchemy 1.x - confirm
    # the pinned version before upgrading.
    __db_engine.execute('pragma foreign_keys=on')
    logger.info("Database Engine Created.")

    logger.debug("building declarative model...")
    # Imported here rather than at module top; presumably to avoid a circular
    # import with the model package - TODO confirm.
    import Model.model_manager
    Model.model_manager.declare_model()
    logger.info("declarative model built.")

    if not database_exists(__db_engine.url):
        logger.info("Database is not created, creating one...")
        create_database(__db_engine.url)
        logger.info("Done !")

    __db_Base.metadata.create_all(__db_engine)

    if is_sane_database(__db_Base, __db_engine):
        logger.info("database is compliant to the model")
    else:
        logger.error("database is NOT compliant to the model")
        raise Exception("database is not compliant to the model !")

    __db_session = scoped_session(sessionmaker(bind=__db_engine))
    __initialized = True
    logger.info("Persistence Session started.")


def is_sane_database(Base, engine):
    """Check that every model table and column exists in the database.

    Only names are compared; column types and other attributes are not
    checked yet.

    Args:
        Base: declarative base whose metadata describes the model.
        engine: engine connected to the database to inspect.

    Returns:
        True if every table of the model and every one of its columns is
        present in the database, False otherwise.
    """
    from sqlalchemy import inspect

    inspector = inspect(engine)
    db_table_names = set(inspector.get_table_names())

    for model_table in Base.metadata.sorted_tables:
        logger.debug("table '" + model_table.name +
                     "' in model, checking in database...")
        if model_table.name not in db_table_names:
            logger.error("model table not found in db : " + model_table.name)
            return False
        logger.debug("model table found in db : " + model_table.name)

        logger.debug(
            "now checking columns informations between model and database for table : " +
            model_table.name)
        # Reflect the table's columns once instead of once per model column.
        db_column_names = {
            db_col["name"]
            for db_col in inspector.get_columns(model_table.name)
        }
        for model_col in model_table.columns:
            logger.debug("checking model column : " + model_col.name)
            # TODO type checking and other attributes (model types are
            # formatted like "VARCHAR(500)", db types like
            # "VARCHAR(length=500)").
            if model_col.name not in db_column_names:
                logger.error("column '" + model_col.name + "' in table '" +
                             model_table.name + "'not found in db ")
                return False
            logger.debug(
                "table column : '" + model_col.name +
                "' found and compliant to the model table informations")

    return True


def wipeout_database():
    """Delete the SQLite database file from disk."""
    import os
    os.remove(__sqlite_db_path)


# TODO adapt to program, find what is an inspector.
def drop_all_tables(engine, inspector, schema=None, include_names=None):
    """Drop tables (and standalone foreign key constraints) in reverse
    dependency order.

    Args:
        engine: engine used to open the connection and query dialect support.
        inspector: object providing get_sorted_table_and_fkc_names()
            (presumably the result of sqlalchemy.inspect(engine) - TODO
            confirm against the caller).
        schema: optional schema name passed to the inspector and to the
            generated Table objects.
        include_names: optional iterable of table names; when given, only
            these tables (and constraints on them) are dropped.
    """
    from sqlalchemy import Column, Table, Integer, MetaData, \
        ForeignKeyConstraint
    from sqlalchemy.schema import DropTable, DropConstraint

    if include_names is not None:
        include_names = set(include_names)

    with engine.connect() as conn:
        for tname, fkcs in reversed(
                inspector.get_sorted_table_and_fkc_names(schema=schema)):
            if tname:
                if include_names is not None and tname not in include_names:
                    continue
                conn.execute(DropTable(
                    Table(tname, MetaData(), schema=schema)
                ))
            elif fkcs:
                # Constraints can only be dropped where ALTER is supported.
                if not engine.dialect.supports_alter:
                    continue
                for fk_table_name, fkc in fkcs:
                    if include_names is not None and \
                            fk_table_name not in include_names:
                        continue
                    # Placeholder table: just enough structure to name the
                    # constraint in the DROP CONSTRAINT statement.
                    tb = Table(
                        fk_table_name, MetaData(),
                        Column('x', Integer),
                        Column('y', Integer),
                        schema=schema
                    )
                    conn.execute(DropConstraint(
                        ForeignKeyConstraint(
                            [tb.c.x], [tb.c.y], name=fkc)
                    ))