"""Priority-ordered task runner: asyncio coordinators feeding a process pool.

Each task is a ``(priority, n)`` tuple.  ``work(n)`` is executed in a worker
process, computes the n-th Fibonacci number and may return follow-up tasks,
which are fed back into the priority queue until no work remains.
"""
import asyncio
import concurrent.futures
import random
from functools import partial

# Number of coordinator coroutines, and also the size of the process pool.
NUM_RUNNERS = 4

# Tasks that have been queued but whose result has not yet been consumed by
# ``run``.  Only touched from the event-loop thread, never from the workers.
remaining_tasks = 0


def work(num):
    """Compute the ``num``-th Fibonacci number and invent follow-up tasks.

    Args:
        num: Index of the Fibonacci number to compute (``work(0)`` yields 0).

    Returns:
        A ``(result, new_tasks)`` tuple.  ``new_tasks`` is a list of zero to
        two random ``(priority, task)`` tuples with ``1 <= priority < 1000``
        and ``1 <= task < num``.  It is always empty when ``num <= 1``, which
        guarantees that the chain of follow-up tasks eventually terminates.
    """
    a, b = 0, 1
    for _ in range(num):
        a, b = b, a + b
    result = a

    new_tasks = []
    if num > 1:
        for _ in range(random.randrange(0, 3)):
            task = random.randrange(1, num)
            priority = random.randrange(1, 1000)
            new_tasks.append((priority, task))
    return result, new_tasks


async def runner_manager(
    task_queue: asyncio.PriorityQueue,
    result_queue: asyncio.Queue,
    executor: concurrent.futures.Executor,
):
    """Pull tasks from ``task_queue`` forever and run them in ``executor``.

    For every task the result is published on ``result_queue`` as
    ``(task, result)`` and any follow-up tasks are pushed onto ``task_queue``.
    The coroutine never returns on its own; the caller stops it by cancelling.

    Args:
        task_queue: Queue of ``(priority, n)`` tuples; lowest tuple first.
        result_queue: Queue receiving ``(task, result)`` tuples.
        executor: Executor in which ``work`` is run.
    """
    global remaining_tasks
    loop = asyncio.get_running_loop()
    while True:
        task = await task_queue.get()
        prepared_func = partial(work, task[1])
        result, new_tasks = await loop.run_in_executor(executor, prepared_func)
        # Account for the follow-up tasks *before* the result becomes visible,
        # so the consumer in ``run`` can never see the counter reach zero
        # while work is still outstanding.
        remaining_tasks += len(new_tasks)
        for new_task in new_tasks:
            await task_queue.put(new_task)
        await result_queue.put((task, result))


async def run(tasks=None):
    """Process ``tasks`` and all follow-up tasks they generate.

    Starts ``NUM_RUNNERS`` coordinator tasks backed by a process pool, prints
    one status line per finished task, and returns once nothing is left.
    The runners are cancelled and the process pool is shut down on exit,
    including when the result loop is interrupted by an exception.

    Args:
        tasks: Iterable of initial ``(priority, n)`` tuples.  ``None`` (the
            default) means no initial tasks.
    """
    global remaining_tasks
    tasks = [] if tasks is None else list(tasks)

    task_queue = asyncio.PriorityQueue()
    result_queue = asyncio.Queue()

    # The context manager guarantees the worker processes are reaped.
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=NUM_RUNNERS
    ) as executor:
        runners = [
            asyncio.create_task(
                runner_manager(task_queue, result_queue, executor)
            )
            for _ in range(NUM_RUNNERS)
        ]
        try:
            remaining_tasks += len(tasks)
            for task in tasks:
                await task_queue.put(task)

            while remaining_tasks > 0:
                task, result = await result_queue.get()
                remaining_tasks -= 1
                print(
                    f"{task=} {result=} {remaining_tasks=} {task_queue._queue=} {result_queue._queue=}"
                )
        finally:
            # Stop the runners after all tasks are done (or on error) and
            # wait for the cancellations to be delivered before the pool
            # is shut down.
            for runner in runners:
                runner.cancel()
            await asyncio.gather(*runners, return_exceptions=True)


# The guard is required with a process pool: under the "spawn"/"forkserver"
# start methods every worker re-imports this module, and must not re-run the
# whole job while doing so.
if __name__ == "__main__":
    asyncio.run(
        run(
            # Some tasks to run. Takes ~30s for me.
            [
                (1, 100000),
                (10, 9875),
                (500, 1000),
                (19, 87421),
                (4, 123),
                (4, 20),
                (87, 100000),
                (1, 100000),
                (1, 100000),
                (56, 100000),
                (1, 100000),
                (1, 100000),
                (54, 100000),
                (1, 100000),
                (7, 100000),
                (41213, 100000),
                (564, 100000),
                (1, 100000),
                (1, 100000),
                (1, 100000),
                (54, 100000),
                (1, 100000),
                (98, 100000),
            ]
            * 10
        )
    )