123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- # firebird/kinterbasdb.py
- # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- """
- .. dialect:: firebird+kinterbasdb
- :name: kinterbasdb
- :dbapi: kinterbasdb
- :connectstring: firebird+kinterbasdb://user:password@host:port/path/to/db[?key=value&key=value...]
- :url: https://firebirdsql.org/index.php?op=devel&sub=python
- Arguments
- ----------
- The Kinterbasdb backend accepts the ``enable_rowcount`` and ``retaining``
- arguments accepted by the :mod:`sqlalchemy.dialects.firebird.fdb` dialect.
- In addition, it also accepts the following:
- * ``type_conv`` - select the kind of mapping done on the types: by default
- SQLAlchemy uses 200 with Unicode, datetime and decimal support. See
- the linked documents below for further information.
- * ``concurrency_level`` - set the backend policy with regards to threading
- issues: by default SQLAlchemy uses policy 1. See the linked documents
- below for further information.
- .. seealso::
- https://sourceforge.net/projects/kinterbasdb
- https://kinterbasdb.sourceforge.net/dist_docs/usage.html#adv_param_conv_dynamic_type_translation
- https://kinterbasdb.sourceforge.net/dist_docs/usage.html#special_issue_concurrency
- """ # noqa
- import decimal
- from re import match
- from .base import FBDialect
- from .base import FBExecutionContext
- from ... import types as sqltypes
- from ... import util
- class _kinterbasdb_numeric(object):
- def bind_processor(self, dialect):
- def process(value):
- if isinstance(value, decimal.Decimal):
- return str(value)
- else:
- return value
- return process
- class _FBNumeric_kinterbasdb(_kinterbasdb_numeric, sqltypes.Numeric):
- pass
- class _FBFloat_kinterbasdb(_kinterbasdb_numeric, sqltypes.Float):
- pass
- class FBExecutionContext_kinterbasdb(FBExecutionContext):
- @property
- def rowcount(self):
- if self.execution_options.get(
- "enable_rowcount", self.dialect.enable_rowcount
- ):
- return self.cursor.rowcount
- else:
- return -1
- class FBDialect_kinterbasdb(FBDialect):
- driver = "kinterbasdb"
- supports_statement_cache = True
- supports_sane_rowcount = False
- supports_sane_multi_rowcount = False
- execution_ctx_cls = FBExecutionContext_kinterbasdb
- supports_native_decimal = True
- colspecs = util.update_copy(
- FBDialect.colspecs,
- {
- sqltypes.Numeric: _FBNumeric_kinterbasdb,
- sqltypes.Float: _FBFloat_kinterbasdb,
- },
- )
- def __init__(
- self,
- type_conv=200,
- concurrency_level=1,
- enable_rowcount=True,
- retaining=False,
- **kwargs
- ):
- super(FBDialect_kinterbasdb, self).__init__(**kwargs)
- self.enable_rowcount = enable_rowcount
- self.type_conv = type_conv
- self.concurrency_level = concurrency_level
- self.retaining = retaining
- if enable_rowcount:
- self.supports_sane_rowcount = True
- @classmethod
- def dbapi(cls):
- return __import__("kinterbasdb")
- def do_execute(self, cursor, statement, parameters, context=None):
- # kinterbase does not accept a None, but wants an empty list
- # when there are no arguments.
- cursor.execute(statement, parameters or [])
- def do_rollback(self, dbapi_connection):
- dbapi_connection.rollback(self.retaining)
- def do_commit(self, dbapi_connection):
- dbapi_connection.commit(self.retaining)
- def create_connect_args(self, url):
- opts = url.translate_connect_args(username="user")
- if opts.get("port"):
- opts["host"] = "%s/%s" % (opts["host"], opts["port"])
- del opts["port"]
- opts.update(url.query)
- util.coerce_kw_type(opts, "type_conv", int)
- type_conv = opts.pop("type_conv", self.type_conv)
- concurrency_level = opts.pop(
- "concurrency_level", self.concurrency_level
- )
- if self.dbapi is not None:
- initialized = getattr(self.dbapi, "initialized", None)
- if initialized is None:
- # CVS rev 1.96 changed the name of the attribute:
- # https://kinterbasdb.cvs.sourceforge.net/viewvc/kinterbasdb/
- # Kinterbasdb-3.0/__init__.py?r1=1.95&r2=1.96
- initialized = getattr(self.dbapi, "_initialized", False)
- if not initialized:
- self.dbapi.init(
- type_conv=type_conv, concurrency_level=concurrency_level
- )
- return ([], opts)
- def _get_server_version_info(self, connection):
- """Get the version of the Firebird server used by a connection.
- Returns a tuple of (`major`, `minor`, `build`), three integers
- representing the version of the attached server.
- """
- # This is the simpler approach (the other uses the services api),
- # that for backward compatibility reasons returns a string like
- # LI-V6.3.3.12981 Firebird 2.0
- # where the first version is a fake one resembling the old
- # Interbase signature.
- fbconn = connection.connection
- version = fbconn.server_version
- return self._parse_version_info(version)
- def _parse_version_info(self, version):
- m = match(
- r"\w+-V(\d+)\.(\d+)\.(\d+)\.(\d+)( \w+ (\d+)\.(\d+))?", version
- )
- if not m:
- raise AssertionError(
- "Could not determine version from string '%s'" % version
- )
- if m.group(5) != None:
- return tuple([int(x) for x in m.group(6, 7, 4)] + ["firebird"])
- else:
- return tuple([int(x) for x in m.group(1, 2, 3)] + ["interbase"])
- def is_disconnect(self, e, connection, cursor):
- if isinstance(
- e, (self.dbapi.OperationalError, self.dbapi.ProgrammingError)
- ):
- msg = str(e)
- return (
- "Error writing data to the connection" in msg
- or "Unable to complete network request to host" in msg
- or "Invalid connection state" in msg
- or "Invalid cursor state" in msg
- or "connection shutdown" in msg
- )
- else:
- return False
- dialect = FBDialect_kinterbasdb
|