Skip to content

Commit

Permalink
Switch to Flux.flatMapSequential(…) to prevent backpressure shaping.
Browse files Browse the repository at this point in the history
We now use Flux.flatMapSequential(…) instead of concatMap as concatMap reduces the request size to 1. The change in backpressure/request size reduces parallelism and impacts the batch size by fetching 2 documents instead of considering the actual backpressure.

flatMapSequential doesn't tamper the requested amount while retaining the sequence order.

Closes: #4543
Original Pull Request: #4550
  • Loading branch information
mp911de authored and christophstrobl committed Nov 7, 2023
1 parent faccaf0 commit 88d639d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private Mono<BulkWriteResult> bulkWriteTo(MongoCollection<Document> collection)
collection = collection.withWriteConcern(defaultWriteConcern);
}

Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMap(it -> {
Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMapSequential(it -> {

if (it.model()instanceof InsertOneModel<Document> iom) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ private <O> Flux<O> aggregateAndMap(MongoCollection<Document> collection, List<D
return (isOutOrMerge ? Flux.from(cursor.toCollection()) : Flux.from(cursor.first())).thenMany(Mono.empty());
}

return Flux.from(cursor).concatMap(readCallback::doWith);
return Flux.from(cursor).flatMapSequential(readCallback::doWith);
}

@Override
Expand Down Expand Up @@ -1088,7 +1088,7 @@ protected <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<?> entityClass, S
.withOptions(optionsBuilder.build());

return aggregate($geoNear, collection, Document.class) //
.concatMap(callback::doWith);
.flatMapSequential(callback::doWith);
}

@Override
Expand Down Expand Up @@ -1314,7 +1314,7 @@ public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> batchToSave

Assert.notNull(batchToSave, "Batch to insert must not be null");

return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName));
return Flux.from(batchToSave).flatMapSequential(collection -> insert(collection, collectionName));
}

@Override
Expand Down Expand Up @@ -1382,7 +1382,7 @@ public <T> Flux<T> insertAll(Collection<? extends T> objectsToSave) {

@Override
public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> objectsToSave) {
return Flux.from(objectsToSave).flatMap(this::insertAll);
return Flux.from(objectsToSave).flatMapSequential(this::insertAll);
}

protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) {
Expand Down Expand Up @@ -1433,7 +1433,7 @@ protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends
return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
});

return insertDocuments.flatMap(tuple -> {
return insertDocuments.flatMapSequential(tuple -> {

Document document = tuple.getT2();
Object id = MappedDocument.of(document).getId();
Expand Down Expand Up @@ -1590,7 +1590,7 @@ protected Flux<ObjectId> insertDocumentList(String collectionName, List<Document

return collectionToUse.insertMany(documents);

}).flatMap(s -> {
}).flatMapSequential(s -> {

return Flux.fromStream(documents.stream() //
.map(MappedDocument::of) //
Expand Down Expand Up @@ -2147,7 +2147,7 @@ public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inpu
publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);

return Flux.from(publisher)
.concatMap(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
.flatMapSequential(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
});
}

Expand Down Expand Up @@ -2215,7 +2215,7 @@ protected <T> Flux<T> doFindAndDelete(String collectionName, Query query, Class<

return Flux.from(flux).collectList().filter(it -> !it.isEmpty())
.flatMapMany(list -> Flux.from(remove(operations.getByIdInQuery(list), entityClass, collectionName))
.flatMap(deleteResult -> Flux.fromIterable(list)));
.flatMapSequential(deleteResult -> Flux.fromIterable(list)));
}

/**
Expand Down Expand Up @@ -2674,7 +2674,7 @@ private <T> Flux<T> executeFindMultiInternal(ReactiveCollectionQueryCallback<Doc

return createFlux(collectionName, collection -> {
return Flux.from(preparer.initiateFind(collection, collectionCallback::doInCollection))
.concatMap(objectCallback::doWith);
.flatMapSequential(objectCallback::doWith);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {

Assert.notNull(entityStream, "The given Publisher of entities must not be null");

return Flux.from(entityStream).flatMap(entity -> entityInformation.isNew(entity) ? //
return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? //
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
mongoOperations.save(entity, entityInformation.getCollectionName()));
}
Expand Down Expand Up @@ -167,7 +167,7 @@ public Flux<T> findAllById(Publisher<ID> ids) {

Assert.notNull(ids, "The given Publisher of Id's must not be null");

return Flux.from(ids).buffer().flatMap(this::findAllById);
return Flux.from(ids).buffer().flatMapSequential(this::findAllById);
}

@Override
Expand Down Expand Up @@ -297,7 +297,8 @@ public <S extends T> Flux<S> insert(Publisher<S> entities) {

Assert.notNull(entities, "The given Publisher of entities must not be null");

return Flux.from(entities).flatMap(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
return Flux.from(entities)
.flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
}

// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -698,6 +699,28 @@ void aggreateShouldUseReadReadPreference() {
verify(collection).withReadPreference(ReadPreference.primaryPreferred());
}

@Test // GH-4543
void aggregateDoesNotLimitBackpressure() {

reset(collection);

AtomicLong request = new AtomicLong();
Publisher<Document> realPublisher = Flux.just(new Document()).doOnRequest(request::addAndGet);

doAnswer(invocation -> {
Subscriber<Document> subscriber = invocation.getArgument(0);
realPublisher.subscribe(subscriber);
return null;
}).when(aggregatePublisher).subscribe(any());

when(collection.aggregate(anyList())).thenReturn(aggregatePublisher);
when(collection.aggregate(anyList(), any(Class.class))).thenReturn(aggregatePublisher);

template.aggregate(newAggregation(Sith.class, project("id")), AutogenerateableId.class, Document.class).subscribe();

assertThat(request).hasValueGreaterThan(128);
}

@Test // DATAMONGO-1854
void aggreateShouldUseCollationFromOptionsEvenIfDefaultCollationIsPresent() {

Expand Down Expand Up @@ -1262,6 +1285,17 @@ void findShouldInvokeAfterConvertCallbacks() {
assertThat(results.get(0).id).isEqualTo("after-convert");
}

@Test // GH-4543
void findShouldNotLimitBackpressure() {

AtomicLong request = new AtomicLong();
stubFindSubscribe(new Document(), request);

template.find(new Query(), Person.class).subscribe();

assertThat(request).hasValueGreaterThan(128);
}

@Test // DATAMONGO-2479
void findByIdShouldInvokeAfterConvertCallbacks() {

Expand Down Expand Up @@ -1626,8 +1660,12 @@ void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() {
}

private void stubFindSubscribe(Document document) {
stubFindSubscribe(document, new AtomicLong());
}

private void stubFindSubscribe(Document document, AtomicLong request) {

Publisher<Document> realPublisher = Flux.just(document);
Publisher<Document> realPublisher = Flux.just(document).doOnRequest(request::addAndGet);

doAnswer(invocation -> {
Subscriber<Document> subscriber = invocation.getArgument(0);
Expand Down

0 comments on commit 88d639d

Please sign in to comment.