123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- # sqlite/aiosqlite.py
- # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
r"""

.. dialect:: sqlite+aiosqlite
    :name: aiosqlite
    :dbapi: aiosqlite
    :connectstring: sqlite+aiosqlite:///file_path
    :url: https://pypi.org/project/aiosqlite/

The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
running on top of pysqlite.

aiosqlite is a wrapper around pysqlite that uses a background thread for
each connection. It does not actually use non-blocking IO, as SQLite
databases are not socket-based. However, it does provide a working asyncio
interface that's useful for testing and prototyping purposes.

Using a special asyncio mediation layer, the aiosqlite dialect is usable
as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
extension package.

This dialect should normally be used only with the
:func:`_asyncio.create_async_engine` engine creation function::

    from sqlalchemy.ext.asyncio import create_async_engine
    engine = create_async_engine("sqlite+aiosqlite:///filename")

The URL passes through all arguments to the ``pysqlite`` driver, so all
connection arguments are the same as they are for that of :ref:`pysqlite`.

"""  # noqa
from collections import deque

from .base import SQLiteExecutionContext
from .pysqlite import SQLiteDialect_pysqlite
from ... import pool
from ... import util
from ...engine import AdaptedConnection
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
class AsyncAdapt_aiosqlite_cursor:
    """Blocking, buffered DBAPI-style cursor over an adapted connection.

    Every :meth:`execute` / :meth:`executemany` call opens a fresh driver
    cursor on the wrapped connection and drives its awaitables to
    completion through ``await_``.  Unless ``server_side`` is set, all
    result rows are copied into a local buffer and the driver cursor is
    closed immediately; the ``fetch*`` methods then only consume that
    buffer.

    The buffer is a :class:`collections.deque` so that taking rows from
    the front is O(1) per row rather than O(n) as with ``list.pop(0)``.
    """

    __slots__ = (
        "_adapt_connection",
        "_connection",
        "description",
        "await_",
        "_rows",
        "arraysize",
        "rowcount",
        "lastrowid",
    )

    server_side = False

    def __init__(self, adapt_connection):
        """Bind the cursor to ``adapt_connection`` and reset its state.

        :param adapt_connection: object exposing ``_connection``,
         ``await_`` and ``_handle_exception``.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_
        self.arraysize = 1
        self.rowcount = -1
        # Declared in __slots__, so it must be assigned here or reading it
        # before the first execute raises AttributeError.  -1 is the same
        # "not applicable" value execute() uses for row-returning statements.
        self.lastrowid = -1
        self.description = None
        self._rows = deque()

    def close(self):
        """Discard any rows still held in the local buffer."""
        self._rows.clear()

    def execute(self, operation, parameters=None):
        """Run ``operation`` on a new driver cursor.

        :param operation: statement passed to the driver cursor.
        :param parameters: optional bound parameters; omitted from the
         driver call entirely when ``None``.

        Any exception is routed to the owning connection's
        ``_handle_exception``.
        """
        try:
            _cursor = self.await_(self._connection.cursor())

            if parameters is None:
                self.await_(_cursor.execute(operation))
            else:
                self.await_(_cursor.execute(operation, parameters))

            if _cursor.description:
                # A truthy description marks a row-returning statement.
                self.description = _cursor.description
                self.lastrowid = self.rowcount = -1

                if not self.server_side:
                    self._rows = deque(self.await_(_cursor.fetchall()))
            else:
                self.description = None
                self.lastrowid = _cursor.lastrowid
                self.rowcount = _cursor.rowcount

            if not self.server_side:
                # Rows are already buffered; the driver cursor is done.
                self.await_(_cursor.close())
            else:
                # Server-side subclasses fetch lazily from the live cursor.
                self._cursor = _cursor
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def executemany(self, operation, seq_of_parameters):
        """Run ``operation`` once per parameter set on a new driver cursor.

        No rows are buffered; ``description`` is reset to ``None`` and
        ``lastrowid`` / ``rowcount`` are copied from the driver cursor,
        which is then closed.  Any exception is routed to the owning
        connection's ``_handle_exception``.
        """
        try:
            _cursor = self.await_(self._connection.cursor())
            self.await_(_cursor.executemany(operation, seq_of_parameters))
            self.description = None
            self.lastrowid = _cursor.lastrowid
            self.rowcount = _cursor.rowcount
            self.await_(_cursor.close())
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def setinputsizes(self, *inputsizes):
        """Accept and ignore input sizes (no-op)."""
        pass

    def __iter__(self):
        """Yield buffered rows from the front, removing each as it goes."""
        # ``self._rows`` is re-read on every pass so that a buffer replaced
        # by a new execute() during iteration is picked up.
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        """Remove and return the next buffered row, or ``None`` if empty."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size=None):
        """Remove and return up to ``size`` buffered rows as a list.

        :param size: maximum number of rows; defaults to ``arraysize``.
         A non-positive size returns an empty list.
        """
        if size is None:
            size = self.arraysize

        rows = self._rows
        return [rows.popleft() for _ in range(min(size, len(rows)))]

    def fetchall(self):
        """Remove and return all remaining buffered rows as a list."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
    """Cursor variant that keeps the driver cursor open and fetches from it.

    With ``server_side`` set, the parent ``execute`` stores the live driver
    cursor on ``_cursor`` instead of buffering rows; every fetch here is
    forwarded to that cursor through ``await_``.
    """

    # A bare string declares a single slot.
    __slots__ = "_cursor"
    server_side = True

    def __init__(self, *arg, **kw):
        """Initialise the base cursor; no driver cursor is held yet."""
        super().__init__(*arg, **kw)
        self._cursor = None

    def close(self):
        """Close the held driver cursor, if any, and forget it."""
        live_cursor = self._cursor
        if live_cursor is None:
            return
        self.await_(live_cursor.close())
        self._cursor = None

    def fetchone(self):
        """Return the result of the driver cursor's ``fetchone()``."""
        pending = self._cursor.fetchone()
        return self.await_(pending)

    def fetchmany(self, size=None):
        """Return up to ``size`` rows (default ``arraysize``) from the driver."""
        count = self.arraysize if size is None else size
        pending = self._cursor.fetchmany(size=count)
        return self.await_(pending)

    def fetchall(self):
        """Return the result of the driver cursor's ``fetchall()``."""
        pending = self._cursor.fetchall()
        return self.await_(pending)
class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
    """Blocking DBAPI-style facade over a driver connection.

    Awaitables returned by the wrapped connection are driven to completion
    with ``await_`` (``await_only`` on this class).  Most operations route
    exceptions through :meth:`_handle_exception`.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi", "_connection")

    def __init__(self, dbapi, connection):
        """Store the owning DBAPI facade and the wrapped driver connection."""
        self.dbapi = dbapi
        self._connection = connection

    @property
    def isolation_level(self):
        """Isolation level as reported by the wrapped connection."""
        return self._connection.isolation_level

    @isolation_level.setter
    def isolation_level(self, value):
        # Plain attribute assignment on the wrapped connection; this is not
        # an awaitable and therefore does not go through ``await_``.
        try:
            self._connection.isolation_level = value
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args, **kw):
        """Forward to the wrapped connection's ``create_function``."""
        try:
            self.await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def cursor(self, server_side=False):
        """Return a new adapted cursor bound to this connection.

        :param server_side: when true, return the server-side cursor
         variant; otherwise the buffered one.
        """
        if server_side:
            return AsyncAdapt_aiosqlite_ss_cursor(self)
        else:
            return AsyncAdapt_aiosqlite_cursor(self)

    def execute(self, *args, **kw):
        """Run ``execute`` on the wrapped connection and return its result.

        Exceptions propagate as raised; they are not passed through
        :meth:`_handle_exception`.
        """
        return self.await_(self._connection.execute(*args, **kw))

    def rollback(self):
        """Roll back on the wrapped connection."""
        try:
            self.await_(self._connection.rollback())
        except Exception as error:
            self._handle_exception(error)

    def commit(self):
        """Commit on the wrapped connection."""
        try:
            self.await_(self._connection.commit())
        except Exception as error:
            self._handle_exception(error)

    def close(self):
        """Close the wrapped connection."""
        try:
            self.await_(self._connection.close())
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error):
        """Re-raise ``error``, translating the "no active connection" case.

        A ``ValueError`` whose first argument is ``"no active connection"``
        is raised as ``self.dbapi.sqlite.OperationalError`` chained from the
        original.  Every other exception is re-raised unchanged.
        """
        if (
            isinstance(error, ValueError)
            # An argument-less ValueError has empty ``args``; indexing it
            # would raise IndexError and hide the original error.
            and error.args
            and error.args[0] == "no active connection"
        ):
            util.raise_(
                self.dbapi.sqlite.OperationalError("no active connection"),
                from_=error,
            )
        else:
            raise error
class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
    """Adapted connection whose ``await_`` is ``await_fallback``."""

    await_ = staticmethod(await_fallback)
    __slots__ = ()
class AsyncAdapt_aiosqlite_dbapi:
    """Module-like DBAPI facade assembled from ``aiosqlite`` and ``sqlite``.

    Exception classes and version attributes are copied from the
    ``aiosqlite`` module; the ``PARSE_*`` constants and ``Binary`` are
    copied from the ``sqlite`` module.  :meth:`connect` returns an adapted,
    blocking-style connection.
    """

    def __init__(self, aiosqlite, sqlite):
        """Keep both source modules and publish the DBAPI attributes."""
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self):
        """Copy the selected attributes from the source modules onto self."""
        from_aiosqlite = (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        )
        from_sqlite = ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")

        plan = (
            (self.aiosqlite, from_aiosqlite),
            (self.sqlite, from_sqlite),
        )
        for source_module, attr_names in plan:
            for attr in attr_names:
                setattr(self, attr, getattr(source_module, attr))

    def connect(self, *arg, **kw):
        """Create a driver connection and wrap it in an adapted connection.

        The ``async_fallback`` keyword is consumed here and selects the
        fallback connection class; all other arguments are passed to
        ``aiosqlite.connect``.
        """
        async_fallback = kw.pop("async_fallback", False)

        # check_same_thread is forced off because there is otherwise no way
        # to assign connection.isolation_level.  The isolation-level
        # assignment is the only operation considered unsafe here: aiosqlite
        # appears to let it through even though pysqlite appears to apply
        # check_same_thread to it.  Execute operations and the like should
        # be safe, since they all go through the single executor thread.
        kw["check_same_thread"] = False

        connection = self.aiosqlite.connect(*arg, **kw)

        # The connection object is a Thread; flag it as a daemon thread.
        connection.daemon = True

        if not util.asbool(async_fallback):
            return AsyncAdapt_aiosqlite_connection(
                self,
                await_only(connection),
            )
        return AsyncAdaptFallback_aiosqlite_connection(
            self,
            await_fallback(connection),
        )
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context that requests server-side cursors from the connection."""

    def create_server_side_cursor(self):
        """Return ``self._dbapi_connection.cursor(server_side=True)``."""
        dbapi_conn = self._dbapi_connection
        return dbapi_conn.cursor(server_side=True)
class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """pysqlite-derived dialect that talks to SQLite through aiosqlite."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def dbapi(cls):
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them in the facade."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite_module)

    @classmethod
    def get_pool_class(cls, url):
        """Return ``NullPool`` for file database URLs, else ``StaticPool``."""
        return pool.NullPool if cls._is_url_file_db(url) else pool.StaticPool

    def is_disconnect(self, e, connection, cursor):
        """Report whether ``e`` indicates a lost connection.

        An ``OperationalError`` whose text contains "no active connection"
        counts as a disconnect; anything else is deferred to the parent
        dialect.
        """
        no_connection = isinstance(
            e, self.dbapi.OperationalError
        ) and "no active connection" in str(e)
        if no_connection:
            return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(self, connection):
        """Return the object stored on ``connection._connection``."""
        return connection._connection


# Module-level alias for the dialect class.
# NOTE(review): presumably the name the dialect loader looks up - confirm.
dialect = SQLiteDialect_aiosqlite
|