367

In the example code below, I'd like to get the return value of the function worker. How can I go about doing this? Where is this value stored?

Example Code:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(jobs)

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

I can't seem to find the relevant attribute in the objects stored in jobs.

14 Answers

365

Use a shared variable to communicate. For example, like this:

Example Code:

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

Output:

0 represent!
1 represent!
3 represent!
2 represent!
4 represent!
[0, 1, 3, 2, 4]
  • 78
    I would recommend using a multiprocessing.Queue, rather than a Manager here. Using a Manager requires spawning an entirely new process, which is overkill when a Queue would do.
    – dano
    Commented Apr 19, 2015 at 0:54
  • 6
    @dano: I wonder, if we use a Queue() object, how can we be sure of the order in which each process returns its value? I mean, if we need the results in order for the next step, how can we tell exactly which output came from which process?
    – Chau Pham
    Commented Sep 29, 2016 at 11:08
  • 8
    @Catbuilts You could return a tuple from each process, where one value is the actual return value you care about, and the other is a unique identifier from the process. But I also wonder why you need to know which process is returning which value. Is that what you actually need to know about the process, or do you need to correlate your list of inputs with the list of outputs? In that case, I would recommend using multiprocessing.Pool.map to process your list of work items. (A sketch of the tuple-plus-Queue idea follows these comments.)
    – dano
    Commented Dec 1, 2016 at 14:43
  • 28
    A caveat for functions with only a single argument: you should use args=(my_function_argument, ). Note the , comma here! Otherwise Python will complain about "missing positional arguments". It took me 10 minutes to figure out. Also check the manual usage (under the "Process class" section).
    – yuqli
    Commented Apr 29, 2019 at 15:17
  • 6
    @vartec One drawback of using a multiprocessing.Manager() dictionary is that it pickles (serializes) the object it returns, so it has a bottleneck given by the pickle library of a maximum 2 GiB size for the object to return. Is there any other way of doing this that avoids the serialization of the returned object?
    – hirschme
    Commented Nov 13, 2019 at 21:46
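
A minimal sketch of what the comments above suggest (this combination is an assumption, not part of the original answer): use a plain multiprocessing.Queue instead of a Manager, and have each worker tag its result with its procnum so the parent can restore the submission order:

import multiprocessing

def worker(procnum, queue):
    """worker function: put a (procnum, result) tuple on the queue"""
    print(str(procnum) + " represent!")
    queue.put((procnum, procnum))

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, queue))
        jobs.append(p)
        p.start()

    # drain the queue before joining, so a full pipe can never block a worker
    results = [queue.get() for _ in jobs]
    for proc in jobs:
        proc.join()

    # sort by procnum to recover the original submission order
    print([value for procnum, value in sorted(results)])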
99

I think the approach suggested by sega_sai is the better one. But it really needs a code example, so here goes:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    print(pool.map(worker, range(5)))

Which will print the return values:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

If you are familiar with map() (the Python 2 built-in, which returns a list) this should not be too challenging. Otherwise have a look at sega_sai's link.

Note how little code is needed. (Also note how processes are re-used.)

  • 2
    Any ideas why my getpid() calls all return the same value? I'm running Python 3.
    – zelusp
    Commented Oct 29, 2016 at 17:39
  • I'm not sure how Pool distributes tasks over workers. Maybe they can all end up at the same worker if they're really fast? Does it happen consistently? Also if you add a delay?
    – Mark
    Commented Oct 31, 2016 at 15:30
  • I also thought it was a speed related thing but when I feed pool.map a range of 1,000,000 using more than 10 processes I see at most two different pids.
    – zelusp
    Commented Oct 31, 2016 at 19:00
  • 1
    Then I'm not sure. I think it'd be interesting to open a separate question for this.
    – Mark
    Commented Nov 1, 2016 at 11:27
  • 1
    If you want to send a different function to each process, use pool.apply_async: docs.python.org/3/library/…
    – Kyle
    Commented Jun 5, 2019 at 20:28
71

For anyone else wondering how to get a value from a Process by using a Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {'foo': True}

Note that on Windows or in a Jupyter Notebook, with multiprocessing you have to save this as a file and execute the file. If you run it interactively you will see an error like this:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
  • 2
    When I put something in a queue in my worker process, my join is never reached. Any idea why this might happen? Commented Oct 6, 2016 at 12:30
  • @LaurensKoppenol do you mean that your main code hangs at p.join() permanently and never continues? Does your process have an infinite loop? Commented Oct 6, 2016 at 17:44
  • 7
    Yes, it hangs there infinitely. My workers all finish (the loop within the worker function ends, and the print statement afterwards is printed, for all workers). The join doesn't do anything. If I remove the Queue from my function it does let me pass the join() Commented Oct 10, 2016 at 8:11
  • @LaurensKoppenol Are you perhaps not calling queue.put(ret) prior to calling p.start() ? In that case, the worker thread will hang at queue.get() forever. You can replicate this by copying my snippet above while commenting out queue.put(ret). Commented Aug 16, 2017 at 2:47
  • 1
    @Bendemann Someone edited the answer and made it incorrect by placing the queue.get before the queue.join. I've fixed it now by placing queue.get after p.join. Please try again. Commented Jul 28, 2020 at 16:58
49

For some reason, I couldn't find a general example of how to do this with Queue anywhere (even Python's doc examples don't spawn multiple processes), so here's what I got working after like 10 tries:

from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue is a blocking, thread-safe queue that you can use to store the return values from the child processes. So you have to pass the queue to each process. Something less obvious here is that you have to get() from the queue before you join the Processes or else the queue fills up and blocks everything.

Update for those who are object-oriented (tested in Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
41

This example shows how to use a list of multiprocessing.Pipe instances to return strings from an arbitrary number of processes:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print(result)
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print(result_list)

if __name__ == '__main__':
    main()

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

This solution uses fewer resources than a multiprocessing.Queue which uses

  • a Pipe
  • at least one Lock
  • a buffer
  • a thread

or a multiprocessing.SimpleQueue which uses

  • a Pipe
  • at least one Lock

It is very instructive to look at the source for each of these types.
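
If you want to do that, a quick way (a small sketch, not part of the original answer) is to ask inspect where the implementations live:

import inspect
import multiprocessing.queues
import multiprocessing.connection

# paths to the modules that implement Queue/SimpleQueue and Pipe/Connection
print(inspect.getsourcefile(multiprocessing.queues))
print(inspect.getsourcefile(multiprocessing.connection))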

  • What would be the best way to do that without making the pipes a global variable?
    – Nickpick
    Commented Oct 25, 2016 at 13:15
  • I put all the global data and code into a main function and it works the same. Does that answer your question?
    – user3657941
    Commented Oct 25, 2016 at 13:43
  • does the pipe always have to be read before any new value can be added (sent) to it?
    – Nickpick
    Commented Oct 25, 2016 at 14:56
  • 6
    This answer causes a deadlock if the returned object is large. Instead of doing the proc.join() first, I would first recv() the return value and then do the join. (A sketch follows these comments.)
    – L. Pes
    Commented Feb 12, 2020 at 20:13
  • 1
    I am with @L.Pes on this. Could be OS-specific, but I adapted this example to my use case and workers trying to send_end.send(result) for large result would hang indefinitely. Joining after receiving fixed it. Happy to provide an example if N=2 is too anecdotal for you.
    – Vlad
    Commented Apr 22, 2020 at 2:10
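
Following up on the deadlock comments above, a hedged sketch of the reordering they suggest: receive from each pipe before joining, so a large result cannot fill the pipe's buffer and block the sending worker. Only main() changes relative to the answer's code:

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    # recv() first: it blocks until each worker has sent its result,
    # so the workers can finish even when the results are large
    result_list = [x.recv() for x in pipe_list]
    for proc in jobs:
        proc.join()
    print(result_list)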
19

It seems that you should use the multiprocessing.Pool class instead, and use its methods .apply(), .apply_async(), and .map().

Reference: class multiprocessing.pool.AsyncResult
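
A hedged sketch of what that can look like (the worker below is just an illustration): Pool.map() collects return values directly, while apply_async() hands back an AsyncResult whose get() blocks until the worker's return value is available:

import multiprocessing

def worker(procnum):
    return procnum

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        # map() returns the results in input order
        print(pool.map(worker, range(5)))       # [0, 1, 2, 3, 4]

        # apply_async() returns one AsyncResult per call
        results = [pool.apply_async(worker, (i,)) for i in range(5)]
        print([r.get() for r in results])       # [0, 1, 2, 3, 4]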

  • I have tensorflow code for which multiprocessing.Pool will hang but not multiprocessing.Process
    – Le Frite
    Commented Jun 24, 2019 at 23:34
16

You can use the exit built-in to set the exit code of a process. It can be obtained from the exitcode attribute of the process:

import multiprocessing

def worker(procnum):
    print(str(procnum) + ' represent!')
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print(result)

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
  • 5
    Be warned that this approach could become confusing. Processes should generally exit with exit code 0 if they completed without error. If you have anything monitoring your system's process exit codes then you may see these reported as errors. Commented May 23, 2017 at 21:50
  • 2
    Perfect if you just want to raise an exception in the parent process on error.
    – crizCraig
    Commented Jul 19, 2018 at 17:45
12

The pebble package has a nice abstraction leveraging multiprocessing.Pipe which makes this quite straightforward:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

Example from: https://pythonhosted.org/Pebble/#concurrent-decorators

11

I thought I'd simplify the simplest examples copied from above; they work for me on Python 3.6. The simplest is multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

You can set the number of processes in the pool with, e.g., Pool(processes=5). However, it defaults to the CPU count, so leave it blank for CPU-bound tasks. (I/O-bound tasks often suit threads anyway, as the threads are mostly waiting and so can share a CPU core.) Pool also applies a chunking optimization, as sketched below.
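
As a small sketch of that (the chunksize value here is an arbitrary illustration, not something the answer above tunes), map() accepts a chunksize argument that controls how many tasks are handed to each worker process at a time:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

if __name__ == '__main__':
    # 5 worker processes, tasks dispatched to them in chunks of 2
    with multiprocessing.Pool(processes=5) as pool:
        print(pool.map(worker, range(10), chunksize=2))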

(Note that the worker function cannot be nested within another function. I initially defined my worker function inside the function that makes the call to pool.map, to keep it all self-contained, but then the processes couldn't import it and threw "AttributeError: Can't pickle local object 'outer_method.<locals>.inner_method'". More here. It can be inside a class.)

(I appreciate that the original question specified printing 'represent!' rather than calling time.sleep(), but without the sleep I thought some code was running concurrently when it wasn't.)


Py3's ProcessPoolExecutor is also two lines (.map returns a generator so you need the list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

With plain Processes:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

Use SimpleQueue if all you need is put and get. The first loop starts all the processes, before the second makes the blocking queue.get calls. I don't think there's any reason to call p.join() too.

3

If you are using Python 3, you can use concurrent.futures.ProcessPoolExecutor as a convenient abstraction:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
2

You can use ProcessPoolExecutor to get a return value from a function as shown below:

from concurrent.futures import ProcessPoolExecutor

def test(num1, num2):
    return num1 + num2

with ProcessPoolExecutor() as executor:
    future = executor.submit(test, 2, 3)
    print(future.result())  # 5
1

A simple solution:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())
    print(output[0])

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0

I modified vartec's answer a bit since I needed to get the error codes from the function. (Thanks vartec!!! It's an awesome trick.)

This can also be done with a manager.list, but I think it is better to have it in a dict and store a list within it. That way we keep the function and the results, since we can't be sure of the order in which the list will be populated.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print('func1: starting')
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print('func1: finishing')
    # return "func1"  # No need for a return, since Process doesn't hand it back anyway =(

def func2(fn, m_list):
    print('func2: starting')
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print('func2: finishing')
    # return "func2"

def func3(fn, m_list):
    print('func3: starting')
    time.sleep(9)
    # This raise makes func3 fail before it populates the dict, so only its
    # non-zero exit code tells us that something went wrong.
    raise ValueError("failed here")
    # Alternatively (unreachable here, shown only as an illustration), catch the
    # error so that something still lands in the manager dict:
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it failed horribly"
        # print('func3: finishing')
        # return "func3"


def runInParallel(*fns):  # * accepts any number of functions
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print(fn)
        # print(dir(fn))
        p = Process(target=fn, name=fn.__name__, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # optionally pass a timeout in seconds, e.g. p.join(5)

    print(datetime.datetime.now() - start_time)
    return m_list, proc

if __name__ == '__main__':
    m_list, proc = runInParallel(func1, func2, func3)
    # print(dir(proc[0]))
    # print(proc[0]._name)
    # print(proc[0].name)
    # print(proc[0].exitcode)

    # Here you can check which function failed
    for i in proc:
        print(i.name, i.exitcode)  # 'name' was set in the Process(...) call above

    # This only shows the functions that worked and were able to populate the
    # manager dict
    for i, j in m_list.items():
        print(dir(i))  # things you can do to the function object
        print(i, j)
-1
You can also use a decorator to print the result of each function. Note that the result is printed inside the child process; it still is not returned to the parent:

from multiprocessing import Process
import functools


def printer(func):
    # functools.wraps keeps the wrapped function's name, so it can still be
    # pickled by reference on platforms that spawn processes (Windows, macOS)
    @functools.wraps(func)
    def inner(*args, **kwargs):
        result = func(*args, **kwargs)
        print(result)
        return result
    return inner


@printer
def square(nums: list):
    result = []
    for v in nums:
        result.append(v ** 2)
    return result


@printer
def cube(nums: list):
    result = []
    for v in nums:
        result.append(v ** 3)
    return result


if __name__ == '__main__':
    nums = [2, 3, 4, 6]
    p1 = Process(target=square, args=(nums,))
    p2 = Process(target=cube, args=(nums,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
