Skip to content

Commit

Permalink
Retain order doing reactive save operations with multiple elements.
Browse files Browse the repository at this point in the history
Ensure subscription order on multi document operations.

Original pull request: #4824
Closes #4804
  • Loading branch information
christophstrobl authored and mp911de committed Nov 12, 2024
1 parent 6200440 commit d0ee280
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWrite
});

return Flux.fromIterable(elementsByCollection.keySet())
.flatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer));
.concatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer));
}

protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,16 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
Streamable<S> source = Streamable.of(entities);

return source.stream().allMatch(entityInformation::isNew) ? //
mongoOperations.insert(source.stream().collect(Collectors.toList()), entityInformation.getCollectionName()) : //
Flux.fromIterable(entities).flatMap(this::save);
insert(entities) :
Flux.fromIterable(entities).concatMap(this::save);
}

@Override
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {

Assert.notNull(entityStream, "The given Publisher of entities must not be null");

return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? //
return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? //
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
mongoOperations.save(entity, entityInformation.getCollectionName()));
}
Expand Down Expand Up @@ -295,7 +295,7 @@ public Mono<Void> deleteAll(Publisher<? extends T> entityStream) {
Optional<ReadPreference> readPreference = getReadPreference();
return Flux.from(entityStream)//
.map(entityInformation::getRequiredId)//
.flatMap(id -> deleteById(id, readPreference))//
.concatMap(id -> deleteById(id, readPreference))//
.then();
}

Expand Down Expand Up @@ -336,17 +336,15 @@ public <S extends T> Flux<S> insert(Iterable<S> entities) {
Assert.notNull(entities, "The given Iterable of entities must not be null");

Collection<S> source = toCollection(entities);

return source.isEmpty() ? Flux.empty() : mongoOperations.insertAll(source);
return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName());
}

@Override
public <S extends T> Flux<S> insert(Publisher<S> entities) {

Assert.notNull(entities, "The given Publisher of entities must not be null");

return Flux.from(entities)
.flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
return Flux.from(entities).concatMap(this::insert);
}

// -------------------------------------------------------------------------
Expand Down

0 comments on commit d0ee280

Please sign in to comment.