Skip to content

Commit

Permalink
in_kafka: add support to switch to auto-commit
Browse files Browse the repository at this point in the history
Polling every 1ms and committing each message individually
results in rather pure performance in high volume Kafka
clusters.

Commiting in batches (relay on auto-commit of kafka)
drastically improves performance.

Signed-off-by: CoreidCC <[email protected]>
  • Loading branch information
coreidcc committed Jan 9, 2025
1 parent 09214eb commit 6ab3841
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
12 changes: 10 additions & 2 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ static int in_kafka_collect(struct flb_input_instance *ins,

rd_kafka_message_destroy(rkm);

/* TO-DO: commit the record based on `ret` */
rd_kafka_commit(ctx->kafka.rk, NULL, 0);

if(!ctx->enable_auto_commit) {
/* TO-DO: commit the record based on `ret` */
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
}

/* Break from the loop when reaching the limit of polling if available */
if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED &&
Expand Down Expand Up @@ -428,6 +431,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size),
"Set the maximum size of chunk"
},
{
FLB_CONFIG_MAP_BOOL, "enable_auto_commit", FLB_IN_KAFKA_ENABLE_AUTO_COMMIT,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit),
"Rely on kafka auto-commit and commit messages in batches"
},
/* EOF */
{0}
};
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#define FLB_IN_KAFKA_DEFAULT_FORMAT "none"
#define FLB_IN_KAFKA_UNLIMITED (size_t)-1
#define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M"
#define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false"

enum {
FLB_IN_KAFKA_FORMAT_NONE,
Expand All @@ -48,6 +49,7 @@ struct flb_in_kafka_config {
int coll_fd;
size_t buffer_max_size; /* Maximum size of chunk allocation */
size_t polling_threshold;
bool enable_auto_commit;
};

#endif

0 comments on commit 6ab3841

Please sign in to comment.