
Background:

.
├── app.py # API source code file
└── worker.py # CPU-bound task

I am using ProcessPoolExecutor().submit() in app.py to execute an entry function main() defined in worker.py. As soon as the worker process starts running the task, its process id is sent back to the app process via a Queue created from a Manager.

My original source code is like this:

app.py:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

from fastapi import FastAPI
from loguru import logger

from worker import main

app = FastAPI()


executor = ProcessPoolExecutor()
queue = Manager().Queue() # A queue created via Manager is used for message exchange.


@app.get("/whatever-route")
def whatever_route():
    logger.info(f"Before submitting")
    executor.submit(main, queue) # submit the CPU-bound task
    sub_pid = queue.get()
    logger.info(f"Got sub pid from queue.")
    return sub_pid

worker.py:

import os
from queue import Queue  # only needed for the type hint; the Manager queue is a proxy around queue.Queue

from loguru import logger

... # Three dots here means there are many packages to be imported.

logger.info("Finished importing packages.")


def init():
    pass


def main(q: Queue):
    sub_pid = os.getpid()
    q.put(sub_pid)
    logger.info(f"Put sub pid into queue.")
    ... # Three dots here means additional CPU-bound task to be executed.
    return 1

When debugging the API, I noticed that it spends a surprisingly long time importing the packages in worker.py:

2024-06-25 17:54:23.661 | INFO | app:whatever_route:26 - Before submitting

2024-06-25 17:54:25.146 | INFO | worker::33 - Finished importing packages.

2024-06-25 17:54:25.151 | INFO | worker:main:44 - Put sub pid into queue.

2024-06-25 17:54:25.151 | INFO | app:whatever_route:29 - Got sub pid from queue.
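For reference, here is a minimal sketch of how I measure the import cost in isolation, outside the API (assuming it is run from the project root so that worker.py is importable):

import time

start = time.perf_counter()
import worker  # deliberately imported after starting the timer
print(f"importing worker took {time.perf_counter() - start:.3f} s")

This is only meant to confirm that the gap between "Before submitting" and "Finished importing packages" in the log above is dominated by import time rather than by starting the worker process itself.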


One feasible solution:

After realising the problem, my idea was to have the import statements in worker.py executed as soon as app.py starts, instead of deferring them to the moment the task is actually executed. Therefore, I added one line in app.py, right after creating the executor. I tried it, and it works.

app.py(modified):

... # Remain unchanged.


executor = ProcessPoolExecutor()
executor.submit(init) # NEWLY ADDED LINE. init is a function in worker.py which does nothing.
queue = Manager().Queue() # A queue created via Manager is used for message exchange.


@app.get("/whatever-route")
... # Remain unchanged

2024-06-25 18:14:59.382 | INFO | worker::33 - Finished importing packages.

------------------------Triggered API-----------------------

2024-06-25 18:19:42.704 | INFO | app:whatever_route:29 - Before submitting

2024-06-25 18:19:42.721 | INFO | worker:main:44 - Put sub pid into queue.

2024-06-25 18:19:42.721 | INFO | app:whatever_route:32 - Got sub pid from queue.


My Question:

Though the time overhead issue has been solved, I am quite confused about why the newly added line makes this improvement. Given that executor.submit(init) is submitted to some process A, and executor.submit(main, queue) should be submitted to a separate process B, process B should not be affected by A and should have to execute the import statements again, shouldn't it? What is the underlying principle behind this improvement?
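For what it's worth, here is a small, self-contained sketch (independent of my app; the pool size and the sleep are arbitrary choices of mine) that I would use to check whether two consecutive submits really land in different worker processes:

import os
import time
from concurrent.futures import ProcessPoolExecutor


def report_pid(tag: str) -> int:
    # each task simply reports the pid of the worker process that runs it
    print(f"{tag}: running in pid {os.getpid()}")
    time.sleep(0.1)
    return os.getpid()


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        first = executor.submit(report_pid, "first submit").result()
        second = executor.submit(report_pid, "second submit").result()
    print("same worker process reused:", first == second)

Depending on the Python version and how busy the pool is, the second submit may or may not be served by the same worker process, which is exactly the part I do not fully understand.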

Apart from your guidance, any links to the official Python docs are highly welcome!
