12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import asyncio
- import time
- from .compatibility import guarantee_single_callable
- from .timeout import timeout as async_timeout
- class ApplicationCommunicator:
- """
- Runs an ASGI application in a test mode, allowing sending of
- messages to it and retrieval of messages it sends.
- """
- def __init__(self, application, scope):
- self.application = guarantee_single_callable(application)
- self.scope = scope
- self.input_queue = asyncio.Queue()
- self.output_queue = asyncio.Queue()
- self.future = asyncio.ensure_future(
- self.application(scope, self.input_queue.get, self.output_queue.put)
- )
- async def wait(self, timeout=1):
- """
- Waits for the application to stop itself and returns any exceptions.
- """
- try:
- async with async_timeout(timeout):
- try:
- await self.future
- self.future.result()
- except asyncio.CancelledError:
- pass
- finally:
- if not self.future.done():
- self.future.cancel()
- try:
- await self.future
- except asyncio.CancelledError:
- pass
- def stop(self, exceptions=True):
- if not self.future.done():
- self.future.cancel()
- elif exceptions:
- # Give a chance to raise any exceptions
- self.future.result()
- def __del__(self):
- # Clean up on deletion
- try:
- self.stop(exceptions=False)
- except RuntimeError:
- # Event loop already stopped
- pass
- async def send_input(self, message):
- """
- Sends a single message to the application
- """
- # Give it the message
- await self.input_queue.put(message)
- async def receive_output(self, timeout=1):
- """
- Receives a single message from the application, with optional timeout.
- """
- # Make sure there's not an exception to raise from the task
- if self.future.done():
- self.future.result()
- # Wait and receive the message
- try:
- async with async_timeout(timeout):
- return await self.output_queue.get()
- except asyncio.TimeoutError as e:
- # See if we have another error to raise inside
- if self.future.done():
- self.future.result()
- else:
- self.future.cancel()
- try:
- await self.future
- except asyncio.CancelledError:
- pass
- raise e
- async def receive_nothing(self, timeout=0.1, interval=0.01):
- """
- Checks that there is no message to receive in the given time.
- """
- # `interval` has precedence over `timeout`
- start = time.monotonic()
- while time.monotonic() - start < timeout:
- if not self.output_queue.empty():
- return False
- await asyncio.sleep(interval)
- return self.output_queue.empty()
|