123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- # postgresql/pygresql.py
- # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: postgresql+pygresql
    :name: pygresql
    :dbapi: pgdb
    :connectstring: postgresql+pygresql://user:password@host:port/dbname[?key=value&key=value...]
    :url: https://www.pygresql.org/

.. note::

    The pygresql dialect is **not tested as part of SQLAlchemy's continuous
    integration** and may have unresolved issues.  The recommended PostgreSQL
    dialect is psycopg2.

.. deprecated:: 1.4 The pygresql DBAPI is deprecated and will be removed
   in a future version. Please use one of the supported DBAPIs to
   connect to PostgreSQL.

"""  # noqa
- import decimal
- import re
- from .base import _DECIMAL_TYPES
- from .base import _FLOAT_TYPES
- from .base import _INT_TYPES
- from .base import PGCompiler
- from .base import PGDialect
- from .base import PGIdentifierPreparer
- from .base import UUID
- from .hstore import HSTORE
- from .json import JSON
- from .json import JSONB
- from ... import exc
- from ... import processors
- from ... import util
- from ...sql.elements import Null
- from ...types import JSON as Json
- from ...types import Numeric
class _PGNumeric(Numeric):
    """Numeric type whose result conversion is chosen from the column OID."""

    def bind_processor(self, dialect):
        """Return ``None``: no bind-side conversion is applied."""
        return None

    def result_processor(self, dialect, coltype):
        """Return the converter for result values, or ``None`` if not needed.

        :param dialect: the dialect in use (not consulted here).
        :param coltype: an integer type OID, or a non-int object whose
         ``.oid`` attribute supplies it.
        :raises InvalidRequestError: if the OID is in none of
         ``_FLOAT_TYPES``, ``_DECIMAL_TYPES`` or ``_INT_TYPES``.
        """
        type_oid = coltype if isinstance(coltype, int) else coltype.oid
        want_decimal = self.asdecimal

        if type_oid in _FLOAT_TYPES:
            if want_decimal:
                return processors.to_decimal_processor_factory(
                    decimal.Decimal, self._effective_decimal_return_scale
                )
            # PyGreSQL returns float natively for 701 (float8)
            return None

        if type_oid in _DECIMAL_TYPES or type_oid in _INT_TYPES:
            if want_decimal:
                # PyGreSQL returns Decimal natively for 1700 (numeric)
                return None
            return processors.to_float

        raise exc.InvalidRequestError(
            "Unknown PG numeric type: %d" % type_oid
        )
class _PGHStore(HSTORE):
    """HSTORE type that defers to the DBAPI when native hstore is flagged."""

    def bind_processor(self, dialect):
        """Return the bind converter.

        When ``dialect.has_native_hstore`` is true, ``dict`` values are
        wrapped in ``dialect.dbapi.Hstore`` and every other value is passed
        through unchanged; otherwise the parent class's processor is used.
        """
        if dialect.has_native_hstore:
            wrap = dialect.dbapi.Hstore

            def process(value):
                return wrap(value) if isinstance(value, dict) else value

            return process

        return super(_PGHStore, self).bind_processor(dialect)

    def result_processor(self, dialect, coltype):
        """Return ``None`` with native hstore, else the parent's processor."""
        if dialect.has_native_hstore:
            # no result-side conversion is installed in the native case
            return None
        return super(_PGHStore, self).result_processor(dialect, coltype)
class _PGJSON(JSON):
    """JSON type that defers to the DBAPI when native JSON is flagged."""

    def bind_processor(self, dialect):
        """Return the bind converter.

        Without ``dialect.has_native_json`` the parent class's processor is
        returned.  Otherwise the returned callable maps values as follows:

        * ``self.NULL`` -> ``dialect.dbapi.Json(None)``
        * a ``Null`` element -> ``None``
        * ``None`` -> ``None`` if ``self.none_as_null`` else ``Json(None)``
        * ``dict`` / ``list`` -> ``Json(value)``
        * anything else -> unchanged
        """
        if not dialect.has_native_json:
            return super(_PGJSON, self).bind_processor(dialect)

        wrap = dialect.dbapi.Json

        def process(value):
            if value is self.NULL:
                return wrap(None)
            if isinstance(value, Null):
                return None
            if value is None:
                return None if self.none_as_null else wrap(None)
            if isinstance(value, (dict, list)):
                return wrap(value)
            return value

        return process

    def result_processor(self, dialect, coltype):
        """Return ``None`` with native JSON, else the parent's processor."""
        if dialect.has_native_json:
            # no result-side conversion is installed in the native case
            return None
        return super(_PGJSON, self).result_processor(dialect, coltype)
class _PGJSONB(JSONB):
    """JSONB type that defers to the DBAPI when native JSON is flagged."""

    def bind_processor(self, dialect):
        """Return the bind converter.

        Without ``dialect.has_native_json`` the parent class's processor is
        returned.  Otherwise the returned callable maps values as follows:

        * ``self.NULL`` -> ``dialect.dbapi.Json(None)``
        * a ``Null`` element -> ``None``
        * ``None`` -> ``None`` if ``self.none_as_null`` else ``Json(None)``
        * ``dict`` / ``list`` -> ``Json(value)``
        * anything else -> unchanged
        """
        if not dialect.has_native_json:
            return super(_PGJSONB, self).bind_processor(dialect)

        wrap = dialect.dbapi.Json

        def process(value):
            if value is self.NULL:
                return wrap(None)
            if isinstance(value, Null):
                return None
            if value is None:
                return None if self.none_as_null else wrap(None)
            if isinstance(value, (dict, list)):
                return wrap(value)
            return value

        return process

    def result_processor(self, dialect, coltype):
        """Return ``None`` with native JSON, else the parent's processor."""
        if dialect.has_native_json:
            # no result-side conversion is installed in the native case
            return None
        return super(_PGJSONB, self).result_processor(dialect, coltype)
class _PGUUID(UUID):
    """UUID type that defers to the DBAPI when native UUID is flagged."""

    def bind_processor(self, dialect):
        """Return the bind converter.

        Without ``dialect.has_native_uuid`` the parent class's processor is
        returned.  Otherwise ``str``/``bytes`` and ``int`` values are turned
        into ``dialect.dbapi.Uuid`` objects and everything else (including
        ``None``) is passed through unchanged.
        """
        if not dialect.has_native_uuid:
            return super(_PGUUID, self).bind_processor(dialect)

        make_uuid = dialect.dbapi.Uuid

        def process(value):
            if isinstance(value, (str, bytes)):
                # a length of exactly 16 is handed over via ``bytes=``;
                # any other length goes in as the positional argument
                if len(value) == 16:
                    return make_uuid(bytes=value)
                return make_uuid(value)
            if isinstance(value, int):
                return make_uuid(int=value)
            # None and already-converted objects fall through untouched
            return value

        return process

    def result_processor(self, dialect, coltype):
        """Return the result converter.

        Without native UUID support the parent's processor is used.  With
        it, values are left alone when ``self.as_uuid`` is set; otherwise
        non-``None`` values are converted with ``str()``.
        """
        if not dialect.has_native_uuid:
            return super(_PGUUID, self).result_processor(dialect, coltype)
        if self.as_uuid:
            return None

        def process(value):
            return None if value is None else str(value)

        return process
class _PGCompiler(PGCompiler):
    """Statement compiler that emits every literal ``%`` as ``%%``."""

    # NOTE(review): the doubling is presumably needed because the DBAPI
    # treats a single ``%`` as a parameter marker -- confirm against pgdb.

    def visit_mod_binary(self, binary, operator, **kw):
        """Render a modulo expression with a doubled percent sign."""
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return " %% ".join((lhs, rhs))

    def post_process_text(self, text):
        """Return ``text`` with each ``%`` replaced by ``%%``."""
        escaped = text.replace("%", "%%")
        return escaped
class _PGIdentifierPreparer(PGIdentifierPreparer):
    """Identifier preparer that additionally doubles percent signs."""

    def _escape_identifier(self, value):
        """Escape quote characters in ``value``, then double every ``%``."""
        quote_escaped = value.replace(self.escape_quote, self.escape_to_quote)
        return quote_escaped.replace("%", "%%")
class PGDialect_pygresql(PGDialect):
    """PostgreSQL dialect that uses the ``pgdb`` module as its DBAPI.

    Instances carry ``dbapi_version`` (a ``(major, minor)`` tuple, or
    ``(0, 0)`` when it cannot be determined) and the three flags
    ``has_native_hstore``, ``has_native_json`` and ``has_native_uuid``
    that the type classes in this module consult.
    """

    driver = "pygresql"
    supports_statement_cache = True

    statement_compiler = _PGCompiler
    preparer = _PGIdentifierPreparer

    @classmethod
    def dbapi(cls):
        """Import and return ``pgdb`` after emitting a deprecation warning."""
        import pgdb

        util.warn_deprecated(
            "The pygresql DBAPI is deprecated and will be removed "
            "in a future version. Please use one of the supported DBAPIs to "
            "connect to PostgreSQL.",
            version="1.4",
        )

        return pgdb

    # type overrides layered on top of the base PostgreSQL dialect's mapping
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            Numeric: _PGNumeric,
            HSTORE: _PGHStore,
            Json: _PGJSON,
            JSON: _PGJSON,
            JSONB: _PGJSONB,
            UUID: _PGUUID,
        },
    )

    def __init__(self, **kwargs):
        """Initialise the dialect and derive feature flags from the DBAPI.

        The leading ``major.minor`` of ``self.dbapi.version`` is parsed; if
        the attribute is missing or does not match, ``(0, 0)`` is recorded.
        Versions below 5.0 switch off the native hstore/JSON/UUID paths and,
        unless the version is the ``(0, 0)`` fallback, emit a warning.
        """
        super(PGDialect_pygresql, self).__init__(**kwargs)

        try:
            raw_version = self.dbapi.version
            match = re.match(r"(\d+)\.(\d+)", raw_version)
            # a failed match leaves ``match`` as None; the resulting
            # AttributeError is handled below like a missing attribute
            parsed = (int(match.group(1)), int(match.group(2)))
        except (AttributeError, ValueError, TypeError):
            parsed = (0, 0)
        self.dbapi_version = parsed

        native = parsed >= (5, 0)
        if native:
            self.supports_unicode_statements = True
            self.supports_unicode_binds = True
        elif parsed != (0, 0):
            util.warn(
                "PyGreSQL is only fully supported by SQLAlchemy"
                " since version 5.0."
            )

        self.has_native_hstore = native
        self.has_native_json = native
        self.has_native_uuid = native

    def create_connect_args(self, url):
        """Translate ``url`` into ``([], opts)`` for the DBAPI ``connect()``.

        ``username`` is renamed to ``user``.  When a port is present it is
        removed from the options and folded into ``host`` as ``host:port``,
        after dropping anything following the last ``:`` already in the
        host.  Entries from ``url.query`` are merged in last.
        """
        opts = url.translate_connect_args(username="user")

        if "port" in opts:
            bare_host = opts.get("host", "").rsplit(":", 1)[0]
            port = opts.pop("port")
            opts["host"] = "%s:%s" % (bare_host, port)

        opts.update(url.query)
        return [], opts

    def is_disconnect(self, e, connection, cursor):
        """Report whether ``e`` coincides with a closed connection.

        Returns ``False`` unless ``e`` is a ``dbapi.Error`` and a connection
        is supplied.  If the connection exposes a ``.connection`` attribute
        that object is inspected instead.  The result is its ``closed``
        attribute, or ``_cnx is None`` when ``closed`` is not available.
        """
        if not isinstance(e, self.dbapi.Error) or not connection:
            return False

        try:
            connection = connection.connection
        except AttributeError:
            # no wrapped connection to unwrap; inspect the object as given
            pass
        else:
            if not connection:
                return False

        try:
            return connection.closed
        except AttributeError:  # PyGreSQL < 5.0
            return connection._cnx is None


# module-level name bound to the dialect class defined above
dialect = PGDialect_pygresql
|