
Commit

try hybrid MPI
Qiang Zhu committed Sep 1, 2024
1 parent a146cc6 commit 503b38f
Showing 2 changed files with 15 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pyxtal/optimize/WFS.py
@@ -193,7 +193,7 @@ def run_mpi(self):

# broadcast
current_xtals = self.comm.bcast(current_xtals, root=0)
-print(f"Rank {self.rank} after broadcast: current_xtals = {current_xtals}")
+#print(f"Rank {self.rank} after broadcast: current_xtals = {current_xtals}")

# Local optimization
gen_results = self.local_optimization(gen, current_xtals)
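Note: the broadcast above follows the standard mpi4py pattern of preparing data on rank 0 and copying it to every rank. A minimal standalone sketch of that pattern (the data dictionary is only a placeholder, not the actual xtal objects):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Only rank 0 prepares the payload; every other rank starts with None.
data = {"gen": 0, "xtals": ["xtal-0", "xtal-1"]} if rank == 0 else None

# bcast pickles the object on the root and delivers a copy to every rank.
data = comm.bcast(data, root=0)
print(f"Rank {rank} received: {data}")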
39 changes: 14 additions & 25 deletions pyxtal/optimize/base.py
@@ -32,18 +32,19 @@ def run_optimizer_single_with_timeout(args, timeout=60.0):
"""Run optimizer_single with a timeout and error handling."""
def worker(*args):
try:
-return optimizer_single(*args)
+xtal, match = optimizer_single(*args)
+return args[1], xtal, match
except Exception as e:
print(f"Error in worker thread: {e}")
-return None, False # Or any default value you prefer
+return None, None, False # Or any default value you prefer

with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(worker, *args)
try:
return future.result(timeout=timeout)
-except concurrent.futures.TimeoutError:
+except TimeoutError:
print(f"Timeout: Optimization took longer than {timeout} seconds.")
-return None, False # Return a default value indicating a timeout
+return None, None, False # Return a default value indicating a timeout

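Note: run_optimizer_single_with_timeout wraps one call in a single-worker ThreadPoolExecutor and converts a timeout into a sentinel return value. A minimal sketch of that pattern, with a hypothetical slow_task standing in for the real optimization call:

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def slow_task(x, delay):
    # Hypothetical stand-in for the real optimization call.
    time.sleep(delay)
    return x * 2

def run_with_timeout(x, delay, timeout=1.0):
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(slow_task, x, delay)
        try:
            return future.result(timeout=timeout)
        except TimeoutError:
            # Took too long: return a sentinel instead of the result.
            # The executor still waits for the worker thread to finish
            # when the with-block exits; the thread itself is not killed.
            return None

print(run_with_timeout(21, delay=0.1))  # 42
print(run_with_timeout(21, delay=3.0))  # None (timed out)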
def run_optimizer(args, timeout=60):
"""
@@ -126,11 +127,8 @@ def __init__(
from mpi4py import MPI
self.comm = MPI.COMM_WORLD
self.rank = self.comm.Get_rank()
-node_name = MPI.Get_processor_name()
-unique_node_names = self.comm.gather(node_name, root=0)
-self.node = node_name
-self.size = len(unique_node_names)
-#self.size = self.comm.Get_size()
+self.node = MPI.Get_processor_name()
+self.size = self.ncpu
else:
self.rank = 0
self.size = self.ncpu
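Note: the rank and node name come straight from mpi4py; the commit deliberately overrides self.size with self.ncpu instead of comm.Get_size(). For reference, a minimal sketch of the plain mpi4py queries (run with something like mpiexec -n 4 python script.py):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()           # index of this process, 0 .. size-1
size = comm.Get_size()           # total number of MPI processes
node = MPI.Get_processor_name()  # hostname of the node running this rank

print(f"Rank {rank}/{size} on node {node}")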
@@ -983,36 +981,27 @@ def local_optimization_mpi(self, gen, xtals, qrs=False):
# Distribute args_lists across available ranks (processes)
local_args = args_lists[self.rank::self.size]

-# Run the optimizer in parallel using MPI
-#local_results = []
-#for args in local_args:
-# # print('rank', self.rank, 'id', args[1])
-# xtal, match = run_optimizer_single_with_timeout(args, timeout=60)
-# # (xtal, match) = optimizer_single(*args)
-# local_results.append((args[1], xtal, match))

# Determine the number of cores available on the node
-num_cores = multiprocessing.cpu_count() // self.size
+num_cores = multiprocessing.cpu_count() #// self.size
print("Local optimization distribution", self.rank, num_cores)

# Use multiprocessing within each MPI rank (node)
with multiprocessing.Pool(processes=num_cores) as pool:
local_results = pool.map(run_optimizer, local_args)

-#local_results = []
-#for args in local_args:
-# print("Local optimization", self.rank, args[1])
-# result = run_optimizer(args)
-# print("Local optimization", self.rank, result)
-# local_results.append(result)
+# Synchronize before gathering
+self.comm.Barrier()
+print(f"Rank {self.rank} reached before gather")

-# Gather all results at the root process
+print('Gather all results at the root process')
all_results = self.comm.gather(local_results, root=0)

-# Process results at the root process
+print("Process results at the root process")
gen_results = None
if self.rank == 0:
-gen_results = [(None, None)] * len(xtals)
+gen_results = [(None, None, None)] * len(xtals)
for result_set in all_results:
for res in result_set:
(id, xtal, match) = res
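Note: the hybrid scheme above stripes args_lists across MPI ranks with rank::size slicing, fans out within each rank through a multiprocessing.Pool, and gathers the per-rank result lists back at rank 0. A minimal sketch of that control flow (square is a placeholder for run_optimizer; how well fork-based multiprocessing coexists with a given MPI implementation can vary):

import multiprocessing
from mpi4py import MPI

def square(x):
    # Placeholder for the per-structure optimization call.
    return x * x

if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    work = list(range(20))
    # Round-robin striping: rank r takes items r, r+size, r+2*size, ...
    local_work = work[rank::size]

    # Fan out within this rank using a process pool.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        local_results = pool.map(square, local_work)

    comm.Barrier()                                    # synchronize before gathering
    all_results = comm.gather(local_results, root=0)  # list of per-rank lists on rank 0
    if rank == 0:
        print(all_results)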
