
Datadog Sink Connector issue #10 fix #32

Conversation

andwarnbros

Ritchie Brothers Auctioneers fix for messages >5MB causing IOException. #10

What does this PR do?

Hello, I am a tech lead engineer at Ritchie Bros Auctioneers.
This PR is a suggested fix for issue #10: "Truncate messages if compressed size exceeds 5MB".
Instead of sending logs larger than 5 MB, each time a new log is added to a batch the batch's byte size is checked against a limit just under 3 MB. Instead of sending one oversized batch as the connector does today, a new batch is created for the rest of the messages in the topic waiting for processing. If that new batch also exceeds 3 MB, yet another batch is created, and this process continues until all messages have been read. Once all messages are read, each batch in the list of batches is sent to Datadog's API (a sketch of this splitting loop follows the corner cases below).

The content of the IOException was also removed, because it contained the complete oversized message while Kafka's default connector log size limit is around 1 MB.

Corner cases:
If a single message exceeds the limit, an exception is thrown but caught; we did not want to shut down the connector over this. The exception is logged as an error, along with a truncated copy of the message.
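
To make the approach concrete, here is a minimal sketch of the splitting loop and the single-oversized-message corner case, assuming Gson-style JsonObject/JsonArray as used in the connector. The class and method names are hypothetical, and the corner case is simplified to log-and-skip rather than the PR's caught SizeLimitExceededException; the actual change lives in DatadogLogsApiWriter.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;

public class BatchSplitSketch {

    // Assumed threshold, mirroring the PR's 3 MB limit kept well under Datadog's 5 MB cap.
    private static final int MAX_BATCH_BYTES = 3 * 1000 * 1000;

    // Splits pending records into batches whose serialized size stays under the threshold.
    public List<JsonArray> splitIntoBatches(List<JsonObject> records) {
        List<JsonArray> batches = new ArrayList<>();
        JsonArray current = new JsonArray();

        for (JsonObject logRecord : records) {
            String serialized = logRecord.toString();
            int recordBytes = serialized.getBytes(StandardCharsets.UTF_8).length;

            // Corner case: a single record that can never fit is logged (truncated)
            // and skipped instead of shutting down the connector.
            if (recordBytes >= MAX_BATCH_BYTES) {
                System.err.println("Single message exceeds size limit, truncated: "
                        + serialized.substring(0, Math.min(serialized.length(), 100)));
                continue;
            }

            // Start a new batch once adding this record would push the current one over the limit.
            if (current.size() > 0 && serializedBytes(current) + recordBytes >= MAX_BATCH_BYTES) {
                batches.add(current);
                current = new JsonArray();
            }
            current.add(logRecord);
        }

        if (current.size() > 0) {
            batches.add(current);
        }
        return batches;
    }

    private int serializedBytes(JsonArray batch) {
        return batch.toString().getBytes(StandardCharsets.UTF_8).length;
    }
}

Each resulting batch would then be sent to the Datadog API in its own request.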


Motivation

The issue "Truncate messages if compressed size exceeds 5MB" suggests truncating messages, but we prefer to keep all messages intact and only log the offending message when a single message is above the limit.
Increasing the Kafka log size limit would also have negative impacts on our system, so we removed the content from the IOException.

Our connectors were constantly being shut down when we tried to send a batch of large messages exceeding 5 MB to the Datadog API. Kafka then tries to log the error returned by Datadog, but the Kafka logging limit is around 1 MB, which also caused problems in our system. We did not want to increase the Kafka limit because processing larger messages might have a performance impact.


Additional Notes

All of the limits are hardcoded. Feel free to make any suggestions.

Ritchie Brothers Auctioneers fix for messages >5MB causing IOException.
DataDog#10
@gh123man gh123man self-requested a review December 20, 2022 18:17
Contributor

@jszwedko jszwedko left a comment


Apologies for the long delay in reviewing this, @andwarnbros. Given how long it has been, it seems unlikely to me that you'd still be interested in pushing this forward, but feel free to reopen on the off chance that you are 🙏 Thanks for this attempt at addressing the issue.


return batchRecords;
private boolean hasBatchOverflowed(JsonObject jsonRecord, JsonArray
Contributor


I'm concerned that this implementation is fairly inefficient in that it serializes the batch to determine if it is oversized, but then throws that serialization away. I think serialization is indeed required to determine if the batch is oversized, but I'd like to see the implementation use the serialized batch to make the common case, where it doesn't need to be split, more efficient.
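
For illustration only, one hypothetical way to act on this would be to serialize each record exactly once, keep a running byte count for the current batch, and reuse the serialized strings when building the request body. The names and overhead constants below are assumptions, not the connector's existing API:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.google.gson.JsonObject;

public class RunningSizeSketch {

    private static final int MAX_BATCH_BYTES = 5 * 1000 * 1000; // assumed payload cap
    private static final int ARRAY_OVERHEAD_BYTES = 2;          // surrounding "[" and "]"
    private static final int SEPARATOR_BYTES = 1;               // "," between records

    // Returns batches of already-serialized records, so the payload can be built by
    // joining strings rather than serializing the whole batch a second time.
    public List<List<String>> splitIntoBatches(List<JsonObject> records) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = ARRAY_OVERHEAD_BYTES;

        for (JsonObject logRecord : records) {
            // Serialize exactly once; reuse the string both for sizing and for sending.
            String serialized = logRecord.toString();
            int rawBytes = serialized.getBytes(StandardCharsets.UTF_8).length;
            int addedBytes = rawBytes + (current.isEmpty() ? 0 : SEPARATOR_BYTES);

            if (!current.isEmpty() && currentBytes + addedBytes > MAX_BATCH_BYTES) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = ARRAY_OVERHEAD_BYTES;
                addedBytes = rawBytes; // no separator at the start of a fresh batch
            }
            current.add(serialized);
            currentBytes += addedBytes;
        }

        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}

In the common case where nothing overflows, this does one serialization per record and a constant-time size check per add.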

import javax.ws.rs.core.Response;

public class DatadogLogsApiWriter {
public static final int ONE_MEGABYTE = 1000000;
public static final int MAXIMUM_BATCH_MEGABYTES = 3;
Contributor


Why 3 instead of 5?

Comment on lines +137 to +146
if (batchRecordsStringBytes.length / ONE_MEGABYTE >= MAXIMUM_BATCH_MEGABYTES) {
log.warn("Splitting batch because of size limits. Bytes of batch after new message was added: " + batchRecordsStringBytes.length + " id: " + id);
final byte[] jsonRecordStringBytes = jsonRecordString.getBytes(StandardCharsets.UTF_8);

if (batchRecords.size() == MINIMUM_LIST_LENGTH || jsonRecordStringBytes.length / ONE_MEGABYTE >= MAXIMUM_BATCH_MEGABYTES) {
throw new SizeLimitExceededException(String.format("Single message exceeds JSON size limit. " +
"Truncated message: %s, id: %s", jsonRecordString.substring(MINIMUM_STRING_LENGTH, jsonRecordString.length() / TRUNCATE_DIVIDER), id));
}
return true;
}
Contributor


I think this could be slightly more efficient by storing a constant of the max batch size in bytes rather than needing to do division.
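
A minimal sketch of that suggestion, reusing the constant names from the snippet above (the byte-limit constant and the simplified method signature are hypothetical):

public class BatchLimits {
    public static final int ONE_MEGABYTE = 1000000;
    public static final int MAXIMUM_BATCH_MEGABYTES = 3;
    // Precomputed limit in bytes, so each overflow check is a single comparison.
    public static final int MAXIMUM_BATCH_BYTES = MAXIMUM_BATCH_MEGABYTES * ONE_MEGABYTE;

    // For non-negative lengths and integer division, this is equivalent to
    // "length / ONE_MEGABYTE >= MAXIMUM_BATCH_MEGABYTES" but avoids dividing on every check.
    public static boolean hasBatchOverflowed(byte[] batchRecordsStringBytes) {
        return batchRecordsStringBytes.length >= MAXIMUM_BATCH_BYTES;
    }
}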

@jszwedko jszwedko closed this Apr 23, 2024