0

Please review this basic example:

def queue_add():
    """Feed every line of ``get_from.txt`` into the shared queue ``q``.

    Each line is stripped of surrounding whitespace before it is enqueued.
    The function returns once all lines of the file have been added.
    """
    with open('get_from.txt') as f:
        for line in f:
            # Queue objects expose put(); they have no add() method.
            q.put(line.strip())

def queue_save():
    """Write every result taken from ``q_done`` to ``save_to.txt``.

    NOTE(review): the loop has no exit condition, so the question of when
    the file can be closed is still open here. The ``with`` block only
    guarantees that the handle is closed if an exception unwinds the loop.
    """
    # Open for writing: the default mode is read-only, so write() would fail.
    with open('save_to.txt', 'w') as save_to:
        while True:
            # get() blocks while the queue is empty. Results are published
            # on q_done by worker(); q only holds the unprocessed input.
            data = q_done.get()
            save_to.write(data)

def worker():
    """Daemon loop: take one item from ``q`` and publish a result on ``q_done``."""
    while True:
        item = q.get()  # blocks until an input item is available
        # ... the real processing of `item` would happen here ...
        q_done.put('processed data')
        q.task_done()  # tell q that the fetched item has been handled

# Shared queues: queue_add() fills `q`, worker() takes items from `q`
# and puts its results on `q_done`.
q = Queue()
q_done = Queue()
# The worker threads/processes would be started here.

So my question is: how can I know when queue_save() has processed and saved all the data from q_done, so that it can close its file handle?

1 Answer 1

1

How about using a sentinel value?

import Queue
import threading

# Sentinel marking the end of a queue's data. It is a fresh object() that
# consumers compare by identity (`is`), so no real item can be mistaken for it.
END_OF_DATA = object()

def queue_add(q):
    """Put each line of ``get_from.txt`` on *q*, stripped of surrounding whitespace."""
    with open('get_from.txt') as source:
        for raw_line in source:
            stripped = raw_line.strip()
            q.put(stripped)

def process(x):
    """Placeholder work step: return ``len(x)`` as a newline-terminated string."""
    size = len(x)
    return '{}\n'.format(size)

def worker(q_in, q_out):
    """Process items from *q_in* and put each result on *q_out*.

    Runs until ``END_OF_DATA`` is taken from *q_in*, then returns. Every
    item taken from *q_in* (including the sentinel) is acknowledged with
    ``task_done()``.
    """
    while True:
        data = q_in.get()
        try:
            if data is END_OF_DATA:
                break
            q_out.put(process(data))
        finally:
            # Balance every get() with a task_done(), also for the sentinel
            # and when process() raises; otherwise q_in.join() would wait
            # forever on an item this worker has already taken.
            q_in.task_done()

def queue_save(q):
    """Write every item taken from *q* to ``save_to.txt``.

    Runs until ``END_OF_DATA`` is taken from *q*; the file is closed when
    the function returns.
    """
    # The with-block closes the file even if get() or write() raises,
    # which the explicit close() after the loop did not guarantee.
    with open('save_to.txt', 'w') as save_to:
        while True:
            data = q.get()
            if data is END_OF_DATA:
                break
            save_to.write(data)
            q.task_done()


# q1 carries the stripped input lines, q2 the processed results.
q1 = Queue.Queue()
q2 = Queue.Queue()

n_workers = 4
t1 = threading.Thread(target=queue_add, args=(q1,))
workers = [
    threading.Thread(target=worker, args=(q1, q2))
    for _ in range(n_workers)
]
t3 = threading.Thread(target=queue_save, args=(q2,))

t1.start()
# The loop variable is deliberately not called `worker`: that would rebind
# the module-level worker() function to a Thread object.
for thread in workers:
    thread.start()
t3.start()

# Wait until the producer has enqueued every line.
t1.join()

# Wait until all lines are processed, then send one sentinel per worker.
q1.join()
for _ in workers:
    q1.put(END_OF_DATA)

for thread in workers:
    thread.join()

# Wait until all results are written, then tell the writer to stop.
q2.join()
q2.put(END_OF_DATA)
t3.join()

EDIT: synchronize multiple workers.

4
  • 2
    If `None` is valid data, then you should use `END_OF_DATA = object()`, which guarantees that `data is END_OF_DATA` is true if and only if the data really is the sentinel value.
    – Bakuriu
    Commented Jun 10, 2013 at 10:34
  • `data = q_in.get()`: `data` can't be None; if there aren't any items left in the queue, `Queue.get()` blocks. Commented Jun 10, 2013 at 11:28
  • @nacholibre q_in.get() can be None because producer (queue_add, worker) puts None.
    – falsetru
    Commented Jun 10, 2013 at 15:53
    +1 for `END_OF_DATA = object()`. Basically, this way you can send commands without having a separate channel for them. The question is how several workers can be synchronized using the same method - probably with a global variable?
    – Nick
    Commented Jun 10, 2013 at 16:56

Not the answer you're looking for? Browse other questions tagged or ask your own question.