current_thread_executor.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import queue
  2. import threading
  3. from concurrent.futures import Executor, Future
  4. class _WorkItem:
  5. """
  6. Represents an item needing to be run in the executor.
  7. Copied from ThreadPoolExecutor (but it's private, so we're not going to rely on importing it)
  8. """
  9. def __init__(self, future, fn, args, kwargs):
  10. self.future = future
  11. self.fn = fn
  12. self.args = args
  13. self.kwargs = kwargs
  14. def run(self):
  15. if not self.future.set_running_or_notify_cancel():
  16. return
  17. try:
  18. result = self.fn(*self.args, **self.kwargs)
  19. except BaseException as exc:
  20. self.future.set_exception(exc)
  21. # Break a reference cycle with the exception 'exc'
  22. self = None
  23. else:
  24. self.future.set_result(result)
  25. class CurrentThreadExecutor(Executor):
  26. """
  27. An Executor that actually runs code in the thread it is instantiated in.
  28. Passed to other threads running async code, so they can run sync code in
  29. the thread they came from.
  30. """
  31. def __init__(self):
  32. self._work_thread = threading.current_thread()
  33. self._work_queue = queue.Queue()
  34. self._broken = False
  35. def run_until_future(self, future):
  36. """
  37. Runs the code in the work queue until a result is available from the future.
  38. Should be run from the thread the executor is initialised in.
  39. """
  40. # Check we're in the right thread
  41. if threading.current_thread() != self._work_thread:
  42. raise RuntimeError(
  43. "You cannot run CurrentThreadExecutor from a different thread"
  44. )
  45. future.add_done_callback(self._work_queue.put)
  46. # Keep getting and running work items until we get the future we're waiting for
  47. # back via the future's done callback.
  48. try:
  49. while True:
  50. # Get a work item and run it
  51. work_item = self._work_queue.get()
  52. if work_item is future:
  53. return
  54. work_item.run()
  55. del work_item
  56. finally:
  57. self._broken = True
  58. def submit(self, fn, *args, **kwargs):
  59. # Check they're not submitting from the same thread
  60. if threading.current_thread() == self._work_thread:
  61. raise RuntimeError(
  62. "You cannot submit onto CurrentThreadExecutor from its own thread"
  63. )
  64. # Check they're not too late or the executor errored
  65. if self._broken:
  66. raise RuntimeError("CurrentThreadExecutor already quit or is broken")
  67. # Add to work queue
  68. f = Future()
  69. work_item = _WorkItem(f, fn, args, kwargs)
  70. self._work_queue.put(work_item)
  71. # Return the future
  72. return f