test_dialect.py 11 KB


  1. #! coding: utf-8
  2. from . import testing
  3. from .. import assert_raises
  4. from .. import config
  5. from .. import engines
  6. from .. import eq_
  7. from .. import fixtures
  8. from .. import ne_
  9. from .. import provide_metadata
  10. from ..config import requirements
  11. from ..provision import set_default_schema_on_connection
  12. from ..schema import Column
  13. from ..schema import Table
  14. from ... import bindparam
  15. from ... import event
  16. from ... import exc
  17. from ... import Integer
  18. from ... import literal_column
  19. from ... import select
  20. from ... import String
  21. from ...util import compat
  22. class ExceptionTest(fixtures.TablesTest):
  23. """Test basic exception wrapping.
  24. DBAPIs vary a lot in exception behavior so to actually anticipate
  25. specific exceptions from real round trips, we need to be conservative.
  26. """
  27. run_deletes = "each"
  28. __backend__ = True
  29. @classmethod
  30. def define_tables(cls, metadata):
  31. Table(
  32. "manual_pk",
  33. metadata,
  34. Column("id", Integer, primary_key=True, autoincrement=False),
  35. Column("data", String(50)),
  36. )
  37. @requirements.duplicate_key_raises_integrity_error
  38. def test_integrity_error(self):
  39. with config.db.connect() as conn:
  40. trans = conn.begin()
  41. conn.execute(
  42. self.tables.manual_pk.insert(), {"id": 1, "data": "d1"}
  43. )
  44. assert_raises(
  45. exc.IntegrityError,
  46. conn.execute,
  47. self.tables.manual_pk.insert(),
  48. {"id": 1, "data": "d1"},
  49. )
  50. trans.rollback()
  51. def test_exception_with_non_ascii(self):
  52. with config.db.connect() as conn:
  53. try:
  54. # try to create an error message that likely has non-ascii
  55. # characters in the DBAPI's message string. unfortunately
  56. # there's no way to make this happen with some drivers like
  57. # mysqlclient, pymysql. this at least does produce a non-
  58. # ascii error message for cx_oracle, psycopg2
  59. conn.execute(select(literal_column(u"méil")))
  60. assert False
  61. except exc.DBAPIError as err:
  62. err_str = str(err)
  63. assert str(err.orig) in str(err)
  64. # test that we are actually getting string on Py2k, unicode
  65. # on Py3k.
  66. if compat.py2k:
  67. assert isinstance(err_str, str)
  68. else:
  69. assert isinstance(err_str, str)
  70. class IsolationLevelTest(fixtures.TestBase):
  71. __backend__ = True
  72. __requires__ = ("isolation_level",)
  73. def _get_non_default_isolation_level(self):
  74. levels = requirements.get_isolation_levels(config)
  75. default = levels["default"]
  76. supported = levels["supported"]
  77. s = set(supported).difference(["AUTOCOMMIT", default])
  78. if s:
  79. return s.pop()
  80. else:
  81. config.skip_test("no non-default isolation level available")
  82. def test_default_isolation_level(self):
  83. eq_(
  84. config.db.dialect.default_isolation_level,
  85. requirements.get_isolation_levels(config)["default"],
  86. )
  87. def test_non_default_isolation_level(self):
  88. non_default = self._get_non_default_isolation_level()
  89. with config.db.connect() as conn:
  90. existing = conn.get_isolation_level()
  91. ne_(existing, non_default)
  92. conn.execution_options(isolation_level=non_default)
  93. eq_(conn.get_isolation_level(), non_default)
  94. conn.dialect.reset_isolation_level(conn.connection)
  95. eq_(conn.get_isolation_level(), existing)
  96. def test_all_levels(self):
  97. levels = requirements.get_isolation_levels(config)
  98. all_levels = levels["supported"]
  99. for level in set(all_levels).difference(["AUTOCOMMIT"]):
  100. with config.db.connect() as conn:
  101. conn.execution_options(isolation_level=level)
  102. eq_(conn.get_isolation_level(), level)
  103. trans = conn.begin()
  104. trans.rollback()
  105. eq_(conn.get_isolation_level(), level)
  106. with config.db.connect() as conn:
  107. eq_(
  108. conn.get_isolation_level(),
  109. levels["default"],
  110. )
  111. class AutocommitIsolationTest(fixtures.TablesTest):
  112. run_deletes = "each"
  113. __requires__ = ("autocommit",)
  114. __backend__ = True
  115. @classmethod
  116. def define_tables(cls, metadata):
  117. Table(
  118. "some_table",
  119. metadata,
  120. Column("id", Integer, primary_key=True, autoincrement=False),
  121. Column("data", String(50)),
  122. test_needs_acid=True,
  123. )
  124. def _test_conn_autocommits(self, conn, autocommit):
  125. trans = conn.begin()
  126. conn.execute(
  127. self.tables.some_table.insert(), {"id": 1, "data": "some data"}
  128. )
  129. trans.rollback()
  130. eq_(
  131. conn.scalar(select(self.tables.some_table.c.id)),
  132. 1 if autocommit else None,
  133. )
  134. with conn.begin():
  135. conn.execute(self.tables.some_table.delete())
  136. def test_autocommit_on(self, connection_no_trans):
  137. conn = connection_no_trans
  138. c2 = conn.execution_options(isolation_level="AUTOCOMMIT")
  139. self._test_conn_autocommits(c2, True)
  140. c2.dialect.reset_isolation_level(c2.connection)
  141. self._test_conn_autocommits(conn, False)
  142. def test_autocommit_off(self, connection_no_trans):
  143. conn = connection_no_trans
  144. self._test_conn_autocommits(conn, False)
  145. def test_turn_autocommit_off_via_default_iso_level(
  146. self, connection_no_trans
  147. ):
  148. conn = connection_no_trans
  149. conn = conn.execution_options(isolation_level="AUTOCOMMIT")
  150. self._test_conn_autocommits(conn, True)
  151. conn.execution_options(
  152. isolation_level=requirements.get_isolation_levels(config)[
  153. "default"
  154. ]
  155. )
  156. self._test_conn_autocommits(conn, False)
  157. class EscapingTest(fixtures.TestBase):
  158. @provide_metadata
  159. def test_percent_sign_round_trip(self):
  160. """test that the DBAPI accommodates for escaped / nonescaped
  161. percent signs in a way that matches the compiler
  162. """
  163. m = self.metadata
  164. t = Table("t", m, Column("data", String(50)))
  165. t.create(config.db)
  166. with config.db.begin() as conn:
  167. conn.execute(t.insert(), dict(data="some % value"))
  168. conn.execute(t.insert(), dict(data="some %% other value"))
  169. eq_(
  170. conn.scalar(
  171. select(t.c.data).where(
  172. t.c.data == literal_column("'some % value'")
  173. )
  174. ),
  175. "some % value",
  176. )
  177. eq_(
  178. conn.scalar(
  179. select(t.c.data).where(
  180. t.c.data == literal_column("'some %% other value'")
  181. )
  182. ),
  183. "some %% other value",
  184. )
  185. class WeCanSetDefaultSchemaWEventsTest(fixtures.TestBase):
  186. __backend__ = True
  187. __requires__ = ("default_schema_name_switch",)
  188. def test_control_case(self):
  189. default_schema_name = config.db.dialect.default_schema_name
  190. eng = engines.testing_engine()
  191. with eng.connect():
  192. pass
  193. eq_(eng.dialect.default_schema_name, default_schema_name)
  194. def test_wont_work_wo_insert(self):
  195. default_schema_name = config.db.dialect.default_schema_name
  196. eng = engines.testing_engine()
  197. @event.listens_for(eng, "connect")
  198. def on_connect(dbapi_connection, connection_record):
  199. set_default_schema_on_connection(
  200. config, dbapi_connection, config.test_schema
  201. )
  202. with eng.connect() as conn:
  203. what_it_should_be = eng.dialect._get_default_schema_name(conn)
  204. eq_(what_it_should_be, config.test_schema)
  205. eq_(eng.dialect.default_schema_name, default_schema_name)
  206. def test_schema_change_on_connect(self):
  207. eng = engines.testing_engine()
  208. @event.listens_for(eng, "connect", insert=True)
  209. def on_connect(dbapi_connection, connection_record):
  210. set_default_schema_on_connection(
  211. config, dbapi_connection, config.test_schema
  212. )
  213. with eng.connect() as conn:
  214. what_it_should_be = eng.dialect._get_default_schema_name(conn)
  215. eq_(what_it_should_be, config.test_schema)
  216. eq_(eng.dialect.default_schema_name, config.test_schema)
  217. def test_schema_change_works_w_transactions(self):
  218. eng = engines.testing_engine()
  219. @event.listens_for(eng, "connect", insert=True)
  220. def on_connect(dbapi_connection, *arg):
  221. set_default_schema_on_connection(
  222. config, dbapi_connection, config.test_schema
  223. )
  224. with eng.connect() as conn:
  225. trans = conn.begin()
  226. what_it_should_be = eng.dialect._get_default_schema_name(conn)
  227. eq_(what_it_should_be, config.test_schema)
  228. trans.rollback()
  229. what_it_should_be = eng.dialect._get_default_schema_name(conn)
  230. eq_(what_it_should_be, config.test_schema)
  231. eq_(eng.dialect.default_schema_name, config.test_schema)
  232. class FutureWeCanSetDefaultSchemaWEventsTest(
  233. fixtures.FutureEngineMixin, WeCanSetDefaultSchemaWEventsTest
  234. ):
  235. pass
  236. class DifficultParametersTest(fixtures.TestBase):
  237. __backend__ = True
  238. @testing.combinations(
  239. ("boring",),
  240. ("per cent",),
  241. ("per % cent",),
  242. ("%percent",),
  243. ("par(ens)",),
  244. ("percent%(ens)yah",),
  245. ("col:ons",),
  246. ("more :: %colons%",),
  247. ("/slashes/",),
  248. ("more/slashes",),
  249. ("q?marks",),
  250. ("1param",),
  251. ("1col:on",),
  252. argnames="name",
  253. )
  254. def test_round_trip(self, name, connection, metadata):
  255. t = Table(
  256. "t",
  257. metadata,
  258. Column("id", Integer, primary_key=True),
  259. Column(name, String(50), nullable=False),
  260. )
  261. # table is created
  262. t.create(connection)
  263. # automatic param generated by insert
  264. connection.execute(t.insert().values({"id": 1, name: "some name"}))
  265. # automatic param generated by criteria, plus selecting the column
  266. stmt = select(t.c[name]).where(t.c[name] == "some name")
  267. eq_(connection.scalar(stmt), "some name")
  268. # use the name in a param explicitly
  269. stmt = select(t.c[name]).where(t.c[name] == bindparam(name))
  270. row = connection.execute(stmt, {name: "some name"}).first()
  271. # name works as the key from cursor.description
  272. eq_(row._mapping[name], "some name")