12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905 |
- # engine/default.py
- # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- """Default implementations of per-dialect sqlalchemy.engine classes.
- These are semi-private implementation classes which are only of importance
- to database dialect authors; dialects will usually use the classes here
- as the base class for their own corresponding classes.
- """
- import codecs
- import functools
- import random
- import re
- import weakref
- from . import characteristics
- from . import cursor as _cursor
- from . import interfaces
- from .base import Connection
- from .. import event
- from .. import exc
- from .. import pool
- from .. import processors
- from .. import types as sqltypes
- from .. import util
- from ..sql import compiler
- from ..sql import expression
- from ..sql.elements import quoted_name
# Matches text that begins (after optional whitespace) with one of the
# listed data-modifying / DDL keywords, case-insensitively.
AUTOCOMMIT_REGEXP = re.compile(
    r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)", re.I | re.UNICODE
)

# When we're handed literal SQL, ensure it's a SELECT query
SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)

# Symbols re-exported as attributes of DefaultDialect and used as values for
# the ``cache_hit`` attribute / argument of DefaultExecutionContext.
CACHE_HIT = util.symbol("CACHE_HIT")
CACHE_MISS = util.symbol("CACHE_MISS")
CACHING_DISABLED = util.symbol("CACHING_DISABLED")
NO_CACHE_KEY = util.symbol("NO_CACHE_KEY")
NO_DIALECT_SUPPORT = util.symbol("NO_DIALECT_SUPPORT")
- class DefaultDialect(interfaces.Dialect):
- """Default implementation of Dialect"""
- statement_compiler = compiler.SQLCompiler
- ddl_compiler = compiler.DDLCompiler
- type_compiler = compiler.GenericTypeCompiler
- preparer = compiler.IdentifierPreparer
- supports_alter = True
- supports_comments = False
- inline_comments = False
- use_setinputsizes = False
- supports_statement_cache = True
- # the first value we'd get for an autoincrement
- # column.
- default_sequence_base = 1
- # most DBAPIs happy with this for execute().
- # not cx_oracle.
- execute_sequence_format = tuple
- supports_schemas = True
- supports_views = True
- supports_sequences = False
- sequences_optional = False
- preexecute_autoincrement_sequences = False
- supports_identity_columns = False
- postfetch_lastrowid = True
- implicit_returning = False
- full_returning = False
- insert_executemany_returning = False
- cte_follows_insert = False
- supports_native_enum = False
- supports_native_boolean = False
- non_native_boolean_check_constraint = True
- supports_simple_order_by_label = True
- tuple_in_values = False
- connection_characteristics = util.immutabledict(
- {"isolation_level": characteristics.IsolationLevelCharacteristic()}
- )
- engine_config_types = util.immutabledict(
- [
- ("convert_unicode", util.bool_or_str("force")),
- ("pool_timeout", util.asint),
- ("echo", util.bool_or_str("debug")),
- ("echo_pool", util.bool_or_str("debug")),
- ("pool_recycle", util.asint),
- ("pool_size", util.asint),
- ("max_overflow", util.asint),
- ("future", util.asbool),
- ]
- )
- # if the NUMERIC type
- # returns decimal.Decimal.
- # *not* the FLOAT type however.
- supports_native_decimal = False
- if util.py3k:
- supports_unicode_statements = True
- supports_unicode_binds = True
- returns_unicode_strings = sqltypes.String.RETURNS_UNICODE
- description_encoding = None
- else:
- supports_unicode_statements = False
- supports_unicode_binds = False
- returns_unicode_strings = sqltypes.String.RETURNS_UNKNOWN
- description_encoding = "use_encoding"
- name = "default"
- # length at which to truncate
- # any identifier.
- max_identifier_length = 9999
- _user_defined_max_identifier_length = None
- isolation_level = None
- # sub-categories of max_identifier_length.
- # currently these accommodate for MySQL which allows alias names
- # of 255 but DDL names only of 64.
- max_index_name_length = None
- max_constraint_name_length = None
- supports_sane_rowcount = True
- supports_sane_multi_rowcount = True
- colspecs = {}
- default_paramstyle = "named"
- supports_default_values = False
- """dialect supports INSERT... DEFAULT VALUES syntax"""
- supports_default_metavalue = False
- """dialect supports INSERT... VALUES (DEFAULT) syntax"""
- # not sure if this is a real thing but the compiler will deliver it
- # if this is the only flag enabled.
- supports_empty_insert = True
- """dialect supports INSERT () VALUES ()"""
- supports_multivalues_insert = False
- supports_is_distinct_from = True
- supports_server_side_cursors = False
- server_side_cursors = False
- # extra record-level locking features (#4860)
- supports_for_update_of = False
- server_version_info = None
- default_schema_name = None
- construct_arguments = None
- """Optional set of argument specifiers for various SQLAlchemy
- constructs, typically schema items.
- To implement, establish as a series of tuples, as in::
- construct_arguments = [
- (schema.Index, {
- "using": False,
- "where": None,
- "ops": None
- })
- ]
- If the above construct is established on the PostgreSQL dialect,
- the :class:`.Index` construct will now accept the keyword arguments
- ``postgresql_using``, ``postgresql_where``, and ``postgresql_ops``.
- Any other argument specified to the constructor of :class:`.Index`
- which is prefixed with ``postgresql_`` will raise :class:`.ArgumentError`.
- A dialect which does not include a ``construct_arguments`` member will
- not participate in the argument validation system. For such a dialect,
- any argument name is accepted by all participating constructs, within
- the namespace of arguments prefixed with that dialect name. The rationale
- here is so that third-party dialects that haven't yet implemented this
- feature continue to function in the old way.
- .. versionadded:: 0.9.2
- .. seealso::
- :class:`.DialectKWArgs` - implementing base class which consumes
- :attr:`.DefaultDialect.construct_arguments`
- """
- # indicates symbol names are
- # UPPERCASEd if they are case insensitive
- # within the database.
- # if this is True, the methods normalize_name()
- # and denormalize_name() must be provided.
- requires_name_normalize = False
- reflection_options = ()
- dbapi_exception_translation_map = util.immutabledict()
- """mapping used in the extremely unusual case that a DBAPI's
- published exceptions don't actually have the __name__ that they
- are linked towards.
- .. versionadded:: 1.0.5
- """
- is_async = False
- CACHE_HIT = CACHE_HIT
- CACHE_MISS = CACHE_MISS
- CACHING_DISABLED = CACHING_DISABLED
- NO_CACHE_KEY = NO_CACHE_KEY
- NO_DIALECT_SUPPORT = NO_DIALECT_SUPPORT
@util.deprecated_params(
    convert_unicode=(
        "1.3",
        "The :paramref:`_sa.create_engine.convert_unicode` parameter "
        "and corresponding dialect-level parameters are deprecated, "
        "and will be removed in a future release. Modern DBAPIs support "
        "Python Unicode natively and this parameter is unnecessary.",
    ),
    empty_in_strategy=(
        "1.4",
        "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
        "deprecated, and no longer has any effect. All IN expressions "
        "are now rendered using "
        # the trailing spaces on the next two fragments keep adjacent
        # words from running together in the rendered message
        'the "expanding parameter" strategy which renders a set of bound '
        'expressions, or an "empty set" SELECT, at statement execution '
        "time.",
    ),
    case_sensitive=(
        "1.4",
        "The :paramref:`_sa.create_engine.case_sensitive` parameter "
        "is deprecated and will be removed in a future release. "
        "Applications should work with result column names in a case "
        "sensitive fashion.",
    ),
    server_side_cursors=(
        "1.4",
        "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
        "is deprecated and will be removed in a future release. Please "
        "use the "
        ":paramref:`_engine.Connection.execution_options.stream_results` "
        "parameter.",
    ),
)
def __init__(
    self,
    convert_unicode=False,
    encoding="utf-8",
    paramstyle=None,
    dbapi=None,
    implicit_returning=None,
    case_sensitive=True,
    supports_native_boolean=None,
    max_identifier_length=None,
    label_length=None,
    # int() is because the @deprecated_params decorator cannot accommodate
    # the direct reference to the "NO_LINTING" object
    compiler_linting=int(compiler.NO_LINTING),
    server_side_cursors=False,
    **kwargs
):
    """Configure the dialect instance.

    :param convert_unicode: stored as ``self.convert_unicode``.
    :param encoding: stored as ``self.encoding``; also used to build the
     encoder / decoder helpers.
    :param paramstyle: explicit paramstyle; when ``None`` the DBAPI
     module's ``paramstyle`` is used, falling back to
     ``default_paramstyle`` when no DBAPI is given.
    :param dbapi: DBAPI module, stored as ``self.dbapi``.
    :param implicit_returning: overrides the class-level flag when not
     ``None``.
    :param case_sensitive: stored as ``self.case_sensitive``.
    :param supports_native_boolean: overrides the class-level flag when
     not ``None``.
    :param max_identifier_length: when truthy, overrides
     ``self.max_identifier_length``.
    :param label_length: stored as ``self.label_length``.
    :param compiler_linting: stored as ``self.compiler_linting``.
    :param server_side_cursors: when true, enables
     ``self.server_side_cursors``.
    :raises ArgumentError: if ``server_side_cursors`` is requested but
     ``supports_server_side_cursors`` is false.
    """
    if not getattr(self, "ported_sqla_06", True):
        util.warn(
            "The %s dialect is not yet ported to the 0.6 format"
            % self.name
        )

    if server_side_cursors:
        if not self.supports_server_side_cursors:
            raise exc.ArgumentError(
                "Dialect %s does not support server side cursors" % self
            )
        else:
            self.server_side_cursors = True

    self.convert_unicode = convert_unicode
    self.encoding = encoding
    self.positional = False
    self._ischema = None
    self.dbapi = dbapi
    # paramstyle precedence: explicit argument, DBAPI module, class default
    if paramstyle is not None:
        self.paramstyle = paramstyle
    elif self.dbapi is not None:
        self.paramstyle = self.dbapi.paramstyle
    else:
        self.paramstyle = self.default_paramstyle
    if implicit_returning is not None:
        self.implicit_returning = implicit_returning
    self.positional = self.paramstyle in ("qmark", "format", "numeric")
    self.identifier_preparer = self.preparer(self)
    # replaces the class-level compiler class with an instance bound to
    # this dialect
    self.type_compiler = self.type_compiler(self)
    if supports_native_boolean is not None:
        self.supports_native_boolean = supports_native_boolean
    self.case_sensitive = case_sensitive

    self._user_defined_max_identifier_length = max_identifier_length
    if self._user_defined_max_identifier_length:
        self.max_identifier_length = (
            self._user_defined_max_identifier_length
        )
    self.label_length = label_length
    self.compiler_linting = compiler_linting
    if self.description_encoding == "use_encoding":
        self._description_decoder = (
            processors.to_unicode_processor_factory
        )(encoding)
    elif self.description_encoding is not None:
        self._description_decoder = (
            processors.to_unicode_processor_factory
        )(self.description_encoding)
    self._encoder = codecs.getencoder(self.encoding)
    self._decoder = processors.to_unicode_processor_factory(self.encoding)
def _ensure_has_table_connection(self, arg):
    """Validate that ``arg`` is a :class:`.Connection`.

    :param arg: the object passed as the connection argument to
     ``Dialect.has_table()``.
    :raises ArgumentError: if ``arg`` is not a ``Connection`` instance.
    """
    if not isinstance(arg, Connection):
        raise exc.ArgumentError(
            "The argument passed to Dialect.has_table() should be a "
            "%s, got %s. "
            "Additionally, the Dialect.has_table() method is for "
            "internal dialect "
            "use only; please use "
            # stray second ">" removed from the usage example
            "``inspect(some_engine).has_table(<tablename>)`` "
            "for public API use." % (Connection, type(arg))
        )
@util.memoized_property
def _supports_statement_cache(self):
    """Return whether this dialect class opts in to statement caching.

    Only the ``supports_statement_cache`` attribute set directly on the
    concrete class (its own ``__dict__``) is consulted; when it is absent
    a warning is emitted and ``False`` is returned.
    """
    declared = self.__class__.__dict__.get("supports_statement_cache")
    if declared is not None:
        return bool(declared)

    util.warn(
        "Dialect %s:%s will not make use of SQL compilation caching "
        "as it does not set the 'supports_statement_cache' attribute "
        "to ``True``. This can have "
        "significant performance implications including some "
        "performance degradations in comparison to prior SQLAlchemy "
        "versions. Dialect maintainers should seek to set this "
        "attribute to True after appropriate development and testing "
        "for SQLAlchemy 1.4 caching support. Alternatively, this "
        "attribute may be set to False which will disable this "
        "warning." % (self.name, self.driver),
        code="cprf",
    )
    return False
@util.memoized_property
def _type_memos(self):
    # Weak-keyed mapping: entries do not keep their key objects alive.
    return weakref.WeakKeyDictionary()
@property
def dialect_description(self):
    # The dialect name and driver name joined by "+".
    return self.name + "+" + self.driver
@property
def supports_sane_rowcount_returning(self):
    """True if this dialect supports sane rowcount even if RETURNING is
    in use.

    For dialects that don't support RETURNING, this is synonymous with
    ``supports_sane_rowcount``.

    This default implementation returns ``supports_sane_rowcount``
    unchanged.

    """
    return self.supports_sane_rowcount
@classmethod
def get_pool_class(cls, url):
    # Use a class-level ``poolclass`` attribute when one is defined,
    # otherwise pool.QueuePool.  ``url`` is not consulted here.
    return getattr(cls, "poolclass", pool.QueuePool)
def get_dialect_pool_class(self, url):
    # Instance-level entry point; delegates to get_pool_class().
    return self.get_pool_class(url)
@classmethod
def load_provisioning(cls):
    """Import the ``provision`` module that sits alongside this dialect.

    The package is taken from the class's ``__module__`` with its last
    dotted component removed.  A missing module is ignored.
    """
    parent_package = cls.__module__.rpartition(".")[0]
    provision_module = parent_package + ".provision"
    try:
        __import__(provision_module)
    except ImportError:
        # best effort: not every dialect package ships a provision module
        pass
def initialize(self, connection):
    # Probe the given connection once to fill in per-database settings on
    # this dialect: server version, default schema, default isolation
    # level, unicode handling and the maximum identifier length.

    # Each probe below is optional: a NotImplementedError from the hook
    # leaves the corresponding attribute as None.
    try:
        self.server_version_info = self._get_server_version_info(
            connection
        )
    except NotImplementedError:
        self.server_version_info = None
    try:
        self.default_schema_name = self._get_default_schema_name(
            connection
        )
    except NotImplementedError:
        self.default_schema_name = None

    try:
        self.default_isolation_level = self.get_default_isolation_level(
            connection.connection
        )
    except NotImplementedError:
        self.default_isolation_level = None

    if self.returns_unicode_strings is sqltypes.String.RETURNS_UNKNOWN:
        if util.py3k:
            raise exc.InvalidRequestError(
                "RETURNS_UNKNOWN is unsupported in Python 3"
            )
        self.returns_unicode_strings = self._check_unicode_returns(
            connection
        )

    # when cursor.description already yields text, drop both the decoder
    # and the configured description encoding
    if (
        self.description_encoding is not None
        and self._check_unicode_description(connection)
    ):
        self._description_decoder = self.description_encoding = None

    # a user-supplied max_identifier_length takes precedence over the
    # connection-specific check
    if not self._user_defined_max_identifier_length:
        max_ident_length = self._check_max_identifier_length(connection)
        if max_ident_length:
            self.max_identifier_length = max_ident_length

    if (
        self.label_length
        and self.label_length > self.max_identifier_length
    ):
        raise exc.ArgumentError(
            "Label length of %d is greater than this dialect's"
            " maximum identifier length of %d"
            % (self.label_length, self.max_identifier_length)
        )
def on_connect(self):
    # inherits the docstring from interfaces.Dialect.on_connect
    # The default implementation returns None.
    return None
def _check_max_identifier_length(self, connection):
    """Perform a connection / server version specific check to determine
    the max_identifier_length.

    If the dialect's class level max_identifier_length should be used,
    can return None.

    The base implementation always returns None.

    .. versionadded:: 1.3.9

    """
    return None
def get_default_isolation_level(self, dbapi_conn):
    """Given a DBAPI connection, return its isolation level, or
    a default isolation level if one cannot be retrieved.

    May be overridden by subclasses in order to provide a
    "fallback" isolation level for databases that cannot reliably
    retrieve the actual isolation level.

    By default, calls this dialect's ``get_isolation_level()`` method
    with the same DBAPI connection, propagating any exceptions raised.

    .. versionadded:: 1.3.22

    """
    return self.get_isolation_level(dbapi_conn)
def _check_unicode_returns(self, connection, additional_tests=None):
    """Probe whether the DBAPI returns text for string columns.

    Runs a SELECT of a casted literal for each test expression and
    records whether the first column of the fetched row is a text object.

    :param connection: connection used to obtain a cursor and execute
     the probes.
    :param additional_tests: optional extra expressions to probe.
    :return: ``String.RETURNS_CONDITIONAL`` when results are mixed,
     ``String.RETURNS_UNICODE`` when every probe returned text, otherwise
     ``String.RETURNS_BYTES``.
    """
    # this now runs in py2k only and will be removed in 2.0; disabled for
    # Python 3 in all cases under #5315
    if util.py2k and not self.supports_unicode_statements:
        cast_to = util.binary_type
    else:
        cast_to = util.text_type

    parameters = self.execute_sequence_format() if self.positional else {}

    def check_unicode(test):
        statement = cast_to(expression.select(test).compile(dialect=self))
        try:
            cursor = connection.connection.cursor()
            connection._cursor_execute(cursor, statement, parameters)
            row = cursor.fetchone()
            cursor.close()
        except exc.DBAPIError as de:
            # note that _cursor_execute() will have closed the cursor
            # if an exception is thrown.
            util.warn(
                "Exception attempting to "
                "detect unicode returns: %r" % de
            )
            return False
        return isinstance(row[0], util.text_type)

    # detect plain VARCHAR
    plain_probe = expression.cast(
        expression.literal_column("'test plain returns'"),
        sqltypes.VARCHAR(60),
    )
    # detect if there's an NVARCHAR type with different behavior
    # available
    unicode_probe = expression.cast(
        expression.literal_column("'test unicode returns'"),
        sqltypes.Unicode(60),
    )
    tests = [plain_probe, unicode_probe]
    if additional_tests:
        tests += additional_tests

    outcomes = {check_unicode(test) for test in tests}

    if outcomes >= {True, False}:
        return sqltypes.String.RETURNS_CONDITIONAL
    if outcomes == {True}:
        return sqltypes.String.RETURNS_UNICODE
    return sqltypes.String.RETURNS_BYTES
def _check_unicode_description(self, connection):
    """Return True if ``cursor.description`` column names are text.

    Executes a one-column labeled SELECT on a fresh cursor and inspects
    the type of the first column name; the cursor is always closed.
    """
    # all DBAPIs on Py2K return cursor.description as encoded
    cast_to = (
        util.binary_type
        if util.py2k and not self.supports_unicode_statements
        else util.text_type
    )

    cursor = connection.connection.cursor()
    try:
        labeled_select = expression.select(
            expression.literal_column("'x'").label("some_label")
        )
        cursor.execute(cast_to(labeled_select.compile(dialect=self)))
        first_column_name = cursor.description[0][0]
        return isinstance(first_column_name, util.text_type)
    finally:
        cursor.close()
def type_descriptor(self, typeobj):
    """Provide a database-specific :class:`.TypeEngine` object, given
    the generic object which comes from the types module.

    This method looks for a dictionary called
    ``colspecs`` as a class or instance-level variable,
    and passes on to :func:`_types.adapt_type`.

    :param typeobj: the generic type object to adapt.
    :return: the result of ``sqltypes.adapt_type(typeobj, self.colspecs)``.

    """
    return sqltypes.adapt_type(typeobj, self.colspecs)
def has_index(self, connection, table_name, index_name, schema=None):
    """Return True if ``table_name`` exists and has an index named
    ``index_name``.

    The table is checked with ``has_table()`` first; index names come
    from the ``"name"`` key of each entry returned by ``get_indexes()``.
    """
    table_exists = self.has_table(connection, table_name, schema=schema)
    if not table_exists:
        return False

    return any(
        index_info["name"] == index_name
        for index_info in self.get_indexes(
            connection, table_name, schema=schema
        )
    )
def validate_identifier(self, ident):
    """Raise :class:`.IdentifierError` if ``ident`` is longer than
    ``self.max_identifier_length``; otherwise return None.
    """
    limit = self.max_identifier_length
    if len(ident) <= limit:
        return

    raise exc.IdentifierError(
        "Identifier '%s' exceeds maximum length of %d characters"
        % (ident, self.max_identifier_length)
    )
def connect(self, *cargs, **cparams):
    # inherits the docstring from interfaces.Dialect.connect
    # Passes all arguments straight through to the DBAPI module's connect().
    return self.dbapi.connect(*cargs, **cparams)
def create_connect_args(self, url):
    # inherits the docstring from interfaces.Dialect.create_connect_args
    # Keyword arguments are the URL's translated connect args, overlaid
    # with its query-string entries; no positional arguments are produced.
    connect_kwargs = url.translate_connect_args()
    connect_kwargs.update(url.query)
    positional_args = []
    return [positional_args, connect_kwargs]
def set_engine_execution_options(self, engine, opts):
    """Apply engine-level execution options that name connection
    characteristics.

    Options whose names appear in ``self.connection_characteristics`` are
    collected, and an ``engine_connect`` listener is registered on
    ``engine`` that applies them to each non-branch connection.  Nothing
    is registered when no option name matches.
    """
    matching_names = set(self.connection_characteristics).intersection(opts)
    if not matching_names:
        return

    characteristics = util.immutabledict(
        (name, opts[name]) for name in matching_names
    )

    @event.listens_for(engine, "engine_connect")
    def set_connection_characteristics(connection, branch):
        if branch:
            return
        self._set_connection_characteristics(connection, characteristics)
def set_connection_execution_options(self, connection, opts):
    """Apply connection-level execution options that name connection
    characteristics.

    Options whose names appear in ``self.connection_characteristics`` are
    applied to ``connection`` immediately; other options are ignored.
    """
    matching_names = set(self.connection_characteristics).intersection(opts)
    if not matching_names:
        return

    characteristics = util.immutabledict(
        (name, opts[name]) for name in matching_names
    )
    self._set_connection_characteristics(connection, characteristics)
- def _set_connection_characteristics(self, connection, characteristics):
- characteristic_values = [
- (name, self.connection_characteristics[name], value)
- for name, value in characteristics.items()
- ]
- if connection.in_transaction():
- trans_objs = [
- (name, obj)
- for name, obj, value in characteristic_values
- if obj.transactional
- ]
- if trans_objs:
- if connection._is_future:
- raise exc.InvalidRequestError(
- "This connection has already initialized a SQLAlchemy "
- "Transaction() object via begin() or autobegin; "
- "%s may not be altered unless rollback() or commit() "
- "is called first."
- % (", ".join(name for name, obj in trans_objs))
- )
- else:
- util.warn(
- "Connection is already established with a "
- "Transaction; "
- "setting %s may implicitly rollback or "
- "commit "
- "the existing transaction, or have no effect until "
- "next transaction"
- % (", ".join(name for name, obj in trans_objs))
- )
- dbapi_connection = connection.connection.dbapi_connection
- for name, characteristic, value in characteristic_values:
- characteristic.set_characteristic(self, dbapi_connection, value)
- connection.connection._connection_record.finalize_callback.append(
- functools.partial(self._reset_characteristics, characteristics)
- )
- def _reset_characteristics(self, characteristics, dbapi_connection):
- for characteristic_name in characteristics:
- characteristic = self.connection_characteristics[
- characteristic_name
- ]
- characteristic.reset_characteristic(self, dbapi_connection)
def do_begin(self, dbapi_connection):
    # No-op in the default implementation.
    pass
def do_rollback(self, dbapi_connection):
    # Delegates to the DBAPI connection's rollback().
    dbapi_connection.rollback()
def do_commit(self, dbapi_connection):
    # Delegates to the DBAPI connection's commit().
    dbapi_connection.commit()
def do_close(self, dbapi_connection):
    # Delegates to the DBAPI connection's close().
    dbapi_connection.close()
@util.memoized_property
def _dialect_specific_select_one(self):
    # String form of ``SELECT 1`` compiled against this dialect; this is
    # the statement do_ping() executes.
    return str(expression.select(1).compile(dialect=self))
def do_ping(self, dbapi_connection):
    """Check that ``dbapi_connection`` is usable by running a trivial
    SELECT on a new cursor.

    :return: True when the statement executes; False when the DBAPI
     raises an error that ``is_disconnect()`` identifies as a disconnect.
    :raises: any other DBAPI error is re-raised.
    """
    # stays None if cursor creation itself fails, so that is_disconnect()
    # still receives a defined value
    cursor = None
    try:
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(self._dialect_specific_select_one)
        finally:
            cursor.close()
    except self.dbapi.Error as err:
        if not self.is_disconnect(err, dbapi_connection, cursor):
            raise
        return False
    return True
def create_xid(self):
    """Create a random two-phase transaction ID.

    This id will be passed to do_begin_twophase(), do_rollback_twophase(),
    do_commit_twophase(). Its format is unspecified.

    :return: a string of the form ``_sa_`` followed by 32 hexadecimal
     digits.
    """
    # getrandbits(128) is always below 2 ** 128, so the value fits in the
    # 32 zero-padded hex digits; randint(0, 2 ** 128) includes the upper
    # bound and could produce a 33-digit value.
    return "_sa_%032x" % random.getrandbits(128)
def do_savepoint(self, connection, name):
    # Executes a SavepointClause for ``name`` on the given connection.
    connection.execute(expression.SavepointClause(name))
def do_rollback_to_savepoint(self, connection, name):
    # Executes a RollbackToSavepointClause for ``name`` on the connection.
    connection.execute(expression.RollbackToSavepointClause(name))
def do_release_savepoint(self, connection, name):
    # Executes a ReleaseSavepointClause for ``name`` on the connection.
    connection.execute(expression.ReleaseSavepointClause(name))
def do_executemany(self, cursor, statement, parameters, context=None):
    # Delegates to cursor.executemany(); ``context`` is unused here.
    cursor.executemany(statement, parameters)
def do_execute(self, cursor, statement, parameters, context=None):
    # Delegates to cursor.execute(); ``context`` is unused here.
    cursor.execute(statement, parameters)
def do_execute_no_params(self, cursor, statement, context=None):
    # Calls cursor.execute() with the statement only, passing no
    # parameters argument; ``context`` is unused here.
    cursor.execute(statement)
def is_disconnect(self, e, connection, cursor):
    # The default implementation never classifies an error as a
    # disconnect.
    return False
def reset_isolation_level(self, dbapi_conn):
    # Restore the DBAPI connection to this dialect's recorded
    # ``default_isolation_level`` via set_isolation_level().
    #
    # default_isolation_level is read from the first connection
    # after the initial set of 'isolation_level', if any, so is
    # the configured default of this dialect.
    self.set_isolation_level(dbapi_conn, self.default_isolation_level)
def normalize_name(self, name):
    """Convert a name as reported by the database into the
    case-normalized form used on the Python side.

    ``None`` passes through.  All-uppercase names that need no quoting
    become lowercase; all-lowercase names are returned as a
    ``quoted_name`` with ``quote=True``; anything else is returned
    unchanged.
    """
    if name is None:
        return None

    if util.py2k and isinstance(name, str):
        name = name.decode(self.encoding)

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # name has no upper/lower conversion, e.g. non-european characters.
        # return unchanged
        return name

    requires_quotes = self.identifier_preparer._requires_quotes
    if uppered == name and not requires_quotes(lowered):
        # name is all uppercase and doesn't require quoting; normalize
        # to all lower case
        return lowered

    if lowered == name:
        # name is all lower case, which if denormalized means we need to
        # force quoting on it
        return quoted_name(name, quote=True)

    # name is mixed case, which means it will be quoted in SQL when used
    # later; no normalization is applied
    return name
def denormalize_name(self, name):
    """Convert a Python-side name into the form sent to the database.

    ``None`` and names with no upper/lower distinction pass through.
    All-lowercase names that need no quoting are uppercased.  Under
    Python 2 the result is additionally encoded or coerced to unicode
    depending on ``supports_unicode_binds``.
    """
    if name is None:
        return None

    lowered = name.lower()
    uppered = name.upper()

    if uppered == lowered:
        # name has no upper/lower conversion, e.g. non-european characters.
        # return unchanged
        return name

    requires_quotes = self.identifier_preparer._requires_quotes
    if lowered == name and not requires_quotes(lowered):
        name = uppered

    if util.py2k:
        if self.supports_unicode_binds:
            name = unicode(name)  # noqa
        else:
            name = name.encode(self.encoding)
    return name
def get_driver_connection(self, connection):
    # The default implementation returns the given connection unchanged.
    return connection
class _RendersLiteral(object):
    """Mixin providing a literal processor that wraps a value in single
    quotes."""

    def literal_processor(self, dialect):
        # ``dialect`` is not used; the returned callable formats the value
        # with "'%s'" and performs no escaping.
        def process(value):
            return "'%s'" % value

        return process
# DateTime type using the _RendersLiteral literal processor; mapped from
# sqltypes.DateTime in StrCompileDialect.colspecs.
class _StrDateTime(_RendersLiteral, sqltypes.DateTime):
    pass
# Date type using the _RendersLiteral literal processor; mapped from
# sqltypes.Date in StrCompileDialect.colspecs.
class _StrDate(_RendersLiteral, sqltypes.Date):
    pass
# Time type using the _RendersLiteral literal processor; mapped from
# sqltypes.Time in StrCompileDialect.colspecs.
class _StrTime(_RendersLiteral, sqltypes.Time):
    pass
class StrCompileDialect(DefaultDialect):
    # DefaultDialect variant that swaps in the "Str" statement and type
    # compilers and enables several feature flags.
    # NOTE(review): presumably the dialect used when stringifying a
    # statement without a bound engine -- confirm against callers.

    statement_compiler = compiler.StrSQLCompiler
    ddl_compiler = compiler.DDLCompiler
    type_compiler = compiler.StrSQLTypeCompiler
    preparer = compiler.IdentifierPreparer

    supports_statement_cache = True

    supports_identity_columns = True

    supports_sequences = True
    sequences_optional = True
    preexecute_autoincrement_sequences = False
    implicit_returning = False

    supports_native_boolean = True

    supports_multivalues_insert = True
    supports_simple_order_by_label = True

    # date/time types are replaced with variants whose literal processor
    # renders the value inside single quotes
    colspecs = {
        sqltypes.DateTime: _StrDateTime,
        sqltypes.Date: _StrDate,
        sqltypes.Time: _StrTime,
    }
- class DefaultExecutionContext(interfaces.ExecutionContext):
- isinsert = False
- isupdate = False
- isdelete = False
- is_crud = False
- is_text = False
- isddl = False
- executemany = False
- compiled = None
- statement = None
- result_column_struct = None
- returned_default_rows = None
- execution_options = util.immutabledict()
- include_set_input_sizes = None
- exclude_set_input_sizes = None
- cursor_fetch_strategy = _cursor._DEFAULT_FETCH
- cache_stats = None
- invoked_statement = None
- _is_implicit_returning = False
- _is_explicit_returning = False
- _is_future_result = False
- _is_server_side = False
- _soft_closed = False
- # a hook for SQLite's translation of
- # result column names
- # NOTE: pyhive is using this hook, can't remove it :(
- _translate_colname = None
- _expanded_parameters = util.immutabledict()
- cache_hit = NO_CACHE_KEY
@classmethod
def _init_ddl(
    cls,
    dialect,
    connection,
    dbapi_connection,
    execution_options,
    compiled_ddl,
):
    """Initialize execution context for a DDLElement construct."""

    # the instance is built with __new__, so __init__ is not invoked
    self = cls.__new__(cls)
    self.root_connection = connection
    self._dbapi_connection = dbapi_connection
    # note: taken from the connection, not from the ``dialect`` argument
    self.dialect = connection.dialect

    self.compiled = compiled = compiled_ddl
    self.isddl = True

    self.execution_options = execution_options

    self._is_future_result = (
        connection._is_future
        or self.execution_options.get("future_result", False)
    )

    self.unicode_statement = util.text_type(compiled)
    # apply the schema translate map from the execution options when the
    # compiled form was rendered with schema translation
    if compiled.schema_translate_map:
        schema_translate_map = self.execution_options.get(
            "schema_translate_map", {}
        )

        rst = compiled.preparer._render_schema_translates
        self.unicode_statement = rst(
            self.unicode_statement, schema_translate_map
        )

    # encode the statement for dialects that don't accept unicode
    # statements
    if not dialect.supports_unicode_statements:
        self.statement = dialect._encoder(self.unicode_statement)[0]
    else:
        self.statement = self.unicode_statement

    self.cursor = self.create_cursor()
    self.compiled_parameters = []

    # a single empty parameter set, in the format the paramstyle expects
    if dialect.positional:
        self.parameters = [dialect.execute_sequence_format()]
    else:
        self.parameters = [{}]

    return self
@classmethod
def _init_compiled(
    cls,
    dialect,
    connection,
    dbapi_connection,
    execution_options,
    compiled,
    parameters,
    invoked_statement,
    extracted_parameters,
    cache_hit=CACHING_DISABLED,
):
    """Initialize execution context for a Compiled construct.

    Creates the context without calling ``__init__``, copies the
    statement-kind flags from ``compiled``, builds one compiled parameter
    set per entry in ``parameters`` (or a single one when ``parameters``
    is empty), creates the cursor, runs Python-side prefetch defaults,
    applies post-compile parameter expansion and schema translation to
    the statement text, and finally converts the bind values into the
    positional or named structures stored in ``self.parameters``.

    :raises exc.InvalidRequestError: when the compiled statement has
     ``literal_execute`` / post-compile parameters and more than one
     parameter set was given.
    """
    self = cls.__new__(cls)
    self.root_connection = connection
    self._dbapi_connection = dbapi_connection
    self.dialect = connection.dialect
    self.extracted_parameters = extracted_parameters
    self.invoked_statement = invoked_statement
    self.compiled = compiled
    self.cache_hit = cache_hit

    self.execution_options = execution_options

    self._is_future_result = (
        connection._is_future
        or self.execution_options.get("future_result", False)
    )

    self.result_column_struct = (
        compiled._result_columns,
        compiled._ordered_columns,
        compiled._textual_ordered_columns,
        compiled._loose_column_name_matching,
    )
    self.isinsert = compiled.isinsert
    self.isupdate = compiled.isupdate
    self.isdelete = compiled.isdelete
    self.is_text = compiled.isplaintext

    if self.isinsert or self.isupdate or self.isdelete:
        self.is_crud = True
        # explicit RETURNING comes from the statement itself; implicit
        # RETURNING is when the compiler has returning columns but the
        # statement did not ask for any
        self._is_explicit_returning = bool(compiled.statement._returning)
        self._is_implicit_returning = bool(
            compiled.returning and not compiled.statement._returning
        )

    if not parameters:
        self.compiled_parameters = [
            compiled.construct_params(
                extracted_parameters=extracted_parameters
            )
        ]
    else:
        self.compiled_parameters = [
            compiled.construct_params(
                m,
                _group_number=grp,
                extracted_parameters=extracted_parameters,
            )
            for grp, m in enumerate(parameters)
        ]

        self.executemany = len(parameters) > 1

    # this must occur before create_cursor() since the statement
    # has to be regexed in some cases for server side cursor
    if util.py2k:
        self.unicode_statement = util.text_type(compiled.string)
    else:
        self.unicode_statement = compiled.string

    self.cursor = self.create_cursor()

    # Python-side defaults are written into compiled_parameters in place
    if self.compiled.insert_prefetch or self.compiled.update_prefetch:
        if self.executemany:
            self._process_executemany_defaults()
        else:
            self._process_executesingle_defaults()

    processors = compiled._bind_processors

    if compiled.literal_execute_params or compiled.post_compile_params:
        if self.executemany:
            raise exc.InvalidRequestError(
                "'literal_execute' or 'expanding' parameters can't be "
                "used with executemany()"
            )

        expanded_state = compiled._process_parameters_for_postcompile(
            self.compiled_parameters[0]
        )

        # re-assign self.unicode_statement
        self.unicode_statement = expanded_state.statement

        # used by set_input_sizes() which is needed for Oracle
        self._expanded_parameters = expanded_state.parameter_expansion

        # copy before merging so compiled._bind_processors is not mutated
        processors = dict(processors)
        processors.update(expanded_state.processors)
        positiontup = expanded_state.positiontup
    elif compiled.positional:
        positiontup = self.compiled.positiontup

    if compiled.schema_translate_map:
        schema_translate_map = self.execution_options.get(
            "schema_translate_map", {}
        )
        rst = compiled.preparer._render_schema_translates
        self.unicode_statement = rst(
            self.unicode_statement, schema_translate_map
        )

    # final self.unicode_statement is now assigned, encode if needed
    # by dialect
    if not dialect.supports_unicode_statements:
        self.statement = self.unicode_statement.encode(
            self.dialect.encoding
        )
    else:
        self.statement = self.unicode_statement

    # Convert the dictionary of bind parameter values
    # into a dict or list to be sent to the DBAPI's
    # execute() or executemany() method.
    parameters = []
    if compiled.positional:
        # positional: values are ordered by positiontup, with the bind
        # processor applied where one exists for the key
        for compiled_params in self.compiled_parameters:
            param = [
                processors[key](compiled_params[key])
                if key in processors
                else compiled_params[key]
                for key in positiontup
            ]
            parameters.append(dialect.execute_sequence_format(param))
    else:
        # named: one dict per parameter set; keys are passed through the
        # dialect encoder when the dialect does not accept unicode
        # statements
        encode = not dialect.supports_unicode_statements
        if encode:
            encoder = dialect._encoder
        for compiled_params in self.compiled_parameters:

            if encode:
                param = {
                    encoder(key)[0]: processors[key](compiled_params[key])
                    if key in processors
                    else compiled_params[key]
                    for key in compiled_params
                }
            else:
                param = {
                    key: processors[key](compiled_params[key])
                    if key in processors
                    else compiled_params[key]
                    for key in compiled_params
                }

            parameters.append(param)

    self.parameters = dialect.execute_sequence_format(parameters)

    return self
- @classmethod
- def _init_statement(
- cls,
- dialect,
- connection,
- dbapi_connection,
- execution_options,
- statement,
- parameters,
- ):
- """Initialize execution context for a string SQL statement."""
- self = cls.__new__(cls)
- self.root_connection = connection
- self._dbapi_connection = dbapi_connection
- self.dialect = connection.dialect
- self.is_text = True
- self.execution_options = execution_options
- self._is_future_result = (
- connection._is_future
- or self.execution_options.get("future_result", False)
- )
- if not parameters:
- if self.dialect.positional:
- self.parameters = [dialect.execute_sequence_format()]
- else:
- self.parameters = [{}]
- elif isinstance(parameters[0], dialect.execute_sequence_format):
- self.parameters = parameters
- elif isinstance(parameters[0], dict):
- if dialect.supports_unicode_statements:
- self.parameters = parameters
- else:
- self.parameters = [
- {dialect._encoder(k)[0]: d[k] for k in d}
- for d in parameters
- ] or [{}]
- else:
- self.parameters = [
- dialect.execute_sequence_format(p) for p in parameters
- ]
- self.executemany = len(parameters) > 1
- if not dialect.supports_unicode_statements and isinstance(
- statement, util.text_type
- ):
- self.unicode_statement = statement
- self.statement = dialect._encoder(statement)[0]
- else:
- self.statement = self.unicode_statement = statement
- self.cursor = self.create_cursor()
- return self
- @classmethod
- def _init_default(
- cls, dialect, connection, dbapi_connection, execution_options
- ):
- """Initialize execution context for a ColumnDefault construct."""
- self = cls.__new__(cls)
- self.root_connection = connection
- self._dbapi_connection = dbapi_connection
- self.dialect = connection.dialect
- self.execution_options = execution_options
- self._is_future_result = (
- connection._is_future
- or self.execution_options.get("future_result", False)
- )
- self.cursor = self.create_cursor()
- return self
- def _get_cache_stats(self):
- if self.compiled is None:
- return "raw sql"
- now = util.perf_counter()
- ch = self.cache_hit
- if ch is NO_CACHE_KEY:
- return "no key %.5fs" % (now - self.compiled._gen_time,)
- elif ch is CACHE_HIT:
- return "cached since %.4gs ago" % (now - self.compiled._gen_time,)
- elif ch is CACHE_MISS:
- return "generated in %.5fs" % (now - self.compiled._gen_time,)
- elif ch is CACHING_DISABLED:
- return "caching disabled %.5fs" % (now - self.compiled._gen_time,)
- elif ch is NO_DIALECT_SUPPORT:
- return "dialect %s+%s does not support caching %.5fs" % (
- self.dialect.name,
- self.dialect.driver,
- now - self.compiled._gen_time,
- )
- else:
- return "unknown"
@util.memoized_property
def identifier_preparer(self):
    """Return the identifier preparer to use for this execution.

    Prefers the compiled construct's preparer; otherwise the dialect's
    preparer, adapted with the ``schema_translate_map`` execution option
    when that option is present.
    """
    if self.compiled:
        return self.compiled.preparer

    options = self.execution_options
    if "schema_translate_map" not in options:
        return self.dialect.identifier_preparer

    return self.dialect.identifier_preparer._with_schema_translate(
        options["schema_translate_map"]
    )
@util.memoized_property
def engine(self):
    """Return the ``engine`` attribute of the root connection."""
    root = self.root_connection
    return root.engine
@util.memoized_property
def postfetch_cols(self):
    """Return the ``postfetch`` collection of the compiled statement."""
    compiled = self.compiled
    return compiled.postfetch
@util.memoized_property
def prefetch_cols(self):
    """Return the prefetch columns matching the statement kind.

    INSERT uses ``compiled.insert_prefetch``, UPDATE uses
    ``compiled.update_prefetch``; any other statement yields ``()``.
    """
    if self.isinsert:
        return self.compiled.insert_prefetch
    if self.isupdate:
        return self.compiled.update_prefetch
    return ()
@util.memoized_property
def returning_cols(self):
    """Return the ``returning`` collection of the compiled statement."""
    # the value has to be returned explicitly; a bare expression statement
    # here makes the memoized property evaluate to None for every
    # statement
    return self.compiled.returning
@util.memoized_property
def no_parameters(self):
    """Return the ``no_parameters`` execution option (default ``False``)."""
    options = self.execution_options
    return options.get("no_parameters", False)
@util.memoized_property
def should_autocommit(self):
    """Return the effective autocommit setting for this execution.

    The ``autocommit`` execution option wins when present.  Without it,
    a non-compiled statement with text falls back to
    ``expression.PARSE_AUTOCOMMIT``, in which case the decision is
    delegated to :meth:`should_autocommit_text`.
    """
    fallback = (
        not self.compiled
        and self.statement
        and expression.PARSE_AUTOCOMMIT
        or False
    )
    autocommit = self.execution_options.get("autocommit", fallback)

    if autocommit is not expression.PARSE_AUTOCOMMIT:
        return autocommit
    return self.should_autocommit_text(self.unicode_statement)
def _execute_scalar(self, stmt, type_, parameters=None):
    """Run a string statement on the current cursor and return one value.

    Used to fire off sequences, default phrases, and "select lastrowid"
    types of statements individually or in the context of a parent INSERT
    or UPDATE statement.

    The first column of the first fetched row is returned, passed through
    the result processor of ``type_`` when ``type_`` is given and has one.
    """
    # NOTE(review): the statement is encoded before schema translation
    # here, whereas _init_ddl() / _init_compiled() translate first and
    # encode last -- confirm the ordering is intentional.
    if (
        isinstance(stmt, util.text_type)
        and not self.dialect.supports_unicode_statements
    ):
        stmt = self.dialect._encoder(stmt)[0]

    options = self.execution_options
    if "schema_translate_map" in options:
        translate_map = options.get("schema_translate_map", {})
        render = self.identifier_preparer._render_schema_translates
        stmt = render(stmt, translate_map)

    if not parameters:
        if self.dialect.positional:
            parameters = self.dialect.execute_sequence_format()
        else:
            parameters = {}

    self.root_connection._cursor_execute(
        self.cursor, stmt, parameters, context=self
    )
    value = self.cursor.fetchone()[0]
    if type_ is None:
        return value

    # apply type post processors to the result
    processor = type_._cached_result_processor(
        self.dialect, self.cursor.description[0][1]
    )
    return processor(value) if processor else value
@property
def connection(self):
    """Return the root connection, or a branch of it for legacy use.

    When the root connection reports ``_is_future`` it is returned
    directly; otherwise the result of its ``_branch()`` is returned.
    """
    root = self.root_connection
    return root if root._is_future else root._branch()
def should_autocommit_text(self, statement):
    """Return the result of matching ``statement`` against
    ``AUTOCOMMIT_REGEXP`` (a match object or ``None``)."""
    match = AUTOCOMMIT_REGEXP.match(statement)
    return match
- def _use_server_side_cursor(self):
- if not self.dialect.supports_server_side_cursors:
- return False
- if self.dialect.server_side_cursors:
- # this is deprecated
- use_server_side = self.execution_options.get(
- "stream_results", True
- ) and (
- (
- self.compiled
- and isinstance(
- self.compiled.statement, expression.Selectable
- )
- or (
- (
- not self.compiled
- or isinstance(
- self.compiled.statement, expression.TextClause
- )
- )
- and self.unicode_statement
- and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
- )
- )
- )
- else:
- use_server_side = self.execution_options.get(
- "stream_results", False
- )
- return use_server_side
def create_cursor(self):
    """Create the cursor for this execution and record its kind.

    A server-side cursor is created when the dialect supports them and
    either the ``stream_results`` execution option is set or the
    dialect-level ``server_side_cursors`` flag together with
    :meth:`_use_server_side_cursor` asks for one.  ``_is_server_side`` is
    set before the cursor is created.
    """
    dialect = self.dialect
    # inlining initial preference checks for SS cursors
    wants_server_side = dialect.supports_server_side_cursors and (
        self.execution_options.get("stream_results", False)
        or (dialect.server_side_cursors and self._use_server_side_cursor())
    )

    if not wants_server_side:
        self._is_server_side = False
        return self.create_default_cursor()

    self._is_server_side = True
    return self.create_server_side_cursor()
def create_default_cursor(self):
    """Return a new cursor obtained from the DBAPI connection."""
    dbapi_connection = self._dbapi_connection
    return dbapi_connection.cursor()
def create_server_side_cursor(self):
    """Create a server-side cursor.

    Not implemented in this base class; always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()
def pre_exec(self):
    """Pre-execution hook; does nothing in this base implementation."""
def get_out_parameter_values(self, names):
    """Return OUT parameter values for the given parameter names.

    Not supported by this base implementation; always raises
    ``NotImplementedError``.
    """
    message = "This dialect does not support OUT parameters"
    raise NotImplementedError(message)
def post_exec(self):
    """Post-execution hook; does nothing in this base implementation."""
def get_result_processor(self, type_, colname, coltype):
    """Return a 'result processor' for a given type as present in
    cursor.description.

    This default implementation asks ``type_`` for its cached result
    processor for this dialect and ``coltype``; ``colname`` is not used
    here.  Dialects can override it for context-sensitive result type
    handling.
    """
    dialect = self.dialect
    return type_._cached_result_processor(dialect, coltype)
def get_lastrowid(self):
    """Return ``self.cursor.lastrowid``, or equivalent, after an INSERT.

    Overriding implementations may call special cursor functions, issue a
    new SELECT on the cursor (or a new one), or return a stored value
    that was calculated within post_exec().

    This function will only be called for dialects which support
    "implicit" primary key generation, keep
    preexecute_autoincrement_sequences set to False, and when no explicit
    id value was bound to the statement.

    The function is called once for an INSERT statement that would need
    to return the last inserted primary key for those dialects that make
    use of the lastrowid concept.  In these cases, it is called directly
    after :meth:`.ExecutionContext.post_exec`.
    """
    cursor = self.cursor
    return cursor.lastrowid
def handle_dbapi_exception(self, e):
    """Exception hook; does nothing in this base implementation."""
@property
def rowcount(self):
    """Return the ``rowcount`` attribute of the current cursor."""
    cursor = self.cursor
    return cursor.rowcount
def supports_sane_rowcount(self):
    """Return the dialect's ``supports_sane_rowcount`` setting."""
    dialect = self.dialect
    return dialect.supports_sane_rowcount
def supports_sane_multi_rowcount(self):
    """Return the dialect's ``supports_sane_multi_rowcount`` setting."""
    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount
- def _setup_result_proxy(self):
- if self.is_crud or self.is_text:
- result = self._setup_dml_or_text_result()
- else:
- strategy = self.cursor_fetch_strategy
- if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
- strategy = _cursor.BufferedRowCursorFetchStrategy(
- self.cursor, self.execution_options
- )
- cursor_description = (
- strategy.alternate_cursor_description
- or self.cursor.description
- )
- if cursor_description is None:
- strategy = _cursor._NO_CURSOR_DQL
- if self._is_future_result:
- if self.root_connection.should_close_with_result:
- raise exc.InvalidRequestError(
- "can't use future_result=True with close_with_result"
- )
- result = _cursor.CursorResult(
- self, strategy, cursor_description
- )
- else:
- result = _cursor.LegacyCursorResult(
- self, strategy, cursor_description
- )
- if (
- self.compiled
- and not self.isddl
- and self.compiled.has_out_parameters
- ):
- self._setup_out_parameters(result)
- self._soft_closed = result._soft_closed
- return result
- def _setup_out_parameters(self, result):
- out_bindparams = [
- (param, name)
- for param, name in self.compiled.bind_names.items()
- if param.isoutparam
- ]
- out_parameters = {}
- for bindparam, raw_value in zip(
- [param for param, name in out_bindparams],
- self.get_out_parameter_values(
- [name for param, name in out_bindparams]
- ),
- ):
- type_ = bindparam.type
- impl_type = type_.dialect_impl(self.dialect)
- dbapi_type = impl_type.get_dbapi_type(self.dialect.dbapi)
- result_processor = impl_type.result_processor(
- self.dialect, dbapi_type
- )
- if result_processor is not None:
- raw_value = result_processor(raw_value)
- out_parameters[bindparam.key] = raw_value
- result.out_parameters = out_parameters
def _setup_dml_or_text_result(self):
    """Build the result object for a CRUD or textual statement.

    For INSERT, the inserted primary key rows are taken from lastrowid
    (before the result is created) or from implicitly returned rows
    (after it is created).  The result is soft-closed once implicit
    RETURNING rows have been consumed, for an INSERT without explicit
    RETURNING, and for statements whose metadata reports no rows.
    """
    if self.isinsert:
        if self.compiled.postfetch_lastrowid:
            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_lastrowid()
            )
        # else if not self._is_implicit_returning,
        # the default inserted_primary_key_rows accessor will
        # return an "empty" primary key collection when accessed.

    # server-side cursors replace the default fetch strategy with a
    # buffered one
    strategy = self.cursor_fetch_strategy
    if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
        strategy = _cursor.BufferedRowCursorFetchStrategy(
            self.cursor, self.execution_options
        )
    cursor_description = (
        strategy.alternate_cursor_description or self.cursor.description
    )
    if cursor_description is None:
        strategy = _cursor._NO_CURSOR_DML

    if self._is_future_result:
        result = _cursor.CursorResult(self, strategy, cursor_description)
    else:
        result = _cursor.LegacyCursorResult(
            self, strategy, cursor_description
        )

    if self.isinsert:
        if self._is_implicit_returning:
            rows = result.all()

            self.returned_default_rows = rows

            self.inserted_primary_key_rows = (
                self._setup_ins_pk_from_implicit_returning(result, rows)
            )

            # test that it has a cursor metadata that is accurate. the
            # first row will have been fetched and current assumptions
            # are that the result has only one row, until executemany()
            # support is added here.
            assert result._metadata.returns_rows
            result._soft_close()
        elif not self._is_explicit_returning:
            result._soft_close()

            # we assume here the result does not return any rows.
            # *usually*, this will be true. However, some dialects
            # such as that of MSSQL/pyodbc need to SELECT a post fetch
            # function so this is not necessarily true.
            # assert not result.returns_rows

    elif self.isupdate and self._is_implicit_returning:
        # a single row is fetched and kept as the returned defaults
        row = result.fetchone()
        self.returned_default_rows = [row]
        result._soft_close()

        # test that it has a cursor metadata that is accurate.
        # the rows have all been fetched however.
        assert result._metadata.returns_rows

    elif not result._metadata.returns_rows:
        # no results, get rowcount
        # (which requires open cursor on some drivers
        # such as kintersbasdb, mxodbc)
        result.rowcount
        result._soft_close()
    return result
@util.memoized_property
def inserted_primary_key_rows(self):
    """Return the default inserted primary key rows.

    Used when no specific "get primary key" strategy was set up during
    execution: the rows are derived from what's in
    ``compiled_parameters`` and nothing else.
    """
    default_rows = self._setup_ins_pk_from_empty()
    return default_rows
- def _setup_ins_pk_from_lastrowid(self):
- getter = self.compiled._inserted_primary_key_from_lastrowid_getter
- lastrowid = self.get_lastrowid()
- return [getter(lastrowid, self.compiled_parameters[0])]
- def _setup_ins_pk_from_empty(self):
- getter = self.compiled._inserted_primary_key_from_lastrowid_getter
- return [getter(None, param) for param in self.compiled_parameters]
- def _setup_ins_pk_from_implicit_returning(self, result, rows):
- if not rows:
- return []
- getter = self.compiled._inserted_primary_key_from_returning_getter
- compiled_params = self.compiled_parameters
- return [
- getter(row, param) for row, param in zip(rows, compiled_params)
- ]
def lastrow_has_defaults(self):
    """Report whether an INSERT or UPDATE has post-fetch columns.

    For any other statement kind the (falsy) statement-kind flag itself
    is returned.
    """
    is_write = self.isinsert or self.isupdate
    if not is_write:
        return is_write
    return bool(self.compiled.postfetch)
- def _set_input_sizes(self):
- """Given a cursor and ClauseParameters, call the appropriate
- style of ``setinputsizes()`` on the cursor, using DB-API types
- from the bind parameter's ``TypeEngine`` objects.
- This method only called by those dialects which require it,
- currently cx_oracle, asyncpg and pg8000.
- """
- if self.isddl or self.is_text:
- return
- inputsizes = self.compiled._get_set_input_sizes_lookup(
- include_types=self.include_set_input_sizes,
- exclude_types=self.exclude_set_input_sizes,
- )
- if inputsizes is None:
- return
- if self.dialect._has_events:
- inputsizes = dict(inputsizes)
- self.dialect.dispatch.do_setinputsizes(
- inputsizes, self.cursor, self.statement, self.parameters, self
- )
- has_escaped_names = bool(self.compiled.escaped_bind_names)
- if has_escaped_names:
- escaped_bind_names = self.compiled.escaped_bind_names
- if self.dialect.positional:
- items = [
- (key, self.compiled.binds[key])
- for key in self.compiled.positiontup
- ]
- else:
- items = [
- (key, bindparam)
- for bindparam, key in self.compiled.bind_names.items()
- ]
- generic_inputsizes = []
- for key, bindparam in items:
- if bindparam in self.compiled.literal_execute_params:
- continue
- if key in self._expanded_parameters:
- if bindparam.type._is_tuple_type:
- num = len(bindparam.type.types)
- dbtypes = inputsizes[bindparam]
- generic_inputsizes.extend(
- (
- (
- escaped_bind_names.get(paramname, paramname)
- if has_escaped_names
- else paramname
- ),
- dbtypes[idx % num],
- bindparam.type.types[idx % num],
- )
- for idx, paramname in enumerate(
- self._expanded_parameters[key]
- )
- )
- else:
- dbtype = inputsizes.get(bindparam, None)
- generic_inputsizes.extend(
- (
- (
- escaped_bind_names.get(paramname, paramname)
- if has_escaped_names
- else paramname
- ),
- dbtype,
- bindparam.type,
- )
- for paramname in self._expanded_parameters[key]
- )
- else:
- dbtype = inputsizes.get(bindparam, None)
- escaped_name = (
- escaped_bind_names.get(key, key)
- if has_escaped_names
- else key
- )
- generic_inputsizes.append(
- (escaped_name, dbtype, bindparam.type)
- )
- try:
- self.dialect.do_set_input_sizes(
- self.cursor, generic_inputsizes, self
- )
- except BaseException as e:
- self.root_connection._handle_dbapi_exception(
- e, None, None, None, self
- )
- def _exec_default(self, column, default, type_):
- if default.is_sequence:
- return self.fire_sequence(default, type_)
- elif default.is_callable:
- self.current_column = column
- return default.arg(self)
- elif default.is_clause_element:
- return self._exec_default_clause_element(column, default, type_)
- else:
- return default.arg
def _exec_default_clause_element(self, column, default, type_):
    """Execute a default that is a complete clause element.

    The default expression is wrapped in a SELECT, compiled for this
    dialect, its bind values are processed, and the statement is run
    through :meth:`_execute_scalar`.
    """
    # Here, we have to re-implement a miniature version of the
    # compile->parameters->cursor.execute() sequence, since we don't want
    # to modify the state of the connection / result in progress or
    # create new connection/result objects etc.
    # .. versionchanged:: 1.4

    if default._arg_is_typed:
        default_arg = default.arg
    else:
        default_arg = expression.type_coerce(default.arg, type_)

    compiled = expression.select(default_arg).compile(dialect=self.dialect)
    compiled_params = compiled.construct_params()
    processors = compiled._bind_processors

    def bind_value(key):
        # apply the bind processor for this key, when there is one
        if key in processors:
            return processors[key](compiled_params[key])
        return compiled_params[key]

    if compiled.positional:
        parameters = self.dialect.execute_sequence_format(
            [bind_value(key) for key in compiled.positiontup]
        )
    else:
        parameters = {key: bind_value(key) for key in compiled_params}

    return self._execute_scalar(
        util.text_type(compiled), type_, parameters=parameters
    )
# Class-level default; _process_executemany_defaults() and
# _process_executesingle_defaults() assign the current row's parameters here
# while defaults are computed and delete the instance attribute afterwards.
current_parameters = None
"""A dictionary of parameters applied to the current row.
This attribute is only available in the context of a user-defined default
generation function, e.g. as described at :ref:`context_default_functions`.
It consists of a dictionary which includes entries for each column/value
pair that is to be part of the INSERT or UPDATE statement. The keys of the
dictionary will be the key value of each :class:`_schema.Column`,
which is usually
synonymous with the name.
Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
does not accommodate for the "multi-values" feature of the
:meth:`_expression.Insert.values` method. The
:meth:`.DefaultExecutionContext.get_current_parameters` method should be
preferred.
.. seealso::
:meth:`.DefaultExecutionContext.get_current_parameters`
:ref:`context_default_functions`
"""
def get_current_parameters(self, isolate_multiinsert_groups=True):
    """Return a dictionary of parameters applied to the current row.

    This method can only be used in the context of a user-defined default
    generation function, e.g. as described at
    :ref:`context_default_functions`.  When invoked, a dictionary is
    returned which includes entries for each column/value pair that is
    part of the INSERT or UPDATE statement.  The keys of the dictionary
    will be the key value of each :class:`_schema.Column`, which is
    usually synonymous with the name.

    :param isolate_multiinsert_groups=True: indicates that multi-valued
     INSERT constructs created using :meth:`_expression.Insert.values`
     should be handled by returning only the subset of parameters that
     are local to the current column default invocation.  When
     ``False``, the raw parameters of the statement are returned
     including the naming convention used in the case of multi-valued
     INSERT.

    :raises exc.InvalidRequestError: when called outside of a Python-side
     column default function.

    .. versionadded:: 1.2 added
       :meth:`.DefaultExecutionContext.get_current_parameters`
       which provides more functionality over the existing
       :attr:`.DefaultExecutionContext.current_parameters`
       attribute.

    .. seealso::

        :attr:`.DefaultExecutionContext.current_parameters`

        :ref:`context_default_functions`

    """
    try:
        parameters = self.current_parameters
        column = self.current_column
    except AttributeError:
        raise exc.InvalidRequestError(
            "get_current_parameters() can only be invoked in the "
            "context of a Python side column default function"
        )

    compile_state = self.compiled.compile_state
    isolate = (
        isolate_multiinsert_groups
        and self.isinsert
        and compile_state._has_multi_parameters
    )
    if not isolate:
        return parameters

    # pick the parameter group the current column belongs to: group 0 for
    # a regular column, ``column.index + 1`` for a multiparam column
    if column._is_multiparam_column:
        group = column.index + 1
        row = {column.original.key: parameters[column.key]}
    else:
        row = {column.key: parameters[column.key]}
        group = 0

    for key in compile_state._dict_parameters.keys():
        row[key] = parameters["%s_m%d" % (key, group)]
    return row
def get_insert_default(self, column):
    """Return the evaluated INSERT default of ``column``, or ``None``
    when the column has no ``default``."""
    default = column.default
    if default is None:
        return None
    return self._exec_default(column, column.default, column.type)
def get_update_default(self, column):
    """Return the evaluated UPDATE default of ``column``, or ``None``
    when the column has no ``onupdate``."""
    onupdate = column.onupdate
    if onupdate is None:
        return None
    return self._exec_default(column, column.onupdate, column.type)
- def _process_executemany_defaults(self):
- key_getter = self.compiled._key_getters_for_crud_column[2]
- scalar_defaults = {}
- insert_prefetch = self.compiled.insert_prefetch
- update_prefetch = self.compiled.update_prefetch
- # pre-determine scalar Python-side defaults
- # to avoid many calls of get_insert_default()/
- # get_update_default()
- for c in insert_prefetch:
- if c.default and not c.default.is_sequence and c.default.is_scalar:
- scalar_defaults[c] = c.default.arg
- for c in update_prefetch:
- if c.onupdate and c.onupdate.is_scalar:
- scalar_defaults[c] = c.onupdate.arg
- for param in self.compiled_parameters:
- self.current_parameters = param
- for c in insert_prefetch:
- if c in scalar_defaults:
- val = scalar_defaults[c]
- else:
- val = self.get_insert_default(c)
- if val is not None:
- param[key_getter(c)] = val
- for c in update_prefetch:
- if c in scalar_defaults:
- val = scalar_defaults[c]
- else:
- val = self.get_update_default(c)
- if val is not None:
- param[key_getter(c)] = val
- del self.current_parameters
- def _process_executesingle_defaults(self):
- key_getter = self.compiled._key_getters_for_crud_column[2]
- self.current_parameters = (
- compiled_parameters
- ) = self.compiled_parameters[0]
- for c in self.compiled.insert_prefetch:
- if c.default and not c.default.is_sequence and c.default.is_scalar:
- val = c.default.arg
- else:
- val = self.get_insert_default(c)
- if val is not None:
- compiled_parameters[key_getter(c)] = val
- for c in self.compiled.update_prefetch:
- val = self.get_update_default(c)
- if val is not None:
- compiled_parameters[key_getter(c)] = val
- del self.current_parameters
# Assigned after both classes exist: sets DefaultExecutionContext as the
# ``execution_ctx_cls`` attribute of DefaultDialect.
DefaultDialect.execution_ctx_cls = DefaultExecutionContext
|