provision.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import os
  2. import re
  3. from ... import exc
  4. from ...engine import url as sa_url
  5. from ...testing.provision import create_db
  6. from ...testing.provision import drop_db
  7. from ...testing.provision import follower_url_from_main
  8. from ...testing.provision import generate_driver_url
  9. from ...testing.provision import log
  10. from ...testing.provision import post_configure_engine
  11. from ...testing.provision import run_reap_dbs
  12. from ...testing.provision import stop_test_class_outside_fixtures
  13. from ...testing.provision import temp_table_keyword_args
# Driver names that _reap_sqlite_dbs substitutes into the file-name patterns
# of the SQLite database files it looks for and deletes.
# TODO: this set is hard-coded because I can't get it to build dynamically
# with pytest-xdist procs
_drivernames = {"pysqlite", "aiosqlite", "pysqlcipher"}
  16. @generate_driver_url.for_db("sqlite")
  17. def generate_driver_url(url, driver, query_str):
  18. if driver == "pysqlcipher" and url.get_driver_name() != "pysqlcipher":
  19. if url.database:
  20. url = url.set(database=url.database + ".enc")
  21. url = url.set(password="test")
  22. url = url.set(drivername="sqlite+%s" % (driver,))
  23. try:
  24. url.get_dialect()
  25. except exc.NoSuchModuleError:
  26. return None
  27. else:
  28. return url
  29. @follower_url_from_main.for_db("sqlite")
  30. def _sqlite_follower_url_from_main(url, ident):
  31. url = sa_url.make_url(url)
  32. if not url.database or url.database == ":memory:":
  33. return url
  34. else:
  35. m = re.match(r"(.+?)\.(.+)$", url.database)
  36. name, ext = m.group(1, 2)
  37. drivername = url.get_driver_name()
  38. return sa_url.make_url(
  39. "sqlite+%s:///%s_%s.%s" % (drivername, drivername, ident, ext)
  40. )
  41. @post_configure_engine.for_db("sqlite")
  42. def _sqlite_post_configure_engine(url, engine, follower_ident):
  43. from sqlalchemy import event
  44. @event.listens_for(engine, "connect")
  45. def connect(dbapi_connection, connection_record):
  46. # use file DBs in all cases, memory acts kind of strangely
  47. # as an attached
  48. if not follower_ident:
  49. # note this test_schema.db gets created for all test runs.
  50. # there's not any dedicated cleanup step for it. it in some
  51. # ways corresponds to the "test.test_schema" schema that's
  52. # expected to be already present, so for now it just stays
  53. # in a given checkout directory.
  54. dbapi_connection.execute(
  55. 'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
  56. % (engine.driver,)
  57. )
  58. else:
  59. dbapi_connection.execute(
  60. 'ATTACH DATABASE "%s_%s_test_schema.db" AS test_schema'
  61. % (follower_ident, engine.driver)
  62. )
  63. @create_db.for_db("sqlite")
  64. def _sqlite_create_db(cfg, eng, ident):
  65. pass
  66. @drop_db.for_db("sqlite")
  67. def _sqlite_drop_db(cfg, eng, ident):
  68. for path in [
  69. "%s.db" % ident,
  70. "%s_%s_test_schema.db" % (ident, eng.driver),
  71. ]:
  72. if os.path.exists(path):
  73. log.info("deleting SQLite database file: %s" % path)
  74. os.remove(path)
  75. @stop_test_class_outside_fixtures.for_db("sqlite")
  76. def stop_test_class_outside_fixtures(config, db, cls):
  77. with db.connect() as conn:
  78. files = [
  79. row.file
  80. for row in conn.exec_driver_sql("PRAGMA database_list")
  81. if row.file
  82. ]
  83. if files:
  84. db.dispose()
  85. # some sqlite file tests are not cleaning up well yet, so do this
  86. # just to make things simple for now
  87. for file_ in files:
  88. if file_ and os.path.exists(file_):
  89. os.remove(file_)
  90. @temp_table_keyword_args.for_db("sqlite")
  91. def _sqlite_temp_table_keyword_args(cfg, eng):
  92. return {"prefixes": ["TEMPORARY"]}
  93. @run_reap_dbs.for_db("sqlite")
  94. def _reap_sqlite_dbs(url, idents):
  95. log.info("db reaper connecting to %r", url)
  96. log.info("identifiers in file: %s", ", ".join(idents))
  97. for ident in idents:
  98. # we don't have a config so we can't call _sqlite_drop_db due to the
  99. # decorator
  100. for ext in ("db", "db.enc"):
  101. for path in (
  102. ["%s.%s" % (ident, ext)]
  103. + [
  104. "%s_%s.%s" % (drivername, ident, ext)
  105. for drivername in _drivernames
  106. ]
  107. + [
  108. "%s_test_schema.%s" % (drivername, ext)
  109. for drivername in _drivernames
  110. ]
  111. + [
  112. "%s_%s_test_schema.%s" % (ident, drivername, ext)
  113. for drivername in _drivernames
  114. ]
  115. ):
  116. if os.path.exists(path):
  117. log.info("deleting SQLite database file: %s" % path)
  118. os.remove(path)