Here is one possible solution, using the multiprocessing objects Process and Queue.
I measured its throughput and it takes on average 150 mcs
(microseconds) to process one task that does almost nothing: the worker just takes an integer from a task, adds 1 to it, and sends it back. A 150-microsecond delay should be more than enough for you to process 30 FPS.
A Queue is used instead of your Pipe, as I think it is more suitable for multi-task processing. And if your time measurements are precise, the Queue is also about 660×
faster than your Pipe run (150 microseconds compared to a 100-millisecond delay).
Notice that the processing loop sends tasks in batches: it first sends many tasks to all processes and only then gathers all the results. This kind of batch processing keeps the pipeline smooth, compared to sending just one task at a time and then gathering a few results.
Even better would be to send tasks to the processes and then gather results asynchronously in separate lightweight threads. This prevents you from blocking while waiting for the slowest process to finish its tasks.
Processes are signalled to finish and exit by sending a None task to them.
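As a rough sketch of that threaded idea (the `send_loop`/`recv_loop`/`run` names are my own, not from the code below), one thread can keep feeding the input queue while another drains results, so neither direction blocks the other:

```python
import multiprocessing, threading

def worker(in_q, out_q):
    # Same kind of worker as below: add 1 to each task until the None sentinel arrives.
    while True:
        task = in_q.get()
        if task is None:
            break
        out_q.put(task + 1)

def run(num_tasks=1000):
    in_q, out_q = multiprocessing.Queue(), multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(in_q, out_q))
    p.start()
    results = []
    def send_loop():
        # Producer thread: keeps the input queue full without waiting for results.
        for i in range(num_tasks):
            in_q.put(i)
        in_q.put(None)  # finish signal
    def recv_loop():
        # Consumer thread: drains results as soon as they appear.
        for _ in range(num_tasks):
            results.append(out_q.get())
    t_send = threading.Thread(target=send_loop)
    t_recv = threading.Thread(target=recv_loop)
    t_send.start(); t_recv.start()
    t_send.join(); t_recv.join()
    p.join()
    return results

if __name__ == '__main__':
    print(len(run()))  # all 1000 results gathered while sends were still in flight
```

With one worker and FIFO queues the results come back in order; with several workers you would tag each task with an index as the main example below does.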
Try it online!
def process(idx, in_q, out_q):
    while True:
        task = in_q.get()
        if task is None:
            break
        out_q.put({'n': task['n'] + 1})

def main():
    import multiprocessing, time
    queue_size = 1 << 16
    procs = []
    for i in range(multiprocessing.cpu_count()):
        in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
        procs.append({
            'in_q': in_q,
            'out_q': out_q,
            'proc': multiprocessing.Process(target = process,
                kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
        })
        procs[-1]['proc'].start()
    num_blocks = 1 << 2
    block = 1 << 10
    assert block <= queue_size
    tb = time.time()
    for k in range(num_blocks):
        # Send tasks
        for i in range(block):
            for j, proc in enumerate(procs):
                proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
        # Receive task results
        for i in range(block):
            for proc in procs:
                proc['out_q'].get()
    print('Processing speed:', round((time.time() - tb) /
        (num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
    # Send finish signals to processes
    for proc in procs:
        proc['in_q'].put(None)
    # Join processes (wait for exit)
    for proc in procs:
        proc['proc'].join()

if __name__ == '__main__':
    main()
Output:
Processing speed: 150.7 mcs per task
I also measured the timing when sending just 1 task at a time (instead of 1000 at a time) to all processes and receiving 1 result at a time. In this case the delay is 460 mcs
(microseconds). So you can treat 460 mcs as the pure worst-case delay of a Queue (the 460 mcs includes both send + recv).
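That single-task-at-a-time measurement can be sketched roughly like this (the `echo`/`measure` names are mine, and absolute numbers will vary by machine):

```python
import multiprocessing, time

def echo(in_q, out_q):
    # Worker: sends every task straight back until it sees the None sentinel.
    while True:
        x = in_q.get()
        if x is None:
            break
        out_q.put(x)

def measure(n):
    # Returns the average send + recv round-trip time in microseconds,
    # sending exactly one task and waiting for its result each iteration.
    in_q, out_q = multiprocessing.Queue(), multiprocessing.Queue()
    p = multiprocessing.Process(target=echo, args=(in_q, out_q))
    p.start()
    tb = time.time()
    for i in range(n):
        in_q.put(i)
        assert out_q.get() == i
    latency = (time.time() - tb) / n * 1e6
    in_q.put(None)  # finish signal
    p.join()
    return latency

if __name__ == '__main__':
    print('Round-trip latency:', round(measure(1000), 1), 'mcs per task')
```

Because each task waits for its own result, this measures the full round trip with no batching to hide the latency.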
I've taken your example snippet and modified it a bit to use a Queue instead of a Pipe, and got a 0.1 ms delay.
Notice that I do this in a loop 5 times, because the first one or two tries initialize some Queue-related resources.
Try it online!
import multiprocessing as mp
import time

def proc(inp_q, out_q):
    for i in range(5):
        e = inp_q.get()
        ts = float(time.time_ns())
        out_q.put(ts)

if __name__ == "__main__":
    inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
    p1 = mp.Process(target=proc, args=(inp_q, out_q))
    p1.start()
    for i in range(5):
        ts = float(time.time_ns())
        inp_q.put("START")
        ts_end = out_q.get()
        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
    p1.join()
Output:
Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032
Also, running your example in a loop several times makes the second and later send/recv iterations much faster than the first one.
The first call is very slow due to lazy initialization of resources. Most algorithms are lazily initialized, meaning that they allocate all needed resources only on first call. This prevents unnecessary allocation when the algorithm is not used at all. On the other hand, it makes the first call much slower, hence you should do a few empty warm-up calls before measuring a lazy algorithm.
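A toy illustration of the lazy-initialization effect (this class is made up for the example, not part of multiprocessing): the expensive allocation happens on first use, so the first call is slow and later calls are fast.

```python
import time

class LazyResource:
    def __init__(self):
        self._buf = None  # nothing allocated yet

    def use(self):
        if self._buf is None:
            time.sleep(0.05)           # simulate expensive one-time setup
            self._buf = bytearray(1 << 20)
        return len(self._buf)

r = LazyResource()
t0 = time.time(); r.use(); first = time.time() - t0
t0 = time.time(); r.use(); second = time.time() - t0
print(first > second)  # the first call pays the one-time setup cost
```

The same pattern inside Queue/Pipe internals is why the first send/recv iteration above is an order of magnitude slower than the rest.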
Try it online!
import multiprocessing as mp
import time

def proc(child_conn):
    for i in range(5):
        child_conn.recv()
        ts = time.time_ns()
        child_conn.send(ts)

if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()
    for i in range(5):
        ts = time.time_ns()
        parent_conn.send("START")
        ts_end = parent_conn.recv()
        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
    p1.join()
Output:
Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021
Run online, the same example shows a 1.2 ms delay; see the online example here (scroll down the web page to see the console output).