0

The problem is that I need to create an async method/library as follow (so that it does not block asyncio event loop):

  1. Create future from the being defined async method (let's say it methodA)
  2. Put the future into some Queue/List/Dict and one service will fill result into the future when available (the result take long time to be available)
  3. await the future in the methodA

The problem I have is awaiting for the created future is blocking forever as simplified example below.

import asyncio from asyncio import Future from queue import Queue from threading import Thread futures_queue: Queue[Future] = Queue() def fill_result_service(): counter = 0 while True: fut = futures_queue.get() print(f"Processing fut={id(fut)}") fut.set_result(f"OK: {counter}") counter += 1 filler_thread = Thread(target=fill_result_service) filler_thread.start() async def main_not_ok(): fut: Future[str] = asyncio.get_running_loop().create_future() print(f"Putting fut={id(fut)} into queue") futures_queue.put(fut) result = await fut assert result.startswith("OK") print("main_not_ok() completed") async def main_ok(): fut: Future[str] = asyncio.get_running_loop().create_future() tmp_thread = Thread(target=lambda: fut.set_result("OK: Local thread")) tmp_thread.start() result = await fut assert result.startswith("OK") print("main_ok() completed") if __name__ == "__main__": print("Running main_ok: ") asyncio.run(main_ok()) # work as expected print("\n\n\nRunning main_not_ok: ") asyncio.run(main_not_ok()) #blocking forever 

I have struggled to debug it for half-day and can't figure it out. Please help me.

1 Answer 1

1

You can do it without Queue the following way:

import asyncio import time from random import randint from threading import Thread def set_future_result(future: asyncio.Future, event: asyncio.Event, loop: asyncio.AbstractEventLoop): """Thread target function. It gives some result to futures.""" async def _event_status_change(_event: asyncio.Event): """Wrap event in coroutine to run it threadsafe""" _event.set() time.sleep(3) res = randint(1, 10) if res > 8: future.set_exception(Exception(f"Result: {res} " + "Error !!! " * 2)) else: future.set_result(res) asyncio.run_coroutine_threadsafe(_event_status_change(event), loop) async def asyncio_loop_killer(task: asyncio.Task): """Just task to finish our app in several seconds.""" n = 20 while n: await asyncio.sleep(1) n -= 1 if task.done(): break else: task.cancel() async def tasks_producer(): """Main function of our app. It produces futures for children threads.""" loop = asyncio.get_event_loop() while True: future, event = asyncio.Future(), asyncio.Event() event.clear() worker = Thread(target=set_future_result, args=(future, event, loop,), daemon=True) worker.start() await event.wait() if res := future.exception(): print(f"Error: {res}") break print(f"Result: {future.result()}") async def async_main(): """Wrapper around all async activity.""" producer_task = asyncio.create_task(tasks_producer()) await asyncio_loop_killer(producer_task) if __name__ == '__main__': asyncio.run(async_main()) 

Also check add_done_callback of Future

Sign up to request clarification or add additional context in comments.

6 Comments

Thank for the detailed suggestion. The problem is that start a thread every method call is not efficient for the task like pulling result from a broker.
@Chicky you can change code and use the same pool of threads, it is just example, not solution to solve your use case.
@Chicky check run_in_executor in asyncio library.
Oh sorry, my bad. I just stared at the need of Queue but not try your solution with Queue (It worked). Can you help me to understand why my code not working or some reference so that I can dive into.
@Chicky unfortunately not, you have to rewrite code.
|

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.