I need help in understanding multiprocessing.Queue
. The problem I'm facing is that getting results from queue.get(...)
are hilariously behind compared to calls to queue.put(...)
and the queue's buffer (the deque).
This leaking abstraction led me to investigate the internals of the queue. Its straightforward source code just points me to the deque implementation, and that also seems simple enough that I cannot use it to explain the behavior I'm seeing. Also I read that Queue uses pipes, but I can't seem to find that in the source code.
I've boiled it down to a minimal example reproducing the problem, and I specify a possible output below that.
import threading
import multiprocessing
import queue
q = None
def enqueue(item):
global q
if q is None:
q = multiprocessing.Queue()
process = threading.Thread(target=worker, args=(q,)) # or multiprocessing.Process Doesn't matter
process.start()
q.put(item)
print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
def worker(local_queue):
while True:
try:
while True: # get all items
item = local_queue.get(block=False)
print(f'len got item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
except queue.Empty:
print('empty')
if __name__ == '__main__':
for i in range(1, 100000, 1000):
enqueue(list(range(i)))
Output:
empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28
I want you to notice the following about the result: After inserting element 28001, the worker finds that there are no elements left in the queue, whereas there are dozens more. Because of synchronization, I'm okay with only getting all but a few of them. But it only manages to find two!
And this pattern continues.
This seems to do with the size of the objects I put on the queue. For small objects, say i
as opposed to list(range(i))
, this problem does not appear. But the sizes of the objects that were talking about are still kilobytes, not nearly large enough to dignify such significant delays (in my real-world non-minimal example this took minutes easily)
My question specifically is: How can I share (not so) large amounts of data between processes in Python? Additionally, I'd like to know where in the internal implementation of Queue does this sluggishness comes from