Parallelising Python programs
Jun. 8th, 2011 10:00 pm

I used multiprocessing Queue objects to assign work to each of the worker processes and to combine the results in the main process.
The structure of the final program was something a bit like this:
from multiprocessing import Process, Queue
import queue


class Analysis:
    def __init__(self, worklist, nprocs):
        self.work = Queue()
        self.results = Queue()
        for item in worklist:
            self.work.put(item)
        tasks = []
        for p in range(nprocs):
            # Create and start each worker task
            tasks.append(Process(target=self.worker))
            tasks[-1].start()
        remaining = len(worklist)
        while True:
            # Process each result in turn
            item = self.results.get()
            remaining -= 1
            # Do some results processing
            if remaining == 0:
                # Stop when everything is accounted for
                break
        for p in tasks:
            # Wait for all the tasks to complete
            p.join()

    def worker(self):
        while True:
            try:
                # multiprocessing's Queue.get() raises queue.Empty on timeout
                item = self.work.get(timeout=30)
            except queue.Empty:
                break
            # Deal with the current item of work
            result = item  # placeholder for the real analysis of this item
            self.results.put(result)
        return True
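Constructing an Analysis object is enough to run the whole job. A hypothetical invocation, with a made-up worklist and worker count rather than the original program's values, would look something like this:

if __name__ == '__main__':
    # The guard matters because multiprocessing may re-import this module
    # in the child processes on platforms that spawn rather than fork.
    Analysis(worklist=list(range(1000)), nprocs=4)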
Along the way I discovered a couple of potential gotchas. I found that unless I emptied both Queue data structures before I attempted to join() the worker tasks, the program would deadlock. I also found that I needed to count the number of remaining results explicitly in order to determine when the output queue was really finished, because otherwise I'd have needed a timeout longer than the longest possible elapsed time of a single unit of work, which would have had a significant impact on the overall run time of the program.
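The deadlock is a documented property of multiprocessing.Queue: items are flushed to an underlying pipe by a feeder thread, and a child process will not terminate until everything it has put on the queue has been flushed, so join() can block forever if the parent never drains the queue. A minimal sketch, separate from the program above and assuming the producer puts enough data to fill the pipe buffer, shows the problem:

from multiprocessing import Process, Queue

def producer(q):
    # Put enough items on the queue to overflow the pipe buffer
    # that sits behind a multiprocessing Queue.
    for i in range(100000):
        q.put(i)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    p.join()  # Hangs: the child is still flushing items into the pipe,
              # and nothing in the parent ever drains them.
    # Draining the queue with q.get() before calling join() avoids the hang.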
The eventual performance of the program was extremely satisfactory. I managed to parallelise the code in about a quarter of the elapsed time of the shortest serial analysis and, because the scalability of the program was almost linear, I got a 60x improvement on a single run and a 200x improvement on the reanalysis of all my data sets.