I need help understanding multiprocessing.Queue. The problem I'm facing is that results from queue.get(...) lag hilariously behind the calls to queue.put(...) and the queue's buffer (the deque).

This leaky abstraction led me to investigate the queue's internals. Its straightforward source code just points me to the deque implementation, and that also seems simple enough that I cannot use it to explain the behavior I'm seeing. I also read that Queue uses pipes, but I can't seem to find that in the source code.

I've boiled it down to a minimal example reproducing the problem, and below it I show a possible output.

import threading
import multiprocessing
import queue

q = None
def enqueue(item):
    global q
    if q is None:
        q = multiprocessing.Queue()
        process = threading.Thread(target=worker, args=(q,))  # or multiprocessing.Process; doesn't matter
        process.start()
    q.put(item)
    print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')


def worker(local_queue):
    while True:
        try:
            while True:  # get all items
                item = local_queue.get(block=False)
                print(f'len got item: {len(item)}. qsize: {local_queue.qsize()}. buffer len: {len(local_queue._buffer)}')
        except queue.Empty:
            print('empty')


if __name__ == '__main__':
    for i in range(1, 100000, 1000):
        enqueue(list(range(i)))

Output:

empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28

I want you to notice the following about the output: after inserting the item of length 28001, the worker finds that there are no elements left in the queue, whereas there are dozens more. Because of synchronization, I'd be okay with it missing a few of them. But it only manages to find two!

And this pattern continues.

This seems to have to do with the size of the objects I put on the queue. For small objects, say i as opposed to list(range(i)), this problem does not appear. But the objects we're talking about are still only kilobytes in size, nowhere near large enough to justify such significant delays (in my real-world, non-minimal example this easily took minutes).

My question, specifically, is: how can I share (not so) large amounts of data between processes in Python? Additionally, I'd like to know where in the internal implementation of Queue this sluggishness comes from.

  • Also I'm new to Python, so I am open to remarks
    – JBSnorro
    Commented Nov 2, 2017 at 22:15
  • did you find any solution
    – vks
    Commented Apr 15, 2018 at 21:18

2 Answers


For future readers, you could also try using:

q = multiprocessing.Manager().Queue()

instead of just

q = multiprocessing.Queue()

I haven't yet fully distilled and understood the mechanisms behind this behavior, but one source I've read claimed it's about:

"when pushing large items onto the queue, the items are essentially buffered, despite the immediate return of the queue’s put function."

The author goes on to explain more about it and a way to fix it, but for me, adding the Manager did the trick, easy and clean.
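
In the question's example, the only change needed is how the queue is constructed; everything else stays the same. A minimal sketch (variable names here are mine):

import multiprocessing

if __name__ == '__main__':
    manager = multiprocessing.Manager()  # starts a server process that owns the real queue
    q = manager.Queue()                  # returns a proxy to that queue

    q.put(list(range(1000)))  # a synchronous proxy call; no feeder-thread buffer involved
    item = q.get()            # likewise a round-trip to the manager process
    print(len(item))          # 1000

The trade-off is an extra hop through the manager's server process on every put and get, which is why it can be slower for some workloads (see lucidbrot's comment below).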

UPDATE: I believe this StackOverflow answer is helpful in explaining the issue.

FMQ, mentioned in the accepted answer, is also Python 2 exclusive, which is one of the reasons I felt this answer might help more people someday.

  • I have a slow queue although I am only putting tuples of three integers onto it.
    Commented Oct 27, 2020 at 8:21
  • A relevant SO question: stackoverflow.com/a/45236748/2282531
    – Shlomi A
    Commented Jan 9, 2021 at 11:04
  • Replacing multiprocessing.Queue() with multiprocessing.Manager().Queue() solved the problem for me.
    – Shlomi A
    Commented Jan 9, 2021 at 12:28
  • q = multiprocessing.Manager().Queue() works well; it resolved my issue.
    Commented Oct 28, 2021 at 11:01
  • This seems untrue, at least for my exact use case (numpy arrays 1920x1080x3 on Windows). Manager's Queue is 10 ms slower.
    – lucidbrot
    Commented Feb 5 at 13:08

I met this problem too. I was sending large numpy arrays (~300 MB), and mp.Queue.get() was painfully slow.

After looking into the Python 2.7 source code of mp.Queue, I found that the slowest part (on Unix-like systems) is _conn_recvall() in socket_connection.c, but I did not look deeper.

To work around the problem, I built an experimental package, FMQ.

This project is inspired by the use of multiprocessing.Queue (mp.Queue). mp.Queue is slow for large data items because of the speed limitation of the pipe (on Unix-like systems).
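
You can see this directly: put() returns almost immediately, because a feeder thread does the pickling and the pipe write in the background, while get() has to wait for the whole byte stream to arrive. A rough demonstration (absolute timings will of course vary):

import multiprocessing
import time

if __name__ == '__main__':
    q = multiprocessing.Queue()
    big = b'x' * (100 * 1024 * 1024)  # ~100 MB payload

    t0 = time.perf_counter()
    q.put(big)      # returns as soon as the item lands in the feeder thread's buffer
    t1 = time.perf_counter()
    item = q.get()  # blocks until all the bytes have come through the pipe
    t2 = time.perf_counter()

    print(f'put: {t1 - t0:.6f}s  get: {t2 - t1:.6f}s')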

With mp.Queue handling the inter-process transfer, FMQ implements a stealer thread, which steals an item from mp.Queue once any item is available, and puts it into a Queue.Queue. Then, the consumer process can fetch the data from the Queue.Queue immediately.
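
The pattern looks roughly like this (a minimal sketch of the stealer-thread idea, not FMQ's actual code; all names here are mine):

import multiprocessing
import queue
import threading

def stealer(mpq, localq):
    # Runs inside the consumer process: it blocks on the slow pipe in the
    # background, so the consumer's own get() never has to.
    while True:
        localq.put(mpq.get())

def consumer(mpq):
    localq = queue.Queue()  # a plain thread-safe queue; no pipe involved
    threading.Thread(target=stealer, args=(mpq, localq), daemon=True).start()
    for _ in range(3):
        item = localq.get()  # already in this process's memory
        print(f'consumed item of len {len(item)}')

if __name__ == '__main__':
    mpq = multiprocessing.Queue()
    proc = multiprocessing.Process(target=consumer, args=(mpq,))
    proc.start()
    for i in (10, 100, 1000):
        mpq.put(list(range(i)))
    proc.join()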

The speed-up is based on the assumption that both producer and consumer processes are compute-intensive (thus multiprocessing is necessary) and the data is large (e.g. >50 227x227 images). Otherwise mp.Queue with multiprocessing, or Queue.Queue with threading, is good enough.

fmq.Queue can be used just like an mp.Queue.
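
For example (the constructor and the put/get interface shown here are assumed to mirror mp.Queue, per the sentence above; see the project's README for the exact API):

import fmq  # experimental package; Python 2 only, as noted in the other answer

q = fmq.Queue()           # assumed to be a drop-in for multiprocessing.Queue()
q.put(list(range(1000)))
item = q.get()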

Note that there are still some known issues, as this project is in its early stages.
