# conc.py -rw-r--r-- 2.5 KiB View raw
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import asyncio
import concurrent.futures
import random
from functools import partial

# Number of worker processes in the pool and of concurrent runner_manager
# coroutines started by run().
NUM_RUNNERS = 4

# Module-wide count of tasks that have been scheduled but whose results have
# not yet been consumed: increased whenever tasks are enqueued, decreased by
# run() each time it takes a result off the result queue.
remaining_tasks = 0


def work(num):
    """Compute the ``num``-th Fibonacci number and randomly spawn follow-ups.

    Returns a ``(result, new_tasks)`` pair.  ``result`` is the Fibonacci
    number with ``F(0) == 0`` and ``F(1) == 1``.  ``new_tasks`` is a list of
    zero to two ``(priority, task)`` tuples, where ``task`` is drawn from
    ``[1, num)`` and ``priority`` from ``[1, 1000)``.  No follow-up tasks are
    generated when ``num`` is 1 or less.
    """
    prev, curr = 0, 1
    for _ in range(num):
        prev, curr = curr, prev + curr

    # Nothing smaller to recurse into: return without touching the RNG.
    if num <= 1:
        return prev, []

    spawned = []
    how_many = random.randrange(0, 3)
    while len(spawned) < how_many:
        # Draw the task size before the priority so the sequence of random
        # numbers consumed stays the same for a given seed.
        child = random.randrange(1, num)
        rank = random.randrange(1, 1000)
        spawned.append((rank, child))
    return prev, spawned


async def runner_manager(
    task_queue: asyncio.PriorityQueue,
    result_queue: asyncio.Queue,
    executor: concurrent.futures.Executor,
):
    """Pull tasks off ``task_queue`` forever and run them in ``executor``.

    Each queue item is indexed as ``item[1]`` to obtain the argument passed
    to ``work``.  The ``(item, result)`` pair is put on ``result_queue`` and
    every follow-up task returned by ``work`` is put back on ``task_queue``.
    The loop has no exit condition of its own; it ends only when the
    coroutine is cancelled or an exception propagates.
    """
    global remaining_tasks
    loop = asyncio.get_running_loop()
    while True:
        item = await task_queue.get()
        outcome, spawned = await loop.run_in_executor(executor, work, item[1])

        # Count the follow-ups before the result is published, so the global
        # counter already includes them when the result becomes visible.
        remaining_tasks += len(spawned)

        publish = asyncio.ensure_future(result_queue.put((item, outcome)))
        enqueue = [task_queue.put(entry) for entry in spawned]
        await asyncio.gather(*enqueue)
        await publish


async def run(tasks=None):
    """Process ``tasks`` (and any follow-ups they spawn) on a process pool.

    Args:
        tasks: Sized iterable of ``(priority, n)`` tuples to seed the
            priority queue with.  ``None`` (the default) means no tasks.

    Starts ``NUM_RUNNERS`` ``runner_manager`` coroutines sharing one
    ``ProcessPoolExecutor``, prints every result as it arrives, and returns
    once the global ``remaining_tasks`` counter has dropped to zero.  The
    runners are cancelled and the pool is shut down on the way out, also when
    an exception or cancellation interrupts the loop.
    """
    global remaining_tasks
    # Avoid a shared mutable default argument.
    if tasks is None:
        tasks = []

    task_queue = asyncio.PriorityQueue()
    result_queue = asyncio.Queue()

    # The context manager shuts the pool down on exit so worker processes are
    # not leaked.  By then the runners have been cancelled and awaited, so no
    # new work is being submitted.
    with concurrent.futures.ProcessPoolExecutor(max_workers=NUM_RUNNERS) as executor:
        runners = [
            asyncio.create_task(runner_manager(task_queue, result_queue, executor))
            for _ in range(NUM_RUNNERS)
        ]
        try:
            remaining_tasks += len(tasks)
            await asyncio.gather(*(task_queue.put(task) for task in tasks))

            while remaining_tasks > 0:
                task, result = await result_queue.get()
                remaining_tasks -= 1
                print(
                    f"{task=} {result=} {remaining_tasks=} {task_queue._queue=} {result_queue._queue=}"
                )
        finally:
            # Stop the runners after all tasks are done (or on error), and
            # wait for the cancellations to be delivered so no pending task
            # is left behind when the event loop closes.
            for runner in runners:
                runner.cancel()
            await asyncio.gather(*runners, return_exceptions=True)


# Guard the entry point: ProcessPoolExecutor workers may re-import this module
# (e.g. with the "spawn" start method), and must not start the whole job again.
if __name__ == "__main__":
    asyncio.run(
        run(
            # Some tasks to run. Takes ~30s for me.
            [
                (1, 100000),
                (10, 9875),
                (500, 1000),
                (19, 87421),
                (4, 123),
                (4, 20),
                (87, 100000),
                (1, 100000),
                (1, 100000),
                (56, 100000),
                (1, 100000),
                (1, 100000),
                (54, 100000),
                (1, 100000),
                (7, 100000),
                (41213, 100000),
                (564, 100000),
                (1, 100000),
                (1, 100000),
                (1, 100000),
                (54, 100000),
                (1, 100000),
                (98, 100000),
            ] * 10
        )
    )