123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- # This code is originally sourced from the aio-libs project "async_timeout",
- # under the Apache 2.0 license. You may see the original project at
- # https://github.com/aio-libs/async-timeout
- # It is vendored here to reduce chain-dependencies on this library, and
- # modified slightly to remove some features we don't use.
- import asyncio
- import sys
- from types import TracebackType
- from typing import Any, Optional, Type
- class timeout:
- """timeout context manager.
- Useful in cases when you want to apply timeout logic around block
- of code or in cases when asyncio.wait_for is not suitable. For example:
- >>> with timeout(0.001):
- ... async with aiohttp.get('https://github.com') as r:
- ... await r.text()
- timeout - value in seconds or None to disable timeout logic
- loop - asyncio compatible event loop
- """
- def __init__(
- self,
- timeout: Optional[float],
- *,
- loop: Optional[asyncio.AbstractEventLoop] = None,
- ) -> None:
- self._timeout = timeout
- if loop is None:
- loop = asyncio.get_event_loop()
- self._loop = loop
- self._task = None # type: Optional[asyncio.Task[Any]]
- self._cancelled = False
- self._cancel_handler = None # type: Optional[asyncio.Handle]
- self._cancel_at = None # type: Optional[float]
- def __enter__(self) -> "timeout":
- return self._do_enter()
- def __exit__(
- self,
- exc_type: Type[BaseException],
- exc_val: BaseException,
- exc_tb: TracebackType,
- ) -> Optional[bool]:
- self._do_exit(exc_type)
- return None
- async def __aenter__(self) -> "timeout":
- return self._do_enter()
- async def __aexit__(
- self,
- exc_type: Type[BaseException],
- exc_val: BaseException,
- exc_tb: TracebackType,
- ) -> None:
- self._do_exit(exc_type)
- @property
- def expired(self) -> bool:
- return self._cancelled
- @property
- def remaining(self) -> Optional[float]:
- if self._cancel_at is not None:
- return max(self._cancel_at - self._loop.time(), 0.0)
- else:
- return None
- def _do_enter(self) -> "timeout":
- # Support Tornado 5- without timeout
- # Details: https://github.com/python/asyncio/issues/392
- if self._timeout is None:
- return self
- self._task = current_task(self._loop)
- if self._task is None:
- raise RuntimeError(
- "Timeout context manager should be used " "inside a task"
- )
- if self._timeout <= 0:
- self._loop.call_soon(self._cancel_task)
- return self
- self._cancel_at = self._loop.time() + self._timeout
- self._cancel_handler = self._loop.call_at(self._cancel_at, self._cancel_task)
- return self
- def _do_exit(self, exc_type: Type[BaseException]) -> None:
- if exc_type is asyncio.CancelledError and self._cancelled:
- self._cancel_handler = None
- self._task = None
- raise asyncio.TimeoutError
- if self._timeout is not None and self._cancel_handler is not None:
- self._cancel_handler.cancel()
- self._cancel_handler = None
- self._task = None
- return None
- def _cancel_task(self) -> None:
- if self._task is not None:
- self._task.cancel()
- self._cancelled = True
- def current_task(loop: asyncio.AbstractEventLoop) -> "Optional[asyncio.Task[Any]]":
- if sys.version_info >= (3, 7):
- task = asyncio.current_task(loop=loop)
- else:
- task = asyncio.Task.current_task(loop=loop)
- if task is None:
- # this should be removed, tokio must use register_task and family API
- fn = getattr(loop, "current_task", None)
- if fn is not None:
- task = fn()
- return task
|