diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java index ea786ce5bd..7dc5930816 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java @@ -35,5 +35,5 @@ default Boolean evaluateConditional(final String statement, final Event context) Boolean isValidExpressionStatement(final String statement); - Boolean isValidFormatExpressions(final String format); + Boolean isValidFormatExpression(final String format); } \ No newline at end of file diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java index 6ce1af8660..9b76fbc807 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java @@ -25,7 +25,7 @@ public Boolean isValidExpressionStatement(final String statement) { } @Override - public Boolean isValidFormatExpressions(String format) { + public Boolean isValidFormatExpression(String format) { return true; } } diff --git a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java index 89d8b15a84..6653e5c7b4 100644 --- a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java +++ b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java @@ -55,7 +55,7 @@ public Boolean isValidExpressionStatement(final String statement) { } @Override - public Boolean isValidFormatExpressions(final String format) { + public Boolean isValidFormatExpression(final String format) { int fromIndex = 0; int position = 0; while ((position = format.indexOf("${", fromIndex)) != -1) { diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java index 0b8ac2ee03..a91e7fe368 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java @@ -132,14 +132,14 @@ void isValidExpressionStatement_returns_false_when_parse_throws() { "abc-${invalid, false" }) void isValidFormatExpressionsReturnsCorrectResult(final String format, final Boolean expectedResult) { - assertThat(statementEvaluator.isValidFormatExpressions(format), equalTo(expectedResult)); + assertThat(statementEvaluator.isValidFormatExpression(format), equalTo(expectedResult)); } @ParameterizedTest @ValueSource(strings = {"abc-${anyS(=tring}"}) void isValidFormatExpressionsReturnsFalseWhenIsValidKeyAndValidExpressionIsFalse(final String format) { doThrow(RuntimeException.class).when(parser).parse(anyString()); - assertThat(statementEvaluator.isValidFormatExpressions(format), equalTo(false)); + assertThat(statementEvaluator.isValidFormatExpression(format), equalTo(false)); } } diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 97c80fe3bc..a5237f21de 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -682,7 +682,7 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException, Event event = (Event) testRecords.get(0).getData(); event.getMetadata().setAttribute("action", "create"); final String actionFormatExpression = "${getMetadata(\"action\")}"; - when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true); + when(expressionEvaluator.isValidFormatExpression(actionFormatExpression)).thenReturn(true); when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true); when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action")); pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression); @@ -715,7 +715,7 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce Event event = (Event) testRecords.get(0).getData(); event.getMetadata().setAttribute("action", "unknown"); final String actionFormatExpression = "${getMetadata(\"action\")}"; - when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true); + when(expressionEvaluator.isValidFormatExpression(actionFormatExpression)).thenReturn(true); when(expressionEvaluator.isValidExpressionStatement("getMetadata(\"action\")")).thenReturn(true); when(expressionEvaluator.evaluate("getMetadata(\"action\")", event)).thenReturn(event.getMetadata().getAttribute("action")); pluginSetting.getSettings().put(IndexConfiguration.ACTION, actionFormatExpression); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index b1edff4802..19676a6a2a 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -371,12 +371,17 @@ public void doOutput(final Collection> records) { versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator); version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator)); } catch (final NumberFormatException e) { - LOG.warn("Unable to convert the result of evaluating document_version '{}' to Long for an Event. The evaluation result '{}' must be a valid Long type", versionExpression, versionExpressionEvaluationResult); - logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); + final String errorMessage = String.format( + "Unable to convert the result of evaluating document_version '%s' to Long for an Event. The evaluation result '%s' must be a valid Long type", versionExpression, versionExpressionEvaluationResult + ); + LOG.error(errorMessage); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, errorMessage)), e); dynamicDocumentVersionDroppedEvents.increment(); } catch (final RuntimeException e) { - LOG.error("There was an exception when evaluating the document_version '{}'. Check the dlq if configured to see details about the affected Event: {}", versionExpression, e.getMessage()); - logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); + final String errorMessage = String.format( + "There was an exception when evaluating the document_version '%s': %s", versionExpression, e.getMessage()); + LOG.error(errorMessage + " Check the dlq if configured to see more details about the affected Event"); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, errorMessage)), e); dynamicDocumentVersionDroppedEvents.increment(); } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index c6ad893578..363e074c65 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -232,7 +232,7 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti final String versionType = pluginSetting.getStringOrDefault(DOCUMENT_VERSION_TYPE, null); builder = builder.withVersionExpression(versionExpression); - if (versionExpression != null && (!expressionEvaluator.isValidFormatExpressions(versionExpression))) { + if (versionExpression != null && (!expressionEvaluator.isValidFormatExpression(versionExpression))) { throw new InvalidPluginConfigurationException("document_version {} is not a valid format expression."); } @@ -546,7 +546,7 @@ public Builder withIsmPolicyFile(final String ismPolicyFile) { public Builder withAction(final String action, final ExpressionEvaluator expressionEvaluator) { checkArgument((EnumUtils.isValidEnumIgnoreCase(OpenSearchBulkActions.class, action) || - (action.contains("${") && expressionEvaluator.isValidFormatExpressions(action))), "action \"" + action + "\" is invalid. action must be one of the following: " + Arrays.stream(OpenSearchBulkActions.values()).collect(Collectors.toList())); + (action.contains("${") && expressionEvaluator.isValidFormatExpression(action))), "action \"" + action + "\" is invalid. action must be one of the following: " + Arrays.stream(OpenSearchBulkActions.values()).collect(Collectors.toList())); this.action = action; return this; } @@ -556,7 +556,7 @@ public Builder withActions(final List> actions, final Expres String action = (String)actionMap.get("type"); if (action != null) { checkArgument((EnumUtils.isValidEnumIgnoreCase(OpenSearchBulkActions.class, action) || - (action.contains("${") && expressionEvaluator.isValidFormatExpressions(action))), "action \"" + action + "\". action must be one of the following: " + Arrays.stream(OpenSearchBulkActions.values()).collect(Collectors.toList())); + (action.contains("${") && expressionEvaluator.isValidFormatExpression(action))), "action \"" + action + "\". action must be one of the following: " + Arrays.stream(OpenSearchBulkActions.values()).collect(Collectors.toList())); } } this.actions = actions; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java index 78f3efc516..cb9dfbe898 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java @@ -142,7 +142,7 @@ public void testReadESConfigWithBulkActionCreateExpression() { pluginSetting.setPipelineName(PIPELINE_NAME); expressionEvaluator = mock(ExpressionEvaluator.class); - when(expressionEvaluator.isValidFormatExpressions(actionFormatExpression)).thenReturn(true); + when(expressionEvaluator.isValidFormatExpression(actionFormatExpression)).thenReturn(true); final OpenSearchSinkConfiguration openSearchSinkConfiguration = OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java new file mode 100644 index 0000000000..237f4b9a2f --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -0,0 +1,296 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.failures.DlqObject; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedDlqData; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.DocumentBuilder; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManager; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateType; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.EXTERNAL_LATENCY; +import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.INTERNAL_LATENCY; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_ERRORS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_LATENCY; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_SIZE_BYTES; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.DYNAMIC_INDEX_DROPPED_EVENTS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.INVALID_ACTION_ERRORS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.INVALID_VERSION_EXPRESSION_DROPPED_EVENTS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DEFAULT_BULK_SIZE; +import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DEFAULT_FLUSH_TIMEOUT; + +@ExtendWith(MockitoExtension.class) +public class OpenSearchSinkTest { + + @Mock + private IndexManagerFactory indexManagerFactory; + + @Mock + private OpenSearchClient openSearchClient; + + @Mock + private PluginFactory pluginFactory; + + @Mock + private SinkContext sinkContext; + + @Mock + private PluginSetting pluginSetting; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + + @Mock + private OpenSearchSinkConfiguration openSearchSinkConfiguration; + + @Mock + private IndexConfiguration indexConfiguration; + + @Mock + private IndexManager indexManager; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Timer bulkRequestTimer; + + @Mock + private Counter bulkRequestErrorsCounter; + + @Mock + private Counter invalidActionErrorsCounter; + + @Mock + private Counter dynamicIndexDroppedEvents; + + @Mock + private DistributionSummary bulkRequestSizeBytesSummary; + + @Mock + private Counter dynamicDocumentVersionDroppedEvents; + + @BeforeEach + void setup() { + when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString()); + when(pluginSetting.getName()).thenReturn(UUID.randomUUID().toString()); + + final RetryConfiguration retryConfiguration = mock(RetryConfiguration.class); + when(retryConfiguration.getDlq()).thenReturn(Optional.empty()); + when(retryConfiguration.getDlqFile()).thenReturn(null); + + final ConnectionConfiguration connectionConfiguration = mock(ConnectionConfiguration.class); + final RestHighLevelClient restHighLevelClient = mock(RestHighLevelClient.class); + when(connectionConfiguration.createClient(awsCredentialsSupplier)).thenReturn(restHighLevelClient); + when(connectionConfiguration.createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier)).thenReturn(openSearchClient); + + when(indexConfiguration.getAction()).thenReturn("index"); + when(indexConfiguration.getDocumentId()).thenReturn(null); + when(indexConfiguration.getDocumentIdField()).thenReturn(null); + when(indexConfiguration.getRoutingField()).thenReturn(null); + when(indexConfiguration.getActions()).thenReturn(null); + when(indexConfiguration.getDocumentRootKey()).thenReturn(null); + when(indexConfiguration.getVersionType()).thenReturn(null); + when(indexConfiguration.getVersionExpression()).thenReturn(null); + when(indexConfiguration.getIndexAlias()).thenReturn(UUID.randomUUID().toString()); + when(indexConfiguration.getTemplateType()).thenReturn(TemplateType.V1); + when(indexConfiguration.getIndexType()).thenReturn(IndexType.CUSTOM); + when(indexConfiguration.getBulkSize()).thenReturn(DEFAULT_BULK_SIZE); + when(indexConfiguration.getFlushTimeout()).thenReturn(DEFAULT_FLUSH_TIMEOUT); + + when(openSearchSinkConfiguration.getIndexConfiguration()).thenReturn(indexConfiguration); + when(openSearchSinkConfiguration.getRetryConfiguration()).thenReturn(retryConfiguration); + when(openSearchSinkConfiguration.getConnectionConfiguration()).thenReturn(connectionConfiguration); + + when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(mock(Counter.class)); + when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(mock(Timer.class)); + when(pluginMetrics.summary(INTERNAL_LATENCY)).thenReturn(mock(DistributionSummary.class)); + when(pluginMetrics.summary(EXTERNAL_LATENCY)).thenReturn(mock(DistributionSummary.class)); + when(pluginMetrics.timer(BULKREQUEST_LATENCY)).thenReturn(bulkRequestTimer); + when(pluginMetrics.counter(BULKREQUEST_ERRORS)).thenReturn(bulkRequestErrorsCounter); + when(pluginMetrics.counter(INVALID_ACTION_ERRORS)).thenReturn(invalidActionErrorsCounter); + when(pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS)).thenReturn(dynamicIndexDroppedEvents); + when(pluginMetrics.counter(INVALID_VERSION_EXPRESSION_DROPPED_EVENTS)).thenReturn(dynamicDocumentVersionDroppedEvents); + when(pluginMetrics.summary(BULKREQUEST_SIZE_BYTES)).thenReturn(bulkRequestSizeBytesSummary); + + when(sinkContext.getTagsTargetKey()).thenReturn(null); + when(sinkContext.getIncludeKeys()).thenReturn(null); + when(sinkContext.getExcludeKeys()).thenReturn(null); + } + + private OpenSearchSink createObjectUnderTest() throws IOException { + try (final MockedStatic openSearchSinkConfigurationMockedStatic = mockStatic(OpenSearchSinkConfiguration.class); + final MockedStatic pluginMetricsMockedStatic = mockStatic(PluginMetrics.class); + final MockedConstruction indexManagerFactoryMockedConstruction = mockConstruction(IndexManagerFactory.class, (mock, context) -> { + indexManagerFactory = mock; + })) { + pluginMetricsMockedStatic.when(() -> PluginMetrics.fromPluginSetting(pluginSetting)).thenReturn(pluginMetrics); + openSearchSinkConfigurationMockedStatic.when(() -> OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator)) + .thenReturn(openSearchSinkConfiguration); + return new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier); + } + } + + @Test + void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_creates_DLQObject() throws IOException { + + final String versionExpression = UUID.randomUUID().toString(); + when(indexConfiguration.getVersionExpression()).thenReturn(versionExpression); + + final Event event = mock(JacksonEvent.class); + final String document = UUID.randomUUID().toString(); + when(event.toJsonString()).thenReturn(document); + final EventHandle eventHandle = mock(EventHandle.class); + when(event.getEventHandle()).thenReturn(eventHandle); + final String index = UUID.randomUUID().toString(); + when(event.formatString(versionExpression, expressionEvaluator)).thenReturn("not_a_number"); + when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator)).thenReturn(index); + final Record eventRecord = new Record<>(event); + + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + when(indexManager.getIndexName(anyString())).thenReturn(index); + + final DlqObject dlqObject = mock(DlqObject.class); + + final DlqObject.Builder dlqObjectBuilder = mock(DlqObject.Builder.class); + final ArgumentCaptor failedDlqData = ArgumentCaptor.forClass(FailedDlqData.class); + when(dlqObjectBuilder.withEventHandle(eventHandle)).thenReturn(dlqObjectBuilder); + when(dlqObjectBuilder.withFailedData(failedDlqData.capture())).thenReturn(dlqObjectBuilder); + when(dlqObjectBuilder.withPluginName(pluginSetting.getName())).thenReturn(dlqObjectBuilder); + when(dlqObjectBuilder.withPluginId(pluginSetting.getName())).thenReturn(dlqObjectBuilder); + when(dlqObjectBuilder.withPipelineName(pluginSetting.getPipelineName())).thenReturn(dlqObjectBuilder); + + when(dlqObject.getFailedData()).thenReturn(mock(FailedDlqData.class)); + doNothing().when(dlqObject).releaseEventHandle(false); + when(dlqObjectBuilder.build()).thenReturn(dlqObject); + + try (final MockedStatic documentBuilderMockedStatic = mockStatic(DocumentBuilder.class); + final MockedStatic dlqObjectMockedStatic = mockStatic(DlqObject.class)) { + documentBuilderMockedStatic.when(() -> DocumentBuilder.build(eq(event), eq(null), eq(null), eq(null), eq(null))) + .thenReturn(UUID.randomUUID().toString()); + + dlqObjectMockedStatic.when(DlqObject::builder).thenReturn(dlqObjectBuilder); + objectUnderTest.doOutput(List.of(eventRecord)); + } + + final FailedDlqData failedDlqDataResult = failedDlqData.getValue(); + assertThat(failedDlqDataResult, notNullValue()); + assertThat(failedDlqDataResult.getDocument(), equalTo(document)); + assertThat(failedDlqDataResult.getIndex(), equalTo(index)); + assertThat(failedDlqDataResult.getMessage().startsWith("Unable to convert the result of evaluating document_version"), equalTo(true)); + + verify(dynamicDocumentVersionDroppedEvents).increment(); + } + + @Test + void doOutput_with_invalid_version_expression_result_catches_RuntimeException_and_creates_DLQObject() throws IOException { + + final String versionExpression = UUID.randomUUID().toString(); + when(indexConfiguration.getVersionExpression()).thenReturn(versionExpression); + + final Event event = mock(JacksonEvent.class); + final String document = UUID.randomUUID().toString(); + when(event.toJsonString()).thenReturn(document); + final EventHandle eventHandle = mock(EventHandle.class); + when(event.getEventHandle()).thenReturn(eventHandle); + final String index = UUID.randomUUID().toString(); + when(event.formatString(versionExpression, expressionEvaluator)).thenThrow(RuntimeException.class); + when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator)).thenReturn(index); + final Record eventRecord = new Record<>(event); + + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + when(indexManager.getIndexName(anyString())).thenReturn(index); + + final DlqObject dlqObject = mock(DlqObject.class); + + final DlqObject.Builder dlqObjectBuilder = mock(DlqObject.Builder.class); + final ArgumentCaptor failedDlqData = ArgumentCaptor.forClass(FailedDlqData.class); + when(dlqObjectBuilder.withEventHandle(eventHandle)).thenReturn(dlqObjectBuilder); + when(dlqObjectBuilder.withFailedData(failedDlqData.capture())).thenReturn(dlqObjectBuilder); + when(dlqObjectBuilder.withPluginName(pluginSetting.getName())).thenReturn(dlqObjectBuilder); + when(dlqObjectBuilder.withPluginId(pluginSetting.getName())).thenReturn(dlqObjectBuilder); + when(dlqObjectBuilder.withPipelineName(pluginSetting.getPipelineName())).thenReturn(dlqObjectBuilder); + + when(dlqObject.getFailedData()).thenReturn(mock(FailedDlqData.class)); + doNothing().when(dlqObject).releaseEventHandle(false); + when(dlqObjectBuilder.build()).thenReturn(dlqObject); + + try (final MockedStatic documentBuilderMockedStatic = mockStatic(DocumentBuilder.class); + final MockedStatic dlqObjectMockedStatic = mockStatic(DlqObject.class)) { + documentBuilderMockedStatic.when(() -> DocumentBuilder.build(eq(event), eq(null), eq(null), eq(null), eq(null))) + .thenReturn(UUID.randomUUID().toString()); + + dlqObjectMockedStatic.when(DlqObject::builder).thenReturn(dlqObjectBuilder); + objectUnderTest.doOutput(List.of(eventRecord)); + } + + final FailedDlqData failedDlqDataResult = failedDlqData.getValue(); + assertThat(failedDlqDataResult, notNullValue()); + assertThat(failedDlqDataResult.getDocument(), equalTo(document)); + assertThat(failedDlqDataResult.getIndex(), equalTo(index)); + assertThat(failedDlqDataResult.getMessage().startsWith("There was an exception when evaluating the document_version"), equalTo(true)); + + verify(dynamicDocumentVersionDroppedEvents).increment(); + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java index b0878f53e9..99251fb956 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java @@ -481,7 +481,7 @@ public void testReadIndexConfig_emptyDocumentRootKey() { public void testReadIndexConfig_withValidDocumentVersionExpression(final String versionExpression) { final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); - when(expressionEvaluator.isValidFormatExpressions(versionExpression)).thenReturn(true); + when(expressionEvaluator.isValidFormatExpression(versionExpression)).thenReturn(true); final Map metadata = initializeConfigMetaData( IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null); @@ -500,7 +500,7 @@ public void testReadIndexConfig_withInvalidDocumentVersionExpression_throws_Inva final String versionExpression = UUID.randomUUID().toString(); final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); - when(expressionEvaluator.isValidFormatExpressions(versionExpression)).thenReturn(false); + when(expressionEvaluator.isValidFormatExpression(versionExpression)).thenReturn(false); final Map metadata = initializeConfigMetaData( IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null);