From 6cb6a04768002db5f7341bd84a50283e1a13ba61 Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski Date: Wed, 27 Nov 2024 22:06:24 +0100 Subject: [PATCH 1/2] out_calyptia: retry registering agent on flush. if register_retry_on_flush is set (default true), agent registration is retried on each flush callback. if set to false then registration will cause to abort the plugin initialisation. Signed-off-by: Jorge Niedbalski --- plugins/out_calyptia/calyptia.c | 205 +++++++++++++++++--------------- plugins/out_calyptia/calyptia.h | 1 + 2 files changed, 113 insertions(+), 93 deletions(-) diff --git a/plugins/out_calyptia/calyptia.c b/plugins/out_calyptia/calyptia.c index aa760e2fa5e..8f24fe81236 100644 --- a/plugins/out_calyptia/calyptia.c +++ b/plugins/out_calyptia/calyptia.c @@ -771,12 +771,40 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, return ctx; } -static int cb_calyptia_init(struct flb_output_instance *ins, - struct flb_config *config, void *data) +static int register_agent(struct flb_calyptia *ctx, struct flb_config *config) { int ret; + + /* Try registration */ + ret = api_agent_create(config, ctx); + if (ret != FLB_OK) { + flb_plg_warn(ctx->ins, "agent registration failed"); + return FLB_ERROR; + } + + /* Update endpoints */ + flb_sds_len_set(ctx->metrics_endpoint, 0); + flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, + ctx->agent_id); + +#ifdef FLB_HAVE_CHUNK_TRACE + if (ctx->pipeline_id) { + flb_sds_len_set(ctx->trace_endpoint, 0); + flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE, + ctx->pipeline_id); + } +#endif + + flb_plg_info(ctx->ins, "agent registration successful"); + return FLB_OK; +} + +static int cb_calyptia_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) +{ struct flb_calyptia *ctx; (void) data; + int ret; /* create config context */ ctx = config_init(ins, config); @@ -791,23 +819,12 @@ static int cb_calyptia_init(struct flb_output_instance *ins, */ flb_output_set_http_debug_callbacks(ins); - /* register/update agent */ - ret = api_agent_create(config, ctx); - if (ret != FLB_OK) { - flb_plg_error(ctx->ins, "agent registration failed"); + ret = register_agent(ctx, config); + if (ret != FLB_OK && !ctx->register_retry_on_flush) { + flb_plg_error(ins, "agent registration failed and register_retry_on_flush=false"); return -1; } - /* metrics endpoint */ - ctx->metrics_endpoint = flb_sds_create_size(256); - flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, - ctx->agent_id); - -#ifdef FLB_HAVE_CHUNK_TRACE - ctx->trace_endpoint = flb_sds_create_size(256); - flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE, - ctx->pipeline_id); -#endif /* FLB_HAVE_CHUNK_TRACE */ return 0; } @@ -830,22 +847,60 @@ static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes) cmt_destroy(cmt); } -static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) +static int cb_calyptia_exit(void *data, struct flb_config *config) { - int ret = FLB_RETRY; - size_t off = 0; - size_t out_size = 0; - char *out_buf = NULL; + struct flb_calyptia *ctx = data; + + if (!ctx) { + return 0; + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + + if (ctx->agent_id) { + flb_sds_destroy(ctx->agent_id); + } + + if (ctx->agent_token) { + flb_sds_destroy(ctx->agent_token); + } + + if (ctx->env) { + flb_env_destroy(ctx->env); + } + + if (ctx->metrics_endpoint) { + flb_sds_destroy(ctx->metrics_endpoint); + } -/* used to create records for reporting traces to the cloud. */ #ifdef FLB_HAVE_CHUNK_TRACE - flb_sds_t json; + if (ctx->trace_endpoint) { + flb_sds_destroy(ctx->trace_endpoint); + } #endif /* FLB_HAVE_CHUNK_TRACE */ + if (ctx->fs) { + flb_fstore_destroy(ctx->fs); + } + + flb_kv_release(&ctx->kv_labels); + flb_free(ctx); + + return 0; +} + +static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + size_t off = 0; + size_t out_size = 0; + char *out_buf = NULL; struct flb_connection *u_conn; struct flb_http_client *c = NULL; struct flb_calyptia *ctx = out_context; @@ -853,6 +908,17 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, (void) i_ins; (void) config; + if (!ctx->agent_id && ctx->register_retry_on_flush) { + flb_plg_info(ctx->ins, "agent_id not found and register_retry_on_flush=true, attempting registration"); + if (register_agent(ctx, config) != FLB_OK) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + else if (!ctx->agent_id) { + flb_plg_error(ctx->ins, "no agent_id available and register_retry_on_flush=false"); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + /* Get upstream connection */ u_conn = flb_upstream_conn_get(ctx->u); if (!u_conn) { @@ -890,7 +956,7 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, /* Compose HTTP Client request */ c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint, - out_buf, out_size, NULL, 0, NULL, 0); + out_buf, out_size, NULL, 0, NULL, 0); if (!c) { if (out_buf != event_chunk->data) { cmt_encode_msgpack_destroy(out_buf); @@ -899,12 +965,12 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } - /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */ + /* perform request */ ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS); if (ret == FLB_OK) { flb_plg_debug(ctx->ins, "metrics delivered OK"); } - else if (ret == FLB_ERROR) { + else { flb_plg_error(ctx->ins, "could not deliver metrics"); debug_payload(ctx, out_buf, out_size); } @@ -916,41 +982,33 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, #ifdef FLB_HAVE_CHUNK_TRACE if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) { - json = flb_pack_msgpack_to_json_format(event_chunk->data, - event_chunk->size, - FLB_PACK_JSON_FORMAT_STREAM, - FLB_PACK_JSON_DATE_DOUBLE, - NULL); + flb_sds_t json = flb_pack_msgpack_to_json_format(event_chunk->data, + event_chunk->size, + FLB_PACK_JSON_FORMAT_STREAM, + FLB_PACK_JSON_DATE_DOUBLE, + NULL); if (json == NULL) { flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_RETRY); } - out_buf = (char *)json; - out_size = flb_sds_len(json); - if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS, - ctx->agent_id) == NULL) { - flb_upstream_conn_release(u_conn); - flb_sds_destroy(json); - FLB_OUTPUT_RETURN(FLB_RETRY); - } c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint, - out_buf, out_size, NULL, 0, NULL, 0); + (char *) json, flb_sds_len(json), + NULL, 0, NULL, 0); + if (!c) { flb_upstream_conn_release(u_conn); flb_sds_destroy(json); - flb_sds_destroy(ctx->metrics_endpoint); FLB_OUTPUT_RETURN(FLB_RETRY); } - /* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */ ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE); if (ret == FLB_OK) { flb_plg_debug(ctx->ins, "trace delivered OK"); } - else if (ret == FLB_ERROR) { + else { flb_plg_error(ctx->ins, "could not deliver trace"); - debug_payload(ctx, out_buf, out_size); + debug_payload(ctx, (char *) json, flb_sds_len(json)); } flb_sds_destroy(json); } @@ -961,51 +1019,8 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, if (c) { flb_http_client_destroy(c); } - FLB_OUTPUT_RETURN(ret); -} - -static int cb_calyptia_exit(void *data, struct flb_config *config) -{ - struct flb_calyptia *ctx = data; - - if (!ctx) { - return 0; - } - - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - if (ctx->agent_id) { - flb_sds_destroy(ctx->agent_id); - } - - if (ctx->agent_token) { - flb_sds_destroy(ctx->agent_token); - } - - if (ctx->env) { - flb_env_destroy(ctx->env); - } - - if (ctx->metrics_endpoint) { - flb_sds_destroy(ctx->metrics_endpoint); - } - -#ifdef FLB_HAVE_CHUNK_TRACE - if (ctx->trace_endpoint) { - flb_sds_destroy(ctx->trace_endpoint); - } -#endif /* FLB_HAVE_CHUNK_TRACE */ - - if (ctx->fs) { - flb_fstore_destroy(ctx->fs); - } - - flb_kv_release(&ctx->kv_labels); - flb_free(ctx); - - return 0; + FLB_OUTPUT_RETURN(ret); } /* Configuration properties map */ @@ -1057,7 +1072,11 @@ static struct flb_config_map config_map[] = { "Pipeline ID for calyptia core traces." }, #endif - + { + FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true", + 0, FLB_TRUE, offsetof(struct flb_calyptia, register_retry_on_flush), + "Retry agent registration on flush if failed on init." + }, /* EOF */ {0} }; diff --git a/plugins/out_calyptia/calyptia.h b/plugins/out_calyptia/calyptia.h index ee37d8778dc..0532e4a2b01 100644 --- a/plugins/out_calyptia/calyptia.h +++ b/plugins/out_calyptia/calyptia.h @@ -80,6 +80,7 @@ struct flb_calyptia { flb_sds_t trace_endpoint; flb_sds_t pipeline_id; #endif /* FLB_HAVE_CHUNK_TRACE */ + bool register_retry_on_flush; /* retry registration on flush if failed */ }; #endif From a19a7076982fe740b3b861a847f857ec3b442b34 Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski Date: Wed, 27 Nov 2024 22:08:09 +0100 Subject: [PATCH 2/2] custom_calyptia: cascade register_retry_on_flush variables. Signed-off-by: Jorge Niedbalski --- plugins/custom_calyptia/calyptia.c | 12 +- plugins/custom_calyptia/calyptia.h | 1 + plugins/out_calyptia/calyptia.c | 53 ++- tests/runtime/CMakeLists.txt | 38 ++- .../custom_calyptia_registration_retry_test.c | 313 ++++++++++++++++++ 5 files changed, 389 insertions(+), 28 deletions(-) create mode 100644 tests/runtime/custom_calyptia_registration_retry_test.c diff --git a/plugins/custom_calyptia/calyptia.c b/plugins/custom_calyptia/calyptia.c index 76f5868efdc..bfbb42f4767 100644 --- a/plugins/custom_calyptia/calyptia.c +++ b/plugins/custom_calyptia/calyptia.c @@ -293,6 +293,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config, flb_output_set_property(cloud, "match", "_calyptia_cloud"); flb_output_set_property(cloud, "api_key", ctx->api_key); + if (ctx->register_retry_on_flush) { + flb_output_set_property(cloud, "register_retry_on_flush", "true"); + } else { + flb_output_set_property(cloud, "register_retry_on_flush", "false"); + } + if (ctx->store_path) { flb_output_set_property(cloud, "store_path", ctx->store_path); } @@ -585,7 +591,11 @@ static struct flb_config_map config_map[] = { "Pipeline ID for reporting to calyptia cloud." }, #endif /* FLB_HAVE_CHUNK_TRACE */ - + { + FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true", + 0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush), + "Retry agent registration on flush if failed on init." + }, /* EOF */ {0} }; diff --git a/plugins/custom_calyptia/calyptia.h b/plugins/custom_calyptia/calyptia.h index e1f4dd36770..b4313f51182 100644 --- a/plugins/custom_calyptia/calyptia.h +++ b/plugins/custom_calyptia/calyptia.h @@ -53,6 +53,7 @@ struct calyptia { flb_sds_t fleet_max_http_buffer_size; flb_sds_t fleet_interval_sec; flb_sds_t fleet_interval_nsec; + bool register_retry_on_flush; /* retry registration on flush if failed */ }; int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet); diff --git a/plugins/out_calyptia/calyptia.c b/plugins/out_calyptia/calyptia.c index 8f24fe81236..c16fd31d6ee 100644 --- a/plugins/out_calyptia/calyptia.c +++ b/plugins/out_calyptia/calyptia.c @@ -322,8 +322,24 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c, int ret; size_t b_sent; + if( !ctx || !c ) { + return FLB_ERROR; + } + + /* Ensure agent_token is not empty when required */ + if ((type == CALYPTIA_ACTION_METRICS || type == CALYPTIA_ACTION_PATCH || type == CALYPTIA_ACTION_TRACE) && + !ctx->agent_token) { + flb_plg_warn(ctx->ins, "agent_token is missing for action type %d", type); + return FLB_ERROR; + } + /* append headers */ if (type == CALYPTIA_ACTION_REGISTER) { + // When registering a new agent api key is required + if (!ctx->api_key) { + flb_plg_error(ctx->ins, "api_key is missing"); + return FLB_ERROR; + } flb_http_add_header(c, CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1, CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1); @@ -721,6 +737,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, return NULL; } + ctx->metrics_endpoint = flb_sds_create_size(256); + if (!ctx->metrics_endpoint) { + flb_free(ctx); + return NULL; + } + +#ifdef FLB_HAVE_CHUNK_TRACE + ctx->trace_endpoint = flb_sds_create_size(256); + if (!ctx->trace_endpoint) { + flb_sds_destroy(ctx->metrics_endpoint); + flb_free(ctx); + return NULL; + } +#endif + /* api_key */ if (!ctx->api_key) { flb_plg_error(ctx->ins, "configuration 'api_key' is missing"); @@ -905,17 +936,18 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, struct flb_http_client *c = NULL; struct flb_calyptia *ctx = out_context; struct cmt *cmt; + flb_sds_t json; (void) i_ins; (void) config; - if (!ctx->agent_id && ctx->register_retry_on_flush) { - flb_plg_info(ctx->ins, "agent_id not found and register_retry_on_flush=true, attempting registration"); + if ((!ctx->agent_id || !ctx->agent_token) && ctx->register_retry_on_flush) { + flb_plg_info(ctx->ins, "missing agent_id or agent_token, attempting re-registration register_retry_on_flush=true"); if (register_agent(ctx, config) != FLB_OK) { FLB_OUTPUT_RETURN(FLB_RETRY); } } - else if (!ctx->agent_id) { - flb_plg_error(ctx->ins, "no agent_id available and register_retry_on_flush=false"); + else if (!ctx->agent_id || !ctx->agent_token) { + flb_plg_error(ctx->ins, "missing agent_id or agent_token, and register_retry_on_flush=false"); FLB_OUTPUT_RETURN(FLB_ERROR); } @@ -981,12 +1013,13 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk, } #ifdef FLB_HAVE_CHUNK_TRACE - if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) { - flb_sds_t json = flb_pack_msgpack_to_json_format(event_chunk->data, - event_chunk->size, - FLB_PACK_JSON_FORMAT_STREAM, - FLB_PACK_JSON_DATE_DOUBLE, - NULL); + if (event_chunk->type & FLB_EVENT_TYPE_LOGS && + event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) { + json = flb_pack_msgpack_to_json_format(event_chunk->data, + event_chunk->size, + FLB_PACK_JSON_FORMAT_STREAM, + FLB_PACK_JSON_DATE_DOUBLE, + NULL); if (json == NULL) { flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_RETRY); diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index f355294ed9f..09dce13c15e 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -61,29 +61,37 @@ if(FLB_OUT_LIB) endif() if (FLB_CUSTOM_CALYPTIA) - # Define common variables for calyptia tests set(CALYPTIA_TEST_LINK_LIBS fluent-bit-static ${CMAKE_THREAD_LIBS_INIT} ) - # Add calyptia input properties test - set(TEST_TARGET "flb-rt-calyptia_input_properties") - add_executable(${TEST_TARGET} + set(CALYPTIA_TESTS + "custom_calyptia_test.c" + "custom_calyptia_registration_retry_test.c" "custom_calyptia_input_test.c" - "../../plugins/custom_calyptia/calyptia.c" ) - target_link_libraries(${TEST_TARGET} - ${CALYPTIA_TEST_LINK_LIBS} - ) + foreach(TEST_SOURCE ${CALYPTIA_TESTS}) + get_filename_component(TEST_NAME ${TEST_SOURCE} NAME_WE) - add_test(NAME ${TEST_TARGET} - COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET} - WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build) + set(TEST_TARGET "flb-rt-${TEST_NAME}") + add_executable(${TEST_TARGET} + ${TEST_SOURCE} + "../../plugins/custom_calyptia/calyptia.c" + ) + + target_link_libraries(${TEST_TARGET} + ${CALYPTIA_TEST_LINK_LIBS} + ) - set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime") - add_dependencies(${TEST_TARGET} fluent-bit-static) + add_test(NAME ${TEST_TARGET} + COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET} + WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build) + + set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime") + add_dependencies(${TEST_TARGET} fluent-bit-static) + endforeach() endif() if(FLB_IN_EBPF) @@ -222,10 +230,6 @@ if(FLB_IN_LIB) endif() -if (FLB_CUSTOM_CALYPTIA) - FLB_RT_TEST(FLB_CUSTOM_CALYPTIA "custom_calyptia_test.c") -endif() - if (FLB_PROCESSOR_METRICS_SELECTOR) FLB_RT_TEST(FLB_PROCESSOR_METRICS_SELECTOR "processor_metrics_selector.c") endif() diff --git a/tests/runtime/custom_calyptia_registration_retry_test.c b/tests/runtime/custom_calyptia_registration_retry_test.c new file mode 100644 index 00000000000..db3ea101e25 --- /dev/null +++ b/tests/runtime/custom_calyptia_registration_retry_test.c @@ -0,0 +1,313 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include +#include +#include +#include + +#include "flb_tests_runtime.h" + +#define MOCK_SERVER_HOST "127.0.0.1" +#define MOCK_SERVER_PORT 9876 + +static int registration_count = 0; + +static void mock_server_cb_empty_token(mk_request_t *request, void *data) +{ + registration_count++; + if (registration_count == 1) { + /* Use a local buffer with correct size */ + const char *response = "{\"id\":\"test-id\"}"; + size_t response_len = strlen(response); // Ensure size is accurate + + mk_http_status(request, 200); + mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1, + "application/json", sizeof("application/json") - 1); + mk_http_send(request, response, response_len, NULL); // Use response_len + } else { + mk_http_status(request, 500); + mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1, + "text/plain", sizeof("text/plain") - 1); + mk_http_send(request, "Internal Server Error", sizeof("Internal Server Error") - 1, NULL); + } + mk_http_done(request); +} + +static void mock_server_cb(mk_request_t *request, void *data) +{ + registration_count++; + mk_http_status(request, 500); + mk_http_header(request, "Content-Type", sizeof("Content-Type") - 1, + "text/plain", sizeof("text/plain") - 1); + mk_http_send(request, "Internal Server Error", sizeof("Internal Server Error") - 1, NULL); + mk_http_done(request); +} + +/* Test function */ +void test_calyptia_register_retry() +{ + flb_ctx_t *ctx; + int ret; + int in_ffd; + mk_ctx_t *mock_ctx; + int vid; + char tmp[256]; + struct flb_custom_instance *calyptia; + + /* Reset registration count */ + registration_count = 0; + + /* Init mock server */ + mock_ctx = mk_create(); + TEST_CHECK(mock_ctx != NULL); + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT); + ret = mk_config_set(mock_ctx, "Listen", tmp, NULL); + TEST_CHECK(ret == 0); + + vid = mk_vhost_create(mock_ctx, NULL); + TEST_CHECK(vid >= 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb, NULL); + TEST_CHECK(ret == 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb, NULL); + TEST_CHECK(ret == 0); + + ret = mk_start(mock_ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(500); // Allow the mock server to initialize + + /* Init Fluent Bit context */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + ret = flb_service_set(ctx, + "Log_Level", "debug", + NULL); + TEST_CHECK(ret == 0); + + /* Create dummy input */ + in_ffd = flb_input(ctx, (char *)"dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Create custom Calyptia plugin */ + calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL); + TEST_CHECK(calyptia != NULL); + + /* Set custom plugin properties */ + flb_custom_set_property(calyptia, "api_key", "test-key"); + flb_custom_set_property(calyptia, "log_level", "debug"); + flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id"); + flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST); + flb_custom_set_property(calyptia, "calyptia_port", "9876"); + flb_custom_set_property(calyptia, "register_retry_on_flush", "true"); + flb_custom_set_property(calyptia, "calyptia_tls", "off"); + flb_custom_set_property(calyptia, "calyptia_tls.verify", "off"); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First registration attempt should have failed */ + TEST_CHECK(registration_count == 1); + + flb_time_msleep(1000); + flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13); + + /* Wait for processing */ + flb_time_msleep(10000); + TEST_CHECK(registration_count > 1); + + /* Cleanup */ + flb_stop(ctx); + flb_destroy(ctx); + mk_stop(mock_ctx); + mk_destroy(mock_ctx); +} + +static void test_calyptia_register_retry_empty_token() +{ + flb_ctx_t *ctx; + int ret; + int in_ffd; + mk_ctx_t *mock_ctx; + int vid; + char tmp[256]; + struct flb_custom_instance *calyptia; + + /* Reset registration count */ + registration_count = 0; + + /* Init mock server */ + mock_ctx = mk_create(); + TEST_CHECK(mock_ctx != NULL); + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT); + ret = mk_config_set(mock_ctx, "Listen", tmp, NULL); + TEST_CHECK(ret == 0); + + vid = mk_vhost_create(mock_ctx, NULL); + TEST_CHECK(vid >= 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_start(mock_ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(500); // Allow the mock server to initialize + + /* Init Fluent Bit context */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + ret = flb_service_set(ctx, + "Log_Level", "debug", + NULL); + TEST_CHECK(ret == 0); + + /* Create dummy input */ + in_ffd = flb_input(ctx, (char *)"dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Create custom Calyptia plugin */ + calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL); + TEST_CHECK(calyptia != NULL); + + /* Set custom plugin properties */ + flb_custom_set_property(calyptia, "api_key", "test-key"); + flb_custom_set_property(calyptia, "log_level", "debug"); + flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id"); + flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST); + flb_custom_set_property(calyptia, "calyptia_port", "9876"); + flb_custom_set_property(calyptia, "register_retry_on_flush", "false"); + flb_custom_set_property(calyptia, "calyptia_tls", "off"); + flb_custom_set_property(calyptia, "calyptia_tls.verify", "off"); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First registration should be successful but with an empty token */ + TEST_CHECK(registration_count == 1); + + /* Push some data to trigger flush */ + flb_time_msleep(1000); + flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13); + + /* Wait for processing */ + flb_time_msleep(10000); + + /* Verify the plugin fails due to empty token */ + TEST_CHECK(registration_count == 1); + + /* Cleanup */ + flb_stop(ctx); + flb_destroy(ctx); + mk_stop(mock_ctx); + mk_destroy(mock_ctx); +} + +static void test_calyptia_register_retry_empty_token_retry_true() +{ + flb_ctx_t *ctx; + int ret; + int in_ffd; + mk_ctx_t *mock_ctx; + int vid; + char tmp[256]; + struct flb_custom_instance *calyptia; + + /* Reset registration count */ + registration_count = 0; + + /* Init mock server */ + mock_ctx = mk_create(); + TEST_CHECK(mock_ctx != NULL); + + /* Compose listen address */ + snprintf(tmp, sizeof(tmp) - 1, "%s:%d", MOCK_SERVER_HOST, MOCK_SERVER_PORT); + ret = mk_config_set(mock_ctx, "Listen", tmp, NULL); + TEST_CHECK(ret == 0); + + vid = mk_vhost_create(mock_ctx, NULL); + TEST_CHECK(vid >= 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_vhost_handler(mock_ctx, vid, "/v1/agents/test-id", mock_server_cb_empty_token, NULL); + TEST_CHECK(ret == 0); + + ret = mk_start(mock_ctx); + TEST_CHECK(ret == 0); + + flb_time_msleep(500); // Allow the mock server to initialize + + /* Init Fluent Bit context */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + + ret = flb_service_set(ctx, + "Log_Level", "debug", + NULL); + TEST_CHECK(ret == 0); + + /* Create dummy input */ + in_ffd = flb_input(ctx, (char *)"dummy", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Create custom Calyptia plugin */ + calyptia = flb_custom_new(ctx->config, (char *)"calyptia", NULL); + TEST_CHECK(calyptia != NULL); + + /* Set custom plugin properties */ + flb_custom_set_property(calyptia, "api_key", "test-key"); + flb_custom_set_property(calyptia, "log_level", "debug"); + flb_custom_set_property(calyptia, "add_label", "pipeline_id test-pipeline-id"); + flb_custom_set_property(calyptia, "calyptia_host", MOCK_SERVER_HOST); + flb_custom_set_property(calyptia, "calyptia_port", "9876"); + flb_custom_set_property(calyptia, "register_retry_on_flush", "true"); + flb_custom_set_property(calyptia, "calyptia_tls", "off"); + flb_custom_set_property(calyptia, "calyptia_tls.verify", "off"); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* First registration should be successful but with an empty token */ + TEST_CHECK(registration_count == 1); + + /* Push some data to trigger flush */ + flb_time_msleep(1000); + flb_lib_push(ctx, in_ffd, "{\"key\":\"val\"}", 13); + + /* Wait for processing */ + flb_time_msleep(10000); + + /* Verify the plugin fails due to empty token */ + TEST_CHECK(registration_count > 1); + + /* Cleanup */ + flb_stop(ctx); + flb_destroy(ctx); + mk_stop(mock_ctx); + mk_destroy(mock_ctx); +} + +TEST_LIST = { + {"register_retry", test_calyptia_register_retry}, + {"register_retry_empty_token", test_calyptia_register_retry_empty_token}, + {"register_retry_empty_token_retry_true", test_calyptia_register_retry_empty_token_retry_true}, + {NULL, NULL} +}; \ No newline at end of file