testing.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import asyncio
  2. import time
  3. from .compatibility import guarantee_single_callable
  4. from .timeout import timeout as async_timeout
  5. class ApplicationCommunicator:
  6. """
  7. Runs an ASGI application in a test mode, allowing sending of
  8. messages to it and retrieval of messages it sends.
  9. """
  10. def __init__(self, application, scope):
  11. self.application = guarantee_single_callable(application)
  12. self.scope = scope
  13. self.input_queue = asyncio.Queue()
  14. self.output_queue = asyncio.Queue()
  15. self.future = asyncio.ensure_future(
  16. self.application(scope, self.input_queue.get, self.output_queue.put)
  17. )
  18. async def wait(self, timeout=1):
  19. """
  20. Waits for the application to stop itself and returns any exceptions.
  21. """
  22. try:
  23. async with async_timeout(timeout):
  24. try:
  25. await self.future
  26. self.future.result()
  27. except asyncio.CancelledError:
  28. pass
  29. finally:
  30. if not self.future.done():
  31. self.future.cancel()
  32. try:
  33. await self.future
  34. except asyncio.CancelledError:
  35. pass
  36. def stop(self, exceptions=True):
  37. if not self.future.done():
  38. self.future.cancel()
  39. elif exceptions:
  40. # Give a chance to raise any exceptions
  41. self.future.result()
  42. def __del__(self):
  43. # Clean up on deletion
  44. try:
  45. self.stop(exceptions=False)
  46. except RuntimeError:
  47. # Event loop already stopped
  48. pass
  49. async def send_input(self, message):
  50. """
  51. Sends a single message to the application
  52. """
  53. # Give it the message
  54. await self.input_queue.put(message)
  55. async def receive_output(self, timeout=1):
  56. """
  57. Receives a single message from the application, with optional timeout.
  58. """
  59. # Make sure there's not an exception to raise from the task
  60. if self.future.done():
  61. self.future.result()
  62. # Wait and receive the message
  63. try:
  64. async with async_timeout(timeout):
  65. return await self.output_queue.get()
  66. except asyncio.TimeoutError as e:
  67. # See if we have another error to raise inside
  68. if self.future.done():
  69. self.future.result()
  70. else:
  71. self.future.cancel()
  72. try:
  73. await self.future
  74. except asyncio.CancelledError:
  75. pass
  76. raise e
  77. async def receive_nothing(self, timeout=0.1, interval=0.01):
  78. """
  79. Checks that there is no message to receive in the given time.
  80. """
  81. # `interval` has precedence over `timeout`
  82. start = time.monotonic()
  83. while time.monotonic() - start < timeout:
  84. if not self.output_queue.empty():
  85. return False
  86. await asyncio.sleep(interval)
  87. return self.output_queue.empty()