123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701 |
- # sql/base.py
- # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- """Foundational utilities common to many sql modules.
- """
- import itertools
- import operator
- import re
- from . import roles
- from . import visitors
- from .traversals import HasCacheKey # noqa
- from .traversals import HasCopyInternals # noqa
- from .traversals import MemoizedHasCacheKey # noqa
- from .visitors import ClauseVisitor
- from .visitors import ExtendedInternalTraversal
- from .visitors import InternalTraversal
- from .. import exc
- from .. import util
- from ..util import HasMemoized
- from ..util import hybridmethod
- coercions = None
- elements = None
- type_api = None
- PARSE_AUTOCOMMIT = util.symbol("PARSE_AUTOCOMMIT")
- NO_ARG = util.symbol("NO_ARG")
- class Immutable(object):
- """mark a ClauseElement as 'immutable' when expressions are cloned."""
- _is_immutable = True
- def unique_params(self, *optionaldict, **kwargs):
- raise NotImplementedError("Immutable objects do not support copying")
- def params(self, *optionaldict, **kwargs):
- raise NotImplementedError("Immutable objects do not support copying")
- def _clone(self, **kw):
- return self
- def _copy_internals(self, **kw):
- pass
- class SingletonConstant(Immutable):
- """Represent SQL constants like NULL, TRUE, FALSE"""
- _is_singleton_constant = True
- def __new__(cls, *arg, **kw):
- return cls._singleton
- @classmethod
- def _create_singleton(cls):
- obj = object.__new__(cls)
- obj.__init__()
- # for a long time this was an empty frozenset, meaning
- # a SingletonConstant would never be a "corresponding column" in
- # a statement. This referred to #6259. However, in #7154 we see
- # that we do in fact need "correspondence" to work when matching cols
- # in result sets, so the non-correspondence was moved to a more
- # specific level when we are actually adapting expressions for SQL
- # render only.
- obj.proxy_set = frozenset([obj])
- cls._singleton = obj
- def _from_objects(*elements):
- return itertools.chain.from_iterable(
- [element._from_objects for element in elements]
- )
- def _select_iterables(elements):
- """expand tables into individual columns in the
- given list of column expressions.
- """
- return itertools.chain.from_iterable(
- [c._select_iterable for c in elements]
- )
- def _generative(fn):
- """non-caching _generative() decorator.
- This is basically the legacy decorator that copies the object and
- runs a method on the new copy.
- """
- @util.decorator
- def _generative(fn, self, *args, **kw):
- """Mark a method as generative."""
- self = self._generate()
- x = fn(self, *args, **kw)
- assert x is None, "generative methods must have no return value"
- return self
- decorated = _generative(fn)
- decorated.non_generative = fn
- return decorated
- def _exclusive_against(*names, **kw):
- msgs = kw.pop("msgs", {})
- defaults = kw.pop("defaults", {})
- getters = [
- (name, operator.attrgetter(name), defaults.get(name, None))
- for name in names
- ]
- @util.decorator
- def check(fn, *args, **kw):
- # make pylance happy by not including "self" in the argument
- # list
- self = args[0]
- args = args[1:]
- for name, getter, default_ in getters:
- if getter(self) is not default_:
- msg = msgs.get(
- name,
- "Method %s() has already been invoked on this %s construct"
- % (fn.__name__, self.__class__),
- )
- raise exc.InvalidRequestError(msg)
- return fn(self, *args, **kw)
- return check
- def _clone(element, **kw):
- return element._clone(**kw)
- def _expand_cloned(elements):
- """expand the given set of ClauseElements to be the set of all 'cloned'
- predecessors.
- """
- return itertools.chain(*[x._cloned_set for x in elements])
- def _cloned_intersection(a, b):
- """return the intersection of sets a and b, counting
- any overlap between 'cloned' predecessors.
- The returned set is in terms of the entities present within 'a'.
- """
- all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
- return set(
- elem for elem in a if all_overlap.intersection(elem._cloned_set)
- )
- def _cloned_difference(a, b):
- all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
- return set(
- elem for elem in a if not all_overlap.intersection(elem._cloned_set)
- )
- class _DialectArgView(util.collections_abc.MutableMapping):
- """A dictionary view of dialect-level arguments in the form
- <dialectname>_<argument_name>.
- """
- def __init__(self, obj):
- self.obj = obj
- def _key(self, key):
- try:
- dialect, value_key = key.split("_", 1)
- except ValueError as err:
- util.raise_(KeyError(key), replace_context=err)
- else:
- return dialect, value_key
- def __getitem__(self, key):
- dialect, value_key = self._key(key)
- try:
- opt = self.obj.dialect_options[dialect]
- except exc.NoSuchModuleError as err:
- util.raise_(KeyError(key), replace_context=err)
- else:
- return opt[value_key]
- def __setitem__(self, key, value):
- try:
- dialect, value_key = self._key(key)
- except KeyError as err:
- util.raise_(
- exc.ArgumentError(
- "Keys must be of the form <dialectname>_<argname>"
- ),
- replace_context=err,
- )
- else:
- self.obj.dialect_options[dialect][value_key] = value
- def __delitem__(self, key):
- dialect, value_key = self._key(key)
- del self.obj.dialect_options[dialect][value_key]
- def __len__(self):
- return sum(
- len(args._non_defaults)
- for args in self.obj.dialect_options.values()
- )
- def __iter__(self):
- return (
- "%s_%s" % (dialect_name, value_name)
- for dialect_name in self.obj.dialect_options
- for value_name in self.obj.dialect_options[
- dialect_name
- ]._non_defaults
- )
- class _DialectArgDict(util.collections_abc.MutableMapping):
- """A dictionary view of dialect-level arguments for a specific
- dialect.
- Maintains a separate collection of user-specified arguments
- and dialect-specified default arguments.
- """
- def __init__(self):
- self._non_defaults = {}
- self._defaults = {}
- def __len__(self):
- return len(set(self._non_defaults).union(self._defaults))
- def __iter__(self):
- return iter(set(self._non_defaults).union(self._defaults))
- def __getitem__(self, key):
- if key in self._non_defaults:
- return self._non_defaults[key]
- else:
- return self._defaults[key]
- def __setitem__(self, key, value):
- self._non_defaults[key] = value
- def __delitem__(self, key):
- del self._non_defaults[key]
- @util.preload_module("sqlalchemy.dialects")
- def _kw_reg_for_dialect(dialect_name):
- dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
- if dialect_cls.construct_arguments is None:
- return None
- return dict(dialect_cls.construct_arguments)
- class DialectKWArgs(object):
- """Establish the ability for a class to have dialect-specific arguments
- with defaults and constructor validation.
- The :class:`.DialectKWArgs` interacts with the
- :attr:`.DefaultDialect.construct_arguments` present on a dialect.
- .. seealso::
- :attr:`.DefaultDialect.construct_arguments`
- """
- _dialect_kwargs_traverse_internals = [
- ("dialect_options", InternalTraversal.dp_dialect_options)
- ]
- @classmethod
- def argument_for(cls, dialect_name, argument_name, default):
- """Add a new kind of dialect-specific keyword argument for this class.
- E.g.::
- Index.argument_for("mydialect", "length", None)
- some_index = Index('a', 'b', mydialect_length=5)
- The :meth:`.DialectKWArgs.argument_for` method is a per-argument
- way adding extra arguments to the
- :attr:`.DefaultDialect.construct_arguments` dictionary. This
- dictionary provides a list of argument names accepted by various
- schema-level constructs on behalf of a dialect.
- New dialects should typically specify this dictionary all at once as a
- data member of the dialect class. The use case for ad-hoc addition of
- argument names is typically for end-user code that is also using
- a custom compilation scheme which consumes the additional arguments.
- :param dialect_name: name of a dialect. The dialect must be
- locatable, else a :class:`.NoSuchModuleError` is raised. The
- dialect must also include an existing
- :attr:`.DefaultDialect.construct_arguments` collection, indicating
- that it participates in the keyword-argument validation and default
- system, else :class:`.ArgumentError` is raised. If the dialect does
- not include this collection, then any keyword argument can be
- specified on behalf of this dialect already. All dialects packaged
- within SQLAlchemy include this collection, however for third party
- dialects, support may vary.
- :param argument_name: name of the parameter.
- :param default: default value of the parameter.
- .. versionadded:: 0.9.4
- """
- construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
- if construct_arg_dictionary is None:
- raise exc.ArgumentError(
- "Dialect '%s' does have keyword-argument "
- "validation and defaults enabled configured" % dialect_name
- )
- if cls not in construct_arg_dictionary:
- construct_arg_dictionary[cls] = {}
- construct_arg_dictionary[cls][argument_name] = default
- @util.memoized_property
- def dialect_kwargs(self):
- """A collection of keyword arguments specified as dialect-specific
- options to this construct.
- The arguments are present here in their original ``<dialect>_<kwarg>``
- format. Only arguments that were actually passed are included;
- unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
- contains all options known by this dialect including defaults.
- The collection is also writable; keys are accepted of the
- form ``<dialect>_<kwarg>`` where the value will be assembled
- into the list of options.
- .. versionadded:: 0.9.2
- .. versionchanged:: 0.9.4 The :attr:`.DialectKWArgs.dialect_kwargs`
- collection is now writable.
- .. seealso::
- :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
- """
- return _DialectArgView(self)
- @property
- def kwargs(self):
- """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
- return self.dialect_kwargs
- _kw_registry = util.PopulateDict(_kw_reg_for_dialect)
- def _kw_reg_for_dialect_cls(self, dialect_name):
- construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
- d = _DialectArgDict()
- if construct_arg_dictionary is None:
- d._defaults.update({"*": None})
- else:
- for cls in reversed(self.__class__.__mro__):
- if cls in construct_arg_dictionary:
- d._defaults.update(construct_arg_dictionary[cls])
- return d
- @util.memoized_property
- def dialect_options(self):
- """A collection of keyword arguments specified as dialect-specific
- options to this construct.
- This is a two-level nested registry, keyed to ``<dialect_name>``
- and ``<argument_name>``. For example, the ``postgresql_where``
- argument would be locatable as::
- arg = my_object.dialect_options['postgresql']['where']
- .. versionadded:: 0.9.2
- .. seealso::
- :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
- """
- return util.PopulateDict(
- util.portable_instancemethod(self._kw_reg_for_dialect_cls)
- )
- def _validate_dialect_kwargs(self, kwargs):
- # validate remaining kwargs that they all specify DB prefixes
- if not kwargs:
- return
- for k in kwargs:
- m = re.match("^(.+?)_(.+)$", k)
- if not m:
- raise TypeError(
- "Additional arguments should be "
- "named <dialectname>_<argument>, got '%s'" % k
- )
- dialect_name, arg_name = m.group(1, 2)
- try:
- construct_arg_dictionary = self.dialect_options[dialect_name]
- except exc.NoSuchModuleError:
- util.warn(
- "Can't validate argument %r; can't "
- "locate any SQLAlchemy dialect named %r"
- % (k, dialect_name)
- )
- self.dialect_options[dialect_name] = d = _DialectArgDict()
- d._defaults.update({"*": None})
- d._non_defaults[arg_name] = kwargs[k]
- else:
- if (
- "*" not in construct_arg_dictionary
- and arg_name not in construct_arg_dictionary
- ):
- raise exc.ArgumentError(
- "Argument %r is not accepted by "
- "dialect %r on behalf of %r"
- % (k, dialect_name, self.__class__)
- )
- else:
- construct_arg_dictionary[arg_name] = kwargs[k]
- class CompileState(object):
- """Produces additional object state necessary for a statement to be
- compiled.
- the :class:`.CompileState` class is at the base of classes that assemble
- state for a particular statement object that is then used by the
- compiler. This process is essentially an extension of the process that
- the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
- on converting raw user intent into more organized structures rather than
- producing string output. The top-level :class:`.CompileState` for the
- statement being executed is also accessible when the execution context
- works with invoking the statement and collecting results.
- The production of :class:`.CompileState` is specific to the compiler, such
- as within the :meth:`.SQLCompiler.visit_insert`,
- :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
- responsible for associating the :class:`.CompileState` with the
- :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
- i.e. the outermost SQL statement that's actually being executed.
- There can be other :class:`.CompileState` objects that are not the
- toplevel, such as when a SELECT subquery or CTE-nested
- INSERT/UPDATE/DELETE is generated.
- .. versionadded:: 1.4
- """
- __slots__ = ("statement",)
- plugins = {}
- @classmethod
- def create_for_statement(cls, statement, compiler, **kw):
- # factory construction.
- if statement._propagate_attrs:
- plugin_name = statement._propagate_attrs.get(
- "compile_state_plugin", "default"
- )
- klass = cls.plugins.get(
- (plugin_name, statement._effective_plugin_target), None
- )
- if klass is None:
- klass = cls.plugins[
- ("default", statement._effective_plugin_target)
- ]
- else:
- klass = cls.plugins[
- ("default", statement._effective_plugin_target)
- ]
- if klass is cls:
- return cls(statement, compiler, **kw)
- else:
- return klass.create_for_statement(statement, compiler, **kw)
- def __init__(self, statement, compiler, **kw):
- self.statement = statement
- @classmethod
- def get_plugin_class(cls, statement):
- plugin_name = statement._propagate_attrs.get(
- "compile_state_plugin", None
- )
- if plugin_name:
- key = (plugin_name, statement._effective_plugin_target)
- if key in cls.plugins:
- return cls.plugins[key]
- # there's no case where we call upon get_plugin_class() and want
- # to get None back, there should always be a default. return that
- # if there was no plugin-specific class (e.g. Insert with "orm"
- # plugin)
- try:
- return cls.plugins[("default", statement._effective_plugin_target)]
- except KeyError:
- return None
- @classmethod
- def _get_plugin_class_for_plugin(cls, statement, plugin_name):
- try:
- return cls.plugins[
- (plugin_name, statement._effective_plugin_target)
- ]
- except KeyError:
- return None
- @classmethod
- def plugin_for(cls, plugin_name, visit_name):
- def decorate(cls_to_decorate):
- cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
- return cls_to_decorate
- return decorate
- class Generative(HasMemoized):
- """Provide a method-chaining pattern in conjunction with the
- @_generative decorator."""
- def _generate(self):
- skip = self._memoized_keys
- cls = self.__class__
- s = cls.__new__(cls)
- if skip:
- s.__dict__ = {
- k: v for k, v in self.__dict__.items() if k not in skip
- }
- else:
- s.__dict__ = self.__dict__.copy()
- return s
- class InPlaceGenerative(HasMemoized):
- """Provide a method-chaining pattern in conjunction with the
- @_generative decorator that mutates in place."""
- def _generate(self):
- skip = self._memoized_keys
- for k in skip:
- self.__dict__.pop(k, None)
- return self
- class HasCompileState(Generative):
- """A class that has a :class:`.CompileState` associated with it."""
- _compile_state_plugin = None
- _attributes = util.immutabledict()
- _compile_state_factory = CompileState.create_for_statement
- class _MetaOptions(type):
- """metaclass for the Options class."""
- def __init__(cls, classname, bases, dict_):
- cls._cache_attrs = tuple(
- sorted(
- d
- for d in dict_
- if not d.startswith("__")
- and d not in ("_cache_key_traversal",)
- )
- )
- type.__init__(cls, classname, bases, dict_)
- def __add__(self, other):
- o1 = self()
- if set(other).difference(self._cache_attrs):
- raise TypeError(
- "dictionary contains attributes not covered by "
- "Options class %s: %r"
- % (self, set(other).difference(self._cache_attrs))
- )
- o1.__dict__.update(other)
- return o1
- class Options(util.with_metaclass(_MetaOptions)):
- """A cacheable option dictionary with defaults."""
- def __init__(self, **kw):
- self.__dict__.update(kw)
- def __add__(self, other):
- o1 = self.__class__.__new__(self.__class__)
- o1.__dict__.update(self.__dict__)
- if set(other).difference(self._cache_attrs):
- raise TypeError(
- "dictionary contains attributes not covered by "
- "Options class %s: %r"
- % (self, set(other).difference(self._cache_attrs))
- )
- o1.__dict__.update(other)
- return o1
- def __eq__(self, other):
- # TODO: very inefficient. This is used only in test suites
- # right now.
- for a, b in util.zip_longest(self._cache_attrs, other._cache_attrs):
- if getattr(self, a) != getattr(other, b):
- return False
- return True
- def __repr__(self):
- # TODO: fairly inefficient, used only in debugging right now.
- return "%s(%s)" % (
- self.__class__.__name__,
- ", ".join(
- "%s=%r" % (k, self.__dict__[k])
- for k in self._cache_attrs
- if k in self.__dict__
- ),
- )
- @classmethod
- def isinstance(cls, klass):
- return issubclass(cls, klass)
- @hybridmethod
- def add_to_element(self, name, value):
- return self + {name: getattr(self, name) + value}
- @hybridmethod
- def _state_dict(self):
- return self.__dict__
- _state_dict_const = util.immutabledict()
- @_state_dict.classlevel
- def _state_dict(cls):
- return cls._state_dict_const
- @classmethod
- def safe_merge(cls, other):
- d = other._state_dict()
- # only support a merge with another object of our class
- # and which does not have attrs that we don't. otherwise
- # we risk having state that might not be part of our cache
- # key strategy
- if (
- cls is not other.__class__
- and other._cache_attrs
- and set(other._cache_attrs).difference(cls._cache_attrs)
- ):
- raise TypeError(
- "other element %r is not empty, is not of type %s, "
- "and contains attributes not covered here %r"
- % (
- other,
- cls,
- set(other._cache_attrs).difference(cls._cache_attrs),
- )
- )
- return cls + d
- @classmethod
- def from_execution_options(
- cls, key, attrs, exec_options, statement_exec_options
- ):
- """process Options argument in terms of execution options.
- e.g.::
- (
- load_options,
- execution_options,
- ) = QueryContext.default_load_options.from_execution_options(
- "_sa_orm_load_options",
- {
- "populate_existing",
- "autoflush",
- "yield_per"
- },
- execution_options,
- statement._execution_options,
- )
- get back the Options and refresh "_sa_orm_load_options" in the
- exec options dict w/ the Options as well
- """
- # common case is that no options we are looking for are
- # in either dictionary, so cancel for that first
- check_argnames = attrs.intersection(
- set(exec_options).union(statement_exec_options)
- )
- existing_options = exec_options.get(key, cls)
- if check_argnames:
- result = {}
- for argname in check_argnames:
- local = "_" + argname
- if argname in exec_options:
- result[local] = exec_options[argname]
- elif argname in statement_exec_options:
- result[local] = statement_exec_options[argname]
- new_options = existing_options + result
- exec_options = util.immutabledict().merge_with(
- exec_options, {key: new_options}
- )
- return new_options, exec_options
- else:
- return existing_options, exec_options
- class CacheableOptions(Options, HasCacheKey):
- @hybridmethod
- def _gen_cache_key(self, anon_map, bindparams):
- return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
- @_gen_cache_key.classlevel
- def _gen_cache_key(cls, anon_map, bindparams):
- return (cls, ())
- @hybridmethod
- def _generate_cache_key(self):
- return HasCacheKey._generate_cache_key_for_object(self)
- class ExecutableOption(HasCopyInternals):
- _annotations = util.EMPTY_DICT
- __visit_name__ = "executable_option"
- _is_has_cache_key = False
- def _clone(self, **kw):
- """Create a shallow copy of this ExecutableOption."""
- c = self.__class__.__new__(self.__class__)
- c.__dict__ = dict(self.__dict__)
- return c
- class Executable(roles.StatementRole, Generative):
- """Mark a :class:`_expression.ClauseElement` as supporting execution.
- :class:`.Executable` is a superclass for all "statement" types
- of objects, including :func:`select`, :func:`delete`, :func:`update`,
- :func:`insert`, :func:`text`.
- """
- supports_execution = True
- _execution_options = util.immutabledict()
- _bind = None
- _with_options = ()
- _with_context_options = ()
- _executable_traverse_internals = [
- ("_with_options", InternalTraversal.dp_executable_options),
- (
- "_with_context_options",
- ExtendedInternalTraversal.dp_with_context_options,
- ),
- ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
- ]
- is_select = False
- is_update = False
- is_insert = False
- is_text = False
- is_delete = False
- is_dml = False
- @property
- def _effective_plugin_target(self):
- return self.__visit_name__
- @_generative
- def options(self, *options):
- """Apply options to this statement.
- In the general sense, options are any kind of Python object
- that can be interpreted by the SQL compiler for the statement.
- These options can be consumed by specific dialects or specific kinds
- of compilers.
- The most commonly known kind of option are the ORM level options
- that apply "eager load" and other loading behaviors to an ORM
- query. However, options can theoretically be used for many other
- purposes.
- For background on specific kinds of options for specific kinds of
- statements, refer to the documentation for those option objects.
- .. versionchanged:: 1.4 - added :meth:`.Generative.options` to
- Core statement objects towards the goal of allowing unified
- Core / ORM querying capabilities.
- .. seealso::
- :ref:`deferred_options` - refers to options specific to the usage
- of ORM queries
- :ref:`relationship_loader_options` - refers to options specific
- to the usage of ORM queries
- """
- self._with_options += tuple(
- coercions.expect(roles.ExecutableOptionRole, opt)
- for opt in options
- )
- @_generative
- def _set_compile_options(self, compile_options):
- """Assign the compile options to a new value.
- :param compile_options: appropriate CacheableOptions structure
- """
- self._compile_options = compile_options
- @_generative
- def _update_compile_options(self, options):
- """update the _compile_options with new keys."""
- self._compile_options += options
- @_generative
- def _add_context_option(self, callable_, cache_args):
- """Add a context option to this statement.
- These are callable functions that will
- be given the CompileState object upon compilation.
- A second argument cache_args is required, which will be combined with
- the ``__code__`` identity of the function itself in order to produce a
- cache key.
- """
- self._with_context_options += ((callable_, cache_args),)
- @_generative
- def execution_options(self, **kw):
- """Set non-SQL options for the statement which take effect during
- execution.
- Execution options can be set on a per-statement or
- per :class:`_engine.Connection` basis. Additionally, the
- :class:`_engine.Engine` and ORM :class:`~.orm.query.Query`
- objects provide
- access to execution options which they in turn configure upon
- connections.
- The :meth:`execution_options` method is generative. A new
- instance of this statement is returned that contains the options::
- statement = select(table.c.x, table.c.y)
- statement = statement.execution_options(autocommit=True)
- Note that only a subset of possible execution options can be applied
- to a statement - these include "autocommit" and "stream_results",
- but not "isolation_level" or "compiled_cache".
- See :meth:`_engine.Connection.execution_options` for a full list of
- possible options.
- .. seealso::
- :meth:`_engine.Connection.execution_options`
- :meth:`_query.Query.execution_options`
- :meth:`.Executable.get_execution_options`
- """
- if "isolation_level" in kw:
- raise exc.ArgumentError(
- "'isolation_level' execution option may only be specified "
- "on Connection.execution_options(), or "
- "per-engine using the isolation_level "
- "argument to create_engine()."
- )
- if "compiled_cache" in kw:
- raise exc.ArgumentError(
- "'compiled_cache' execution option may only be specified "
- "on Connection.execution_options(), not per statement."
- )
- self._execution_options = self._execution_options.union(kw)
- def get_execution_options(self):
- """Get the non-SQL options which will take effect during execution.
- .. versionadded:: 1.3
- .. seealso::
- :meth:`.Executable.execution_options`
- """
- return self._execution_options
- @util.deprecated_20(
- ":meth:`.Executable.execute`",
- alternative="All statement execution in SQLAlchemy 2.0 is performed "
- "by the :meth:`_engine.Connection.execute` method of "
- ":class:`_engine.Connection`, "
- "or in the ORM by the :meth:`.Session.execute` method of "
- ":class:`.Session`.",
- )
- def execute(self, *multiparams, **params):
- """Compile and execute this :class:`.Executable`."""
- e = self.bind
- if e is None:
- label = (
- getattr(self, "description", None) or self.__class__.__name__
- )
- msg = (
- "This %s is not directly bound to a Connection or Engine. "
- "Use the .execute() method of a Connection or Engine "
- "to execute this construct." % label
- )
- raise exc.UnboundExecutionError(msg)
- return e._execute_clauseelement(
- self, multiparams, params, util.immutabledict()
- )
- @util.deprecated_20(
- ":meth:`.Executable.scalar`",
- alternative="Scalar execution in SQLAlchemy 2.0 is performed "
- "by the :meth:`_engine.Connection.scalar` method of "
- ":class:`_engine.Connection`, "
- "or in the ORM by the :meth:`.Session.scalar` method of "
- ":class:`.Session`.",
- )
- def scalar(self, *multiparams, **params):
- """Compile and execute this :class:`.Executable`, returning the
- result's scalar representation.
- """
- return self.execute(*multiparams, **params).scalar()
- @property
- @util.deprecated_20(
- ":attr:`.Executable.bind`",
- alternative="Bound metadata is being removed as of SQLAlchemy 2.0.",
- enable_warnings=False,
- )
- def bind(self):
- """Returns the :class:`_engine.Engine` or :class:`_engine.Connection`
- to
- which this :class:`.Executable` is bound, or None if none found.
- This is a traversal which checks locally, then
- checks among the "from" clauses of associated objects
- until a bound engine or connection is found.
- """
- if self._bind is not None:
- return self._bind
- for f in _from_objects(self):
- if f is self:
- continue
- engine = f.bind
- if engine is not None:
- return engine
- else:
- return None
- class prefix_anon_map(dict):
- """A map that creates new keys for missing key access.
- Considers keys of the form "<ident> <name>" to produce
- new symbols "<name>_<index>", where "index" is an incrementing integer
- corresponding to <name>.
- Inlines the approach taken by :class:`sqlalchemy.util.PopulateDict` which
- is otherwise usually used for this type of operation.
- """
- def __missing__(self, key):
- (ident, derived) = key.split(" ", 1)
- anonymous_counter = self.get(derived, 1)
- self[derived] = anonymous_counter + 1
- value = derived + "_" + str(anonymous_counter)
- self[key] = value
- return value
- class SchemaEventTarget(object):
- """Base class for elements that are the targets of :class:`.DDLEvents`
- events.
- This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
- """
- def _set_parent(self, parent, **kw):
- """Associate with this SchemaEvent's parent object."""
- def _set_parent_with_dispatch(self, parent, **kw):
- self.dispatch.before_parent_attach(self, parent)
- self._set_parent(parent, **kw)
- self.dispatch.after_parent_attach(self, parent)
- class SchemaVisitor(ClauseVisitor):
- """Define the visiting for ``SchemaItem`` objects."""
- __traverse_options__ = {"schema_visitor": True}
- class ColumnCollection(object):
- """Collection of :class:`_expression.ColumnElement` instances,
- typically for
- :class:`_sql.FromClause` objects.
- The :class:`_sql.ColumnCollection` object is most commonly available
- as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
- on the :class:`_schema.Table` object, introduced at
- :ref:`metadata_tables_and_columns`.
- The :class:`_expression.ColumnCollection` has both mapping- and sequence-
- like behaviors. A :class:`_expression.ColumnCollection` usually stores
- :class:`_schema.Column` objects, which are then accessible both via mapping
- style access as well as attribute access style.
- To access :class:`_schema.Column` objects using ordinary attribute-style
- access, specify the name like any other object attribute, such as below
- a column named ``employee_name`` is accessed::
- >>> employee_table.c.employee_name
- To access columns that have names with special characters or spaces,
- index-style access is used, such as below which illustrates a column named
- ``employee ' payment`` is accessed::
- >>> employee_table.c["employee ' payment"]
- As the :class:`_sql.ColumnCollection` object provides a Python dictionary
- interface, common dictionary method names like
- :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
- and :meth:`_sql.ColumnCollection.items` are available, which means that
- database columns that are keyed under these names also need to use indexed
- access::
- >>> employee_table.c["values"]
- The name for which a :class:`_schema.Column` would be present is normally
- that of the :paramref:`_schema.Column.key` parameter. In some contexts,
- such as a :class:`_sql.Select` object that uses a label style set
- using the :meth:`_sql.Select.set_label_style` method, a column of a certain
- key may instead be represented under a particular label name such
- as ``tablename_columnname``::
- >>> from sqlalchemy import select, column, table
- >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
- >>> t = table("t", column("c"))
- >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
- >>> subq = stmt.subquery()
- >>> subq.c.t_c
- <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
- :class:`.ColumnCollection` also indexes the columns in order and allows
- them to be accessible by their integer position::
- >>> cc[0]
- Column('x', Integer(), table=None)
- >>> cc[1]
- Column('y', Integer(), table=None)
- .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
- allows integer-based
- index access to the collection.
- Iterating the collection yields the column expressions in order::
- >>> list(cc)
- [Column('x', Integer(), table=None),
- Column('y', Integer(), table=None)]
- The base :class:`_expression.ColumnCollection` object can store
- duplicates, which can
- mean either two columns with the same key, in which case the column
- returned by key access is **arbitrary**::
- >>> x1, x2 = Column('x', Integer), Column('x', Integer)
- >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
- >>> list(cc)
- [Column('x', Integer(), table=None),
- Column('x', Integer(), table=None)]
- >>> cc['x'] is x1
- False
- >>> cc['x'] is x2
- True
- Or it can also mean the same column multiple times. These cases are
- supported as :class:`_expression.ColumnCollection`
- is used to represent the columns in
- a SELECT statement which may include duplicates.
- A special subclass :class:`.DedupeColumnCollection` exists which instead
- maintains SQLAlchemy's older behavior of not allowing duplicates; this
- collection is used for schema level objects like :class:`_schema.Table`
- and
- :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
- :class:`.DedupeColumnCollection` class also has additional mutation methods
- as the schema constructs have more use cases that require removal and
- replacement of columns.
- .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
- now stores duplicate
- column keys as well as the same column in multiple positions. The
- :class:`.DedupeColumnCollection` class is added to maintain the
- former behavior in those cases where deduplication as well as
- additional replace/remove operations are needed.
- """
- __slots__ = "_collection", "_index", "_colset"
- def __init__(self, columns=None):
- object.__setattr__(self, "_colset", set())
- object.__setattr__(self, "_index", {})
- object.__setattr__(self, "_collection", [])
- if columns:
- self._initial_populate(columns)
- def _initial_populate(self, iter_):
- self._populate_separate_keys(iter_)
- @property
- def _all_columns(self):
- return [col for (k, col) in self._collection]
- def keys(self):
- """Return a sequence of string key names for all columns in this
- collection."""
- return [k for (k, col) in self._collection]
- def values(self):
- """Return a sequence of :class:`_sql.ColumnClause` or
- :class:`_schema.Column` objects for all columns in this
- collection."""
- return [col for (k, col) in self._collection]
- def items(self):
- """Return a sequence of (key, column) tuples for all columns in this
- collection each consisting of a string key name and a
- :class:`_sql.ColumnClause` or
- :class:`_schema.Column` object.
- """
- return list(self._collection)
- def __bool__(self):
- return bool(self._collection)
- def __len__(self):
- return len(self._collection)
- def __iter__(self):
- # turn to a list first to maintain over a course of changes
- return iter([col for k, col in self._collection])
- def __getitem__(self, key):
- try:
- return self._index[key]
- except KeyError as err:
- if isinstance(key, util.int_types):
- util.raise_(IndexError(key), replace_context=err)
- else:
- raise
- def __getattr__(self, key):
- try:
- return self._index[key]
- except KeyError as err:
- util.raise_(AttributeError(key), replace_context=err)
- def __contains__(self, key):
- if key not in self._index:
- if not isinstance(key, util.string_types):
- raise exc.ArgumentError(
- "__contains__ requires a string argument"
- )
- return False
- else:
- return True
- def compare(self, other):
- """Compare this :class:`_expression.ColumnCollection` to another
- based on the names of the keys"""
- for l, r in util.zip_longest(self, other):
- if l is not r:
- return False
- else:
- return True
- def __eq__(self, other):
- return self.compare(other)
- def get(self, key, default=None):
- """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
- based on a string key name from this
- :class:`_expression.ColumnCollection`."""
- if key in self._index:
- return self._index[key]
- else:
- return default
- def __str__(self):
- return "%s(%s)" % (
- self.__class__.__name__,
- ", ".join(str(c) for c in self),
- )
- def __setitem__(self, key, value):
- raise NotImplementedError()
- def __delitem__(self, key):
- raise NotImplementedError()
- def __setattr__(self, key, obj):
- raise NotImplementedError()
- def clear(self):
- """Dictionary clear() is not implemented for
- :class:`_sql.ColumnCollection`."""
- raise NotImplementedError()
- def remove(self, column):
- """Dictionary remove() is not implemented for
- :class:`_sql.ColumnCollection`."""
- raise NotImplementedError()
- def update(self, iter_):
- """Dictionary update() is not implemented for
- :class:`_sql.ColumnCollection`."""
- raise NotImplementedError()
- __hash__ = None
- def _populate_separate_keys(self, iter_):
- """populate from an iterator of (key, column)"""
- cols = list(iter_)
- self._collection[:] = cols
- self._colset.update(c for k, c in self._collection)
- self._index.update(
- (idx, c) for idx, (k, c) in enumerate(self._collection)
- )
- self._index.update({k: col for k, col in reversed(self._collection)})
- def add(self, column, key=None):
- """Add a column to this :class:`_sql.ColumnCollection`.
- .. note::
- This method is **not normally used by user-facing code**, as the
- :class:`_sql.ColumnCollection` is usually part of an existing
- object such as a :class:`_schema.Table`. To add a
- :class:`_schema.Column` to an existing :class:`_schema.Table`
- object, use the :meth:`_schema.Table.append_column` method.
- """
- if key is None:
- key = column.key
- l = len(self._collection)
- self._collection.append((key, column))
- self._colset.add(column)
- self._index[l] = column
- if key not in self._index:
- self._index[key] = column
- def __getstate__(self):
- return {"_collection": self._collection, "_index": self._index}
- def __setstate__(self, state):
- object.__setattr__(self, "_index", state["_index"])
- object.__setattr__(self, "_collection", state["_collection"])
- object.__setattr__(
- self, "_colset", {col for k, col in self._collection}
- )
- def contains_column(self, col):
- """Checks if a column object exists in this collection"""
- if col not in self._colset:
- if isinstance(col, util.string_types):
- raise exc.ArgumentError(
- "contains_column cannot be used with string arguments. "
- "Use ``col_name in table.c`` instead."
- )
- return False
- else:
- return True
- def as_immutable(self):
- """Return an "immutable" form of this
- :class:`_sql.ColumnCollection`."""
- return ImmutableColumnCollection(self)
- def corresponding_column(self, column, require_embedded=False):
- """Given a :class:`_expression.ColumnElement`, return the exported
- :class:`_expression.ColumnElement` object from this
- :class:`_expression.ColumnCollection`
- which corresponds to that original :class:`_expression.ColumnElement`
- via a common
- ancestor column.
- :param column: the target :class:`_expression.ColumnElement`
- to be matched.
- :param require_embedded: only return corresponding columns for
- the given :class:`_expression.ColumnElement`, if the given
- :class:`_expression.ColumnElement`
- is actually present within a sub-element
- of this :class:`_expression.Selectable`.
- Normally the column will match if
- it merely shares a common ancestor with one of the exported
- columns of this :class:`_expression.Selectable`.
- .. seealso::
- :meth:`_expression.Selectable.corresponding_column`
- - invokes this method
- against the collection returned by
- :attr:`_expression.Selectable.exported_columns`.
- .. versionchanged:: 1.4 the implementation for ``corresponding_column``
- was moved onto the :class:`_expression.ColumnCollection` itself.
- """
- def embedded(expanded_proxy_set, target_set):
- for t in target_set.difference(expanded_proxy_set):
- if not set(_expand_cloned([t])).intersection(
- expanded_proxy_set
- ):
- return False
- return True
- # don't dig around if the column is locally present
- if column in self._colset:
- return column
- col, intersect = None, None
- target_set = column.proxy_set
- cols = [c for (k, c) in self._collection]
- for c in cols:
- expanded_proxy_set = set(_expand_cloned(c.proxy_set))
- i = target_set.intersection(expanded_proxy_set)
- if i and (
- not require_embedded
- or embedded(expanded_proxy_set, target_set)
- ):
- if col is None:
- # no corresponding column yet, pick this one.
- col, intersect = c, i
- elif len(i) > len(intersect):
- # 'c' has a larger field of correspondence than
- # 'col'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
- # matches a1.c.x->table.c.x better than
- # selectable.c.x->table.c.x does.
- col, intersect = c, i
- elif i == intersect:
- # they have the same field of correspondence. see
- # which proxy_set has fewer columns in it, which
- # indicates a closer relationship with the root
- # column. Also take into account the "weight"
- # attribute which CompoundSelect() uses to give
- # higher precedence to columns based on vertical
- # position in the compound statement, and discard
- # columns that have no reference to the target
- # column (also occurs with CompoundSelect)
- col_distance = util.reduce(
- operator.add,
- [
- sc._annotations.get("weight", 1)
- for sc in col._uncached_proxy_set()
- if sc.shares_lineage(column)
- ],
- )
- c_distance = util.reduce(
- operator.add,
- [
- sc._annotations.get("weight", 1)
- for sc in c._uncached_proxy_set()
- if sc.shares_lineage(column)
- ],
- )
- if c_distance < col_distance:
- col, intersect = c, i
- return col
- class DedupeColumnCollection(ColumnCollection):
- """A :class:`_expression.ColumnCollection`
- that maintains deduplicating behavior.
- This is useful by schema level objects such as :class:`_schema.Table` and
- :class:`.PrimaryKeyConstraint`. The collection includes more
- sophisticated mutator methods as well to suit schema objects which
- require mutable column collections.
- .. versionadded:: 1.4
- """
- def add(self, column, key=None):
- if key is not None and column.key != key:
- raise exc.ArgumentError(
- "DedupeColumnCollection requires columns be under "
- "the same key as their .key"
- )
- key = column.key
- if key is None:
- raise exc.ArgumentError(
- "Can't add unnamed column to column collection"
- )
- if key in self._index:
- existing = self._index[key]
- if existing is column:
- return
- self.replace(column)
- # pop out memoized proxy_set as this
- # operation may very well be occurring
- # in a _make_proxy operation
- util.memoized_property.reset(column, "proxy_set")
- else:
- l = len(self._collection)
- self._collection.append((key, column))
- self._colset.add(column)
- self._index[l] = column
- self._index[key] = column
- def _populate_separate_keys(self, iter_):
- """populate from an iterator of (key, column)"""
- cols = list(iter_)
- replace_col = []
- for k, col in cols:
- if col.key != k:
- raise exc.ArgumentError(
- "DedupeColumnCollection requires columns be under "
- "the same key as their .key"
- )
- if col.name in self._index and col.key != col.name:
- replace_col.append(col)
- elif col.key in self._index:
- replace_col.append(col)
- else:
- self._index[k] = col
- self._collection.append((k, col))
- self._colset.update(c for (k, c) in self._collection)
- self._index.update(
- (idx, c) for idx, (k, c) in enumerate(self._collection)
- )
- for col in replace_col:
- self.replace(col)
- def extend(self, iter_):
- self._populate_separate_keys((col.key, col) for col in iter_)
- def remove(self, column):
- if column not in self._colset:
- raise ValueError(
- "Can't remove column %r; column is not in this collection"
- % column
- )
- del self._index[column.key]
- self._colset.remove(column)
- self._collection[:] = [
- (k, c) for (k, c) in self._collection if c is not column
- ]
- self._index.update(
- {idx: col for idx, (k, col) in enumerate(self._collection)}
- )
- # delete higher index
- del self._index[len(self._collection)]
- def replace(self, column):
- """add the given column to this collection, removing unaliased
- versions of this column as well as existing columns with the
- same key.
- e.g.::
- t = Table('sometable', metadata, Column('col1', Integer))
- t.columns.replace(Column('col1', Integer, key='columnone'))
- will remove the original 'col1' from the collection, and add
- the new column under the name 'columnname'.
- Used by schema.Column to override columns during table reflection.
- """
- remove_col = set()
- # remove up to two columns based on matches of name as well as key
- if column.name in self._index and column.key != column.name:
- other = self._index[column.name]
- if other.name == other.key:
- remove_col.add(other)
- if column.key in self._index:
- remove_col.add(self._index[column.key])
- new_cols = []
- replaced = False
- for k, col in self._collection:
- if col in remove_col:
- if not replaced:
- replaced = True
- new_cols.append((column.key, column))
- else:
- new_cols.append((k, col))
- if remove_col:
- self._colset.difference_update(remove_col)
- if not replaced:
- new_cols.append((column.key, column))
- self._colset.add(column)
- self._collection[:] = new_cols
- self._index.clear()
- self._index.update(
- {idx: col for idx, (k, col) in enumerate(self._collection)}
- )
- self._index.update(self._collection)
- class ImmutableColumnCollection(util.ImmutableContainer, ColumnCollection):
- __slots__ = ("_parent",)
- def __init__(self, collection):
- object.__setattr__(self, "_parent", collection)
- object.__setattr__(self, "_colset", collection._colset)
- object.__setattr__(self, "_index", collection._index)
- object.__setattr__(self, "_collection", collection._collection)
- def __getstate__(self):
- return {"_parent": self._parent}
- def __setstate__(self, state):
- parent = state["_parent"]
- self.__init__(parent)
- add = extend = remove = util.ImmutableContainer._immutable
- class ColumnSet(util.ordered_column_set):
- def contains_column(self, col):
- return col in self
- def extend(self, cols):
- for col in cols:
- self.add(col)
- def __add__(self, other):
- return list(self) + list(other)
- def __eq__(self, other):
- l = []
- for c in other:
- for local in self:
- if c.shares_lineage(local):
- l.append(c == local)
- return elements.and_(*l)
- def __hash__(self):
- return hash(tuple(x for x in self))
- def _bind_or_error(schemaitem, msg=None):
- util.warn_deprecated_20(
- "The ``bind`` argument for schema methods that invoke SQL "
- "against an engine or connection will be required in SQLAlchemy 2.0."
- )
- bind = schemaitem.bind
- if not bind:
- name = schemaitem.__class__.__name__
- label = getattr(
- schemaitem, "fullname", getattr(schemaitem, "name", None)
- )
- if label:
- item = "%s object %r" % (name, label)
- else:
- item = "%s object" % name
- if msg is None:
- msg = (
- "%s is not bound to an Engine or Connection. "
- "Execution can not proceed without a database to execute "
- "against." % item
- )
- raise exc.UnboundExecutionError(msg)
- return bind
- def _entity_namespace(entity):
- """Return the nearest .entity_namespace for the given entity.
- If not immediately available, does an iterate to find a sub-element
- that has one, if any.
- """
- try:
- return entity.entity_namespace
- except AttributeError:
- for elem in visitors.iterate(entity):
- if hasattr(elem, "entity_namespace"):
- return elem.entity_namespace
- else:
- raise
- def _entity_namespace_key(entity, key, default=NO_ARG):
- """Return an entry from an entity_namespace.
- Raises :class:`_exc.InvalidRequestError` rather than attribute error
- on not found.
- """
- try:
- ns = _entity_namespace(entity)
- if default is not NO_ARG:
- return getattr(ns, key, default)
- else:
- return getattr(ns, key)
- except AttributeError as err:
- util.raise_(
- exc.InvalidRequestError(
- 'Entity namespace for "%s" has no property "%s"'
- % (entity, key)
- ),
- replace_context=err,
- )
|