Skip to content

Commit

Permalink
Datadog Sink Connector issue DataDog#10 fix
Browse files Browse the repository at this point in the history
Ritchie Brothers Auctioneers fix for messages <5MB causing IOException.
DataDog#10
  • Loading branch information
andwarnbros authored Dec 12, 2022
1 parent 3452e59 commit 5abe70a
Showing 1 changed file with 58 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.zip.GZIPOutputStream;
import javax.naming.SizeLimitExceededException;
import javax.ws.rs.core.Response;

public class DatadogLogsApiWriter {
public static final int ONE_MEGABYTE = 1000000;
public static final int MAXIMUM_BATCH_MEGABYTES = 3;
public static final int MINIMUM_LIST_LENGTH = 0;
public static final int MINIMUM_STRING_LENGTH = 0;
public static final int TRUNCATE_DIVIDER = 2;
private final DatadogLogsSinkConnectorConfig config;
private static final Logger log = LoggerFactory.getLogger(DatadogLogsApiWriter.class);
private final Map<String, List<SinkRecord>> batches;
Expand Down Expand Up @@ -67,20 +73,26 @@ private void flushBatches() throws IOException {
}

private void sendBatch(String topic) throws IOException {
JsonArray content = formatBatch(topic);
if (content.size() == 0) {
List<JsonArray> contentList = formatBatch(topic);
if (contentList.size() == 0) {
log.debug("Nothing to send; Skipping the HTTP request.");
return;
}

URL url = config.getURL();

sendRequest(content, url);
for (JsonArray content: contentList) {
sendRequest(content, url);
}
}

private JsonArray formatBatch(String topic) {
private List<JsonArray> formatBatch(String topic) {
List<SinkRecord> sinkRecords = batches.get(topic);
JsonArray batchRecords = new JsonArray();
List<JsonArray> batchRecordsList = new ArrayList<>();

batchRecordsList.add(batchRecords);
int currentBatchListIndex = 0;

for (SinkRecord record : sinkRecords) {
if (record == null) {
Expand All @@ -93,10 +105,46 @@ private JsonArray formatBatch(String topic) {

JsonElement recordJSON = recordToJSON(record);
JsonObject message = populateMetadata(topic, recordJSON);
batchRecords.add(message);
JsonArray currentBatchRecords = batchRecordsList.get(currentBatchListIndex);

try {
if (hasBatchOverflowed(message, currentBatchRecords)) {
JsonArray emptyBatch = new JsonArray();

emptyBatch.add(message);
batchRecordsList.add(currentBatchListIndex++, emptyBatch);
} else {
currentBatchRecords.add(message);
}

} catch (SizeLimitExceededException ex) {
log.error("Single message exceeds size limit.", ex);
}
}
return batchRecordsList;
}

return batchRecords;
private boolean hasBatchOverflowed(JsonObject jsonRecord, JsonArray
batchRecords) throws SizeLimitExceededException {
String jsonRecordString = jsonRecord.toString();
UUID id = UUID.randomUUID();

batchRecords.add(jsonRecord);
String batchRecordsString = batchRecords.toString();
final byte[] batchRecordsStringBytes = batchRecordsString.getBytes(StandardCharsets.UTF_8);
batchRecords.remove(jsonRecord);

if (batchRecordsStringBytes.length / ONE_MEGABYTE >= MAXIMUM_BATCH_MEGABYTES) {
log.warn("Splitting batch because of size limits. Bytes of batch after new message was added: " + batchRecordsStringBytes.length + " id: " + id);
final byte[] jsonRecordStringBytes = jsonRecordString.getBytes(StandardCharsets.UTF_8);

if (batchRecords.size() == MINIMUM_LIST_LENGTH || jsonRecordStringBytes.length / ONE_MEGABYTE >= MAXIMUM_BATCH_MEGABYTES) {
throw new SizeLimitExceededException(String.format("Single message exceeds JSON size limit. " +
"Truncated message: %s, id: %s", jsonRecordString.substring(MINIMUM_STRING_LENGTH, jsonRecordString.length() / TRUNCATE_DIVIDER), id));
}
return true;
}
return false;
}

private JsonElement recordToJSON(SinkRecord record) {
Expand Down Expand Up @@ -151,14 +199,17 @@ private void sendRequest(JsonArray content, URL url) throws IOException {
int status = con.getResponseCode();
if (Response.Status.Family.familyOf(status) != Response.Status.Family.SUCCESSFUL) {
InputStream stream = con.getErrorStream();
UUID payloadErrorId = UUID.randomUUID();
String error = "";
if (stream != null ) {
error = getOutput(stream);
}

con.disconnect();
log.error(String.format("Data content for error id: %s, content: %s", payloadErrorId, con.getContent()));
throw new IOException("HTTP Response code: " + status
+ ", " + con.getResponseMessage() + ", " + error
+ ", Submitted payload: " + content);
+ " Error Id: " + payloadErrorId);
}

log.debug("Response code: " + status + ", " + con.getResponseMessage());
Expand Down

0 comments on commit 5abe70a

Please sign in to comment.