Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

part-persist: implement message aggregation #13039

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

AxelSchneewind
Copy link
Contributor

Created a component for partitioned communication that supports message aggregation, based on the existing part-persist component (using the refactored code from #12972). The goal is to prevent drops in effective bandwidth when using too fine-grained partitionings.

This component allows enforcing hard limits on size and count of transferred partitions, regardless of the partitioning required by the application. These limits can be specified using mca-parameters (min_message_size, max_message_count). Their default values might require revision.

If a user-provided partitioning violates the constraints, a more coarse-grained partitioning is selected, where multiple user-partitions are mapped to an internal (transfer) partition. Transfer of an internal partition is started as soon as Pready has been called on all corresponding user-partitions. Each transfer partition is associated with an atomic counter, tracking the number of corresponding user-partitions that have been marked ready.

This implementation is a result of "Benchmarking the State of MPI Partitioned Communication in Open MPI" presented at EuroMPI 2024 (https://events.vsc.ac.at/event/123/page/341-program).

@AxelSchneewind AxelSchneewind marked this pull request as ready for review January 15, 2025 15:00
Signed-off-by: Axel Schneewind <[email protected]>
This aggregation scheme is intended to allow OpenMPI to transfer larger messages
if the user-reported partitions are too small or too high in number.
This is achieved by using an internal partitioning where each internal (transfer)
partition corresponds to one or multiple user-reported partitions.

The scheme provides a method to mark a user partition as ready, that optionally
returns a transfer partition that is ready.

This is implemented by associating each transfer partition with an atomic counter,
tracking the number of corresponding pready-calls already made.
If a counter reaches the number of corresponding user-partitions, the corresponding
transfer partition is returned.

This implementation is thread-safe.

Signed-off-by: Axel Schneewind <[email protected]>
A few changes are made to the component to allow for using a different
partitioning internally than reported by the user:

During initialization, an internal partitioning is computed from the
user-provided one, taking into account the limits imposed by
the mca-parameters on partition size and count.
This internal partitioning can be equal to or less fine-grained
than the one provided by the user.

The sender side request objects now need to hold state for message aggregation,
therefore an aggregation_state field is added. This is not required for receive
side requests, as the receiver side code already supports differing send-side
partitionings.

As transfer partition sizes might not divide user-partition sizes, the last
transfer partition can correspond to fewer user-partitions than the other ones.
For this reason, a field (real_remainder) is introduced into request objects that
holds the number of elements corresponding to the last transfer
partition.
This field is also added to the setup information exchanged between sender and
receiver during initialization.

To account for this potentially smaller last
transfer partition, the initialization of the internal persistent requests is
adjusted.

Signed-off-by: Axel Schneewind <[email protected]>
@AxelSchneewind AxelSchneewind force-pushed the part-persist-aggregated branch from 224e839 to 84882d1 Compare January 17, 2025 13:54
@janjust janjust requested a review from mdosanjh January 28, 2025 16:14
@janjust
Copy link
Contributor

janjust commented Jan 28, 2025

@mdosanjh Any chance we could get an initial review from you on this PR?

@mdosanjh
Copy link
Contributor

mdosanjh commented Jan 28, 2025 via email

@hppritcha
Copy link
Member

i think someone needs to look at the mpi4py failure. looks specific to changes in this PR.

@cniethammer
Copy link
Contributor

Just an assumption from my side having a look at the error message without diving into the mpi4py code: It could be related to the extension of fields in the Request handle, which is mapped to a python object in mpi4py.

@hppritcha
Copy link
Member

I doubt that. Probably failing only with mpi4py because it has the most extensive test suite in our CI infrastructure. This pr should not be merged till mpi4py passes.

@hppritcha
Copy link
Member

If its any help here's where the segfault is occurring:

514	mca_part_persist_aggregated_start(size_t count, ompi_request_t** requests)
515	{
516	    int err = OMPI_SUCCESS;
517	    size_t _count = count;
518	
519	    for(size_t i = 0; i < _count && OMPI_SUCCESS == err; i++) {
520	        mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *)(requests[i]);
521	
522	        // reset aggregation state here
523	        if (MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) {
524	            mca_part_persist_aggregated_psend_request_t *sendreq = (mca_part_persist_aggregated_psend_request_t *)(req);
525	            part_persist_aggregate_regular_reset(&sendreq->aggregation_state);
526	        }
527	
528	        /* First use is a special case, to support lazy initialization */
529	        if(false == req->first_send)
530	        {
531	            if(MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) {
532	                req->done_count = 0;
533	                memset((void*)req->flags,0,sizeof(int32_t)*req->real_parts);
534	            } else {
535	                req->done_count = 0;
536	                err = req->persist_reqs[0]->req_start(req->real_parts, req->persist_reqs);
537	                memset((void*)req->flags,0,sizeof(int32_t)*req->real_parts);
538	            }
539	        } else {
540	            if(MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) {
541	                req->done_count = 0;
542	                for(size_t j = 0; j < req->real_parts && OMPI_SUCCESS == err; j++) {
543	                    req->flags[j] = -1;
544	                }
545	            } else {
546	                req->done_count = 0;
547	            }
548	        }
549	        req->req_ompi.req_state = OMPI_REQUEST_ACTIVE;
550	        req->req_ompi.req_status.MPI_TAG = MPI_ANY_TAG;

at line 533. The problem is req->flags is NULL.

@AxelSchneewind
Copy link
Contributor Author

AxelSchneewind commented Feb 6, 2025

Thank you very much, I will look into it. That error never occurred with my tests but I probably missed an edge case

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants