Commit df18248
Addressed comments and added tests
Signed-off-by: Kondaka <[email protected]>
kkondaka committed May 2, 2024
1 parent 24cc863 commit df18248
Showing 11 changed files with 84 additions and 44 deletions.
----------------------------------------------------------------------
@@ -11,6 +11,7 @@
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeoutException;
 
 /**
@@ -83,8 +84,8 @@ default boolean isByteBuffer() {
         return false;
     }
 
-    default Integer getMaxRequestSize() {
-        return null;
+    default Optional<Integer> getMaxRequestSize() {
+        return Optional.empty();
     }
 
     /**
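
The interface change above swaps a nullable Integer for Optional<Integer>, so callers can no longer forget the "no limit" case. A minimal caller-side sketch, assuming only the getMaxRequestSize() contract shown above (the class name and fallback value are illustrative, not part of the commit):

    import java.util.Optional;

    class MaxRequestSizeCaller {
        // An empty Optional means the buffer declares no request-size limit.
        static int effectiveLimit(final Optional<Integer> maxRequestSize) {
            return maxRequestSize.orElse(Integer.MAX_VALUE); // illustrative fallback
        }
    }
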
----------------------------------------------------------------------
@@ -10,6 +10,7 @@
 import org.opensearch.dataprepper.model.record.Record;
 
 import java.time.Duration;
+import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,6 +31,12 @@ void testGetDrainTimeout() {
         assertEquals(Duration.ZERO, buffer.getDrainTimeout());
     }
 
+    @Test
+    void testMaxRequestSize() {
+        final Buffer<Record<Event>> buffer = createObjectUnderTest();
+        assertEquals(Optional.empty(), buffer.getMaxRequestSize());
+    }
+
     @Test
     void testShutdown() {
         final Buffer<Record<Event>> buffer = createObjectUnderTest();
----------------------------------------------------------------------
@@ -12,6 +12,8 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
 
 /**
  * Buffer decorator created for pipelines that make use of multiple buffers, such as PeerForwarder-enabled pipelines. The decorator
@@ -43,16 +45,9 @@ public Duration getDrainTimeout() {
     }
 
     @Override
-    public Integer getMaxRequestSize() {
-        Integer result = null;
-        for (final Buffer buffer : allBuffers) {
-            Integer maxRequestSize = buffer.getMaxRequestSize();
-            if (maxRequestSize != null) {
-                if (result == null || result > maxRequestSize)
-                    result = maxRequestSize;
-            }
-        }
-        return result;
+    public Optional<Integer> getMaxRequestSize() {
+        OptionalInt maxRequestSize = allBuffers.stream().filter(b -> b.getMaxRequestSize().isPresent()).mapToInt(b -> (Integer) b.getMaxRequestSize().get()).min();
+        return maxRequestSize.isPresent() ? Optional.of(maxRequestSize.getAsInt()) : Optional.empty();
     }
 
     @Override
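
The rewritten decorator reduces all wrapped buffers to the smallest advertised limit, since a request must fit the most constrained buffer it passes through. A self-contained sketch of the same reduction (class and method names are illustrative):

    import java.util.List;
    import java.util.Optional;
    import java.util.OptionalInt;

    class MinRequestSize {
        // Smallest present limit across buffers; empty when no buffer declares one.
        static Optional<Integer> min(final List<Optional<Integer>> limits) {
            final OptionalInt min = limits.stream()
                    .filter(Optional::isPresent)
                    .mapToInt(Optional::get)
                    .min();
            return min.isPresent() ? Optional.of(min.getAsInt()) : Optional.empty();
        }

        public static void main(final String[] args) {
            // Prints Optional[65536]: the most constrained buffer wins.
            System.out.println(min(List.of(Optional.of(1048576), Optional.empty(), Optional.of(65536))));
        }
    }
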
----------------------------------------------------------------------
@@ -17,11 +17,7 @@
 import org.opensearch.dataprepper.model.record.Record;
 
 import java.time.Duration;
-import java.util.AbstractMap;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -221,6 +217,15 @@ void shutdown_MultipleSecondaryBuffers_CallsAllBuffersShutdown() {
         verify(secondaryBuffer, times(2)).shutdown();
     }
 
+    @Test
+    void test_getMaxRequestSize() {
+        when(primaryBuffer.getMaxRequestSize()).thenReturn(Optional.empty());
+        when(secondaryBuffer.getMaxRequestSize()).thenReturn(Optional.empty());
+
+        final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(2);
+        assertThat(multiBufferDecorator.getMaxRequestSize(), equalTo(Optional.empty()));
+    }
+
     private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) {
         final List<Buffer> secondaryBuffers = IntStream.range(0, secondaryBufferCount)
                 .mapToObj(i -> secondaryBuffer)
----------------------------------------------------------------------
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -52,7 +53,7 @@ public class LogHTTPService {
     private final Counter successRequestsCounter;
     private final DistributionSummary payloadSizeSummary;
     private final Timer requestProcessDuration;
-    private Integer maxRequestLength;
+    private Optional<Integer> maxRequestLength;
 
     public LogHTTPService(final int bufferWriteTimeoutInMillis,
                          final Buffer<Record<Log>> buffer,
@@ -80,7 +81,7 @@ public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, fi
     }
 
     private void sendJsonList(List<String> jsonList) throws Exception {
-        StringBuilder sb = new StringBuilder(maxRequestLength);
+        StringBuilder sb = new StringBuilder(maxRequestLength.get());
         sb.append("[");
         String comma = "";
         String key = UUID.randomUUID().toString();
@@ -90,7 +91,7 @@ private void sendJsonList(List<String> jsonList) throws Exception {
             comma = ",";
         }
         sb.append("]");
-        if (sb.toString().getBytes().length > maxRequestLength) {
-            throw new RuntimeException("Request length "+ sb.toString().getBytes().length + " exceeds maxRequestLength "+ maxRequestLength);
+        if (sb.toString().getBytes().length > maxRequestLength.get()) {
+            throw new RuntimeException("Request length " + sb.toString().getBytes().length + " exceeds maxRequestLength " + maxRequestLength.get());
         }
         buffer.writeBytes(sb.toString().getBytes(), key, bufferWriteTimeoutInMillis);
@@ -101,14 +102,14 @@ private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRe
         List<List<String>> jsonList;
 
         try {
-            jsonList = (maxRequestLength == null) ? jsonCodec.parse(content) : jsonCodec.parse(content, maxRequestLength - SERIALIZATION_OVERHEAD);
+            jsonList = (!maxRequestLength.isPresent()) ? jsonCodec.parse(content) : jsonCodec.parse(content, maxRequestLength.get() - SERIALIZATION_OVERHEAD);
         } catch (IOException e) {
             LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage());
             throw new IOException("Bad request data format. Needs to be json array.", e.getCause());
         }
         try {
             if (buffer.isByteBuffer()) {
-                if (maxRequestLength != null && content.array().length > maxRequestLength) {
+                if (maxRequestLength.isPresent() && content.array().length > maxRequestLength.get()) {
                     for (final List<String> innerJsonList: jsonList) {
                         sendJsonList(innerJsonList);
                     }
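
When the buffer is byte-oriented and the request exceeds the advertised limit, processRequest() falls back to the chunked path: the codec splits the records into sub-lists and sendJsonList() re-serializes each sub-list into its own JSON array before writing it. A standalone sketch of that re-serialization step, mirroring the sendJsonList() body above (the class name is illustrative):

    import java.util.List;

    class JsonArrayJoiner {
        // Rebuilds "[r1,r2,...]" from pre-serialized JSON records.
        static String toJsonArray(final List<String> jsonRecords) {
            final StringBuilder sb = new StringBuilder();
            sb.append("[");
            String comma = "";
            for (final String record : jsonRecords) {
                sb.append(comma).append(record);
                comma = ",";
            }
            sb.append("]");
            return sb.toString();
        }

        public static void main(final String[] args) {
            // Prints [{"a":"b"},{"c":"d"}]
            System.out.println(toJsonArray(List.of("{\"a\":\"b\"}", "{\"c\":\"d\"}")));
        }
    }
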
----------------------------------------------------------------------
@@ -20,6 +20,10 @@
  * <p>
  */
 public class JsonCodec implements Codec<List<List<String>>> {
+    // To account for "[" and "]" when the list is converted to String
+    private static final String OVERHEAD_CHARACTERS = "[]";
+    // To account for "," when the list is converted to String
+    private static final int COMMA_OVERHEAD_LENGTH = 1;
     private static final ObjectMapper mapper = new ObjectMapper();
     private static final TypeReference<List<Map<String, Object>>> LIST_OF_MAP_TYPE_REFERENCE =
             new TypeReference<List<Map<String, Object>>>() {};
@@ -42,17 +46,20 @@ public List<List<String>> parse(HttpData httpData, int maxSize) throws IOExcepti
         List<List<String>> jsonList = new ArrayList<>();
         final List<Map<String, Object>> logList = mapper.readValue(httpData.toInputStream(),
                 LIST_OF_MAP_TYPE_REFERENCE);
-        int size = 2; // To account for "[" and "]" when the list is converted to String
+        int size = OVERHEAD_CHARACTERS.length();
         List<String> innerJsonList = new ArrayList<>();
         for (final Map<String, Object> log: logList) {
             final String recordString = mapper.writeValueAsString(log);
             if (size + recordString.length() > maxSize) {
                 jsonList.add(innerJsonList);
                 innerJsonList = new ArrayList<>();
-                size = 2;
+                size = OVERHEAD_CHARACTERS.length();
             }
             innerJsonList.add(recordString);
-            size += recordString.length()+1; // +1 is to account for "," when the list is converted to String
+            size += recordString.length() + COMMA_OVERHEAD_LENGTH;
         }
+        if (size > OVERHEAD_CHARACTERS.length()) {
+            jsonList.add(innerJsonList);
+        }
 
         return jsonList;
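
A worked trace of the size accounting above, using the 30-character limit from the new JsonCodecTest that follows: each {"aN":"bN"} record serializes to 11 characters, the enclosing brackets cost OVERHEAD_CHARACTERS.length() = 2, and each record charges COMMA_OVERHEAD_LENGTH = 1:

    size = 2
    add {"a1":"b1"}                -> size = 2 + 11 + 1 = 14
    add {"a2":"b2"}                -> size = 14 + 11 + 1 = 26
    {"a3":"b3"}: 26 + 11 = 37 > 30 -> flush chunk 1, size = 2
    add {"a3":"b3"}                -> size = 14
    add {"a4":"b4"}                -> size = 26
    {"a5":"b5"}: 26 + 11 = 37 > 30 -> flush chunk 2, size = 2
    add {"a5":"b5"}                -> size = 14
    end of input: size 14 > 2      -> flush chunk 3

This yields the 2/2/1 split asserted in testParseSuccessWithMaxSize below. The added trailing-flush guard also ensures the final, partially filled chunk is emitted rather than dropped.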
----------------------------------------------------------------------
@@ -16,6 +16,7 @@
 
 class JsonCodecTest {
     private final HttpData goodTestData = HttpData.ofUtf8("[{\"a\":\"b\"}, {\"c\":\"d\"}]");
+    private final HttpData goodLargeTestData = HttpData.ofUtf8("[{\"a1\":\"b1\"}, {\"a2\":\"b2\"}, {\"a3\":\"b3\"}, {\"a4\":\"b4\"}, {\"a5\":\"b5\"}]");
     private final HttpData badTestDataJsonLine = HttpData.ofUtf8("{\"a\":\"b\"}");
     private final HttpData badTestDataMultiJsonLines = HttpData.ofUtf8("{\"a\":\"b\"}{\"c\":\"d\"}");
     private final HttpData badTestDataNonJson = HttpData.ofUtf8("non json content");
@@ -24,11 +25,30 @@ class JsonCodecTest {
     @Test
     public void testParseSuccess() throws IOException {
         // When
-        List<String> res = objectUnderTest.parse(goodTestData);
+        List<List<String>> res = objectUnderTest.parse(goodTestData);
 
         // Then
-        assertEquals(2, res.size());
-        assertEquals("{\"a\":\"b\"}", res.get(0));
+        assertEquals(1, res.size());
+        assertEquals(2, res.get(0).size());
+        assertEquals("{\"a\":\"b\"}", res.get(0).get(0));
+    }
+
+    @Test
+    public void testParseSuccessWithMaxSize() throws IOException {
+        // When
+        List<List<String>> res = objectUnderTest.parse(goodLargeTestData, 30);
+
+        assertEquals(3, res.size());
+
+        // Then
+        assertEquals(2, res.get(0).size());
+        assertEquals("{\"a1\":\"b1\"}", res.get(0).get(0));
+        assertEquals("{\"a2\":\"b2\"}", res.get(0).get(1));
+        assertEquals(2, res.get(1).size());
+        assertEquals("{\"a3\":\"b3\"}", res.get(1).get(0));
+        assertEquals("{\"a4\":\"b4\"}", res.get(1).get(1));
+        assertEquals(1, res.get(2).size());
+        assertEquals("{\"a5\":\"b5\"}", res.get(2).get(0));
     }
 
     @Test
----------------------------------------------------------------------
@@ -41,12 +41,12 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 @DataPrepperPlugin(name = "kafka", pluginType = Buffer.class, pluginConfigurationType = KafkaBufferConfig.class)
 public class KafkaBuffer extends AbstractBuffer<Record<Event>> {
@@ -65,7 +65,6 @@ public class KafkaBuffer extends AbstractBuffer<Record<Event>> {
     private final Duration drainTimeout;
     private AtomicBoolean shutdownInProgress;
     private ByteDecoder byteDecoder;
-    private AtomicInteger maxRequestSize;
 
     @DataPrepperPluginConstructor
     public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig,
@@ -76,10 +75,9 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
         final SerializationFactory serializationFactory = new BufferSerializationFactory(new CommonSerializationFactory());
         final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier, new TopicServiceFactory());
         this.byteDecoder = byteDecoder;
-        this.maxRequestSize = new AtomicInteger(0);
         final String metricPrefixName = kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName());
         final PluginMetrics producerMetrics = PluginMetrics.fromNames(metricPrefixName + WRITE, pluginSetting.getPipelineName());
-        producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, null, null, producerMetrics, null, maxRequestSize, false);
+        producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, null, null, producerMetrics, null, false);
         final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier);
         innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
         this.shutdownInProgress = new AtomicBoolean(false);
@@ -94,8 +92,8 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
     }
 
     @Override
-    public Integer getMaxRequestSize() {
-        return maxRequestSize.get();
+    public Optional<Integer> getMaxRequestSize() {
+        return Optional.of(producer.getMaxRequestSize());
     }
 
     @Override
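
With this change KafkaBuffer always reports a concrete limit: getMaxRequestSize() delegates to the producer's new getMaxRequestSize() (see KafkaCustomProducer below), which falls back to KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE when no producer properties are configured, so the returned Optional is never empty.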
----------------------------------------------------------------------
@@ -26,6 +26,7 @@
 import org.opensearch.dataprepper.model.log.JacksonLog;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
 import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
 import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics;
@@ -109,6 +110,14 @@ public void produceRawData(final byte[] bytes, final String key) throws Exceptio
         }
     }
 
+    public Integer getMaxRequestSize() {
+        KafkaProducerProperties producerProperties = kafkaProducerConfig.getKafkaProducerProperties();
+        if (producerProperties != null) {
+            return Integer.valueOf(producerProperties.getMaxRequestSize());
+        }
+        return KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE;
+    }
+
     public void produceRecords(final Record<Event> record) throws Exception {
         bufferedEventHandles.add(record.getData().getEventHandle());
         Event event = getEvent(record);
----------------------------------------------------------------------
@@ -57,23 +57,21 @@ public KafkaCustomProducerFactory(
     public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig,
                                               final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics,
                                               final DLQSink dlqSink,
-                                              AtomicInteger maxRequestSize,
                                               final boolean topicNameInMetrics) {
         AwsContext awsContext = new AwsContext(kafkaProducerConfig, awsCredentialsSupplier);
         KeyFactory keyFactory = new KeyFactory(awsContext);
         // If either or both of Producer's max_request_size or
         // Topic's max_message_bytes is set, then the maximum of the
         // two is set for both. If neither is set, then defaults are used.
+        Integer maxRequestSize = null;
         KafkaProducerProperties producerProperties = kafkaProducerConfig.getKafkaProducerProperties();
 
         if (producerProperties != null) {
-            maxRequestSize.set(Integer.valueOf(producerProperties.getMaxRequestSize()));
-        } else {
-            maxRequestSize.set(KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE);
+            int producerMaxRequestSize = producerProperties.getMaxRequestSize();
+            if (producerMaxRequestSize > KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE) {
+                maxRequestSize = Integer.valueOf(producerMaxRequestSize);
+            }
         }
-        prepareTopicAndSchema(kafkaProducerConfig,
-                (maxRequestSize.get() == KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE) ?
-                null : maxRequestSize.get());
+        prepareTopicAndSchema(kafkaProducerConfig, maxRequestSize);
         Properties properties = SinkPropertyConfigurer.getProducerProperties(kafkaProducerConfig);
         properties = Objects.requireNonNull(properties);
         KafkaSecurityConfigurer.setAuthProperties(properties, kafkaProducerConfig, LOG);
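
Per the comment in createProducer(), a configured producer max_request_size now only overrides topic creation when it exceeds the default; otherwise prepareTopicAndSchema() receives null and broker/topic defaults apply. A one-method sketch of that rule (in stock Kafka, max.request.size defaults to 1 MB; the exact value of DEFAULT_MAX_REQUEST_SIZE is not shown in this diff):

    class MaxRequestSizeOverride {
        // Mirrors the factory logic: pass an override only when it exceeds the default.
        static Integer overrideOrNull(final int configured, final int defaultMaxRequestSize) {
            return configured > defaultMaxRequestSize ? configured : null;
        }
    }
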
----------------------------------------------------------------------
@@ -168,8 +168,7 @@ private void checkTopicCreationCriteriaAndCreateTopic() {
 
     private KafkaCustomProducer createProducer() {
         final DLQSink dlqSink = new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting);
-        AtomicInteger maxRequestSize = new AtomicInteger(0);
-        return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, maxRequestSize, true);
+        return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, true);
     }
 
