pg8000.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. # postgresql/pg8000.py
  2. # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors <see AUTHORS
  3. # file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
r"""
.. dialect:: postgresql+pg8000
    :name: pg8000
    :dbapi: pg8000
    :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...]
    :url: https://pypi.org/project/pg8000/

.. versionchanged:: 1.4  The pg8000 dialect has been updated for version
   1.16.6 and higher, and is again part of SQLAlchemy's continuous integration
   with full feature support.

.. _pg8000_unicode:

Unicode
-------

pg8000 will encode / decode string values between it and the server using the
PostgreSQL ``client_encoding`` parameter; by default this is the value in
the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
Typically, this can be changed to ``utf-8``, as a more useful default::

    #client_encoding = sql_ascii # actually, defaults to database
                                 # encoding
    client_encoding = utf8

The ``client_encoding`` can be overridden for a session by executing the SQL::

    SET CLIENT_ENCODING TO 'utf8';

SQLAlchemy will execute this SQL on all new connections based on the value
passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter::

    engine = create_engine(
        "postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8')

.. _pg8000_ssl:

SSL Connections
---------------

pg8000 accepts a Python ``SSLContext`` object which may be specified using the
:paramref:`_sa.create_engine.connect_args` dictionary::

    import ssl
    ssl_context = ssl.create_default_context()
    engine = sa.create_engine(
        "postgresql+pg8000://scott:tiger@192.168.0.199/test",
        connect_args={"ssl_context": ssl_context},
    )

If the server uses an automatically-generated certificate that is self-signed
or does not match the host name (as seen from the client), it may also be
necessary to disable hostname checking::

    import ssl
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    engine = sa.create_engine(
        "postgresql+pg8000://scott:tiger@192.168.0.199/test",
        connect_args={"ssl_context": ssl_context},
    )

.. _pg8000_isolation_level:

pg8000 Transaction Isolation Level
-------------------------------------

The pg8000 dialect offers the same isolation level settings as that
of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect:

* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``AUTOCOMMIT``

.. seealso::

    :ref:`postgresql_isolation_level`

    :ref:`psycopg2_isolation_level`

"""  # noqa
  68. import decimal
  69. import re
  70. from uuid import UUID as _python_UUID
  71. from .array import ARRAY as PGARRAY
  72. from .base import _ColonCast
  73. from .base import _DECIMAL_TYPES
  74. from .base import _FLOAT_TYPES
  75. from .base import _INT_TYPES
  76. from .base import ENUM
  77. from .base import INTERVAL
  78. from .base import PGCompiler
  79. from .base import PGDialect
  80. from .base import PGExecutionContext
  81. from .base import PGIdentifierPreparer
  82. from .base import UUID
  83. from .json import JSON
  84. from .json import JSONB
  85. from .json import JSONPathType
  86. from ... import exc
  87. from ... import processors
  88. from ... import types as sqltypes
  89. from ... import util
  90. from ...sql.elements import quoted_name
  91. class _PGNumeric(sqltypes.Numeric):
  92. def result_processor(self, dialect, coltype):
  93. if self.asdecimal:
  94. if coltype in _FLOAT_TYPES:
  95. return processors.to_decimal_processor_factory(
  96. decimal.Decimal, self._effective_decimal_return_scale
  97. )
  98. elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
  99. # pg8000 returns Decimal natively for 1700
  100. return None
  101. else:
  102. raise exc.InvalidRequestError(
  103. "Unknown PG numeric type: %d" % coltype
  104. )
  105. else:
  106. if coltype in _FLOAT_TYPES:
  107. # pg8000 returns float natively for 701
  108. return None
  109. elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
  110. return processors.to_float
  111. else:
  112. raise exc.InvalidRequestError(
  113. "Unknown PG numeric type: %d" % coltype
  114. )
  115. class _PGNumericNoBind(_PGNumeric):
  116. def bind_processor(self, dialect):
  117. return None
  118. class _PGJSON(JSON):
  119. def result_processor(self, dialect, coltype):
  120. return None
  121. def get_dbapi_type(self, dbapi):
  122. return dbapi.JSON
  123. class _PGJSONB(JSONB):
  124. def result_processor(self, dialect, coltype):
  125. return None
  126. def get_dbapi_type(self, dbapi):
  127. return dbapi.JSONB
  128. class _PGJSONIndexType(sqltypes.JSON.JSONIndexType):
  129. def get_dbapi_type(self, dbapi):
  130. raise NotImplementedError("should not be here")
  131. class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
  132. def get_dbapi_type(self, dbapi):
  133. return dbapi.INTEGER
  134. class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
  135. def get_dbapi_type(self, dbapi):
  136. return dbapi.STRING
  137. class _PGJSONPathType(JSONPathType):
  138. def get_dbapi_type(self, dbapi):
  139. return 1009
  140. class _PGUUID(UUID):
  141. def bind_processor(self, dialect):
  142. if not self.as_uuid:
  143. def process(value):
  144. if value is not None:
  145. value = _python_UUID(value)
  146. return value
  147. return process
  148. def result_processor(self, dialect, coltype):
  149. if not self.as_uuid:
  150. def process(value):
  151. if value is not None:
  152. value = str(value)
  153. return value
  154. return process
  155. class _PGEnum(ENUM):
  156. def get_dbapi_type(self, dbapi):
  157. return dbapi.UNKNOWN
  158. class _PGInterval(INTERVAL):
  159. def get_dbapi_type(self, dbapi):
  160. return dbapi.INTERVAL
  161. @classmethod
  162. def adapt_emulated_to_native(cls, interval, **kw):
  163. return _PGInterval(precision=interval.second_precision)
  164. class _PGTimeStamp(sqltypes.DateTime):
  165. def get_dbapi_type(self, dbapi):
  166. if self.timezone:
  167. # TIMESTAMPTZOID
  168. return 1184
  169. else:
  170. # TIMESTAMPOID
  171. return 1114
  172. class _PGTime(sqltypes.Time):
  173. def get_dbapi_type(self, dbapi):
  174. return dbapi.TIME
  175. class _PGInteger(sqltypes.Integer):
  176. def get_dbapi_type(self, dbapi):
  177. return dbapi.INTEGER
  178. class _PGSmallInteger(sqltypes.SmallInteger):
  179. def get_dbapi_type(self, dbapi):
  180. return dbapi.INTEGER
  181. class _PGNullType(sqltypes.NullType):
  182. def get_dbapi_type(self, dbapi):
  183. return dbapi.NULLTYPE
  184. class _PGBigInteger(sqltypes.BigInteger):
  185. def get_dbapi_type(self, dbapi):
  186. return dbapi.BIGINTEGER
  187. class _PGBoolean(sqltypes.Boolean):
  188. def get_dbapi_type(self, dbapi):
  189. return dbapi.BOOLEAN
  190. class _PGARRAY(PGARRAY):
  191. def bind_expression(self, bindvalue):
  192. return _ColonCast(bindvalue, self)
  193. _server_side_id = util.counter()
  194. class PGExecutionContext_pg8000(PGExecutionContext):
  195. def create_server_side_cursor(self):
  196. ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
  197. return ServerSideCursor(self._dbapi_connection.cursor(), ident)
  198. def pre_exec(self):
  199. if not self.compiled:
  200. return
  201. class ServerSideCursor:
  202. server_side = True
  203. def __init__(self, cursor, ident):
  204. self.ident = ident
  205. self.cursor = cursor
  206. @property
  207. def connection(self):
  208. return self.cursor.connection
  209. @property
  210. def rowcount(self):
  211. return self.cursor.rowcount
  212. @property
  213. def description(self):
  214. return self.cursor.description
  215. def execute(self, operation, args=(), stream=None):
  216. op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation
  217. self.cursor.execute(op, args, stream=stream)
  218. return self
  219. def executemany(self, operation, param_sets):
  220. self.cursor.executemany(operation, param_sets)
  221. return self
  222. def fetchone(self):
  223. self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident)
  224. return self.cursor.fetchone()
  225. def fetchmany(self, num=None):
  226. if num is None:
  227. return self.fetchall()
  228. else:
  229. self.cursor.execute(
  230. "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident
  231. )
  232. return self.cursor.fetchall()
  233. def fetchall(self):
  234. self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident)
  235. return self.cursor.fetchall()
  236. def close(self):
  237. self.cursor.execute("CLOSE " + self.ident)
  238. self.cursor.close()
  239. def setinputsizes(self, *sizes):
  240. self.cursor.setinputsizes(*sizes)
  241. def setoutputsize(self, size, column=None):
  242. pass
  243. class PGCompiler_pg8000(PGCompiler):
  244. def visit_mod_binary(self, binary, operator, **kw):
  245. return (
  246. self.process(binary.left, **kw)
  247. + " %% "
  248. + self.process(binary.right, **kw)
  249. )
  250. class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
  251. def __init__(self, *args, **kwargs):
  252. PGIdentifierPreparer.__init__(self, *args, **kwargs)
  253. self._double_percents = False
  254. class PGDialect_pg8000(PGDialect):
  255. driver = "pg8000"
  256. supports_statement_cache = True
  257. supports_unicode_statements = True
  258. supports_unicode_binds = True
  259. default_paramstyle = "format"
  260. supports_sane_multi_rowcount = True
  261. execution_ctx_cls = PGExecutionContext_pg8000
  262. statement_compiler = PGCompiler_pg8000
  263. preparer = PGIdentifierPreparer_pg8000
  264. supports_server_side_cursors = True
  265. use_setinputsizes = True
  266. # reversed as of pg8000 1.16.6. 1.16.5 and lower
  267. # are no longer compatible
  268. description_encoding = None
  269. # description_encoding = "use_encoding"
  270. colspecs = util.update_copy(
  271. PGDialect.colspecs,
  272. {
  273. sqltypes.Numeric: _PGNumericNoBind,
  274. sqltypes.Float: _PGNumeric,
  275. sqltypes.JSON: _PGJSON,
  276. sqltypes.Boolean: _PGBoolean,
  277. sqltypes.NullType: _PGNullType,
  278. JSONB: _PGJSONB,
  279. sqltypes.JSON.JSONPathType: _PGJSONPathType,
  280. sqltypes.JSON.JSONIndexType: _PGJSONIndexType,
  281. sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
  282. sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
  283. UUID: _PGUUID,
  284. sqltypes.Interval: _PGInterval,
  285. INTERVAL: _PGInterval,
  286. sqltypes.DateTime: _PGTimeStamp,
  287. sqltypes.Time: _PGTime,
  288. sqltypes.Integer: _PGInteger,
  289. sqltypes.SmallInteger: _PGSmallInteger,
  290. sqltypes.BigInteger: _PGBigInteger,
  291. sqltypes.Enum: _PGEnum,
  292. sqltypes.ARRAY: _PGARRAY,
  293. },
  294. )
  295. def __init__(self, client_encoding=None, **kwargs):
  296. PGDialect.__init__(self, **kwargs)
  297. self.client_encoding = client_encoding
  298. if self._dbapi_version < (1, 16, 6):
  299. raise NotImplementedError("pg8000 1.16.6 or greater is required")
  300. @util.memoized_property
  301. def _dbapi_version(self):
  302. if self.dbapi and hasattr(self.dbapi, "__version__"):
  303. return tuple(
  304. [
  305. int(x)
  306. for x in re.findall(
  307. r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
  308. )
  309. ]
  310. )
  311. else:
  312. return (99, 99, 99)
  313. @classmethod
  314. def dbapi(cls):
  315. return __import__("pg8000")
  316. def create_connect_args(self, url):
  317. opts = url.translate_connect_args(username="user")
  318. if "port" in opts:
  319. opts["port"] = int(opts["port"])
  320. opts.update(url.query)
  321. return ([], opts)
  322. def is_disconnect(self, e, connection, cursor):
  323. if isinstance(e, self.dbapi.InterfaceError) and "network error" in str(
  324. e
  325. ):
  326. # new as of pg8000 1.19.0 for broken connections
  327. return True
  328. # connection was closed normally
  329. return "connection is closed" in str(e)
  330. def set_isolation_level(self, connection, level):
  331. level = level.replace("_", " ")
  332. # adjust for ConnectionFairy possibly being present
  333. if hasattr(connection, "dbapi_connection"):
  334. connection = connection.dbapi_connection
  335. if level == "AUTOCOMMIT":
  336. connection.autocommit = True
  337. elif level in self._isolation_lookup:
  338. connection.autocommit = False
  339. cursor = connection.cursor()
  340. cursor.execute(
  341. "SET SESSION CHARACTERISTICS AS TRANSACTION "
  342. "ISOLATION LEVEL %s" % level
  343. )
  344. cursor.execute("COMMIT")
  345. cursor.close()
  346. else:
  347. raise exc.ArgumentError(
  348. "Invalid value '%s' for isolation_level. "
  349. "Valid isolation levels for %s are %s or AUTOCOMMIT"
  350. % (level, self.name, ", ".join(self._isolation_lookup))
  351. )
  352. def set_readonly(self, connection, value):
  353. cursor = connection.cursor()
  354. try:
  355. cursor.execute(
  356. "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
  357. % ("READ ONLY" if value else "READ WRITE")
  358. )
  359. cursor.execute("COMMIT")
  360. finally:
  361. cursor.close()
  362. def get_readonly(self, connection):
  363. cursor = connection.cursor()
  364. try:
  365. cursor.execute("show transaction_read_only")
  366. val = cursor.fetchone()[0]
  367. finally:
  368. cursor.close()
  369. return val == "on"
  370. def set_deferrable(self, connection, value):
  371. cursor = connection.cursor()
  372. try:
  373. cursor.execute(
  374. "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
  375. % ("DEFERRABLE" if value else "NOT DEFERRABLE")
  376. )
  377. cursor.execute("COMMIT")
  378. finally:
  379. cursor.close()
  380. def get_deferrable(self, connection):
  381. cursor = connection.cursor()
  382. try:
  383. cursor.execute("show transaction_deferrable")
  384. val = cursor.fetchone()[0]
  385. finally:
  386. cursor.close()
  387. return val == "on"
  388. def set_client_encoding(self, connection, client_encoding):
  389. # adjust for ConnectionFairy possibly being present
  390. if hasattr(connection, "dbapi_connection"):
  391. connection = connection.dbapi_connection
  392. cursor = connection.cursor()
  393. cursor.execute("SET CLIENT_ENCODING TO '" + client_encoding + "'")
  394. cursor.execute("COMMIT")
  395. cursor.close()
  396. def do_set_input_sizes(self, cursor, list_of_tuples, context):
  397. if self.positional:
  398. cursor.setinputsizes(
  399. *[dbtype for key, dbtype, sqltype in list_of_tuples]
  400. )
  401. else:
  402. cursor.setinputsizes(
  403. **{
  404. key: dbtype
  405. for key, dbtype, sqltype in list_of_tuples
  406. if dbtype
  407. }
  408. )
  409. def do_begin_twophase(self, connection, xid):
  410. connection.connection.tpc_begin((0, xid, ""))
  411. def do_prepare_twophase(self, connection, xid):
  412. connection.connection.tpc_prepare()
  413. def do_rollback_twophase(
  414. self, connection, xid, is_prepared=True, recover=False
  415. ):
  416. connection.connection.tpc_rollback((0, xid, ""))
  417. def do_commit_twophase(
  418. self, connection, xid, is_prepared=True, recover=False
  419. ):
  420. connection.connection.tpc_commit((0, xid, ""))
  421. def do_recover_twophase(self, connection):
  422. return [row[1] for row in connection.connection.tpc_recover()]
  423. def on_connect(self):
  424. fns = []
  425. def on_connect(conn):
  426. conn.py_types[quoted_name] = conn.py_types[util.text_type]
  427. fns.append(on_connect)
  428. if self.client_encoding is not None:
  429. def on_connect(conn):
  430. self.set_client_encoding(conn, self.client_encoding)
  431. fns.append(on_connect)
  432. if self.isolation_level is not None:
  433. def on_connect(conn):
  434. self.set_isolation_level(conn, self.isolation_level)
  435. fns.append(on_connect)
  436. if self._json_deserializer:
  437. def on_connect(conn):
  438. # json
  439. conn.register_in_adapter(114, self._json_deserializer)
  440. # jsonb
  441. conn.register_in_adapter(3802, self._json_deserializer)
  442. fns.append(on_connect)
  443. if len(fns) > 0:
  444. def on_connect(conn):
  445. for fn in fns:
  446. fn(conn)
  447. return on_connect
  448. else:
  449. return None
  450. dialect = PGDialect_pg8000