persistence.py 81 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407
  1. # orm/persistence.py
  2. # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """private module containing functions used to emit INSERT, UPDATE
  8. and DELETE statements on behalf of a :class:`_orm.Mapper` and its descending
  9. mappers.
  10. The functions here are called only by the unit of work functions
  11. in unitofwork.py.
  12. """
  13. from itertools import chain
  14. from itertools import groupby
  15. import operator
  16. from . import attributes
  17. from . import evaluator
  18. from . import exc as orm_exc
  19. from . import loading
  20. from . import sync
  21. from .base import NO_VALUE
  22. from .base import state_str
  23. from .. import exc as sa_exc
  24. from .. import future
  25. from .. import sql
  26. from .. import util
  27. from ..engine import result as _result
  28. from ..sql import coercions
  29. from ..sql import expression
  30. from ..sql import operators
  31. from ..sql import roles
  32. from ..sql import select
  33. from ..sql import sqltypes
  34. from ..sql.base import _entity_namespace_key
  35. from ..sql.base import CompileState
  36. from ..sql.base import Options
  37. from ..sql.dml import DeleteDMLState
  38. from ..sql.dml import UpdateDMLState
  39. from ..sql.elements import BooleanClauseList
  40. from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
def _bulk_insert(
    mapper,
    mappings,
    session_transaction,
    isstates,
    return_defaults,
    render_nulls,
):
    """Emit INSERT statements for a collection of states or plain
    attribute dictionaries, on behalf of the bulk-insert feature.

    :param mapper: :class:`_orm.Mapper` for the target class.
    :param mappings: sequence of InstanceState objects when ``isstates``
     is True, otherwise plain dictionaries of attribute values.
    :param session_transaction: current SessionTransaction; used to
     procure the connection (per-instance sharding is not supported).
    :param isstates: True if ``mappings`` contains InstanceState objects.
    :param return_defaults: if True, run with bookkeeping enabled so
     server-generated values come back, and re-key the states from the
     resulting primary key values at the end.
    :param render_nulls: passed through to _collect_insert_commands();
     when True, None values are rendered in the INSERT rather than
     omitted.
    :raise NotImplementedError: if the session has a
     ``connection_callable`` configured.
    """
    base_mapper = mapper.base_mapper

    if session_transaction.session.connection_callable:
        raise NotImplementedError(
            "connection_callable / per-instance sharding "
            "not supported in bulk_insert()"
        )

    if isstates:
        if return_defaults:
            # keep (state, dict) pairs so primary keys can be written
            # back onto the states after the INSERTs run
            states = [(state, state.dict) for state in mappings]
            mappings = [dict_ for (state, dict_) in states]
        else:
            mappings = [state.dict for state in mappings]
    else:
        mappings = list(mappings)

    connection = session_transaction.connection(base_mapper)
    for table, super_mapper in base_mapper._sorted_tables.items():
        if not mapper.isa(super_mapper):
            continue

        # re-tuple the records from _collect_insert_commands(), replacing
        # the state/mapper/connection slots with the bulk-level values
        # (state is None in bulk mode)
        records = (
            (
                None,
                state_dict,
                params,
                mapper,
                connection,
                value_params,
                has_all_pks,
                has_all_defaults,
            )
            for (
                state,
                state_dict,
                params,
                mp,
                conn,
                value_params,
                has_all_pks,
                has_all_defaults,
            ) in _collect_insert_commands(
                table,
                ((None, mapping, mapper, connection) for mapping in mappings),
                bulk=True,
                return_defaults=return_defaults,
                render_nulls=render_nulls,
            )
        )
        _emit_insert_statements(
            base_mapper,
            None,
            super_mapper,
            table,
            records,
            bookkeeping=return_defaults,
        )
    if return_defaults and isstates:
        # assign identity keys to the states based on the primary key
        # values now present in the dictionaries
        identity_cls = mapper._identity_class
        identity_props = [p.key for p in mapper._identity_key_props]
        for state, dict_ in states:
            state.key = (
                identity_cls,
                tuple([dict_[key] for key in identity_props]),
            )
  111. def _bulk_update(
  112. mapper, mappings, session_transaction, isstates, update_changed_only
  113. ):
  114. base_mapper = mapper.base_mapper
  115. search_keys = mapper._primary_key_propkeys
  116. if mapper._version_id_prop:
  117. search_keys = {mapper._version_id_prop.key}.union(search_keys)
  118. def _changed_dict(mapper, state):
  119. return dict(
  120. (k, v)
  121. for k, v in state.dict.items()
  122. if k in state.committed_state or k in search_keys
  123. )
  124. if isstates:
  125. if update_changed_only:
  126. mappings = [_changed_dict(mapper, state) for state in mappings]
  127. else:
  128. mappings = [state.dict for state in mappings]
  129. else:
  130. mappings = list(mappings)
  131. if session_transaction.session.connection_callable:
  132. raise NotImplementedError(
  133. "connection_callable / per-instance sharding "
  134. "not supported in bulk_update()"
  135. )
  136. connection = session_transaction.connection(base_mapper)
  137. for table, super_mapper in base_mapper._sorted_tables.items():
  138. if not mapper.isa(super_mapper):
  139. continue
  140. records = _collect_update_commands(
  141. None,
  142. table,
  143. (
  144. (
  145. None,
  146. mapping,
  147. mapper,
  148. connection,
  149. (
  150. mapping[mapper._version_id_prop.key]
  151. if mapper._version_id_prop
  152. else None
  153. ),
  154. )
  155. for mapping in mappings
  156. ),
  157. bulk=True,
  158. )
  159. _emit_update_statements(
  160. base_mapper,
  161. None,
  162. super_mapper,
  163. table,
  164. records,
  165. bookkeeping=False,
  166. )
def save_obj(base_mapper, states, uowtransaction, single=False):
    """Issue ``INSERT`` and/or ``UPDATE`` statements for a list
    of objects.

    This is called within the context of a UOWTransaction during a
    flush operation, given a list of states to be flushed. The
    base mapper in an inheritance hierarchy handles the inserts/
    updates for all descendant mappers.

    :param single: internal flag used by the recursive per-state call
     below; prevents re-splitting when batch mode is disabled.
    """
    # if batch=false, call _save_obj separately for each object
    if not single and not base_mapper.batch:
        for state in _sort_states(base_mapper, states):
            save_obj(base_mapper, [state], uowtransaction, single=True)
        return

    # partition the states into those requiring INSERT vs. UPDATE;
    # a "row switch" (pending state taking over a deleted row) is
    # treated as an UPDATE
    states_to_update = []
    states_to_insert = []

    for (
        state,
        dict_,
        mapper,
        connection,
        has_identity,
        row_switch,
        update_version_id,
    ) in _organize_states_for_save(base_mapper, states, uowtransaction):
        if has_identity or row_switch:
            states_to_update.append(
                (state, dict_, mapper, connection, update_version_id)
            )
        else:
            states_to_insert.append((state, dict_, mapper, connection))

    # process each table in inheritance-hierarchy order so parent
    # tables are handled before child tables
    for table, mapper in base_mapper._sorted_tables.items():
        if table not in mapper._pks_by_table:
            continue
        insert = _collect_insert_commands(table, states_to_insert)

        update = _collect_update_commands(
            uowtransaction, table, states_to_update
        )

        _emit_update_statements(
            base_mapper,
            uowtransaction,
            mapper,
            table,
            update,
        )

        _emit_insert_statements(
            base_mapper,
            uowtransaction,
            mapper,
            table,
            insert,
        )

    # run post-flush bookkeeping over all states; the final boolean
    # flag indicates "was an update" for each record
    _finalize_insert_update_commands(
        base_mapper,
        uowtransaction,
        chain(
            (
                (state, state_dict, mapper, connection, False)
                for (state, state_dict, mapper, connection) in states_to_insert
            ),
            (
                (state, state_dict, mapper, connection, True)
                for (
                    state,
                    state_dict,
                    mapper,
                    connection,
                    update_version_id,
                ) in states_to_update
            ),
        ),
    )
def post_update(base_mapper, states, uowtransaction, post_update_cols):
    """Issue UPDATE statements on behalf of a relationship() which
    specifies post_update.

    :param post_update_cols: collection of columns to be included in
     the UPDATE, as determined by the relationship's post_update
     handling; passed through to _collect_post_update_commands().
    """
    states_to_update = list(
        _organize_states_for_post_update(base_mapper, states, uowtransaction)
    )

    for table, mapper in base_mapper._sorted_tables.items():
        if table not in mapper._pks_by_table:
            continue

        # augment each record with the committed version id value, if a
        # version counter is configured, so the UPDATE can match on it
        update = (
            (
                state,
                state_dict,
                sub_mapper,
                connection,
                mapper._get_committed_state_attr_by_column(
                    state, state_dict, mapper.version_id_col
                )
                if mapper.version_id_col is not None
                else None,
            )
            for state, state_dict, sub_mapper, connection in states_to_update
            if table in sub_mapper._pks_by_table
        )

        update = _collect_post_update_commands(
            base_mapper, uowtransaction, table, update, post_update_cols
        )

        _emit_post_update_statements(
            base_mapper,
            uowtransaction,
            mapper,
            table,
            update,
        )
  273. def delete_obj(base_mapper, states, uowtransaction):
  274. """Issue ``DELETE`` statements for a list of objects.
  275. This is called within the context of a UOWTransaction during a
  276. flush operation.
  277. """
  278. states_to_delete = list(
  279. _organize_states_for_delete(base_mapper, states, uowtransaction)
  280. )
  281. table_to_mapper = base_mapper._sorted_tables
  282. for table in reversed(list(table_to_mapper.keys())):
  283. mapper = table_to_mapper[table]
  284. if table not in mapper._pks_by_table:
  285. continue
  286. elif mapper.inherits and mapper.passive_deletes:
  287. continue
  288. delete = _collect_delete_commands(
  289. base_mapper, uowtransaction, table, states_to_delete
  290. )
  291. _emit_delete_statements(
  292. base_mapper,
  293. uowtransaction,
  294. mapper,
  295. table,
  296. delete,
  297. )
  298. for (
  299. state,
  300. state_dict,
  301. mapper,
  302. connection,
  303. update_version_id,
  304. ) in states_to_delete:
  305. mapper.dispatch.after_delete(mapper, connection, state)
def _organize_states_for_save(base_mapper, states, uowtransaction):
    """Make an initial pass across a set of states for INSERT or
    UPDATE.

    This includes splitting out into distinct lists for
    each, calling before_insert/before_update, obtaining
    key information for each state including its dictionary,
    mapper, the connection to use for the execution per state,
    and the identity flag.

    Yields 7-tuples of (state, dict, mapper, connection, has_identity,
    row_switch, update_version_id) consumed by save_obj().
    """
    for state, dict_, mapper, connection in _connections_for_states(
        base_mapper, uowtransaction, states
    ):

        # an identity key on the state means the row already exists
        has_identity = bool(state.key)

        instance_key = state.key or mapper._identity_key_from_state(state)

        row_switch = update_version_id = None

        # call before_XXX extensions
        if not has_identity:
            mapper.dispatch.before_insert(mapper, connection, state)
        else:
            mapper.dispatch.before_update(mapper, connection, state)

        if mapper._validate_polymorphic_identity:
            mapper._validate_polymorphic_identity(mapper, state, dict_)

        # detect if we have a "pending" instance (i.e. has
        # no instance_key attached to it), and another instance
        # with the same identity key already exists as persistent.
        # convert to an UPDATE if so.
        if (
            not has_identity
            and instance_key in uowtransaction.session.identity_map
        ):
            instance = uowtransaction.session.identity_map[instance_key]
            existing = attributes.instance_state(instance)

            if not uowtransaction.was_already_deleted(existing):
                if not uowtransaction.is_deleted(existing):
                    # the existing persistent row is NOT being deleted
                    # in this flush; this is a genuine conflict
                    util.warn(
                        "New instance %s with identity key %s conflicts "
                        "with persistent instance %s"
                        % (state_str(state), instance_key, state_str(existing))
                    )
                else:
                    base_mapper._log_debug(
                        "detected row switch for identity %s. "
                        "will update %s, remove %s from "
                        "transaction",
                        instance_key,
                        state_str(state),
                        state_str(existing),
                    )

                    # remove the "delete" flag from the existing element
                    uowtransaction.remove_state_actions(existing)
                    row_switch = existing

        # for an UPDATE (or row switch) with a version counter, capture
        # the committed version value to be matched in the WHERE clause;
        # in the row-switch case read from the existing state's dict
        if (has_identity or row_switch) and mapper.version_id_col is not None:
            update_version_id = mapper._get_committed_state_attr_by_column(
                row_switch if row_switch else state,
                row_switch.dict if row_switch else dict_,
                mapper.version_id_col,
            )

        yield (
            state,
            dict_,
            mapper,
            connection,
            has_identity,
            row_switch,
            update_version_id,
        )
  372. def _organize_states_for_post_update(base_mapper, states, uowtransaction):
  373. """Make an initial pass across a set of states for UPDATE
  374. corresponding to post_update.
  375. This includes obtaining key information for each state
  376. including its dictionary, mapper, the connection to use for
  377. the execution per state.
  378. """
  379. return _connections_for_states(base_mapper, uowtransaction, states)
  380. def _organize_states_for_delete(base_mapper, states, uowtransaction):
  381. """Make an initial pass across a set of states for DELETE.
  382. This includes calling out before_delete and obtaining
  383. key information for each state including its dictionary,
  384. mapper, the connection to use for the execution per state.
  385. """
  386. for state, dict_, mapper, connection in _connections_for_states(
  387. base_mapper, uowtransaction, states
  388. ):
  389. mapper.dispatch.before_delete(mapper, connection, state)
  390. if mapper.version_id_col is not None:
  391. update_version_id = mapper._get_committed_state_attr_by_column(
  392. state, dict_, mapper.version_id_col
  393. )
  394. else:
  395. update_version_id = None
  396. yield (state, dict_, mapper, connection, update_version_id)
def _collect_insert_commands(
    table,
    states_to_insert,
    bulk=False,
    return_defaults=False,
    render_nulls=False,
):
    """Identify sets of values to use in INSERT statements for a
    list of states.

    :param table: the Table currently being processed.
    :param states_to_insert: iterable of (state, dict, mapper,
     connection) tuples.
    :param bulk: when True, states are None and SQL expression values
     are not supported; skips the explicit-None legacy behavior.
    :param return_defaults: when True, compute has_all_pks /
     has_all_defaults even in bulk mode.
    :param render_nulls: when True, None values are rendered into the
     INSERT rather than omitted.

    Yields 8-tuples of (state, dict, params, mapper, connection,
    value_params, has_all_pks, has_all_defaults).
    """
    for state, state_dict, mapper, connection in states_to_insert:
        if table not in mapper._pks_by_table:
            continue

        params = {}
        value_params = {}

        propkey_to_col = mapper._propkey_to_col[table]

        eval_none = mapper._insert_cols_evaluating_none[table]

        for propkey in set(propkey_to_col).intersection(state_dict):
            value = state_dict[propkey]
            col = propkey_to_col[propkey]

            if value is None and col not in eval_none and not render_nulls:
                # omit the column entirely; a None that's not
                # "evaluated" is treated as "unset"
                continue
            elif not bulk and (
                hasattr(value, "__clause_element__")
                or isinstance(value, sql.ClauseElement)
            ):
                # SQL expression values go into a separate dictionary,
                # rendered inline rather than as bound parameters
                value_params[col] = (
                    value.__clause_element__()
                    if hasattr(value, "__clause_element__")
                    else value
                )
            else:
                params[col.key] = value

        if not bulk:
            # for all the columns that have no default and we don't have
            # a value and where "None" is not a special value, add
            # explicit None to the INSERT. This is a legacy behavior
            # which might be worth removing, as it should not be necessary
            # and also produces confusion, given that "missing" and None
            # now have distinct meanings
            for colkey in (
                mapper._insert_cols_as_none[table]
                .difference(params)
                .difference([c.key for c in value_params])
            ):
                params[colkey] = None

        if not bulk or return_defaults:
            # params are in terms of Column key objects, so
            # compare to pk_keys_by_table
            has_all_pks = mapper._pk_keys_by_table[table].issubset(params)

            if mapper.base_mapper.eager_defaults:
                has_all_defaults = mapper._server_default_cols[table].issubset(
                    params
                )
            else:
                has_all_defaults = True
        else:
            has_all_defaults = has_all_pks = True

        # generate the initial version counter value, if configured
        if (
            mapper.version_id_generator is not False
            and mapper.version_id_col is not None
            and mapper.version_id_col in mapper._cols_by_table[table]
        ):
            params[mapper.version_id_col.key] = mapper.version_id_generator(
                None
            )

        yield (
            state,
            state_dict,
            params,
            mapper,
            connection,
            value_params,
            has_all_pks,
            has_all_defaults,
        )
def _collect_update_commands(
    uowtransaction, table, states_to_update, bulk=False
):
    """Identify sets of values to use in UPDATE statements for a
    list of states.

    This function works intricately with the history system
    to determine exactly what values should be updated
    as well as how the row should be matched within an UPDATE
    statement.  Includes some tricky scenarios where the primary
    key of an object might have been changed.

    :param uowtransaction: the current UOWTransaction, or None in
     bulk mode.
    :param states_to_update: iterable of (state, dict, mapper,
     connection, update_version_id) tuples.
    :param bulk: when True, states are None and values are taken
     directly from the dictionaries rather than from history.

    Yields 8-tuples of (state, dict, params, mapper, connection,
    value_params, has_all_defaults, has_all_pks).
    """
    for (
        state,
        state_dict,
        mapper,
        connection,
        update_version_id,
    ) in states_to_update:

        if table not in mapper._pks_by_table:
            continue

        pks = mapper._pks_by_table[table]

        value_params = {}

        propkey_to_col = mapper._propkey_to_col[table]

        if bulk:
            # keys here are mapped attribute keys, so
            # look at mapper attribute keys for pk
            params = dict(
                (propkey_to_col[propkey].key, state_dict[propkey])
                for propkey in set(propkey_to_col)
                .intersection(state_dict)
                .difference(mapper._pk_attr_keys_by_table[table])
            )
            has_all_defaults = True
        else:
            params = {}
            # only attributes with pending (committed_state) history
            # are candidates for the SET clause
            for propkey in set(propkey_to_col).intersection(
                state.committed_state
            ):
                value = state_dict[propkey]
                col = propkey_to_col[propkey]

                if hasattr(value, "__clause_element__") or isinstance(
                    value, sql.ClauseElement
                ):
                    # SQL expression values render inline, collected
                    # separately from plain bound parameters
                    value_params[col] = (
                        value.__clause_element__()
                        if hasattr(value, "__clause_element__")
                        else value
                    )
                # guard against values that generate non-__nonzero__
                # objects for __eq__()
                elif (
                    state.manager[propkey].impl.is_equal(
                        value, state.committed_state[propkey]
                    )
                    is not True
                ):
                    params[col.key] = value

            if mapper.base_mapper.eager_defaults:
                has_all_defaults = (
                    mapper._server_onupdate_default_cols[table]
                ).issubset(params)
            else:
                has_all_defaults = True

        if (
            update_version_id is not None
            and mapper.version_id_col in mapper._cols_by_table[table]
        ):

            if not bulk and not (params or value_params):
                # HACK: check for history in other tables, in case the
                # history is only in a different table than the one
                # where the version_id_col is.  This logic was lost
                # from 0.9 -> 1.0.0 and restored in 1.0.6.
                for prop in mapper._columntoproperty.values():
                    history = state.manager[prop.key].impl.get_history(
                        state, state_dict, attributes.PASSIVE_NO_INITIALIZE
                    )
                    if history.added:
                        break
                else:
                    # no net change, break
                    continue

            col = mapper.version_id_col
            no_params = not params and not value_params

            # match the committed version value in the WHERE clause
            params[col._label] = update_version_id

            if (
                bulk or col.key not in params
            ) and mapper.version_id_generator is not False:
                val = mapper.version_id_generator(update_version_id)
                params[col.key] = val
            elif mapper.version_id_generator is False and no_params:
                # no version id generator, no values set on the table,
                # and version id wasn't manually incremented.
                # set version id to itself so we get an UPDATE
                # statement
                params[col.key] = update_version_id

        elif not (params or value_params):
            # nothing to update on this table for this state
            continue

        has_all_pks = True
        expect_pk_cascaded = False
        if bulk:
            # keys here are mapped attribute keys, so
            # look at mapper attribute keys for pk
            pk_params = dict(
                (propkey_to_col[propkey]._label, state_dict.get(propkey))
                for propkey in set(propkey_to_col).intersection(
                    mapper._pk_attr_keys_by_table[table]
                )
            )
        else:
            pk_params = {}
            for col in pks:
                propkey = mapper._columntoproperty[col].key

                history = state.manager[propkey].impl.get_history(
                    state, state_dict, attributes.PASSIVE_OFF
                )

                if history.added:
                    if (
                        not history.deleted
                        or ("pk_cascaded", state, col)
                        in uowtransaction.attributes
                    ):
                        # the PK value was set by a database-side
                        # CASCADE (or has no old value); match on the
                        # new value and don't SET the column
                        expect_pk_cascaded = True
                        pk_params[col._label] = history.added[0]
                        params.pop(col.key, None)
                    else:
                        # else, use the old value to locate the row
                        pk_params[col._label] = history.deleted[0]
                        if col in value_params:
                            has_all_pks = False
                else:
                    pk_params[col._label] = history.unchanged[0]

                if pk_params[col._label] is None:
                    raise orm_exc.FlushError(
                        "Can't update table %s using NULL for primary "
                        "key value on column %s" % (table, col)
                    )

        if params or value_params:
            params.update(pk_params)
            yield (
                state,
                state_dict,
                params,
                mapper,
                connection,
                value_params,
                has_all_defaults,
                has_all_pks,
            )
        elif expect_pk_cascaded:
            # no UPDATE occurs on this table, but we expect that CASCADE rules
            # have changed the primary key of the row; propagate this event to
            # other columns that expect to have been modified. this normally
            # occurs after the UPDATE is emitted however we invoke it here
            # explicitly in the absence of our invoking an UPDATE
            for m, equated_pairs in mapper._table_to_equated[table]:
                sync.populate(
                    state,
                    m,
                    state,
                    m,
                    equated_pairs,
                    uowtransaction,
                    mapper.passive_updates,
                )
def _collect_post_update_commands(
    base_mapper, uowtransaction, table, states_to_update, post_update_cols
):
    """Identify sets of values to use in UPDATE statements for a
    list of states within a post_update operation.

    :param states_to_update: iterable of (state, dict, mapper,
     connection, update_version_id) tuples; every state is expected to
     have ``table`` in its mapper's _pks_by_table (asserted below).
    :param post_update_cols: columns targeted by the post_update
     relationship handling.

    Yields (state, dict, mapper, connection, params) tuples for states
    that have at least one changed column to render.
    """
    for (
        state,
        state_dict,
        mapper,
        connection,
        update_version_id,
    ) in states_to_update:

        # assert table in mapper._pks_by_table

        pks = mapper._pks_by_table[table]
        params = {}
        hasdata = False

        for col in mapper._cols_by_table[table]:
            if col in pks:
                # primary key values go into the WHERE clause under
                # the column's label key
                params[col._label] = mapper._get_state_attr_by_column(
                    state, state_dict, col, passive=attributes.PASSIVE_OFF
                )

            elif col in post_update_cols or col.onupdate is not None:
                prop = mapper._columntoproperty[col]
                history = state.manager[prop.key].impl.get_history(
                    state, state_dict, attributes.PASSIVE_NO_INITIALIZE
                )
                if history.added:
                    value = history.added[0]
                    params[col.key] = value
                    hasdata = True
        if hasdata:
            if (
                update_version_id is not None
                and mapper.version_id_col in mapper._cols_by_table[table]
            ):

                col = mapper.version_id_col
                params[col._label] = update_version_id

                # only generate a new version counter for persistent
                # (keyed) states where the UPDATE isn't already setting
                # the version column explicitly
                if (
                    bool(state.key)
                    and col.key not in params
                    and mapper.version_id_generator is not False
                ):
                    val = mapper.version_id_generator(update_version_id)
                    params[col.key] = val
            yield state, state_dict, mapper, connection, params
  683. def _collect_delete_commands(
  684. base_mapper, uowtransaction, table, states_to_delete
  685. ):
  686. """Identify values to use in DELETE statements for a list of
  687. states to be deleted."""
  688. for (
  689. state,
  690. state_dict,
  691. mapper,
  692. connection,
  693. update_version_id,
  694. ) in states_to_delete:
  695. if table not in mapper._pks_by_table:
  696. continue
  697. params = {}
  698. for col in mapper._pks_by_table[table]:
  699. params[
  700. col.key
  701. ] = value = mapper._get_committed_state_attr_by_column(
  702. state, state_dict, col
  703. )
  704. if value is None:
  705. raise orm_exc.FlushError(
  706. "Can't delete from table %s "
  707. "using NULL for primary "
  708. "key value on column %s" % (table, col)
  709. )
  710. if (
  711. update_version_id is not None
  712. and mapper.version_id_col in mapper._cols_by_table[table]
  713. ):
  714. params[mapper.version_id_col.key] = update_version_id
  715. yield params, connection
def _emit_update_statements(
    base_mapper,
    uowtransaction,
    mapper,
    table,
    update,
    bookkeeping=True,
):
    """Emit UPDATE statements corresponding to value lists collected
    by _collect_update_commands().

    ``update`` is an iterable of records shaped as
    (state, state_dict, params, mapper, connection, value_params,
    has_all_defaults, has_all_pks); records are grouped by
    (connection, param keys, value-params flag, has_all_defaults,
    has_all_pks) so that compatible records can share an executemany().
    When ``bookkeeping`` is False, post-UPDATE attribute refresh/expiry
    via _postfetch() is skipped.
    """
    # whether the version id column participates in this table's UPDATE
    needs_version_id = (
        mapper.version_id_col is not None
        and mapper.version_id_col in mapper._cols_by_table[table]
    )

    execution_options = {"compiled_cache": base_mapper._compiled_cache}

    def update_stmt():
        # build the canonical UPDATE ... WHERE pk = :pk [AND version = :v]
        # statement; memoized per (mapper, table) below
        clauses = BooleanClauseList._construct_raw(operators.and_)

        for col in mapper._pks_by_table[table]:
            clauses.clauses.append(
                col == sql.bindparam(col._label, type_=col.type)
            )

        if needs_version_id:
            clauses.clauses.append(
                mapper.version_id_col
                == sql.bindparam(
                    mapper.version_id_col._label,
                    type_=mapper.version_id_col.type,
                )
            )

        stmt = table.update().where(clauses)
        return stmt

    cached_stmt = base_mapper._memo(("update", table), update_stmt)

    for (
        (connection, paramkeys, hasvalue, has_all_defaults, has_all_pks),
        records,
    ) in groupby(
        update,
        lambda rec: (
            rec[4],  # connection
            set(rec[2]),  # set of parameter keys
            bool(rec[5]),  # whether or not we have "value" parameters
            rec[6],  # has_all_defaults
            rec[7],  # has all pks
        ),
    ):
        rows = 0
        records = list(records)

        statement = cached_stmt
        return_defaults = False

        # decide whether RETURNING (return_defaults) is needed: missing
        # PKs, eager defaults to fetch, or a version id to retrieve
        if not has_all_pks:
            statement = statement.return_defaults()
            return_defaults = True
        elif (
            bookkeeping
            and not has_all_defaults
            and mapper.base_mapper.eager_defaults
        ):
            statement = statement.return_defaults()
            return_defaults = True
        elif mapper.version_id_col is not None:
            statement = statement.return_defaults(mapper.version_id_col)
            return_defaults = True

        # rowcount reliability differs when RETURNING is in play
        assert_singlerow = (
            connection.dialect.supports_sane_rowcount
            if not return_defaults
            else connection.dialect.supports_sane_rowcount_returning
        )

        assert_multirow = (
            assert_singlerow
            and connection.dialect.supports_sane_multi_rowcount
        )
        # executemany() is only usable when no per-row defaults need
        # fetching and no version check must be verified per row
        allow_multirow = has_all_defaults and not needs_version_id

        if hasvalue:
            # records carry SQL-expression value_params; each needs its
            # own statement.values() and thus its own execution
            for (
                state,
                state_dict,
                params,
                mapper,
                connection,
                value_params,
                has_all_defaults,
                has_all_pks,
            ) in records:
                c = connection._execute_20(
                    statement.values(value_params),
                    params,
                    execution_options=execution_options,
                )
                if bookkeeping:
                    _postfetch(
                        mapper,
                        uowtransaction,
                        table,
                        state,
                        state_dict,
                        c,
                        c.context.compiled_parameters[0],
                        value_params,
                        True,
                        c.returned_defaults,
                    )
                rows += c.rowcount
                check_rowcount = assert_singlerow
        else:
            if not allow_multirow:
                check_rowcount = assert_singlerow
                # one execute per record so rowcount / defaults can be
                # verified per row
                for (
                    state,
                    state_dict,
                    params,
                    mapper,
                    connection,
                    value_params,
                    has_all_defaults,
                    has_all_pks,
                ) in records:
                    c = connection._execute_20(
                        statement, params, execution_options=execution_options
                    )

                    # TODO: why with bookkeeping=False?
                    if bookkeeping:
                        _postfetch(
                            mapper,
                            uowtransaction,
                            table,
                            state,
                            state_dict,
                            c,
                            c.context.compiled_parameters[0],
                            value_params,
                            True,
                            c.returned_defaults,
                        )
                    rows += c.rowcount
            else:
                # batch all records into a single executemany()
                multiparams = [rec[2] for rec in records]

                check_rowcount = assert_multirow or (
                    assert_singlerow and len(multiparams) == 1
                )

                c = connection._execute_20(
                    statement, multiparams, execution_options=execution_options
                )

                rows += c.rowcount

                for (
                    state,
                    state_dict,
                    params,
                    mapper,
                    connection,
                    value_params,
                    has_all_defaults,
                    has_all_pks,
                ) in records:
                    if bookkeeping:
                        _postfetch(
                            mapper,
                            uowtransaction,
                            table,
                            state,
                            state_dict,
                            c,
                            c.context.compiled_parameters[0],
                            value_params,
                            True,
                            # returned_defaults is not available from an
                            # executemany-style execution
                            c.returned_defaults
                            if not c.context.executemany
                            else None,
                        )

        if check_rowcount:
            # rowcount mismatch means another transaction changed or
            # deleted rows we expected to update
            if rows != len(records):
                raise orm_exc.StaleDataError(
                    "UPDATE statement on table '%s' expected to "
                    "update %d row(s); %d were matched."
                    % (table.description, len(records), rows)
                )

        elif needs_version_id:
            util.warn(
                "Dialect %s does not support updated rowcount "
                "- versioning cannot be verified."
                % c.dialect.dialect_description
            )
def _emit_insert_statements(
    base_mapper,
    uowtransaction,
    mapper,
    table,
    insert,
    bookkeeping=True,
):
    """Emit INSERT statements corresponding to value lists collected
    by _collect_insert_commands().

    ``insert`` is an iterable of records shaped as
    (state, state_dict, params, mapper, connection, value_params,
    has_all_pks, has_all_defaults); records are grouped so that
    compatible rows can be batched with executemany().  A record with a
    falsy ``state`` originates from a bulk save and only receives
    _postfetch_bulk_save() treatment.
    """
    cached_stmt = base_mapper._memo(("insert", table), table.insert)

    execution_options = {"compiled_cache": base_mapper._compiled_cache}

    for (
        (connection, pkeys, hasvalue, has_all_pks, has_all_defaults),
        records,
    ) in groupby(
        insert,
        lambda rec: (
            rec[4],  # connection
            set(rec[2]),  # parameter keys
            bool(rec[5]),  # whether we have "value" parameters
            rec[6],
            rec[7],
        ),
    ):
        statement = cached_stmt

        if (
            not bookkeeping
            or (
                has_all_defaults
                or not base_mapper.eager_defaults
                or not connection.dialect.implicit_returning
            )
            and has_all_pks
            and not hasvalue
        ):
            # the "we don't need newly generated values back" section.
            # here we have all the PKs, all the defaults or we don't want
            # to fetch them, or the dialect doesn't support RETURNING at all
            # so we have to post-fetch / use lastrowid anyway.
            records = list(records)
            multiparams = [rec[2] for rec in records]

            c = connection._execute_20(
                statement, multiparams, execution_options=execution_options
            )

            if bookkeeping:
                for (
                    (
                        state,
                        state_dict,
                        params,
                        mapper_rec,
                        conn,
                        value_params,
                        has_all_pks,
                        has_all_defaults,
                    ),
                    last_inserted_params,
                ) in zip(records, c.context.compiled_parameters):
                    if state:
                        _postfetch(
                            mapper_rec,
                            uowtransaction,
                            table,
                            state,
                            state_dict,
                            c,
                            last_inserted_params,
                            value_params,
                            False,
                            c.returned_defaults
                            if not c.context.executemany
                            else None,
                        )
                    else:
                        # bulk save path: no instance-level bookkeeping
                        _postfetch_bulk_save(mapper_rec, state_dict, table)

        else:
            # here, we need defaults and/or pk values back.

            records = list(records)
            # executemany + RETURNING is only possible when the dialect
            # supports it and there are no per-row SQL expression values
            if (
                not hasvalue
                and connection.dialect.insert_executemany_returning
                and len(records) > 1
            ):
                do_executemany = True
            else:
                do_executemany = False

            # choose what RETURNING should fetch: all defaults, just the
            # version id, or (for executemany) the primary key columns
            if not has_all_defaults and base_mapper.eager_defaults:
                statement = statement.return_defaults()
            elif mapper.version_id_col is not None:
                statement = statement.return_defaults(mapper.version_id_col)
            elif do_executemany:
                statement = statement.return_defaults(*table.primary_key)

            if do_executemany:
                multiparams = [rec[2] for rec in records]

                c = connection._execute_20(
                    statement, multiparams, execution_options=execution_options
                )

                if bookkeeping:
                    for (
                        (
                            state,
                            state_dict,
                            params,
                            mapper_rec,
                            conn,
                            value_params,
                            has_all_pks,
                            has_all_defaults,
                        ),
                        last_inserted_params,
                        inserted_primary_key,
                        returned_defaults,
                    ) in util.zip_longest(
                        records,
                        c.context.compiled_parameters,
                        c.inserted_primary_key_rows,
                        c.returned_defaults_rows or (),
                    ):
                        # fill in newly generated PK values on the state,
                        # without clobbering values already present
                        for pk, col in zip(
                            inserted_primary_key,
                            mapper._pks_by_table[table],
                        ):
                            prop = mapper_rec._columntoproperty[col]
                            if state_dict.get(prop.key) is None:
                                state_dict[prop.key] = pk

                        if state:
                            _postfetch(
                                mapper_rec,
                                uowtransaction,
                                table,
                                state,
                                state_dict,
                                c,
                                last_inserted_params,
                                value_params,
                                False,
                                returned_defaults,
                            )
                        else:
                            _postfetch_bulk_save(mapper_rec, state_dict, table)
            else:
                # one INSERT per record; required when value_params carry
                # per-row SQL expressions or executemany+RETURNING is
                # unavailable
                for (
                    state,
                    state_dict,
                    params,
                    mapper_rec,
                    connection,
                    value_params,
                    has_all_pks,
                    has_all_defaults,
                ) in records:
                    if value_params:
                        result = connection._execute_20(
                            statement.values(value_params),
                            params,
                            execution_options=execution_options,
                        )
                    else:
                        result = connection._execute_20(
                            statement,
                            params,
                            execution_options=execution_options,
                        )

                    primary_key = result.inserted_primary_key

                    for pk, col in zip(
                        primary_key, mapper._pks_by_table[table]
                    ):
                        prop = mapper_rec._columntoproperty[col]
                        # a PK generated by a SQL expression always
                        # overwrites; otherwise only fill in a missing value
                        if (
                            col in value_params
                            or state_dict.get(prop.key) is None
                        ):
                            state_dict[prop.key] = pk

                    if bookkeeping:
                        if state:
                            _postfetch(
                                mapper_rec,
                                uowtransaction,
                                table,
                                state,
                                state_dict,
                                result,
                                result.context.compiled_parameters[0],
                                value_params,
                                False,
                                result.returned_defaults
                                if not result.context.executemany
                                else None,
                            )
                        else:
                            _postfetch_bulk_save(mapper_rec, state_dict, table)
def _emit_post_update_statements(
    base_mapper, uowtransaction, mapper, table, update
):
    """Emit UPDATE statements corresponding to value lists collected
    by _collect_post_update_commands().

    ``update`` records are shaped as
    (state, state_dict, mapper, connection, params).
    """
    execution_options = {"compiled_cache": base_mapper._compiled_cache}

    needs_version_id = (
        mapper.version_id_col is not None
        and mapper.version_id_col in mapper._cols_by_table[table]
    )

    def update_stmt():
        # canonical UPDATE ... WHERE pk = :pk [AND version = :v];
        # memoized per (mapper, table) below
        clauses = BooleanClauseList._construct_raw(operators.and_)

        for col in mapper._pks_by_table[table]:
            clauses.clauses.append(
                col == sql.bindparam(col._label, type_=col.type)
            )

        if needs_version_id:
            clauses.clauses.append(
                mapper.version_id_col
                == sql.bindparam(
                    mapper.version_id_col._label,
                    type_=mapper.version_id_col.type,
                )
            )

        stmt = table.update().where(clauses)

        # fetch the new version value back when versioning is in use
        if mapper.version_id_col is not None:
            stmt = stmt.return_defaults(mapper.version_id_col)

        return stmt

    statement = base_mapper._memo(("post_update", table), update_stmt)

    # execute each UPDATE in the order according to the original
    # list of states to guarantee row access order, but
    # also group them into common (connection, cols) sets
    # to support executemany().
    for key, records in groupby(
        update,
        lambda rec: (rec[3], set(rec[4])),  # connection  # parameter keys
    ):
        rows = 0

        records = list(records)
        connection = key[0]

        # rowcount reliability depends on whether RETURNING is in play
        assert_singlerow = (
            connection.dialect.supports_sane_rowcount
            if mapper.version_id_col is None
            else connection.dialect.supports_sane_rowcount_returning
        )
        assert_multirow = (
            assert_singlerow
            and connection.dialect.supports_sane_multi_rowcount
        )
        allow_multirow = not needs_version_id or assert_multirow

        if not allow_multirow:
            check_rowcount = assert_singlerow
            # execute one UPDATE per record so each version check can be
            # verified against its own rowcount
            for state, state_dict, mapper_rec, connection, params in records:
                c = connection._execute_20(
                    statement, params, execution_options=execution_options
                )

                _postfetch_post_update(
                    mapper_rec,
                    uowtransaction,
                    table,
                    state,
                    state_dict,
                    c,
                    c.context.compiled_parameters[0],
                )
                rows += c.rowcount
        else:
            # batch all records through a single executemany()
            multiparams = [
                params
                for state, state_dict, mapper_rec, conn, params in records
            ]

            check_rowcount = assert_multirow or (
                assert_singlerow and len(multiparams) == 1
            )

            c = connection._execute_20(
                statement, multiparams, execution_options=execution_options
            )

            rows += c.rowcount
            for state, state_dict, mapper_rec, connection, params in records:
                _postfetch_post_update(
                    mapper_rec,
                    uowtransaction,
                    table,
                    state,
                    state_dict,
                    c,
                    c.context.compiled_parameters[0],
                )

        if check_rowcount:
            # rowcount mismatch means concurrent modification/deletion
            if rows != len(records):
                raise orm_exc.StaleDataError(
                    "UPDATE statement on table '%s' expected to "
                    "update %d row(s); %d were matched."
                    % (table.description, len(records), rows)
                )

        elif needs_version_id:
            util.warn(
                "Dialect %s does not support updated rowcount "
                "- versioning cannot be verified."
                % c.dialect.dialect_description
            )
def _emit_delete_statements(
    base_mapper, uowtransaction, mapper, table, delete
):
    """Emit DELETE statements corresponding to value lists collected
    by _collect_delete_commands().

    ``delete`` is an iterable of (params, connection) pairs as yielded
    by _collect_delete_commands().
    """
    need_version_id = (
        mapper.version_id_col is not None
        and mapper.version_id_col in mapper._cols_by_table[table]
    )

    def delete_stmt():
        # canonical DELETE ... WHERE pk = :pk [AND version = :v];
        # memoized per (mapper, table) below
        clauses = BooleanClauseList._construct_raw(operators.and_)

        for col in mapper._pks_by_table[table]:
            clauses.clauses.append(
                col == sql.bindparam(col.key, type_=col.type)
            )

        if need_version_id:
            clauses.clauses.append(
                mapper.version_id_col
                == sql.bindparam(
                    mapper.version_id_col.key, type_=mapper.version_id_col.type
                )
            )

        return table.delete().where(clauses)

    statement = base_mapper._memo(("delete", table), delete_stmt)
    for connection, recs in groupby(delete, lambda rec: rec[1]):  # connection
        del_objects = [params for params, connection in recs]

        execution_options = {"compiled_cache": base_mapper._compiled_cache}
        expected = len(del_objects)
        # -1 means "rowcount unknown / unverifiable"
        rows_matched = -1
        only_warn = False

        if (
            need_version_id
            and not connection.dialect.supports_sane_multi_rowcount
        ):
            if connection.dialect.supports_sane_rowcount:
                rows_matched = 0
                # execute deletes individually so that versioned
                # rows can be verified
                for params in del_objects:
                    c = connection._execute_20(
                        statement, params, execution_options=execution_options
                    )
                    rows_matched += c.rowcount
            else:
                util.warn(
                    "Dialect %s does not support deleted rowcount "
                    "- versioning cannot be verified."
                    % connection.dialect.dialect_description
                )
                connection._execute_20(
                    statement, del_objects, execution_options=execution_options
                )
        else:
            c = connection._execute_20(
                statement, del_objects, execution_options=execution_options
            )

            # without versioning a mismatch is only warned about, not raised
            if not need_version_id:
                only_warn = True

            rows_matched = c.rowcount

        if (
            base_mapper.confirm_deleted_rows
            and rows_matched > -1
            and expected != rows_matched
            and (
                connection.dialect.supports_sane_multi_rowcount
                or len(del_objects) == 1
            )
        ):
            # TODO: why does this "only warn" if versioning is turned off,
            # whereas the UPDATE raises?
            if only_warn:
                util.warn(
                    "DELETE statement on table '%s' expected to "
                    "delete %d row(s); %d were matched.  Please set "
                    "confirm_deleted_rows=False within the mapper "
                    "configuration to prevent this warning."
                    % (table.description, expected, rows_matched)
                )
            else:
                # NOTE(review): this exception message also says "prevent
                # this warning" although it raises; kept verbatim since
                # callers/tests may match on the text
                raise orm_exc.StaleDataError(
                    "DELETE statement on table '%s' expected to "
                    "delete %d row(s); %d were matched.  Please set "
                    "confirm_deleted_rows=False within the mapper "
                    "configuration to prevent this warning."
                    % (table.description, expected, rows_matched)
                )
def _finalize_insert_update_commands(base_mapper, uowtransaction, states):
    """finalize state on states that have been inserted or updated,
    including calling after_insert/after_update events.

    ``states`` yields (state, state_dict, mapper, connection,
    has_identity); has_identity distinguishes UPDATE (True) from
    INSERT (False) for event dispatch.
    """
    for state, state_dict, mapper, connection, has_identity in states:

        if mapper._readonly_props:
            # expire read-only attributes whose database value may have
            # changed as a side effect of the flush
            readonly = state.unmodified_intersection(
                [
                    p.key
                    for p in mapper._readonly_props
                    if (
                        p.expire_on_flush
                        and (not p.deferred or p.key in state.dict)
                    )
                    or (
                        not p.expire_on_flush
                        and not p.deferred
                        and p.key not in state.dict
                    )
                ]
            )
            if readonly:
                state._expire_attributes(state.dict, readonly)

        # if eager_defaults option is enabled, load
        # all expired cols.  Else if we have a version_id_col, make sure
        # it isn't expired.
        toload_now = []

        if base_mapper.eager_defaults:
            toload_now.extend(
                state._unloaded_non_object.intersection(
                    mapper._server_default_plus_onupdate_propkeys
                )
            )

        if (
            mapper.version_id_col is not None
            and mapper.version_id_generator is False
        ):
            # server-side versioning: the new version value must be
            # present locally for the post-flush NULL check below
            if mapper._version_id_prop.key in state.unloaded:
                toload_now.extend([mapper._version_id_prop.key])

        if toload_now:
            # refresh the needed attributes with an immediate SELECT
            state.key = base_mapper._identity_key_from_state(state)
            stmt = future.select(mapper).set_label_style(
                LABEL_STYLE_TABLENAME_PLUS_COL
            )
            loading.load_on_ident(
                uowtransaction.session,
                stmt,
                state.key,
                refresh_state=state,
                only_load_props=toload_now,
            )

        # call after_XXX extensions
        if not has_identity:
            mapper.dispatch.after_insert(mapper, connection, state)
        else:
            mapper.dispatch.after_update(mapper, connection, state)

        if (
            mapper.version_id_generator is False
            and mapper.version_id_col is not None
        ):
            if state_dict[mapper._version_id_prop.key] is None:
                raise orm_exc.FlushError(
                    "Instance does not contain a non-NULL version value"
                )
  1340. def _postfetch_post_update(
  1341. mapper, uowtransaction, table, state, dict_, result, params
  1342. ):
  1343. if uowtransaction.is_deleted(state):
  1344. return
  1345. prefetch_cols = result.context.compiled.prefetch
  1346. postfetch_cols = result.context.compiled.postfetch
  1347. if (
  1348. mapper.version_id_col is not None
  1349. and mapper.version_id_col in mapper._cols_by_table[table]
  1350. ):
  1351. prefetch_cols = list(prefetch_cols) + [mapper.version_id_col]
  1352. refresh_flush = bool(mapper.class_manager.dispatch.refresh_flush)
  1353. if refresh_flush:
  1354. load_evt_attrs = []
  1355. for c in prefetch_cols:
  1356. if c.key in params and c in mapper._columntoproperty:
  1357. dict_[mapper._columntoproperty[c].key] = params[c.key]
  1358. if refresh_flush:
  1359. load_evt_attrs.append(mapper._columntoproperty[c].key)
  1360. if refresh_flush and load_evt_attrs:
  1361. mapper.class_manager.dispatch.refresh_flush(
  1362. state, uowtransaction, load_evt_attrs
  1363. )
  1364. if postfetch_cols:
  1365. state._expire_attributes(
  1366. state.dict,
  1367. [
  1368. mapper._columntoproperty[c].key
  1369. for c in postfetch_cols
  1370. if c in mapper._columntoproperty
  1371. ],
  1372. )
  1373. def _postfetch(
  1374. mapper,
  1375. uowtransaction,
  1376. table,
  1377. state,
  1378. dict_,
  1379. result,
  1380. params,
  1381. value_params,
  1382. isupdate,
  1383. returned_defaults,
  1384. ):
  1385. """Expire attributes in need of newly persisted database state,
  1386. after an INSERT or UPDATE statement has proceeded for that
  1387. state."""
  1388. prefetch_cols = result.context.compiled.prefetch
  1389. postfetch_cols = result.context.compiled.postfetch
  1390. returning_cols = result.context.compiled.returning
  1391. if (
  1392. mapper.version_id_col is not None
  1393. and mapper.version_id_col in mapper._cols_by_table[table]
  1394. ):
  1395. prefetch_cols = list(prefetch_cols) + [mapper.version_id_col]
  1396. refresh_flush = bool(mapper.class_manager.dispatch.refresh_flush)
  1397. if refresh_flush:
  1398. load_evt_attrs = []
  1399. if returning_cols:
  1400. row = returned_defaults
  1401. if row is not None:
  1402. for row_value, col in zip(row, returning_cols):
  1403. # pk cols returned from insert are handled
  1404. # distinctly, don't step on the values here
  1405. if col.primary_key and result.context.isinsert:
  1406. continue
  1407. # note that columns can be in the "return defaults" that are
  1408. # not mapped to this mapper, typically because they are
  1409. # "excluded", which can be specified directly or also occurs
  1410. # when using declarative w/ single table inheritance
  1411. prop = mapper._columntoproperty.get(col)
  1412. if prop:
  1413. dict_[prop.key] = row_value
  1414. if refresh_flush:
  1415. load_evt_attrs.append(prop.key)
  1416. for c in prefetch_cols:
  1417. if c.key in params and c in mapper._columntoproperty:
  1418. dict_[mapper._columntoproperty[c].key] = params[c.key]
  1419. if refresh_flush:
  1420. load_evt_attrs.append(mapper._columntoproperty[c].key)
  1421. if refresh_flush and load_evt_attrs:
  1422. mapper.class_manager.dispatch.refresh_flush(
  1423. state, uowtransaction, load_evt_attrs
  1424. )
  1425. if isupdate and value_params:
  1426. # explicitly suit the use case specified by
  1427. # [ticket:3801], PK SQL expressions for UPDATE on non-RETURNING
  1428. # database which are set to themselves in order to do a version bump.
  1429. postfetch_cols.extend(
  1430. [
  1431. col
  1432. for col in value_params
  1433. if col.primary_key and col not in returning_cols
  1434. ]
  1435. )
  1436. if postfetch_cols:
  1437. state._expire_attributes(
  1438. state.dict,
  1439. [
  1440. mapper._columntoproperty[c].key
  1441. for c in postfetch_cols
  1442. if c in mapper._columntoproperty
  1443. ],
  1444. )
  1445. # synchronize newly inserted ids from one table to the next
  1446. # TODO: this still goes a little too often. would be nice to
  1447. # have definitive list of "columns that changed" here
  1448. for m, equated_pairs in mapper._table_to_equated[table]:
  1449. sync.populate(
  1450. state,
  1451. m,
  1452. state,
  1453. m,
  1454. equated_pairs,
  1455. uowtransaction,
  1456. mapper.passive_updates,
  1457. )
  1458. def _postfetch_bulk_save(mapper, dict_, table):
  1459. for m, equated_pairs in mapper._table_to_equated[table]:
  1460. sync.bulk_populate_inherit_keys(dict_, m, equated_pairs)
  1461. def _connections_for_states(base_mapper, uowtransaction, states):
  1462. """Return an iterator of (state, state.dict, mapper, connection).
  1463. The states are sorted according to _sort_states, then paired
  1464. with the connection they should be using for the given
  1465. unit of work transaction.
  1466. """
  1467. # if session has a connection callable,
  1468. # organize individual states with the connection
  1469. # to use for update
  1470. if uowtransaction.session.connection_callable:
  1471. connection_callable = uowtransaction.session.connection_callable
  1472. else:
  1473. connection = uowtransaction.transaction.connection(base_mapper)
  1474. connection_callable = None
  1475. for state in _sort_states(base_mapper, states):
  1476. if connection_callable:
  1477. connection = connection_callable(base_mapper, state.obj())
  1478. mapper = state.manager.mapper
  1479. yield state, state.dict, mapper, connection
  1480. def _sort_states(mapper, states):
  1481. pending = set(states)
  1482. persistent = set(s for s in pending if s.key is not None)
  1483. pending.difference_update(persistent)
  1484. try:
  1485. persistent_sorted = sorted(
  1486. persistent, key=mapper._persistent_sortkey_fn
  1487. )
  1488. except TypeError as err:
  1489. util.raise_(
  1490. sa_exc.InvalidRequestError(
  1491. "Could not sort objects by primary key; primary key "
  1492. "values must be sortable in Python (was: %s)" % err
  1493. ),
  1494. replace_context=err,
  1495. )
  1496. return (
  1497. sorted(pending, key=operator.attrgetter("insert_order"))
  1498. + persistent_sorted
  1499. )
# shared empty-dict default used by the option classes below
# (util.immutabledict is project-provided; presumably read-only)
_EMPTY_DICT = util.immutabledict()
class BulkUDCompileState(CompileState):
    class default_update_options(Options):
        # Default execution options for bulk UPDATE/DELETE operations;
        # instances are augmented incrementally via ``options + {...}``
        # as the statement execution proceeds.

        # one of "evaluate", "fetch", or False (validated downstream)
        _synchronize_session = "evaluate"
        _autoflush = True
        _subject_mapper = None
        _resolved_values = _EMPTY_DICT
        _resolved_keys_as_propnames = _EMPTY_DICT
        _value_evaluators = _EMPTY_DICT
        _matched_objects = None
        _matched_rows = None
        _refresh_identity_token = None
    @classmethod
    def orm_pre_session_exec(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        is_reentrant_invoke,
    ):
        """Prepare a bulk UPDATE/DELETE execution: resolve options,
        validate the synchronize_session strategy, autoflush, and run the
        pre-synchronization step.

        Returns (statement, execution_options) with the resolved update
        options stored under "_sa_orm_update_options".
        """
        # a re-entrant invocation (e.g. from an extension calling back
        # into session.execute) has already done this work
        if is_reentrant_invoke:
            return statement, execution_options

        (
            update_options,
            execution_options,
        ) = BulkUDCompileState.default_update_options.from_execution_options(
            "_sa_orm_update_options",
            {"synchronize_session"},
            execution_options,
            statement._execution_options,
        )

        sync = update_options._synchronize_session
        if sync is not None:
            if sync not in ("evaluate", "fetch", False):
                raise sa_exc.ArgumentError(
                    "Valid strategies for session synchronization "
                    "are 'evaluate', 'fetch', False"
                )

        bind_arguments["clause"] = statement
        try:
            plugin_subject = statement._propagate_attrs["plugin_subject"]
        except KeyError:
            # orm-plugin statements always carry a subject; this is an
            # internal invariant, not a user-facing error
            assert False, "statement had 'orm' plugin but no plugin_subject"
        else:
            bind_arguments["mapper"] = plugin_subject.mapper

        update_options += {"_subject_mapper": plugin_subject.mapper}

        if update_options._autoflush:
            session._autoflush()

        statement = statement._annotate(
            {"synchronize_session": update_options._synchronize_session}
        )

        # this stage of the execution is called before the do_orm_execute event
        # hook.  meaning for an extension like horizontal sharding, this step
        # happens before the extension splits out into multiple backends and
        # runs only once.  if we do pre_sync_fetch, we execute a SELECT
        # statement, which the horizontal sharding extension splits amongst the
        # shards and combines the results together.

        if update_options._synchronize_session == "evaluate":
            update_options = cls._do_pre_synchronize_evaluate(
                session,
                statement,
                params,
                execution_options,
                bind_arguments,
                update_options,
            )
        elif update_options._synchronize_session == "fetch":
            update_options = cls._do_pre_synchronize_fetch(
                session,
                statement,
                params,
                execution_options,
                bind_arguments,
                update_options,
            )

        return (
            statement,
            util.immutabledict(execution_options).union(
                {"_sa_orm_update_options": update_options}
            ),
        )
  1583. @classmethod
  1584. def orm_setup_cursor_result(
  1585. cls,
  1586. session,
  1587. statement,
  1588. params,
  1589. execution_options,
  1590. bind_arguments,
  1591. result,
  1592. ):
  1593. # this stage of the execution is called after the
  1594. # do_orm_execute event hook. meaning for an extension like
  1595. # horizontal sharding, this step happens *within* the horizontal
  1596. # sharding event handler which calls session.execute() re-entrantly
  1597. # and will occur for each backend individually.
  1598. # the sharding extension then returns its own merged result from the
  1599. # individual ones we return here.
  1600. update_options = execution_options["_sa_orm_update_options"]
  1601. if update_options._synchronize_session == "evaluate":
  1602. cls._do_post_synchronize_evaluate(session, result, update_options)
  1603. elif update_options._synchronize_session == "fetch":
  1604. cls._do_post_synchronize_fetch(session, result, update_options)
  1605. return result
  1606. @classmethod
  1607. def _adjust_for_extra_criteria(cls, global_attributes, ext_info):
  1608. """Apply extra criteria filtering.
  1609. For all distinct single-table-inheritance mappers represented in the
  1610. table being updated or deleted, produce additional WHERE criteria such
  1611. that only the appropriate subtypes are selected from the total results.
  1612. Additionally, add WHERE criteria originating from LoaderCriteriaOptions
  1613. collected from the statement.
  1614. """
  1615. return_crit = ()
  1616. adapter = ext_info._adapter if ext_info.is_aliased_class else None
  1617. if (
  1618. "additional_entity_criteria",
  1619. ext_info.mapper,
  1620. ) in global_attributes:
  1621. return_crit += tuple(
  1622. ae._resolve_where_criteria(ext_info)
  1623. for ae in global_attributes[
  1624. ("additional_entity_criteria", ext_info.mapper)
  1625. ]
  1626. if ae.include_aliases or ae.entity is ext_info
  1627. )
  1628. if ext_info.mapper._single_table_criterion is not None:
  1629. return_crit += (ext_info.mapper._single_table_criterion,)
  1630. if adapter:
  1631. return_crit = tuple(adapter.traverse(crit) for crit in return_crit)
  1632. return return_crit
    @classmethod
    def _do_pre_synchronize_evaluate(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        update_options,
    ):
        """Implement synchronize_session="evaluate": evaluate the
        statement's WHERE criteria in Python against in-session objects
        before execution.

        Returns the update options augmented with the matched objects,
        per-value evaluators, and resolved value keys; raises
        InvalidRequestError when the criteria cannot be evaluated in
        Python.
        """
        mapper = update_options._subject_mapper
        target_cls = mapper.class_

        value_evaluators = resolved_keys_as_propnames = _EMPTY_DICT

        try:
            evaluator_compiler = evaluator.EvaluatorCompiler(target_cls)
            # accumulate all WHERE criteria: statement criteria plus any
            # global/loader criteria options attached to the statement
            crit = ()
            if statement._where_criteria:
                crit += statement._where_criteria

            global_attributes = {}
            for opt in statement._with_options:
                if opt._is_criteria_option:
                    opt.get_global_criteria(global_attributes)

            if global_attributes:
                crit += cls._adjust_for_extra_criteria(
                    global_attributes, mapper
                )

            if crit:
                eval_condition = evaluator_compiler.process(*crit)
            else:
                # no criteria: every candidate object matches

                def eval_condition(obj):
                    return True

        except evaluator.UnevaluatableError as err:
            util.raise_(
                sa_exc.InvalidRequestError(
                    'Could not evaluate current criteria in Python: "%s". '
                    "Specify 'fetch' or False for the "
                    "synchronize_session execution option." % err
                ),
                from_=err,
            )

        if statement.__visit_name__ == "lambda_element":
            # ._resolved is called on every LambdaElement in order to
            # generate the cache key, so this access does not add
            # additional expense
            effective_statement = statement._resolved
        else:
            effective_statement = statement

        if effective_statement.__visit_name__ == "update":
            # for UPDATE, additionally compile evaluators for the SET
            # values so matched objects can be updated in-session
            resolved_values = cls._get_resolved_values(
                mapper, effective_statement
            )
            value_evaluators = {}
            resolved_keys_as_propnames = cls._resolved_keys_as_propnames(
                mapper, resolved_values
            )
            for key, value in resolved_keys_as_propnames:
                try:
                    _evaluator = evaluator_compiler.process(
                        coercions.expect(roles.ExpressionElementRole, value)
                    )
                except evaluator.UnevaluatableError:
                    # value not evaluable in Python; the attribute will
                    # simply not be synchronized for matched objects
                    pass
                else:
                    value_evaluators[key] = _evaluator

        # TODO: detect when the where clause is a trivial primary key match.
        matched_objects = [
            state.obj()
            for state in session.identity_map.all_states()
            if state.mapper.isa(mapper)
            and not state.expired
            and eval_condition(state.obj())
            and (
                update_options._refresh_identity_token is None
                # TODO: coverage for the case where horizontal sharding
                # invokes an update() or delete() given an explicit identity
                # token up front
                or state.identity_token
                == update_options._refresh_identity_token
            )
        ]
        return update_options + {
            "_matched_objects": matched_objects,
            "_value_evaluators": value_evaluators,
            "_resolved_keys_as_propnames": resolved_keys_as_propnames,
        }
  1718. @classmethod
  1719. def _get_resolved_values(cls, mapper, statement):
  1720. if statement._multi_values:
  1721. return []
  1722. elif statement._ordered_values:
  1723. return list(statement._ordered_values)
  1724. elif statement._values:
  1725. return list(statement._values.items())
  1726. else:
  1727. return []
  1728. @classmethod
  1729. def _resolved_keys_as_propnames(cls, mapper, resolved_values):
  1730. values = []
  1731. for k, v in resolved_values:
  1732. if isinstance(k, attributes.QueryableAttribute):
  1733. values.append((k.key, v))
  1734. continue
  1735. elif hasattr(k, "__clause_element__"):
  1736. k = k.__clause_element__()
  1737. if mapper and isinstance(k, expression.ColumnElement):
  1738. try:
  1739. attr = mapper._columntoproperty[k]
  1740. except orm_exc.UnmappedColumnError:
  1741. pass
  1742. else:
  1743. values.append((attr.key, v))
  1744. else:
  1745. raise sa_exc.InvalidRequestError(
  1746. "Invalid expression type: %r" % k
  1747. )
  1748. return values
  1749. @classmethod
  1750. def _do_pre_synchronize_fetch(
  1751. cls,
  1752. session,
  1753. statement,
  1754. params,
  1755. execution_options,
  1756. bind_arguments,
  1757. update_options,
  1758. ):
  1759. mapper = update_options._subject_mapper
  1760. select_stmt = (
  1761. select(*(mapper.primary_key + (mapper.select_identity_token,)))
  1762. .select_from(mapper)
  1763. .options(*statement._with_options)
  1764. )
  1765. select_stmt._where_criteria = statement._where_criteria
  1766. def skip_for_full_returning(orm_context):
  1767. bind = orm_context.session.get_bind(**orm_context.bind_arguments)
  1768. if bind.dialect.full_returning:
  1769. return _result.null_result()
  1770. else:
  1771. return None
  1772. result = session.execute(
  1773. select_stmt,
  1774. params,
  1775. execution_options,
  1776. bind_arguments,
  1777. _add_event=skip_for_full_returning,
  1778. )
  1779. matched_rows = result.fetchall()
  1780. value_evaluators = _EMPTY_DICT
  1781. if statement.__visit_name__ == "lambda_element":
  1782. # ._resolved is called on every LambdaElement in order to
  1783. # generate the cache key, so this access does not add
  1784. # additional expense
  1785. effective_statement = statement._resolved
  1786. else:
  1787. effective_statement = statement
  1788. if effective_statement.__visit_name__ == "update":
  1789. target_cls = mapper.class_
  1790. evaluator_compiler = evaluator.EvaluatorCompiler(target_cls)
  1791. resolved_values = cls._get_resolved_values(
  1792. mapper, effective_statement
  1793. )
  1794. resolved_keys_as_propnames = cls._resolved_keys_as_propnames(
  1795. mapper, resolved_values
  1796. )
  1797. resolved_keys_as_propnames = cls._resolved_keys_as_propnames(
  1798. mapper, resolved_values
  1799. )
  1800. value_evaluators = {}
  1801. for key, value in resolved_keys_as_propnames:
  1802. try:
  1803. _evaluator = evaluator_compiler.process(
  1804. coercions.expect(roles.ExpressionElementRole, value)
  1805. )
  1806. except evaluator.UnevaluatableError:
  1807. pass
  1808. else:
  1809. value_evaluators[key] = _evaluator
  1810. else:
  1811. resolved_keys_as_propnames = _EMPTY_DICT
  1812. return update_options + {
  1813. "_value_evaluators": value_evaluators,
  1814. "_matched_rows": matched_rows,
  1815. "_resolved_keys_as_propnames": resolved_keys_as_propnames,
  1816. }
@CompileState.plugin_for("orm", "update")
class BulkORMUpdate(UpdateDMLState, BulkUDCompileState):
    """Compile-state plugin for ORM-enabled UPDATE statements
    ("bulk update by query")."""

    @classmethod
    def create_for_statement(cls, statement, compiler, **kw):
        # construct directly; UpdateDMLState.__init__ is invoked at the
        # end against the rewritten statement
        self = cls.__new__(cls)

        ext_info = statement.table._annotations["parententity"]

        self.mapper = mapper = ext_info.mapper

        self.extra_criteria_entities = {}

        self._resolved_values = cls._get_resolved_values(mapper, statement)

        # collect criteria contributed by with_loader_criteria()-style
        # options attached to the statement
        extra_criteria_attributes = {}

        for opt in statement._with_options:
            if opt._is_criteria_option:
                opt.get_global_criteria(extra_criteria_attributes)

        if not statement._preserve_parameter_order and statement._values:
            self._resolved_values = dict(self._resolved_values)

        # shallow-copy the statement and retarget it at the mapper's
        # local table
        new_stmt = sql.Update.__new__(sql.Update)
        new_stmt.__dict__.update(statement.__dict__)
        new_stmt.table = mapper.local_table

        # note if the statement has _multi_values, these
        # are passed through to the new statement, which will then raise
        # InvalidRequestError because UPDATE doesn't support multi_values
        # right now.
        if statement._ordered_values:
            new_stmt._ordered_values = self._resolved_values
        elif statement._values:
            new_stmt._values = self._resolved_values

        new_crit = cls._adjust_for_extra_criteria(
            extra_criteria_attributes, mapper
        )
        if new_crit:
            new_stmt = new_stmt.where(*new_crit)

        # if we are against a lambda statement we might not be the
        # topmost object that received per-execute annotations
        if (
            compiler._annotations.get("synchronize_session", None) == "fetch"
            and compiler.dialect.full_returning
        ):
            if new_stmt._returning:
                raise sa_exc.InvalidRequestError(
                    "Can't use synchronize_session='fetch' "
                    "with explicit returning()"
                )
            # RETURNING the primary key lets post-sync identify the
            # affected identities without a separate SELECT
            new_stmt = new_stmt.returning(*mapper.primary_key)

        UpdateDMLState.__init__(self, new_stmt, compiler, **kw)

        return self

    @classmethod
    def _get_crud_kv_pairs(cls, statement, kv_iterator):
        """Convert SET key/value pairs, resolving string keys and
        ORM-annotated expressions to mapped attributes where possible
        before delegating to the Core implementation."""
        plugin_subject = statement._propagate_attrs["plugin_subject"]

        core_get_crud_kv_pairs = UpdateDMLState._get_crud_kv_pairs

        if not plugin_subject or not plugin_subject.mapper:
            # no ORM mapper available; plain Core behavior
            return core_get_crud_kv_pairs(statement, kv_iterator)

        mapper = plugin_subject.mapper

        values = []

        for k, v in kv_iterator:
            k = coercions.expect(roles.DMLColumnRole, k)

            if isinstance(k, util.string_types):
                desc = _entity_namespace_key(mapper, k, default=NO_VALUE)
                if desc is NO_VALUE:
                    # string key not found in the entity namespace;
                    # pass it through with a coerced value expression
                    values.append(
                        (
                            k,
                            coercions.expect(
                                roles.ExpressionElementRole,
                                v,
                                type_=sqltypes.NullType(),
                                is_crud=True,
                            ),
                        )
                    )
                else:
                    values.extend(
                        core_get_crud_kv_pairs(
                            statement, desc._bulk_update_tuples(v)
                        )
                    )
            elif "entity_namespace" in k._annotations:
                # ORM-annotated expression; resolve via its entity
                # namespace and proxy key
                k_anno = k._annotations
                attr = _entity_namespace_key(
                    k_anno["entity_namespace"], k_anno["proxy_key"]
                )
                values.extend(
                    core_get_crud_kv_pairs(
                        statement, attr._bulk_update_tuples(v)
                    )
                )
            else:
                values.append(
                    (
                        k,
                        coercions.expect(
                            roles.ExpressionElementRole,
                            v,
                            type_=sqltypes.NullType(),
                            is_crud=True,
                        ),
                    )
                )
        return values

    @classmethod
    def _do_post_synchronize_evaluate(cls, session, result, update_options):
        """Apply the UPDATE's effects to matched in-session objects:
        evaluate new values in Python where possible, expire the rest."""
        states = set()
        evaluated_keys = list(update_options._value_evaluators.keys())
        values = update_options._resolved_keys_as_propnames
        attrib = set(k for k, v in values)
        for obj in update_options._matched_objects:

            state, dict_ = (
                attributes.instance_state(obj),
                attributes.instance_dict(obj),
            )

            # the evaluated states were gathered across all identity tokens.
            # however the post_sync events are called per identity token,
            # so filter.
            if (
                update_options._refresh_identity_token is not None
                and state.identity_token
                != update_options._refresh_identity_token
            ):
                continue

            # only evaluate unmodified attributes
            to_evaluate = state.unmodified.intersection(evaluated_keys)
            for key in to_evaluate:
                if key in dict_:
                    dict_[key] = update_options._value_evaluators[key](obj)

            state.manager.dispatch.refresh(state, None, to_evaluate)

            state._commit(dict_, list(to_evaluate))

            # attributes that were set but could not be evaluated in
            # Python are expired so they refresh from the DB on access
            to_expire = attrib.intersection(dict_).difference(to_evaluate)
            if to_expire:
                state._expire_attributes(dict_, to_expire)

            states.add(state)
        session._register_altered(states)

    @classmethod
    def _do_post_synchronize_fetch(cls, session, result, update_options):
        """Apply the UPDATE's effects to in-session objects identified by
        the pre-fetched (or RETURNING-supplied) primary key rows."""
        target_mapper = update_options._subject_mapper

        states = set()
        evaluated_keys = list(update_options._value_evaluators.keys())

        if result.returns_rows:
            # rows came back via RETURNING; the identity token is not in
            # the row, so append the one from the options
            matched_rows = [
                tuple(row) + (update_options._refresh_identity_token,)
                for row in result.all()
            ]
        else:
            matched_rows = update_options._matched_rows

        objs = [
            session.identity_map[identity_key]
            for identity_key in [
                target_mapper.identity_key_from_primary_key(
                    list(primary_key),
                    identity_token=identity_token,
                )
                for primary_key, identity_token in [
                    (row[0:-1], row[-1]) for row in matched_rows
                ]
                if update_options._refresh_identity_token is None
                or identity_token == update_options._refresh_identity_token
            ]
            if identity_key in session.identity_map
        ]

        values = update_options._resolved_keys_as_propnames
        attrib = set(k for k, v in values)

        for obj in objs:
            state, dict_ = (
                attributes.instance_state(obj),
                attributes.instance_dict(obj),
            )

            # only evaluate unmodified attributes
            to_evaluate = state.unmodified.intersection(evaluated_keys)
            for key in to_evaluate:
                if key in dict_:
                    dict_[key] = update_options._value_evaluators[key](obj)
            state.manager.dispatch.refresh(state, None, to_evaluate)

            state._commit(dict_, list(to_evaluate))

            # expire set attributes that could not be evaluated in Python
            to_expire = attrib.intersection(dict_).difference(to_evaluate)
            if to_expire:
                state._expire_attributes(dict_, to_expire)

            states.add(state)
        session._register_altered(states)
@CompileState.plugin_for("orm", "delete")
class BulkORMDelete(DeleteDMLState, BulkUDCompileState):
    """Compile-state plugin for ORM-enabled DELETE statements
    ("bulk delete by query")."""

    @classmethod
    def create_for_statement(cls, statement, compiler, **kw):
        # construct directly; DeleteDMLState.__init__ is invoked at the
        # end against the (possibly augmented) statement
        self = cls.__new__(cls)

        ext_info = statement.table._annotations["parententity"]
        self.mapper = mapper = ext_info.mapper

        self.extra_criteria_entities = {}

        # collect criteria contributed by with_loader_criteria()-style
        # options attached to the statement
        extra_criteria_attributes = {}

        for opt in statement._with_options:
            if opt._is_criteria_option:
                opt.get_global_criteria(extra_criteria_attributes)

        new_crit = cls._adjust_for_extra_criteria(
            extra_criteria_attributes, mapper
        )
        if new_crit:
            statement = statement.where(*new_crit)

        if (
            mapper
            and compiler._annotations.get("synchronize_session", None)
            == "fetch"
            and compiler.dialect.full_returning
        ):
            # RETURNING the primary key lets post-sync identify the
            # deleted identities without a separate SELECT
            statement = statement.returning(*mapper.primary_key)

        DeleteDMLState.__init__(self, statement, compiler, **kw)

        return self

    @classmethod
    def _do_post_synchronize_evaluate(cls, session, result, update_options):
        """Remove the Python-evaluated matched objects from the session."""
        session._remove_newly_deleted(
            [
                attributes.instance_state(obj)
                for obj in update_options._matched_objects
            ]
        )

    @classmethod
    def _do_post_synchronize_fetch(cls, session, result, update_options):
        """Remove deleted objects from the session, identified by the
        pre-fetched (or RETURNING-supplied) primary key rows."""
        target_mapper = update_options._subject_mapper

        if result.returns_rows:
            # rows came back via RETURNING; the identity token is not in
            # the row, so append the one from the options
            matched_rows = [
                tuple(row) + (update_options._refresh_identity_token,)
                for row in result.all()
            ]
        else:
            matched_rows = update_options._matched_rows

        for row in matched_rows:
            primary_key = row[0:-1]
            identity_token = row[-1]

            # TODO: inline this and call remove_newly_deleted
            # once
            identity_key = target_mapper.identity_key_from_primary_key(
                list(primary_key),
                identity_token=identity_token,
            )
            if identity_key in session.identity_map:
                session._remove_newly_deleted(
                    [
                        attributes.instance_state(
                            session.identity_map[identity_key]
                        )
                    ]
                )