From 6426aae2e4d30501974409b6a8d8b9cf030198da Mon Sep 17 00:00:00 2001 From: "Michael T. Campbell" Date: Thu, 7 Sep 2023 06:10:32 -0700 Subject: [PATCH 01/11] Add contextmanager import --- mirgecom/simutil.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index eebe03316..d348228a6 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -74,6 +74,8 @@ import sys from functools import partial from typing import TYPE_CHECKING, Dict, List, Optional +from contextlib import contextmanager + from logpyle import IntervalTimer import grudge.op as op From 98a951125ab6874deab1551059dc552a79e90ceb Mon Sep 17 00:00:00 2001 From: Mike Campbell Date: Thu, 7 Sep 2023 08:23:04 -0500 Subject: [PATCH 02/11] Add mesh partitioning utilities --- mirgecom/simutil.py | 88 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index d348228a6..610ce757b 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -890,6 +890,94 @@ def generate_and_distribute_mesh(comm, generate_mesh, **kwargs): return distribute_mesh(comm, generate_mesh) +def _partition_single_volume_mesh( + mesh, num_ranks, rank_per_element, *, return_ranks=None): + rank_to_elements = { + rank: np.where(rank_per_element == rank)[0] + for rank in range(num_ranks)} + + from meshmode.mesh.processing import partition_mesh + return partition_mesh( + mesh, rank_to_elements, return_parts=return_ranks) + + +def _partition_multi_volume_mesh( + mesh, num_ranks, rank_per_element, tag_to_elements, volume_to_tags, *, + return_ranks=None): + if return_ranks is None: + return_ranks = [i for i in range(num_ranks)] + + tag_to_volume = { + tag: vol + for vol, tags in volume_to_tags.items() + for tag in tags} + + volumes = list(volume_to_tags.keys()) + + volume_index_per_element = np.full(mesh.nelements, -1, dtype=int) + for tag, elements in tag_to_elements.items(): + volume_index_per_element[elements] = 
volumes.index( + tag_to_volume[tag]) + + if np.any(volume_index_per_element < 0): + raise ValueError("Missing volume specification for some elements.") + + part_id_to_elements = { + PartID(volumes[vol_idx], rank): + np.where( + (volume_index_per_element == vol_idx) + & (rank_per_element == rank))[0] + for vol_idx in range(len(volumes)) + for rank in range(num_ranks)} + + # FIXME: Find a better way to do this + part_id_to_part_index = { + part_id: part_index + for part_index, part_id in enumerate(part_id_to_elements.keys())} + from meshmode.mesh.processing import _compute_global_elem_to_part_elem + global_elem_to_part_elem = _compute_global_elem_to_part_elem( + mesh.nelements, part_id_to_elements, part_id_to_part_index, + mesh.element_id_dtype) + + tag_to_global_to_part = { + tag: global_elem_to_part_elem[elements, :] + for tag, elements in tag_to_elements.items()} + + part_id_to_tag_to_elements = {} + for part_id in part_id_to_elements.keys(): + part_idx = part_id_to_part_index[part_id] + part_tag_to_elements = {} + for tag, global_to_part in tag_to_global_to_part.items(): + part_tag_to_elements[tag] = global_to_part[ + global_to_part[:, 0] == part_idx, 1] + part_id_to_tag_to_elements[part_id] = part_tag_to_elements + + return_parts = { + PartID(vol, rank) + for vol in volumes + for rank in return_ranks} + + from meshmode.mesh.processing import partition_mesh + part_id_to_mesh = partition_mesh( + mesh, part_id_to_elements, return_parts=return_parts) + + return { + rank: { + vol: ( + part_id_to_mesh[PartID(vol, rank)], + part_id_to_tag_to_elements[PartID(vol, rank)]) + for vol in volumes} + for rank in return_ranks} + + +@contextmanager +def _manage_mpi_comm(comm): + try: + yield comm + finally: + comm.Free() + + def distribute_mesh(comm, get_mesh_data, partition_generator_func=None, logmgr=None): r"""Distribute a mesh among all ranks in *comm*. 
From 1583d72553d98e4ff283ee0555b0c8281155659f Mon Sep 17 00:00:00 2001 From: Mike Campbell Date: Thu, 7 Sep 2023 09:26:45 -0500 Subject: [PATCH 03/11] Add majosm node-local mesh partitioning funcs --- mirgecom/simutil.py | 197 ++++++++++++++++++-------------------------- 1 file changed, 79 insertions(+), 118 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index 610ce757b..0dee3006c 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -905,7 +905,7 @@ def _partition_multi_volume_mesh( mesh, num_ranks, rank_per_element, tag_to_elements, volume_to_tags, *, return_ranks=None): if return_ranks is None: - return_ranks = [i for i in range(num_ranks)] + return_ranks = list(range(num_ranks)) tag_to_volume = { tag: vol @@ -1014,10 +1014,12 @@ def distribute_mesh(comm, get_mesh_data, partition_generator_func=None, logmgr=N global_nelements: :class:`int` The number of elements in the global mesh """ + from mpi4py import MPI from mpi4py.util import pkl5 - comm_wrapper = pkl5.Intracomm(comm) + from meshmode.distributed import mpi_distribute + # pkl5_comm = pkl5.Intracomm(comm) - num_ranks = comm_wrapper.Get_size() + num_ranks = comm.Get_size() t_mesh_dist = IntervalTimer("t_mesh_dist", "Time spent distributing mesh data.") t_mesh_data = IntervalTimer("t_mesh_data", "Time spent getting mesh data.") t_mesh_part = IntervalTimer("t_mesh_part", "Time spent partitioning the mesh.") @@ -1028,132 +1030,91 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): from meshmode.distributed import get_partition_by_pymetis return get_partition_by_pymetis(mesh, num_ranks) - if comm_wrapper.Get_rank() == 0: - if logmgr: - logmgr.add_quantity(t_mesh_data) - with t_mesh_data.get_sub_timer(): + with _manage_mpi_comm( + comm.Split_type(MPI.COMM_TYPE_SHARED, comm.Get_rank(), MPI.INFO_NULL) + ) as node_comm: + node_comm_wrapper = pkl5.Intracomm(node_comm) + node_ranks = node_comm_wrapper.gather(comm.Get_rank(), root=0) + my_node_rank = 
node_comm_wrapper.Get_rank() + + if my_node_rank == 0: + if logmgr: + logmgr.add_quantity(t_mesh_data) + with t_mesh_data.get_sub_timer(): + global_data = get_mesh_data() + else: global_data = get_mesh_data() - else: - global_data = get_mesh_data() - - from meshmode.mesh import Mesh - if isinstance(global_data, Mesh): - mesh = global_data - tag_to_elements = None - volume_to_tags = None - elif isinstance(global_data, tuple) and len(global_data) == 3: - mesh, tag_to_elements, volume_to_tags = global_data - else: - raise TypeError("Unexpected result from get_mesh_data") - if logmgr: - logmgr.add_quantity(t_mesh_part) - with t_mesh_part.get_sub_timer(): + from meshmode.mesh import Mesh + if isinstance(global_data, Mesh): + mesh = global_data + tag_to_elements = None + volume_to_tags = None + elif isinstance(global_data, tuple) and len(global_data) == 3: + mesh, tag_to_elements, volume_to_tags = global_data + else: + raise TypeError("Unexpected result from get_mesh_data") + + if logmgr: + logmgr.add_quantity(t_mesh_part) + with t_mesh_part.get_sub_timer(): + rank_per_element = \ + partition_generator_func(mesh, tag_to_elements, + num_ranks) + else: rank_per_element = partition_generator_func(mesh, tag_to_elements, num_ranks) - else: - rank_per_element = partition_generator_func(mesh, tag_to_elements, - num_ranks) - def get_rank_to_mesh_data(): - from meshmode.mesh.processing import partition_mesh - if tag_to_elements is None: - rank_to_elements = { - rank: np.where(rank_per_element == rank)[0] - for rank in range(num_ranks)} + def get_rank_to_mesh_data(): + if tag_to_elements is None: + rank_to_mesh_data = _partition_single_volume_mesh( + mesh, num_ranks, rank_per_element, + return_ranks=node_ranks) + else: + rank_to_mesh_data = _partition_multi_volume_mesh( + mesh, num_ranks, rank_per_element, tag_to_elements, + volume_to_tags, return_ranks=node_ranks) - rank_to_mesh_data_dict = partition_mesh(mesh, rank_to_elements) + rank_to_node_rank = { + rank: node_rank + for 
node_rank, rank in enumerate(node_ranks)} - rank_to_mesh_data = [ - rank_to_mesh_data_dict[rank] - for rank in range(num_ranks)] + node_rank_to_mesh_data = { + rank_to_node_rank[rank]: mesh_data + for rank, mesh_data in rank_to_mesh_data.items()} - else: - tag_to_volume = { - tag: vol - for vol, tags in volume_to_tags.items() - for tag in tags} - - volumes = list(volume_to_tags.keys()) - - volume_index_per_element = np.full(mesh.nelements, -1, dtype=int) - for tag, elements in tag_to_elements.items(): - volume_index_per_element[elements] = volumes.index( - tag_to_volume[tag]) - - if np.any(volume_index_per_element < 0): - raise ValueError("Missing volume specification " - "for some elements.") - - part_id_to_elements = { - PartID(volumes[vol_idx], rank): - np.where( - (volume_index_per_element == vol_idx) - & (rank_per_element == rank))[0] - for vol_idx in range(len(volumes)) - for rank in range(num_ranks)} - - # TODO: Add a public meshmode function to accomplish this? So we're - # not depending on meshmode internals - part_id_to_part_index = { - part_id: part_index - for part_index, part_id in enumerate(part_id_to_elements.keys())} - from meshmode.mesh.processing import \ - _compute_global_elem_to_part_elem - global_elem_to_part_elem = _compute_global_elem_to_part_elem( - mesh.nelements, part_id_to_elements, part_id_to_part_index, - mesh.element_id_dtype) - - tag_to_global_to_part = { - tag: global_elem_to_part_elem[elements, :] - for tag, elements in tag_to_elements.items()} - - part_id_to_tag_to_elements = {} - for part_id in part_id_to_elements.keys(): - part_idx = part_id_to_part_index[part_id] - part_tag_to_elements = {} - for tag, global_to_part in tag_to_global_to_part.items(): - part_tag_to_elements[tag] = global_to_part[ - global_to_part[:, 0] == part_idx, 1] - part_id_to_tag_to_elements[part_id] = part_tag_to_elements - - part_id_to_mesh = partition_mesh(mesh, part_id_to_elements) - - rank_to_mesh_data = [ - { - vol: ( - part_id_to_mesh[PartID(vol, 
rank)], - part_id_to_tag_to_elements[PartID(vol, rank)]) - for vol in volumes} - for rank in range(num_ranks)] - - return rank_to_mesh_data - - if logmgr: - logmgr.add_quantity(t_mesh_split) - with t_mesh_split.get_sub_timer(): - rank_to_mesh_data = get_rank_to_mesh_data() - else: - rank_to_mesh_data = get_rank_to_mesh_data() - - global_nelements = comm_wrapper.bcast(mesh.nelements, root=0) + return node_rank_to_mesh_data - if logmgr: - logmgr.add_quantity(t_mesh_dist) - with t_mesh_dist.get_sub_timer(): - local_mesh_data = comm_wrapper.scatter(rank_to_mesh_data, root=0) - else: - local_mesh_data = comm_wrapper.scatter(rank_to_mesh_data, root=0) + if logmgr: + logmgr.add_quantity(t_mesh_split) + with t_mesh_split.get_sub_timer(): + node_rank_to_mesh_data = get_rank_to_mesh_data() + else: + node_rank_to_mesh_data = get_rank_to_mesh_data() - else: - global_nelements = comm_wrapper.bcast(None, root=0) + global_nelements = node_comm_wrapper.bcast(mesh.nelements, root=0) - if logmgr: - logmgr.add_quantity(t_mesh_dist) - with t_mesh_dist.get_sub_timer(): - local_mesh_data = comm_wrapper.scatter(None, root=0) - else: - local_mesh_data = comm_wrapper.scatter(None, root=0) + if logmgr: + logmgr.add_quantity(t_mesh_dist) + with t_mesh_dist.get_sub_timer(): + local_mesh_data = mpi_distribute( + node_comm_wrapper, source_rank=0, + source_data=node_rank_to_mesh_data) + else: + local_mesh_data = mpi_distribute( + node_comm_wrapper, source_rank=0, + source_data=node_rank_to_mesh_data) + + else: # my_node_rank > 0, get mesh part from MPI + global_nelements = node_comm_wrapper.bcast(None, root=0) + + if logmgr: + logmgr.add_quantity(t_mesh_dist) + with t_mesh_dist.get_sub_timer(): + local_mesh_data = \ + mpi_distribute(node_comm_wrapper, source_rank=0) + else: + local_mesh_data = mpi_distribute(node_comm_wrapper, source_rank=0) return local_mesh_data, global_nelements From 550d485c1bbe7840e20c292d0ca3a9e48f97d14c Mon Sep 17 00:00:00 2001 From: "Michael T. 
Campbell" Date: Thu, 7 Sep 2023 12:12:21 -0700 Subject: [PATCH 04/11] Dispense with pkl5 comm wrapper which doesnt play nice with splitted comm. --- mirgecom/simutil.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index 0dee3006c..d3d355cdc 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -1015,9 +1015,7 @@ def distribute_mesh(comm, get_mesh_data, partition_generator_func=None, logmgr=N The number of elements in the global mesh """ from mpi4py import MPI - from mpi4py.util import pkl5 from meshmode.distributed import mpi_distribute - # pkl5_comm = pkl5.Intracomm(comm) num_ranks = comm.Get_size() t_mesh_dist = IntervalTimer("t_mesh_dist", "Time spent distributing mesh data.") @@ -1033,9 +1031,8 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): with _manage_mpi_comm( comm.Split_type(MPI.COMM_TYPE_SHARED, comm.Get_rank(), MPI.INFO_NULL) ) as node_comm: - node_comm_wrapper = pkl5.Intracomm(node_comm) - node_ranks = node_comm_wrapper.gather(comm.Get_rank(), root=0) - my_node_rank = node_comm_wrapper.Get_rank() + node_ranks = node_comm.gather(comm.Get_rank(), root=0) + my_node_rank = node_comm.Get_rank() if my_node_rank == 0: if logmgr: @@ -1092,29 +1089,29 @@ def get_rank_to_mesh_data(): else: node_rank_to_mesh_data = get_rank_to_mesh_data() - global_nelements = node_comm_wrapper.bcast(mesh.nelements, root=0) + global_nelements = node_comm.bcast(mesh.nelements, root=0) if logmgr: logmgr.add_quantity(t_mesh_dist) with t_mesh_dist.get_sub_timer(): local_mesh_data = mpi_distribute( - node_comm_wrapper, source_rank=0, + node_comm, source_rank=0, source_data=node_rank_to_mesh_data) else: local_mesh_data = mpi_distribute( - node_comm_wrapper, source_rank=0, + node_comm, source_rank=0, source_data=node_rank_to_mesh_data) else: # my_node_rank > 0, get mesh part from MPI - global_nelements = node_comm_wrapper.bcast(None, root=0) + global_nelements = 
node_comm.bcast(None, root=0) if logmgr: logmgr.add_quantity(t_mesh_dist) with t_mesh_dist.get_sub_timer(): local_mesh_data = \ - mpi_distribute(node_comm_wrapper, source_rank=0) + mpi_distribute(node_comm, source_rank=0) else: - local_mesh_data = mpi_distribute(node_comm_wrapper, source_rank=0) + local_mesh_data = mpi_distribute(node_comm, source_rank=0) return local_mesh_data, global_nelements From f09077f4f6fcb10763147d753b68c10c8aed040e Mon Sep 17 00:00:00 2001 From: Mike Campbell Date: Fri, 8 Sep 2023 08:51:56 -0500 Subject: [PATCH 05/11] Update mirgecom/simutil.py Co-authored-by: Matt Smith --- mirgecom/simutil.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index d3d355cdc..134d00ca2 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -930,7 +930,8 @@ def _partition_multi_volume_mesh( for vol_idx in range(len(volumes)) for rank in range(num_ranks)} - # FIXME: Find a better way to do this + # TODO: Add a public meshmode function to accomplish this? 
So we're + # not depending on meshmode internals part_id_to_part_index = { part_id: part_index for part_index, part_id in enumerate(part_id_to_elements.keys())} From 15a0eb98c9e2fe7308a2882b0dae174226267a76 Mon Sep 17 00:00:00 2001 From: Mike Campbell Date: Fri, 8 Sep 2023 11:02:07 -0500 Subject: [PATCH 06/11] rearrange comming in mesh dist --- mirgecom/simutil.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index 134d00ca2..b9e709f15 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -1019,6 +1019,8 @@ def distribute_mesh(comm, get_mesh_data, partition_generator_func=None, logmgr=N from meshmode.distributed import mpi_distribute num_ranks = comm.Get_size() + my_global_rank = comm.Get_rank() + t_mesh_dist = IntervalTimer("t_mesh_dist", "Time spent distributing mesh data.") t_mesh_data = IntervalTimer("t_mesh_data", "Time spent getting mesh data.") t_mesh_part = IntervalTimer("t_mesh_part", "Time spent partitioning the mesh.") @@ -1031,9 +1033,12 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): with _manage_mpi_comm( comm.Split_type(MPI.COMM_TYPE_SHARED, comm.Get_rank(), MPI.INFO_NULL) - ) as node_comm: + ) as node_comm: node_ranks = node_comm.gather(comm.Get_rank(), root=0) my_node_rank = node_comm.Get_rank() + reader_color = 0 if my_node_rank == 0 else 1 + reader_comm = comm.Split(reader_color, my_global_rank) + my_reader_rank = reader_comm.Get_rank() if my_node_rank == 0: if logmgr: @@ -1043,6 +1048,10 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): else: global_data = get_mesh_data() + reader_comm.Barrier() + if my_reader_rank == 0: + print("Mesh reading done on all nodes.") + from meshmode.mesh import Mesh if isinstance(global_data, Mesh): mesh = global_data @@ -1060,8 +1069,13 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): partition_generator_func(mesh, tag_to_elements, num_ranks) else: - rank_per_element 
= partition_generator_func(mesh, tag_to_elements, - num_ranks) + rank_per_element = \ + partition_generator_func(mesh, tag_to_elements, + num_ranks) + + reader_comm.Barrier() + if my_reader_rank == 0: + print("Mesh partitioning done on each node.") def get_rank_to_mesh_data(): if tag_to_elements is None: @@ -1090,6 +1104,10 @@ def get_rank_to_mesh_data(): else: node_rank_to_mesh_data = get_rank_to_mesh_data() + reader_comm.Barrier() + if my_reader_rank == 0: + print("Rank-to-mesh data done on each node, distributing.") + global_nelements = node_comm.bcast(mesh.nelements, root=0) if logmgr: From 4f1eba22708df62384b0c68399c4aed860442c00 Mon Sep 17 00:00:00 2001 From: "Michael T. Campbell" Date: Tue, 12 Sep 2023 19:05:56 -0700 Subject: [PATCH 07/11] Batch the reads --- mirgecom/logging_quantities.py | 7 +++++ mirgecom/simutil.py | 49 +++++++++++++++++++++++++++------- 2 files changed, 47 insertions(+), 9 deletions(-) diff --git a/mirgecom/logging_quantities.py b/mirgecom/logging_quantities.py index c7a1064d3..6271ce620 100644 --- a/mirgecom/logging_quantities.py +++ b/mirgecom/logging_quantities.py @@ -32,6 +32,7 @@ .. autoclass:: DeviceMemoryUsage .. autofunction:: initialize_logmgr .. autofunction:: logmgr_add_cl_device_info +.. autofunction:: logmgr_add_simulation_info .. autofunction:: logmgr_add_device_memory_usage .. autofunction:: logmgr_add_many_discretization_quantities .. autofunction:: logmgr_add_mempool_usage @@ -101,6 +102,12 @@ def logmgr_add_cl_device_info(logmgr: LogManager, queue: cl.CommandQueue) -> Non logmgr.set_constant("cl_platform_version", dev.platform.version) +def logmgr_add_simulation_info(logmgr: LogManager, sim_info) -> None: + """Add some user-defined information to the logpyle output.""" + for field_name in sim_info: + logmgr.set_constant(field_name, sim_info[field_name]) + + def logmgr_add_device_name(logmgr: LogManager, queue: cl.CommandQueue): # noqa: D401 """Deprecated. 
Do not use in new code.""" from warnings import warn diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index b9e709f15..59f340520 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -979,7 +979,8 @@ def _manage_mpi_comm(comm): comm.Free() -def distribute_mesh(comm, get_mesh_data, partition_generator_func=None, logmgr=None): +def distribute_mesh(comm, get_mesh_data, partition_generator_func=None, logmgr=None, + num_per_batch=None): r"""Distribute a mesh among all ranks in *comm*. Retrieve the global mesh data with the user-supplied function *get_mesh_data*, @@ -1016,10 +1017,13 @@ def distribute_mesh(comm, get_mesh_data, partition_generator_func=None, logmgr=N The number of elements in the global mesh """ from mpi4py import MPI + from mpi4py.util import pkl5 + from socket import gethostname from meshmode.distributed import mpi_distribute num_ranks = comm.Get_size() my_global_rank = comm.Get_rank() + hostname = gethostname() t_mesh_dist = IntervalTimer("t_mesh_dist", "Time spent distributing mesh data.") t_mesh_data = IntervalTimer("t_mesh_data", "Time spent getting mesh data.") @@ -1032,8 +1036,10 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): return get_partition_by_pymetis(mesh, num_ranks) with _manage_mpi_comm( - comm.Split_type(MPI.COMM_TYPE_SHARED, comm.Get_rank(), MPI.INFO_NULL) + pkl5.Intracomm(comm.Split_type(MPI.COMM_TYPE_SHARED, + comm.Get_rank(), MPI.INFO_NULL)) ) as node_comm: + node_ranks = node_comm.gather(comm.Get_rank(), root=0) my_node_rank = node_comm.Get_rank() reader_color = 0 if my_node_rank == 0 else 1 @@ -1041,12 +1047,33 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): my_reader_rank = reader_comm.Get_rank() if my_node_rank == 0: + + num_reading_ranks = reader_comm.Get_size() + num_per_batch = num_per_batch or num_reading_ranks + num_reading_batches = max(int(num_reading_ranks / num_per_batch), 1) + read_batch = int(my_reader_rank / num_per_batch) + + print(f"Reading(rank, batch): 
({my_reader_rank}, " + f"{read_batch}) on {hostname}.") + if logmgr: logmgr.add_quantity(t_mesh_data) with t_mesh_data.get_sub_timer(): - global_data = get_mesh_data() + for reading_batch in range(num_reading_batches): + if read_batch == reading_batch: + print(f"Reading mesh on {hostname}.") + global_data = get_mesh_data() + reader_comm.Barrier() + else: + reader_comm.Barrier() else: - global_data = get_mesh_data() + for reading_batch in range(num_reading_batches): + if read_batch == reading_batch: + print(f"Reading mesh on {hostname}.") + global_data = get_mesh_data() + reader_comm.Barrier() + else: + reader_comm.Barrier() reader_comm.Barrier() if my_reader_rank == 0: @@ -1062,6 +1089,10 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): else: raise TypeError("Unexpected result from get_mesh_data") + reader_comm.Barrier() + if my_reader_rank == 0: + print("Making partition table on all nodes.") + if logmgr: logmgr.add_quantity(t_mesh_part) with t_mesh_part.get_sub_timer(): @@ -1073,10 +1104,6 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): partition_generator_func(mesh, tag_to_elements, num_ranks) - reader_comm.Barrier() - if my_reader_rank == 0: - print("Mesh partitioning done on each node.") - def get_rank_to_mesh_data(): if tag_to_elements is None: rank_to_mesh_data = _partition_single_volume_mesh( @@ -1097,6 +1124,10 @@ def get_rank_to_mesh_data(): return node_rank_to_mesh_data + reader_comm.Barrier() + if my_reader_rank == 0: + print("Partitioning mesh on all nodes.") + if logmgr: logmgr.add_quantity(t_mesh_split) with t_mesh_split.get_sub_timer(): @@ -1106,7 +1137,7 @@ def get_rank_to_mesh_data(): reader_comm.Barrier() if my_reader_rank == 0: - print("Rank-to-mesh data done on each node, distributing.") + print("Partitioning done, distributing to node-local ranks.") global_nelements = node_comm.bcast(mesh.nelements, root=0) From 1fe0401eb1089caab51650e72c69b91030988b9a Mon Sep 17 00:00:00 2001 From: Mike Campbell Date: Tue, 
12 Sep 2023 21:07:14 -0500 Subject: [PATCH 08/11] Add pkl dist method --- mirgecom/simutil.py | 207 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 207 insertions(+) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index eebe03316..53b396bb3 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -1068,6 +1068,213 @@ def get_rank_to_mesh_data(): return local_mesh_data, global_nelements +def distribute_mesh_pkl(comm, get_mesh_data, filename="mesh", + num_target_ranks=0, num_reader_ranks=0, + partition_generator_func=None, logmgr=None): + r"""Distribute a mesh among all ranks in *comm*. + + Retrieve the global mesh data with the user-supplied function *get_mesh_data*, + partition the mesh, and distribute it to every rank in the provided MPI + communicator *comm*. + + .. note:: + This is a collective routine and must be called by all MPI ranks. + + Parameters + ---------- + comm: + MPI communicator over which to partition the mesh + get_mesh_data: + Callable of zero arguments returning *mesh* or + *(mesh, tag_to_elements, volume_to_tags)*, where *mesh* is a + :class:`meshmode.mesh.Mesh`, *tag_to_elements* is a + :class:`dict` mapping mesh volume tags to :class:`numpy.ndarray`\ s of + element numbers, and *volume_to_tags* is a :class:`dict` that maps volumes + in the resulting distributed mesh to volume tags in *tag_to_elements*. + partition_generator_func: + Optional callable that takes *mesh*, *tag_to_elements*, and *comm*'s size, + and returns a :class:`numpy.ndarray` indicating to which rank each element + belongs. + + Returns + ------- + local_mesh_data: :class:`meshmode.mesh.Mesh` or :class:`dict` + If the result of calling *get_mesh_data* specifies a single volume, + *local_mesh_data* is the local mesh. If it specifies multiple volumes, + *local_mesh_data* will be a :class:`dict` mapping volume tags to + tuples of the form *(local_mesh, local_tag_to_elements)*. 
+ global_nelements: :class:`int` + The number of elements in the global mesh + """ + from mpi4py.util import pkl5 + comm_wrapper = pkl5.Intracomm(comm) + + num_ranks = comm_wrapper.Get_size() + rank = comm_wrapper.Get_rank() + + if num_target_ranks <= 0: + num_target_ranks = num_ranks + if num_reader_ranks <= 0: + num_reader_ranks = num_ranks + + reader_color = 1 if rank < num_reader_ranks else 0 + reader_comm = comm_wrapper.Split(reader_color, rank) + reader_comm_wrapper = pkl5.Intracomm(reader_comm) + reader_rank = reader_comm_wrapper.Get_rank() + num_ranks_per_reader = int(num_target_ranks / num_reader_ranks) + num_leftover = num_target_ranks - (num_ranks_per_reader * num_reader_ranks) + num_ranks_this_reader = num_ranks_per_reader + (1 if reader_rank + < num_leftover else 0) + + t_mesh_dist = IntervalTimer("t_mesh_dist", "Time spent distributing mesh data.") + t_mesh_data = IntervalTimer("t_mesh_data", "Time spent getting mesh data.") + t_mesh_part = IntervalTimer("t_mesh_part", "Time spent partitioning the mesh.") + t_mesh_split = IntervalTimer("t_mesh_split", "Time spent splitting mesh parts.") + + if reader_color and num_ranks_this_reader > 0: + my_starting_rank = (num_ranks_per_reader * reader_rank + + reader_rank if reader_rank + < num_leftover else num_leftover) + my_ending_rank = my_starting_rank + num_ranks_this_reader - 1 + ranks_to_write = range(my_starting_rank, my_ending_rank) + print(f"{rank=}{reader_rank=}{my_starting_rank=}{my_ending_rank=}") + + if partition_generator_func is None: + def partition_generator_func(mesh, tag_to_elements, num_target_ranks): + from meshmode.distributed import get_partition_by_pymetis + return get_partition_by_pymetis(mesh, num_target_ranks) + + if logmgr: + logmgr.add_quantity(t_mesh_data) + with t_mesh_data.get_sub_timer(): + global_data = get_mesh_data() + else: + global_data = get_mesh_data() + + from meshmode.mesh import Mesh + if isinstance(global_data, Mesh): + mesh = global_data + tag_to_elements = None + 
volume_to_tags = None + elif isinstance(global_data, tuple) and len(global_data) == 3: + mesh, tag_to_elements, volume_to_tags = global_data + else: + raise TypeError("Unexpected result from get_mesh_data") + + if logmgr: + logmgr.add_quantity(t_mesh_part) + with t_mesh_part.get_sub_timer(): + rank_per_element = partition_generator_func(mesh, tag_to_elements, + num_target_ranks) + else: + rank_per_element = partition_generator_func(mesh, tag_to_elements, + num_target_ranks) + + def get_rank_to_mesh_data(): + from meshmode.mesh.processing import partition_mesh + if tag_to_elements is None: + rank_to_elements = { + rank: np.where(rank_per_element == rank)[0] + for rank in ranks_to_write} + + rank_to_mesh_data_dict = partition_mesh(mesh, rank_to_elements) + + rank_to_mesh_data = [ + rank_to_mesh_data_dict[rank] + for rank in ranks_to_write] + + else: + tag_to_volume = { + tag: vol + for vol, tags in volume_to_tags.items() + for tag in tags} + + volumes = list(volume_to_tags.keys()) + + volume_index_per_element = np.full(mesh.nelements, -1, dtype=int) + for tag, elements in tag_to_elements.items(): + volume_index_per_element[elements] = volumes.index( + tag_to_volume[tag]) + + if np.any(volume_index_per_element < 0): + raise ValueError("Missing volume specification " + "for some elements.") + + part_id_to_elements = { + PartID(volumes[vol_idx], rank): + np.where( + (volume_index_per_element == vol_idx) + & (rank_per_element == rank))[0] + for vol_idx in range(len(volumes)) + for rank in ranks_to_write} + + # TODO: Add a public meshmode function to accomplish this? 
So we're + # not depending on meshmode internals + part_id_to_part_index = { + part_id: part_index + for part_index, part_id in enumerate(part_id_to_elements.keys())} + from meshmode.mesh.processing import \ + _compute_global_elem_to_part_elem + global_elem_to_part_elem = _compute_global_elem_to_part_elem( + mesh.nelements, part_id_to_elements, part_id_to_part_index, + mesh.element_id_dtype) + + tag_to_global_to_part = { + tag: global_elem_to_part_elem[elements, :] + for tag, elements in tag_to_elements.items()} + + part_id_to_tag_to_elements = {} + for part_id in part_id_to_elements.keys(): + part_idx = part_id_to_part_index[part_id] + part_tag_to_elements = {} + for tag, global_to_part in tag_to_global_to_part.items(): + part_tag_to_elements[tag] = global_to_part[ + global_to_part[:, 0] == part_idx, 1] + part_id_to_tag_to_elements[part_id] = part_tag_to_elements + + part_id_to_mesh = partition_mesh(mesh, part_id_to_elements) + + rank_to_mesh_data = [ + { + vol: ( + part_id_to_mesh[PartID(vol, rank)], + part_id_to_tag_to_elements[PartID(vol, rank)]) + for vol in volumes} + for rank in ranks_to_write] + + return rank_to_mesh_data + + if logmgr: + logmgr.add_quantity(t_mesh_split) + with t_mesh_split.get_sub_timer(): + rank_to_mesh_data = get_rank_to_mesh_data(ranks_to_write) + else: + rank_to_mesh_data = get_rank_to_mesh_data(ranks_to_write) + + import os + import pickle + if logmgr: + logmgr.add_quantity(t_mesh_dist) + with t_mesh_dist.get_sub_timer(): + for rank in ranks_to_write: + rank_mesh_data = rank_to_mesh_data[rank] + mesh_data_to_pickle = (mesh.nelements, rank_mesh_data) + pkl_filename = filename + f"_rank{rank}.pkl" + if os.path.exists(pkl_filename): + os.remove(pkl_filename) + with open(pkl_filename, "wb") as pkl_file: + pickle.dump(mesh_data_to_pickle, pkl_file) + else: + for rank in ranks_to_write: + rank_mesh_data = rank_to_mesh_data[rank] + mesh_data_to_pickle = (mesh.nelements, rank_mesh_data) + pkl_filename = filename + f"_rank{rank}.pkl" + if 
os.path.exists(pkl_filename): + os.remove(pkl_filename) + with open(pkl_filename, "wb") as pkl_file: + pickle.dump(mesh_data_to_pickle, pkl_file) + + def extract_volumes(mesh, tag_to_elements, selected_tags, boundary_tag): r""" Create a mesh containing a subset of another mesh's volumes. From 46faa2d219bb6627a38b62e5928314403f2070b0 Mon Sep 17 00:00:00 2001 From: "Michael T. Campbell" Date: Thu, 14 Sep 2023 08:36:46 -0700 Subject: [PATCH 09/11] Add mesh dist to pkl util --- mirgecom/simutil.py | 108 ++++++++++---------------------------------- 1 file changed, 24 insertions(+), 84 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index b456dbb5c..2822a6b8d 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -611,8 +611,9 @@ def geometric_mesh_partitioner(mesh, num_ranks=None, *, nranks_per_axis=None, # Create geometrically even partitions elem_to_rank = ((elem_centroids-x_min) / part_interval).astype(int) - - print(f"{elem_to_rank=}") + + if debug: + print(f"{elem_to_rank=}") # map partition id to list of elements in that partition part_to_elements = {r: set(np.where(elem_to_rank == r)[0]) @@ -970,7 +971,6 @@ def _partition_multi_volume_mesh( for vol in volumes} for rank in return_ranks} - @contextmanager def _manage_mpi_comm(comm): try: @@ -1208,15 +1208,15 @@ def distribute_mesh_pkl(comm, get_mesh_data, filename="mesh", comm_wrapper = pkl5.Intracomm(comm) num_ranks = comm_wrapper.Get_size() - rank = comm_wrapper.Get_rank() + my_rank = comm_wrapper.Get_rank() if num_target_ranks <= 0: num_target_ranks = num_ranks if num_reader_ranks <= 0: num_reader_ranks = num_ranks - reader_color = 1 if rank < num_reader_ranks else 0 - reader_comm = comm_wrapper.Split(reader_color, rank) + reader_color = 1 if my_rank < num_reader_ranks else 0 + reader_comm = comm_wrapper.Split(reader_color, my_rank) reader_comm_wrapper = pkl5.Intracomm(reader_comm) reader_rank = reader_comm_wrapper.Get_rank() num_ranks_per_reader = int(num_target_ranks / 
num_reader_ranks) @@ -1230,12 +1230,13 @@ def distribute_mesh_pkl(comm, get_mesh_data, filename="mesh", t_mesh_split = IntervalTimer("t_mesh_split", "Time spent splitting mesh parts.") if reader_color and num_ranks_this_reader > 0: - my_starting_rank = (num_ranks_per_reader * reader_rank - + reader_rank if reader_rank - < num_leftover else num_leftover) + my_starting_rank = num_ranks_per_reader * reader_rank + my_starting_rank = my_starting_rank + (reader_rank if reader_rank + < num_leftover else num_leftover) my_ending_rank = my_starting_rank + num_ranks_this_reader - 1 - ranks_to_write = range(my_starting_rank, my_ending_rank) - print(f"{rank=}{reader_rank=}{my_starting_rank=}{my_ending_rank=}") + ranks_to_write = list(range(my_starting_rank, my_ending_rank+1)) + + print(f"R({my_rank},{reader_rank}): W({my_starting_rank},{my_ending_rank})") if partition_generator_func is None: def partition_generator_func(mesh, tag_to_elements, num_target_ranks): @@ -1269,85 +1270,22 @@ def partition_generator_func(mesh, tag_to_elements, num_target_ranks): num_target_ranks) def get_rank_to_mesh_data(): - from meshmode.mesh.processing import partition_mesh if tag_to_elements is None: - rank_to_elements = { - rank: np.where(rank_per_element == rank)[0] - for rank in ranks_to_write} - - rank_to_mesh_data_dict = partition_mesh(mesh, rank_to_elements) - - rank_to_mesh_data = [ - rank_to_mesh_data_dict[rank] - for rank in ranks_to_write] - + rank_to_mesh_data = _partition_single_volume_mesh( + mesh, num_target_ranks, rank_per_element, + return_ranks=ranks_to_write) else: - tag_to_volume = { - tag: vol - for vol, tags in volume_to_tags.items() - for tag in tags} - - volumes = list(volume_to_tags.keys()) - - volume_index_per_element = np.full(mesh.nelements, -1, dtype=int) - for tag, elements in tag_to_elements.items(): - volume_index_per_element[elements] = volumes.index( - tag_to_volume[tag]) - - if np.any(volume_index_per_element < 0): - raise ValueError("Missing volume specification 
" - "for some elements.") - - part_id_to_elements = { - PartID(volumes[vol_idx], rank): - np.where( - (volume_index_per_element == vol_idx) - & (rank_per_element == rank))[0] - for vol_idx in range(len(volumes)) - for rank in ranks_to_write} - - # TODO: Add a public meshmode function to accomplish this? So we're - # not depending on meshmode internals - part_id_to_part_index = { - part_id: part_index - for part_index, part_id in enumerate(part_id_to_elements.keys())} - from meshmode.mesh.processing import \ - _compute_global_elem_to_part_elem - global_elem_to_part_elem = _compute_global_elem_to_part_elem( - mesh.nelements, part_id_to_elements, part_id_to_part_index, - mesh.element_id_dtype) - - tag_to_global_to_part = { - tag: global_elem_to_part_elem[elements, :] - for tag, elements in tag_to_elements.items()} - - part_id_to_tag_to_elements = {} - for part_id in part_id_to_elements.keys(): - part_idx = part_id_to_part_index[part_id] - part_tag_to_elements = {} - for tag, global_to_part in tag_to_global_to_part.items(): - part_tag_to_elements[tag] = global_to_part[ - global_to_part[:, 0] == part_idx, 1] - part_id_to_tag_to_elements[part_id] = part_tag_to_elements - - part_id_to_mesh = partition_mesh(mesh, part_id_to_elements) - - rank_to_mesh_data = [ - { - vol: ( - part_id_to_mesh[PartID(vol, rank)], - part_id_to_tag_to_elements[PartID(vol, rank)]) - for vol in volumes} - for rank in ranks_to_write] - + rank_to_mesh_data = _partition_multi_volume_mesh( + mesh, num_target_ranks, rank_per_element, tag_to_elements, + volume_to_tags, return_ranks=ranks_to_write) return rank_to_mesh_data if logmgr: logmgr.add_quantity(t_mesh_split) with t_mesh_split.get_sub_timer(): - rank_to_mesh_data = get_rank_to_mesh_data(ranks_to_write) + rank_to_mesh_data = get_rank_to_mesh_data() else: - rank_to_mesh_data = get_rank_to_mesh_data(ranks_to_write) + rank_to_mesh_data = get_rank_to_mesh_data() import os import pickle @@ -1355,7 +1293,8 @@ def get_rank_to_mesh_data(): 
logmgr.add_quantity(t_mesh_dist) with t_mesh_dist.get_sub_timer(): for rank in ranks_to_write: - rank_mesh_data = rank_to_mesh_data[rank] + local_rank_index = rank - my_starting_rank + rank_mesh_data = rank_to_mesh_data[local_rank_index] mesh_data_to_pickle = (mesh.nelements, rank_mesh_data) pkl_filename = filename + f"_rank{rank}.pkl" if os.path.exists(pkl_filename): @@ -1364,7 +1303,8 @@ def get_rank_to_mesh_data(): pickle.dump(mesh_data_to_pickle, pkl_file) else: for rank in ranks_to_write: - rank_mesh_data = rank_to_mesh_data[rank] + local_rank_index = rank - my_starting_rank + rank_mesh_data = rank_to_mesh_data[local_rank_index] mesh_data_to_pickle = (mesh.nelements, rank_mesh_data) pkl_filename = filename + f"_rank{rank}.pkl" if os.path.exists(pkl_filename): From f004abf8c53349ac0d6161ec680bcb143f47e4cb Mon Sep 17 00:00:00 2001 From: "Michael T. Campbell" Date: Fri, 15 Sep 2023 09:02:00 -0700 Subject: [PATCH 10/11] Add meshdist util --- bin/meshdist.py | 260 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 260 insertions(+) create mode 100755 bin/meshdist.py diff --git a/bin/meshdist.py b/bin/meshdist.py new file mode 100755 index 000000000..68f0617d5 --- /dev/null +++ b/bin/meshdist.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python +"""mirgecom mesh distribution utility""" + +__copyright__ = """ +Copyright (C) 2020 University of Illinois Board of Trustees +""" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" +import logging +import argparse +import sys +import os + +from pytools.obj_array import make_obj_array +from functools import partial + +from logpyle import IntervalTimer, set_dt +from mirgecom.logging_quantities import ( + initialize_logmgr, + logmgr_add_cl_device_info, + logmgr_set_time, + logmgr_add_simulation_info, + logmgr_add_device_memory_usage, + logmgr_add_mempool_usage, +) + +from mirgecom.simutil import ( + ApplicationOptionsError, + distribute_mesh, + distribute_mesh_pkl +) +from mirgecom.mpi import mpi_entry_point + + +class SingleLevelFilter(logging.Filter): + def __init__(self, passlevel, reject): + self.passlevel = passlevel + self.reject = reject + + def filter(self, record): + if self.reject: + return (record.levelno != self.passlevel) + else: + return (record.levelno == self.passlevel) + + +class MyRuntimeError(RuntimeError): + """Simple exception to kill the simulation.""" + + pass + + +@mpi_entry_point +def main(actx_class, mesh_source=None, ndist=None, + output_path=None, log_path=None, + casename=None, use_1d_part=None, use_wall=False): + + if mesh_source is None: + raise ApplicationOptionsError("Missing mesh source file.") + + mesh_source.strip("'") + + if log_path is None: + log_path = "log_data" + + log_path.strip("'") + + if output_path is None: + output_path = "." 
+ output_path.strip("'") + + # control log messages + logger = logging.getLogger(__name__) + logger.propagate = False + + if (logger.hasHandlers()): + logger.handlers.clear() + + # send info level messages to stdout + h1 = logging.StreamHandler(sys.stdout) + f1 = SingleLevelFilter(logging.INFO, False) + h1.addFilter(f1) + logger.addHandler(h1) + + # send everything else to stderr + h2 = logging.StreamHandler(sys.stderr) + f2 = SingleLevelFilter(logging.INFO, True) + h2.addFilter(f2) + logger.addHandler(h2) + + from mpi4py import MPI + from mpi4py.util import pkl5 + comm_world = MPI.COMM_WORLD + comm = pkl5.Intracomm(comm_world) + rank = comm.Get_rank() + nparts = comm.Get_size() + + if ndist is None: + ndist = nparts + + if casename is None: + casename = f"mirgecom_np{nparts}" + casename.strip("'") + + if rank == 0: + print(f"Running mesh distribution on {nparts} MPI ranks.") + print(f"Casename: {casename}") + print(f"Mesh source file: {mesh_source}") + + # logging and profiling + logname = log_path + "/" + casename + ".sqlite" + + if rank == 0: + log_dir = os.path.dirname(logname) + if log_dir and not os.path.exists(log_dir): + os.makedirs(log_dir) + + comm.Barrier() + + logmgr = initialize_logmgr(True, + filename=logname, mode="wu", mpi_comm=comm) + + from mirgecom.array_context import initialize_actx, actx_class_is_profiling + actx = initialize_actx(actx_class, comm) + queue = getattr(actx, "queue", None) + use_profiling = actx_class_is_profiling(actx_class) + alloc = getattr(actx, "allocator", None) + + monitor_memory = True + monitor_performance = 2 + + logmgr_add_cl_device_info(logmgr, queue) + + if monitor_memory: + logmgr_add_device_memory_usage(logmgr, queue) + logmgr_add_mempool_usage(logmgr, alloc) + + logmgr.add_watches([ + ("memory_usage_python.max", + "| Memory:\n| \t python memory: {value:7g} Mb\n") + ]) + + try: + logmgr.add_watches([ + ("memory_usage_gpu.max", + "| \t gpu memory: {value:7g} Mb\n") + ]) + except KeyError: + pass + + 
logmgr.add_watches([ + ("memory_usage_hwm.max", + "| \t memory hwm: {value:7g} Mb\n")]) + + if rank == 0: + print(f"Reading mesh from {mesh_source}.") + print(f"Writing {ndist} mesh pkl files to {output_path}.") + + def get_mesh_data(): + from meshmode.mesh.io import read_gmsh + mesh, tag_to_elements = read_gmsh( + mesh_source, + return_tag_to_elements_map=True) + volume_to_tags = { + "fluid": ["fluid"]} + if use_wall: + volume_to_tags["wall"] = ["wall_insert", "wall_surround"] + else: + from mirgecom.simutil import extract_volumes + mesh, tag_to_elements = extract_volumes( + mesh, tag_to_elements, volume_to_tags["fluid"], + "wall_interface") + return mesh, tag_to_elements, volume_to_tags + + def my_partitioner(mesh, tag_to_elements, num_ranks): + from mirgecom.simutil import geometric_mesh_partitioner + return geometric_mesh_partitioner( + mesh, num_ranks, auto_balance=True, debug=False) + + part_func = my_partitioner if use_1d_part else None + + if os.path.exists(output_path): + if not os.path.isdir(output_path): + raise ApplicationOptionsError( + "Mesh dist mode requires \"output\"" + " parameter to be a directory for output.") + if rank == 0: + if not os.path.exists(output_path): + os.makedirs(output_path) + + comm.Barrier() + mesh_filename = output_path + "/" + casename + "_mesh" + + if rank == 0: + print(f"Writing mesh pkl files to {mesh_filename}.") + + distribute_mesh_pkl( + comm, get_mesh_data, filename=mesh_filename, + num_target_ranks=ndist, + partition_generator_func=part_func, logmgr=logmgr) + + comm.Barrier() + + logmgr_set_time(logmgr, 0, 0) + logmgr.tick_before() + logmgr.tick_after() + logmgr.close() + + +if __name__ == "__main__": + + logging.basicConfig( + format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", + level=logging.INFO) + + parser = argparse.ArgumentParser( + description="MIRGE-Com Mesh Distribution") + parser.add_argument("-w", "--wall", dest="use_wall", + action="store_true", help="Include wall domain in mesh.") + 
parser.add_argument("-1", "--1dpart", dest="one_d_part", + action="store_true", help="Use 1D partitioner.") + parser.add_argument("-n", "--ndist", type=int, dest="ndist", + nargs="?", action="store", help="Number of distributed parts") + parser.add_argument("-s", "--source", type=str, dest="source", + nargs="?", action="store", help="Gmsh mesh source file") + parser.add_argument("-o", "--output-path", type=str, dest="output_path", + nargs="?", action="store", help="Output path for distributed mesh pkl files") + parser.add_argument("-c", "--casename", type=str, dest="casename", nargs="?", + action="store", help="Root name of distributed mesh pkl files.") + parser.add_argument("-g", "--logpath", type=str, dest="log_path", nargs="?", + action="store", help="Output path for log files") + + args = parser.parse_args() + + from mirgecom.array_context import get_reasonable_array_context_class + actx_class = get_reasonable_array_context_class( + lazy=False, distributed=True, profiling=False, numpy=False) + + main(actx_class, mesh_source=args.source, + output_path=args.output_path, ndist=args.ndist, + log_path=args.log_path, casename=args.casename, + use_1d_part=args.one_d_part, use_wall=args.use_wall) From 372132615524318faa9905c4da7f29ca8bae118f Mon Sep 17 00:00:00 2001 From: "Michael T. Campbell" Date: Sat, 16 Sep 2023 14:59:09 -0700 Subject: [PATCH 11/11] Import at top, use better filenames, loop over actual items in parted meshes.
--- bin/meshdist.py | 5 +++-- mirgecom/simutil.py | 25 +++++++++++-------------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/bin/meshdist.py b/bin/meshdist.py index 68f0617d5..48bef66e9 100755 --- a/bin/meshdist.py +++ b/bin/meshdist.py @@ -117,11 +117,11 @@ def main(actx_class, mesh_source=None, ndist=None, ndist = nparts if casename is None: - casename = f"mirgecom_np{nparts}" + casename = f"mirgecom_np{ndist}" casename.strip("'") if rank == 0: - print(f"Running mesh distribution on {nparts} MPI ranks.") + print(f"Distributing on {nparts} ranks into {ndist} parts.") print(f"Casename: {casename}") print(f"Mesh source file: {mesh_source}") @@ -220,7 +220,8 @@ def my_partitioner(mesh, tag_to_elements, num_ranks): comm.Barrier() logmgr_set_time(logmgr, 0, 0) logmgr.tick_before() + set_dt(logmgr, 0.) logmgr.tick_after() logmgr.close() diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index 2822a6b8d..db10eca17 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -72,6 +72,8 @@ """ import logging import sys +import os +import pickle from functools import partial from typing import TYPE_CHECKING, Dict, List, Optional from contextlib import contextmanager @@ -1053,7 +1055,7 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): num_reading_batches = max(int(num_reading_ranks / num_per_batch), 1) read_batch = int(my_reader_rank / num_per_batch) - print(f"Reading(rank, batch): ({my_reader_rank}, " + print(f"Read(rank, batch): Dist({my_reader_rank}, " f"{read_batch}) on {hostname}.") if logmgr: @@ -1236,7 +1238,8 @@ distribute_mesh_pkl(comm, get_mesh_data, filename="mesh", my_ending_rank = my_starting_rank + num_ranks_this_reader - 1 ranks_to_write = list(range(my_starting_rank, my_ending_rank+1)) - print(f"{my_rank=}{reader_rank=}{my_starting_rank=}{my_ending_rank=}") + print(f"R({my_rank},{reader_rank}): " + f"W({my_starting_rank},{my_ending_rank})") if partition_generator_func is None: def 
partition_generator_func(mesh, tag_to_elements, num_target_ranks): @@ -1287,26 +1290,20 @@ def get_rank_to_mesh_data(): else: rank_to_mesh_data = get_rank_to_mesh_data() - import os - import pickle if logmgr: logmgr.add_quantity(t_mesh_dist) with t_mesh_dist.get_sub_timer(): - for rank in ranks_to_write: - local_rank_index = rank - my_starting_rank - rank_mesh_data = rank_to_mesh_data[local_rank_index] - mesh_data_to_pickle = (mesh.nelements, rank_mesh_data) - pkl_filename = filename + f"_rank{rank}.pkl" + for part_rank, part_mesh in rank_to_mesh_data.items(): + pkl_filename = filename + f"_rank{part_rank}.pkl" + mesh_data_to_pickle = (mesh.nelements, part_mesh) if os.path.exists(pkl_filename): os.remove(pkl_filename) with open(pkl_filename, "wb") as pkl_file: pickle.dump(mesh_data_to_pickle, pkl_file) else: - for rank in ranks_to_write: - local_rank_index = rank - my_starting_rank - rank_mesh_data = rank_to_mesh_data[local_rank_index] - mesh_data_to_pickle = (mesh.nelements, rank_mesh_data) - pkl_filename = filename + f"_rank{rank}.pkl" + for part_rank, part_mesh in rank_to_mesh_data.items(): + pkl_filename = filename + f"_rank{part_rank}.pkl" + mesh_data_to_pickle = (mesh.nelements, part_mesh) if os.path.exists(pkl_filename): os.remove(pkl_filename) with open(pkl_filename, "wb") as pkl_file: