Having been asked to re-run an extremely slow piece of data analysis, I realised I couldn't face waiting a week for my data and decided instead to parallelise my program. After investigating and rejecting Python threads on the grounds of GIL contention, I eventually settled on the multiprocessing module to distribute the work over a number of separate OS processes, using a pair of Queue objects to assign work to each of the workers and to combine the results in the main process.

The structure of the final program was something a bit like this:

    from multiprocessing import Process, Queue
    from queue import Empty

    class Analysis:

        def __init__(self, worklist, nprocs):

            self.work = Queue()
            self.results = Queue()

            # Load every unit of work onto the input queue
            for item in worklist:
                self.work.put(item)

            tasks = []
            for p in range(nprocs):
                # Create and start each worker process
                tasks.append(Process(target=self.worker))
                tasks[-1].start()

            remaining = len(worklist)
            while remaining > 0:
                # Process each result in turn, stopping once
                # everything is accounted for
                result = self.results.get()
                remaining -= 1

                # Do some results processing

            for p in tasks:
                # Wait for all the tasks to complete
                p.join()

        def worker(self):

            while True:
                try:
                    item = self.work.get(timeout=30)
                except Empty:
                    # No work left, so let the process exit
                    break

                # Deal with the current item of work
                result = item  # placeholder: the real analysis goes here

                self.results.put(result)
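
Driving the class is then just a matter of constructing it, something along these lines (load_work_items is a hypothetical loader standing in for whatever produces the input data):

    if __name__ == '__main__':
        # The __main__ guard matters on platforms that spawn
        # rather than fork new processes
        worklist = load_work_items()  # hypothetical input loader
        Analysis(worklist, nprocs=8)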

Along the way I discovered a couple of potential gotchas. I found that unless I emptied both Queue data structures before I attempted to join() the worker tasks, the program would deadlock. I also found that I needed to explicitly count the number of remaining results to determine whether the output queue was empty: otherwise I'd have needed a get() timeout longer than the longest possible elapsed time of a single unit of work, something that would have had a significant impact on the overall run time of the program.
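
The deadlock follows from the way multiprocessing.Queue buffers its data: a child process that has put items on a queue won't fully terminate until those items have been flushed, so joining the workers before draining the results queue can block forever. A minimal sketch of the safe ordering, reduced to a single shared queue:

    from multiprocessing import Process, Queue

    def worker(q):
        # Each child puts one result on the shared queue; the
        # queue's feeder thread must flush it before the process
        # can be joined cleanly
        q.put('result')

    if __name__ == '__main__':
        q = Queue()
        procs = [Process(target=worker, args=(q,)) for _ in range(4)]
        for p in procs:
            p.start()

        # Drain the queue *before* joining: items still buffered
        # on the queue can leave p.join() blocked indefinitely
        for _ in procs:
            print(q.get())

        for p in procs:
            p.join()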

The performance of the final program was extremely satisfactory. Parallelising the code took about a quarter of the elapsed time of the shortest serial analysis and, because the program scaled almost linearly, I got a 60x improvement on a single run and a 200x improvement on the reanalysis of all my data sets.
