psycopg2.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050
  1. # postgresql/psycopg2.py
  2. # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. r"""
  8. .. dialect:: postgresql+psycopg2
  9. :name: psycopg2
  10. :dbapi: psycopg2
  11. :connectstring: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]
  12. :url: https://pypi.org/project/psycopg2/
  13. psycopg2 Connect Arguments
  14. --------------------------
  15. Keyword arguments that are specific to the SQLAlchemy psycopg2 dialect
  16. may be passed to :func:`_sa.create_engine()`, and include the following:
  17. * ``isolation_level``: This option, available for all PostgreSQL dialects,
  18. includes the ``AUTOCOMMIT`` isolation level when using the psycopg2
  19. dialect. This option sets the **default** isolation level for the
  20. connection that is set immediately upon connection to the database before
  21. the connection is pooled. This option is generally superseded by the more
  22. modern :paramref:`_engine.Connection.execution_options.isolation_level`
  23. execution option, detailed at :ref:`dbapi_autocommit`.
  24. .. seealso::
  25. :ref:`psycopg2_isolation_level`
  26. :ref:`dbapi_autocommit`
  27. * ``client_encoding``: sets the client encoding in a libpq-agnostic way,
  28. using psycopg2's ``set_client_encoding()`` method.
  29. .. seealso::
  30. :ref:`psycopg2_unicode`
  31. * ``use_native_unicode``: Under Python 2 only, this can be set to False to
  32. disable the use of psycopg2's native Unicode support.
  33. .. seealso::
  34. :ref:`psycopg2_disable_native_unicode`
  35. * ``executemany_mode``, ``executemany_batch_page_size``,
  36. ``executemany_values_page_size``: Allows use of psycopg2
  37. extensions for optimizing "executemany"-style queries. See the referenced
  38. section below for details.
  39. .. seealso::
  40. :ref:`psycopg2_executemany_mode`
  41. .. tip::
  42. The above keyword arguments are **dialect** keyword arguments, meaning
  43. that they are passed as explicit keyword arguments to :func:`_sa.create_engine()`::
  44. engine = create_engine(
  45. "postgresql+psycopg2://scott:tiger@localhost/test",
  46. isolation_level="SERIALIZABLE",
  47. )
  48. These should not be confused with **DBAPI** connect arguments, which
  49. are passed as part of the :paramref:`_sa.create_engine.connect_args`
  50. dictionary and/or are passed in the URL query string, as detailed in
  51. the section :ref:`custom_dbapi_args`.
  52. .. _psycopg2_ssl:
  53. SSL Connections
  54. ---------------
  55. The psycopg2 module has a connection argument named ``sslmode`` for
  56. controlling its behavior regarding secure (SSL) connections. The default is
  57. ``sslmode=prefer``; it will attempt an SSL connection and if that fails it
  58. will fall back to an unencrypted connection. ``sslmode=require`` may be used
  59. to ensure that only secure connections are established. Consult the
  60. psycopg2 / libpq documentation for further options that are available.
  61. Note that ``sslmode`` is specific to psycopg2 so it is included in the
  62. connection URI::
  63. engine = sa.create_engine(
  64. "postgresql+psycopg2://scott:tiger@192.168.0.199:5432/test?sslmode=require"
  65. )
  66. Unix Domain Connections
  67. ------------------------
  68. psycopg2 supports connecting via Unix domain connections. When the ``host``
  69. portion of the URL is omitted, SQLAlchemy passes ``None`` to psycopg2,
  70. which specifies Unix-domain communication rather than TCP/IP communication::
  71. create_engine("postgresql+psycopg2://user:password@/dbname")
  72. By default, the connection is made through a Unix-domain socket file located
  73. in ``/tmp``, or whatever socket directory was specified when PostgreSQL
  74. was built. This value can be overridden by passing a pathname to psycopg2,
  75. using ``host`` as an additional keyword argument::
  76. create_engine("postgresql+psycopg2://user:password@/dbname?host=/var/lib/postgresql")
  77. .. seealso::
  78. `PQconnectdbParams \
  79. <https://www.postgresql.org/docs/9.1/static/libpq-connect.html#LIBPQ-PQCONNECTDBPARAMS>`_
  80. .. _psycopg2_multi_host:
  81. Specifying multiple fallback hosts
  82. -----------------------------------
  83. psycopg2 supports multiple connection points in the connection string.
  84. When the ``host`` parameter is used multiple times in the query section of
  85. the URL, SQLAlchemy will create a single string of the host and port
  86. information provided to make the connections::
  87. create_engine(
  88. "postgresql+psycopg2://user:password@/dbname?host=HostA:port1&host=HostB&host=HostC"
  89. )
  90. A connection to each host is then attempted until either a connection is successful
  91. or all connections are unsuccessful in which case an error is raised.
  92. .. versionadded:: 1.3.20 Support for multiple hosts in PostgreSQL connection
  93. string.
  94. .. seealso::
  95. `PQConnString \
  96. <https://www.postgresql.org/docs/10/libpq-connect.html#LIBPQ-CONNSTRING>`_
  97. Empty DSN Connections / Environment Variable Connections
  98. ---------------------------------------------------------
  99. The psycopg2 DBAPI can connect to PostgreSQL by passing an empty DSN to the
  100. libpq client library, which by default indicates to connect to a localhost
  101. PostgreSQL database that is open for "trust" connections. This behavior can be
  102. further tailored using a particular set of environment variables which are
  103. prefixed with ``PG`` (e.g. ``PGHOST``, ``PGUSER``), which are consumed by ``libpq`` to take the place of
  104. any or all elements of the connection string.
  105. For this form, the URL can be passed without any elements other than the
  106. initial scheme::
  107. engine = create_engine('postgresql+psycopg2://')
  108. In the above form, a blank "dsn" string is passed to the ``psycopg2.connect()``
  109. function which in turn represents an empty DSN passed to libpq.
  110. .. versionadded:: 1.3.2 support for parameter-less connections with psycopg2.
  111. .. seealso::
  112. `Environment Variables\
  113. <https://www.postgresql.org/docs/current/libpq-envars.html>`_ -
  114. PostgreSQL documentation on how to use the ``PG``-prefixed
  115. environment variables for connections.
  116. .. _psycopg2_execution_options:
  117. Per-Statement/Connection Execution Options
  118. -------------------------------------------
  119. The following DBAPI-specific options are respected when used with
  120. :meth:`_engine.Connection.execution_options`,
  121. :meth:`.Executable.execution_options`,
  122. :meth:`_query.Query.execution_options`,
  123. in addition to those not specific to DBAPIs:
  124. * ``isolation_level`` - Set the transaction isolation level for the lifespan
  125. of a :class:`_engine.Connection` (can only be set on a connection,
  126. not a statement
  127. or query). See :ref:`psycopg2_isolation_level`.
  128. * ``stream_results`` - Enable or disable usage of psycopg2 server side
  129. cursors - this feature makes use of "named" cursors in combination with
  130. special result handling methods so that result rows are not fully buffered.
  131. Defaults to False, meaning cursors are buffered by default.
  132. * ``max_row_buffer`` - when using ``stream_results``, an integer value that
  133. specifies the maximum number of rows to buffer at a time. This is
  134. interpreted by the :class:`.BufferedRowCursorResult`, and if omitted the
  135. buffer will grow to ultimately store 1000 rows at a time.
  136. .. versionchanged:: 1.4 The ``max_row_buffer`` size can now be greater than
  137. 1000, and the buffer will grow to that size.
  138. .. _psycopg2_batch_mode:
  139. .. _psycopg2_executemany_mode:
  140. Psycopg2 Fast Execution Helpers
  141. -------------------------------
  142. Modern versions of psycopg2 include a feature known as
  143. `Fast Execution Helpers \
  144. <https://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
  145. have been shown in benchmarking to improve psycopg2's executemany()
  146. performance, primarily with INSERT statements, by multiple orders of magnitude.
  147. SQLAlchemy internally makes use of these extensions for ``executemany()`` style
  148. calls, which correspond to lists of parameters being passed to
  149. :meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter
  150. sets <execute_multiple>`. The ORM also uses this mode internally whenever
  151. possible.
  152. The two available extensions on the psycopg2 side are the ``execute_values()``
  153. and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the
  154. ``execute_values()`` extension for all qualifying INSERT statements.
  155. .. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode
  156. ``"values_only"`` for ``executemany_mode``, which allows an order of
  157. magnitude performance improvement for INSERT statements, but does not
  158. include "batch" mode for UPDATE and DELETE statements which removes the
  159. ability of ``cursor.rowcount`` to function correctly.
  160. The use of these extensions is controlled by the ``executemany_mode`` flag
  161. which may be passed to :func:`_sa.create_engine`::
  162. engine = create_engine(
  163. "postgresql+psycopg2://scott:tiger@host/dbname",
  164. executemany_mode='values_plus_batch')
  165. Possible options for ``executemany_mode`` include:
  166. * ``values_only`` - this is the default value. the psycopg2 execute_values()
  167. extension is used for qualifying INSERT statements, which rewrites the INSERT
  168. to include multiple VALUES clauses so that many parameter sets can be
  169. inserted with one statement.
  170. .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode``
  171. which is also now the default.
  172. * ``None`` - No psycopg2 extensions are used, and the usual
  173. ``cursor.executemany()`` method is used when invoking statements with
  174. multiple parameter sets.
  175. * ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying
  176. INSERT, UPDATE and DELETE statements, so that multiple copies
  177. of a SQL query, each one corresponding to a parameter set passed to
  178. ``executemany()``, are joined into a single SQL string separated by a
  179. semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount`
  180. attribute will not contain a value for executemany-style executions.
  181. * ``'values_plus_batch'`` - ``execute_values`` is used for qualifying INSERT
  182. statements, ``execute_batch`` is used for UPDATE and DELETE.
  183. When using this mode, the :attr:`_engine.CursorResult.rowcount`
  184. attribute will not contain a value for executemany-style executions against
  185. UPDATE and DELETE statements.
  186. By "qualifying statements", we mean that the statement being executed
  187. must be a Core :func:`_expression.insert`, :func:`_expression.update`
  188. or :func:`_expression.delete` construct, and not a plain textual SQL
  189. string or one constructed using :func:`_expression.text`. When using the
  190. ORM, all insert/update/delete statements used by the ORM flush process
  191. are qualifying.
  192. The "page size" for the "values" and "batch" strategies can be affected
  193. by using the ``executemany_batch_page_size`` and
  194. ``executemany_values_page_size`` engine parameters. These
  195. control how many parameter sets
  196. should be represented in each execution. The "values" page size defaults
  197. to 1000, which is different than psycopg2's default. The "batch" page
  198. size defaults to 100. These can be affected by passing new values to
  199. :func:`_engine.create_engine`::
  200. engine = create_engine(
  201. "postgresql+psycopg2://scott:tiger@host/dbname",
  202. executemany_mode='values',
  203. executemany_values_page_size=10000, executemany_batch_page_size=500)
  204. .. versionchanged:: 1.4
  205. The default for ``executemany_values_page_size`` is now 1000, up from
  206. 100.
  207. .. seealso::
  208. :ref:`execute_multiple` - General information on using the
  209. :class:`_engine.Connection`
  210. object to execute statements in such a way as to make
  211. use of the DBAPI ``.executemany()`` method.
  212. .. _psycopg2_unicode:
  213. Unicode with Psycopg2
  214. ----------------------
  215. The psycopg2 DBAPI driver supports Unicode data transparently. Under Python 2
  216. only, the SQLAlchemy psycopg2 dialect will enable the
  217. ``psycopg2.extensions.UNICODE`` extension by default to ensure Unicode is
  218. handled properly; under Python 3, this is psycopg2's default behavior.
  219. The client character encoding can be controlled for the psycopg2 dialect
  220. in the following ways:
  221. * For PostgreSQL 9.1 and above, the ``client_encoding`` parameter may be
  222. passed in the database URL; this parameter is consumed by the underlying
  223. ``libpq`` PostgreSQL client library::
  224. engine = create_engine("postgresql+psycopg2://user:pass@host/dbname?client_encoding=utf8")
  225. Alternatively, the above ``client_encoding`` value may be passed using
  226. :paramref:`_sa.create_engine.connect_args` for programmatic establishment with
  227. ``libpq``::
  228. engine = create_engine(
  229. "postgresql+psycopg2://user:pass@host/dbname",
  230. connect_args={'client_encoding': 'utf8'}
  231. )
  232. * For all PostgreSQL versions, psycopg2 supports a client-side encoding
  233. value that will be passed to database connections when they are first
  234. established. The SQLAlchemy psycopg2 dialect supports this using the
  235. ``client_encoding`` parameter passed to :func:`_sa.create_engine`::
  236. engine = create_engine(
  237. "postgresql+psycopg2://user:pass@host/dbname",
  238. client_encoding="utf8"
  239. )
  240. .. tip:: The above ``client_encoding`` parameter admittedly is very similar
  241. in appearance to usage of the parameter within the
  242. :paramref:`_sa.create_engine.connect_args` dictionary; the difference
  243. above is that the parameter is consumed by psycopg2 and is
  244. passed to the database connection using ``SET client_encoding TO
  245. 'utf8'``; in the previously mentioned style, the parameter is instead
  246. passed through psycopg2 and consumed by the ``libpq`` library.
  247. * A common way to set up client encoding with PostgreSQL databases is to
  248. ensure it is configured within the server-side postgresql.conf file;
  249. this is the recommended way to set encoding for a server that is
  250. consistently of one encoding in all databases::
  251. # postgresql.conf file
  252. # client_encoding = sql_ascii # actually, defaults to database
  253. # encoding
  254. client_encoding = utf8
  255. .. _psycopg2_disable_native_unicode:
  256. Disabling Native Unicode
  257. ^^^^^^^^^^^^^^^^^^^^^^^^
  258. Under Python 2 only, SQLAlchemy can also be instructed to skip the usage of the
  259. psycopg2 ``UNICODE`` extension and to instead utilize its own unicode
  260. encode/decode services, which are normally reserved only for those DBAPIs that
  261. don't fully support unicode directly. Passing ``use_native_unicode=False`` to
  262. :func:`_sa.create_engine` will disable usage of ``psycopg2.extensions.
  263. UNICODE``. SQLAlchemy will instead encode data itself into Python bytestrings
  264. on the way in and coerce from bytes on the way back, using the value of the
  265. :func:`_sa.create_engine` ``encoding`` parameter, which defaults to ``utf-8``.
  266. SQLAlchemy's own unicode encode/decode functionality is steadily becoming
  267. obsolete as most DBAPIs now support unicode fully.
  268. Transactions
  269. ------------
  270. The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations.
  271. .. _psycopg2_isolation_level:
  272. Psycopg2 Transaction Isolation Level
  273. -------------------------------------
  274. As discussed in :ref:`postgresql_isolation_level`,
  275. all PostgreSQL dialects support setting of transaction isolation level
  276. both via the ``isolation_level`` parameter passed to
  277. :func:`_sa.create_engine`,
  278. as well as the ``isolation_level`` argument used by
  279. :meth:`_engine.Connection.execution_options`. When using the psycopg2
  280. dialect, these
  281. options make use of psycopg2's ``set_isolation_level()`` connection method,
  282. rather than emitting a PostgreSQL directive; this is because psycopg2's
  283. API-level setting is always emitted at the start of each transaction in any
  284. case.
  285. The psycopg2 dialect supports these constants for isolation level:
  286. * ``READ COMMITTED``
  287. * ``READ UNCOMMITTED``
  288. * ``REPEATABLE READ``
  289. * ``SERIALIZABLE``
  290. * ``AUTOCOMMIT``
  291. .. seealso::
  292. :ref:`postgresql_isolation_level`
  293. :ref:`pg8000_isolation_level`
  294. NOTICE logging
  295. ---------------
  296. The psycopg2 dialect will log PostgreSQL NOTICE messages
  297. via the ``sqlalchemy.dialects.postgresql`` logger. When this logger
  298. is set to the ``logging.INFO`` level, notice messages will be logged::
  299. import logging
  300. logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)
  301. Above, it is assumed that logging is configured externally. If this is not
  302. the case, configuration such as ``logging.basicConfig()`` must be utilized::
  303. import logging
  304. logging.basicConfig() # log messages to stdout
  305. logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)
  306. .. seealso::
  307. `Logging HOWTO <https://docs.python.org/3/howto/logging.html>`_ - on the python.org website
  308. .. _psycopg2_hstore:
  309. HSTORE type
  310. ------------
  311. The ``psycopg2`` DBAPI includes an extension to natively handle marshalling of
  312. the HSTORE type. The SQLAlchemy psycopg2 dialect will enable this extension
  313. by default when psycopg2 version 2.4 or greater is used, and
  314. it is detected that the target database has the HSTORE type set up for use.
  315. In other words, when the dialect makes the first
  316. connection, a sequence like the following is performed:
  317. 1. Request the available HSTORE oids using
  318. ``psycopg2.extras.HstoreAdapter.get_oids()``.
  319. If this function returns a list of HSTORE identifiers, we then determine
  320. that the ``HSTORE`` extension is present.
  321. This function is **skipped** if the version of psycopg2 installed is
  322. less than version 2.4.
  323. 2. If the ``use_native_hstore`` flag is at its default of ``True``, and
  324. we've detected that ``HSTORE`` oids are available, the
  325. ``psycopg2.extras.register_hstore()`` extension is invoked for all
  326. connections.
  327. The ``register_hstore()`` extension has the effect of **all Python
  328. dictionaries being accepted as parameters regardless of the type of target
  329. column in SQL**. The dictionaries are converted by this extension into a
  330. textual HSTORE expression. If this behavior is not desired, disable the
  331. use of the hstore extension by setting ``use_native_hstore`` to ``False`` as
  332. follows::
  333. engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test",
  334. use_native_hstore=False)
  335. The ``HSTORE`` type is **still supported** when the
  336. ``psycopg2.extras.register_hstore()`` extension is not used. It merely
  337. means that the coercion between Python dictionaries and the HSTORE
  338. string format, on both the parameter side and the result side, will take
  339. place within SQLAlchemy's own marshalling logic, and not that of ``psycopg2``
  340. which may be more performant.
  341. """ # noqa
  342. from __future__ import absolute_import
  343. import decimal
  344. import logging
  345. import re
  346. from uuid import UUID as _python_UUID
  347. from .array import ARRAY as PGARRAY
  348. from .base import _ColonCast
  349. from .base import _DECIMAL_TYPES
  350. from .base import _FLOAT_TYPES
  351. from .base import _INT_TYPES
  352. from .base import ENUM
  353. from .base import PGCompiler
  354. from .base import PGDialect
  355. from .base import PGExecutionContext
  356. from .base import PGIdentifierPreparer
  357. from .base import UUID
  358. from .hstore import HSTORE
  359. from .json import JSON
  360. from .json import JSONB
  361. from ... import exc
  362. from ... import processors
  363. from ... import types as sqltypes
  364. from ... import util
  365. from ...engine import cursor as _cursor
  366. from ...util import collections_abc
# Module-level logger.  PGExecutionContext_psycopg2._log_notices() emits
# PostgreSQL NOTICE messages through it at INFO level.
logger = logging.getLogger("sqlalchemy.dialects.postgresql")
  368. class _PGNumeric(sqltypes.Numeric):
  369. def bind_processor(self, dialect):
  370. return None
  371. def result_processor(self, dialect, coltype):
  372. if self.asdecimal:
  373. if coltype in _FLOAT_TYPES:
  374. return processors.to_decimal_processor_factory(
  375. decimal.Decimal, self._effective_decimal_return_scale
  376. )
  377. elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
  378. # pg8000 returns Decimal natively for 1700
  379. return None
  380. else:
  381. raise exc.InvalidRequestError(
  382. "Unknown PG numeric type: %d" % coltype
  383. )
  384. else:
  385. if coltype in _FLOAT_TYPES:
  386. # pg8000 returns float natively for 701
  387. return None
  388. elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
  389. return processors.to_float
  390. else:
  391. raise exc.InvalidRequestError(
  392. "Unknown PG numeric type: %d" % coltype
  393. )
  394. class _PGEnum(ENUM):
  395. def result_processor(self, dialect, coltype):
  396. if util.py2k and self._expect_unicode is True:
  397. # for py2k, if the enum type needs unicode data (which is set up as
  398. # part of the Enum() constructor based on values passed as py2k
  399. # unicode objects) we have to use our own converters since
  400. # psycopg2's don't work, a rare exception to the "modern DBAPIs
  401. # support unicode everywhere" theme of deprecating
  402. # convert_unicode=True. Use the special "force_nocheck" directive
  403. # which forces unicode conversion to happen on the Python side
  404. # without an isinstance() check. in py3k psycopg2 does the right
  405. # thing automatically.
  406. self._expect_unicode = "force_nocheck"
  407. return super(_PGEnum, self).result_processor(dialect, coltype)
  408. class _PGHStore(HSTORE):
  409. def bind_processor(self, dialect):
  410. if dialect._has_native_hstore:
  411. return None
  412. else:
  413. return super(_PGHStore, self).bind_processor(dialect)
  414. def result_processor(self, dialect, coltype):
  415. if dialect._has_native_hstore:
  416. return None
  417. else:
  418. return super(_PGHStore, self).result_processor(dialect, coltype)
class _PGARRAY(PGARRAY):
    """ARRAY type whose bound parameters are wrapped in ``_ColonCast``."""

    def bind_expression(self, bindvalue):
        # Wrap the bound value together with this type object.
        # NOTE(review): presumably _ColonCast renders a ``value::type``
        # cast in SQL -- confirm against its definition in .base.
        return _ColonCast(bindvalue, self)
class _PGJSON(JSON):
    """JSON type that installs no result processor."""

    def result_processor(self, dialect, coltype):
        # No conversion on the way out of the database.
        # NOTE(review): presumably psycopg2 already deserializes JSON
        # values itself -- confirm.
        return None
class _PGJSONB(JSONB):
    """JSONB type that installs no result processor."""

    def result_processor(self, dialect, coltype):
        # No conversion on the way out of the database.
        # NOTE(review): presumably psycopg2 already deserializes JSONB
        # values itself -- confirm.
        return None
  428. class _PGUUID(UUID):
  429. def bind_processor(self, dialect):
  430. if not self.as_uuid and dialect.use_native_uuid:
  431. def process(value):
  432. if value is not None:
  433. value = _python_UUID(value)
  434. return value
  435. return process
  436. def result_processor(self, dialect, coltype):
  437. if not self.as_uuid and dialect.use_native_uuid:
  438. def process(value):
  439. if value is not None:
  440. value = str(value)
  441. return value
  442. return process
# Called once per server-side cursor by
# PGExecutionContext_psycopg2.create_server_side_cursor(); the returned
# value becomes part of the cursor's name.
_server_side_id = util.counter()
  444. class PGExecutionContext_psycopg2(PGExecutionContext):
  445. _psycopg2_fetched_rows = None
  446. def create_server_side_cursor(self):
  447. # use server-side cursors:
  448. # https://lists.initd.org/pipermail/psycopg/2007-January/005251.html
  449. ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
  450. return self._dbapi_connection.cursor(ident)
  451. def post_exec(self):
  452. if (
  453. self._psycopg2_fetched_rows
  454. and self.compiled
  455. and self.compiled.returning
  456. ):
  457. # psycopg2 execute_values will provide for a real cursor where
  458. # cursor.description works correctly. however, it executes the
  459. # INSERT statement multiple times for multiple pages of rows, so
  460. # while this cursor also supports calling .fetchall() directly, in
  461. # order to get the list of all rows inserted across multiple pages,
  462. # we have to retrieve the aggregated list from the execute_values()
  463. # function directly.
  464. strat_cls = _cursor.FullyBufferedCursorFetchStrategy
  465. self.cursor_fetch_strategy = strat_cls(
  466. self.cursor, initial_buffer=self._psycopg2_fetched_rows
  467. )
  468. self._log_notices(self.cursor)
  469. def _log_notices(self, cursor):
  470. # check also that notices is an iterable, after it's already
  471. # established that we will be iterating through it. This is to get
  472. # around test suites such as SQLAlchemy's using a Mock object for
  473. # cursor
  474. if not cursor.connection.notices or not isinstance(
  475. cursor.connection.notices, collections_abc.Iterable
  476. ):
  477. return
  478. for notice in cursor.connection.notices:
  479. # NOTICE messages have a
  480. # newline character at the end
  481. logger.info(notice.rstrip())
  482. cursor.connection.notices[:] = []
class PGCompiler_psycopg2(PGCompiler):
    """Statement compiler for the psycopg2 dialect; adds nothing to PGCompiler."""

    pass
class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
    """Identifier preparer for the psycopg2 dialect; adds nothing to its base."""

    pass
# Symbols for the ``executemany_mode`` dialect argument.  The canonical
# values act as bit flags: BATCH is 1, VALUES is 2, and VALUES_PLUS_BATCH
# is the OR of the two, so the dialect can test a mode with ``&``.
EXECUTEMANY_PLAIN = util.symbol("executemany_plain", canonical=0)
EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1)
EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2)
EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol(
    "executemany_values_plus_batch",
    canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES,
)
  494. class PGDialect_psycopg2(PGDialect):
  495. driver = "psycopg2"
  496. supports_statement_cache = True
  497. if util.py2k:
  498. # turn off supports_unicode_statements for Python 2. psycopg2 supports
  499. # unicode statements in Py2K. But! it does not support unicode *bound
  500. # parameter names* because it uses the Python "%" operator to
  501. # interpolate these into the string, and this fails. So for Py2K, we
  502. # have to use full-on encoding for statements and parameters before
  503. # passing to cursor.execute().
  504. supports_unicode_statements = False
  505. supports_server_side_cursors = True
  506. default_paramstyle = "pyformat"
  507. # set to true based on psycopg2 version
  508. supports_sane_multi_rowcount = False
  509. execution_ctx_cls = PGExecutionContext_psycopg2
  510. statement_compiler = PGCompiler_psycopg2
  511. preparer = PGIdentifierPreparer_psycopg2
  512. psycopg2_version = (0, 0)
  513. _has_native_hstore = True
  514. engine_config_types = PGDialect.engine_config_types.union(
  515. {"use_native_unicode": util.asbool}
  516. )
  517. colspecs = util.update_copy(
  518. PGDialect.colspecs,
  519. {
  520. sqltypes.Numeric: _PGNumeric,
  521. ENUM: _PGEnum, # needs force_unicode
  522. sqltypes.Enum: _PGEnum, # needs force_unicode
  523. HSTORE: _PGHStore,
  524. JSON: _PGJSON,
  525. sqltypes.JSON: _PGJSON,
  526. JSONB: _PGJSONB,
  527. UUID: _PGUUID,
  528. sqltypes.ARRAY: _PGARRAY,
  529. },
  530. )
  def __init__(
      self,
      use_native_unicode=True,
      client_encoding=None,
      use_native_hstore=True,
      use_native_uuid=True,
      executemany_mode="values_only",
      executemany_batch_page_size=100,
      executemany_values_page_size=1000,
      **kwargs
  ):
      """Set up the psycopg2 dialect options.

      :param use_native_unicode: stored on the dialect and also used as the
       value of ``supports_unicode_binds``; must be true unless running
       under Python 2.
      :param client_encoding: stored as ``self.client_encoding``; when not
       None, :meth:`on_connect` applies it to each new connection.
      :param use_native_hstore: when false, ``_has_native_hstore`` is
       switched off for this instance.
      :param use_native_uuid: stored as ``self.use_native_uuid``.
      :param executemany_mode: one of ``None``, ``"batch"``,
       ``"values_only"``, ``"values_plus_batch"`` or ``"values"``.
      :param executemany_batch_page_size: stored for use as the page size
       in batch mode.
      :param executemany_values_page_size: stored for use as the page size
       in values mode.
      :param kwargs: forwarded unchanged to ``PGDialect.__init__``.
      :raises ArgumentError: if ``use_native_unicode`` is false and the
       interpreter is not Python 2.
      :raises ImportError: if the DBAPI module exposes ``__version__`` and
       the resulting version tuple is lower than ``(2, 7)``.
      """
      PGDialect.__init__(self, **kwargs)

      self.use_native_unicode = use_native_unicode
      if not (use_native_unicode or util.py2k):
          raise exc.ArgumentError(
              "psycopg2 native_unicode mode is required under Python 3"
          )

      if not use_native_hstore:
          self._has_native_hstore = False
      self.use_native_hstore = use_native_hstore
      self.use_native_uuid = use_native_uuid
      self.supports_unicode_binds = use_native_unicode
      self.client_encoding = client_encoding

      # Map each accepted user-facing spelling onto its mode symbol;
      # anything else is rejected by parse_user_argument().
      accepted_modes = {
          EXECUTEMANY_PLAIN: [None],
          EXECUTEMANY_BATCH: ["batch"],
          EXECUTEMANY_VALUES: ["values_only"],
          EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"],
      }
      self.executemany_mode = util.symbol.parse_user_argument(
          executemany_mode, accepted_modes, "executemany_mode"
      )
      if self.executemany_mode & EXECUTEMANY_VALUES:
          self.insert_executemany_returning = True

      self.executemany_batch_page_size = executemany_batch_page_size
      self.executemany_values_page_size = executemany_values_page_size

      if self.dbapi and hasattr(self.dbapi, "__version__"):
          version_match = re.match(
              r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__
          )
          if version_match:
              # the third (micro) group is optional, so drop it when absent
              self.psycopg2_version = tuple(
                  int(part)
                  for part in version_match.group(1, 2, 3)
                  if part is not None
              )

          if self.psycopg2_version < (2, 7):
              raise ImportError(
                  "psycopg2 version 2.7 or higher is required."
              )
  def initialize(self, connection):
      """Finish dialect setup using a live connection.

      Runs the parent class initialization, then re-evaluates native
      hstore availability, and adjusts the executemany-related settings
      according to ``self.full_returning``.

      :param connection: connection object whose ``.connection`` attribute
       is handed to :meth:`_hstore_oids`.
      """
      super(PGDialect_psycopg2, self).initialize(connection)

      # only look up the hstore OIDs when native hstore was requested
      native_hstore = self.use_native_hstore
      if native_hstore:
          native_hstore = (
              self._hstore_oids(connection.connection) is not None
          )
      self._has_native_hstore = native_hstore

      # PGDialect.initialize() checks server version for <= 8.2 and sets
      # full_returning to False if so; fall back to plain executemany then
      if not self.full_returning:
          self.executemany_mode = EXECUTEMANY_PLAIN
          self.insert_executemany_returning = False

      batch_enabled = self.executemany_mode & EXECUTEMANY_BATCH
      self.supports_sane_multi_rowcount = not batch_enabled
  @classmethod
  def dbapi(cls):
      """Import the ``psycopg2`` package on demand and return the module."""
      import psycopg2 as driver_module

      return driver_module
  @classmethod
  def _psycopg2_extensions(cls):
      """Import and return ``psycopg2.extensions`` on demand."""
      from psycopg2 import extensions as extensions_module

      return extensions_module
  @classmethod
  def _psycopg2_extras(cls):
      """Import and return ``psycopg2.extras`` on demand."""
      from psycopg2 import extras as extras_module

      return extras_module
  @util.memoized_property
  def _isolation_lookup(self):
      """Map isolation level names to psycopg2 ``ISOLATION_LEVEL_*`` values.

      Keys are the space-separated level names; each value is the
      attribute of ``psycopg2.extensions`` named ``ISOLATION_LEVEL_`` plus
      the key with spaces replaced by underscores.
      """
      ext = self._psycopg2_extensions()
      level_names = (
          "AUTOCOMMIT",
          "READ COMMITTED",
          "READ UNCOMMITTED",
          "REPEATABLE READ",
          "SERIALIZABLE",
      )
      return {
          level_name: getattr(
              ext, "ISOLATION_LEVEL_" + level_name.replace(" ", "_")
          )
          for level_name in level_names
      }
  def set_isolation_level(self, connection, level):
      """Apply a named isolation level to a DBAPI connection.

      :param connection: object providing ``set_isolation_level()``.
      :param level: level name; underscores are treated as spaces before
       the name is looked up in ``self._isolation_lookup``.
      :raises ArgumentError: if the name is not a key of
       ``self._isolation_lookup``.
      """
      try:
          dbapi_level = self._isolation_lookup[level.replace("_", " ")]
      except KeyError as lookup_err:
          known_levels = ", ".join(self._isolation_lookup)
          util.raise_(
              exc.ArgumentError(
                  "Invalid value '%s' for isolation_level. "
                  "Valid isolation levels for %s are %s"
                  % (level, self.name, known_levels)
              ),
              replace_context=lookup_err,
          )

      connection.set_isolation_level(dbapi_level)
  def set_readonly(self, connection, value):
      """Assign ``value`` to the connection's ``readonly`` attribute."""
      connection.readonly = value
  def get_readonly(self, connection):
      """Return the connection's current ``readonly`` attribute."""
      return connection.readonly
  def set_deferrable(self, connection, value):
      """Assign ``value`` to the connection's ``deferrable`` attribute."""
      connection.deferrable = value
  def get_deferrable(self, connection):
      """Return the connection's current ``deferrable`` attribute."""
      return connection.deferrable
  def do_ping(self, dbapi_connection):
      """Check that a DBAPI connection is usable by running a trivial query.

      The query (``self._dialect_specific_select_one``) is executed with
      autocommit switched on.  The connection's autocommit setting from
      before the ping is preserved: it is only switched on when it was
      off, and only switched back off in that case (and only if the
      connection is still open afterwards).  A connection that was already
      in autocommit mode is therefore left untouched.

      :param dbapi_connection: raw psycopg2 connection to test.
      :return: ``True`` if the query ran; ``False`` if it failed with a
       ``self.dbapi.Error`` that :meth:`is_disconnect` classifies as a
       disconnect.
      :raises: any ``self.dbapi.Error`` that is not a disconnect is
       re-raised unchanged.
      """
      cursor = None
      try:
          # read inside the try so a DBAPI error here is classified too
          was_autocommit = dbapi_connection.autocommit
          if not was_autocommit:
              dbapi_connection.autocommit = True
          try:
              # cursor() is inside the inner try so that a failure to
              # create the cursor still restores the autocommit setting
              cursor = dbapi_connection.cursor()
              cursor.execute(self._dialect_specific_select_one)
          finally:
              if cursor is not None:
                  cursor.close()
              if not was_autocommit and not dbapi_connection.closed:
                  dbapi_connection.autocommit = False
      except self.dbapi.Error as err:
          if self.is_disconnect(err, dbapi_connection, cursor):
              return False
          raise
      return True
  def on_connect(self):
      """Build the setup hook that is run against new DBAPI connections.

      Each enabled option (client encoding, isolation level, native UUID,
      Python 2 unicode types, native hstore, custom JSON deserializer)
      contributes one step; the steps run in that order.

      :return: a callable accepting the DBAPI connection, or ``None`` when
       no step is enabled.
      """
      extras = self._psycopg2_extras()
      extensions = self._psycopg2_extensions()

      steps = []

      if self.client_encoding is not None:

          def apply_client_encoding(conn):
              conn.set_client_encoding(self.client_encoding)

          steps.append(apply_client_encoding)

      if self.isolation_level is not None:

          def apply_isolation_level(conn):
              self.set_isolation_level(conn, self.isolation_level)

          steps.append(apply_isolation_level)

      if self.dbapi and self.use_native_uuid:

          def register_uuid(conn):
              extras.register_uuid(None, conn)

          steps.append(register_uuid)

      if util.py2k and self.dbapi and self.use_native_unicode:

          def register_unicode_types(conn):
              extensions.register_type(extensions.UNICODE, conn)
              extensions.register_type(extensions.UNICODEARRAY, conn)

          steps.append(register_unicode_types)

      if self.dbapi and self.use_native_hstore:

          def register_hstore(conn):
              hstore_oids = self._hstore_oids(conn)
              if hstore_oids is None:
                  # no hstore OIDs were found for this connection
                  return
              oid, array_oid = hstore_oids
              hstore_kw = {"oid": oid}
              if util.py2k:
                  hstore_kw["unicode"] = True
              hstore_kw["array_oid"] = array_oid
              extras.register_hstore(conn, **hstore_kw)

          steps.append(register_hstore)

      if self.dbapi and self._json_deserializer:

          def register_json(conn):
              extras.register_default_json(
                  conn, loads=self._json_deserializer
              )
              extras.register_default_jsonb(
                  conn, loads=self._json_deserializer
              )

          steps.append(register_json)

      if not steps:
          return None

      def on_connect(conn):
          for step in steps:
              step(conn)

      return on_connect
  def do_executemany(self, cursor, statement, parameters, context=None):
      """Run a statement against many parameter sets.

      Depending on ``self.executemany_mode`` this dispatches to
      ``psycopg2.extras.execute_values()`` (INSERT statements whose single
      VALUES expression is still present in ``statement``),
      ``psycopg2.extras.execute_batch()``, or the cursor's own
      ``executemany()``.

      :param cursor: DBAPI cursor to execute on.
      :param statement: SQL string to execute.
      :param parameters: sequence of parameter sets.
      :param context: optional execution context; in values mode its
       ``_psycopg2_fetched_rows`` attribute receives the return value of
       ``execute_values()``.
      """
      values_template = None
      if (
          self.executemany_mode & EXECUTEMANY_VALUES
          and context
          and context.isinsert
          and context.compiled.insert_single_values_expr
      ):
          candidate = "(%s)" % context.compiled.insert_single_values_expr
          if not self.supports_unicode_statements:
              candidate = candidate.encode(self.encoding)

          # guard for statement that was altered via event hook or similar:
          # only use the template if it still appears in the statement
          if candidate in statement:
              values_template = candidate

      if values_template:
          page_size = self.executemany_values_page_size
          paging = {"page_size": page_size} if page_size else {}
          context._psycopg2_fetched_rows = (
              self._psycopg2_extras().execute_values(
                  cursor,
                  statement.replace(values_template, "%s"),
                  parameters,
                  template=values_template,
                  fetch=bool(context.compiled.returning),
                  **paging
              )
          )
      elif self.executemany_mode & EXECUTEMANY_BATCH:
          page_size = self.executemany_batch_page_size
          paging = {"page_size": page_size} if page_size else {}
          self._psycopg2_extras().execute_batch(
              cursor, statement, parameters, **paging
          )
      else:
          cursor.executemany(statement, parameters)
  @util.memoized_instancemethod
  def _hstore_oids(self, conn):
      """Return the leading two hstore OIDs for a connection, or ``None``.

      :param conn: a DBAPI connection, or a wrapper exposing the DBAPI
       connection as ``dbapi_connection``.
      :return: the first two items reported by
       ``HstoreAdapter.get_oids()``, or ``None`` when nothing is reported
       or the first item is falsy.
      """
      raw_conn = getattr(conn, "dbapi_connection", conn)
      oids = self._psycopg2_extras().HstoreAdapter.get_oids(raw_conn)
      if oids is None or not oids[0]:
          return None
      return oids[0:2]
  def create_connect_args(self, url):
      """Translate a URL object into ``(args, kwargs)`` for the DBAPI connect.

      :param url: object providing ``translate_connect_args()`` and a
       ``query`` mapping.
      :return: ``([], opts)`` when any connection argument or query
       parameter is present, otherwise ``([""], opts)`` with empty
       ``opts``.
      """
      opts = url.translate_connect_args(username="user")
      query = url.query

      if not opts and not query:
          # no connection arguments whatsoever; psycopg2.connect()
          # requires that "dsn" be present as a blank string.
          return ([""], opts)

      # send individual dbname, user, password, host, port parameters,
      # plus any other query-string arguments, to psycopg2.connect()
      if "port" in opts:
          opts["port"] = int(opts["port"])
      opts.update(query)

      # several "host" query values are collapsed into one comma-separated
      # string
      if "host" in query and isinstance(query["host"], (list, tuple)):
          opts["host"] = ",".join(query["host"])

      return ([], opts)
  def is_disconnect(self, e, connection, cursor):
      """Decide whether an exception indicates a dropped connection.

      :param e: the exception to classify; anything that is not a
       ``self.dbapi.Error`` is never a disconnect.
      :param connection: DBAPI connection (may lack a ``closed`` flag).
      :param cursor: accepted for interface compatibility; not used here.
      :return: ``True`` if the connection reports itself closed, or if the
       first line of the error text contains a known disconnect message
       that is not preceded by a double quote; ``False`` otherwise.
      """
      if not isinstance(e, self.dbapi.Error):
          return False

      # check the "closed" flag. this might not be present on old psycopg2
      # versions. Also, this flag doesn't actually help in a lot of
      # disconnect situations, so don't rely on it.
      if getattr(connection, "closed", False):
          return True

      # checks based on strings. in the case that .closed didn't cut it,
      # fall back onto these; only the first line of the message is used.
      first_line = str(e).partition("\n")[0]
      disconnect_markers = (
          # these error messages from libpq: interfaces/libpq/fe-misc.c
          # and interfaces/libpq/fe-secure.c.
          "terminating connection",
          "closed the connection",
          "connection not open",
          "could not receive data from server",
          "could not send data to server",
          # psycopg2 client errors, psycopg2/connection.h,
          # psycopg2/cursor.h
          "connection already closed",
          "cursor already closed",
          # not sure where this path is originally from, it may
          # be obsolete. It really says "losed", not "closed".
          "losed the connection unexpectedly",
          # these can occur in newer SSL
          "connection has been closed unexpectedly",
          "SSL error: decryption failed or bad record mac",
          "SSL SYSCALL error: Bad file descriptor",
          "SSL SYSCALL error: EOF detected",
          "SSL SYSCALL error: Operation timed out",
          "SSL SYSCALL error: Bad address",
      )
      # a marker only counts when no double quote appears before its first
      # occurrence in the line
      return any(
          marker in first_line
          and '"' not in first_line.partition(marker)[0]
          for marker in disconnect_markers
      )
  # Module-level alias bound to this file's dialect class.
  # NOTE(review): presumably the name a dialect loader looks up when
  # resolving this driver module -- confirm against the loader.
  dialect = PGDialect_psycopg2