provision.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. import time
  2. from ... import exc
  3. from ... import inspect
  4. from ... import text
  5. from ...testing import warn_test_suite
  6. from ...testing.provision import create_db
  7. from ...testing.provision import drop_all_schema_objects_post_tables
  8. from ...testing.provision import drop_all_schema_objects_pre_tables
  9. from ...testing.provision import drop_db
  10. from ...testing.provision import log
  11. from ...testing.provision import prepare_for_drop_tables
  12. from ...testing.provision import set_default_schema_on_connection
  13. from ...testing.provision import temp_table_keyword_args
  14. @create_db.for_db("postgresql")
  15. def _pg_create_db(cfg, eng, ident):
  16. template_db = cfg.options.postgresql_templatedb
  17. with eng.execution_options(isolation_level="AUTOCOMMIT").begin() as conn:
  18. try:
  19. _pg_drop_db(cfg, conn, ident)
  20. except Exception:
  21. pass
  22. if not template_db:
  23. template_db = conn.exec_driver_sql(
  24. "select current_database()"
  25. ).scalar()
  26. attempt = 0
  27. while True:
  28. try:
  29. conn.exec_driver_sql(
  30. "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db)
  31. )
  32. except exc.OperationalError as err:
  33. attempt += 1
  34. if attempt >= 3:
  35. raise
  36. if "accessed by other users" in str(err):
  37. log.info(
  38. "Waiting to create %s, URI %r, "
  39. "template DB %s is in use sleeping for .5",
  40. ident,
  41. eng.url,
  42. template_db,
  43. )
  44. time.sleep(0.5)
  45. except:
  46. raise
  47. else:
  48. break
  49. @drop_db.for_db("postgresql")
  50. def _pg_drop_db(cfg, eng, ident):
  51. with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
  52. with conn.begin():
  53. conn.execute(
  54. text(
  55. "select pg_terminate_backend(pid) from pg_stat_activity "
  56. "where usename=current_user and pid != pg_backend_pid() "
  57. "and datname=:dname"
  58. ),
  59. dict(dname=ident),
  60. )
  61. conn.exec_driver_sql("DROP DATABASE %s" % ident)
  62. @temp_table_keyword_args.for_db("postgresql")
  63. def _postgresql_temp_table_keyword_args(cfg, eng):
  64. return {"prefixes": ["TEMPORARY"]}
  65. @set_default_schema_on_connection.for_db("postgresql")
  66. def _postgresql_set_default_schema_on_connection(
  67. cfg, dbapi_connection, schema_name
  68. ):
  69. existing_autocommit = dbapi_connection.autocommit
  70. dbapi_connection.autocommit = True
  71. cursor = dbapi_connection.cursor()
  72. cursor.execute("SET SESSION search_path='%s'" % schema_name)
  73. cursor.close()
  74. dbapi_connection.autocommit = existing_autocommit
  75. @drop_all_schema_objects_pre_tables.for_db("postgresql")
  76. def drop_all_schema_objects_pre_tables(cfg, eng):
  77. with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
  78. for xid in conn.execute("select gid from pg_prepared_xacts").scalars():
  79. conn.execute("ROLLBACK PREPARED '%s'" % xid)
  80. @drop_all_schema_objects_post_tables.for_db("postgresql")
  81. def drop_all_schema_objects_post_tables(cfg, eng):
  82. from sqlalchemy.dialects import postgresql
  83. inspector = inspect(eng)
  84. with eng.begin() as conn:
  85. for enum in inspector.get_enums("*"):
  86. conn.execute(
  87. postgresql.DropEnumType(
  88. postgresql.ENUM(name=enum["name"], schema=enum["schema"])
  89. )
  90. )
  91. @prepare_for_drop_tables.for_db("postgresql")
  92. def prepare_for_drop_tables(config, connection):
  93. """Ensure there are no locks on the current username/database."""
  94. result = connection.exec_driver_sql(
  95. "select pid, state, wait_event_type, query "
  96. # "select pg_terminate_backend(pid), state, wait_event_type "
  97. "from pg_stat_activity where "
  98. "usename=current_user "
  99. "and datname=current_database() and state='idle in transaction' "
  100. "and pid != pg_backend_pid()"
  101. )
  102. rows = result.all() # noqa
  103. if rows:
  104. warn_test_suite(
  105. "PostgreSQL may not be able to DROP tables due to "
  106. "idle in transaction: %s"
  107. % ("; ".join(row._mapping["query"] for row in rows))
  108. )