Skip to content

Commit

Permalink
Merge pull request #186 from UCL-CCS/noncon_mp_bugfix
Browse files Browse the repository at this point in the history
Encountered rare bug whereby multiprocessing queue gets out of order …
  • Loading branch information
TimWeaving authored Apr 22, 2024
2 parents 852b2e0 + 1105478 commit 0a2b968
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
5 changes: 2 additions & 3 deletions symmer/operators/noncontextual_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,8 +698,7 @@ def energy_via_brute_force(self) -> Tuple[float, np.array, np.array]:
nu_list[:,~self.fixed_ev_mask] = np.array(list(itertools.product([-1,1],repeat=np.sum(~self.fixed_ev_mask))))

# # optimize over all discrete value assignments of nu in parallel
tracker = get_noncon_energy(nu_list, self.NC_op)
full_search_results = zip(tracker, nu_list)
full_search_results = get_noncon_energy(nu_list, self.NC_op)
energy, fixed_nu = min(full_search_results, key=lambda x:x[0])

return energy, fixed_nu
Expand Down Expand Up @@ -736,4 +735,4 @@ def get_noncon_energy(nu: np.array, noncon_H:NoncontextualOp) -> float:
"""
The classical objective function that encodes the noncontextual energies.
"""
return noncon_H.get_energy(nu)
return noncon_H.get_energy(nu), nu
13 changes: 8 additions & 5 deletions symmer/process_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,27 @@ def _process_mp(self, func, iter, shared):
if self.verbose:
print(f'*** executing in multiprocessing mode ***')
# wrapper function for putting results into queue
def _func(iter, shared, _queue=None):
def _func(iter, shared, _order=None, _queue=None):
data_out = func(iter, shared)
_queue.put(data_out)
_queue.put((_order, data_out))

chunks = list(self.prepare_chunks(iter))
procs = [] # for storing processes
queue = Queue(self.n_chunks) # storage of data from processes
for chunk in chunks:
proc = Process(target=_func, args=(chunk, shared, queue))
for index,chunk in enumerate(chunks): # index to ensure procs returned in correct order
proc = Process(target=_func, args=(chunk, shared, index, queue))
procs.append(proc)
proc.start()
# retrieve data from the queue
data = []
for _ in range(self.n_chunks):
data += queue.get()
data.append(queue.get())
# complete the processes
for proc in procs:
proc.join()
# sort by correct chunk ordering and flatten data
_,data = zip(*sorted(data))
data = [a for b in data for a in b]
return data

def _process_single(self, func, iter, shared):
Expand Down

0 comments on commit 0a2b968

Please sign in to comment.