I've spent a big chunk of today trying to track down a strange deadlock in the parallelised version of my current Python script. Eventually, after much digging, I noticed that the hang always occurred when there were 32,767 items in the
multiprocessing.Queue, even though the queue had been declared with an unlimited size and the put attempt had not generated a Full exception as might be expected.

I was then able to confirm that a Queue created with q = Queue(2**15) failed with an exception, while one created with q = Queue(2**15-1) worked as expected and raised a Full exception when the limit was reached. Inspecting the source code of multiprocessing/queues.py, I noticed that when maxsize is left at its default it is replaced by _multiprocessing.SemLock.SEM_VALUE_MAX, which on my platform comes out at 32,767 and explains the whole problem. It's unfortunate that the code simply hangs rather than raising an exception when that hidden limit is reached, although this follows from how put() works: by default it blocks until a slot is free, and Full is only raised when block=False or a timeout is given.

So, given that I need a work queue that can hold more than 32K items, it looks like I'm going to have to roll my own class that supports a larger semaphore count, something that I hope will be as simple as subclassing Queue and overriding the semaphore routines...

ETA: I finally fixed this by creating a wrapper class that adds a local, producer-side buffer to hold any items that could not be added to the queue immediately. By attempting to flush the buffer before every put, and only accessing the Queue directly when no items were pending in the buffer, I was able to reduce the number of Full exceptions I had to take.
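To check what the default limit is on a given machine, a quick diagnostic along these lines should do. It imports SEM_VALUE_MAX from multiprocessing.synchronize, which is the constant queues.py falls back to when maxsize is left at its default, and uses a deliberately small maxsize of 3 to show that a non-blocking put raises queue.Full where a plain put() would just wait. The helper function names are hypothetical, chosen for this sketch.

```python
import multiprocessing
import queue  # multiprocessing.Queue raises queue.Full rather than its own type
from multiprocessing.synchronize import SEM_VALUE_MAX


def default_queue_capacity():
    # Hypothetical helper: a maxsize <= 0 is silently replaced by this
    # platform-dependent value inside multiprocessing/queues.py.
    return SEM_VALUE_MAX


def nonblocking_put_raises_full(maxsize=3):
    # Hypothetical helper: fill a small queue, then try one more put
    # without blocking, which should raise queue.Full instead of hanging.
    q = multiprocessing.Queue(maxsize)
    try:
        for i in range(maxsize):
            q.put(i)  # each put takes one slot from the bounded semaphore
        try:
            q.put("one too many", block=False)
        except queue.Full:
            return True
        return False
    finally:
        # Let the feeder thread flush to the pipe and exit cleanly.
        q.close()
        q.join_thread()


if __name__ == "__main__":
    print("default capacity:", default_queue_capacity())
    print("Full raised:", nonblocking_put_raises_full())
```

POSIX only guarantees that SEM_VALUE_MAX is at least 32,767, so the capacity printed will vary from platform to platform.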
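For reference, here is a minimal sketch of that kind of wrapper. It is a reconstruction rather than my actual code: the class name BufferedQueue, its put(), flush() and pending() methods, and the use of collections.deque for the local buffer are all illustrative assumptions. It relies only on the standard put_nowait() and put(timeout=...) calls, both of which raise queue.Full when no slot is free.

```python
import collections
import multiprocessing
import queue


class BufferedQueue:
    """Hypothetical producer-side wrapper around multiprocessing.Queue.

    Items that do not fit in the underlying queue are parked in a local
    deque instead of blocking the producer.
    """

    def __init__(self, maxsize=0):
        self._queue = multiprocessing.Queue(maxsize)
        self._pending = collections.deque()

    @property
    def queue(self):
        # The real queue, to be handed to consumer processes.
        return self._queue

    def pending(self):
        # Number of items still held in the local buffer.
        return len(self._pending)

    def _flush_nowait(self):
        # Move buffered items into the queue until it reports Full.
        while self._pending:
            try:
                self._queue.put_nowait(self._pending[0])
            except queue.Full:
                return
            self._pending.popleft()  # only drop the item once it is queued

    def put(self, item):
        self._flush_nowait()
        if self._pending:
            # Older items are still waiting: skip the queue entirely, which
            # avoids another Full exception and keeps items in FIFO order.
            self._pending.append(item)
            return
        try:
            self._queue.put_nowait(item)
        except queue.Full:
            self._pending.append(item)

    def flush(self, timeout=None):
        # Blocking drain, e.g. once the producer has nothing else to send.
        # If a timeout expires, queue.Full propagates and the item stays
        # at the front of the buffer.
        while self._pending:
            self._queue.put(self._pending[0], timeout=timeout)
            self._pending.popleft()
```

Consumers read from bq.queue as normal. The producer needs to call flush() (or keep calling put()) before it finishes, since anything still sitting in the local buffer never reaches the consumers otherwise.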