kinterbasdb.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. # firebird/kinterbasdb.py
  2. # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """
  8. .. dialect:: firebird+kinterbasdb
  9. :name: kinterbasdb
  10. :dbapi: kinterbasdb
  11. :connectstring: firebird+kinterbasdb://user:password@host:port/path/to/db[?key=value&key=value...]
  12. :url: https://firebirdsql.org/index.php?op=devel&sub=python
  13. Arguments
  14. ----------
  15. The Kinterbasdb backend accepts the ``enable_rowcount`` and ``retaining``
  16. arguments accepted by the :mod:`sqlalchemy.dialects.firebird.fdb` dialect.
  17. In addition, it also accepts the following:
  18. * ``type_conv`` - select the kind of mapping done on the types: by default
  19. SQLAlchemy uses 200 with Unicode, datetime and decimal support. See
  20. the linked documents below for further information.
  21. * ``concurrency_level`` - set the backend policy with regards to threading
  22. issues: by default SQLAlchemy uses policy 1. See the linked documents
  23. below for further information.
  24. .. seealso::
  25. https://sourceforge.net/projects/kinterbasdb
  26. https://kinterbasdb.sourceforge.net/dist_docs/usage.html#adv_param_conv_dynamic_type_translation
  27. https://kinterbasdb.sourceforge.net/dist_docs/usage.html#special_issue_concurrency
  28. """ # noqa
  29. import decimal
  30. from re import match
  31. from .base import FBDialect
  32. from .base import FBExecutionContext
  33. from ... import types as sqltypes
  34. from ... import util
  35. class _kinterbasdb_numeric(object):
  36. def bind_processor(self, dialect):
  37. def process(value):
  38. if isinstance(value, decimal.Decimal):
  39. return str(value)
  40. else:
  41. return value
  42. return process
  43. class _FBNumeric_kinterbasdb(_kinterbasdb_numeric, sqltypes.Numeric):
  44. pass
  45. class _FBFloat_kinterbasdb(_kinterbasdb_numeric, sqltypes.Float):
  46. pass
  47. class FBExecutionContext_kinterbasdb(FBExecutionContext):
  48. @property
  49. def rowcount(self):
  50. if self.execution_options.get(
  51. "enable_rowcount", self.dialect.enable_rowcount
  52. ):
  53. return self.cursor.rowcount
  54. else:
  55. return -1
  56. class FBDialect_kinterbasdb(FBDialect):
  57. driver = "kinterbasdb"
  58. supports_statement_cache = True
  59. supports_sane_rowcount = False
  60. supports_sane_multi_rowcount = False
  61. execution_ctx_cls = FBExecutionContext_kinterbasdb
  62. supports_native_decimal = True
  63. colspecs = util.update_copy(
  64. FBDialect.colspecs,
  65. {
  66. sqltypes.Numeric: _FBNumeric_kinterbasdb,
  67. sqltypes.Float: _FBFloat_kinterbasdb,
  68. },
  69. )
  70. def __init__(
  71. self,
  72. type_conv=200,
  73. concurrency_level=1,
  74. enable_rowcount=True,
  75. retaining=False,
  76. **kwargs
  77. ):
  78. super(FBDialect_kinterbasdb, self).__init__(**kwargs)
  79. self.enable_rowcount = enable_rowcount
  80. self.type_conv = type_conv
  81. self.concurrency_level = concurrency_level
  82. self.retaining = retaining
  83. if enable_rowcount:
  84. self.supports_sane_rowcount = True
  85. @classmethod
  86. def dbapi(cls):
  87. return __import__("kinterbasdb")
  88. def do_execute(self, cursor, statement, parameters, context=None):
  89. # kinterbase does not accept a None, but wants an empty list
  90. # when there are no arguments.
  91. cursor.execute(statement, parameters or [])
  92. def do_rollback(self, dbapi_connection):
  93. dbapi_connection.rollback(self.retaining)
  94. def do_commit(self, dbapi_connection):
  95. dbapi_connection.commit(self.retaining)
  96. def create_connect_args(self, url):
  97. opts = url.translate_connect_args(username="user")
  98. if opts.get("port"):
  99. opts["host"] = "%s/%s" % (opts["host"], opts["port"])
  100. del opts["port"]
  101. opts.update(url.query)
  102. util.coerce_kw_type(opts, "type_conv", int)
  103. type_conv = opts.pop("type_conv", self.type_conv)
  104. concurrency_level = opts.pop(
  105. "concurrency_level", self.concurrency_level
  106. )
  107. if self.dbapi is not None:
  108. initialized = getattr(self.dbapi, "initialized", None)
  109. if initialized is None:
  110. # CVS rev 1.96 changed the name of the attribute:
  111. # https://kinterbasdb.cvs.sourceforge.net/viewvc/kinterbasdb/
  112. # Kinterbasdb-3.0/__init__.py?r1=1.95&r2=1.96
  113. initialized = getattr(self.dbapi, "_initialized", False)
  114. if not initialized:
  115. self.dbapi.init(
  116. type_conv=type_conv, concurrency_level=concurrency_level
  117. )
  118. return ([], opts)
  119. def _get_server_version_info(self, connection):
  120. """Get the version of the Firebird server used by a connection.
  121. Returns a tuple of (`major`, `minor`, `build`), three integers
  122. representing the version of the attached server.
  123. """
  124. # This is the simpler approach (the other uses the services api),
  125. # that for backward compatibility reasons returns a string like
  126. # LI-V6.3.3.12981 Firebird 2.0
  127. # where the first version is a fake one resembling the old
  128. # Interbase signature.
  129. fbconn = connection.connection
  130. version = fbconn.server_version
  131. return self._parse_version_info(version)
  132. def _parse_version_info(self, version):
  133. m = match(
  134. r"\w+-V(\d+)\.(\d+)\.(\d+)\.(\d+)( \w+ (\d+)\.(\d+))?", version
  135. )
  136. if not m:
  137. raise AssertionError(
  138. "Could not determine version from string '%s'" % version
  139. )
  140. if m.group(5) != None:
  141. return tuple([int(x) for x in m.group(6, 7, 4)] + ["firebird"])
  142. else:
  143. return tuple([int(x) for x in m.group(1, 2, 3)] + ["interbase"])
  144. def is_disconnect(self, e, connection, cursor):
  145. if isinstance(
  146. e, (self.dbapi.OperationalError, self.dbapi.ProgrammingError)
  147. ):
  148. msg = str(e)
  149. return (
  150. "Error writing data to the connection" in msg
  151. or "Unable to complete network request to host" in msg
  152. or "Invalid connection state" in msg
  153. or "Invalid cursor state" in msg
  154. or "connection shutdown" in msg
  155. )
  156. else:
  157. return False
# Module-level alias for this module's dialect class.
# NOTE(review): presumably the name the dialect loader looks up - confirm.
dialect = FBDialect_kinterbasdb