diff --git a/symmer/process_handler.py b/symmer/process_handler.py index 4f15a363..aeccc936 100644 --- a/symmer/process_handler.py +++ b/symmer/process_handler.py @@ -63,24 +63,27 @@ def _process_mp(self, func, iter, shared): if self.verbose: print(f'*** executing in multiprocessing mode ***') # wrapper function for putting results into queue - def _func(iter, shared, _queue=None): + def _func(iter, shared, _order=None, _queue=None): data_out = func(iter, shared) - _queue.put(data_out) + _queue.put((_order, data_out)) chunks = list(self.prepare_chunks(iter)) procs = [] # for storing processes queue = Queue(self.n_chunks) # storage of data from processes - for chunk in chunks: - proc = Process(target=_func, args=(chunk, shared, queue)) + for index,chunk in enumerate(chunks): # index to ensure procs returned in correct order + proc = Process(target=_func, args=(chunk, shared, index, queue)) procs.append(proc) proc.start() # retrieve data from the queue data = [] for _ in range(self.n_chunks): - data += queue.get() + data.append(queue.get()) # complete the processes for proc in procs: proc.join() + # sort by correct chunk ordering and flatten data + _,data = zip(*sorted(data)) + data = [a for b in data for a in b] return data def _process_single(self, func, iter, shared):