From c2e626a88bb2d2da715eba5f7f787fac46748a7e Mon Sep 17 00:00:00 2001 From: braydonk Date: Tue, 1 Oct 2024 13:07:14 +0000 Subject: [PATCH] config: add storage.chunk_max_size Add a configuration value for the storage chunk max size. Signed-off-by: braydonk --- include/fluent-bit/flb_config.h | 24 +++++++++++++----------- src/flb_config.c | 5 +++++ src/flb_input_log.c | 10 ++++++---- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 82003386d6e..7dfa4bf17c5 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -48,11 +48,11 @@ struct flb_config { int is_running; /* service running ? */ double flush; /* Flush timeout */ - /* - * Maximum grace time on shutdown. If set to -1, the engine will + /* + * Maximum grace time on shutdown. If set to -1, the engine will * shutdown when all remaining tasks are flushed */ - int grace; + int grace; int grace_count; /* Count of grace shutdown tries */ flb_pipefd_t flush_fd; /* Timer FD associated to flush */ int convert_nan_to_null; /* convert null to nan ? */ @@ -227,6 +227,7 @@ struct flb_config { char *storage_bl_mem_limit; /* storage backlog memory limit */ struct flb_storage_metrics *storage_metrics_ctx; /* storage metrics context */ int storage_trim_files; /* enable/disable file trimming */ + size_t storage_chunk_max_size; /* The max chunk size */ /* Embedded SQL Database support (SQLite3) */ #ifdef FLB_HAVE_SQLDB @@ -354,15 +355,16 @@ enum conf_type { #define FLB_CONF_DNS_PREFER_IPV6 "dns.prefer_ipv6" /* Storage / Chunk I/O */ -#define FLB_CONF_STORAGE_PATH "storage.path" -#define FLB_CONF_STORAGE_SYNC "storage.sync" -#define FLB_CONF_STORAGE_METRICS "storage.metrics" -#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum" -#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit" -#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up" +#define FLB_CONF_STORAGE_PATH "storage.path" +#define FLB_CONF_STORAGE_SYNC "storage.sync" +#define FLB_CONF_STORAGE_METRICS "storage.metrics" +#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum" +#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit" +#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up" #define FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS \ - "storage.delete_irrecoverable_chunks" -#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files" + "storage.delete_irrecoverable_chunks" +#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files" +#define FLB_CONF_STORAGE_CHUNK_MAX_SIZE "storage.chunk_max_size" /* Coroutines */ #define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size" diff --git a/src/flb_config.c b/src/flb_config.c index 747d855cf08..89184201a1b 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -44,6 +44,7 @@ #include #include #include +#include const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL"; @@ -154,6 +155,9 @@ struct flb_service_config service_configs[] = { {FLB_CONF_STORAGE_TRIM_FILES, FLB_CONF_TYPE_BOOL, offsetof(struct flb_config, storage_trim_files)}, + {FLB_CONF_STORAGE_CHUNK_MAX_SIZE, + FLB_CONF_TYPE_INT, + offsetof(struct flb_config, storage_chunk_max_size)}, /* Coroutines */ {FLB_CONF_STR_CORO_STACK_SIZE, @@ -278,6 +282,7 @@ struct flb_config *flb_config_init() config->storage_path = NULL; config->storage_input_plugin = NULL; config->storage_metrics = FLB_TRUE; + config->storage_chunk_max_size = FLB_INPUT_CHUNK_FS_MAX_SIZE; config->sched_cap = FLB_SCHED_CAP; config->sched_base = FLB_SCHED_BASE; diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 9fc248f2020..f33ede87aeb 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -54,7 +54,8 @@ static void buffer_entry_destroy(struct buffer_entry *entry) { } static int split_buffer_entry(struct buffer_entry *entry, - struct mk_list *entries) + struct mk_list *entries, + int buf_entry_max_size) { int ret; int encoder_result; @@ -114,7 +115,7 @@ static int split_buffer_entry(struct buffer_entry *entry, continue; } - if (log_encoder.output_length >= FLB_INPUT_CHUNK_FS_MAX_SIZE) { + if (log_encoder.output_length >= buf_entry_max_size) { tmp_encoder_buf_size = log_encoder.output_length; tmp_encoder_buf = log_encoder.output_buffer; flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); @@ -191,13 +192,14 @@ static int input_log_append(struct flb_input_instance *ins, if (buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { mk_list_init(&buffers); start_buffer = new_buffer_entry(buf, buf_size); - split_buffer_entry(start_buffer, &buffers); + split_buffer_entry(start_buffer, &buffers, ins->config->storage_chunk_max_size); flb_free(start_buffer); mk_list_foreach_safe(head, tmp, &buffers) { iter_buffer = mk_list_entry(head, struct buffer_entry, _head); records = flb_mp_count(iter_buffer->buf, iter_buffer->buf_size); ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, - tag, tag_len, iter_buffer->buf, iter_buffer->buf_size); + tag, tag_len, + iter_buffer->buf, iter_buffer->buf_size); buffer_entry_destroy(iter_buffer); } } else {