12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465 |
- # orm/loading.py
- # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- """private module containing functions used to convert database
- rows into object instances and associated state.
- the functions here are called primarily by Query, Mapper,
- as well as some of the attribute loading strategies.
- """
- from __future__ import absolute_import
- from . import attributes
- from . import exc as orm_exc
- from . import path_registry
- from . import strategy_options
- from .base import _DEFER_FOR_STATE
- from .base import _RAISE_FOR_STATE
- from .base import _SET_DEFERRED_EXPIRED
- from .util import _none_set
- from .util import state_str
- from .. import exc as sa_exc
- from .. import future
- from .. import util
- from ..engine import result_tuple
- from ..engine.result import ChunkedIteratorResult
- from ..engine.result import FrozenResult
- from ..engine.result import SimpleResultMetaData
- from ..sql import util as sql_util
- from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
- from ..sql.selectable import SelectState
# Module-level generator of "run ids": instances() calls this once per
# invocation and stores the value on ``context.runid``, which the row
# processors compare against ``state.runid`` to tell whether a state has
# already been seen in the current load.
# NOTE(review): presumably util.counter() returns a callable yielding
# successive distinct integers -- confirm against util.
_new_runid = util.counter()
def instances(cursor, context):
    """Build the ORM-level :class:`.Result` for an executed ORM statement.

    :param cursor: a :class:`.CursorResult`, generated by a statement
     which came from :class:`.ORMCompileState`

    :param context: a :class:`.QueryContext` object

    :return: a :class:`.Result` object representing ORM results

    :raises sqlalchemy.exc.InvalidRequestError: if ``yield_per`` is in
     effect together with loaders that require buffering or uniquing; the
     cursor is closed before the error propagates.

    .. versionchanged:: 1.4 The instances() function now uses
       :class:`.Result` objects and has an all new interface.

    """

    # every invocation gets its own run id and a fresh post-load registry
    context.runid = _new_runid()
    context.post_load_paths = {}

    compile_state = context.compile_state
    filtered = compile_state._has_mapper_entities
    single_entity = (
        not context.load_options._only_return_tuples
        and len(compile_state._entities) == 1
        and compile_state._entities[0].supports_single_entity
    )

    try:
        # each entity contributes a (processor, label, extra) triple;
        # transpose them into three parallel tuples
        per_entity = [
            query_entity.row_processor(context, cursor)
            for query_entity in compile_state._entities
        ]
        process, labels, extra = zip(*per_entity)

        if context.yield_per:
            if (
                context.loaders_require_buffering
                or context.loaders_require_uniquing
            ):
                raise sa_exc.InvalidRequestError(
                    "Can't use yield_per with eager loaders that require "
                    "uniquing "
                    "or row buffering, e.g. joinedload() against collections "
                    "or subqueryload().  Consider the selectinload() strategy "
                    "for better flexibility in loading objects."
                )
    except Exception:
        # release the cursor, then let the original exception continue
        with util.safe_reraise():
            cursor.close()

    def _no_unique(entry):
        raise sa_exc.InvalidRequestError(
            "Can't use the ORM yield_per feature in conjunction with unique()"
        )

    def _not_hashable(datatype):
        def go(obj):
            raise sa_exc.InvalidRequestError(
                "Can't apply uniqueness to row tuple containing value of "
                "type %r; this datatype produces non-hashable values"
                % datatype
            )

        return go

    def _legacy_filter_for(ent):
        # legacy uniquing: anything not safely hashable is uniqued by id()
        if context.yield_per:
            return _no_unique
        if (
            ent.use_id_for_hash
            or ent._non_hashable_value
            or ent._null_column_type
        ):
            return id
        return None

    def _filter_for(ent):
        # non-legacy uniquing: non-hashable column values raise when
        # uniqueness is actually requested
        if context.yield_per:
            return _no_unique
        if not ent.use_id_for_hash and ent._non_hashable_value:
            return _not_hashable(ent.column.type)
        if ent.use_id_for_hash:
            return id
        return None

    if context.load_options._legacy_uniquing:
        pick_filter = _legacy_filter_for
    else:
        pick_filter = _filter_for
    unique_filters = [pick_filter(ent) for ent in compile_state._entities]

    row_metadata = SimpleResultMetaData(
        labels, extra, _unique_filters=unique_filters
    )

    def _live_chunks(size):
        # generator of lists of processed rows; a falsy ``size`` means
        # "everything in one chunk"
        while True:
            context.partials = {}

            if size:
                fetch = cursor.fetchmany(size)
                if not fetch:
                    break
            else:
                fetch = cursor._raw_all_rows()

            if single_entity:
                proc = process[0]
                rows = [proc(row) for row in fetch]
            else:
                rows = [
                    tuple([proc(row) for proc in process]) for row in fetch
                ]

            # run post-load callables registered while processing this chunk
            for path, post_load in context.post_load_paths.items():
                post_load.invoke(context, path)

            yield rows

            if not size:
                break

    if context.execution_options.get("prebuffer_rows", False):
        # this is a bit of a hack at the moment.
        # I would rather have some option in the result to pre-buffer
        # internally.
        _prebuffered = list(_live_chunks(None))

        def chunks(size):
            return iter(_prebuffered)

    else:
        chunks = _live_chunks

    result = ChunkedIteratorResult(
        row_metadata,
        chunks,
        source_supports_scalars=single_entity,
        raw=cursor,
        dynamic_yield_per=cursor.context._is_server_side,
    )

    # filtered and single_entity are used to indicate to legacy Query that the
    # query has ORM entities, so legacy deduping and scalars should be called
    # on the result.
    result._attributes = result._attributes.union(
        {"filtered": filtered, "is_single_entity": single_entity}
    )

    # multi_row_eager_loaders OTOH is specific to joinedload.
    if context.compile_state.multi_row_eager_loaders:

        def require_unique(obj):
            raise sa_exc.InvalidRequestError(
                "The unique() method must be invoked on this Result, "
                "as it contains results that include joined eager loads "
                "against collections"
            )

        result._unique_filter_state = (None, require_unique)

    if context.yield_per:
        result.yield_per(context.yield_per)

    return result
@util.preload_module("sqlalchemy.orm.context")
def merge_frozen_result(session, statement, frozen_result, load=True):
    """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
    returning a new :class:`_engine.Result` object with :term:`persistent`
    objects.

    Every non-``None`` value found at a mapped-entity position of each row
    is passed through the session's merge routine; other columns are left
    as they are.  Session autoflush is disabled while rows are merged and
    restored afterwards.

    See the section :ref:`do_orm_execute_re_executing` for an example.

    .. seealso::

        :ref:`do_orm_execute_re_executing`

        :meth:`_engine.Result.freeze`

        :class:`_engine.FrozenResult`

    """
    querycontext = util.preloaded.orm_context

    if load:
        # flush current contents if we expect to load data
        session._autoflush()

    ctx = querycontext.ORMSelectCompileState._create_entities_collection(
        statement, legacy=False
    )

    def _merged(obj):
        # each object is merged with its own fresh recursion/conflict maps
        return session._merge(
            attributes.instance_state(obj),
            attributes.instance_dict(obj),
            load=load,
            _recursive={},
            _resolve_conflict_map={},
        )

    saved_autoflush = session.autoflush
    try:
        session.autoflush = False

        # positions within each row that hold mapped entities
        entity_positions = [
            idx
            for idx, ent in enumerate(ctx._entities)
            if isinstance(ent, querycontext._MapperEntity)
        ]
        labels = [ent._label_name for ent in ctx._entities]
        extras = [ent._extra_entities for ent in ctx._entities]
        make_row = result_tuple(labels, extras)

        merged_rows = []
        for newrow in frozen_result.rewrite_rows():
            for idx in entity_positions:
                if newrow[idx] is None:
                    continue
                newrow[idx] = _merged(newrow[idx])
            merged_rows.append(make_row(newrow))

        return frozen_result.with_new_rows(merged_rows)
    finally:
        session.autoflush = saved_autoflush
@util.deprecated_20(
    ":func:`_orm.merge_result`",
    alternative="The function as well as the method on :class:`_orm.Query` "
    "is superseded by the :func:`_orm.merge_frozen_result` function.",
    becomes_legacy=True,
)
@util.preload_module("sqlalchemy.orm.context")
def merge_result(query, iterator, load=True):
    """Merge a result into the given :class:`.Query` object's Session.

    ``iterator`` may be a plain iterable of rows / instances or a
    :class:`_engine.FrozenResult`; in the latter case a new frozen result
    is returned, otherwise an iterator over the merged rows.

    See :meth:`_orm.Query.merge_result` for top-level documentation on this
    function.

    """
    querycontext = util.preloaded.orm_context

    session = query.session
    if load:
        # flush current contents if we expect to load data
        session._autoflush()

    # TODO: need test coverage and documentation for the FrozenResult
    # use case.
    if isinstance(iterator, FrozenResult):
        frozen_result = iterator
        iterator = iter(frozen_result.data)
    else:
        frozen_result = None

    ctx = querycontext.ORMSelectCompileState._create_entities_collection(
        query, legacy=True
    )

    def _merged(obj):
        # each object is merged with its own fresh recursion/conflict maps
        return session._merge(
            attributes.instance_state(obj),
            attributes.instance_dict(obj),
            load=load,
            _recursive={},
            _resolve_conflict_map={},
        )

    saved_autoflush = session.autoflush
    try:
        session.autoflush = False
        single_entity = not frozen_result and len(ctx._entities) == 1

        if single_entity:
            # rows are bare objects rather than tuples
            if isinstance(ctx._entities[0], querycontext._MapperEntity):
                result = [_merged(instance) for instance in iterator]
            else:
                result = list(iterator)
        else:
            entity_positions = [
                idx
                for idx, ent in enumerate(ctx._entities)
                if isinstance(ent, querycontext._MapperEntity)
            ]
            result = []
            labels = [ent._label_name for ent in ctx._entities]
            extras = [ent._extra_entities for ent in ctx._entities]
            make_row = result_tuple(labels, extras)

            for row in iterator:
                newrow = list(row)
                for idx in entity_positions:
                    if newrow[idx] is None:
                        continue
                    newrow[idx] = _merged(newrow[idx])
                result.append(make_row(newrow))

        if frozen_result:
            return frozen_result.with_data(result)
        return iter(result)
    finally:
        session.autoflush = saved_autoflush
def get_from_identity(session, mapper, key, passive):
    """Look up the given key in the given session's identity map,
    check the object for expired state if found.

    Returns the instance, ``None`` when the key is absent or the row turned
    out to be deleted, ``PASSIVE_CLASS_MISMATCH`` when the found object is
    not of the requested mapper's hierarchy, or ``PASSIVE_NO_RESULT`` when
    the object is expired and ``passive`` does not permit SQL.

    """
    instance = session.identity_map.get(key)
    if instance is None:
        return None

    state = attributes.instance_state(instance)

    if mapper.inherits and not state.mapper.isa(mapper):
        return attributes.PASSIVE_CLASS_MISMATCH

    if not state.expired:
        return instance

    # expired - ensure it still exists
    if not passive & attributes.SQL_OK:
        # TODO: no coverage here
        return attributes.PASSIVE_NO_RESULT

    if not passive & attributes.RELATED_OBJECT_OK:
        # this mode is used within a flush and the instance's
        # expired state will be checked soon enough, if necessary.
        # also used by immediateloader for a mutually-dependent
        # o2m->m2m load, :ticket:`6301`
        return instance

    try:
        state._load_expired(state, passive)
    except orm_exc.ObjectDeletedError:
        session._remove_newly_deleted([state])
        return None

    return instance
def load_on_ident(
    session,
    statement,
    key,
    load_options=None,
    refresh_state=None,
    with_for_update=None,
    only_load_props=None,
    no_autoflush=False,
    bind_arguments=util.EMPTY_DICT,
    execution_options=util.EMPTY_DICT,
):
    """Load the given identity key from the database.

    ``key`` is indexed as a sequence: element 1 is used as the primary key
    identity and element 2 as the identity token.  A ``key`` of ``None``
    passes ``None`` for both.  All remaining arguments are forwarded
    unchanged to :func:`.load_on_pk_identity`, whose result is returned.

    """
    if key is None:
        ident = identity_token = None
    else:
        ident, identity_token = key[1], key[2]

    return load_on_pk_identity(
        session,
        statement,
        ident,
        load_options=load_options,
        refresh_state=refresh_state,
        with_for_update=with_for_update,
        only_load_props=only_load_props,
        identity_token=identity_token,
        no_autoflush=no_autoflush,
        bind_arguments=bind_arguments,
        execution_options=execution_options,
    )
def load_on_pk_identity(
    session,
    statement,
    primary_key_identity,
    load_options=None,
    refresh_state=None,
    with_for_update=None,
    only_load_props=None,
    identity_token=None,
    no_autoflush=False,
    bind_arguments=util.EMPTY_DICT,
    execution_options=util.EMPTY_DICT,
):
    """Load the given primary key identity from the database.

    A clone of ``statement`` is given the mapper's "get" criteria (when
    ``primary_key_identity`` is not ``None``), has its ORDER BY removed and
    is executed on ``session``.  Returns the single resulting object, or
    ``None`` if no row was found.

    """
    query = statement
    q = query._clone()

    assert not q._is_lambda_element

    # TODO: fix these imports ....
    from .context import QueryContext, ORMCompileState

    if load_options is None:
        load_options = QueryContext.default_load_options

    uses_default_compile_options = (
        statement._compile_options
        is SelectState.default_select_compile_options
    )
    if uses_default_compile_options:
        compile_options = ORMCompileState.default_compile_options
    else:
        compile_options = statement._compile_options

    if primary_key_identity is None:
        params = None
    else:
        mapper = query._propagate_attrs["plugin_subject"]

        _get_clause, _get_params = mapper._get_clause

        # None present in ident - turn those comparisons
        # into "IS NULL"
        if None in primary_key_identity:
            nones = {
                _get_params[col].key
                for col, value in zip(
                    mapper.primary_key, primary_key_identity
                )
                if value is None
            }

            _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)

            if len(nones) == len(primary_key_identity):
                util.warn(
                    "fully NULL primary key identity cannot load any "
                    "object.  This condition may raise an error in a future "
                    "release."
                )

        q._where_criteria = (
            sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}),
        )

        # bind parameter key -> primary key value
        params = {
            _get_params[primary_key].key: id_val
            for id_val, primary_key in zip(
                primary_key_identity, mapper.primary_key
            )
        }

    # an explicit with_for_update wins over one already on the statement;
    # either one turns on the version check
    for_update_arg = with_for_update
    if for_update_arg is None:
        for_update_arg = query._for_update_arg
    version_check = for_update_arg is not None
    if version_check:
        q._for_update_arg = for_update_arg

    if refresh_state and refresh_state.load_options:
        compile_options += {"_current_path": refresh_state.load_path.parent}
        q = q.options(*refresh_state.load_options)

    new_compile_options, load_options = _set_get_options(
        compile_options,
        load_options,
        version_check=version_check,
        only_load_props=only_load_props,
        refresh_state=refresh_state,
        identity_token=identity_token,
    )

    q._compile_options = new_compile_options
    q._order_by = None

    if no_autoflush:
        load_options += {"_autoflush": False}

    execution_options = util.EMPTY_DICT.merge_with(
        execution_options, {"_sa_orm_load_options": load_options}
    )
    executed = session.execute(
        q,
        params=params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
    )
    result = executed.unique().scalars()

    try:
        return result.one()
    except orm_exc.NoResultFound:
        return None
def _set_get_options(
    compile_opt,
    load_opt,
    populate_existing=None,
    version_check=None,
    only_load_props=None,
    refresh_state=None,
    identity_token=None,
):
    """Apply the truthy "get"/refresh arguments to the given options.

    Each truthy argument becomes an entry that is added (via ``+=``) to
    either ``load_opt`` or ``compile_opt``; falsy arguments contribute
    nothing, and an options object that receives no entries is returned
    untouched.

    :return: a ``(compile_opt, load_opt)`` tuple.

    """
    # load-level settings, in the order they are applied
    load_candidates = (
        ("_version_check", version_check),
        ("_populate_existing", populate_existing),
        ("_refresh_state", refresh_state),
        ("_refresh_identity_token", identity_token),
    )
    load_updates = {
        name: value for name, value in load_candidates if value
    }

    compile_updates = {}
    if "_refresh_state" in load_updates:
        compile_updates["_for_refresh_state"] = True
    if only_load_props:
        compile_updates["_only_load_props"] = frozenset(only_load_props)

    if load_updates:
        load_opt += load_updates
    if compile_updates:
        compile_opt += compile_updates

    return compile_opt, load_opt
def _setup_entity_query(
    compile_state,
    mapper,
    query_entity,
    path,
    adapter,
    column_collection,
    with_polymorphic=None,
    only_load_props=None,
    polymorphic_discriminator=None,
    **kw
):
    """Invoke ``setup()`` on each property to be loaded for an entity.

    A fresh dictionary is stored at ``path`` under ``"memoized_setups"`` in
    ``compile_state.attributes`` and handed to every property as
    ``memoized_populators``.  When a ``polymorphic_discriminator`` other
    than ``mapper.polymorphic_on`` is given, it is appended (adapted if an
    adapter is present) to ``column_collection``.

    """
    poly_properties = (
        mapper._iterate_polymorphic_properties(with_polymorphic)
        if with_polymorphic
        else mapper._polymorphic_properties
    )

    quick_populators = {}
    path.set(compile_state.attributes, "memoized_setups", quick_populators)

    # for the lead entities in the path, e.g. not eager loads, and
    # assuming a user-passed aliased class, e.g. not a from_self() or any
    # implicit aliasing, don't add columns to the SELECT that aren't
    # in the thing that's aliased.
    check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class

    for prop in poly_properties:
        # with only_load_props in effect, just the named keys are set up
        if not only_load_props or prop.key in only_load_props:
            prop.setup(
                compile_state,
                query_entity,
                path,
                adapter,
                only_load_props=only_load_props,
                column_collection=column_collection,
                memoized_populators=quick_populators,
                check_for_adapt=check_for_adapt,
                **kw
            )

    if (
        polymorphic_discriminator is not None
        and polymorphic_discriminator is not mapper.polymorphic_on
    ):
        pd = (
            adapter.columns[polymorphic_discriminator]
            if adapter
            else polymorphic_discriminator
        )
        column_collection.append(pd)
def _warn_for_runid_changed(state):
    """Emit a warning that the run id of ``state`` changed while a
    load/refresh event handler was running."""
    message = (
        "Loading context for %s has changed within a load/refresh "
        "handler, suggesting a row refresh operation took place. If this "
        "event handler is expected to be "
        "emitting row refresh operations within an existing load or refresh "
        "operation, set restore_load_context=True when establishing the "
        "listener to ensure the context remains unchanged when the event "
        "handler completes." % (state_str(state),)
    )
    util.warn(message)
def _instance_processor(
    query_entity,
    mapper,
    context,
    result,
    path,
    adapter,
    only_load_props=None,
    refresh_state=None,
    polymorphic_discriminator=None,
    _polymorphic_from=None,
):
    """Produce a mapper level row processor callable
    which processes rows into mapped instances.

    The returned callable accepts a single row and returns the mapped
    instance for it, or ``None`` when a new instance would be needed but
    the row's primary key values do not qualify as an identity.

    :param query_entity: passed through to each property's
     ``create_row_processor()`` and to the polymorphic switch.
    :param mapper: mapper whose properties, primary key and events drive
     the processing.
    :param context: load context; supplies the session, run id,
     ``partials``, ``attributes`` and loader options read below.
    :param result: object providing ``_getter()`` / ``_tuple_getter()``
     used to build per-column row getters.
    :param path: path object under which getters and loader state are
     looked up and memoized.
    :param adapter: optional column adapter; when present, columns are
     translated through ``adapter.columns`` before getters are built.
    :param only_load_props: optional collection of property keys limiting
     which properties are populated.
    :param refresh_state: optional state being refreshed; when given, every
     row is applied to that state rather than being looked up by identity.
    :param polymorphic_discriminator: passed to the polymorphic switch.
    :param _polymorphic_from: set when this processor is created on behalf
     of a polymorphic switch for a sub-mapper.

    """

    # note that this method, most of which exists in a closure
    # called _instance(), resists being broken out, as
    # attempts to do so tend to add significant function
    # call overhead.  _instance() is the most
    # performance-critical section in the whole ORM.

    identity_class = mapper._identity_class
    compile_state = context.compile_state

    # look for "row getter" functions that have been assigned along
    # with the compile state that were cached from a previous load.
    # these are operator.itemgetter() objects that each will extract a
    # particular column from each row.

    getter_key = ("getters", mapper)
    getters = path.get(compile_state.attributes, getter_key, None)

    if getters is None:
        # no getters, so go through a list of attributes we are loading for,
        # and the ones that are column based will have already put information
        # for us in another collection "memoized_setups", which represents the
        # output of the LoaderStrategy.setup_query() method.  We can just as
        # easily call LoaderStrategy.create_row_processor for each, but by
        # getting it all at once from setup_query we save another method call
        # per attribute.
        props = mapper._prop_set
        if only_load_props is not None:
            props = props.intersection(
                mapper._props[k] for k in only_load_props
            )

        quick_populators = path.get(
            context.attributes, "memoized_setups", _none_set
        )

        # properties that must build their row processor on every call
        # rather than being cached
        todo = []
        # populator lists, by category, that are safe to cache and copy
        # for each new processor
        cached_populators = {
            "new": [],
            "quick": [],
            "deferred": [],
            "expire": [],
            "delayed": [],
            "existing": [],
            "eager": [],
        }

        if refresh_state is None:
            # we can also get the "primary key" tuple getter function
            pk_cols = mapper.primary_key

            if adapter:
                pk_cols = [adapter.columns[c] for c in pk_cols]
            primary_key_getter = result._tuple_getter(pk_cols)
        else:
            primary_key_getter = None

        getters = {
            "cached_populators": cached_populators,
            "todo": todo,
            "primary_key_getter": primary_key_getter,
        }
        for prop in props:
            if prop in quick_populators:

                # this is an inlined path just for column-based attributes.
                col = quick_populators[prop]
                if col is _DEFER_FOR_STATE:
                    cached_populators["new"].append(
                        (prop.key, prop._deferred_column_loader)
                    )
                elif col is _SET_DEFERRED_EXPIRED:
                    # note that in this path, we are no longer
                    # searching in the result to see if the column might
                    # be present in some unexpected way.
                    cached_populators["expire"].append((prop.key, False))
                elif col is _RAISE_FOR_STATE:
                    cached_populators["new"].append(
                        (prop.key, prop._raise_column_loader)
                    )
                else:
                    getter = None
                    if adapter:
                        # this logic had been removed for all 1.4 releases
                        # up until 1.4.18; the adapter here is particularly
                        # the compound eager adapter which isn't accommodated
                        # in the quick_populators right now.  The "fallback"
                        # logic below instead took over in many more cases
                        # until issue #6596 was identified.

                        # note there is still an issue where this codepath
                        # produces no "getter" for cases where a joined-inh
                        # mapping includes a labeled column property, meaning
                        # KeyError is caught internally and we fall back to
                        # _getter(col), which works anyway.   The adapter
                        # here for joined inh without any aliasing might not
                        # be useful.  Tests which see this include
                        # test.orm.inheritance.test_basic ->
                        # EagerTargetingTest.test_adapt_stringency
                        # OptimizedLoadTest.test_column_expression_joined
                        # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop  # noqa E501
                        #

                        adapted_col = adapter.columns[col]
                        if adapted_col is not None:
                            getter = result._getter(adapted_col, False)
                    if not getter:
                        getter = result._getter(col, False)
                    if getter:
                        cached_populators["quick"].append((prop.key, getter))
                    else:
                        # fall back to the ColumnProperty itself, which
                        # will iterate through all of its columns
                        # to see if one fits
                        prop.create_row_processor(
                            context,
                            query_entity,
                            path,
                            mapper,
                            result,
                            adapter,
                            cached_populators,
                        )
            else:
                # loader strategies like subqueryload, selectinload,
                # joinedload, basically relationships, these need to interact
                # with the context each time to work correctly.
                todo.append(prop)

        # memoize for subsequent calls with the same path / mapper
        path.set(compile_state.attributes, getter_key, getters)

    cached_populators = getters["cached_populators"]

    # work on copies of the cached lists so that the per-call processors
    # added for the "todo" properties don't accumulate in the cache
    populators = {key: list(value) for key, value in cached_populators.items()}
    for prop in getters["todo"]:
        prop.create_row_processor(
            context, query_entity, path, mapper, result, adapter, populators
        )

    propagated_loader_options = context.propagated_loader_options
    load_path = (
        context.compile_state.current_path + path
        if context.compile_state.current_path.path
        else path
    )

    # everything below is bound to local names up front so that the
    # _instance() closure does as few attribute lookups per row as possible
    session_identity_map = context.session.identity_map

    populate_existing = context.populate_existing or mapper.always_refresh
    load_evt = bool(mapper.class_manager.dispatch.load)
    refresh_evt = bool(mapper.class_manager.dispatch.refresh)
    persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
    if persistent_evt:
        loaded_as_persistent = context.session.dispatch.loaded_as_persistent
    instance_state = attributes.instance_state
    instance_dict = attributes.instance_dict
    session_id = context.session.hash_key
    runid = context.runid
    identity_token = context.identity_token

    # version_id_getter is only defined (and only read by _instance())
    # when version_check is enabled
    version_check = context.version_check
    if version_check:
        version_id_col = mapper.version_id_col
        if version_id_col is not None:
            if adapter:
                version_id_col = adapter.columns[version_id_col]
            version_id_getter = result._getter(version_id_col)
        else:
            version_id_getter = None

    if not refresh_state and _polymorphic_from is not None:
        # we are a sub-mapper processor created by a polymorphic switch;
        # decide whether the sub-mapper's attributes are to be loaded by a
        # separate "selectin" query run as a post-load step
        key = ("loader", path.path)
        if key in context.attributes and context.attributes[key].strategy == (
            ("selectinload_polymorphic", True),
        ):
            selectin_load_via = mapper._should_selectin_load(
                context.attributes[key].local_opts["entities"],
                _polymorphic_from,
            )
        else:
            selectin_load_via = mapper._should_selectin_load(
                None, _polymorphic_from
            )

        if selectin_load_via and selectin_load_via is not _polymorphic_from:
            # only_load_props goes w/ refresh_state only, and in a refresh
            # we are a single row query for the exact entity; polymorphic
            # loading does not apply
            assert only_load_props is None

            callable_ = _load_subclass_via_in(context, path, selectin_load_via)

            PostLoad.callable_for_path(
                context,
                load_path,
                selectin_load_via.mapper,
                selectin_load_via,
                callable_,
                selectin_load_via,
            )

    post_load = PostLoad.for_context(context, load_path, only_load_props)

    if refresh_state:
        refresh_identity_key = refresh_state.key
        if refresh_identity_key is None:
            # super-rare condition; a refresh is being called
            # on a non-instance-key instance; this is meant to only
            # occur within a flush()
            refresh_identity_key = mapper._identity_key_from_state(
                refresh_state
            )
    else:
        refresh_identity_key = None

        primary_key_getter = getters["primary_key_getter"]

    # predicate that is truthy when a primary key tuple does NOT qualify as
    # an identity: with partial PKs allowed, only when every value is in
    # _none_set; otherwise when any value is.
    # NOTE(review): _none_set is presumably the set of "NULL-like" values
    # -- confirm against .util.
    if mapper.allow_partial_pks:
        is_not_primary_key = _none_set.issuperset
    else:
        is_not_primary_key = _none_set.intersection

    def _instance(row):

        # determine the state that we'll be populating
        if refresh_identity_key:
            # fixed state that we're refreshing
            state = refresh_state
            instance = state.obj()
            dict_ = instance_dict(instance)
            isnew = state.runid != runid
            currentload = True
            loaded_instance = False
        else:
            # look at the row, see if that identity is in the
            # session, or we have to create a new one
            identitykey = (
                identity_class,
                primary_key_getter(row),
                identity_token,
            )

            instance = session_identity_map.get(identitykey)

            if instance is not None:
                # existing instance
                state = instance_state(instance)
                dict_ = instance_dict(instance)

                # "isnew" here means this is the first row seen for this
                # state within the current run
                isnew = state.runid != runid
                currentload = not isnew
                loaded_instance = False

                if version_check and version_id_getter and not currentload:
                    _validate_version_id(
                        mapper, state, dict_, row, version_id_getter
                    )

            else:
                # create a new instance

                # check for non-NULL values in the primary key columns,
                # else no entity is returned for the row
                if is_not_primary_key(identitykey[1]):
                    return None

                isnew = True
                currentload = True
                loaded_instance = True

                instance = mapper.class_manager.new_instance()

                dict_ = instance_dict(instance)
                state = instance_state(instance)
                state.key = identitykey
                state.identity_token = identity_token

                # attach instance to session.
                state.session_id = session_id
                session_identity_map._add_unpresent(state, identitykey)

        effective_populate_existing = populate_existing
        if refresh_state is state:
            effective_populate_existing = True

        # populate.  this looks at whether this state is new
        # for this load or was existing, and whether or not this
        # row is the first row with this identity.
        if currentload or effective_populate_existing:
            # full population routines.  Objects here are either
            # just created, or we are doing a populate_existing

            # be conservative about setting load_path when populate_existing
            # is in effect; want to maintain options from the original
            # load.  see test_expire->test_refresh_maintains_deferred_options
            if isnew and (
                propagated_loader_options or not effective_populate_existing
            ):
                state.load_options = propagated_loader_options
                state.load_path = load_path

            _populate_full(
                context,
                row,
                state,
                dict_,
                isnew,
                load_path,
                loaded_instance,
                effective_populate_existing,
                populators,
            )

            if isnew:
                # state.runid should be equal to context.runid / runid
                # here, however for event checks we are being more conservative
                # and checking against existing run id
                # assert state.runid == runid

                existing_runid = state.runid

                if loaded_instance:
                    if load_evt:
                        state.manager.dispatch.load(state, context)
                        if state.runid != existing_runid:
                            _warn_for_runid_changed(state)
                    if persistent_evt:
                        loaded_as_persistent(context.session, state)
                        if state.runid != existing_runid:
                            _warn_for_runid_changed(state)
                elif refresh_evt:
                    state.manager.dispatch.refresh(
                        state, context, only_load_props
                    )
                    if state.runid != runid:
                        _warn_for_runid_changed(state)

                if effective_populate_existing or state.modified:
                    if refresh_state and only_load_props:
                        state._commit(dict_, only_load_props)
                    else:
                        state._commit_all(dict_, session_identity_map)

            if post_load:
                post_load.add_state(state, True)

        else:
            # partial population routines, for objects that were already
            # in the Session, but a row matches them; apply eager loaders
            # on existing objects, etc.
            unloaded = state.unloaded
            isnew = state not in context.partials

            if not isnew or unloaded or populators["eager"]:
                # state is having a partial set of its attributes
                # refreshed.  Populate those attributes,
                # and add to the "context.partials" collection.

                to_load = _populate_partial(
                    context,
                    row,
                    state,
                    dict_,
                    isnew,
                    load_path,
                    unloaded,
                    populators,
                )

                if isnew:
                    if refresh_evt:
                        existing_runid = state.runid
                        state.manager.dispatch.refresh(state, context, to_load)
                        if state.runid != existing_runid:
                            _warn_for_runid_changed(state)

                    state._commit(dict_, to_load)

            if post_load and context.invoke_all_eagers:
                post_load.add_state(state, False)

        return instance

    if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
        # if we are doing polymorphic, dispatch to a different _instance()
        # method specific to the subclass mapper
        def ensure_no_pk(row):
            # returns the identity key for the row when its primary key
            # qualifies as an identity, else None
            identitykey = (
                identity_class,
                primary_key_getter(row),
                identity_token,
            )
            if not is_not_primary_key(identitykey[1]):
                return identitykey
            else:
                return None

        _instance = _decorate_polymorphic_switch(
            _instance,
            context,
            query_entity,
            mapper,
            result,
            path,
            polymorphic_discriminator,
            adapter,
            ensure_no_pk,
        )

    return _instance
def _load_subclass_via_in(context, path, entity):
    """Build the post-load callable that loads subclass rows by primary key.

    The statement and its paired "enable" / "disable" options are taken from
    the entity's mapper (per-entity when the entity is an aliased class,
    otherwise the mapper-level attribute).  The returned ``do_load``
    executes that statement for a batch of states, passing their primary
    keys as the ``primary_keys`` parameter.
    """
    mapper = entity.mapper

    # with a one-column primary key the parameter carries the bare values
    # rather than one-element tuples
    single_col_pk = len(mapper.base_mapper.primary_key) == 1

    load_info = (
        mapper._subclass_load_via_in(entity)
        if entity.is_aliased_class
        else mapper._subclass_load_via_in_mapper
    )
    q, enable_opt, disable_opt = load_info

    def do_load(context, path, states, load_only, effective_entity):
        # the originating query's options sit between the enable/disable pair
        stmt = q.options(
            *((enable_opt,) + context.query._with_options + (disable_opt,))
        )

        stmt._compile_options = context.compile_state.default_compile_options
        stmt._compile_options += {"_current_path": path.parent}

        if context.populate_existing:
            stmt = stmt.execution_options(populate_existing=True)

        if single_col_pk:
            primary_keys = [state.key[1][0] for state, _ in states]
        else:
            primary_keys = [state.key[1] for state, _ in states]

        # the fetched list is discarded; the call is made for its effects
        result = context.session.execute(stmt, {"primary_keys": primary_keys})
        result.unique().scalars().all()

    return do_load
def _populate_full(
    context,
    row,
    state,
    dict_,
    isnew,
    load_path,
    loaded_instance,
    populate_existing,
    populators,
):
    """Populate ``dict_`` for ``state`` from ``row`` using all populators.

    Three situations are distinguished:

    * ``isnew`` - first row seen for this identity: stamp the run id, apply
      the "quick", "expire", "new" and "delayed" populators.
    * not new, but ``load_path`` differs from ``state.load_path`` - record
      the new path, fill in "quick" values missing from ``dict_`` and then
      run the "existing" populators.
    * not new, same path - run only the "existing" populators.
    """
    if not isnew:
        if load_path != state.load_path:
            # new load path, e.g. object is present in more than one
            # column position in a series of rows
            state.load_path = load_path

            # only fill in values that are not already present
            for attr_key, fetch in populators["quick"]:
                if attr_key not in dict_:
                    dict_[attr_key] = fetch(row)

        # both "new path" and "same path" rows are otherwise treated as
        # already seen.
        # TODO: allow "existing" populator to know whether this is
        # a new path for the state:
        # populator(state, dict_, row, new_path=True / False)
        for _, populate in populators["existing"]:
            populate(state, dict_, row)
        return

    # first time we are seeing a row with this identity.
    state.runid = context.runid

    for attr_key, fetch in populators["quick"]:
        dict_[attr_key] = fetch(row)

    for attr_key, set_callable in populators["expire"]:
        if populate_existing:
            # discard any value already loaded for the attribute
            dict_.pop(attr_key, None)
        if set_callable:
            state.expired_attributes.add(attr_key)

    for bucket in ("new", "delayed"):
        for _, populate in populators[bucket]:
            populate(state, dict_, row)
def _populate_partial(
    context, row, state, dict_, isnew, load_path, unloaded, populators
):
    """Populate only a subset of attributes on ``state`` from ``row``.

    On the first row for the state (``isnew``) the set of keys to load is
    ``unloaded``; it is recorded in ``context.partials`` and the "quick",
    "expire", "new" and "delayed" populators run for those keys only.  On
    later rows the recorded set is looked up again and only the "existing"
    populators run for it.  "eager" populators always run for keys that are
    not in ``unloaded``.

    Returns the collection of keys being loaded for this state.
    """
    if isnew:
        wanted = unloaded
        context.partials[state] = wanted

        for attr_key, fetch in populators["quick"]:
            if attr_key not in wanted:
                continue
            dict_[attr_key] = fetch(row)

        for attr_key, set_callable in populators["expire"]:
            if attr_key not in wanted:
                continue
            dict_.pop(attr_key, None)
            if set_callable:
                state.expired_attributes.add(attr_key)

        for bucket in ("new", "delayed"):
            for attr_key, populate in populators[bucket]:
                if attr_key in wanted:
                    populate(state, dict_, row)
    else:
        # keys were decided when the first row for this state was handled
        wanted = context.partials[state]
        for attr_key, populate in populators["existing"]:
            if attr_key in wanted:
                populate(state, dict_, row)

    # eager populators apply to attributes that were not pending load
    for attr_key, populate in populators["eager"]:
        if attr_key in unloaded:
            continue
        populate(state, dict_, row)

    return wanted
def _validate_version_id(mapper, state, dict_, row, getter):
    """Check that the in-memory version id matches the one in ``row``.

    :param mapper: mapper supplying ``version_id_col`` and
     ``_get_state_attr_by_column``.
    :param state: instance state being validated.
    :param dict_: attribute dictionary of the instance.
    :param row: the database row just fetched.
    :param getter: callable extracting the version id value from ``row``.

    :raises StaleDataError: if the two version ids differ.
    """
    # evaluate each side exactly once so that the message reports the very
    # values that were compared
    current_version = mapper._get_state_attr_by_column(
        state, dict_, mapper.version_id_col
    )
    loaded_version = getter(row)

    if current_version != loaded_version:
        raise orm_exc.StaleDataError(
            "Instance '%s' has version id '%s' which "
            "does not match database-loaded version id '%s'."
            % (state_str(state), current_version, loaded_version)
        )
def _decorate_polymorphic_switch(
    instance_fn,
    context,
    query_entity,
    mapper,
    result,
    path,
    polymorphic_discriminator,
    adapter,
    ensure_no_pk,
):
    """Wrap ``instance_fn`` so each row is routed by its discriminator.

    The discriminator column is ``polymorphic_discriminator`` when given,
    else ``mapper.polymorphic_on``; when neither is set ``instance_fn`` is
    returned unchanged.  Otherwise the returned callable reads the
    discriminator from each row and:

    * delegates to a per-sub-mapper instance processor (built lazily and
      cached per discriminator value),
    * falls back to ``instance_fn`` when the discriminator maps to
      ``mapper`` itself,
    * returns ``None`` or raises ``InvalidRequestError`` (depending on
      whether ``ensure_no_pk`` finds an identity key in the row) when the
      discriminator is NULL or refers to a mapper that is not a sub-mapper.
    """
    polymorphic_on = (
        mapper.polymorphic_on
        if polymorphic_discriminator is None
        else polymorphic_discriminator
    )
    if polymorphic_on is None:
        return instance_fn

    if adapter:
        polymorphic_on = adapter.columns[polymorphic_on]

    def configure_subclass_mapper(discriminator):
        # result: None -> use instance_fn; False -> not a sub-mapper;
        # otherwise the row processor for the sub-mapper
        try:
            sub_mapper = mapper.polymorphic_map[discriminator]
        except KeyError:
            raise AssertionError(
                "No such polymorphic_identity %r is defined" % discriminator
            )

        if sub_mapper is mapper:
            return None
        if not sub_mapper.isa(mapper):
            return False

        return _instance_processor(
            query_entity,
            sub_mapper,
            context,
            result,
            path,
            adapter,
            _polymorphic_from=mapper,
        )

    polymorphic_instances = util.PopulateDict(configure_subclass_mapper)

    getter = result._getter(polymorphic_on)

    def polymorphic_instance(row):
        discriminator = getter(row)

        if discriminator is None:
            identitykey = ensure_no_pk(row)
            if not identitykey:
                return None
            raise sa_exc.InvalidRequestError(
                "Row with identity key %s can't be loaded into an "
                "object; the polymorphic discriminator column '%s' is "
                "NULL" % (identitykey, polymorphic_on)
            )

        sub_instance = polymorphic_instances[discriminator]
        if sub_instance:
            return sub_instance(row)
        if sub_instance is not False:
            # discriminator refers to the base mapper itself
            return instance_fn(row)

        identitykey = ensure_no_pk(row)
        if not identitykey:
            return None
        raise sa_exc.InvalidRequestError(
            "Row with identity key %s can't be loaded into an "
            "object; the polymorphic discriminator column '%s' "
            "refers to %s, which is not a sub-mapper of "
            "the requested %s"
            % (
                identitykey,
                polymorphic_on,
                mapper.polymorphic_map[discriminator],
                mapper,
            )
        )

    return polymorphic_instance
class PostLoad(object):
    """Track loaders and states for "post load" operations.

    Instances are stored in ``context.post_load_paths`` keyed on
    ``path.path``.  ``loaders`` maps a token to a
    ``(token, limit_to_mapper, loader_callable, arg, kw)`` tuple, ``states``
    maps each collected state to its "overwrite" flag, and ``load_keys``
    holds the ``only_load_props`` value given to :meth:`for_context`, or
    ``None``.
    """

    __slots__ = "loaders", "states", "load_keys"

    def __init__(self):
        self.loaders = {}
        self.states = util.OrderedDict()
        self.load_keys = None

    def add_state(self, state, overwrite):
        """Record ``state`` together with its ``overwrite`` flag."""
        # the states for a polymorphic load here are all shared
        # within a single PostLoad object among multiple subtypes.
        # Filtering of callables on a per-subclass basis needs to be done at
        # the invocation level
        self.states[state] = overwrite

    def invoke(self, context, path):
        """Run every registered loader against the collected states.

        Each loader receives only the states whose mapper ``isa()`` the
        loader's ``limit_to_mapper``; loaders with no matching states are
        skipped.  The collected states are cleared afterwards.  Does nothing
        when no states were collected.
        """
        if not self.states:
            return
        path = path_registry.PathRegistry.coerce(path)
        for _token, limit_to_mapper, loader, arg, kw in self.loaders.values():
            states = [
                (state, overwrite)
                for state, overwrite in self.states.items()
                if state.manager.mapper.isa(limit_to_mapper)
            ]
            if states:
                loader(context, path, states, self.load_keys, *arg, **kw)
        self.states.clear()

    @classmethod
    def for_context(cls, context, path, only_load_props):
        """Return the tracker registered for ``path``, or ``None``.

        When a tracker exists and ``only_load_props`` is truthy, it is
        stored on the tracker as ``load_keys``.
        """
        pl = context.post_load_paths.get(path.path)
        if pl is not None and only_load_props:
            pl.load_keys = only_load_props
        return pl

    @classmethod
    def path_exists(cls, context, path, key):
        """Return True if a loader is registered under ``key`` for ``path``."""
        return (
            path.path in context.post_load_paths
            and key in context.post_load_paths[path.path].loaders
        )

    @classmethod
    def callable_for_path(
        cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
    ):
        """Register ``loader_callable`` under ``token`` for ``path``.

        The tracker for ``path`` is created on first use.  An existing
        registration under the same token is replaced.
        """
        if path.path in context.post_load_paths:
            pl = context.post_load_paths[path.path]
        else:
            # instantiate via ``cls`` so that subclasses get their own type
            pl = context.post_load_paths[path.path] = cls()
        pl.loaders[token] = (token, limit_to_mapper, loader_callable, arg, kw)
def load_scalar_attributes(mapper, state, attribute_names, passive):
    """initiate a column-based attribute refresh operation.

    Loads ``attribute_names`` for ``state`` through ``load_on_ident()`` with
    ``refresh_state=state``.  For a mapper that inherits and is not
    concrete, ``mapper._optimized_get_statement()`` is tried first; when
    that yields no statement, or the load leaves ``result`` as ``False``, a
    plain ``select(mapper)`` against the identity key is used instead.

    :param mapper: mapper for ``state``.
    :param state: instance state to refresh; must have a session.
    :param attribute_names: set of attribute keys to load; when non-empty it
     is narrowed to keys present in ``mapper.attrs``.
    :param passive: flags; only ``attributes.NO_AUTOFLUSH`` is examined here.

    :raises DetachedInstanceError: if ``state.session`` is falsy.
    :raises InvalidRequestError: if the state has no key and any of its
     primary key attributes are expired.
    :raises ObjectDeletedError: if the state has a key and the load
     produced ``None``.

    Returns ``None``; when the identity key fails the ``_none_set`` check a
    warning is emitted and the function returns without loading.
    """

    # assert mapper is _state_mapper(state)
    session = state.session
    if not session:
        raise orm_exc.DetachedInstanceError(
            "Instance %s is not bound to a Session; "
            "attribute refresh operation cannot proceed" % (state_str(state))
        )

    has_key = bool(state.key)

    # False means "no load attempted yet"; it is distinct from None, which
    # is checked separately at the end of the function
    result = False

    no_autoflush = (
        bool(passive & attributes.NO_AUTOFLUSH) or state.session.autocommit
    )

    # in the case of inheritance, particularly concrete and abstract
    # concrete inheritance, the class manager might have some keys
    # of attributes on the superclass that we didn't actually map.
    # These could be mapped as "concrete, don't load" or could be completely
    # excluded from the mapping and we know nothing about them.  Filter them
    # here to prevent them from coming through.
    if attribute_names:
        attribute_names = attribute_names.intersection(mapper.attrs.keys())

    if mapper.inherits and not mapper.concrete:
        # because we are using Core to produce a select() that we
        # pass to the Query, we aren't calling setup() for mapped
        # attributes; in 1.0 this means deferred attrs won't get loaded
        # by default
        statement = mapper._optimized_get_statement(state, attribute_names)
        if statement is not None:
            # this was previously aliased(mapper, statement), however,
            # statement is a select() and Query's coercion now raises for this
            # since you can't "select" from a "SELECT" statement.  only
            # from_statement() allows this.
            # note: using from_statement() here means there is an adaption
            # with adapt_on_names set up.  the other option is to make the
            # aliased() against a subquery which affects the SQL.

            from .query import FromStatement

            stmt = FromStatement(mapper, statement).options(
                strategy_options.Load(mapper).undefer("*")
            )

            # no identity key is passed here (third argument is None)
            result = load_on_ident(
                session,
                stmt,
                None,
                only_load_props=attribute_names,
                refresh_state=state,
                no_autoflush=no_autoflush,
            )

    if result is False:
        if has_key:
            identity_key = state.key
        else:
            # this codepath is rare - only valid when inside a flush, and the
            # object is becoming persistent but hasn't yet been assigned
            # an identity_key.
            # check here to ensure we have the attrs we need.
            pk_attrs = [
                mapper._columntoproperty[col].key for col in mapper.primary_key
            ]
            if state.expired_attributes.intersection(pk_attrs):
                # NOTE(review): the adjacent literals below join as
                # "it's not  persistent" (two spaces) - confirm whether
                # callers or tests rely on the exact text before changing it
                raise sa_exc.InvalidRequestError(
                    "Instance %s cannot be refreshed - it's not "
                    " persistent and does not "
                    "contain a full primary key." % state_str(state)
                )
            identity_key = mapper._identity_key_from_state(state)

        # NOTE(review): the set tests below receive the whole identity_key;
        # elsewhere in this file identity keys are built as
        # (class, primary key, token) with the primary key at index 1 -
        # confirm that testing the full tuple rather than its primary key
        # element is intended
        if (
            _none_set.issubset(identity_key) and not mapper.allow_partial_pks
        ) or _none_set.issuperset(identity_key):
            util.warn_limited(
                "Instance %s to be refreshed doesn't "
                "contain a full primary key - can't be refreshed "
                "(and shouldn't be expired, either).",
                state_str(state),
            )
            return

        result = load_on_ident(
            session,
            future.select(mapper).set_label_style(
                LABEL_STYLE_TABLENAME_PLUS_COL
            ),
            identity_key,
            refresh_state=state,
            only_load_props=attribute_names,
            no_autoflush=no_autoflush,
        )

    # if instance is pending, a refresh operation
    # may not complete (even if PK attributes are assigned)
    if has_key and result is None:
        raise orm_exc.ObjectDeletedError(state)
|