From df182487358946cc95298e2672a3fafafa93480f Mon Sep 17 00:00:00 2001 From: Kondaka Date: Wed, 1 May 2024 23:38:33 -0700 Subject: [PATCH] Addressed comments and added tests Signed-off-by: Kondaka --- .../dataprepper/model/buffer/Buffer.java | 5 ++-- .../dataprepper/model/buffer/BufferTest.java | 7 +++++ .../parser/MultiBufferDecorator.java | 15 ++++------- .../parser/MultiBufferDecoratorTest.java | 15 +++++++---- .../source/loghttp/LogHTTPService.java | 11 ++++---- .../source/loghttp/codec/JsonCodec.java | 13 +++++++--- .../source/loghttp/codec/JsonCodecTest.java | 26 ++++++++++++++++--- .../plugins/kafka/buffer/KafkaBuffer.java | 10 +++---- .../kafka/producer/KafkaCustomProducer.java | 9 +++++++ .../producer/KafkaCustomProducerFactory.java | 14 +++++----- .../plugins/kafka/sink/KafkaSink.java | 3 +-- 11 files changed, 84 insertions(+), 44 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java index 4932bf8cfc..eaaa978230 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java @@ -11,6 +11,7 @@ import java.time.Duration; import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeoutException; /** @@ -83,8 +84,8 @@ default boolean isByteBuffer() { return false; } - default Integer getMaxRequestSize() { - return null; + default Optional getMaxRequestSize() { + return Optional.empty(); } /** diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java index f2c26ffcc5..2236f0ba33 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.record.Record; import java.time.Duration; +import java.util.Optional; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -30,6 +31,12 @@ void testGetDrainTimeout() { assertEquals(Duration.ZERO, buffer.getDrainTimeout()); } + @Test + void testMaxRequestSize() { + final Buffer> buffer = createObjectUnderTest(); + assertEquals(buffer.getMaxRequestSize(), Optional.empty()); + } + @Test void testShutdown() { final Buffer> buffer = createObjectUnderTest(); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/MultiBufferDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/MultiBufferDecorator.java index 5b8f4ad7cf..eaa6c09491 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/MultiBufferDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/MultiBufferDecorator.java @@ -12,6 +12,8 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; /** * Buffer decorator created for pipelines that make use of multiple buffers, such as PeerForwarder-enabled pipelines. The decorator @@ -43,16 +45,9 @@ public Duration getDrainTimeout() { } @Override - public Integer getMaxRequestSize() { - Integer result = null; - for (final Buffer buffer : allBuffers) { - Integer maxRequestSize = buffer.getMaxRequestSize(); - if (maxRequestSize != null) { - if (result == null || result > maxRequestSize) - result = maxRequestSize; - } - } - return result; + public Optional getMaxRequestSize() { + OptionalInt maxRequestSize = allBuffers.stream().filter(b -> b.getMaxRequestSize().isPresent()).mapToInt(b -> (Integer)b.getMaxRequestSize().get()).min(); + return maxRequestSize.isPresent() ? Optional.of(maxRequestSize.getAsInt()) : Optional.empty(); } @Override diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/MultiBufferDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/MultiBufferDecoratorTest.java index fa405154a1..7a91cf2300 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/MultiBufferDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/MultiBufferDecoratorTest.java @@ -17,11 +17,7 @@ import org.opensearch.dataprepper.model.record.Record; import java.time.Duration; -import java.util.AbstractMap; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Random; +import java.util.*; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -221,6 +217,15 @@ void shutdown_MultipleSecondaryBuffers_CallsAllBuffersShutdown() { verify(secondaryBuffer, times(2)).shutdown(); } + @Test + void test_getMaxRequestSize() { + when(primaryBuffer.getMaxRequestSize()).thenReturn(Optional.empty()); + when(secondaryBuffer.getMaxRequestSize()).thenReturn(Optional.empty()); + + final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(2); + assertThat(multiBufferDecorator.getMaxRequestSize(), equalTo(Optional.empty())); + } + private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) { final List secondaryBuffers = IntStream.range(0, secondaryBufferCount) .mapToObj(i -> secondaryBuffer) diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java index ac22e9fb8f..ddb71cb216 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; @@ -52,7 +53,7 @@ public class LogHTTPService { private final Counter successRequestsCounter; private final DistributionSummary payloadSizeSummary; private final Timer requestProcessDuration; - private Integer maxRequestLength; + private Optional maxRequestLength; public LogHTTPService(final int bufferWriteTimeoutInMillis, final Buffer> buffer, @@ -80,7 +81,7 @@ public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, fi } private void sendJsonList(List jsonList) throws Exception { - StringBuilder sb = new StringBuilder(maxRequestLength); + StringBuilder sb = new StringBuilder(maxRequestLength.get()); sb.append("["); String comma = ""; String key = UUID.randomUUID().toString(); @@ -90,7 +91,7 @@ private void sendJsonList(List jsonList) throws Exception { comma = ","; } sb.append("]"); - if (sb.toString().getBytes().length > maxRequestLength) { + if (sb.toString().getBytes().length > maxRequestLength.get()) { throw new RuntimeException("Request length "+ sb.toString().getBytes().length + " exceeds maxRequestLength "+ maxRequestLength); } buffer.writeBytes(sb.toString().getBytes(), key, bufferWriteTimeoutInMillis); @@ -101,14 +102,14 @@ private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRe List> jsonList; try { - jsonList = (maxRequestLength == null) ? jsonCodec.parse(content) : jsonCodec.parse(content, maxRequestLength - SERIALIZATION_OVERHEAD); + jsonList = (!maxRequestLength.isPresent()) ? jsonCodec.parse(content) : jsonCodec.parse(content, maxRequestLength.get() - SERIALIZATION_OVERHEAD); } catch (IOException e) { LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage()); throw new IOException("Bad request data format. Needs to be json array.", e.getCause()); } try { if (buffer.isByteBuffer()) { - if (maxRequestLength != null && content.array().length > maxRequestLength) { + if (maxRequestLength.isPresent() && content.array().length > maxRequestLength.get()) { for (final List innerJsonList: jsonList) { sendJsonList(innerJsonList); } diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodec.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodec.java index 0b7e264f70..b00fc121a2 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodec.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodec.java @@ -20,6 +20,10 @@ *

*/ public class JsonCodec implements Codec>> { + // To account for "[" and "]" when the list is converted to String + private static final String OVERHEAD_CHARACTERS = "[]"; + // To account for "," when the list is converted to String + private static final int COMMA_OVERHEAD_LENGTH = 1; private static final ObjectMapper mapper = new ObjectMapper(); private static final TypeReference>> LIST_OF_MAP_TYPE_REFERENCE = new TypeReference>>() {}; @@ -42,17 +46,20 @@ public List> parse(HttpData httpData, int maxSize) throws IOExcepti List> jsonList = new ArrayList<>(); final List> logList = mapper.readValue(httpData.toInputStream(), LIST_OF_MAP_TYPE_REFERENCE); - int size = 2; // To account for "[" and "]" when the list is converted to String + int size = OVERHEAD_CHARACTERS.length(); List innerJsonList = new ArrayList<>(); for (final Map log: logList) { final String recordString = mapper.writeValueAsString(log); if (size + recordString.length() > maxSize) { jsonList.add(innerJsonList); innerJsonList = new ArrayList<>(); - size = 2; + size = OVERHEAD_CHARACTERS.length(); } innerJsonList.add(recordString); - size += recordString.length()+1; // +1 is to account for "," when the list is converted to String + size += recordString.length() + COMMA_OVERHEAD_LENGTH; + } + if (size > OVERHEAD_CHARACTERS.length()) { + jsonList.add(innerJsonList); } return jsonList; diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodecTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodecTest.java index 5ab5b3c357..12b3400906 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodecTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/codec/JsonCodecTest.java @@ -16,6 +16,7 @@ class JsonCodecTest { private final HttpData goodTestData = HttpData.ofUtf8("[{\"a\":\"b\"}, {\"c\":\"d\"}]"); + private final HttpData goodLargeTestData = HttpData.ofUtf8("[{\"a1\":\"b1\"}, {\"a2\":\"b2\"}, {\"a3\":\"b3\"}, {\"a4\":\"b4\"}, {\"a5\":\"b5\"}]"); private final HttpData badTestDataJsonLine = HttpData.ofUtf8("{\"a\":\"b\"}"); private final HttpData badTestDataMultiJsonLines = HttpData.ofUtf8("{\"a\":\"b\"}{\"c\":\"d\"}"); private final HttpData badTestDataNonJson = HttpData.ofUtf8("non json content"); @@ -24,11 +25,30 @@ class JsonCodecTest { @Test public void testParseSuccess() throws IOException { // When - List res = objectUnderTest.parse(goodTestData); + List> res = objectUnderTest.parse(goodTestData); // Then - assertEquals(2, res.size()); - assertEquals("{\"a\":\"b\"}", res.get(0)); + assertEquals(1, res.size()); + assertEquals(2, res.get(0).size()); + assertEquals("{\"a\":\"b\"}", res.get(0).get(0)); + } + + @Test + public void testParseSuccessWithMaxSize() throws IOException { + // When + List> res = objectUnderTest.parse(goodLargeTestData, 30); + + assertEquals(3, res.size()); + + // Then + assertEquals(2, res.get(0).size()); + assertEquals("{\"a1\":\"b1\"}", res.get(0).get(0)); + assertEquals("{\"a2\":\"b2\"}", res.get(0).get(1)); + assertEquals(2, res.get(1).size()); + assertEquals("{\"a3\":\"b3\"}", res.get(1).get(0)); + assertEquals("{\"a4\":\"b4\"}", res.get(1).get(1)); + assertEquals(1, res.get(2).size()); + assertEquals("{\"a5\":\"b5\"}", res.get(2).get(0)); } @Test diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index e26abbc172..02e24709eb 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -41,12 +41,12 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; @DataPrepperPlugin(name = "kafka", pluginType = Buffer.class, pluginConfigurationType = KafkaBufferConfig.class) public class KafkaBuffer extends AbstractBuffer> { @@ -65,7 +65,6 @@ public class KafkaBuffer extends AbstractBuffer> { private final Duration drainTimeout; private AtomicBoolean shutdownInProgress; private ByteDecoder byteDecoder; - private AtomicInteger maxRequestSize; @DataPrepperPluginConstructor public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, @@ -76,10 +75,9 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka final SerializationFactory serializationFactory = new BufferSerializationFactory(new CommonSerializationFactory()); final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier, new TopicServiceFactory()); this.byteDecoder = byteDecoder; - this.maxRequestSize = new AtomicInteger(0); final String metricPrefixName = kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()); final PluginMetrics producerMetrics = PluginMetrics.fromNames(metricPrefixName + WRITE, pluginSetting.getPipelineName()); - producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, null, null, producerMetrics, null, maxRequestSize, false); + producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, null, null, producerMetrics, null, false); final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier); innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName()); this.shutdownInProgress = new AtomicBoolean(false); @@ -94,8 +92,8 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka } @Override - public Integer getMaxRequestSize() { - return maxRequestSize.get(); + public Optional getMaxRequestSize() { + return Optional.of(producer.getMaxRequestSize()); } @Override diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java index 56e9783d2b..87db913fa3 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java @@ -26,6 +26,7 @@ import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; import org.opensearch.dataprepper.plugins.kafka.service.SchemaService; import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics; @@ -109,6 +110,14 @@ public void produceRawData(final byte[] bytes, final String key) throws Exceptio } } + public Integer getMaxRequestSize() { + KafkaProducerProperties producerProperties = kafkaProducerConfig.getKafkaProducerProperties(); + if (producerProperties != null) { + return Integer.valueOf(producerProperties.getMaxRequestSize()); + } + return KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE; + } + public void produceRecords(final Record record) throws Exception { bufferedEventHandles.add(record.getData().getEventHandle()); Event event = getEvent(record); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java index 39e737e834..305727d1b1 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java @@ -57,23 +57,21 @@ public KafkaCustomProducerFactory( public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig, final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics, final DLQSink dlqSink, - AtomicInteger maxRequestSize, final boolean topicNameInMetrics) { AwsContext awsContext = new AwsContext(kafkaProducerConfig, awsCredentialsSupplier); KeyFactory keyFactory = new KeyFactory(awsContext); // If either or both of Producer's max_request_size or // Topic's max_message_bytes is set, then maximum of the // two is set for both. If neither is set, then defaults are used. + Integer maxRequestSize = null; KafkaProducerProperties producerProperties = kafkaProducerConfig.getKafkaProducerProperties(); - if (producerProperties != null) { - maxRequestSize.set(Integer.valueOf(producerProperties.getMaxRequestSize())); - } else { - maxRequestSize.set(KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE); + int producerMaxRequestSize = producerProperties.getMaxRequestSize(); + if (producerMaxRequestSize > KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE) { + maxRequestSize = Integer.valueOf(producerMaxRequestSize); + } } - prepareTopicAndSchema(kafkaProducerConfig, - (maxRequestSize.get() == KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE) ? - null : maxRequestSize.get()); + prepareTopicAndSchema(kafkaProducerConfig, maxRequestSize); Properties properties = SinkPropertyConfigurer.getProducerProperties(kafkaProducerConfig); properties = Objects.requireNonNull(properties); KafkaSecurityConfigurer.setAuthProperties(properties, kafkaProducerConfig, LOG); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java index d3ba5af31b..1de42d5f60 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java @@ -168,8 +168,7 @@ private void checkTopicCreationCriteriaAndCreateTopic() { private KafkaCustomProducer createProducer() { final DLQSink dlqSink = new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting); - AtomicInteger maxRequestSize = new AtomicInteger(0); - return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, maxRequestSize, true); + return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, true); }