6

Here's what I am trying to accomplish -

  1. I have about a million files which I need to parse & append the parsed content to a single file.
  2. Since a single process takes ages, this option is out.
  3. Not using threads in Python, as with the GIL that essentially comes down to running a single process.
  4. Hence using the multiprocessing module, i.e. spawning 4 sub-processes to utilize all that raw core power :)

So far so good. Now I need a shared object which all the sub-processes have access to, so I am using a Queue from the multiprocessing module. All the sub-processes also need to write their output to a single file, which I guess is a potential place to use Locks.

With this setup, when I run it I do not get any error (so the parent process seems fine), it just stalls. When I press Ctrl-C I see a traceback (one for each sub-process), and no output is written to the output file. Here's the code (note that everything runs fine without multiprocessing) -

import os
import glob
from multiprocessing import Process, Queue, Pool

data_file  = open('out.txt', 'w+')

def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return

def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop

    # this is the block of code that needs correction.
    if multi_process:
        # One way to spawn 4 processes
        # pool = Pool(processes=4) #Start worker processes
        # res  = pool.apply_async(worker, [task_queue, data_file])

        # But I chose to do it like this for now.
        for i in range(4):
            proc = Process(target=worker, args=[task_queue])
            proc.start()
    else: # single process mode is working fine!
        worker(task_queue)
    data_file.close()
    return

What am I doing wrong? I also tried passing the open file object to each of the processes at the time of spawning, e.g. Process(target=worker, args=[task_queue, data_file]), but to no effect. I feel the sub-processes are not able to write to the file for some reason. Either the instance of the file object is not getting replicated (at the time of spawn) or there is some other quirk... Anybody got an idea?
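One thing I might try to narrow this down (just a rough diagnostic replacing my worker() above, not a fix; mine_imdb_page and DATA_DIR are from my script) is to have each worker open its own output file, named after its PID, to see whether the workers produce any output at all -

import os

def worker(task_queue):
    # each process opens its own file, so no parent file handle is shared;
    # if these per-PID files also stay empty, the shared file object is not the (only) problem
    with open('out.%d.txt' % os.getpid(), 'w') as out:
        for name in iter(task_queue.get, 'STOP'):
            data = mine_imdb_page(os.path.join(DATA_DIR, name))
            if data:
                out.write(repr(data) + '\n')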

EXTRA: Also, is there any way to keep a persistent MySQL connection open & pass it across to the sub-processes? So I open a MySQL connection in my parent process & the open connection should be accessible to all my sub-processes. Basically this would be the equivalent of shared memory in Python. Any ideas here?

  • If you don't write to the file but do a print, does it work then? (On Linux I'd do python script.py > out.dat to prevent screen flooding.)
    – extraneon
    Commented Aug 27, 2010 at 17:37
  • And I think proc.start() is non-blocking, so you probably should have a wait somewhere to give the processes the opportunity to do some work before you do data_file.close().
    – extraneon
    Commented Aug 27, 2010 at 17:42
  • data_file.close() is done at the very end. Should it affect things here? Also, print works fine; I see the output on the screen when I use print... But I want to use a file. Help! Also, is there any way to keep a persistent MySQL connection open & pass it across to the sub-processes? Commented Aug 27, 2010 at 17:47
  • @extraneon: good catch, but if the program attempted to write on a closed file, an exception should be raised.
    – badp
    Commented Aug 27, 2010 at 17:51
  • Are you reading from MySQL, writing to it, or both?
    – Eric Snow
    Commented Aug 27, 2010 at 18:00

2 Answers

4

Although the discussion with Eric was fruitful, later on I found a better way of doing this. Within the multiprocessing module there is a class called Pool which is perfect for my needs.

It optimizes itself to the number of cores my system has, i.e. only as many worker processes are spawned as there are cores. Of course this is customizable. So here's the code; it might help someone later -

import glob
import os
from multiprocessing import Pool

def main():
    po = Pool()  # by default, one worker process per CPU core
    for file in glob.glob('*.csv'):
        filepath = os.path.join(DATA_DIR, file)
        po.apply_async(mine_page, (filepath,), callback=save_data)
    po.close()  # no more tasks will be submitted
    po.join()   # wait for all the workers to finish
    file_ptr.close()  # the output file handle, opened elsewhere in my script

def mine_page(filepath):
    # do whatever it is that you want to do in a separate process.
    return data

def save_data(data):
    # data is the object returned by mine_page. Store it in a file, mysql or...
    return

I'm still going through this huge module. I'm not sure whether save_data() is executed by the parent process or by the spawned child processes; if it's the child that does the saving, it might lead to concurrency issues in some situations. If anyone has more experience using this module, I'd appreciate more knowledge here...
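One quick way to check which process runs the callback is to print os.getpid() in the worker and in the callback and compare both with the parent's PID. A throwaway sketch (not part of my actual script):

import os
from multiprocessing import Pool

def work(x):
    print('work() running in pid %d' % os.getpid())
    return x * x

def collect(result):
    # compare this pid with the parent's pid printed below
    print('callback running in pid %d, got %r' % (os.getpid(), result))

if __name__ == '__main__':
    print('parent pid %d' % os.getpid())
    po = Pool()
    for i in range(4):
        po.apply_async(work, (i,), callback=collect)
    po.close()
    po.join()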

3

The docs for multiprocessing indicate several methods of sharing state between processes:

http://docs.python.org/dev/library/multiprocessing.html#sharing-state-between-processes

I'm sure each process gets a fresh interpreter and then the target (function) and args are loaded into it. In that case, the global namespace from your script would have been bound to your worker function, so the data_file would be there. However, I am not sure what happens to the file descriptor as it is copied across. Have you tried passing the file object as one of the args?

An alternative is to pass another Queue that will hold the results from the workers. The workers put the results, and the main code gets the results and writes them to the file.
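For example, something along these lines might work (a rough sketch reusing mine_imdb_page, DATA_DIR and the 'STOP' sentinel from your code; untested):

import glob
import os
from multiprocessing import Process, Queue

def worker(task_queue, result_queue):
    # workers only compute; they never touch the output file
    for name in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, name))
        if data:
            result_queue.put(data)
    result_queue.put('STOP')  # tell the writer this worker is done

def main():
    task_queue, result_queue = Queue(), Queue()
    for name in glob.glob('*.csv'):
        task_queue.put(name)

    workers = [Process(target=worker, args=(task_queue, result_queue))
               for _ in range(4)]
    for _ in workers:
        task_queue.put('STOP')  # one sentinel per worker
    for w in workers:
        w.start()

    # the parent is the only process that writes to the file
    finished = 0
    with open('out.txt', 'w') as out:
        while finished < len(workers):
            data = result_queue.get()
            if data == 'STOP':
                finished += 1
            else:
                out.write(repr(data) + '\n')

    for w in workers:
        w.join()

if __name__ == '__main__':
    main()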

  • Yeah! I could do that. I could have another Queue, something like an out_queue, which the processes write into. Since the parent process has access to this, it could keep reading the queue & writing to the file. This could work!! Also, I tried passing the file object as one of the args; it does not seem to work, the processes don't write to the file. Also, Eric, any idea how to pass around persistent MySQL connections to sub-processes? Commented Aug 27, 2010 at 18:13
  • @Srikar, hope that helps. As to the MySQL connections, I'm not sure on that one. I would say you are better off with a separate connection for each process. Even if you could share a connection, I am not sure how "thread-safe" it is. If you really had to share just one, you would probably have to do weird stuff. Then again, you could proxy the connection's query/response mechanism through a Queue as well: the main process (or a separate mysql handler process) gets the queries from the Queue, runs them, and puts the results back... or something like that (roughly as in the sketch after these comments).
    – Eric Snow
    Commented Aug 27, 2010 at 18:39
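A very rough sketch of that query-proxy idea, in which a single handler process owns the connection and the workers talk to it over Queues (the MySQLdb driver, the connection credentials and the SELECT 1 query are placeholders):

from multiprocessing import Process, Queue

def mysql_handler(query_queue, reply_queues):
    # the only process that ever touches the connection
    import MySQLdb  # assumed driver; any DB-API module would do
    conn = MySQLdb.connect(host='localhost', user='user', passwd='secret', db='imdb')
    cur = conn.cursor()
    for worker_id, sql in iter(query_queue.get, 'STOP'):
        cur.execute(sql)
        reply_queues[worker_id].put(cur.fetchall())  # ship the rows back to the caller
    conn.close()

def worker(worker_id, query_queue, reply_queue):
    query_queue.put((worker_id, 'SELECT 1'))  # placeholder query
    rows = reply_queue.get()
    print('worker %d got %r' % (worker_id, rows))

if __name__ == '__main__':
    query_queue = Queue()
    reply_queues = [Queue() for _ in range(4)]  # one private reply channel per worker
    handler = Process(target=mysql_handler, args=(query_queue, reply_queues))
    handler.start()
    workers = [Process(target=worker, args=(i, query_queue, reply_queues[i]))
               for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    query_queue.put('STOP')  # shut the handler down once all workers are done
    handler.join()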
