Skip to content

Commit

Permalink
Fixed multiprocessing bug whereby queue gets out-of-order by returnin…
Browse files Browse the repository at this point in the history
…g order index with each process
  • Loading branch information
TimWeaving committed Apr 22, 2024
1 parent 751d6fe commit 1105478
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions symmer/process_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,27 @@ def _process_mp(self, func, iter, shared):
if self.verbose:
print(f'*** executing in multiprocessing mode ***')
# wrapper function for putting results into queue
def _func(iter, shared, _queue=None):
def _func(iter, shared, _order=None, _queue=None):
data_out = func(iter, shared)
_queue.put(data_out)
_queue.put((_order, data_out))

chunks = list(self.prepare_chunks(iter))
procs = [] # for storing processes
queue = Queue(self.n_chunks) # storage of data from processes
for chunk in chunks:
proc = Process(target=_func, args=(chunk, shared, queue))
for index,chunk in enumerate(chunks): # index to ensure procs returned in correct order
proc = Process(target=_func, args=(chunk, shared, index, queue))
procs.append(proc)
proc.start()
# retrieve data from the queue
data = []
for _ in range(self.n_chunks):
data += queue.get()
data.append(queue.get())
# complete the processes
for proc in procs:
proc.join()
# sort by correct chunk ordering and flatten data
_,data = zip(*sorted(data))
data = [a for b in data for a in b]
return data

def _process_single(self, func, iter, shared):
Expand Down

0 comments on commit 1105478

Please sign in to comment.