1

I am a bitt struggled with multiprocessing philosophy in Python. To test my knowledge I thought of a multiprocessed programm that computes the prime decomposition of an integer. It goes as follows. Put the integer in a queue. I then have a function that dequeue and search for a (prime) divisor of it. If one is found, the complementary integer is put back in the queue. How can I make this work. For the moment I have this :

import multiprocessing as mp

def f(queue, decomp):
    x = queue.get()
    prime = True
    for i in range(2, x):
        if (x % i) == 0:
            decomp.put(i)
            prime = False
            queue.put(x // i)
            break
    if prime:
        decomp.put(x)

class Num:
    def __init__(self, n):
        self.queue = mp.Queue()
        self.queue.put(n)
        self.decomposition = mp.Queue()

    def run(self):
        with mp.Pool(4) as pool:
            pool.apply_async(f, (self.queue, self.decomposition))

It raises

RuntimeError: Queue objects should only be shared between processes through inheritance

What is the standard way to make this ? (I know there may be better way to give the prime decomposition)

2

1 Answer 1

2

In order to use multiprocessing.Queue, you need to pass it to each child process as the point they are created (so they get "inherited"), rather than passing them as parameters to apply_async. If you're on Linux, you can do this by declaring them in the global scope, instead of as intance variables on the Num class - they will get inherited via the forking process:

import multiprocessing as mp

queue = mp.Queue()
decomposition = mp.Queue()

def f():
    x = queue.get()
    prime = True
    for i in range(2, x):
        if (x % i) == 0:
            decomposition.put(i)
            prime = False
            queue.put(x // i)
            break
    if prime:
        decomposition.put(x)
    
class Num:
    def __init__(self, n):
        queue.put(n)

    def run(self):
        with mp.Pool(4) as pool:
            pool.apply(f)

On Windows, it is a bit more involved, since it does not have support for forking. Instead, you have to use the init and initargs keyword parameters on the Pool constructor to pass the queues to the child processes, and then declare them as global variables inside the initializer function you provide. This will put the the queues in the global scope of your worker processes, allowing you to use them in the functions you pass to all Pool methods (map/map_async, apply/apply_async).

import multiprocessing as mp

def f():
    x = queue.get()
    prime = True
    for i in range(2, x):
        if (x % i) == 0:
            decomp.put(i)
            prime = False
            queue.put(x // i)
            break
    if prime:
        decomp.put(x)


def init(q, d):
    # Put the queues in the global scope of the worker processes
    global queue, decomp
    queue = q
    decomp = d        

class Num:
    def __init__(self, n):
        self.queue = mp.Queue()
        self.queue.put(n)
        self.decomposition = mp.Queue()

    def run(self):
        with mp.Pool(4, initializer=init, initargs=(self.queue, self.decomposition)) as pool:
            pool.apply(f)
2
  • Thanks for the clarification. In the Linux case, what is the use of the init function ? A side question. Do you have any reference on multiprogramming, I am a bit lost and the documentation of Python is not reallly helpful. Commented Dec 14, 2020 at 18:58
  • It was an error to include the init function in the Linux example; I've removed it. Sorry, I don't know of any resources that cover the correct use of multiprocessing. I agree that the documentation is difficult to digest and doesn't explicitly cover a lot of stuff that it probably should (like the fact that multiprocessing.Queue can't be pickled, and what you should do instead). The rules and best practices for working with multiprocessing are not very intuitive, so I think they really need something very example-driven to help people understand it, but that's not what's there now.
    – dano
    Commented Dec 14, 2020 at 20:27

Not the answer you're looking for? Browse other questions tagged or ask your own question.