aiosqlite.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. # sqlite/aiosqlite.py
  2. # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. r"""
  8. .. dialect:: sqlite+aiosqlite
  9. :name: aiosqlite
  10. :dbapi: aiosqlite
  11. :connectstring: sqlite+aiosqlite:///file_path
  12. :url: https://pypi.org/project/aiosqlite/
  13. The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
  14. running on top of pysqlite.
  15. aiosqlite is a wrapper around pysqlite that uses a background thread for
  16. each connection. It does not actually use non-blocking IO, as SQLite
  17. databases are not socket-based. However it does provide a working asyncio
  18. interface that's useful for testing and prototyping purposes.
  19. Using a special asyncio mediation layer, the aiosqlite dialect is usable
  20. as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
  21. extension package.
  22. This dialect should normally be used only with the
  23. :func:`_asyncio.create_async_engine` engine creation function::
  24. from sqlalchemy.ext.asyncio import create_async_engine
  25. engine = create_async_engine("sqlite+aiosqlite:///filename")
  26. The URL passes through all arguments to the ``pysqlite`` driver, so all
  27. connection arguments are the same as they are for that of :ref:`pysqlite`.
  28. """ # noqa
  29. from .base import SQLiteExecutionContext
  30. from .pysqlite import SQLiteDialect_pysqlite
  31. from ... import pool
  32. from ... import util
  33. from ...engine import AdaptedConnection
  34. from ...util.concurrency import await_fallback
  35. from ...util.concurrency import await_only
  36. class AsyncAdapt_aiosqlite_cursor:
  37. __slots__ = (
  38. "_adapt_connection",
  39. "_connection",
  40. "description",
  41. "await_",
  42. "_rows",
  43. "arraysize",
  44. "rowcount",
  45. "lastrowid",
  46. )
  47. server_side = False
  48. def __init__(self, adapt_connection):
  49. self._adapt_connection = adapt_connection
  50. self._connection = adapt_connection._connection
  51. self.await_ = adapt_connection.await_
  52. self.arraysize = 1
  53. self.rowcount = -1
  54. self.description = None
  55. self._rows = []
  56. def close(self):
  57. self._rows[:] = []
  58. def execute(self, operation, parameters=None):
  59. try:
  60. _cursor = self.await_(self._connection.cursor())
  61. if parameters is None:
  62. self.await_(_cursor.execute(operation))
  63. else:
  64. self.await_(_cursor.execute(operation, parameters))
  65. if _cursor.description:
  66. self.description = _cursor.description
  67. self.lastrowid = self.rowcount = -1
  68. if not self.server_side:
  69. self._rows = self.await_(_cursor.fetchall())
  70. else:
  71. self.description = None
  72. self.lastrowid = _cursor.lastrowid
  73. self.rowcount = _cursor.rowcount
  74. if not self.server_side:
  75. self.await_(_cursor.close())
  76. else:
  77. self._cursor = _cursor
  78. except Exception as error:
  79. self._adapt_connection._handle_exception(error)
  80. def executemany(self, operation, seq_of_parameters):
  81. try:
  82. _cursor = self.await_(self._connection.cursor())
  83. self.await_(_cursor.executemany(operation, seq_of_parameters))
  84. self.description = None
  85. self.lastrowid = _cursor.lastrowid
  86. self.rowcount = _cursor.rowcount
  87. self.await_(_cursor.close())
  88. except Exception as error:
  89. self._adapt_connection._handle_exception(error)
  90. def setinputsizes(self, *inputsizes):
  91. pass
  92. def __iter__(self):
  93. while self._rows:
  94. yield self._rows.pop(0)
  95. def fetchone(self):
  96. if self._rows:
  97. return self._rows.pop(0)
  98. else:
  99. return None
  100. def fetchmany(self, size=None):
  101. if size is None:
  102. size = self.arraysize
  103. retval = self._rows[0:size]
  104. self._rows[:] = self._rows[size:]
  105. return retval
  106. def fetchall(self):
  107. retval = self._rows[:]
  108. self._rows[:] = []
  109. return retval
  110. class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
  111. __slots__ = "_cursor"
  112. server_side = True
  113. def __init__(self, *arg, **kw):
  114. super().__init__(*arg, **kw)
  115. self._cursor = None
  116. def close(self):
  117. if self._cursor is not None:
  118. self.await_(self._cursor.close())
  119. self._cursor = None
  120. def fetchone(self):
  121. return self.await_(self._cursor.fetchone())
  122. def fetchmany(self, size=None):
  123. if size is None:
  124. size = self.arraysize
  125. return self.await_(self._cursor.fetchmany(size=size))
  126. def fetchall(self):
  127. return self.await_(self._cursor.fetchall())
  128. class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
  129. await_ = staticmethod(await_only)
  130. __slots__ = ("dbapi", "_connection")
  131. def __init__(self, dbapi, connection):
  132. self.dbapi = dbapi
  133. self._connection = connection
  134. @property
  135. def isolation_level(self):
  136. return self._connection.isolation_level
  137. @isolation_level.setter
  138. def isolation_level(self, value):
  139. try:
  140. self._connection.isolation_level = value
  141. except Exception as error:
  142. self._handle_exception(error)
  143. def create_function(self, *args, **kw):
  144. try:
  145. self.await_(self._connection.create_function(*args, **kw))
  146. except Exception as error:
  147. self._handle_exception(error)
  148. def cursor(self, server_side=False):
  149. if server_side:
  150. return AsyncAdapt_aiosqlite_ss_cursor(self)
  151. else:
  152. return AsyncAdapt_aiosqlite_cursor(self)
  153. def execute(self, *args, **kw):
  154. return self.await_(self._connection.execute(*args, **kw))
  155. def rollback(self):
  156. try:
  157. self.await_(self._connection.rollback())
  158. except Exception as error:
  159. self._handle_exception(error)
  160. def commit(self):
  161. try:
  162. self.await_(self._connection.commit())
  163. except Exception as error:
  164. self._handle_exception(error)
  165. def close(self):
  166. # print(">close", self)
  167. try:
  168. self.await_(self._connection.close())
  169. except Exception as error:
  170. self._handle_exception(error)
  171. def _handle_exception(self, error):
  172. if (
  173. isinstance(error, ValueError)
  174. and error.args[0] == "no active connection"
  175. ):
  176. util.raise_(
  177. self.dbapi.sqlite.OperationalError("no active connection"),
  178. from_=error,
  179. )
  180. else:
  181. raise error
  182. class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
  183. __slots__ = ()
  184. await_ = staticmethod(await_fallback)
  185. class AsyncAdapt_aiosqlite_dbapi:
  186. def __init__(self, aiosqlite, sqlite):
  187. self.aiosqlite = aiosqlite
  188. self.sqlite = sqlite
  189. self.paramstyle = "qmark"
  190. self._init_dbapi_attributes()
  191. def _init_dbapi_attributes(self):
  192. for name in (
  193. "DatabaseError",
  194. "Error",
  195. "IntegrityError",
  196. "NotSupportedError",
  197. "OperationalError",
  198. "ProgrammingError",
  199. "sqlite_version",
  200. "sqlite_version_info",
  201. ):
  202. setattr(self, name, getattr(self.aiosqlite, name))
  203. for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"):
  204. setattr(self, name, getattr(self.sqlite, name))
  205. for name in ("Binary",):
  206. setattr(self, name, getattr(self.sqlite, name))
  207. def connect(self, *arg, **kw):
  208. async_fallback = kw.pop("async_fallback", False)
  209. # Q. WHY do we need this?
  210. # A. Because there is no way to set connection.isolation_level
  211. # otherwise
  212. # Q. BUT HOW do you know it is SAFE ?????
  213. # A. The only operation that isn't safe is the isolation level set
  214. # operation which aiosqlite appears to have let slip through even
  215. # though pysqlite appears to do check_same_thread for this.
  216. # All execute operations etc. should be safe because they all
  217. # go through the single executor thread.
  218. kw["check_same_thread"] = False
  219. connection = self.aiosqlite.connect(*arg, **kw)
  220. # it's a Thread. you'll thank us later
  221. connection.daemon = True
  222. if util.asbool(async_fallback):
  223. return AsyncAdaptFallback_aiosqlite_connection(
  224. self,
  225. await_fallback(connection),
  226. )
  227. else:
  228. return AsyncAdapt_aiosqlite_connection(
  229. self,
  230. await_only(connection),
  231. )
  232. class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
  233. def create_server_side_cursor(self):
  234. return self._dbapi_connection.cursor(server_side=True)
  235. class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
  236. driver = "aiosqlite"
  237. supports_statement_cache = True
  238. is_async = True
  239. supports_server_side_cursors = True
  240. execution_ctx_cls = SQLiteExecutionContext_aiosqlite
  241. @classmethod
  242. def dbapi(cls):
  243. return AsyncAdapt_aiosqlite_dbapi(
  244. __import__("aiosqlite"), __import__("sqlite3")
  245. )
  246. @classmethod
  247. def get_pool_class(cls, url):
  248. if cls._is_url_file_db(url):
  249. return pool.NullPool
  250. else:
  251. return pool.StaticPool
  252. def is_disconnect(self, e, connection, cursor):
  253. if isinstance(
  254. e, self.dbapi.OperationalError
  255. ) and "no active connection" in str(e):
  256. return True
  257. return super().is_disconnect(e, connection, cursor)
  258. def get_driver_connection(self, connection):
  259. return connection._connection
  260. dialect = SQLiteDialect_aiosqlite