123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167 |
- # ext/declarative/base.py
- # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- """Internal implementation for declarative."""
- from __future__ import absolute_import
- import collections
- import weakref
- from sqlalchemy.orm import attributes
- from sqlalchemy.orm import instrumentation
- from . import clsregistry
- from . import exc as orm_exc
- from . import mapper as mapperlib
- from .attributes import InstrumentedAttribute
- from .attributes import QueryableAttribute
- from .base import _is_mapped_class
- from .base import InspectionAttr
- from .descriptor_props import CompositeProperty
- from .descriptor_props import SynonymProperty
- from .interfaces import MapperProperty
- from .mapper import Mapper as mapper
- from .properties import ColumnProperty
- from .util import class_mapper
- from .. import event
- from .. import exc
- from .. import util
- from ..sql import expression
- from ..sql.schema import Column
- from ..sql.schema import Table
- from ..util import topological
- def _declared_mapping_info(cls):
- # deferred mapping
- if _DeferredMapperConfig.has_cls(cls):
- return _DeferredMapperConfig.config_for_cls(cls)
- # regular mapping
- elif _is_mapped_class(cls):
- return class_mapper(cls, configure=False)
- else:
- return None
- def _resolve_for_abstract_or_classical(cls):
- if cls is object:
- return None
- if cls.__dict__.get("__abstract__", False):
- for sup in cls.__bases__:
- sup = _resolve_for_abstract_or_classical(sup)
- if sup is not None:
- return sup
- else:
- return None
- else:
- clsmanager = _dive_for_cls_manager(cls)
- if clsmanager:
- return clsmanager.class_
- else:
- return cls
- def _get_immediate_cls_attr(cls, attrname, strict=False):
- """return an attribute of the class that is either present directly
- on the class, e.g. not on a superclass, or is from a superclass but
- this superclass is a non-mapped mixin, that is, not a descendant of
- the declarative base and is also not classically mapped.
- This is used to detect attributes that indicate something about
- a mapped class independently from any mapped classes that it may
- inherit from.
- """
- # the rules are different for this name than others,
- # make sure we've moved it out. transitional
- assert attrname != "__abstract__"
- if not issubclass(cls, object):
- return None
- if attrname in cls.__dict__:
- return getattr(cls, attrname)
- for base in cls.__mro__[1:]:
- _is_classicial_inherits = _dive_for_cls_manager(base)
- if attrname in base.__dict__ and (
- base is cls
- or (
- (base in cls.__bases__ if strict else True)
- and not _is_classicial_inherits
- )
- ):
- return getattr(base, attrname)
- else:
- return None
- def _dive_for_cls_manager(cls):
- # because the class manager registration is pluggable,
- # we need to do the search for every class in the hierarchy,
- # rather than just a simple "cls._sa_class_manager"
- # python 2 old style class
- if not hasattr(cls, "__mro__"):
- return None
- for base in cls.__mro__:
- manager = attributes.manager_of_class(base)
- if manager:
- return manager
- return None
- def _as_declarative(registry, cls, dict_):
- # declarative scans the class for attributes. no table or mapper
- # args passed separately.
- return _MapperConfig.setup_mapping(registry, cls, dict_, None, {})
- def _mapper(registry, cls, table, mapper_kw):
- _ImperativeMapperConfig(registry, cls, table, mapper_kw)
- return cls.__mapper__
- @util.preload_module("sqlalchemy.orm.decl_api")
- def _is_declarative_props(obj):
- declared_attr = util.preloaded.orm_decl_api.declared_attr
- return isinstance(obj, (declared_attr, util.classproperty))
- def _check_declared_props_nocascade(obj, name, cls):
- if _is_declarative_props(obj):
- if getattr(obj, "_cascading", False):
- util.warn(
- "@declared_attr.cascading is not supported on the %s "
- "attribute on class %s. This attribute invokes for "
- "subclasses in any case." % (name, cls)
- )
- return True
- else:
- return False
- class _MapperConfig(object):
- __slots__ = ("cls", "classname", "properties", "declared_attr_reg")
- @classmethod
- def setup_mapping(cls, registry, cls_, dict_, table, mapper_kw):
- manager = attributes.manager_of_class(cls)
- if manager and manager.class_ is cls_:
- raise exc.InvalidRequestError(
- "Class %r already has been " "instrumented declaratively" % cls
- )
- if cls_.__dict__.get("__abstract__", False):
- return
- defer_map = _get_immediate_cls_attr(
- cls_, "_sa_decl_prepare_nocascade", strict=True
- ) or hasattr(cls_, "_sa_decl_prepare")
- if defer_map:
- cfg_cls = _DeferredMapperConfig
- else:
- cfg_cls = _ClassScanMapperConfig
- return cfg_cls(registry, cls_, dict_, table, mapper_kw)
- def __init__(self, registry, cls_, mapper_kw):
- self.cls = util.assert_arg_type(cls_, type, "cls_")
- self.classname = cls_.__name__
- self.properties = util.OrderedDict()
- self.declared_attr_reg = {}
- if not mapper_kw.get("non_primary", False):
- instrumentation.register_class(
- self.cls,
- finalize=False,
- registry=registry,
- declarative_scan=self,
- init_method=registry.constructor,
- )
- else:
- manager = attributes.manager_of_class(self.cls)
- if not manager or not manager.is_mapped:
- raise exc.InvalidRequestError(
- "Class %s has no primary mapper configured. Configure "
- "a primary mapper first before setting up a non primary "
- "Mapper." % self.cls
- )
- def set_cls_attribute(self, attrname, value):
- manager = instrumentation.manager_of_class(self.cls)
- manager.install_member(attrname, value)
- return value
- def _early_mapping(self, mapper_kw):
- self.map(mapper_kw)
- class _ImperativeMapperConfig(_MapperConfig):
- __slots__ = ("dict_", "local_table", "inherits")
- def __init__(
- self,
- registry,
- cls_,
- table,
- mapper_kw,
- ):
- super(_ImperativeMapperConfig, self).__init__(
- registry, cls_, mapper_kw
- )
- self.dict_ = {}
- self.local_table = self.set_cls_attribute("__table__", table)
- with mapperlib._CONFIGURE_MUTEX:
- if not mapper_kw.get("non_primary", False):
- clsregistry.add_class(
- self.classname, self.cls, registry._class_registry
- )
- self._setup_inheritance(mapper_kw)
- self._early_mapping(mapper_kw)
- def map(self, mapper_kw=util.EMPTY_DICT):
- mapper_cls = mapper
- return self.set_cls_attribute(
- "__mapper__",
- mapper_cls(self.cls, self.local_table, **mapper_kw),
- )
- def _setup_inheritance(self, mapper_kw):
- cls = self.cls
- inherits = mapper_kw.get("inherits", None)
- if inherits is None:
- # since we search for classical mappings now, search for
- # multiple mapped bases as well and raise an error.
- inherits_search = []
- for c in cls.__bases__:
- c = _resolve_for_abstract_or_classical(c)
- if c is None:
- continue
- if _declared_mapping_info(
- c
- ) is not None and not _get_immediate_cls_attr(
- c, "_sa_decl_prepare_nocascade", strict=True
- ):
- inherits_search.append(c)
- if inherits_search:
- if len(inherits_search) > 1:
- raise exc.InvalidRequestError(
- "Class %s has multiple mapped bases: %r"
- % (cls, inherits_search)
- )
- inherits = inherits_search[0]
- elif isinstance(inherits, mapper):
- inherits = inherits.class_
- self.inherits = inherits
- class _ClassScanMapperConfig(_MapperConfig):
- __slots__ = (
- "dict_",
- "local_table",
- "persist_selectable",
- "declared_columns",
- "column_copies",
- "table_args",
- "tablename",
- "mapper_args",
- "mapper_args_fn",
- "inherits",
- )
- def __init__(
- self,
- registry,
- cls_,
- dict_,
- table,
- mapper_kw,
- ):
- super(_ClassScanMapperConfig, self).__init__(registry, cls_, mapper_kw)
- self.dict_ = dict(dict_) if dict_ else {}
- self.persist_selectable = None
- self.declared_columns = set()
- self.column_copies = {}
- self._setup_declared_events()
- self._scan_attributes()
- with mapperlib._CONFIGURE_MUTEX:
- clsregistry.add_class(
- self.classname, self.cls, registry._class_registry
- )
- self._extract_mappable_attributes()
- self._extract_declared_columns()
- self._setup_table(table)
- self._setup_inheritance(mapper_kw)
- self._early_mapping(mapper_kw)
- def _setup_declared_events(self):
- if _get_immediate_cls_attr(self.cls, "__declare_last__"):
- @event.listens_for(mapper, "after_configured")
- def after_configured():
- self.cls.__declare_last__()
- if _get_immediate_cls_attr(self.cls, "__declare_first__"):
- @event.listens_for(mapper, "before_configured")
- def before_configured():
- self.cls.__declare_first__()
- def _cls_attr_override_checker(self, cls):
- """Produce a function that checks if a class has overridden an
- attribute, taking SQLAlchemy-enabled dataclass fields into account.
- """
- sa_dataclass_metadata_key = _get_immediate_cls_attr(
- cls, "__sa_dataclass_metadata_key__", None
- )
- if sa_dataclass_metadata_key is None:
- def attribute_is_overridden(key, obj):
- return getattr(cls, key) is not obj
- else:
- all_datacls_fields = {
- f.name: f.metadata[sa_dataclass_metadata_key]
- for f in util.dataclass_fields(cls)
- if sa_dataclass_metadata_key in f.metadata
- }
- local_datacls_fields = {
- f.name: f.metadata[sa_dataclass_metadata_key]
- for f in util.local_dataclass_fields(cls)
- if sa_dataclass_metadata_key in f.metadata
- }
- absent = object()
- def attribute_is_overridden(key, obj):
- if _is_declarative_props(obj):
- obj = obj.fget
- # this function likely has some failure modes still if
- # someone is doing a deep mixing of the same attribute
- # name as plain Python attribute vs. dataclass field.
- ret = local_datacls_fields.get(key, absent)
- if _is_declarative_props(ret):
- ret = ret.fget
- if ret is obj:
- return False
- elif ret is not absent:
- return True
- all_field = all_datacls_fields.get(key, absent)
- ret = getattr(cls, key, obj)
- if ret is obj:
- return False
- # for dataclasses, this could be the
- # 'default' of the field. so filter more specifically
- # for an already-mapped InstrumentedAttribute
- if ret is not absent and isinstance(
- ret, InstrumentedAttribute
- ):
- return True
- if all_field is obj:
- return False
- elif all_field is not absent:
- return True
- # can't find another attribute
- return False
- return attribute_is_overridden
- def _cls_attr_resolver(self, cls):
- """produce a function to iterate the "attributes" of a class,
- adjusting for SQLAlchemy fields embedded in dataclass fields.
- """
- sa_dataclass_metadata_key = _get_immediate_cls_attr(
- cls, "__sa_dataclass_metadata_key__", None
- )
- if sa_dataclass_metadata_key is None:
- def local_attributes_for_class():
- for name, obj in vars(cls).items():
- yield name, obj, False
- else:
- field_names = set()
- def local_attributes_for_class():
- for field in util.local_dataclass_fields(cls):
- if sa_dataclass_metadata_key in field.metadata:
- field_names.add(field.name)
- yield field.name, _as_dc_declaredattr(
- field.metadata, sa_dataclass_metadata_key
- ), True
- for name, obj in vars(cls).items():
- if name not in field_names:
- yield name, obj, False
- return local_attributes_for_class
- def _scan_attributes(self):
- cls = self.cls
- dict_ = self.dict_
- column_copies = self.column_copies
- mapper_args_fn = None
- table_args = inherited_table_args = None
- tablename = None
- attribute_is_overridden = self._cls_attr_override_checker(self.cls)
- for base in cls.__mro__:
- class_mapped = (
- base is not cls
- and _declared_mapping_info(base) is not None
- and not _get_immediate_cls_attr(
- base, "_sa_decl_prepare_nocascade", strict=True
- )
- )
- local_attributes_for_class = self._cls_attr_resolver(base)
- if not class_mapped and base is not cls:
- self._produce_column_copies(
- local_attributes_for_class, attribute_is_overridden
- )
- for name, obj, is_dataclass in local_attributes_for_class():
- if name == "__mapper_args__":
- check_decl = _check_declared_props_nocascade(
- obj, name, cls
- )
- if not mapper_args_fn and (not class_mapped or check_decl):
- # don't even invoke __mapper_args__ until
- # after we've determined everything about the
- # mapped table.
- # make a copy of it so a class-level dictionary
- # is not overwritten when we update column-based
- # arguments.
- def mapper_args_fn():
- return dict(cls.__mapper_args__)
- elif name == "__tablename__":
- check_decl = _check_declared_props_nocascade(
- obj, name, cls
- )
- if not tablename and (not class_mapped or check_decl):
- tablename = cls.__tablename__
- elif name == "__table_args__":
- check_decl = _check_declared_props_nocascade(
- obj, name, cls
- )
- if not table_args and (not class_mapped or check_decl):
- table_args = cls.__table_args__
- if not isinstance(
- table_args, (tuple, dict, type(None))
- ):
- raise exc.ArgumentError(
- "__table_args__ value must be a tuple, "
- "dict, or None"
- )
- if base is not cls:
- inherited_table_args = True
- elif class_mapped:
- if _is_declarative_props(obj):
- util.warn(
- "Regular (i.e. not __special__) "
- "attribute '%s.%s' uses @declared_attr, "
- "but owning class %s is mapped - "
- "not applying to subclass %s."
- % (base.__name__, name, base, cls)
- )
- continue
- elif base is not cls:
- # we're a mixin, abstract base, or something that is
- # acting like that for now.
- if isinstance(obj, Column):
- # already copied columns to the mapped class.
- continue
- elif isinstance(obj, MapperProperty):
- raise exc.InvalidRequestError(
- "Mapper properties (i.e. deferred,"
- "column_property(), relationship(), etc.) must "
- "be declared as @declared_attr callables "
- "on declarative mixin classes. For dataclass "
- "field() objects, use a lambda:"
- )
- elif _is_declarative_props(obj):
- if obj._cascading:
- if name in dict_:
- # unfortunately, while we can use the user-
- # defined attribute here to allow a clean
- # override, if there's another
- # subclass below then it still tries to use
- # this. not sure if there is enough
- # information here to add this as a feature
- # later on.
- util.warn(
- "Attribute '%s' on class %s cannot be "
- "processed due to "
- "@declared_attr.cascading; "
- "skipping" % (name, cls)
- )
- dict_[name] = column_copies[
- obj
- ] = ret = obj.__get__(obj, cls)
- setattr(cls, name, ret)
- else:
- if is_dataclass:
- # access attribute using normal class access
- # first, to see if it's been mapped on a
- # superclass. note if the dataclasses.field()
- # has "default", this value can be anything.
- ret = getattr(cls, name, None)
- # so, if it's anything that's not ORM
- # mapped, assume we should invoke the
- # declared_attr
- if not isinstance(ret, InspectionAttr):
- ret = obj.fget()
- else:
- # access attribute using normal class access.
- # if the declared attr already took place
- # on a superclass that is mapped, then
- # this is no longer a declared_attr, it will
- # be the InstrumentedAttribute
- ret = getattr(cls, name)
- # correct for proxies created from hybrid_property
- # or similar. note there is no known case that
- # produces nested proxies, so we are only
- # looking one level deep right now.
- if (
- isinstance(ret, InspectionAttr)
- and ret._is_internal_proxy
- and not isinstance(
- ret.original_property, MapperProperty
- )
- ):
- ret = ret.descriptor
- dict_[name] = column_copies[obj] = ret
- if (
- isinstance(ret, (Column, MapperProperty))
- and ret.doc is None
- ):
- ret.doc = obj.__doc__
- # here, the attribute is some other kind of property that
- # we assume is not part of the declarative mapping.
- # however, check for some more common mistakes
- else:
- self._warn_for_decl_attributes(base, name, obj)
- elif is_dataclass and (
- name not in dict_ or dict_[name] is not obj
- ):
- # here, we are definitely looking at the target class
- # and not a superclass. this is currently a
- # dataclass-only path. if the name is only
- # a dataclass field and isn't in local cls.__dict__,
- # put the object there.
- # assert that the dataclass-enabled resolver agrees
- # with what we are seeing
- assert not attribute_is_overridden(name, obj)
- if _is_declarative_props(obj):
- obj = obj.fget()
- dict_[name] = obj
- if inherited_table_args and not tablename:
- table_args = None
- self.table_args = table_args
- self.tablename = tablename
- self.mapper_args_fn = mapper_args_fn
- def _warn_for_decl_attributes(self, cls, key, c):
- if isinstance(c, expression.ColumnClause):
- util.warn(
- "Attribute '%s' on class %s appears to be a non-schema "
- "'sqlalchemy.sql.column()' "
- "object; this won't be part of the declarative mapping"
- % (key, cls)
- )
- def _produce_column_copies(
- self, attributes_for_class, attribute_is_overridden
- ):
- cls = self.cls
- dict_ = self.dict_
- column_copies = self.column_copies
- # copy mixin columns to the mapped class
- for name, obj, is_dataclass in attributes_for_class():
- if isinstance(obj, Column):
- if attribute_is_overridden(name, obj):
- # if column has been overridden
- # (like by the InstrumentedAttribute of the
- # superclass), skip
- continue
- elif obj.foreign_keys:
- raise exc.InvalidRequestError(
- "Columns with foreign keys to other columns "
- "must be declared as @declared_attr callables "
- "on declarative mixin classes. For dataclass "
- "field() objects, use a lambda:."
- )
- elif name not in dict_ and not (
- "__table__" in dict_
- and (obj.name or name) in dict_["__table__"].c
- ):
- column_copies[obj] = copy_ = obj._copy()
- copy_._creation_order = obj._creation_order
- setattr(cls, name, copy_)
- dict_[name] = copy_
- def _extract_mappable_attributes(self):
- cls = self.cls
- dict_ = self.dict_
- our_stuff = self.properties
- late_mapped = _get_immediate_cls_attr(
- cls, "_sa_decl_prepare_nocascade", strict=True
- )
- for k in list(dict_):
- if k in ("__table__", "__tablename__", "__mapper_args__"):
- continue
- value = dict_[k]
- if _is_declarative_props(value):
- if value._cascading:
- util.warn(
- "Use of @declared_attr.cascading only applies to "
- "Declarative 'mixin' and 'abstract' classes. "
- "Currently, this flag is ignored on mapped class "
- "%s" % self.cls
- )
- value = getattr(cls, k)
- elif (
- isinstance(value, QueryableAttribute)
- and value.class_ is not cls
- and value.key != k
- ):
- # detect a QueryableAttribute that's already mapped being
- # assigned elsewhere in userland, turn into a synonym()
- value = SynonymProperty(value.key)
- setattr(cls, k, value)
- if (
- isinstance(value, tuple)
- and len(value) == 1
- and isinstance(value[0], (Column, MapperProperty))
- ):
- util.warn(
- "Ignoring declarative-like tuple value of attribute "
- "'%s': possibly a copy-and-paste error with a comma "
- "accidentally placed at the end of the line?" % k
- )
- continue
- elif not isinstance(value, (Column, MapperProperty)):
- # using @declared_attr for some object that
- # isn't Column/MapperProperty; remove from the dict_
- # and place the evaluated value onto the class.
- if not k.startswith("__"):
- dict_.pop(k)
- self._warn_for_decl_attributes(cls, k, value)
- if not late_mapped:
- setattr(cls, k, value)
- continue
- # we expect to see the name 'metadata' in some valid cases;
- # however at this point we see it's assigned to something trying
- # to be mapped, so raise for that.
- elif k == "metadata":
- raise exc.InvalidRequestError(
- "Attribute name 'metadata' is reserved "
- "for the MetaData instance when using a "
- "declarative base class."
- )
- our_stuff[k] = value
- def _extract_declared_columns(self):
- our_stuff = self.properties
- # set up attributes in the order they were created
- util.sort_dictionary(
- our_stuff, key=lambda key: our_stuff[key]._creation_order
- )
- # extract columns from the class dict
- declared_columns = self.declared_columns
- name_to_prop_key = collections.defaultdict(set)
- for key, c in list(our_stuff.items()):
- if isinstance(c, (ColumnProperty, CompositeProperty)):
- for col in c.columns:
- if isinstance(col, Column) and col.table is None:
- _undefer_column_name(key, col)
- if not isinstance(c, CompositeProperty):
- name_to_prop_key[col.name].add(key)
- declared_columns.add(col)
- elif isinstance(c, Column):
- _undefer_column_name(key, c)
- name_to_prop_key[c.name].add(key)
- declared_columns.add(c)
- # if the column is the same name as the key,
- # remove it from the explicit properties dict.
- # the normal rules for assigning column-based properties
- # will take over, including precedence of columns
- # in multi-column ColumnProperties.
- if key == c.key:
- del our_stuff[key]
- for name, keys in name_to_prop_key.items():
- if len(keys) > 1:
- util.warn(
- "On class %r, Column object %r named "
- "directly multiple times, "
- "only one will be used: %s. "
- "Consider using orm.synonym instead"
- % (self.classname, name, (", ".join(sorted(keys))))
- )
- def _setup_table(self, table=None):
- cls = self.cls
- tablename = self.tablename
- table_args = self.table_args
- dict_ = self.dict_
- declared_columns = self.declared_columns
- manager = attributes.manager_of_class(cls)
- declared_columns = self.declared_columns = sorted(
- declared_columns, key=lambda c: c._creation_order
- )
- if "__table__" not in dict_ and table is None:
- if hasattr(cls, "__table_cls__"):
- table_cls = util.unbound_method_to_callable(cls.__table_cls__)
- else:
- table_cls = Table
- if tablename is not None:
- args, table_kw = (), {}
- if table_args:
- if isinstance(table_args, dict):
- table_kw = table_args
- elif isinstance(table_args, tuple):
- if isinstance(table_args[-1], dict):
- args, table_kw = table_args[0:-1], table_args[-1]
- else:
- args = table_args
- autoload_with = dict_.get("__autoload_with__")
- if autoload_with:
- table_kw["autoload_with"] = autoload_with
- autoload = dict_.get("__autoload__")
- if autoload:
- table_kw["autoload"] = True
- table = self.set_cls_attribute(
- "__table__",
- table_cls(
- tablename,
- self._metadata_for_cls(manager),
- *(tuple(declared_columns) + tuple(args)),
- **table_kw
- ),
- )
- else:
- if table is None:
- table = cls.__table__
- if declared_columns:
- for c in declared_columns:
- if not table.c.contains_column(c):
- raise exc.ArgumentError(
- "Can't add additional column %r when "
- "specifying __table__" % c.key
- )
- self.local_table = table
- def _metadata_for_cls(self, manager):
- if hasattr(self.cls, "metadata"):
- return self.cls.metadata
- else:
- return manager.registry.metadata
- def _setup_inheritance(self, mapper_kw):
- table = self.local_table
- cls = self.cls
- table_args = self.table_args
- declared_columns = self.declared_columns
- inherits = mapper_kw.get("inherits", None)
- if inherits is None:
- # since we search for classical mappings now, search for
- # multiple mapped bases as well and raise an error.
- inherits_search = []
- for c in cls.__bases__:
- c = _resolve_for_abstract_or_classical(c)
- if c is None:
- continue
- if _declared_mapping_info(
- c
- ) is not None and not _get_immediate_cls_attr(
- c, "_sa_decl_prepare_nocascade", strict=True
- ):
- if c not in inherits_search:
- inherits_search.append(c)
- if inherits_search:
- if len(inherits_search) > 1:
- raise exc.InvalidRequestError(
- "Class %s has multiple mapped bases: %r"
- % (cls, inherits_search)
- )
- inherits = inherits_search[0]
- elif isinstance(inherits, mapper):
- inherits = inherits.class_
- self.inherits = inherits
- if (
- table is None
- and self.inherits is None
- and not _get_immediate_cls_attr(cls, "__no_table__")
- ):
- raise exc.InvalidRequestError(
- "Class %r does not have a __table__ or __tablename__ "
- "specified and does not inherit from an existing "
- "table-mapped class." % cls
- )
- elif self.inherits:
- inherited_mapper = _declared_mapping_info(self.inherits)
- inherited_table = inherited_mapper.local_table
- inherited_persist_selectable = inherited_mapper.persist_selectable
- if table is None:
- # single table inheritance.
- # ensure no table args
- if table_args:
- raise exc.ArgumentError(
- "Can't place __table_args__ on an inherited class "
- "with no table."
- )
- # add any columns declared here to the inherited table.
- for c in declared_columns:
- if c.name in inherited_table.c:
- if inherited_table.c[c.name] is c:
- continue
- raise exc.ArgumentError(
- "Column '%s' on class %s conflicts with "
- "existing column '%s'"
- % (c, cls, inherited_table.c[c.name])
- )
- if c.primary_key:
- raise exc.ArgumentError(
- "Can't place primary key columns on an inherited "
- "class with no table."
- )
- inherited_table.append_column(c)
- if (
- inherited_persist_selectable is not None
- and inherited_persist_selectable is not inherited_table
- ):
- inherited_persist_selectable._refresh_for_new_column(c)
- def _prepare_mapper_arguments(self, mapper_kw):
- properties = self.properties
- if self.mapper_args_fn:
- mapper_args = self.mapper_args_fn()
- else:
- mapper_args = {}
- if mapper_kw:
- mapper_args.update(mapper_kw)
- if "properties" in mapper_args:
- properties = dict(properties)
- properties.update(mapper_args["properties"])
- # make sure that column copies are used rather
- # than the original columns from any mixins
- for k in ("version_id_col", "polymorphic_on"):
- if k in mapper_args:
- v = mapper_args[k]
- mapper_args[k] = self.column_copies.get(v, v)
- if "inherits" in mapper_args:
- inherits_arg = mapper_args["inherits"]
- if isinstance(inherits_arg, mapper):
- inherits_arg = inherits_arg.class_
- if inherits_arg is not self.inherits:
- raise exc.InvalidRequestError(
- "mapper inherits argument given for non-inheriting "
- "class %s" % (mapper_args["inherits"])
- )
- if self.inherits:
- mapper_args["inherits"] = self.inherits
- if self.inherits and not mapper_args.get("concrete", False):
- # single or joined inheritance
- # exclude any cols on the inherited table which are
- # not mapped on the parent class, to avoid
- # mapping columns specific to sibling/nephew classes
- inherited_mapper = _declared_mapping_info(self.inherits)
- inherited_table = inherited_mapper.local_table
- if "exclude_properties" not in mapper_args:
- mapper_args["exclude_properties"] = exclude_properties = set(
- [
- c.key
- for c in inherited_table.c
- if c not in inherited_mapper._columntoproperty
- ]
- ).union(inherited_mapper.exclude_properties or ())
- exclude_properties.difference_update(
- [c.key for c in self.declared_columns]
- )
- # look through columns in the current mapper that
- # are keyed to a propname different than the colname
- # (if names were the same, we'd have popped it out above,
- # in which case the mapper makes this combination).
- # See if the superclass has a similar column property.
- # If so, join them together.
- for k, col in list(properties.items()):
- if not isinstance(col, expression.ColumnElement):
- continue
- if k in inherited_mapper._props:
- p = inherited_mapper._props[k]
- if isinstance(p, ColumnProperty):
- # note here we place the subclass column
- # first. See [ticket:1892] for background.
- properties[k] = [col] + p.columns
- result_mapper_args = mapper_args.copy()
- result_mapper_args["properties"] = properties
- self.mapper_args = result_mapper_args
- def map(self, mapper_kw=util.EMPTY_DICT):
- self._prepare_mapper_arguments(mapper_kw)
- if hasattr(self.cls, "__mapper_cls__"):
- mapper_cls = util.unbound_method_to_callable(
- self.cls.__mapper_cls__
- )
- else:
- mapper_cls = mapper
- return self.set_cls_attribute(
- "__mapper__",
- mapper_cls(self.cls, self.local_table, **self.mapper_args),
- )
- @util.preload_module("sqlalchemy.orm.decl_api")
- def _as_dc_declaredattr(field_metadata, sa_dataclass_metadata_key):
- # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr.
- # we can't write it because field.metadata is immutable :( so we have
- # to go through extra trouble to compare these
- decl_api = util.preloaded.orm_decl_api
- obj = field_metadata[sa_dataclass_metadata_key]
- if callable(obj) and not isinstance(obj, decl_api.declared_attr):
- return decl_api.declared_attr(obj)
- else:
- return obj
- class _DeferredMapperConfig(_ClassScanMapperConfig):
- _configs = util.OrderedDict()
- def _early_mapping(self, mapper_kw):
- pass
- @property
- def cls(self):
- return self._cls()
- @cls.setter
- def cls(self, class_):
- self._cls = weakref.ref(class_, self._remove_config_cls)
- self._configs[self._cls] = self
- @classmethod
- def _remove_config_cls(cls, ref):
- cls._configs.pop(ref, None)
- @classmethod
- def has_cls(cls, class_):
- # 2.6 fails on weakref if class_ is an old style class
- return isinstance(class_, type) and weakref.ref(class_) in cls._configs
- @classmethod
- def raise_unmapped_for_cls(cls, class_):
- if hasattr(class_, "_sa_raise_deferred_config"):
- class_._sa_raise_deferred_config()
- raise orm_exc.UnmappedClassError(
- class_,
- msg="Class %s has a deferred mapping on it. It is not yet "
- "usable as a mapped class." % orm_exc._safe_cls_name(class_),
- )
- @classmethod
- def config_for_cls(cls, class_):
- return cls._configs[weakref.ref(class_)]
- @classmethod
- def classes_for_base(cls, base_cls, sort=True):
- classes_for_base = [
- m
- for m, cls_ in [(m, m.cls) for m in cls._configs.values()]
- if cls_ is not None and issubclass(cls_, base_cls)
- ]
- if not sort:
- return classes_for_base
- all_m_by_cls = dict((m.cls, m) for m in classes_for_base)
- tuples = []
- for m_cls in all_m_by_cls:
- tuples.extend(
- (all_m_by_cls[base_cls], all_m_by_cls[m_cls])
- for base_cls in m_cls.__bases__
- if base_cls in all_m_by_cls
- )
- return list(topological.sort(tuples, classes_for_base))
- def map(self, mapper_kw=util.EMPTY_DICT):
- self._configs.pop(self._cls, None)
- return super(_DeferredMapperConfig, self).map(mapper_kw)
- def _add_attribute(cls, key, value):
- """add an attribute to an existing declarative class.
- This runs through the logic to determine MapperProperty,
- adds it to the Mapper, adds a column to the mapped Table, etc.
- """
- if "__mapper__" in cls.__dict__:
- if isinstance(value, Column):
- _undefer_column_name(key, value)
- cls.__table__.append_column(value, replace_existing=True)
- cls.__mapper__.add_property(key, value)
- elif isinstance(value, ColumnProperty):
- for col in value.columns:
- if isinstance(col, Column) and col.table is None:
- _undefer_column_name(key, col)
- cls.__table__.append_column(col, replace_existing=True)
- cls.__mapper__.add_property(key, value)
- elif isinstance(value, MapperProperty):
- cls.__mapper__.add_property(key, value)
- elif isinstance(value, QueryableAttribute) and value.key != key:
- # detect a QueryableAttribute that's already mapped being
- # assigned elsewhere in userland, turn into a synonym()
- value = SynonymProperty(value.key)
- cls.__mapper__.add_property(key, value)
- else:
- type.__setattr__(cls, key, value)
- cls.__mapper__._expire_memoizations()
- else:
- type.__setattr__(cls, key, value)
- def _del_attribute(cls, key):
- if (
- "__mapper__" in cls.__dict__
- and key in cls.__dict__
- and not cls.__mapper__._dispose_called
- ):
- value = cls.__dict__[key]
- if isinstance(
- value, (Column, ColumnProperty, MapperProperty, QueryableAttribute)
- ):
- raise NotImplementedError(
- "Can't un-map individual mapped attributes on a mapped class."
- )
- else:
- type.__delattr__(cls, key)
- cls.__mapper__._expire_memoizations()
- else:
- type.__delattr__(cls, key)
- def _declarative_constructor(self, **kwargs):
- """A simple constructor that allows initialization from kwargs.
- Sets attributes on the constructed instance using the names and
- values in ``kwargs``.
- Only keys that are present as
- attributes of the instance's class are allowed. These could be,
- for example, any mapped columns or relationships.
- """
- cls_ = type(self)
- for k in kwargs:
- if not hasattr(cls_, k):
- raise TypeError(
- "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
- )
- setattr(self, k, kwargs[k])
- _declarative_constructor.__name__ = "__init__"
- def _undefer_column_name(key, column):
- if column.key is None:
- column.key = key
- if column.name is None:
- column.name = key
|