Skip to content

Commit

Permalink
Rebased to latest
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Jun 13, 2024
1 parent af7d1b5 commit 691e7e5
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;

import java.math.BigDecimal;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -147,6 +148,23 @@ public static long getTimeNanos(final Instant time) {
return currentTimeNanos;
}

public static Instant convertObjectToInstant(Object timeObject) {
if (timeObject instanceof Instant) {
return (Instant)timeObject;
} else if (timeObject instanceof String) {
return Instant.parse((String)timeObject);
} else if (timeObject instanceof Integer || timeObject instanceof Long) {
long value = ((Number)timeObject).longValue();
return (value > 1E10) ? Instant.ofEpochMilli(value) : Instant.ofEpochSecond(value);
} else if (timeObject instanceof Double || timeObject instanceof Float || timeObject instanceof BigDecimal) {
double value = ((Number)timeObject).doubleValue();
long seconds = (long) value;
long nanos = (long) ((value - seconds) * 1_000_000_000);
return Instant.ofEpochSecond(seconds, nanos);
} else {
throw new RuntimeException("Invalid format for time "+timeObject);
}
}

@Override
public void prepareForShutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateProcessor;
import static org.opensearch.dataprepper.plugins.processor.aggregate.AggregateProcessor.getTimeNanos;
import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;

Expand All @@ -44,22 +46,18 @@ public class CountAggregateAction implements AggregateAction {
static final boolean SUM_METRIC_IS_MONOTONIC = true;
public final String countKey;
public final String startTimeKey;
public final String endTimeKey;
public final String outputFormat;
private long startTimeNanos;

@DataPrepperPluginConstructor
public CountAggregateAction(final CountAggregateActionConfig countAggregateActionConfig) {
this.countKey = countAggregateActionConfig.getCountKey();
this.startTimeKey = countAggregateActionConfig.getStartTimeKey();
this.endTimeKey = countAggregateActionConfig.getEndTimeKey();
this.outputFormat = countAggregateActionConfig.getOutputFormat();
}

private long getTimeNanos(Instant time) {
final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000;
long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano();
return currentTimeNanos;
}

public Exemplar createExemplar(final Event event) {
long curTimeNanos = getTimeNanos(Instant.now());
Map<String, Object> attributes = event.toMap();
Expand All @@ -81,15 +79,33 @@ public Exemplar createExemplar(final Event event) {
@Override
public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) {
final GroupState groupState = aggregateActionInput.getGroupState();
Instant eventStartTime = Instant.now();
Instant eventEndTime = eventStartTime;
Object startTime = event.get(startTimeKey, Object.class);
Object endTime = event.get(endTimeKey, Object.class);

if (startTime != null) {
eventStartTime = AggregateProcessor.convertObjectToInstant(startTime);
}
if (endTime != null) {
eventEndTime = AggregateProcessor.convertObjectToInstant(endTime);
}
if (groupState.get(countKey) == null) {
groupState.put(startTimeKey, Instant.now());
groupState.putAll(aggregateActionInput.getIdentificationKeys());
groupState.put(countKey, 1);
groupState.put(exemplarKey, createExemplar(event));
groupState.put(startTimeKey, eventStartTime);
groupState.put(endTimeKey, eventEndTime);
} else {
Integer v = (Integer)groupState.get(countKey) + 1;
groupState.put(countKey, v);
}
Instant groupStartTime = (Instant)groupState.get(startTimeKey);
Instant groupEndTime = (Instant)groupState.get(endTimeKey);
if (eventStartTime.isBefore(groupStartTime))
groupState.put(startTimeKey, eventStartTime);
if (eventEndTime.isAfter(groupEndTime))
groupState.put(endTimeKey, eventEndTime);
}
return AggregateActionResponse.nullEventResponse();
}

Expand All @@ -98,6 +114,8 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
GroupState groupState = aggregateActionInput.getGroupState();
Event event;
Instant startTime = (Instant)groupState.get(startTimeKey);
Instant endTime = (Instant)groupState.get(endTimeKey);
groupState.remove(endTimeKey);
if (outputFormat.equals(OutputFormat.RAW.toString())) {
groupState.put(startTimeKey, startTime.atZone(ZoneId.of(ZoneId.systemDefault().toString())).format(DateTimeFormatter.ofPattern(DATE_FORMAT)));
event = JacksonEvent.builder()
Expand All @@ -110,14 +128,14 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
groupState.remove(exemplarKey);
groupState.remove(countKey);
groupState.remove(startTimeKey);
long currentTimeNanos = getTimeNanos(Instant.now());
long endTimeNanos = getTimeNanos(endTime);
long startTimeNanos = getTimeNanos(startTime);
Map<String, Object> attr = new HashMap<String, Object>();
groupState.forEach((k, v) -> attr.put((String)k, v));
JacksonSum sum = JacksonSum.builder()
.withName(SUM_METRIC_NAME)
.withDescription(SUM_METRIC_DESCRIPTION)
.withTime(OTelProtoCodec.convertUnixNanosToISO8601(currentTimeNanos))
.withTime(OTelProtoCodec.convertUnixNanosToISO8601(endTimeNanos))
.withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(startTimeNanos))
.withIsMonotonic(SUM_METRIC_IS_MONOTONIC)
.withUnit(SUM_METRIC_UNIT)
Expand All @@ -128,7 +146,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.build(false);
event = (Event)sum;
}

return new AggregateActionOutput(List.of(event));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
public class CountAggregateActionConfig {
public static final String DEFAULT_COUNT_KEY = "aggr._count";
public static final String DEFAULT_START_TIME_KEY = "aggr._start_time";
public static final String DEFAULT_END_TIME_KEY = "aggr._end_time";
public static final Set<String> validOutputFormats = new HashSet<>(Set.of(OutputFormat.OTEL_METRICS.toString(), OutputFormat.RAW.toString()));

@JsonProperty("count_key")
Expand All @@ -20,13 +21,20 @@ public class CountAggregateActionConfig {
@JsonProperty("start_time_key")
String startTimeKey = DEFAULT_START_TIME_KEY;

@JsonProperty("end_time_key")
String endTimeKey = DEFAULT_END_TIME_KEY;

@JsonProperty("output_format")
String outputFormat = OutputFormat.OTEL_METRICS.toString();

public String getCountKey() {
return countKey;
}

public String getEndTimeKey() {
return endTimeKey;
}

public String getStartTimeKey() {
return startTimeKey;
}
Expand All @@ -37,4 +45,4 @@ public String getOutputFormat() {
}
return outputFormat;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateProcessor;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput;
import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionResponse;
import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState;
Expand All @@ -35,8 +36,8 @@
import java.util.ArrayList;

/**
* An AggregateAction that combines multiple Events into a single Event. This action will create a combined event with histogram buckets of the values
* of specified list of keys from the groupState on concludeGroup.
* An AggregateAction that combines multiple Events into a single Event. This action will create a combined event with histogram buckets of the values
* of specified list of keys from the groupState on concludeGroup.
* @since 2.1
*/
@DataPrepperPlugin(name = "histogram", pluginType = AggregateAction.class, pluginConfigurationType = HistogramAggregateActionConfig.class)
Expand Down Expand Up @@ -137,16 +138,29 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
return AggregateActionResponse.nullEventResponse();
}
double doubleValue = convertToDouble(value);

int idx = Arrays.binarySearch(this.buckets, doubleValue);
if (idx < 0) {
idx = -idx-2;
}
Instant eventTime = Instant.now();
Instant eventStartTime = eventTime;
Instant eventEndTime = eventTime;
Object startTime = event.get(startTimeKey, Object.class);
Object endTime = event.get(endTimeKey, Object.class);
if (startTime != null) {
eventStartTime = AggregateProcessor.convertObjectToInstant(startTime);
}
if (endTime != null) {
eventEndTime = AggregateProcessor.convertObjectToInstant(endTime);
}
if (groupState.get(bucketCountsKey) == null) {
groupState.put(startTimeKey, eventStartTime);
groupState.put(endTimeKey, eventEndTime);
Long[] bucketCountsList = new Long[buckets.length-1];
Arrays.fill(bucketCountsList, (long)0);
bucketCountsList[idx]++;
groupState.put(startTimeKey, Instant.now());
groupState.put(startTimeKey, eventTime);
groupState.putAll(aggregateActionInput.getIdentificationKeys());
groupState.put(sumKey, doubleValue);
groupState.put(countKey, 1);
Expand Down Expand Up @@ -180,9 +194,13 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
maxValue = doubleValue;
}
}
}
// Keep over-writing endTime to get the last time a record of this group received
groupState.put(endTimeKey, Instant.now());
Instant groupStartTime = (Instant)groupState.get(startTimeKey);
Instant groupEndTime = (Instant)groupState.get(endTimeKey);
if (eventStartTime.isBefore(groupStartTime))
groupState.put(startTimeKey, eventStartTime);
if (eventEndTime.isAfter(groupEndTime))
groupState.put(endTimeKey, eventEndTime);
}
return AggregateActionResponse.nullEventResponse();
}

Expand Down Expand Up @@ -247,7 +265,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.build(false);
event = (Event)histogram;
}

return new AggregateActionOutput(List.of(event));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Instant;
import java.time.Duration;

public class AggregateProcessorStaticFunctionsTest {
@Test
public void testConvertObjectToInstant() {
Instant now = Instant.now();
assertThat(AggregateProcessor.convertObjectToInstant(now), equalTo(now));
String nowStr = now.toString();
long nowSeconds = now.getEpochSecond();
long nowMillis = now.toEpochMilli();
int nowNanos = now.getNano();
double nowDouble = nowSeconds+(double)nowNanos/1000_000_000;
assertThat(AggregateProcessor.convertObjectToInstant(nowStr), equalTo(now));
assertThat(AggregateProcessor.convertObjectToInstant(nowSeconds), equalTo(Instant.ofEpochSecond(nowSeconds)));
assertThat(AggregateProcessor.convertObjectToInstant(nowMillis), equalTo(Instant.ofEpochMilli(nowMillis)));
Duration tolerance = Duration.ofNanos(1000);
assertTrue((Duration.between(AggregateProcessor.convertObjectToInstant(nowDouble), Instant.ofEpochSecond(nowSeconds, nowNanos))).abs().compareTo(tolerance) <= 0);
}

@Test
public void testGetTimeNanos() {
Instant now = Instant.now();
assertThat(AggregateProcessor.getTimeNanos(now) / 1000_000_000, equalTo(now.getEpochSecond()));
assertThat(AggregateProcessor.getTimeNanos(now) % 1000_000_000, equalTo((long)now.getNano()));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.time.Instant;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -139,4 +143,88 @@ void testCountAggregateOTelFormat(int testCount) {
assertThat(attributes.get(key2), equalTo(value2));
assertTrue(attributes.containsKey(dataKey2));
}

@ParameterizedTest
@ValueSource(ints = {1, 2, 10, 100})
void testCountAggregateOTelFormatWithStartAndEndTimesInTheEvent(int testCount) {
CountAggregateActionConfig mockConfig = mock(CountAggregateActionConfig.class);
when(mockConfig.getCountKey()).thenReturn(CountAggregateActionConfig.DEFAULT_COUNT_KEY);
String startTimeKey = UUID.randomUUID().toString();
String endTimeKey = UUID.randomUUID().toString();
when(mockConfig.getStartTimeKey()).thenReturn(startTimeKey);
when(mockConfig.getEndTimeKey()).thenReturn(endTimeKey);
when(mockConfig.getOutputFormat()).thenReturn(OutputFormat.OTEL_METRICS.toString());
countAggregateAction = createObjectUnderTest(mockConfig);
final String key1 = "key-"+UUID.randomUUID().toString();
final String value1 = UUID.randomUUID().toString();
final String dataKey1 = "datakey-"+UUID.randomUUID().toString();
final String key2 = "key-"+UUID.randomUUID().toString();
final String value2 = UUID.randomUUID().toString();
final String dataKey2 = "datakey-"+UUID.randomUUID().toString();
final Instant testTime = Instant.ofEpochSecond(Instant.now().getEpochSecond());
Map<Object, Object> eventMap = Collections.singletonMap(key1, value1);
Event testEvent = JacksonEvent.builder()
.withEventType("event")
.withData(eventMap)
.build();
Map<Object, Object> eventMap2 = Collections.singletonMap(key2, value2);
JacksonEvent testEvent2 = JacksonEvent.builder()
.withEventType("event")
.withData(eventMap2)
.build();
AggregateActionInput aggregateActionInput = new AggregateActionTestUtils.TestAggregateActionInput(eventMap);
AggregateActionInput aggregateActionInput2 = new AggregateActionTestUtils.TestAggregateActionInput(eventMap2);
Random random = new Random();
for (int i = 0; i < testCount; i++) {
testEvent.put(dataKey1, UUID.randomUUID().toString());
Instant sTime = (i == 0) ? testTime : testTime.plusSeconds(random.nextInt(5));
Instant eTime = (i == testCount-1) ? testTime.plusSeconds(100) : testTime.plusSeconds (50+random.nextInt(45));
testEvent.put(startTimeKey, sTime);
testEvent.put(endTimeKey, eTime);
testEvent2.put(dataKey2, UUID.randomUUID().toString());
testEvent2.put(startTimeKey, sTime.toString());
testEvent2.put(endTimeKey, eTime.toString());
AggregateActionResponse aggregateActionResponse = countAggregateAction.handleEvent(testEvent, aggregateActionInput);
assertThat(aggregateActionResponse.getEvent(), equalTo(null));
aggregateActionResponse = countAggregateAction.handleEvent(testEvent2, aggregateActionInput2);
assertThat(aggregateActionResponse.getEvent(), equalTo(null));
}

AggregateActionOutput actionOutput = countAggregateAction.concludeGroup(aggregateActionInput);
final List<Event> result = actionOutput.getEvents();
assertThat(result.size(), equalTo(1));
Map<String, Object> expectedEventMap = new HashMap<>();
expectedEventMap.put("value", (double)testCount);
expectedEventMap.put("name", "count");
expectedEventMap.put("description", "Number of events");
expectedEventMap.put("isMonotonic", true);
expectedEventMap.put("aggregationTemporality", "AGGREGATION_TEMPORALITY_DELTA");
expectedEventMap.put("unit", "1");
expectedEventMap.forEach((k, v) -> assertThat(result.get(0).toMap(), hasEntry(k,v)));
assertThat(result.get(0).toMap().get("attributes"), equalTo(eventMap));
JacksonMetric metric = (JacksonMetric) result.get(0);
assertThat(metric.toJsonString().indexOf("attributes"), not(-1));
assertThat(result.get(0).get("startTime", String.class), equalTo(testTime.toString()));
assertThat(result.get(0).get("time", String.class), equalTo(testTime.plusSeconds(100).toString()));

assertThat(result.get(0).toMap(), hasKey("startTime"));
assertThat(result.get(0).toMap(), hasKey("time"));
List<Map<String, Object>> exemplars = (List <Map<String, Object>>)result.get(0).toMap().get("exemplars");
assertThat(exemplars.size(), equalTo(1));
Map<String, Object> exemplar = exemplars.get(0);
Map<String, Object> attributes = (Map<String, Object>)exemplar.get("attributes");
assertThat(attributes.get(key1), equalTo(value1));
assertTrue(attributes.containsKey(dataKey1));

actionOutput = countAggregateAction.concludeGroup(aggregateActionInput2);
final List<Event> result2 = actionOutput.getEvents();
assertThat(result2.size(), equalTo(1));

exemplars = (List <Map<String, Object>>)result2.get(0).toMap().get("exemplars");
assertThat(exemplars.size(), equalTo(1));
exemplar = exemplars.get(0);
attributes = (Map<String, Object>)exemplar.get("attributes");
assertThat(attributes.get(key2), equalTo(value2));
assertTrue(attributes.containsKey(dataKey2));
}
}
Loading

0 comments on commit 691e7e5

Please sign in to comment.