# ext/asyncio/session.py
# Copyright (C) 2020-2022 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
from . import engine
from . import result as _result
from .base import ReversibleProxy
from .base import StartableContext
from ... import util
from ...orm import close_all_sessions
from ...orm import object_session
from ...orm import Session
from ...orm import state as _instance_state
from ...util.concurrency import greenlet_spawn
# Execution options merged into every AsyncSession.execute() call; they are
# applied last, so they override caller-supplied values for the same keys.
_EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True})

# Execution options merged into every AsyncSession.stream() call; they are
# applied last, so they override caller-supplied values for the same keys.
_STREAM_OPTIONS = util.immutabledict({"stream_results": True})
  18. @util.create_proxy_methods(
  19. Session,
  20. ":class:`_orm.Session`",
  21. ":class:`_asyncio.AsyncSession`",
  22. classmethods=["object_session", "identity_key"],
  23. methods=[
  24. "__contains__",
  25. "__iter__",
  26. "add",
  27. "add_all",
  28. "expire",
  29. "expire_all",
  30. "expunge",
  31. "expunge_all",
  32. "is_modified",
  33. "in_transaction",
  34. "in_nested_transaction",
  35. ],
  36. attributes=[
  37. "dirty",
  38. "deleted",
  39. "new",
  40. "identity_map",
  41. "is_active",
  42. "autoflush",
  43. "no_autoflush",
  44. "info",
  45. ],
  46. )
  47. class AsyncSession(ReversibleProxy):
  48. """Asyncio version of :class:`_orm.Session`.
  49. The :class:`_asyncio.AsyncSession` is a proxy for a traditional
  50. :class:`_orm.Session` instance.
  51. .. versionadded:: 1.4
  52. To use an :class:`_asyncio.AsyncSession` with custom :class:`_orm.Session`
  53. implementations, see the
  54. :paramref:`_asyncio.AsyncSession.sync_session_class` parameter.
  55. """
  56. _is_asyncio = True
  57. dispatch = None
  58. def __init__(self, bind=None, binds=None, sync_session_class=None, **kw):
  59. r"""Construct a new :class:`_asyncio.AsyncSession`.
  60. All parameters other than ``sync_session_class`` are passed to the
  61. ``sync_session_class`` callable directly to instantiate a new
  62. :class:`_orm.Session`. Refer to :meth:`_orm.Session.__init__` for
  63. parameter documentation.
  64. :param sync_session_class:
  65. A :class:`_orm.Session` subclass or other callable which will be used
  66. to construct the :class:`_orm.Session` which will be proxied. This
  67. parameter may be used to provide custom :class:`_orm.Session`
  68. subclasses. Defaults to the
  69. :attr:`_asyncio.AsyncSession.sync_session_class` class-level
  70. attribute.
  71. .. versionadded:: 1.4.24
  72. """
  73. kw["future"] = True
  74. if bind:
  75. self.bind = bind
  76. bind = engine._get_sync_engine_or_connection(bind)
  77. if binds:
  78. self.binds = binds
  79. binds = {
  80. key: engine._get_sync_engine_or_connection(b)
  81. for key, b in binds.items()
  82. }
  83. if sync_session_class:
  84. self.sync_session_class = sync_session_class
  85. self.sync_session = self._proxied = self._assign_proxied(
  86. self.sync_session_class(bind=bind, binds=binds, **kw)
  87. )
  88. sync_session_class = Session
  89. """The class or callable that provides the
  90. underlying :class:`_orm.Session` instance for a particular
  91. :class:`_asyncio.AsyncSession`.
  92. At the class level, this attribute is the default value for the
  93. :paramref:`_asyncio.AsyncSession.sync_session_class` parameter. Custom
  94. subclasses of :class:`_asyncio.AsyncSession` can override this.
  95. At the instance level, this attribute indicates the current class or
  96. callable that was used to provide the :class:`_orm.Session` instance for
  97. this :class:`_asyncio.AsyncSession` instance.
  98. .. versionadded:: 1.4.24
  99. """
  100. sync_session: Session
  101. """Reference to the underlying :class:`_orm.Session` this
  102. :class:`_asyncio.AsyncSession` proxies requests towards.
  103. This instance can be used as an event target.
  104. .. seealso::
  105. :ref:`asyncio_events`
  106. """
  107. async def refresh(
  108. self, instance, attribute_names=None, with_for_update=None
  109. ):
  110. """Expire and refresh the attributes on the given instance.
  111. A query will be issued to the database and all attributes will be
  112. refreshed with their current database value.
  113. This is the async version of the :meth:`_orm.Session.refresh` method.
  114. See that method for a complete description of all options.
  115. .. seealso::
  116. :meth:`_orm.Session.refresh` - main documentation for refresh
  117. """
  118. return await greenlet_spawn(
  119. self.sync_session.refresh,
  120. instance,
  121. attribute_names=attribute_names,
  122. with_for_update=with_for_update,
  123. )
  124. async def run_sync(self, fn, *arg, **kw):
  125. """Invoke the given sync callable passing sync self as the first
  126. argument.
  127. This method maintains the asyncio event loop all the way through
  128. to the database connection by running the given callable in a
  129. specially instrumented greenlet.
  130. E.g.::
  131. with AsyncSession(async_engine) as session:
  132. await session.run_sync(some_business_method)
  133. .. note::
  134. The provided callable is invoked inline within the asyncio event
  135. loop, and will block on traditional IO calls. IO within this
  136. callable should only call into SQLAlchemy's asyncio database
  137. APIs which will be properly adapted to the greenlet context.
  138. .. seealso::
  139. :ref:`session_run_sync`
  140. """
  141. return await greenlet_spawn(fn, self.sync_session, *arg, **kw)
  142. async def execute(
  143. self,
  144. statement,
  145. params=None,
  146. execution_options=util.EMPTY_DICT,
  147. bind_arguments=None,
  148. **kw
  149. ):
  150. """Execute a statement and return a buffered
  151. :class:`_engine.Result` object.
  152. .. seealso::
  153. :meth:`_orm.Session.execute` - main documentation for execute
  154. """
  155. if execution_options:
  156. execution_options = util.immutabledict(execution_options).union(
  157. _EXECUTE_OPTIONS
  158. )
  159. else:
  160. execution_options = _EXECUTE_OPTIONS
  161. return await greenlet_spawn(
  162. self.sync_session.execute,
  163. statement,
  164. params=params,
  165. execution_options=execution_options,
  166. bind_arguments=bind_arguments,
  167. **kw
  168. )
  169. async def scalar(
  170. self,
  171. statement,
  172. params=None,
  173. execution_options=util.EMPTY_DICT,
  174. bind_arguments=None,
  175. **kw
  176. ):
  177. """Execute a statement and return a scalar result.
  178. .. seealso::
  179. :meth:`_orm.Session.scalar` - main documentation for scalar
  180. """
  181. result = await self.execute(
  182. statement,
  183. params=params,
  184. execution_options=execution_options,
  185. bind_arguments=bind_arguments,
  186. **kw
  187. )
  188. return result.scalar()
  189. async def scalars(
  190. self,
  191. statement,
  192. params=None,
  193. execution_options=util.EMPTY_DICT,
  194. bind_arguments=None,
  195. **kw
  196. ):
  197. """Execute a statement and return scalar results.
  198. :return: a :class:`_result.ScalarResult` object
  199. .. versionadded:: 1.4.24
  200. .. seealso::
  201. :meth:`_orm.Session.scalars` - main documentation for scalars
  202. :meth:`_asyncio.AsyncSession.stream_scalars` - streaming version
  203. """
  204. result = await self.execute(
  205. statement,
  206. params=params,
  207. execution_options=execution_options,
  208. bind_arguments=bind_arguments,
  209. **kw
  210. )
  211. return result.scalars()
  212. async def get(
  213. self,
  214. entity,
  215. ident,
  216. options=None,
  217. populate_existing=False,
  218. with_for_update=None,
  219. identity_token=None,
  220. ):
  221. """Return an instance based on the given primary key identifier,
  222. or ``None`` if not found.
  223. .. seealso::
  224. :meth:`_orm.Session.get` - main documentation for get
  225. """
  226. return await greenlet_spawn(
  227. self.sync_session.get,
  228. entity,
  229. ident,
  230. options=options,
  231. populate_existing=populate_existing,
  232. with_for_update=with_for_update,
  233. identity_token=identity_token,
  234. )
  235. async def stream(
  236. self,
  237. statement,
  238. params=None,
  239. execution_options=util.EMPTY_DICT,
  240. bind_arguments=None,
  241. **kw
  242. ):
  243. """Execute a statement and return a streaming
  244. :class:`_asyncio.AsyncResult` object."""
  245. if execution_options:
  246. execution_options = util.immutabledict(execution_options).union(
  247. _STREAM_OPTIONS
  248. )
  249. else:
  250. execution_options = _STREAM_OPTIONS
  251. result = await greenlet_spawn(
  252. self.sync_session.execute,
  253. statement,
  254. params=params,
  255. execution_options=execution_options,
  256. bind_arguments=bind_arguments,
  257. **kw
  258. )
  259. return _result.AsyncResult(result)
  260. async def stream_scalars(
  261. self,
  262. statement,
  263. params=None,
  264. execution_options=util.EMPTY_DICT,
  265. bind_arguments=None,
  266. **kw
  267. ):
  268. """Execute a statement and return a stream of scalar results.
  269. :return: an :class:`_asyncio.AsyncScalarResult` object
  270. .. versionadded:: 1.4.24
  271. .. seealso::
  272. :meth:`_orm.Session.scalars` - main documentation for scalars
  273. :meth:`_asyncio.AsyncSession.scalars` - non streaming version
  274. """
  275. result = await self.stream(
  276. statement,
  277. params=params,
  278. execution_options=execution_options,
  279. bind_arguments=bind_arguments,
  280. **kw
  281. )
  282. return result.scalars()
  283. async def delete(self, instance):
  284. """Mark an instance as deleted.
  285. The database delete operation occurs upon ``flush()``.
  286. As this operation may need to cascade along unloaded relationships,
  287. it is awaitable to allow for those queries to take place.
  288. .. seealso::
  289. :meth:`_orm.Session.delete` - main documentation for delete
  290. """
  291. return await greenlet_spawn(self.sync_session.delete, instance)
  292. async def merge(self, instance, load=True, options=None):
  293. """Copy the state of a given instance into a corresponding instance
  294. within this :class:`_asyncio.AsyncSession`.
  295. .. seealso::
  296. :meth:`_orm.Session.merge` - main documentation for merge
  297. """
  298. return await greenlet_spawn(
  299. self.sync_session.merge, instance, load=load, options=options
  300. )
  301. async def flush(self, objects=None):
  302. """Flush all the object changes to the database.
  303. .. seealso::
  304. :meth:`_orm.Session.flush` - main documentation for flush
  305. """
  306. await greenlet_spawn(self.sync_session.flush, objects=objects)
  307. def get_transaction(self):
  308. """Return the current root transaction in progress, if any.
  309. :return: an :class:`_asyncio.AsyncSessionTransaction` object, or
  310. ``None``.
  311. .. versionadded:: 1.4.18
  312. """
  313. trans = self.sync_session.get_transaction()
  314. if trans is not None:
  315. return AsyncSessionTransaction._retrieve_proxy_for_target(trans)
  316. else:
  317. return None
  318. def get_nested_transaction(self):
  319. """Return the current nested transaction in progress, if any.
  320. :return: an :class:`_asyncio.AsyncSessionTransaction` object, or
  321. ``None``.
  322. .. versionadded:: 1.4.18
  323. """
  324. trans = self.sync_session.get_nested_transaction()
  325. if trans is not None:
  326. return AsyncSessionTransaction._retrieve_proxy_for_target(trans)
  327. else:
  328. return None
  329. def get_bind(self, mapper=None, clause=None, bind=None, **kw):
  330. """Return a "bind" to which the synchronous proxied :class:`_orm.Session`
  331. is bound.
  332. Unlike the :meth:`_orm.Session.get_bind` method, this method is
  333. currently **not** used by this :class:`.AsyncSession` in any way
  334. in order to resolve engines for requests.
  335. .. note::
  336. This method proxies directly to the :meth:`_orm.Session.get_bind`
  337. method, however is currently **not** useful as an override target,
  338. in contrast to that of the :meth:`_orm.Session.get_bind` method.
  339. The example below illustrates how to implement custom
  340. :meth:`_orm.Session.get_bind` schemes that work with
  341. :class:`.AsyncSession` and :class:`.AsyncEngine`.
  342. The pattern introduced at :ref:`session_custom_partitioning`
  343. illustrates how to apply a custom bind-lookup scheme to a
  344. :class:`_orm.Session` given a set of :class:`_engine.Engine` objects.
  345. To apply a corresponding :meth:`_orm.Session.get_bind` implementation
  346. for use with a :class:`.AsyncSession` and :class:`.AsyncEngine`
  347. objects, continue to subclass :class:`_orm.Session` and apply it to
  348. :class:`.AsyncSession` using
  349. :paramref:`.AsyncSession.sync_session_class`. The inner method must
  350. continue to return :class:`_engine.Engine` instances, which can be
  351. acquired from a :class:`_asyncio.AsyncEngine` using the
  352. :attr:`_asyncio.AsyncEngine.sync_engine` attribute::
  353. # using example from "Custom Vertical Partitioning"
  354. import random
  355. from sqlalchemy.ext.asyncio import AsyncSession
  356. from sqlalchemy.ext.asyncio import create_async_engine
  357. from sqlalchemy.orm import Session, sessionmaker
  358. # construct async engines w/ async drivers
  359. engines = {
  360. 'leader':create_async_engine("sqlite+aiosqlite:///leader.db"),
  361. 'other':create_async_engine("sqlite+aiosqlite:///other.db"),
  362. 'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"),
  363. 'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"),
  364. }
  365. class RoutingSession(Session):
  366. def get_bind(self, mapper=None, clause=None, **kw):
  367. # within get_bind(), return sync engines
  368. if mapper and issubclass(mapper.class_, MyOtherClass):
  369. return engines['other'].sync_engine
  370. elif self._flushing or isinstance(clause, (Update, Delete)):
  371. return engines['leader'].sync_engine
  372. else:
  373. return engines[
  374. random.choice(['follower1','follower2'])
  375. ].sync_engine
  376. # apply to AsyncSession using sync_session_class
  377. AsyncSessionMaker = sessionmaker(
  378. class_=AsyncSession,
  379. sync_session_class=RoutingSession
  380. )
  381. The :meth:`_orm.Session.get_bind` method is called in a non-asyncio,
  382. implicitly non-blocking context in the same manner as ORM event hooks
  383. and functions that are invoked via :meth:`.AsyncSession.run_sync`, so
  384. routines that wish to run SQL commands inside of
  385. :meth:`_orm.Session.get_bind` can continue to do so using
  386. blocking-style code, which will be translated to implicitly async calls
  387. at the point of invoking IO on the database drivers.
  388. """ # noqa E501
  389. return self.sync_session.get_bind(
  390. mapper=mapper, clause=clause, bind=bind, **kw
  391. )
  392. async def connection(self, **kw):
  393. r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
  394. this :class:`.Session` object's transactional state.
  395. This method may also be used to establish execution options for the
  396. database connection used by the current transaction.
  397. .. versionadded:: 1.4.24 Added **kw arguments which are passed through
  398. to the underlying :meth:`_orm.Session.connection` method.
  399. .. seealso::
  400. :meth:`_orm.Session.connection` - main documentation for
  401. "connection"
  402. """
  403. sync_connection = await greenlet_spawn(
  404. self.sync_session.connection, **kw
  405. )
  406. return engine.AsyncConnection._retrieve_proxy_for_target(
  407. sync_connection
  408. )
  409. def begin(self, **kw):
  410. """Return an :class:`_asyncio.AsyncSessionTransaction` object.
  411. The underlying :class:`_orm.Session` will perform the
  412. "begin" action when the :class:`_asyncio.AsyncSessionTransaction`
  413. object is entered::
  414. async with async_session.begin():
  415. # .. ORM transaction is begun
  416. Note that database IO will not normally occur when the session-level
  417. transaction is begun, as database transactions begin on an
  418. on-demand basis. However, the begin block is async to accommodate
  419. for a :meth:`_orm.SessionEvents.after_transaction_create`
  420. event hook that may perform IO.
  421. For a general description of ORM begin, see
  422. :meth:`_orm.Session.begin`.
  423. """
  424. return AsyncSessionTransaction(self)
  425. def begin_nested(self, **kw):
  426. """Return an :class:`_asyncio.AsyncSessionTransaction` object
  427. which will begin a "nested" transaction, e.g. SAVEPOINT.
  428. Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`.
  429. For a general description of ORM begin nested, see
  430. :meth:`_orm.Session.begin_nested`.
  431. """
  432. return AsyncSessionTransaction(self, nested=True)
  433. async def rollback(self):
  434. """Rollback the current transaction in progress."""
  435. return await greenlet_spawn(self.sync_session.rollback)
  436. async def commit(self):
  437. """Commit the current transaction in progress."""
  438. return await greenlet_spawn(self.sync_session.commit)
  439. async def close(self):
  440. """Close out the transactional resources and ORM objects used by this
  441. :class:`_asyncio.AsyncSession`.
  442. This expunges all ORM objects associated with this
  443. :class:`_asyncio.AsyncSession`, ends any transaction in progress and
  444. :term:`releases` any :class:`_asyncio.AsyncConnection` objects which
  445. this :class:`_asyncio.AsyncSession` itself has checked out from
  446. associated :class:`_asyncio.AsyncEngine` objects. The operation then
  447. leaves the :class:`_asyncio.AsyncSession` in a state which it may be
  448. used again.
  449. .. tip::
  450. The :meth:`_asyncio.AsyncSession.close` method **does not prevent
  451. the Session from being used again**. The
  452. :class:`_asyncio.AsyncSession` itself does not actually have a
  453. distinct "closed" state; it merely means the
  454. :class:`_asyncio.AsyncSession` will release all database
  455. connections and ORM objects.
  456. .. seealso::
  457. :ref:`session_closing` - detail on the semantics of
  458. :meth:`_asyncio.AsyncSession.close`
  459. """
  460. return await greenlet_spawn(self.sync_session.close)
  461. async def invalidate(self):
  462. """Close this Session, using connection invalidation.
  463. For a complete description, see :meth:`_orm.Session.invalidate`.
  464. """
  465. return await greenlet_spawn(self.sync_session.invalidate)
  466. @classmethod
  467. async def close_all(self):
  468. """Close all :class:`_asyncio.AsyncSession` sessions."""
  469. return await greenlet_spawn(self.sync_session.close_all)
  470. async def __aenter__(self):
  471. return self
  472. async def __aexit__(self, type_, value, traceback):
  473. await self.close()
  474. def _maker_context_manager(self):
  475. # no @contextlib.asynccontextmanager until python3.7, gr
  476. return _AsyncSessionContextManager(self)
  477. class _AsyncSessionContextManager:
  478. def __init__(self, async_session):
  479. self.async_session = async_session
  480. async def __aenter__(self):
  481. self.trans = self.async_session.begin()
  482. await self.trans.__aenter__()
  483. return self.async_session
  484. async def __aexit__(self, type_, value, traceback):
  485. await self.trans.__aexit__(type_, value, traceback)
  486. await self.async_session.__aexit__(type_, value, traceback)
  487. class AsyncSessionTransaction(ReversibleProxy, StartableContext):
  488. """A wrapper for the ORM :class:`_orm.SessionTransaction` object.
  489. This object is provided so that a transaction-holding object
  490. for the :meth:`_asyncio.AsyncSession.begin` may be returned.
  491. The object supports both explicit calls to
  492. :meth:`_asyncio.AsyncSessionTransaction.commit` and
  493. :meth:`_asyncio.AsyncSessionTransaction.rollback`, as well as use as an
  494. async context manager.
  495. .. versionadded:: 1.4
  496. """
  497. __slots__ = ("session", "sync_transaction", "nested")
  498. def __init__(self, session, nested=False):
  499. self.session = session
  500. self.nested = nested
  501. self.sync_transaction = None
  502. @property
  503. def is_active(self):
  504. return (
  505. self._sync_transaction() is not None
  506. and self._sync_transaction().is_active
  507. )
  508. def _sync_transaction(self):
  509. if not self.sync_transaction:
  510. self._raise_for_not_started()
  511. return self.sync_transaction
  512. async def rollback(self):
  513. """Roll back this :class:`_asyncio.AsyncTransaction`."""
  514. await greenlet_spawn(self._sync_transaction().rollback)
  515. async def commit(self):
  516. """Commit this :class:`_asyncio.AsyncTransaction`."""
  517. await greenlet_spawn(self._sync_transaction().commit)
  518. async def start(self, is_ctxmanager=False):
  519. self.sync_transaction = self._assign_proxied(
  520. await greenlet_spawn(
  521. self.session.sync_session.begin_nested
  522. if self.nested
  523. else self.session.sync_session.begin
  524. )
  525. )
  526. if is_ctxmanager:
  527. self.sync_transaction.__enter__()
  528. return self
  529. async def __aexit__(self, type_, value, traceback):
  530. await greenlet_spawn(
  531. self._sync_transaction().__exit__, type_, value, traceback
  532. )
  533. def async_object_session(instance):
  534. """Return the :class:`_asyncio.AsyncSession` to which the given instance
  535. belongs.
  536. This function makes use of the sync-API function
  537. :class:`_orm.object_session` to retrieve the :class:`_orm.Session` which
  538. refers to the given instance, and from there links it to the original
  539. :class:`_asyncio.AsyncSession`.
  540. If the :class:`_asyncio.AsyncSession` has been garbage collected, the
  541. return value is ``None``.
  542. This functionality is also available from the
  543. :attr:`_orm.InstanceState.async_session` accessor.
  544. :param instance: an ORM mapped instance
  545. :return: an :class:`_asyncio.AsyncSession` object, or ``None``.
  546. .. versionadded:: 1.4.18
  547. """
  548. session = object_session(instance)
  549. if session is not None:
  550. return async_session(session)
  551. else:
  552. return None
  553. def async_session(session):
  554. """Return the :class:`_asyncio.AsyncSession` which is proxying the given
  555. :class:`_orm.Session` object, if any.
  556. :param session: a :class:`_orm.Session` instance.
  557. :return: a :class:`_asyncio.AsyncSession` instance, or ``None``.
  558. .. versionadded:: 1.4.18
  559. """
  560. return AsyncSession._retrieve_proxy_for_target(session, regenerate=False)
# Install async_session() as the ``_async_provider`` hook on the ORM state
# module.  NOTE(review): presumably this is what backs the
# InstanceState.async_session accessor - confirm in orm/state.py.
_instance_state._async_provider = async_session