Skip to content

Commit

Permalink
Merge pull request eclipse-ditto#2050 from beyonnex-io/bugfix/aggrega…
Browse files Browse the repository at this point in the history
…tion-metrics-with-non-zero

fix that only aggregated metrics for filters which actually matched >0 things are reported
  • Loading branch information
thjaeckle authored Oct 25, 2024
2 parents d7724b3 + a468d98 commit 275219c
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.model.signals.commands.exceptions;

import java.net.URI;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonParsableException;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.thingsearch.model.ThingSearchException;

/**
* Thrown if during a custom aggregation metrics gathering multiple "filters" matched at the same time whereas only
* one filter is allowed to match.
*
* @since 3.6.2
*/
@Immutable
@JsonParsableException(errorCode = MultipleAggregationFilterMatchingException.ERROR_CODE)
public class MultipleAggregationFilterMatchingException extends DittoRuntimeException implements ThingSearchException {

/**
* Error code of this exception.
*/
public static final String ERROR_CODE = ERROR_CODE_PREFIX + "multiple.aggregation.filter.matching";

static final String DEFAULT_DESCRIPTION = "Ensure that only one defined 'filter' can match at the same time.";

static final HttpStatus HTTP_STATUS = HttpStatus.BAD_REQUEST;

private static final long serialVersionUID = -6341839112047194476L;

private MultipleAggregationFilterMatchingException(final DittoHeaders dittoHeaders,
@Nullable final String message,
@Nullable final String description,
@Nullable final Throwable cause,
@Nullable final URI href) {

super(ERROR_CODE, HTTP_STATUS, dittoHeaders, message, description, cause, href);
}

/**
* A mutable builder for a {@code MultipleAggregationFilterMatchingException}.
*
* @return the builder.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Constructs a new {@code MultipleAggregationFilterMatchingException} object with the exception message extracted from the
* given JSON object.
*
* @param jsonObject the JSON to read the {@link DittoRuntimeException.JsonFields#MESSAGE} field from.
* @param dittoHeaders the headers of the command which resulted in this exception.
* @return the new MultipleAggregationFilterMatchingException.
* @throws NullPointerException if any argument is {@code null}.
* @throws org.eclipse.ditto.json.JsonMissingFieldException if this JsonObject did not contain an error message.
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonObject} was not in the expected
* format.
*/
public static MultipleAggregationFilterMatchingException fromJson(final JsonObject jsonObject,
final DittoHeaders dittoHeaders) {
return DittoRuntimeException.fromJson(jsonObject, dittoHeaders, new Builder());
}

@Override
public DittoRuntimeException setDittoHeaders(final DittoHeaders dittoHeaders) {
return new Builder()
.message(getMessage())
.description(getDescription().orElse(null))
.cause(getCause())
.href(getHref().orElse(null))
.dittoHeaders(dittoHeaders)
.build();
}

/**
* A mutable builder with a fluent API for a {@link org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.MultipleAggregationFilterMatchingException}.
*/
@NotThreadSafe
public static final class Builder extends DittoRuntimeExceptionBuilder<MultipleAggregationFilterMatchingException> {

private Builder() {
description(DEFAULT_DESCRIPTION);
}

@Override
protected MultipleAggregationFilterMatchingException doBuild(final DittoHeaders dittoHeaders,
@Nullable final String message,
@Nullable final String description,
@Nullable final Throwable cause,
@Nullable final URI href) {

return new MultipleAggregationFilterMatchingException(dittoHeaders, message, description, cause, href);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package org.eclipse.ditto.thingsearch.model.signals.commands.query;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -38,6 +41,7 @@
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.MultipleAggregationFilterMatchingException;

/**
* A response to an {@link AggregateThingsMetrics} command.
Expand Down Expand Up @@ -197,11 +201,13 @@ public Map<String, String> getGroupedBy() {
}

/**
* Returns the values for each filter defined in the metric
* Returns the values for the single matched filter defined in the metric.
*
* @return the result of the aggregation.
* @return the result of the aggregation, a single filter name with its count or an empty optional if no filter
* provided a count greater 0.
* @throws MultipleAggregationFilterMatchingException in case multiple filters matched at the same time
*/
public Map<String, Long> getResult() {
public Optional<Map.Entry<String, Long>> getResult() {
return extractFiltersResults(aggregation, filterNames);
}

Expand Down Expand Up @@ -233,19 +239,38 @@ public int hashCode() {

@Override
public String toString() {
return "AggregateThingsMetricsResponse{" +
return getClass().getSimpleName() + "[" +
"metricName='" + metricName + '\'' +
", dittoHeaders=" + dittoHeaders +
", filterNames=" + filterNames +
", aggregation=" + aggregation +
'}';
"]";
}

private Map<String, Long> extractFiltersResults(final JsonObject aggregation, final Set<String> filterNames) {
return filterNames.stream().filter(aggregation::contains).collect(Collectors.toMap(key ->
key, key -> aggregation.getValue(JsonPointer.of(key))
.orElseThrow(getJsonMissingFieldExceptionSupplier(key))
.asLong()));
private Optional<Map.Entry<String, Long>> extractFiltersResults(final JsonObject aggregation,
final Set<String> filterNames) {
final Map<String, Long> filterValues = filterNames.stream()
.filter(aggregation::contains)
.collect(
Collectors.toMap(Function.identity(),
key -> aggregation.getValue(JsonPointer.of(key))
.orElseThrow(getJsonMissingFieldExceptionSupplier(key))
.asLong()
)
);
final List<Map.Entry<String, Long>> filtersWithValueAboveZero = filterValues.entrySet()
.stream()
.filter(filterWithValue -> filterWithValue.getValue() > 0)
.collect(Collectors.toList());

if (filtersWithValueAboveZero.size() > 1) {
throw MultipleAggregationFilterMatchingException.newBuilder()
.message("Multiple filters matched: " + filtersWithValueAboveZero)
.dittoHeaders(dittoHeaders)
.build();
} else {
return filtersWithValueAboveZero.stream().findAny();
}
}

private static Supplier<RuntimeException> getJsonMissingFieldExceptionSupplier(final String field) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private <T> Source<T, NotUsed> processAggregationPersistenceResult(final Source<
final Flow<T, T, NotUsed> logAndFinishPersistenceSegmentFlow =
Flow.fromFunction(result -> {
log.withCorrelationId(dittoHeaders)
.info("aggregation element: {}", result);
.debug("aggregation element: {}", result);
return result;
});
return source.via(logAndFinishPersistenceSegmentFlow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.PlaceholderResolver;
import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.MultipleAggregationFilterMatchingException;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetrics;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetricsResponse;
import org.eclipse.ditto.thingsearch.service.common.config.CustomAggregationMetricConfig;
Expand Down Expand Up @@ -154,12 +155,24 @@ private void handleGatheringMetrics(final GatherMetricsCommand gatherMetricsComm


private void handleAggregateThingsResponse(final AggregateThingsMetricsResponse response) {
log.withCorrelationId(response).debug("Received aggregate things response: {} thread: {}",
response, Thread.currentThread().getName());
final String metricName = response.getMetricName();
// record by filter name and tags
response.getResult().forEach((filterName, value) -> {
resolveTags(filterName, customSearchMetricConfigMap.get(metricName), response);
final Optional<Map.Entry<String, Long>> result;
try {
result = response.getResult();
log.withCorrelationId(response)
.debug("Received aggregate things response for metric name <{}>: {}, " +
"extracted result: <{}> - in thread: {}",
metricName, response, result, Thread.currentThread().getName());
} catch (final MultipleAggregationFilterMatchingException e) {
log.withCorrelationId(response)
.warning("Could not gather metrics for metric name <{}> from aggregate " +
"things response: {} as multiple filters were matching at the same time: <{}>",
metricName, response, e.getMessage());
return;
}
result.ifPresent(entry -> {
final String filterName = entry.getKey();
final Long value = entry.getValue();
final CustomAggregationMetricConfig customAggregationMetricConfig =
customSearchMetricConfigMap.get(metricName);
final TagSet tagSet = resolveTags(filterName, customAggregationMetricConfig, response);
Expand Down

0 comments on commit 275219c

Please sign in to comment.