
I'm currently writing an image processing program in Python 3.x that needs to process frames in real time (30 FPS) with low latency (<60 ms). I have one parent process that reads frames and sends them to multiple child processes via a SharedMemory object. The computations done by the child processes are CPU-bound, and running all of them on a single core is not possible at 30 FPS. But since they work independently of each other, I decided to run them as separate processes.
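For reference, the frame-sharing part of the setup looks roughly like this (a simplified sketch; the frame size and variable names are just illustrative):

```python
from multiprocessing import shared_memory

FRAME_BYTES = 1920 * 1080 * 3  # hypothetical raw frame size (~6 MB)

# Parent: create a named shared block once at startup
shm = shared_memory.SharedMemory(create=True, size=FRAME_BYTES)

# Child: attach to the same block by name (shm.name would be passed
# to each child process when it is started)
child = shared_memory.SharedMemory(name=shm.name)

# Parent writes the latest frame into shm.buf in place; children read
# their attached buffer after being told a new frame is ready
shm.buf[0] = 42
seen = child.buf[0]

child.close()
shm.close()
shm.unlink()
print(seen)  # 42
```

The frame data itself never goes through a Pipe; only the "new frame is ready" notification does, which is where the latency problem below comes in.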

Currently, I'm using Pipes to send commands to the child processes, most importantly to inform them whenever the frame is updated. Measuring the time between the parent's send() and the child's recv() (using time.time_ns()), the latency is always >100 ms.

This is a problem because the output feed will always lag by >100 ms plus the time taken by all the children to finish processing (another 20-30 ms, plus the delays between all the send() calls).

The application is meant to be used on a live sports feed and therefore cannot introduce such high latency. So I have exactly two questions:

  1. Are Pipes actually that slow in Python, or is something wrong with my implementation of them? (Note: I have tested the latency on an Intel i5 9th Gen as well as an Apple M1.)

  2. If Pipes indeed are this slow, do I have any other options in Python, other than resorting to some form of sockets?

Thanks.

Edit:

I've added the code I've used to test the Pipe latency here.

import multiprocessing as mp
import time

def proc(child_conn):
    child_conn.recv()
    ts = time.time_ns()
    child_conn.send(ts)
    child_conn.close()

if __name__ == "__main__":

    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()

    ts = time.time_ns()
    parent_conn.send("START")
    ts_end = parent_conn.recv()

    print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
    p1.join()
  • Have you checked how fast the OS's raw pipes are? Note that sending data between processes will pickle and unpickle it, which might actually be the bottleneck. Using simpler data could speed this up. Commented Jan 20, 2022 at 10:30
  • Wait, you have a low-latency application and you decided to use Python? Are you joking? Commented Jan 20, 2022 at 10:32
  • It's difficult to say without sample code + data
    – anon01
    Commented Jan 20, 2022 at 10:41
  • @MadPhysicist I wish I could have chosen C/C++, but unfortunately the choice wasn't in my hands. XD Commented Jan 20, 2022 at 11:19
  • @AbhishekSatish Your added testing code shows me a 1.2 ms delay; see the online example here (scroll down the web page to see console output).
    – Arty
    Commented Jan 20, 2022 at 12:01

2 Answers


I just wrote one possible solution for you, using the multiprocessing objects Process and Queue.

I measured its throughput, and it takes on average 150 mcs (microseconds) to process one task that does almost nothing: it just takes an integer from the task, adds 1 to it, and sends it back. I think a 150-microsecond delay should be totally enough for you to process 30 FPS.

Queue is used instead of your Pipe, as I think it is more suitable for multi-task processing. And if your time measurements are precise, this Queue approach is also about 660 times faster than your Pipe result (a 150-microsecond delay compared to 100 milliseconds).

You can see that the processing loop sends tasks in batches: first it sends many tasks to all processes, and only after that does it gather all the sent and processed tasks. This kind of batch processing makes processing smooth, compared to sending just one task at a time and then gathering a few results.

Even better would be to send tasks to the processes and then gather the results asynchronously in separate lightweight threads. This prevents you from blocking while waiting for the slowest process to finish its tasks.

Processes are signalled to finish and exit by sending a None task to them.

Try it online!

def process(idx, in_q, out_q):
    while True:
        task = in_q.get()
        if task is None:
            break
        out_q.put({'n': task['n'] + 1})

def main():
    import multiprocessing, time

    queue_size = 1 << 16
    procs = []
    for i in range(multiprocessing.cpu_count()):
        in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
        procs.append({
            'in_q': in_q,
            'out_q': out_q,
            'proc': multiprocessing.Process(target=process,
                kwargs=dict(idx=i, in_q=in_q, out_q=out_q)),
        })
        procs[-1]['proc'].start()

    num_blocks = 1 << 2
    block = 1 << 10
    assert block <= queue_size

    tb = time.time()
    for k in range(num_blocks):
        # Send tasks
        for i in range(block):
            for j, proc in enumerate(procs):
                proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
        # Receive tasks results
        for i in range(block):
            for proc in procs:
                proc['out_q'].get()
    print('Processing speed:', round((time.time() - tb) /
        (num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
    
    # Send finish signals to processes
    for proc in procs:
        proc['in_q'].put(None)
    # Join processes (wait for exit)
    for proc in procs:
        proc['proc'].join()

if __name__ == '__main__':
    main()

Output:

Processing speed: 150.7 mcs per task

I also measured timings for sending just one task at a time (instead of 1000 at a time) to all processes and receiving one result at a time. In this case the delay is 460 mcs (microseconds). So you can think of 460 mcs as the pure delay of a Queue in the worst usage case (that 460 mcs includes both send and recv).


I've taken your example snippet and modified it a bit to use Queue instead of Pipe, and got a 0.1 ms delay.

Notice that I do this in a loop 5 times, because the first one or two iterations initialize some Queue-related machinery.

Try it online!

import multiprocessing as mp
import time

def proc(inp_q, out_q):
    for i in range(5):
        e = inp_q.get()
        ts = float(time.time_ns())
        out_q.put(ts)

if __name__ == "__main__":

    inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
    p1 = mp.Process(target=proc, args=(inp_q, out_q))
    p1.start()

    for i in range(5):
        ts = float(time.time_ns())
        inp_q.put("START")
        ts_end = out_q.get()

        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
    p1.join()

Output:

Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032

Running your example in a loop several times also makes the second and later send/recv iterations much faster than the first one.

The first call is very slow due to lazy initialization of resources. Many components allocate the resources they need only on first use; this avoids unnecessary allocation when a feature is never used at all. The flip side is that the very first call is much slower, so you have to make a few warm-up calls before measuring.

Try it online!

import multiprocessing as mp
import time

def proc(child_conn):
    for i in range(5):
        child_conn.recv()
        ts = time.time_ns()
        child_conn.send(ts)

if __name__ == "__main__":

    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()

    for i in range(5):
        ts = time.time_ns()
        parent_conn.send("START")
        ts_end = parent_conn.recv()

        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")

Output:

Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021
  • Hey, thanks for your answer! However, the issue here is that your latency is hidden by the sheer number of blocks you are processing. Your true latency would be the time taken to send and receive the result for your first block. In my case the frame arrives only every 33 ms, so the latency is always the same throughout: the time that the Pipe takes to send a message from the parent to the child, plus the processing time. Commented Jan 20, 2022 at 11:16
    @AbhishekSatish I just set block equal to 1, and now the processing speed is 460 mcs (microseconds). In this case I send one frame at a time to all processes and wait for one result at a time, so 460 mcs is the true per-frame delay of a Queue send plus recv.
    – Arty
    Commented Jan 20, 2022 at 11:26
  • Could you take a look at the snippet I've added? If it's not too much trouble, could you try Queues with the same simplicity and tell me if you're still getting an answer in microseconds? Thanks so much! Commented Jan 20, 2022 at 12:02
  • @AbhishekSatish Your added snippet shows a great time online, just 1.2 ms; see here (scroll down the web page to see console output). What OS (Linux or Windows) do you use? This online server runs on Linux.
    – Arty
    Commented Jan 20, 2022 at 12:03
  • @AbhishekSatish I modified your example to do several send/recv operations in a loop, and the second send shows 0.05 ms; see the code here. You have to send several times, because the very first send does some heavy initialization of socket-related machinery.
    – Arty
    Commented Jan 20, 2022 at 12:20

The following program sends a simple object through a pipe 1 million times and measures the total elapsed time in seconds and the average send time in milliseconds. I am running on a fairly old Windows desktop with an Intel(R) Core(TM) i7-4790 CPU @ 3.60 GHz:

from multiprocessing import Pipe, Process
import time

class Message:
    def __init__(self, text):
        self.text = text

N = 1_000_000

def worker(recv_connection):
    for _ in range(N):
        msg = recv_connection.recv()

def main():
    recv_connection, send_connection = Pipe(duplex=False)
    p = Process(target=worker, args=(recv_connection,))
    p.start()
    msg = Message('dummy')
    start_time = time.time_ns()
    for _ in range(N):
        send_connection.send(msg)
    p.join()
    elapsed = time.time_ns() - start_time
    print(f'Total elapsed time: {elapsed / 1_000_000_000} seconds')
    print(f'Average send time: {elapsed / (1_000_000 * N)}ms.')

if __name__ == '__main__':
    main()

Prints:

Total elapsed time: 10.7369966 seconds
Average send time: 0.0107369966ms.

This is 10,000 times faster than what you are achieving (100 ms), so I can only conclude that it must be the complexity of the object that you are sending through the pipe.
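One quick way to check this is to time the pickle round trip of the object you are actually sending, since Pipe.send() pickles it and recv() unpickles it. A rough sketch (the 6 MB bytes object is just a stand-in for a raw frame):

```python
import pickle
import time

small = "START"                        # a command like the one in the question
fake_frame = bytes(1920 * 1080 * 3)    # ~6 MB stand-in for a raw frame

for name, obj in [("small command", small), ("6 MB frame", fake_frame)]:
    t0 = time.perf_counter()
    data = pickle.dumps(obj)           # what send() does under the hood
    pickle.loads(data)                 # what recv() does under the hood
    dt_ms = (time.perf_counter() - t0) * 1000
    print(f"{name}: {len(data)} bytes pickled, round trip {dt_ms:.3f} ms")
```

If the round trip for your real payload is large, the pipe itself is not the problem; keep the frame data in shared memory and send only tiny command objects through the pipe.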

Update

You do want to use multiprocessing, but I would suggest a multiprocessing pool, specifically a multiprocessing.pool.Pool instance used in conjunction with the imap method. This lets you have a generator function that yields each frame to be submitted to the pool for processing, while processed frames are returned to the main process as they become available, in the order in which the frames were submitted. The following outlines the basic idea:

from multiprocessing import Pool, cpu_count
import time

def process_frame(frame):
    # return processed frame
    time.sleep(.1)
    return frame.upper()

def generate_frames_for_processing():
    for i in range(100):
        time.sleep(.033)
        yield f'msg{i}'

def main():
    # Leave a processor for the main process:
    pool = Pool(cpu_count() - 1)
    start_time = time.time()
    # get processed results as they are returned in order of being processed:
    for processed_frame in pool.imap(process_frame, generate_frames_for_processing()):
        # Do something with returned processed frame
        # These will be in the same order as the frames are submitted
        ...
        print(processed_frame)
    pool.close()
    pool.join()
    print('Elapsed:', time.time() - start_time)

if __name__ == '__main__':
    main()

Prints:

MSG0
MSG1
MSG2
...
MSG97
MSG98
MSG99
Elapsed: 3.467884302139282

You can specify a chunksize argument on the imap call, but you probably do not want to. See the documentation for details.

  • Hey! Thanks for your answer! However, in your solution, since you're processing a large batch of data at a time, the latency is being hidden. In my case, I only receive a frame every 33 ms, so batch processing is highly unlikely. My individual sends need to have a low latency. Commented Jan 20, 2022 at 12:09
  • This has nothing to do with "batch processing". Your generate_frames_for_processing would be a totally different implementation that would be issuing a yield every 33ms. There is an underlying Queue that implements the imap call. Every time that yield is done by the generator, another task is queued to the pool and process_frame will be invoked and the main process will get another processed frame. (more ...)
    – Booboo
    Commented Jan 20, 2022 at 12:20
  • The solution posted by Arty has no control over the order in which items are placed on the output queue, so every frame has to be tagged with a number, and the processed frames on the output queue are also tagged with numbers. You cannot do anything with the processed frames until they have all been received. Presumably you then have to build a dictionary using the frame numbers, or if the frames are fixed size you can perhaps write them out at fixed offsets in a file. But the entire set of processed frames must be in memory.
    – Booboo
    Commented Jan 20, 2022 at 12:23
  • I have updated the demo program to make it more explicit.
    – Booboo
    Commented Jan 20, 2022 at 12:34
  • Thanks a lot! I think yours is a great way to go in the event that I was sending frames via the Pipe/Process and then also needing them back. Since I'm using a shared memory object, however, and then sending commands over the Pipe to receive certain bytes of information, I won't be able to use this solution for that part. I will definitely improve the way I'm utilising the pool with your example though! Thanks very much for taking the time out to give such a well explained answer! Commented Jan 20, 2022 at 13:03
