_concurrency_py3k.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. # util/_concurrency_py3k.py
  2. # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. import asyncio
  8. import sys
  9. from typing import Any
  10. from typing import Callable
  11. from typing import Coroutine
  12. import greenlet
  13. from . import compat
  14. from .langhelpers import memoized_property
  15. from .. import exc
  16. if compat.py37:
  17. try:
  18. from contextvars import copy_context as _copy_context
  19. # If greenlet.gr_context is present in current version of greenlet,
  20. # it will be set with a copy of the current context on creation.
  21. # Refs: https://github.com/python-greenlet/greenlet/pull/198
  22. getattr(greenlet.greenlet, "gr_context")
  23. except (ImportError, AttributeError):
  24. _copy_context = None
  25. else:
  26. _copy_context = None
  27. def is_exit_exception(e):
  28. # note asyncio.CancelledError is already BaseException
  29. # so was an exit exception in any case
  30. return not isinstance(e, Exception) or isinstance(
  31. e, (asyncio.TimeoutError, asyncio.CancelledError)
  32. )
  33. # implementation based on snaury gist at
  34. # https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef
  35. # Issue for context: https://github.com/python-greenlet/greenlet/issues/173
  36. class _AsyncIoGreenlet(greenlet.greenlet):
  37. def __init__(self, fn, driver):
  38. greenlet.greenlet.__init__(self, fn, driver)
  39. self.driver = driver
  40. if _copy_context is not None:
  41. self.gr_context = _copy_context()
  42. def await_only(awaitable: Coroutine) -> Any:
  43. """Awaits an async function in a sync method.
  44. The sync method must be inside a :func:`greenlet_spawn` context.
  45. :func:`await_` calls cannot be nested.
  46. :param awaitable: The coroutine to call.
  47. """
  48. # this is called in the context greenlet while running fn
  49. current = greenlet.getcurrent()
  50. if not isinstance(current, _AsyncIoGreenlet):
  51. raise exc.MissingGreenlet(
  52. "greenlet_spawn has not been called; can't call await_() here. "
  53. "Was IO attempted in an unexpected place?"
  54. )
  55. # returns the control to the driver greenlet passing it
  56. # a coroutine to run. Once the awaitable is done, the driver greenlet
  57. # switches back to this greenlet with the result of awaitable that is
  58. # then returned to the caller (or raised as error)
  59. return current.driver.switch(awaitable)
  60. def await_fallback(awaitable: Coroutine) -> Any:
  61. """Awaits an async function in a sync method.
  62. The sync method must be inside a :func:`greenlet_spawn` context.
  63. :func:`await_` calls cannot be nested.
  64. :param awaitable: The coroutine to call.
  65. """
  66. # this is called in the context greenlet while running fn
  67. current = greenlet.getcurrent()
  68. if not isinstance(current, _AsyncIoGreenlet):
  69. loop = get_event_loop()
  70. if loop.is_running():
  71. raise exc.MissingGreenlet(
  72. "greenlet_spawn has not been called and asyncio event "
  73. "loop is already running; can't call await_() here. "
  74. "Was IO attempted in an unexpected place?"
  75. )
  76. return loop.run_until_complete(awaitable)
  77. return current.driver.switch(awaitable)
  78. async def greenlet_spawn(
  79. fn: Callable, *args, _require_await=False, **kwargs
  80. ) -> Any:
  81. """Runs a sync function ``fn`` in a new greenlet.
  82. The sync function can then use :func:`await_` to wait for async
  83. functions.
  84. :param fn: The sync callable to call.
  85. :param \\*args: Positional arguments to pass to the ``fn`` callable.
  86. :param \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable.
  87. """
  88. context = _AsyncIoGreenlet(fn, greenlet.getcurrent())
  89. # runs the function synchronously in gl greenlet. If the execution
  90. # is interrupted by await_, context is not dead and result is a
  91. # coroutine to wait. If the context is dead the function has
  92. # returned, and its result can be returned.
  93. switch_occurred = False
  94. try:
  95. result = context.switch(*args, **kwargs)
  96. while not context.dead:
  97. switch_occurred = True
  98. try:
  99. # wait for a coroutine from await_ and then return its
  100. # result back to it.
  101. value = await result
  102. except BaseException:
  103. # this allows an exception to be raised within
  104. # the moderated greenlet so that it can continue
  105. # its expected flow.
  106. result = context.throw(*sys.exc_info())
  107. else:
  108. result = context.switch(value)
  109. finally:
  110. # clean up to avoid cycle resolution by gc
  111. del context.driver
  112. if _require_await and not switch_occurred:
  113. raise exc.AwaitRequired(
  114. "The current operation required an async execution but none was "
  115. "detected. This will usually happen when using a non compatible "
  116. "DBAPI driver. Please ensure that an async DBAPI is used."
  117. )
  118. return result
  119. class AsyncAdaptedLock:
  120. @memoized_property
  121. def mutex(self):
  122. # there should not be a race here for coroutines creating the
  123. # new lock as we are not using await, so therefore no concurrency
  124. return asyncio.Lock()
  125. def __enter__(self):
  126. # await is used to acquire the lock only after the first calling
  127. # coroutine has created the mutex.
  128. await_fallback(self.mutex.acquire())
  129. return self
  130. def __exit__(self, *arg, **kw):
  131. self.mutex.release()
  132. def _util_async_run_coroutine_function(fn, *args, **kwargs):
  133. """for test suite/ util only"""
  134. loop = get_event_loop()
  135. if loop.is_running():
  136. raise Exception(
  137. "for async run coroutine we expect that no greenlet or event "
  138. "loop is running when we start out"
  139. )
  140. return loop.run_until_complete(fn(*args, **kwargs))
  141. def _util_async_run(fn, *args, **kwargs):
  142. """for test suite/ util only"""
  143. loop = get_event_loop()
  144. if not loop.is_running():
  145. return loop.run_until_complete(greenlet_spawn(fn, *args, **kwargs))
  146. else:
  147. # allow for a wrapped test function to call another
  148. assert isinstance(greenlet.getcurrent(), _AsyncIoGreenlet)
  149. return fn(*args, **kwargs)
  150. def get_event_loop():
  151. """vendor asyncio.get_event_loop() for python 3.7 and above.
  152. Python 3.10 deprecates get_event_loop() as a standalone.
  153. """
  154. if compat.py37:
  155. try:
  156. return asyncio.get_running_loop()
  157. except RuntimeError:
  158. return asyncio.get_event_loop_policy().get_event_loop()
  159. else:
  160. return asyncio.get_event_loop()