test_insert.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. from .. import config
  2. from .. import engines
  3. from .. import fixtures
  4. from ..assertions import eq_
  5. from ..config import requirements
  6. from ..schema import Column
  7. from ..schema import Table
  8. from ... import Integer
  9. from ... import literal
  10. from ... import literal_column
  11. from ... import select
  12. from ... import String
  13. class LastrowidTest(fixtures.TablesTest):
  14. run_deletes = "each"
  15. __backend__ = True
  16. __requires__ = "implements_get_lastrowid", "autoincrement_insert"
  17. __engine_options__ = {"implicit_returning": False}
  18. @classmethod
  19. def define_tables(cls, metadata):
  20. Table(
  21. "autoinc_pk",
  22. metadata,
  23. Column(
  24. "id", Integer, primary_key=True, test_needs_autoincrement=True
  25. ),
  26. Column("data", String(50)),
  27. )
  28. Table(
  29. "manual_pk",
  30. metadata,
  31. Column("id", Integer, primary_key=True, autoincrement=False),
  32. Column("data", String(50)),
  33. )
  34. def _assert_round_trip(self, table, conn):
  35. row = conn.execute(table.select()).first()
  36. eq_(
  37. row,
  38. (
  39. conn.dialect.default_sequence_base,
  40. "some data",
  41. ),
  42. )
  43. def test_autoincrement_on_insert(self, connection):
  44. connection.execute(
  45. self.tables.autoinc_pk.insert(), dict(data="some data")
  46. )
  47. self._assert_round_trip(self.tables.autoinc_pk, connection)
  48. def test_last_inserted_id(self, connection):
  49. r = connection.execute(
  50. self.tables.autoinc_pk.insert(), dict(data="some data")
  51. )
  52. pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
  53. eq_(r.inserted_primary_key, (pk,))
  54. @requirements.dbapi_lastrowid
  55. def test_native_lastrowid_autoinc(self, connection):
  56. r = connection.execute(
  57. self.tables.autoinc_pk.insert(), dict(data="some data")
  58. )
  59. lastrowid = r.lastrowid
  60. pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
  61. eq_(lastrowid, pk)
  62. class InsertBehaviorTest(fixtures.TablesTest):
  63. run_deletes = "each"
  64. __backend__ = True
  65. @classmethod
  66. def define_tables(cls, metadata):
  67. Table(
  68. "autoinc_pk",
  69. metadata,
  70. Column(
  71. "id", Integer, primary_key=True, test_needs_autoincrement=True
  72. ),
  73. Column("data", String(50)),
  74. )
  75. Table(
  76. "manual_pk",
  77. metadata,
  78. Column("id", Integer, primary_key=True, autoincrement=False),
  79. Column("data", String(50)),
  80. )
  81. Table(
  82. "includes_defaults",
  83. metadata,
  84. Column(
  85. "id", Integer, primary_key=True, test_needs_autoincrement=True
  86. ),
  87. Column("data", String(50)),
  88. Column("x", Integer, default=5),
  89. Column(
  90. "y",
  91. Integer,
  92. default=literal_column("2", type_=Integer) + literal(2),
  93. ),
  94. )
  95. @requirements.autoincrement_insert
  96. def test_autoclose_on_insert(self):
  97. if requirements.returning.enabled:
  98. engine = engines.testing_engine(
  99. options={"implicit_returning": False}
  100. )
  101. else:
  102. engine = config.db
  103. with engine.begin() as conn:
  104. r = conn.execute(
  105. self.tables.autoinc_pk.insert(), dict(data="some data")
  106. )
  107. assert r._soft_closed
  108. assert not r.closed
  109. assert r.is_insert
  110. # new as of I8091919d45421e3f53029b8660427f844fee0228; for the moment
  111. # an insert where the PK was taken from a row that the dialect
  112. # selected, as is the case for mssql/pyodbc, will still report
  113. # returns_rows as true because there's a cursor description. in that
  114. # case, the row had to have been consumed at least.
  115. assert not r.returns_rows or r.fetchone() is None
  116. @requirements.returning
  117. def test_autoclose_on_insert_implicit_returning(self, connection):
  118. r = connection.execute(
  119. self.tables.autoinc_pk.insert(), dict(data="some data")
  120. )
  121. assert r._soft_closed
  122. assert not r.closed
  123. assert r.is_insert
  124. # note we are experimenting with having this be True
  125. # as of I8091919d45421e3f53029b8660427f844fee0228 .
  126. # implicit returning has fetched the row, but it still is a
  127. # "returns rows"
  128. assert r.returns_rows
  129. # and we should be able to fetchone() on it, we just get no row
  130. eq_(r.fetchone(), None)
  131. # and the keys, etc.
  132. eq_(r.keys(), ["id"])
  133. # but the dialect took in the row already. not really sure
  134. # what the best behavior is.
  135. @requirements.empty_inserts
  136. def test_empty_insert(self, connection):
  137. r = connection.execute(self.tables.autoinc_pk.insert())
  138. assert r._soft_closed
  139. assert not r.closed
  140. r = connection.execute(
  141. self.tables.autoinc_pk.select().where(
  142. self.tables.autoinc_pk.c.id != None
  143. )
  144. )
  145. eq_(len(r.all()), 1)
  146. @requirements.empty_inserts_executemany
  147. def test_empty_insert_multiple(self, connection):
  148. r = connection.execute(self.tables.autoinc_pk.insert(), [{}, {}, {}])
  149. assert r._soft_closed
  150. assert not r.closed
  151. r = connection.execute(
  152. self.tables.autoinc_pk.select().where(
  153. self.tables.autoinc_pk.c.id != None
  154. )
  155. )
  156. eq_(len(r.all()), 3)
  157. @requirements.insert_from_select
  158. def test_insert_from_select_autoinc(self, connection):
  159. src_table = self.tables.manual_pk
  160. dest_table = self.tables.autoinc_pk
  161. connection.execute(
  162. src_table.insert(),
  163. [
  164. dict(id=1, data="data1"),
  165. dict(id=2, data="data2"),
  166. dict(id=3, data="data3"),
  167. ],
  168. )
  169. result = connection.execute(
  170. dest_table.insert().from_select(
  171. ("data",),
  172. select(src_table.c.data).where(
  173. src_table.c.data.in_(["data2", "data3"])
  174. ),
  175. )
  176. )
  177. eq_(result.inserted_primary_key, (None,))
  178. result = connection.execute(
  179. select(dest_table.c.data).order_by(dest_table.c.data)
  180. )
  181. eq_(result.fetchall(), [("data2",), ("data3",)])
  182. @requirements.insert_from_select
  183. def test_insert_from_select_autoinc_no_rows(self, connection):
  184. src_table = self.tables.manual_pk
  185. dest_table = self.tables.autoinc_pk
  186. result = connection.execute(
  187. dest_table.insert().from_select(
  188. ("data",),
  189. select(src_table.c.data).where(
  190. src_table.c.data.in_(["data2", "data3"])
  191. ),
  192. )
  193. )
  194. eq_(result.inserted_primary_key, (None,))
  195. result = connection.execute(
  196. select(dest_table.c.data).order_by(dest_table.c.data)
  197. )
  198. eq_(result.fetchall(), [])
  199. @requirements.insert_from_select
  200. def test_insert_from_select(self, connection):
  201. table = self.tables.manual_pk
  202. connection.execute(
  203. table.insert(),
  204. [
  205. dict(id=1, data="data1"),
  206. dict(id=2, data="data2"),
  207. dict(id=3, data="data3"),
  208. ],
  209. )
  210. connection.execute(
  211. table.insert()
  212. .inline()
  213. .from_select(
  214. ("id", "data"),
  215. select(table.c.id + 5, table.c.data).where(
  216. table.c.data.in_(["data2", "data3"])
  217. ),
  218. )
  219. )
  220. eq_(
  221. connection.execute(
  222. select(table.c.data).order_by(table.c.data)
  223. ).fetchall(),
  224. [("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
  225. )
  226. @requirements.insert_from_select
  227. def test_insert_from_select_with_defaults(self, connection):
  228. table = self.tables.includes_defaults
  229. connection.execute(
  230. table.insert(),
  231. [
  232. dict(id=1, data="data1"),
  233. dict(id=2, data="data2"),
  234. dict(id=3, data="data3"),
  235. ],
  236. )
  237. connection.execute(
  238. table.insert()
  239. .inline()
  240. .from_select(
  241. ("id", "data"),
  242. select(table.c.id + 5, table.c.data).where(
  243. table.c.data.in_(["data2", "data3"])
  244. ),
  245. )
  246. )
  247. eq_(
  248. connection.execute(
  249. select(table).order_by(table.c.data, table.c.id)
  250. ).fetchall(),
  251. [
  252. (1, "data1", 5, 4),
  253. (2, "data2", 5, 4),
  254. (7, "data2", 5, 4),
  255. (3, "data3", 5, 4),
  256. (8, "data3", 5, 4),
  257. ],
  258. )
  259. class ReturningTest(fixtures.TablesTest):
  260. run_create_tables = "each"
  261. __requires__ = "returning", "autoincrement_insert"
  262. __backend__ = True
  263. __engine_options__ = {"implicit_returning": True}
  264. def _assert_round_trip(self, table, conn):
  265. row = conn.execute(table.select()).first()
  266. eq_(
  267. row,
  268. (
  269. conn.dialect.default_sequence_base,
  270. "some data",
  271. ),
  272. )
  273. @classmethod
  274. def define_tables(cls, metadata):
  275. Table(
  276. "autoinc_pk",
  277. metadata,
  278. Column(
  279. "id", Integer, primary_key=True, test_needs_autoincrement=True
  280. ),
  281. Column("data", String(50)),
  282. )
  283. @requirements.fetch_rows_post_commit
  284. def test_explicit_returning_pk_autocommit(self, connection):
  285. table = self.tables.autoinc_pk
  286. r = connection.execute(
  287. table.insert().returning(table.c.id), dict(data="some data")
  288. )
  289. pk = r.first()[0]
  290. fetched_pk = connection.scalar(select(table.c.id))
  291. eq_(fetched_pk, pk)
  292. def test_explicit_returning_pk_no_autocommit(self, connection):
  293. table = self.tables.autoinc_pk
  294. r = connection.execute(
  295. table.insert().returning(table.c.id), dict(data="some data")
  296. )
  297. pk = r.first()[0]
  298. fetched_pk = connection.scalar(select(table.c.id))
  299. eq_(fetched_pk, pk)
  300. def test_autoincrement_on_insert_implicit_returning(self, connection):
  301. connection.execute(
  302. self.tables.autoinc_pk.insert(), dict(data="some data")
  303. )
  304. self._assert_round_trip(self.tables.autoinc_pk, connection)
  305. def test_last_inserted_id_implicit_returning(self, connection):
  306. r = connection.execute(
  307. self.tables.autoinc_pk.insert(), dict(data="some data")
  308. )
  309. pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
  310. eq_(r.inserted_primary_key, (pk,))
  311. __all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest")