timeout.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
# This code is originally sourced from the aio-libs project "async_timeout",
# under the Apache 2.0 license. You may see the original project at
# https://github.com/aio-libs/async-timeout
# It is vendored here to reduce chain-dependencies on this library, and
# modified slightly to remove some features we don't use.
  6. import asyncio
  7. import sys
  8. from types import TracebackType
  9. from typing import Any, Optional, Type
  10. class timeout:
  11. """timeout context manager.
  12. Useful in cases when you want to apply timeout logic around block
  13. of code or in cases when asyncio.wait_for is not suitable. For example:
  14. >>> with timeout(0.001):
  15. ... async with aiohttp.get('https://github.com') as r:
  16. ... await r.text()
  17. timeout - value in seconds or None to disable timeout logic
  18. loop - asyncio compatible event loop
  19. """
  20. def __init__(
  21. self,
  22. timeout: Optional[float],
  23. *,
  24. loop: Optional[asyncio.AbstractEventLoop] = None,
  25. ) -> None:
  26. self._timeout = timeout
  27. if loop is None:
  28. loop = asyncio.get_event_loop()
  29. self._loop = loop
  30. self._task = None # type: Optional[asyncio.Task[Any]]
  31. self._cancelled = False
  32. self._cancel_handler = None # type: Optional[asyncio.Handle]
  33. self._cancel_at = None # type: Optional[float]
  34. def __enter__(self) -> "timeout":
  35. return self._do_enter()
  36. def __exit__(
  37. self,
  38. exc_type: Type[BaseException],
  39. exc_val: BaseException,
  40. exc_tb: TracebackType,
  41. ) -> Optional[bool]:
  42. self._do_exit(exc_type)
  43. return None
  44. async def __aenter__(self) -> "timeout":
  45. return self._do_enter()
  46. async def __aexit__(
  47. self,
  48. exc_type: Type[BaseException],
  49. exc_val: BaseException,
  50. exc_tb: TracebackType,
  51. ) -> None:
  52. self._do_exit(exc_type)
  53. @property
  54. def expired(self) -> bool:
  55. return self._cancelled
  56. @property
  57. def remaining(self) -> Optional[float]:
  58. if self._cancel_at is not None:
  59. return max(self._cancel_at - self._loop.time(), 0.0)
  60. else:
  61. return None
  62. def _do_enter(self) -> "timeout":
  63. # Support Tornado 5- without timeout
  64. # Details: https://github.com/python/asyncio/issues/392
  65. if self._timeout is None:
  66. return self
  67. self._task = current_task(self._loop)
  68. if self._task is None:
  69. raise RuntimeError(
  70. "Timeout context manager should be used " "inside a task"
  71. )
  72. if self._timeout <= 0:
  73. self._loop.call_soon(self._cancel_task)
  74. return self
  75. self._cancel_at = self._loop.time() + self._timeout
  76. self._cancel_handler = self._loop.call_at(self._cancel_at, self._cancel_task)
  77. return self
  78. def _do_exit(self, exc_type: Type[BaseException]) -> None:
  79. if exc_type is asyncio.CancelledError and self._cancelled:
  80. self._cancel_handler = None
  81. self._task = None
  82. raise asyncio.TimeoutError
  83. if self._timeout is not None and self._cancel_handler is not None:
  84. self._cancel_handler.cancel()
  85. self._cancel_handler = None
  86. self._task = None
  87. return None
  88. def _cancel_task(self) -> None:
  89. if self._task is not None:
  90. self._task.cancel()
  91. self._cancelled = True
  92. def current_task(loop: asyncio.AbstractEventLoop) -> "Optional[asyncio.Task[Any]]":
  93. if sys.version_info >= (3, 7):
  94. task = asyncio.current_task(loop=loop)
  95. else:
  96. task = asyncio.Task.current_task(loop=loop)
  97. if task is None:
  98. # this should be removed, tokio must use register_task and family API
  99. fn = getattr(loop, "current_task", None)
  100. if fn is not None:
  101. task = fn()
  102. return task