diff --git a/pom.xml b/pom.xml index c3245aad49..7ea6f2049d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 4.5.0-SNAPSHOT + 4.5.x-GH-4839-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index 58c63dfc97..07d80daf4c 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -15,7 +15,7 @@ org.springframework.data spring-data-mongodb-parent - 4.5.0-SNAPSHOT + 4.5.x-GH-4839-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index 98516a5ba9..0ee2725e9d 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 4.5.0-SNAPSHOT + 4.5.x-GH-4839-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java index 15ff5e5e23..0209ab05b6 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java @@ -17,6 +17,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; import java.util.ArrayList; import java.util.List; @@ -61,7 +62,6 @@ import org.springframework.data.repository.query.ResultProcessor; import org.springframework.data.repository.query.ValueExpressionDelegate; import org.springframework.data.spel.ExpressionDependencies; -import org.springframework.data.util.TypeInformation; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.lang.Nullable; @@ -70,7 +70,6 @@ import org.springframework.util.StringUtils; import com.mongodb.MongoClientSettings; -import reactor.util.function.Tuple2; /** * Base class for reactive {@link RepositoryQuery} implementations for MongoDB. @@ -112,16 +111,20 @@ public AbstractReactiveMongoQuery(ReactiveMongoQueryMethod method, ReactiveMongo this.method = method; this.operations = operations; this.instantiators = new EntityInstantiators(); - this.valueExpressionDelegate = new ValueExpressionDelegate(new QueryMethodValueEvaluationContextAccessor(new StandardEnvironment(), evaluationContextProvider.getEvaluationContextProvider()), ValueExpressionParser.create(() -> expressionParser)); + this.valueExpressionDelegate = new ValueExpressionDelegate( + new QueryMethodValueEvaluationContextAccessor(new StandardEnvironment(), + evaluationContextProvider.getEvaluationContextProvider()), + ValueExpressionParser.create(() -> expressionParser)); MongoEntityMetadata metadata = method.getEntityInformation(); Class type = metadata.getCollectionEntity().getType(); this.findOperationWithProjection = operations.query(type); this.updateOps = operations.update(type); - ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate.createValueContextProvider( - method.getParameters()); - Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, "ValueEvaluationContextProvider must be reactive"); + ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate + .createValueContextProvider(method.getParameters()); + Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, + "ValueEvaluationContextProvider must be reactive"); this.valueEvaluationContextProvider = (ReactiveValueEvaluationContextProvider) valueContextProvider; } @@ -151,9 +154,10 @@ public AbstractReactiveMongoQuery(ReactiveMongoQueryMethod method, ReactiveMongo this.findOperationWithProjection = operations.query(type); this.updateOps = operations.update(type); - ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate.createValueContextProvider( - method.getParameters()); - Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, "ValueEvaluationContextProvider must be reactive"); + ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate + .createValueContextProvider(method.getParameters()); + Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, + "ValueEvaluationContextProvider must be reactive"); this.valueEvaluationContextProvider = (ReactiveValueEvaluationContextProvider) valueContextProvider; } @@ -182,14 +186,9 @@ private Publisher execute(MongoParameterAccessor parameterAccessor) { ConvertingParameterAccessor accessor = new ConvertingParameterAccessor(operations.getConverter(), parameterAccessor); - TypeInformation returnType = method.getReturnType(); ResultProcessor processor = method.getResultProcessor().withDynamicProjection(accessor); Class typeToRead = processor.getReturnedType().getTypeToRead(); - if (typeToRead == null && returnType.getComponentType() != null) { - typeToRead = returnType.getComponentType().getType(); - } - return doExecute(method, processor, accessor, typeToRead); } @@ -221,11 +220,15 @@ protected Publisher doExecute(ReactiveMongoQueryMethod method, ResultPro String collection = method.getEntityInformation().getCollectionName(); ReactiveMongoQueryExecution execution = getExecution(accessor, - new ResultProcessingConverter(processor, operations, instantiators), find); + getResultProcessing(processor), find); return execution.execute(query, processor.getReturnedType().getDomainType(), collection); }); } + ResultProcessingConverter getResultProcessing(ResultProcessor processor) { + return new ResultProcessingConverter(processor, operations, instantiators); + } + /** * Returns the execution instance to use. * @@ -439,8 +442,8 @@ private Mono> ex return getValueExpressionEvaluatorLater(dependencies, accessor).zipWith(Mono.just(codec)); } - private Document decode(Tuple2 expressionEvaluator, String source, MongoParameterAccessor accessor, - ParameterBindingDocumentCodec codec) { + private Document decode(Tuple2 expressionEvaluator, + String source, MongoParameterAccessor accessor, ParameterBindingDocumentCodec codec) { ParameterBindingContext bindingContext = new ParameterBindingContext(accessor::getBindableValue, expressionEvaluator.getT1()); @@ -490,8 +493,8 @@ ValueExpressionEvaluator getValueExpressionEvaluator(MongoParameterAccessor acce @Override public T evaluate(String expressionString) { ValueExpression expression = valueExpressionDelegate.parse(expressionString); - ValueEvaluationContext evaluationContext = valueEvaluationContextProvider.getEvaluationContext(accessor.getValues(), - expression.getExpressionDependencies()); + ValueEvaluationContext evaluationContext = valueEvaluationContextProvider + .getEvaluationContext(accessor.getValues(), expression.getExpressionDependencies()); return (T) expression.evaluate(evaluationContext); } }; @@ -509,8 +512,9 @@ public T evaluate(String expressionString) { protected Mono getValueExpressionEvaluatorLater(ExpressionDependencies dependencies, MongoParameterAccessor accessor) { - return valueEvaluationContextProvider.getEvaluationContextLater(accessor.getValues(), dependencies) - .map(evaluationContext -> new ValueExpressionDelegateValueExpressionEvaluator(valueExpressionDelegate, valueExpression -> evaluationContext)); + return valueEvaluationContextProvider.getEvaluationContextLater(accessor.getValues(), dependencies) + .map(evaluationContext -> new ValueExpressionDelegateValueExpressionEvaluator(valueExpressionDelegate, + valueExpression -> evaluationContext)); } /** diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java index bef1659308..3d99ed9e54 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java @@ -17,6 +17,7 @@ import java.time.Duration; import java.util.Map; +import java.util.function.Function; import java.util.function.IntUnaryOperator; import java.util.function.LongUnaryOperator; @@ -28,11 +29,18 @@ import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.AggregationOptions; import org.springframework.data.mongodb.core.aggregation.AggregationPipeline; +import org.springframework.data.mongodb.core.aggregation.AggregationResults; +import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.mapping.FieldName; +import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes; import org.springframework.data.mongodb.core.query.Collation; import org.springframework.data.mongodb.core.query.Meta; import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.repository.query.ResultProcessor; +import org.springframework.data.repository.query.ReturnedType; +import org.springframework.data.util.ReflectionUtils; +import org.springframework.data.util.TypeInformation; import org.springframework.lang.Nullable; import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; @@ -116,13 +124,15 @@ static AggregationOptions.Builder applyHint(AggregationOptions.Builder builder, } /** - * If present apply the preference from the {@link org.springframework.data.mongodb.repository.ReadPreference} annotation. + * If present apply the preference from the {@link org.springframework.data.mongodb.repository.ReadPreference} + * annotation. * * @param builder must not be {@literal null}. * @return never {@literal null}. * @since 4.2 */ - static AggregationOptions.Builder applyReadPreference(AggregationOptions.Builder builder, MongoQueryMethod queryMethod) { + static AggregationOptions.Builder applyReadPreference(AggregationOptions.Builder builder, + MongoQueryMethod queryMethod) { if (!queryMethod.hasAnnotatedReadPreference()) { return builder; @@ -131,6 +141,90 @@ static AggregationOptions.Builder applyReadPreference(AggregationOptions.Builder return builder.readPreference(ReadPreference.valueOf(queryMethod.getAnnotatedReadPreference())); } + static AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor, + AggregationPipeline pipeline, ValueExpressionEvaluator evaluator) { + + AggregationOptions.Builder builder = Aggregation.newAggregationOptions(); + + AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor, evaluator); + AggregationUtils.applyMeta(builder, method); + AggregationUtils.applyHint(builder, method); + AggregationUtils.applyReadPreference(builder, method); + + TypeInformation returnType = method.getReturnType(); + if (returnType.getComponentType() != null) { + returnType = returnType.getRequiredComponentType(); + } + if (ReflectionUtils.isVoid(returnType.getType()) && pipeline.isOutOrMerge()) { + builder.skipOutput(); + } + + return builder.build(); + } + + /** + * Prepares the AggregationPipeline including type discovery and calling {@link AggregationCallback} to run the + * aggregation. + */ + @Nullable + static T doAggregate(AggregationPipeline pipeline, MongoQueryMethod method, ResultProcessor processor, + ConvertingParameterAccessor accessor, + Function evaluatorFunction, AggregationCallback callback) { + + Class sourceType = method.getDomainClass(); + ReturnedType returnedType = processor.getReturnedType(); + // 🙈 + TypeInformation returnType = method.getReturnType(); + Class returnElementType = (returnType.getComponentType() != null ? returnType.getRequiredComponentType() + : returnType).getType(); + Class entityType; + + boolean isRawAggregationResult = ClassUtils.isAssignable(AggregationResults.class, method.getReturnedObjectType()); + + if (returnElementType.equals(Document.class)) { + entityType = sourceType; + } else { + entityType = returnElementType; + } + + AggregationUtils.appendSortIfPresent(pipeline, accessor, entityType); + + if (method.isSliceQuery()) { + AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor, LongUnaryOperator.identity(), + limit -> limit + 1); + } else { + AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor); + } + + AggregationOptions options = AggregationUtils.computeOptions(method, accessor, pipeline, + evaluatorFunction.apply(accessor)); + TypedAggregation aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options); + + boolean isSimpleReturnType = MongoSimpleTypes.HOLDER.isSimpleType(returnElementType); + Class typeToRead; + + if (isSimpleReturnType) { + typeToRead = Document.class; + } else if (isRawAggregationResult) { + typeToRead = returnElementType; + } else { + + if (returnedType.isProjecting()) { + typeToRead = returnedType.getReturnedType().isInterface() ? Document.class : returnedType.getReturnedType(); + } else { + typeToRead = entityType; + } + } + + return callback.doAggregate(aggregation, sourceType, typeToRead, returnElementType, isSimpleReturnType, + isRawAggregationResult); + } + + static AggregationPipeline computePipeline(AbstractMongoQuery mongoQuery, MongoQueryMethod method, + ConvertingParameterAccessor accessor) { + return new AggregationPipeline(mongoQuery.parseAggregationPipeline(method.getAnnotatedAggregation(), accessor)); + } + /** * Append {@code $sort} aggregation stage if {@link ConvertingParameterAccessor#getSort()} is present. * @@ -139,7 +233,7 @@ static AggregationOptions.Builder applyReadPreference(AggregationOptions.Builder * @param targetType */ static void appendSortIfPresent(AggregationPipeline aggregationPipeline, ConvertingParameterAccessor accessor, - Class targetType) { + @Nullable Class targetType) { if (accessor.getSort().isUnsorted()) { return; @@ -254,4 +348,26 @@ private static T getPotentiallyConvertedSimpleTypeValue(MongoConverter conve return converter.getConversionService().convert(value, targetType); } + + /** + * Interface to invoke an aggregation along with source, intermediate, and target types. + * + * @param + */ + interface AggregationCallback { + + /** + * @param aggregation + * @param domainType + * @param typeToRead + * @param elementType + * @param simpleType whether the aggregation returns {@link Document} or a + * {@link org.springframework.data.mapping.model.SimpleTypeHolder simple type}. + * @param rawResult whether the aggregation returns {@link AggregationResults}. + * @return + */ + @Nullable + T doAggregate(TypedAggregation aggregation, Class domainType, Class typeToRead, Class elementType, + boolean simpleType, boolean rawResult); + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java index a74694d968..57c2c10832 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java @@ -24,11 +24,8 @@ import org.reactivestreams.Publisher; import org.springframework.data.mongodb.core.ReactiveMongoOperations; -import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.AggregationOperation; -import org.springframework.data.mongodb.core.aggregation.AggregationOptions; import org.springframework.data.mongodb.core.aggregation.AggregationPipeline; -import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes; import org.springframework.data.mongodb.core.query.Query; @@ -36,9 +33,8 @@ import org.springframework.data.repository.query.ResultProcessor; import org.springframework.data.repository.query.ValueExpressionDelegate; import org.springframework.data.util.ReflectionUtils; -import org.springframework.data.util.TypeInformation; import org.springframework.expression.ExpressionParser; -import org.springframework.util.ClassUtils; +import org.springframework.lang.Nullable; /** * A reactive {@link org.springframework.data.repository.query.RepositoryQuery} to use a plain JSON String to create an @@ -87,46 +83,39 @@ public ReactiveStringBasedAggregation(ReactiveMongoQueryMethod method, } @Override + @SuppressWarnings("ReactiveStreamsNullableInLambdaInTransform") protected Publisher doExecute(ReactiveMongoQueryMethod method, ResultProcessor processor, - ConvertingParameterAccessor accessor, Class typeToRead) { + ConvertingParameterAccessor accessor, @Nullable Class ignored) { return computePipeline(accessor).flatMapMany(it -> { - Class sourceType = method.getDomainClass(); - Class targetType = typeToRead; + return AggregationUtils.doAggregate(new AggregationPipeline(it), method, processor, accessor, + this::getValueExpressionEvaluator, + (aggregation, sourceType, typeToRead, elementType, simpleType, rawResult) -> { - AggregationPipeline pipeline = new AggregationPipeline(it); + Flux flux = reactiveMongoOperations.aggregate(aggregation, typeToRead); + if (ReflectionUtils.isVoid(elementType)) { + return flux.then(); + } - AggregationUtils.appendSortIfPresent(pipeline, accessor, typeToRead); - AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor); + ReactiveMongoQueryExecution.ResultProcessingConverter resultProcessing = getResultProcessing(processor); - boolean isSimpleReturnType = isSimpleReturnType(typeToRead); - boolean isRawReturnType = ClassUtils.isAssignable(org.bson.Document.class, typeToRead); + if (simpleType && !rawResult && !elementType.equals(Document.class)) { - if (isSimpleReturnType || isRawReturnType) { - targetType = Document.class; - } + flux = flux.handle((item, sink) -> { - AggregationOptions options = computeOptions(method, accessor, pipeline); - TypedAggregation aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options); + Object result = AggregationUtils.extractSimpleTypeResult((Document) item, elementType, mongoConverter); - Flux flux = reactiveMongoOperations.aggregate(aggregation, targetType); - if (ReflectionUtils.isVoid(typeToRead)) { - return flux.then(); - } + if (result != null) { + sink.next(result); + } + }); + } - if (isSimpleReturnType && !isRawReturnType) { - flux = flux.handle((item, sink) -> { + flux = flux.map(resultProcessing::convert); - Object result = AggregationUtils.extractSimpleTypeResult((Document) item, typeToRead, mongoConverter); - - if (result != null) { - sink.next(result); - } - }); - } - - return method.isCollectionQuery() ? flux : flux.next(); + return method.isCollectionQuery() ? flux : flux.next(); + }); }); } @@ -138,28 +127,6 @@ private Mono> computePipeline(ConvertingParameterAcce return parseAggregationPipeline(getQueryMethod().getAnnotatedAggregation(), accessor); } - private AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor, - AggregationPipeline pipeline) { - - AggregationOptions.Builder builder = Aggregation.newAggregationOptions(); - - AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor, - getValueExpressionEvaluator(accessor)); - AggregationUtils.applyMeta(builder, method); - AggregationUtils.applyHint(builder, method); - AggregationUtils.applyReadPreference(builder, method); - - TypeInformation returnType = method.getReturnType(); - if (returnType.getComponentType() != null) { - returnType = returnType.getRequiredComponentType(); - } - if (ReflectionUtils.isVoid(returnType.getType()) && pipeline.isOutOrMerge()) { - builder.skipOutput(); - } - - return builder.build(); - } - @Override protected Mono createQuery(ConvertingParameterAccessor accessor) { throw new UnsupportedOperationException("No query support for aggregation"); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java index 1ffca4d85a..00fc79d4aa 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java @@ -17,7 +17,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.function.LongUnaryOperator; import java.util.stream.Stream; import org.bson.Document; @@ -26,11 +25,7 @@ import org.springframework.data.domain.SliceImpl; import org.springframework.data.mongodb.InvalidMongoDbApiUsageException; import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.data.mongodb.core.aggregation.Aggregation; -import org.springframework.data.mongodb.core.aggregation.AggregationOptions; -import org.springframework.data.mongodb.core.aggregation.AggregationPipeline; import org.springframework.data.mongodb.core.aggregation.AggregationResults; -import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes; import org.springframework.data.mongodb.core.query.Query; @@ -41,7 +36,6 @@ import org.springframework.data.util.ReflectionUtils; import org.springframework.expression.ExpressionParser; import org.springframework.lang.Nullable; -import org.springframework.util.ClassUtils; /** * {@link AbstractMongoQuery} implementation to run string-based aggregations using @@ -103,85 +97,67 @@ public StringBasedAggregation(MongoQueryMethod method, MongoOperations mongoOper this.mongoConverter = mongoOperations.getConverter(); } + @SuppressWarnings("unchecked") @Override @Nullable - protected Object doExecute(MongoQueryMethod method, ResultProcessor resultProcessor, - ConvertingParameterAccessor accessor, Class typeToRead) { + protected Object doExecute(MongoQueryMethod method, ResultProcessor processor, ConvertingParameterAccessor accessor, + @Nullable Class ignore) { - Class sourceType = method.getDomainClass(); - Class targetType = typeToRead; + return AggregationUtils.doAggregate(AggregationUtils.computePipeline(this, method, accessor), method, processor, + accessor, this::getExpressionEvaluatorFor, + (aggregation, sourceType, typeToRead, elementType, simpleType, rawResult) -> { - AggregationPipeline pipeline = computePipeline(method, accessor); - AggregationUtils.appendSortIfPresent(pipeline, accessor, typeToRead); + if (method.isStreamQuery()) { - if (method.isSliceQuery()) { - AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor, LongUnaryOperator.identity(), - limit -> limit + 1); - } else { - AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor); - } - - boolean isSimpleReturnType = isSimpleReturnType(typeToRead); - boolean isRawAggregationResult = ClassUtils.isAssignable(AggregationResults.class, typeToRead); + Stream stream = mongoOperations.aggregateStream(aggregation, typeToRead); - if (isSimpleReturnType) { - targetType = Document.class; - } else if (isRawAggregationResult) { + if (!simpleType || elementType.equals(Document.class)) { + return stream; + } - // 🙈 - targetType = method.getReturnType().getRequiredActualType().getRequiredComponentType().getType(); - } + return stream + .map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, elementType, mongoConverter)); + } - AggregationOptions options = computeOptions(method, accessor, pipeline); - TypedAggregation aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options); + AggregationResults result = (AggregationResults) mongoOperations.aggregate(aggregation, + typeToRead); - if (method.isStreamQuery()) { + if (ReflectionUtils.isVoid(elementType)) { + return null; + } - Stream stream = mongoOperations.aggregateStream(aggregation, targetType); + if (rawResult) { + return result; + } - if (isSimpleReturnType) { - return stream.map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter)); - } + List results = result.getMappedResults(); + if (method.isCollectionQuery()) { + return simpleType ? convertResults(elementType, (List) results) : results; + } - return stream; - } + if (method.isSliceQuery()) { - AggregationResults result = (AggregationResults) mongoOperations.aggregate(aggregation, targetType); - if (ReflectionUtils.isVoid(typeToRead)) { - return null; - } + Pageable pageable = accessor.getPageable(); + int pageSize = pageable.getPageSize(); + List resultsToUse = simpleType ? convertResults(elementType, (List) results) + : (List) results; + boolean hasNext = resultsToUse.size() > pageSize; + return new SliceImpl<>(hasNext ? resultsToUse.subList(0, pageSize) : resultsToUse, pageable, hasNext); + } - if (isRawAggregationResult) { - return result; - } - - List results = result.getMappedResults(); - if (method.isCollectionQuery()) { - return isSimpleReturnType ? convertResults(typeToRead, results) : results; - } - - if (method.isSliceQuery()) { - - Pageable pageable = accessor.getPageable(); - int pageSize = pageable.getPageSize(); - List resultsToUse = isSimpleReturnType ? convertResults(typeToRead, results) : results; - boolean hasNext = resultsToUse.size() > pageSize; - return new SliceImpl<>(hasNext ? resultsToUse.subList(0, pageSize) : resultsToUse, pageable, hasNext); - } + Object uniqueResult = result.getUniqueMappedResult(); - Object uniqueResult = result.getUniqueMappedResult(); - - return isSimpleReturnType - ? AggregationUtils.extractSimpleTypeResult((Document) uniqueResult, typeToRead, mongoConverter) - : uniqueResult; + return simpleType + ? AggregationUtils.extractSimpleTypeResult((Document) uniqueResult, elementType, mongoConverter) + : uniqueResult; + }); } - private List convertResults(Class typeToRead, List mappedResults) { + private List convertResults(Class targetType, List mappedResults) { List list = new ArrayList<>(mappedResults.size()); - for (Object it : mappedResults) { - Object extractSimpleTypeResult = AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, - mongoConverter); + for (Document it : mappedResults) { + Object extractSimpleTypeResult = AggregationUtils.extractSimpleTypeResult(it, targetType, mongoConverter); list.add(extractSimpleTypeResult); } return list; @@ -191,28 +167,6 @@ private boolean isSimpleReturnType(Class targetType) { return MongoSimpleTypes.HOLDER.isSimpleType(targetType); } - AggregationPipeline computePipeline(MongoQueryMethod method, ConvertingParameterAccessor accessor) { - return new AggregationPipeline(parseAggregationPipeline(method.getAnnotatedAggregation(), accessor)); - } - - private AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor, - AggregationPipeline pipeline) { - - AggregationOptions.Builder builder = Aggregation.newAggregationOptions(); - - AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor, - getExpressionEvaluatorFor(accessor)); - AggregationUtils.applyMeta(builder, method); - AggregationUtils.applyHint(builder, method); - AggregationUtils.applyReadPreference(builder, method); - - if (ReflectionUtils.isVoid(method.getReturnType().getType()) && pipeline.isOutOrMerge()) { - builder.skipOutput(); - } - - return builder.build(); - } - @Override protected Query createQuery(ConvertingParameterAccessor accessor) { throw new UnsupportedOperationException("No query support for aggregation"); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java index 33720d2557..247caada04 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java @@ -1396,6 +1396,18 @@ void findStreamOfSingleValue() { } } + @Test // DATAMONGO-4841 + void annotatedAggregationStreamWithPlaceholderValue() { + + assertThat(repository.groupStreamByLastnameAnd("firstname")) + .contains(new PersonAggregate("Lessard", Collections.singletonList("Stefan"))) // + .contains(new PersonAggregate("Keys", Collections.singletonList("Alicia"))) // + .contains(new PersonAggregate("Tinsley", Collections.singletonList("Boyd"))) // + .contains(new PersonAggregate("Beauford", Collections.singletonList("Carter"))) // + .contains(new PersonAggregate("Moore", Collections.singletonList("Leroi"))) // + .contains(new PersonAggregate("Matthews", Arrays.asList("Dave", "Oliver August"))); + } + @Test // DATAMONGO-2153 void annotatedAggregationWithPlaceholderValue() { @@ -1459,6 +1471,15 @@ void annotatedAggregationWithAggregationResultAsReturnTypeAndProjection() { .containsExactly(new SumAge(245L)); } + @Test // GH-4839 + void annotatedAggregationWithAggregationResultAsClosedInterfaceProjection() { + + assertThat(repository.findAggregatedClosedInterfaceProjectionBy()).allSatisfy(it -> { + assertThat(it.getFirstname()).isIn(dave.getFirstname(), oliver.getFirstname()); + assertThat(it.getLastname()).isEqualTo(dave.getLastname()); + }); + } + @Test // DATAMONGO-2374 void findsWithNativeProjection() { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java index c2036cabf9..0f731535ed 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java @@ -27,10 +27,10 @@ import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Range; -import org.springframework.data.domain.Window; import org.springframework.data.domain.ScrollPosition; import org.springframework.data.domain.Slice; import org.springframework.data.domain.Sort; +import org.springframework.data.domain.Window; import org.springframework.data.geo.Box; import org.springframework.data.geo.Circle; import org.springframework.data.geo.Distance; @@ -413,6 +413,9 @@ Page findByCustomQueryLastnameAndAddressStreetInList(String lastname, Li @Aggregation("{ '$project': { '_id' : '$lastname' } }") Stream findAllLastnamesAsStream(); + @Aggregation("{ '$group': { '_id' : '$lastname', names : { $addToSet : '$?0' } } }") + Stream groupStreamByLastnameAnd(String property); + @Aggregation("{ '$group': { '_id' : '$lastname', names : { $addToSet : '$?0' } } }") List groupByLastnameAnd(String property); @@ -434,6 +437,12 @@ Page findByCustomQueryLastnameAndAddressStreetInList(String lastname, Li @Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }") AggregationResults sumAgeAndReturnAggregationResultWrapperWithConcreteType(); + @Aggregation({ + "{ '$match' : { 'lastname' : 'Matthews'} }", + "{ '$project': { _id : 0, firstname : 1, lastname : 1 } }" + }) + Iterable findAggregatedClosedInterfaceProjectionBy(); + @Query(value = "{_id:?0}") Optional findDocumentById(String id); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java index fc916b49d2..e692d8627e 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java @@ -600,6 +600,17 @@ void annotatedAggregationWithAggregationResultAsMap() { }).verifyComplete(); } + @Test // GH-4839 + void annotatedAggregationWithAggregationResultAsClosedInterfaceProjection() { + + repository.findAggregatedClosedInterfaceProjectionBy() // + .as(StepVerifier::create) // + .consumeNextWith(it -> { + assertThat(it.getFirstname()).isIn(dave.getFirstname(), oliver.getFirstname()); + assertThat(it.getLastname()).isEqualTo(dave.getLastname()); + }).expectNextCount(1).verifyComplete(); + } + @Test // DATAMONGO-2403 @DirtiesState void annotatedAggregationExtractingSimpleValueIsEmptyForEmptyDocument() { @@ -816,6 +827,10 @@ Mono> findTop2ByLastnameLikeOrderByFirstnameAscLastnameAsc(String @Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }") Mono sumAgeAndReturnSumAsMap(); + @Aggregation({ "{ '$match' : { 'lastname' : 'Matthews'} }", + "{ '$project': { _id : 0, firstname : 1, lastname : 1 } }" }) + Flux findAggregatedClosedInterfaceProjectionBy(); + @Aggregation( pipeline = { "{ '$match' : { 'firstname' : '?0' } }", "{ '$project' : { '_id' : 0, 'lastname' : 1 } }" }) Mono projectToLastnameAndRemoveId(String firstname); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java index 126a168b0a..45c5f009b7 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java @@ -32,4 +32,5 @@ public interface ReactivePersonRepository extends ReactiveMongoRepository findByLastname(String lastname); + } diff --git a/src/main/antora/modules/ROOT/pages/mongodb/repositories/query-methods.adoc b/src/main/antora/modules/ROOT/pages/mongodb/repositories/query-methods.adoc index fa93311360..adb2392f04 100644 --- a/src/main/antora/modules/ROOT/pages/mongodb/repositories/query-methods.adoc +++ b/src/main/antora/modules/ROOT/pages/mongodb/repositories/query-methods.adoc @@ -571,23 +571,29 @@ public interface PersonRepository extends CrudRepository { @Aggregation("{ $group: { _id : $lastname, names : { $addToSet : $firstname } } }") Stream groupByLastnameAndFirstnamesAsStream(); <5> + @Aggregation(pipeline = { + "{ '$match' : { 'lastname' : '?0'} }", + "{ '$project': { _id : 0, firstname : 1, lastname : 1 } }" + }) + Stream groupByLastnameAndFirstnamesAsStream(); <6> + @Aggregation("{ $group : { _id : null, total : { $sum : $age } } }") - SumValue sumAgeUsingValueWrapper(); <6> + SumValue sumAgeUsingValueWrapper(); <7> @Aggregation("{ $group : { _id : null, total : { $sum : $age } } }") - Long sumAge(); <7> + Long sumAge(); <8> @Aggregation("{ $group : { _id : null, total : { $sum : $age } } }") - AggregationResults sumAgeRaw(); <8> + AggregationResults sumAgeRaw(); <9> @Aggregation("{ '$project': { '_id' : '$lastname' } }") - List findAllLastnames(); <9> + List findAllLastnames(); <10> @Aggregation(pipeline = { "{ $group : { _id : '$author', books: { $push: '$title' } } }", "{ $out : 'authors' }" }) - void groupAndOutSkippingOutput(); <10> + void groupAndOutSkippingOutput(); <11> } ---- [source,java] @@ -614,19 +620,25 @@ public class SumValue { // Getter omitted } + +interface PersonProjection { + String getFirstname(); + String getLastname(); +} ---- <1> Aggregation pipeline to group first names by `lastname` in the `Person` collection returning these as `PersonAggregate`. <2> If `Sort` argument is present, `$sort` is appended after the declared pipeline stages so that it only affects the order of the final results after having passed all other aggregation stages. Therefore, the `Sort` properties are mapped against the methods return type `PersonAggregate` which turns `Sort.by("lastname")` into `{ $sort : { '_id', 1 } }` because `PersonAggregate.lastname` is annotated with `@Id`. <3> Replaces `?0` with the given value for `property` for a dynamic aggregation pipeline. <4> `$skip`, `$limit` and `$sort` can be passed on via a `Pageable` argument. Same as in <2>, the operators are appended to the pipeline definition. Methods accepting `Pageable` can return `Slice` for easier pagination. -<5> Aggregation methods can return `Stream` to consume results directly from an underlying cursor. Make sure to close the stream after consuming it to release the server-side cursor by either calling `close()` or through `try-with-resources`. -<6> Map the result of an aggregation returning a single `Document` to an instance of a desired `SumValue` target type. -<7> Aggregations resulting in single document holding just an accumulation result like e.g. `$sum` can be extracted directly from the result `Document`. +<5> Aggregation methods can return interface based projections wrapping the resulting `org.bson.Document` behind a proxy, exposing getters delegating to fields within the document. +<6> Aggregation methods can return `Stream` to consume results directly from an underlying cursor. Make sure to close the stream after consuming it to release the server-side cursor by either calling `close()` or through `try-with-resources`. +<7> Map the result of an aggregation returning a single `Document` to an instance of a desired `SumValue` target type. +<8> Aggregations resulting in single document holding just an accumulation result like e.g. `$sum` can be extracted directly from the result `Document`. To gain more control, you might consider `AggregationResult` as method return type as shown in <7>. -<8> Obtain the raw `AggregationResults` mapped to the generic target wrapper type `SumValue` or `org.bson.Document`. -<9> Like in <6>, a single value can be directly obtained from multiple result ``Document``s. -<10> Skips the output of the `$out` stage when return type is `void`. +<9> Obtain the raw `AggregationResults` mapped to the generic target wrapper type `SumValue` or `org.bson.Document`. +<10> Like in <6>, a single value can be directly obtained from multiple result ``Document``s. +<11> Skips the output of the `$out` stage when return type is `void`. ==== In some scenarios, aggregations might require additional options, such as a maximum run time, additional log comments, or the permission to temporarily write data to disk.