From 74a23ab1e3a764a527910084803e1327f4e153f6 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 27 Sep 2023 10:55:33 -0300 Subject: [PATCH 1/6] input: refactor flb_input to use FLB_TLS macros for the libco parameters. Signed-off-by: Phillip Whelan --- include/fluent-bit/flb_input.h | 38 +++++++++++++--------------------- 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 1b4dff82254..d983ac80abb 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -456,14 +456,14 @@ struct flb_input_coro *flb_input_coro_create(struct flb_input_instance *ins, return input_coro; } -struct flb_libco_in_params { +struct flb_in_collect_params { struct flb_config *config; struct flb_input_collector *coll; struct flb_coro *coro; }; -extern pthread_key_t libco_in_param_key; -extern struct flb_libco_in_params libco_in_param; +extern FLB_TLS_DEFINE(struct flb_in_collect_params, in_collect_params); + void flb_input_coro_prepare_destroy(struct flb_input_coro *input_coro); static FLB_INLINE void input_params_set(struct flb_coro *coro, @@ -471,16 +471,16 @@ static FLB_INLINE void input_params_set(struct flb_coro *coro, struct flb_config *config, void *context) { - struct flb_libco_in_params *params; + struct flb_in_collect_params *params; - params = pthread_getspecific(libco_in_param_key); + params = (struct flb_in_collect_params *) FLB_TLS_GET(in_collect_params); if (params == NULL) { - params = flb_calloc(1, sizeof(struct flb_libco_in_params)); + params = flb_calloc(1, sizeof(struct flb_in_collect_params)); if (params == NULL) { flb_errno(); return; } - pthread_setspecific(libco_in_param_key, params); + FLB_TLS_SET(in_collect_params, params); } /* Set callback parameters */ @@ -495,16 +495,12 @@ static FLB_INLINE void input_pre_cb_collect(void) struct flb_input_collector *coll; struct flb_config *config; struct flb_coro *coro; - struct flb_libco_in_params *params; + struct flb_in_collect_params *params; - params = pthread_getspecific(libco_in_param_key); + params = (struct flb_in_collect_params *)FLB_TLS_GET(in_collect_params); if (params == NULL) { - params = flb_calloc(1, sizeof(struct flb_libco_in_params)); - if (params == NULL) { - flb_errno(); - return; - } - pthread_setspecific(libco_in_param_key, params); + flb_errno(); + return; } coll = params->coll; config = params->config; @@ -519,13 +515,6 @@ static FLB_INLINE void flb_input_coro_resume(struct flb_input_coro *input_coro) flb_coro_resume(input_coro->coro); } -static void libco_in_param_key_destroy(void *data) -{ - struct flb_libco_inparams *params = (struct flb_libco_inparams*)data; - - flb_free(params); -} - static FLB_INLINE struct flb_input_coro *flb_input_coro_collect(struct flb_input_collector *coll, struct flb_config *config) @@ -539,8 +528,6 @@ struct flb_input_coro *flb_input_coro_collect(struct flb_input_collector *coll, return NULL; } - pthread_key_create(&libco_in_param_key, libco_in_param_key_destroy); - coro = input_coro->coro; if (!coro) { return NULL; @@ -648,6 +635,9 @@ static inline int flb_input_config_map_set(struct flb_input_instance *ins, return ret; } +void flb_input_prepare(); +void flb_input_unprepare(); + int flb_input_register_all(struct flb_config *config); struct flb_input_instance *flb_input_new(struct flb_config *config, const char *input, void *data, From 9c94a7608f2292c30371a46fcb28e1b027825342 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 27 Sep 2023 10:56:18 -0300 Subject: [PATCH 2/6] input: initialize flb_input libco parameters using FLB_TLS macros. Signed-off-by: Phillip Whelan --- src/flb_input.c | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/flb_input.c b/src/flb_input.c index 1c4faad4d3f..3c577cb3709 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -49,8 +49,7 @@ #include #endif /* FLB_HAVE_CHUNK_TRACE */ -struct flb_libco_in_params libco_in_param; -pthread_key_t libco_in_param_key; +FLB_TLS_DEFINE(struct flb_in_collect_params, in_collect_params); #define protcmp(a, b) strncasecmp(a, b, strlen(a)) @@ -141,6 +140,23 @@ int flb_input_log_check(struct flb_input_instance *ins, int l) return FLB_TRUE; } +/* Prepare input co-routines for the thread. */ +void flb_input_prepare() +{ + FLB_TLS_INIT(in_collect_params); +} + +void flb_input_unprepare() +{ + struct flb_in_collect_params *params; + + params = (struct flb_in_collect_params *)FLB_TLS_GET(in_collect_params); + if (params) { + flb_free(params); + FLB_TLS_SET(in_collect_params, NULL); + } +} + /* Create an input plugin instance */ struct flb_input_instance *flb_input_new(struct flb_config *config, const char *input, void *data, @@ -1313,6 +1329,8 @@ void flb_input_exit_all(struct flb_config *config) /* destroy the instance */ flb_input_instance_destroy(ins); } + + flb_input_unprepare(); } /* Check that at least one Input is enabled */ From a829da95deede316e440330a5a863701a0c2ecad Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 27 Sep 2023 10:56:50 -0300 Subject: [PATCH 3/6] input: free flb_input libco parameters on engine exit. Signed-off-by: Phillip Whelan --- src/flb_lib.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/flb_lib.c b/src/flb_lib.c index 882faa54757..401c2e9b286 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -127,6 +127,7 @@ void flb_init_env() flb_upstream_init(); flb_downstream_init(); flb_output_prepare(); + flb_input_prepare(); FLB_TLS_INIT(flb_lib_active_context); FLB_TLS_INIT(flb_lib_active_cf_context); From 941a7afab3808873871265fa72e956c58d042330 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 27 Sep 2023 10:57:02 -0300 Subject: [PATCH 4/6] input: free flb_input libco parameters on input thread exit. Signed-off-by: Phillip Whelan --- src/flb_input_thread.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/flb_input_thread.c b/src/flb_input_thread.c index bf073296de2..72a31d173d6 100644 --- a/src/flb_input_thread.c +++ b/src/flb_input_thread.c @@ -459,6 +459,7 @@ static void input_thread(void *data) flb_bucket_queue_destroy(evl_bktq); flb_sched_destroy(sched); input_thread_instance_destroy(thi); + flb_input_unprepare(); } From 0abf858d9cc9a6bd7bd5e9b0435777631d05213c Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Mon, 30 Oct 2023 17:11:03 -0300 Subject: [PATCH 5/6] input: rename flb_input_prepare/flb_input_unprepare to flb_input_init/flb_input_exit. Signed-off-by: Phillip Whelan --- include/fluent-bit/flb_input.h | 4 ++-- src/flb_input.c | 6 +++--- src/flb_input_thread.c | 2 +- src/flb_lib.c | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index d983ac80abb..27ae9bf9860 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -635,8 +635,8 @@ static inline int flb_input_config_map_set(struct flb_input_instance *ins, return ret; } -void flb_input_prepare(); -void flb_input_unprepare(); +void flb_input_init(); +void flb_input_exit(); int flb_input_register_all(struct flb_config *config); struct flb_input_instance *flb_input_new(struct flb_config *config, diff --git a/src/flb_input.c b/src/flb_input.c index 3c577cb3709..204e164aeec 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -141,12 +141,12 @@ int flb_input_log_check(struct flb_input_instance *ins, int l) } /* Prepare input co-routines for the thread. */ -void flb_input_prepare() +void flb_input_init() { FLB_TLS_INIT(in_collect_params); } -void flb_input_unprepare() +void flb_input_exit() { struct flb_in_collect_params *params; @@ -1330,7 +1330,7 @@ void flb_input_exit_all(struct flb_config *config) flb_input_instance_destroy(ins); } - flb_input_unprepare(); + flb_input_exit(); } /* Check that at least one Input is enabled */ diff --git a/src/flb_input_thread.c b/src/flb_input_thread.c index 72a31d173d6..2d656d41c20 100644 --- a/src/flb_input_thread.c +++ b/src/flb_input_thread.c @@ -459,7 +459,7 @@ static void input_thread(void *data) flb_bucket_queue_destroy(evl_bktq); flb_sched_destroy(sched); input_thread_instance_destroy(thi); - flb_input_unprepare(); + flb_input_exit(); } diff --git a/src/flb_lib.c b/src/flb_lib.c index 401c2e9b286..bf69a39eb3f 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -127,7 +127,7 @@ void flb_init_env() flb_upstream_init(); flb_downstream_init(); flb_output_prepare(); - flb_input_prepare(); + flb_input_init(); FLB_TLS_INIT(flb_lib_active_context); FLB_TLS_INIT(flb_lib_active_cf_context); From ef2f7fd36a37e68ff1368abc3cd6d1550085c711 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Mon, 6 Nov 2023 18:58:32 -0300 Subject: [PATCH 6/6] input: fix custom calyptia test (input/output initialization). Set coroutine parameters to NULL for input and output via their initializers. Signed-off-by: Phillip Whelan --- src/flb_input.c | 1 + src/flb_output.c | 1 + tests/runtime/custom_calyptia_test.c | 5 +++-- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/flb_input.c b/src/flb_input.c index 204e164aeec..6c77cac6c6c 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -144,6 +144,7 @@ int flb_input_log_check(struct flb_input_instance *ins, int l) void flb_input_init() { FLB_TLS_INIT(in_collect_params); + FLB_TLS_SET(in_collect_params, NULL); } void flb_input_exit() diff --git a/src/flb_output.c b/src/flb_output.c index b1548f60dfa..cbc39f74512 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -45,6 +45,7 @@ FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params); void flb_output_prepare() { FLB_TLS_INIT(out_flush_params); + FLB_TLS_SET(out_flush_params, NULL); } /* Validate the the output address protocol */ diff --git a/tests/runtime/custom_calyptia_test.c b/tests/runtime/custom_calyptia_test.c index 5ad30e7a0bf..101b1386c76 100644 --- a/tests/runtime/custom_calyptia_test.c +++ b/tests/runtime/custom_calyptia_test.c @@ -19,6 +19,9 @@ void flb_custom_calyptia_pipeline_config_get_test() ctx = flb_create(); flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + flb_input_init(); + flb_output_prepare(); + in_ffd_dummy = flb_input(ctx, (char *) "dummy", NULL); TEST_CHECK(in_ffd_dummy >= 0); @@ -46,8 +49,6 @@ void flb_custom_calyptia_pipeline_config_get_test() cfg = custom_calyptia_pipeline_config_get(ctx->config); TEST_CHECK(strcmp(cfg, cfg_str) == 0); - // fix a thread local storage bug on macos - flb_output_prepare(); flb_sds_destroy(cfg); flb_destroy(ctx); }