Fixes for tags that affected PointQuery on some systems. #1327

Merged 9 commits on Oct 22, 2024. Changes shown below are from 1 commit.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -20,6 +20,14 @@ and this project aspires to adhere to [Semantic Versioning](https://semver.org/s
#### Conduit
- Changed the MPI CMake target used by conduit from `MPI:MPI_CXX` to `MPI:MPI_C` to provide better compatibility with downstream tools.

#### Blueprint
- Certain algorithms that use MPI tags had their tag values lowered, since some MPI implementations do not support large tag values.

#### Relay
- User-supplied warning and error handlers are suspended during `conduit::relay::communicate_using_schema::execute()` so exceptions will be thrown properly when there is an MPI error. The handlers are restored before the execute method returns.
- `conduit::relay::communicate_using_schema::execute()` flushes logs as they are generated, in case of error. This is mostly to facilitate internal debugging.
- Changes were made to how Relay queries the upper limit for MPI tags to work around problems on some systems.

## [0.9.2] - Released 2024-05-21

### Added
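For context on the Relay entries above: the MPI standard only guarantees that MPI_TAG_UB is at least 32767, and the value is queried per-communicator through MPI_Comm_get_attr, whose C binding returns a pointer to the integer value. Below is a minimal stand-alone sketch of such a query with a defensive fallback; the helper name and fallback constant are illustrative, and the PR's actual implementation is the safe_tag() change in the Relay diff further down.

```cpp
#include <mpi.h>

// Illustrative helper (not part of this PR): query the largest usable tag for
// a communicator, falling back to the standard-guaranteed minimum when the
// attribute is missing or looks bogus.
int query_tag_upper_bound(MPI_Comm comm)
{
    const int fallback = 32767;  // smallest upper bound the MPI standard allows

    void *attr = nullptr;
    int flag = 0;
    if(MPI_Comm_get_attr(comm, MPI_TAG_UB, &attr, &flag) != MPI_SUCCESS ||
       flag == 0 || attr == nullptr)
    {
        return fallback;
    }

    const int tag_ub = *static_cast<int *>(attr);
    return (tag_ub > 0) ? tag_ub : fallback;
}
```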
17 changes: 9 additions & 8 deletions src/libs/blueprint/conduit_blueprint_mpi_mesh_utils.cpp
@@ -173,8 +173,8 @@ PointQuery::execute(const std::string &coordsetName)
// the results. The results get stored in m_domResults.
std::map<std::pair<int,int>, conduit::Node *> input_sends, result_sends,
input_recvs, result_recvs;
int inputs_tag = 55000000;
int results_tag = 66000000;
const int inputs_tag = 550;
const int results_tag = 660;
for(int pass = 0; pass < 2; pass++)
{
conduit::relay::mpi::communicate_using_schema C(m_comm);
@@ -187,10 +187,10 @@ PointQuery::execute(const std::string &coordsetName)
#endif
for(size_t i = 0; i < allqueries.size(); i += 3)
{
int asker = allqueries[i];
int domain = allqueries[i+1];
int npts = allqueries[i+2];
int owner = domain_to_rank[domain];
const int asker = allqueries[i];
const int domain = allqueries[i+1];
const int npts = allqueries[i+2];
const int owner = domain_to_rank[domain];

if(asker == rank)
{
@@ -278,6 +278,7 @@ PointQuery::execute(const std::string &coordsetName)
for(auto it = result_recvs.begin(); it != result_recvs.end(); it++)
{
int domain = it->first.second;

const conduit::Node &r = it->second->fetch_existing("results");
auto acc = r.as_int_accessor();
std::vector<int> &result = m_domResults[domain];
@@ -435,7 +436,7 @@ MatchQuery::execute()
C.set_logging_root("mpi_matchquery");
C.set_logging(true);
#endif
int query_tag = 77000000;
int query_tag = 1;
for(size_t i = 0; i < allqueries.size(); i += ntuple_values)
{
int owner = allqueries[i];
@@ -667,7 +668,7 @@ compare_pointwise_impl(conduit::Node &mesh, const std::string &adjsetName,

// Iterate over each of the possible adjset relationships. Not all of these
// will have adjset groups.
const int tag = 12211221;
const int tag = 1;
for(int d0 = 0; d0 < maxDomains; d0++)
{
for(int d1 = d0 + 1; d1 < maxDomains; d1++)
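The tag constants above were lowered because MPI only requires implementations to accept tag values up to 32767; MPI_TAG_UB may be larger, but it does not have to be, so literals such as 55000000, 66000000, 77000000, and 12211221 can be illegal tags on some MPIs. A compile-time guard along these lines could keep future hard-coded tags portable (illustrative only, not part of this PR):

```cpp
// Hard-coded MPI tags stay portable as long as they are non-negative and do
// not exceed 32767, the smallest MPI_TAG_UB an implementation may report.
constexpr int inputs_tag  = 550;
constexpr int results_tag = 660;
static_assert(inputs_tag  >= 0 && inputs_tag  <= 32767, "tag may exceed MPI_TAG_UB");
static_assert(results_tag >= 0 && results_tag <= 32767, "tag may exceed MPI_TAG_UB");
```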
86 changes: 72 additions & 14 deletions src/libs/relay/conduit_relay_mpi.cpp
@@ -277,15 +277,35 @@ bool invalid_tag(int tag)
/**
@brief MPI tags can be in the range [0,MPI_TAG_UB]. The values are
implementation-dependent. If the tag is not in that range, return
MPI_TAG_UB so it is safe to use with MPI functions.
the value for MPI_TAG_UB so it is safe to use with MPI functions.

@param comm The MPI communicator.
@param tag The input tag.

@return A tag value that is safe to use with MPI.
*/
int safe_tag(int tag)
int safe_tag(MPI_Comm comm, int tag)
{
return (tag < MPI_TAG_UB) ? ((tag >= 0) ? tag : MPI_TAG_UB) : MPI_TAG_UB;
// Get the tag upper bound for the communicator.
// MPI_Comm_get_attr with MPI_TAG_UB is not a very reliable function.
[Review comment from a Contributor]: That's always fun :( Thanks for identifying this issue.

constexpr int backup_tag_limit = 4096;
int tag_ub = 0, flag = 0;
int mpi_error = MPI_Comm_get_attr(comm, MPI_TAG_UB, &tag_ub, &flag);
if(mpi_error == MPI_SUCCESS && flag != 0)
{
// Some MPI implementations give a negative upper bound; maybe they
// do not really support querying MPI_TAG_UB.
if(tag_ub < 0)
{
tag_ub = backup_tag_limit;
}
}
else
{
tag_ub = backup_tag_limit;
}

return (tag < tag_ub) ? ((tag >= 0) ? tag : tag_ub) : tag_ub;
}

//---------------------------------------------------------------------------//
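To make the clamping behavior of safe_tag() concrete, here is a stand-alone illustration; clamp_tag and the fixed bound of 32767 are illustrative stand-ins, since the real function queries the bound from the communicator as shown above and may therefore pass larger tags through unchanged.

```cpp
#include <cassert>

// The same clamping expression as safe_tag(), with the upper bound fixed at
// the standard-guaranteed minimum of 32767 for the sake of the example.
int clamp_tag(int tag, int tag_ub)
{
    return (tag < tag_ub) ? ((tag >= 0) ? tag : tag_ub) : tag_ub;
}

int main()
{
    const int tag_ub = 32767;
    assert(clamp_tag(550, tag_ub)      == 550);    // in range: passed through
    assert(clamp_tag(-3, tag_ub)       == tag_ub); // negative: replaced by the bound
    assert(clamp_tag(55000000, tag_ub) == tag_ub); // oversized: clamped to the bound
    return 0;
}
```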
@@ -992,7 +1012,7 @@ isend(const Node &node,
static_cast<int>(data_size),
MPI_BYTE,
dest,
safe_tag(tag),
safe_tag(mpi_comm, tag),
mpi_comm,
&(request->m_request));

@@ -1045,7 +1065,7 @@ irecv(Node &node,
static_cast<int>(data_size),
MPI_BYTE,
src,
safe_tag(tag),
safe_tag(mpi_comm, tag),
mpi_comm,
&(request->m_request));

@@ -1886,14 +1906,47 @@ communicate_using_schema::add_irecv(Node &node, int src, int tag)
//-----------------------------------------------------------------------------
int
communicate_using_schema::execute()
{
// Uncomment this to install an MPI_Comm error handler.
// HandleMPICommError errHandler(comm);

// Get the currently installed warning and error handlers.
conduit::utils::conduit_warning_handler onWarning = conduit::utils::warning_handler();
conduit::utils::conduit_error_handler onError = conduit::utils::error_handler();

// Install the default exception-throwing handlers.
conduit::utils::set_warning_handler(conduit::utils::default_warning_handler);
conduit::utils::set_error_handler(conduit::utils::default_error_handler);

int retval = 0;
try
{
retval = execute_internal();

// Restore warning/error handlers.
conduit::utils::set_warning_handler(onWarning);
conduit::utils::set_error_handler(onError);
}
catch(conduit::Error &e)
{
// Restore warning/error handlers.
conduit::utils::set_warning_handler(onWarning);
conduit::utils::set_error_handler(onError);

// Rethrow the exception.
throw e;
}
return retval;
}

//-----------------------------------------------------------------------------
int
communicate_using_schema::execute_internal()
{
int mpi_error = 0;
std::vector<MPI_Request> requests(operations.size());
std::vector<MPI_Status> statuses(operations.size());

// Uncomment this to install an MPI_Comm error handler.
// HandleMPICommError errHandler(comm);

int rank, size;
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &size);
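From a caller's point of view, the execute() wrapper above means an MPI failure surfaces as a conduit::Error even if the application has installed its own quieter handlers, and those handlers are back in place once execute() returns or throws. A hedged sketch of that usage follows; the include path, the peer-rank send, and the node setup are assumed for illustration and are not part of this PR.

```cpp
#include <mpi.h>
#include "conduit_relay_mpi.hpp"  // assumed install location of the Relay MPI API

// Illustrative only: post a receive described by a schema and run the exchange.
// A matching send from rank 'src' is assumed to exist elsewhere in the program.
void run_exchange(MPI_Comm comm, conduit::Node &node, int src, int tag)
{
    conduit::relay::mpi::communicate_using_schema C(comm);
    C.add_irecv(node, src, tag);
    try
    {
        C.execute();  // user handlers are suspended inside, restored on exit
    }
    catch(const conduit::Error &)
    {
        // The MPI error arrived as a Conduit exception; recover or abort here.
    }
}
```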
@@ -1953,7 +2006,7 @@ communicate_using_schema::execute()
<< msg_data_size << ", "
<< "MPI_BYTE, "
<< operations[i].rank << ", "
<< safe_tag(operations[i].tag) << ", "
<< safe_tag(comm, operations[i].tag) << ", "
<< "comm, &requests[" << i << "]);" << std::endl;
}

@@ -1968,7 +2021,7 @@ communicate_using_schema::execute()
static_cast<int>(msg_data_size),
MPI_BYTE,
operations[i].rank,
safe_tag(operations[i].tag),
safe_tag(comm, operations[i].tag),
comm,
&requests[i]);
CONDUIT_CHECK_MPI_ERROR(mpi_error);
@@ -1978,6 +2031,7 @@ communicate_using_schema::execute()
if(logging)
{
log << "* Time issuing MPI_Isend calls: " << (t1-t0) << std::endl;
log.flush();
}

// Issue all the recvs.
@@ -1990,10 +2044,10 @@
{
log << " MPI_Probe("
<< operations[i].rank << ", "
<< safe_tag(operations[i].tag) << ", "
<< safe_tag(comm, operations[i].tag) << ", "
<< "comm, &statuses[" << i << "]);" << std::endl;
}
mpi_error = MPI_Probe(operations[i].rank, safe_tag(operations[i].tag), comm, &statuses[i]);
mpi_error = MPI_Probe(operations[i].rank, safe_tag(comm, operations[i].tag), comm, &statuses[i]);
CONDUIT_CHECK_MPI_ERROR(mpi_error);

int buffer_size = 0;
@@ -2015,7 +2069,7 @@
<< buffer_size << ", "
<< "MPI_BYTE, "
<< operations[i].rank << ", "
<< safe_tag(operations[i].tag) << ", "
<< safe_tag(comm, operations[i].tag) << ", "
<< "comm, &requests[" << i << "]);" << std::endl;
}

@@ -2024,7 +2078,7 @@
buffer_size,
MPI_BYTE,
operations[i].rank,
safe_tag(operations[i].tag),
safe_tag(comm, operations[i].tag),
comm,
&requests[i]);
CONDUIT_CHECK_MPI_ERROR(mpi_error);
@@ -2034,20 +2088,23 @@
if(logging)
{
log << "* Time issuing MPI_Irecv calls: " << (t2-t1) << std::endl;
log.flush();
}

// Wait for the requests to complete.
int n = static_cast<int>(operations.size());
if(logging)
{
log << " MPI_Waitall(" << n << ", &requests[0], &statuses[0]);" << std::endl;
log.flush();
}
mpi_error = MPI_Waitall(n, &requests[0], &statuses[0]);
CONDUIT_CHECK_MPI_ERROR(mpi_error);
double t3 = MPI_Wtime();
if(logging)
{
log << "* Time in MPI_Waitall: " << (t3-t2) << std::endl;
log.flush();
}

// Finish building the nodes for which we received data.
@@ -2082,6 +2139,7 @@
if(logging)
{
log << "* Built output node " << i << std::endl;
log.flush();
}
}
}
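The receive side of execute_internal() above follows the classic probe-then-receive pattern: probe for the matching message, size the buffer from the status, then post the receive. A generic stand-alone sketch of that pattern is below; the buffer handling is simplified, and the size query is assumed since that part of the hunk is collapsed in this view.

```cpp
#include <mpi.h>
#include <vector>

// Illustrative only: receive a message of unknown size from 'src' with 'tag'.
std::vector<char> probe_and_recv(MPI_Comm comm, int src, int tag)
{
    MPI_Status status;
    MPI_Probe(src, tag, comm, &status);              // wait until the message is available

    int buffer_size = 0;
    MPI_Get_count(&status, MPI_BYTE, &buffer_size);  // size the buffer from the probe

    std::vector<char> buffer(buffer_size);
    MPI_Request request;
    MPI_Irecv(buffer.data(), buffer_size, MPI_BYTE, src, tag, comm, &request);
    MPI_Wait(&request, MPI_STATUS_IGNORE);           // execute_internal() uses MPI_Waitall instead
    return buffer;
}
```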
7 changes: 7 additions & 0 deletions src/libs/relay/conduit_relay_mpi.hpp
@@ -337,6 +337,13 @@ class CONDUIT_RELAY_API communicate_using_schema
private:
void clear();

/**
@brief Execute all the outstanding isend/irecv calls and reconstruct any
nodes after doing the data movement.
@return The return value from MPI_Waitall.
*/
int execute_internal();

static const int OP_SEND;
static const int OP_RECV;
struct operation