Skip to content

Commit

Permalink
in_kafka: increase the poll-timeout if we run in own own thread
Browse files Browse the repository at this point in the history
having 1ms timeout might make sense if the input plugin is
running in the main thread (not introducing delay for others).
but if we run in our very own thread then we should not over-
ride the fetch.wait.max.ms configuration value from the
kafka-consumer.

this in conjuntion with using autocommit again boosts the
throuhput significantly.

Signed-off-by: CoreidCC <[email protected]>
  • Loading branch information
coreidcc committed Jan 9, 2025
1 parent 6ab3841 commit a360cae
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
17 changes: 16 additions & 1 deletion plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ static int in_kafka_collect(struct flb_input_instance *ins,
ret = FLB_EVENT_ENCODER_SUCCESS;

while (ret == FLB_EVENT_ENCODER_SUCCESS) {
rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);
rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeount_ms);

if (!rkm) {
break;
Expand Down Expand Up @@ -246,6 +246,21 @@ static int in_kafka_init(struct flb_input_instance *ins,
goto init_error;
}

/* Set the kafka poll timeout dependend on wether we run in our own
* or in the main event thread.
* a) run in main event thread:
* -> minimize the delay we might create
* b) run in our own thread:
* -> optimize for throuput and relay on 'fetch.wait.max.ms'
* which is set to 500 by default default. lets set it to
* twice that so that increasing fetch.wait.max.ms still
* has an effect.
*/
ctx->poll_timeount_ms = 1;
if(ins->is_threaded) {
ctx->poll_timeount_ms = 1000;
}

if (ctx->buffer_max_size > 0) {
ctx->polling_threshold = ctx->buffer_max_size;

Expand Down
1 change: 1 addition & 0 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ struct flb_in_kafka_config {
size_t buffer_max_size; /* Maximum size of chunk allocation */
size_t polling_threshold;
bool enable_auto_commit;
int poll_timeount_ms;
};

#endif

0 comments on commit a360cae

Please sign in to comment.