From 88d639d5f652cf005ac656beb6f56d09c5e68a56 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 6 Nov 2023 14:57:19 +0100 Subject: [PATCH] =?UTF-8?q?Switch=20to=20`Flux.flatMapSequential(=E2=80=A6?= =?UTF-8?q?)`=20to=20prevent=20backpressure=20shaping.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We now use Flux.flatMapSequential(…) instead of concatMap as concatMap reduces the request size to 1. The change in backpressure/request size reduces parallelism and impacts the batch size by fetching 2 documents instead of considering the actual backpressure. flatMapSequential doesn't tamper the requested amount while retaining the sequence order. Closes: #4543 Original Pull Request: #4550 --- .../core/DefaultReactiveBulkOperations.java | 2 +- .../mongodb/core/ReactiveMongoTemplate.java | 18 ++++----- .../SimpleReactiveMongoRepository.java | 7 ++-- .../core/ReactiveMongoTemplateUnitTests.java | 40 ++++++++++++++++++- 4 files changed, 53 insertions(+), 14 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java index 45d6709c91..8939589888 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java @@ -216,7 +216,7 @@ private Mono bulkWriteTo(MongoCollection collection) collection = collection.withWriteConcern(defaultWriteConcern); } - Flux concat = Flux.concat(models).flatMap(it -> { + Flux concat = Flux.concat(models).flatMapSequential(it -> { if (it.model()instanceof InsertOneModel iom) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index dd9afa1e72..7e96040d6e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -1041,7 +1041,7 @@ private Flux aggregateAndMap(MongoCollection collection, List Flux> geoNear(NearQuery near, Class entityClass, S .withOptions(optionsBuilder.build()); return aggregate($geoNear, collection, Document.class) // - .concatMap(callback::doWith); + .flatMapSequential(callback::doWith); } @Override @@ -1314,7 +1314,7 @@ public Flux insertAll(Mono> batchToSave Assert.notNull(batchToSave, "Batch to insert must not be null"); - return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName)); + return Flux.from(batchToSave).flatMapSequential(collection -> insert(collection, collectionName)); } @Override @@ -1382,7 +1382,7 @@ public Flux insertAll(Collection objectsToSave) { @Override public Flux insertAll(Mono> objectsToSave) { - return Flux.from(objectsToSave).flatMap(this::insertAll); + return Flux.from(objectsToSave).flatMapSequential(this::insertAll); } protected Flux doInsertAll(Collection listToSave, MongoWriter writer) { @@ -1433,7 +1433,7 @@ protected Flux doInsertBatch(String collectionName, Collection { + return insertDocuments.flatMapSequential(tuple -> { Document document = tuple.getT2(); Object id = MappedDocument.of(document).getId(); @@ -1590,7 +1590,7 @@ protected Flux insertDocumentList(String collectionName, List { + }).flatMapSequential(s -> { return Flux.fromStream(documents.stream() // .map(MappedDocument::of) // @@ -2147,7 +2147,7 @@ public Flux mapReduce(Query filterQuery, Class domainType, String inpu publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher); return Flux.from(publisher) - .concatMap(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith); + .flatMapSequential(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith); }); } @@ -2215,7 +2215,7 @@ protected Flux doFindAndDelete(String collectionName, Query query, Class< return Flux.from(flux).collectList().filter(it -> !it.isEmpty()) .flatMapMany(list -> Flux.from(remove(operations.getByIdInQuery(list), entityClass, collectionName)) - .flatMap(deleteResult -> Flux.fromIterable(list))); + .flatMapSequential(deleteResult -> Flux.fromIterable(list))); } /** @@ -2674,7 +2674,7 @@ private Flux executeFindMultiInternal(ReactiveCollectionQueryCallback { return Flux.from(preparer.initiateFind(collection, collectionCallback::doInCollection)) - .concatMap(objectCallback::doWith); + .flatMapSequential(objectCallback::doWith); }); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index 69b592468d..cf4801319a 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -109,7 +109,7 @@ public Flux saveAll(Publisher entityStream) { Assert.notNull(entityStream, "The given Publisher of entities must not be null"); - return Flux.from(entityStream).flatMap(entity -> entityInformation.isNew(entity) ? // + return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? // mongoOperations.insert(entity, entityInformation.getCollectionName()) : // mongoOperations.save(entity, entityInformation.getCollectionName())); } @@ -167,7 +167,7 @@ public Flux findAllById(Publisher ids) { Assert.notNull(ids, "The given Publisher of Id's must not be null"); - return Flux.from(ids).buffer().flatMap(this::findAllById); + return Flux.from(ids).buffer().flatMapSequential(this::findAllById); } @Override @@ -297,7 +297,8 @@ public Flux insert(Publisher entities) { Assert.notNull(entities, "The given Publisher of entities must not be null"); - return Flux.from(entities).flatMap(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName())); + return Flux.from(entities) + .flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName())); } // ------------------------------------------------------------------------- diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index 6b20fcc6d1..c8c504edb5 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.assertj.core.api.Assertions; @@ -698,6 +699,28 @@ void aggreateShouldUseReadReadPreference() { verify(collection).withReadPreference(ReadPreference.primaryPreferred()); } + @Test // GH-4543 + void aggregateDoesNotLimitBackpressure() { + + reset(collection); + + AtomicLong request = new AtomicLong(); + Publisher realPublisher = Flux.just(new Document()).doOnRequest(request::addAndGet); + + doAnswer(invocation -> { + Subscriber subscriber = invocation.getArgument(0); + realPublisher.subscribe(subscriber); + return null; + }).when(aggregatePublisher).subscribe(any()); + + when(collection.aggregate(anyList())).thenReturn(aggregatePublisher); + when(collection.aggregate(anyList(), any(Class.class))).thenReturn(aggregatePublisher); + + template.aggregate(newAggregation(Sith.class, project("id")), AutogenerateableId.class, Document.class).subscribe(); + + assertThat(request).hasValueGreaterThan(128); + } + @Test // DATAMONGO-1854 void aggreateShouldUseCollationFromOptionsEvenIfDefaultCollationIsPresent() { @@ -1262,6 +1285,17 @@ void findShouldInvokeAfterConvertCallbacks() { assertThat(results.get(0).id).isEqualTo("after-convert"); } + @Test // GH-4543 + void findShouldNotLimitBackpressure() { + + AtomicLong request = new AtomicLong(); + stubFindSubscribe(new Document(), request); + + template.find(new Query(), Person.class).subscribe(); + + assertThat(request).hasValueGreaterThan(128); + } + @Test // DATAMONGO-2479 void findByIdShouldInvokeAfterConvertCallbacks() { @@ -1626,8 +1660,12 @@ void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() { } private void stubFindSubscribe(Document document) { + stubFindSubscribe(document, new AtomicLong()); + } + + private void stubFindSubscribe(Document document, AtomicLong request) { - Publisher realPublisher = Flux.just(document); + Publisher realPublisher = Flux.just(document).doOnRequest(request::addAndGet); doAnswer(invocation -> { Subscriber subscriber = invocation.getArgument(0);