From f7311cbd3f72d0aeda5057565fde77dc2556587e Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 13 Sep 2023 22:53:41 -0300 Subject: [PATCH 01/20] in_calyptia_fleet: add new helper functions for obtaining results from json. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 261 +++++++++--------- 1 file changed, 137 insertions(+), 124 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 7dfa1df6fdf..cf2d9dc2f0e 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -474,6 +474,53 @@ static char *tls_setting_string(int use_tls) return "Off"; } +static msgpack_object *msgpack_lookup_map_key(msgpack_object *obj, const char *keyname) +{ + int idx; + msgpack_object_kv *cur; + msgpack_object_str *key; + + if (obj == NULL || keyname == NULL) { + return NULL; + } + + if (obj->type != MSGPACK_OBJECT_MAP) { + return NULL; + } + + for (idx = 0; idx < obj->via.map.size; idx++) { + cur = &obj->via.map.ptr[idx]; + if (cur->key.type != MSGPACK_OBJECT_STR) { + continue; + } + + key = &cur->key.via.str; + + if (strncmp(key->ptr, keyname, key->size) == 0) { + return &cur->val; + } + } + + return NULL; +} + +static msgpack_object *msgpack_lookup_array_offset(msgpack_object *obj, size_t offset) +{ + if (obj == NULL) { + return NULL; + } + + if (obj->type != MSGPACK_OBJECT_ARRAY) { + return NULL; + } + + if (obj->via.array.size <= offset) { + return NULL; + } + + return &obj->via.array.ptr[offset]; +} + static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx, char *payload, size_t size) { @@ -483,10 +530,8 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx, struct flb_pack_state pack_state; size_t off = 0; msgpack_unpacked result; - msgpack_object_kv *cur; - msgpack_object_str *key; - flb_sds_t project_id; - int idx = 0; + msgpack_object *projectID; + flb_sds_t project_id = NULL; /* Initialize packer */ flb_pack_state_init(&pack_state); @@ -497,49 +542,36 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx, flb_pack_state_reset(&pack_state); /* Handle exceptions */ - if (ret == FLB_ERR_JSON_PART) { - flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping"); - return NULL; - } - else if (ret == FLB_ERR_JSON_INVAL) { + if (ret == FLB_ERR_JSON_PART || ret == FLB_ERR_JSON_INVAL || ret == -1) { flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); return NULL; } - else if (ret == -1) { - return NULL; - } msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) { + projectID = msgpack_lookup_map_key(&result.data, "ProjectID"); - if (result.data.type == MSGPACK_OBJECT_MAP) { - for (idx = 0; idx < result.data.via.map.size; idx++) { - cur = &result.data.via.map.ptr[idx]; - key = &cur->key.via.str; - - if (strncmp(key->ptr, "ProjectID", key->size) == 0) { - - if (cur->val.type != MSGPACK_OBJECT_STR) { - flb_plg_error(ctx->ins, "unable to find fleet by name"); - msgpack_unpacked_destroy(&result); - return NULL; - } - - project_id = flb_sds_create_len(cur->val.via.str.ptr, - cur->val.via.str.size); - msgpack_unpacked_destroy(&result); - flb_free(pack); + if (projectID == NULL) { + flb_plg_error(ctx->ins, "unable to find fleet by name"); + msgpack_unpacked_destroy(&result); + return NULL; + } - return project_id; - } - } + if (projectID->type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "invalid fleet ID"); + msgpack_unpacked_destroy(&result); + return NULL; } + + project_id = flb_sds_create_len(projectID->via.str.ptr, + projectID->via.str.size); + break; } msgpack_unpacked_destroy(&result); flb_free(pack); - return NULL; + return project_id; } static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx, @@ -551,10 +583,8 @@ static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx, struct flb_pack_state pack_state; size_t off = 0; msgpack_unpacked result; - msgpack_object_array *results; - msgpack_object_kv *cur; - msgpack_object_str *key; - int idx = 0; + msgpack_object *map; + msgpack_object *fleet; /* Initialize packer */ flb_pack_state_init(&pack_state); @@ -565,48 +595,31 @@ static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx, flb_pack_state_reset(&pack_state); /* Handle exceptions */ - if (ret == FLB_ERR_JSON_PART) { - flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping"); - return -1; - } - else if (ret == FLB_ERR_JSON_INVAL) { + if (ret == FLB_ERR_JSON_PART || ret == FLB_ERR_JSON_INVAL || ret == -1) { flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); return -1; } - else if (ret == -1) { - return -1; - } msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) { + map = msgpack_lookup_array_offset(&result.data, 0); + if (map == NULL) { + break; + } - if (result.data.type == MSGPACK_OBJECT_ARRAY) { - results = &result.data.via.array; - - if (results->ptr[0].type == MSGPACK_OBJECT_MAP) { - - for (idx = 0; idx < results->ptr[0].via.map.size; idx++) { - cur = &results->ptr[0].via.map.ptr[idx]; - key = &cur->key.via.str; - - if (strncasecmp(key->ptr, "id", key->size) == 0) { - - if (cur->val.type != MSGPACK_OBJECT_STR) { - flb_plg_error(ctx->ins, "unable to find fleet by name"); - msgpack_unpacked_destroy(&result); - return -1; - } + fleet = msgpack_lookup_map_key(map, "ID"); + if (fleet == NULL) { + flb_plg_error(ctx->ins, "unable to find fleet by name"); + break; + } - ctx->fleet_id_found = 1; - ctx->fleet_id = flb_sds_create_len(cur->val.via.str.ptr, - cur->val.via.str.size); - break; - } - break; - } - break; - } + if (fleet->type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "unable to find fleet by name"); + break; } + + ctx->fleet_id = flb_sds_create_len(fleet->via.str.ptr, fleet->via.str.size); + break; } msgpack_unpacked_destroy(&result); @@ -619,25 +632,19 @@ static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx, return 0; } -static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, - struct flb_config *config) +static flb_sds_t get_project_id_from_api_key(struct flb_in_calyptia_fleet_config *ctx) { - struct flb_http_client *client; - flb_sds_t url; - flb_sds_t project_id; - unsigned char token[512] = {0}; unsigned char encoded[256]; - size_t elen; - size_t tlen; + unsigned char token[512] = {0}; char *api_token_sep; - size_t b_sent; + size_t tlen; + size_t elen; int ret; api_token_sep = strchr(ctx->api_key, '.'); if (api_token_sep == NULL) { - return -1; + return NULL; } elen = api_token_sep-ctx->api_key; @@ -645,7 +652,7 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct if (elen > sizeof(encoded)) { flb_plg_error(ctx->ins, "API Token is too large"); - return -1; + return NULL; } memset(encoded, '=', sizeof(encoded)); @@ -655,25 +662,26 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct encoded, elen); if (ret != 0) { - return ret; + return NULL; } - project_id = parse_api_key_json(ctx, (char *)token, tlen); - - if (project_id == NULL) { - return -1; - } + return parse_api_key_json(ctx, (char *)token, tlen); +} - url = flb_sds_create_size(4096); - flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s", - project_id, ctx->fleet_name); +static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + flb_sds_t url) +{ + struct flb_http_client *client; + size_t b_sent; + int ret = -1; client = flb_http_client(u_conn, FLB_HTTP_GET, url, NULL, 0, ctx->ins->host.name, ctx->ins->host.port, NULL, 0); if (!client) { flb_plg_error(ctx->ins, "unable to create http client"); - return -1; + goto http_client_error; } flb_http_buffer_size(client, 8192); @@ -686,19 +694,49 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct if (ret != 0) { flb_plg_error(ctx->ins, "http do error"); - flb_http_client_destroy(client); - return -1; + goto http_do_error; } if (client->resp.status != 200) { flb_plg_error(ctx->ins, "search http status code error: %d", client->resp.status); - flb_http_client_destroy(client); - return -1; + goto http_do_error; } if (client->resp.payload_size <= 0) { flb_plg_error(ctx->ins, "empty response"); flb_http_client_destroy(client); + goto http_do_error; + } + + return client; + +http_do_error: + flb_http_client_destroy(client); +http_client_error: + return NULL; +} + +static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + struct flb_config *config) +{ + struct flb_http_client *client; + flb_sds_t url; + flb_sds_t project_id; + + project_id = get_project_id_from_api_key(ctx); + + if (project_id == NULL) { + return -1; + } + + url = flb_sds_create_size(4096); + flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s", + project_id, ctx->fleet_name); + + client = fleet_http_do(ctx, u_conn, url); + + if (!client) { return -1; } @@ -708,12 +746,12 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct return -1; } + flb_http_client_destroy(client); + if (ctx->fleet_id == NULL) { - flb_http_client_destroy(client); return -1; } - flb_http_client_destroy(client); return 0; } @@ -877,38 +915,13 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, flb_sds_printf(&ctx->fleet_url, "/v1/fleets/%s/config?format=ini", ctx->fleet_id); } - client = flb_http_client(u_conn, FLB_HTTP_GET, ctx->fleet_url, - NULL, 0, - ctx->ins->host.name, ctx->ins->host.port, NULL, 0); + client = fleet_http_do(ctx, u_conn, ctx->fleet_url); if (!client) { flb_plg_error(ins, "unable to create http client"); goto client_error; } - flb_http_buffer_size(client, 8192); - - flb_http_add_header(client, - CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1, - ctx->api_key, flb_sds_len(ctx->api_key)); - - ret = flb_http_do(client, &b_sent); - - if (ret != 0) { - flb_plg_error(ins, "http do error"); - goto http_error; - } - - if (client->resp.status != 200) { - flb_plg_error(ins, "http status code error: %d", client->resp.status); - goto http_error; - } - - if (client->resp.payload_size <= 0) { - flb_plg_error(ins, "empty response"); - goto http_error; - } - /* copy and NULL terminate the payload */ data = flb_sds_create_size(client->resp.payload_size + 1); From 83f15f08f9e02d84af00f22bf6cac22f4c57ad34 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Mon, 25 Sep 2023 17:02:22 -0300 Subject: [PATCH 02/20] in_calyptia_fleet: add functions to better handle the different configuration versions. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 410 ++++++++++-------- 1 file changed, 241 insertions(+), 169 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index cf2d9dc2f0e..c8325c5681d 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -370,6 +370,19 @@ static int exists_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx) return ret; } +static int exists_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx) +{ + flb_sds_t cfgoldname; + int ret = FLB_FALSE; + + + cfgoldname = old_fleet_config_filename(ctx); + ret = access(cfgoldname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE; + + flb_sds_destroy(cfgoldname); + return ret; +} + static void *do_reload(void *data) { struct reload_ctx *reload = (struct reload_ctx *)data; @@ -399,7 +412,7 @@ static int test_config_is_valid(flb_sds_t cfgpath) if (conf == NULL) { goto config_init_error; - } + } conf = flb_cf_create_from_file(conf, cfgpath); @@ -868,216 +881,275 @@ static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) } /* cb_collect callback */ -static int in_calyptia_fleet_collect(struct flb_input_instance *ins, - struct flb_config *config, - void *in_context) -{ - struct flb_in_calyptia_fleet_config *ctx = in_context; - struct flb_connection *u_conn; +static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + const char *url, + const char *hdr, + const char *dst, + time_t *time_last_modified) + { struct flb_http_client *client; - flb_sds_t cfgname; - flb_sds_t cfgnewname; - flb_sds_t cfgoldname; - flb_sds_t cfgcurname; - flb_sds_t header = NULL; - flb_sds_t hdr; - FILE *cfgfp; + char fname[4096] = { 0 }; + size_t len; + FILE *fp; + int ret = -1; const char *fbit_last_modified; - int fbit_last_modified_len; struct flb_tm tm_last_modified = { 0 }; - time_t time_last_modified; - char *data = NULL; - size_t b_sent; - int ret = -1; -#ifdef FLB_SYSTEM_WINDOWS - DWORD err; - LPSTR lpMsg; -#endif + int fbit_last_modified_len; + time_t last_modified; - u_conn = flb_upstream_conn_get(ctx->u); + if (ctx == NULL || u_conn == NULL || url == NULL || dst == NULL) { + return -1; + } - if (!u_conn) { - flb_plg_error(ctx->ins, "could not get an upstream connection to %s:%u", - ctx->ins->host.name, ctx->ins->host.port); - goto conn_error; + client = fleet_http_do(ctx, u_conn, ctx->fleet_url); + + if (client == NULL) { + return -1; } - if (ctx->fleet_id == NULL) { + ret = case_header_lookup(client, "Last-modified", strlen("Last-modified"), + &fbit_last_modified, &fbit_last_modified_len); - if (get_calyptia_fleet_id_by_name(ctx, u_conn, config) == -1) { - flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name); - goto conn_error; - } + if (ret < 0) { + goto client_error; } - if (ctx->fleet_url == NULL) { - ctx->fleet_url = flb_sds_create_size(4096); - flb_sds_printf(&ctx->fleet_url, "/v1/fleets/%s/config?format=ini", ctx->fleet_id); + flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified); + last_modified = mktime(&tm_last_modified.tm); + + /* skip the second PATH_SEPARATOR to allow creating files in the base + * fleet_config directory. */ + snprintf(fname, sizeof(fname)-1, "%s/%d%s", ctx->config_dir, (int)last_modified, dst); + + errno = 0; + if (access(fname, F_OK) != -1 || errno == ENOENT) { + ret = 0; + goto client_error; } - client = fleet_http_do(ctx, u_conn, ctx->fleet_url); + fp = fopen(fname, "w+"); - if (!client) { - flb_plg_error(ins, "unable to create http client"); + if (fp == NULL) { goto client_error; } - /* copy and NULL terminate the payload */ - data = flb_sds_create_size(client->resp.payload_size + 1); + if (hdr != NULL) { + len = fwrite(hdr, strlen(hdr), 1, fp); + if (len < strlen(hdr)) { + flb_plg_error(ctx->ins, "truncated write: %s", dst); + goto file_error; + } + } + + len = fwrite(client->resp.payload, client->resp.payload_size, 1, fp); + if (len < client->resp.payload_size) { + flb_plg_error(ctx->ins, "truncated write: %s", dst); + goto file_error; + } - if (!data) { - goto http_error; + if (time_last_modified) { + *time_last_modified = last_modified; } - memcpy(data, client->resp.payload, client->resp.payload_size); - data[client->resp.payload_size] = '\0'; - ret = case_header_lookup(client, "Last-modified", strlen("Last-modified"), - &fbit_last_modified, &fbit_last_modified_len); + ret = 1; - if (ret == -1) { - flb_plg_error(ctx->ins, "unable to get last-modified header"); - goto payload_error; +file_error: + fclose(fp); +client_error: + flb_http_client_destroy(client); + return ret; +} + +static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgname) +{ + flb_sds_t cfgnewname; + flb_sds_t cfgoldname; + + cfgnewname = new_fleet_config_filename(ctx); + + if (exists_new_fleet_config(ctx) == FLB_TRUE) { + cfgoldname = old_fleet_config_filename(ctx); + rename(cfgnewname, cfgoldname); + unlink(cfgnewname); + flb_sds_destroy(cfgoldname); } - flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified); - time_last_modified = mktime(&tm_last_modified.tm); + link(cfgname, cfgnewname); + flb_sds_destroy(cfgnewname); - cfgname = time_fleet_config_filename(ctx, time_last_modified); + return 0; +} - if (access(cfgname, F_OK) == -1 && errno == ENOENT) { - if (create_fleet_directory(ctx) != 0) { - flb_plg_error(ctx->ins, "unable to create fleet directories"); - goto http_error; - } - cfgfp = fopen(cfgname, "w+"); +static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgname) +{ + flb_sds_t cfgnewname; + flb_sds_t cfgcurname; + flb_sds_t cfgoldname; - if (cfgfp == NULL) { - flb_plg_error(ctx->ins, "unable to open configuration file: %s", cfgname); - flb_sds_destroy(cfgname); - goto payload_error; - } + cfgnewname = new_fleet_config_filename(ctx); + cfgcurname = cur_fleet_config_filename(ctx); + cfgoldname = old_fleet_config_filename(ctx); - header = flb_sds_create_size(4096); - - if (ctx->fleet_name == NULL) { - hdr = flb_sds_printf(&header, - "[CUSTOM]\n" - " Name calyptia\n" - " api_key %s\n" - " fleet_id %s\n" - " add_label fleet_id %s\n" - " fleet.config_dir %s\n" - " calyptia_host %s\n" - " calyptia_port %d\n" - " calyptia_tls %s\n", - ctx->api_key, - ctx->fleet_id, - ctx->fleet_id, - ctx->config_dir, - ctx->ins->host.name, - ctx->ins->host.port, - tls_setting_string(ctx->ins->use_tls) - ); - } - else { - hdr = flb_sds_printf(&header, - "[CUSTOM]\n" - " Name calyptia\n" - " api_key %s\n" - " fleet_name %s\n" - " fleet_id %s\n" - " add_label fleet_id %s\n" - " fleet.config_dir %s\n" - " calyptia_host %s\n" - " calyptia_port %d\n" - " calyptia_tls %s\n", - ctx->api_key, - ctx->fleet_name, - ctx->fleet_id, - ctx->fleet_id, - ctx->config_dir, - ctx->ins->host.name, - ctx->ins->host.port, - tls_setting_string(ctx->ins->use_tls) - ); - } - if (hdr == NULL) { - fclose(cfgfp); - flb_sds_destroy(cfgname); - goto header_error; - } - if (ctx->machine_id) { - hdr = flb_sds_printf(&header, " machine_id %s\n", ctx->machine_id); - if (hdr == NULL) { - fclose(cfgfp); - flb_sds_destroy(cfgname); - goto header_error; - } - } - fwrite(header, strlen(header), 1, cfgfp); - flb_sds_destroy(header); - header = NULL; - fwrite(data, client->resp.payload_size, 1, cfgfp); - fclose(cfgfp); + if (exists_new_fleet_config(ctx) == FLB_TRUE) { + unlink(cfgnewname); + } - cfgnewname = new_fleet_config_filename(ctx); + if (exists_old_fleet_config(ctx) == FLB_TRUE) { + unlink(cfgoldname); + } - if (exists_new_fleet_config(ctx) == FLB_TRUE) { - cfgoldname = old_fleet_config_filename(ctx); - rename(cfgnewname, cfgoldname); - unlink(cfgnewname); - flb_sds_destroy(cfgoldname); - } + if (exists_cur_fleet_config(ctx) == FLB_TRUE) { + unlink(cfgcurname); + } - if (!link(cfgname, cfgnewname)) { -#ifdef FLB_SYSTEM_WINDOWS - err = GetLastError(); - FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER, - NULL, err, 0, &lpMsg, 0, NULL); - flb_plg_error(ctx->ins, "unable to create hard link: %s", lpMsg); -#else - flb_errno(); -#endif - } + link(cfgname, cfgcurname); - flb_sds_destroy(cfgnewname); + flb_sds_destroy(cfgnewname); + flb_sds_destroy(cfgcurname); + flb_sds_destroy(cfgoldname); + + return 0; +} + +static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgname) +{ + flb_sds_t cfgnewname; + flb_sds_t cfgcurname; + flb_sds_t cfgoldname; + + cfgnewname = new_fleet_config_filename(ctx); + cfgcurname = cur_fleet_config_filename(ctx); + cfgoldname = old_fleet_config_filename(ctx); + + if (exists_new_fleet_config(ctx) == FLB_TRUE) { + unlink(cfgnewname); } - if (ctx->config_timestamp < time_last_modified) { - flb_plg_debug(ctx->ins, "new configuration is newer than current: %ld < %ld", - ctx->config_timestamp, time_last_modified); + if (exists_old_fleet_config(ctx) == FLB_TRUE) { + rename(cfgoldname, cfgcurname); + } - if (execute_reload(ctx, cfgname) == FLB_FALSE) { - cfgoldname = old_fleet_config_filename(ctx); - cfgcurname = cur_fleet_config_filename(ctx); - rename(cfgoldname, cfgcurname); - flb_sds_destroy(cfgcurname); - flb_sds_destroy(cfgoldname); - flb_sds_destroy(cfgname); - goto reload_error; - } + flb_sds_destroy(cfgnewname); + flb_sds_destroy(cfgcurname); + flb_sds_destroy(cfgoldname); + + return 0; +} + +static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn) +{ + flb_sds_t cfgname; + flb_sds_t cfgnewname; + flb_sds_t header; + time_t time_last_modified; + int ret = -1; + + if (ctx->fleet_url == NULL) { + ctx->fleet_url = flb_sds_create_size(4096); + flb_sds_printf(&ctx->fleet_url, "/v1/fleets/%s/config?format=ini", ctx->fleet_id); + } + + header = flb_sds_create_size(4096); + + if (ctx->fleet_name == NULL) { + flb_sds_printf(&header, + "[CUSTOM]\n" + " Name calyptia\n" + " api_key %s\n" + " fleet_id %s\n" + " add_label fleet_id %s\n" + " fleet.config_dir %s\n" + " calyptia_host %s\n" + " calyptia_port %d\n" + " calyptia_tls %s\n", + ctx->api_key, + ctx->fleet_id, + ctx->fleet_id, + ctx->config_dir, + ctx->ins->host.name, + ctx->ins->host.port, + tls_setting_string(ctx->ins->use_tls) + ); } else { + flb_sds_printf(&header, + "[CUSTOM]\n" + " Name calyptia\n" + " api_key %s\n" + " fleet_name %s\n" + " fleet_id %s\n" + " add_label fleet_id %s\n" + " fleet.config_dir %s\n" + " calyptia_host %s\n" + " calyptia_port %d\n" + " calyptia_tls %s\n", + ctx->api_key, + ctx->fleet_name, + ctx->fleet_id, + ctx->fleet_id, + ctx->config_dir, + ctx->ins->host.name, + ctx->ins->host.port, + tls_setting_string(ctx->ins->use_tls) + ); + } + + /* create the base file. */ + ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header, + ".ini", &time_last_modified); + + /* new file created! */ + if (ret == 1) { + cfgname = time_fleet_config_filename(ctx, time_last_modified); + calyptia_config_add(ctx, cfgname); flb_sds_destroy(cfgname); + + cfgnewname = new_fleet_config_filename(ctx); + if (execute_reload(ctx, cfgnewname) == FLB_FALSE) { + calyptia_config_rollback(ctx, cfgname); + flb_sds_destroy(cfgnewname); + return -1; + } else { + calyptia_config_commit(ctx, cfgname); + flb_sds_destroy(cfgnewname); + } } - ret = 0; + return 0; +} -reload_error: -header_error: - if (header) { - flb_sds_destroy(header); +/* cb_collect callback */ +static int in_calyptia_fleet_collect(struct flb_input_instance *ins, + struct flb_config *config, + void *in_context) +{ + struct flb_in_calyptia_fleet_config *ctx = in_context; + struct flb_connection *u_conn; + int ret = -1; + + u_conn = flb_upstream_conn_get(ctx->u); + + if (!u_conn) { + flb_plg_error(ctx->ins, "could not get an upstream connection to %s:%u", + ctx->ins->host.name, ctx->ins->host.port); + goto conn_error; } -payload_error: - if (data) { - flb_sds_destroy(data); + + if (ctx->fleet_id == NULL) { + + if (get_calyptia_fleet_id_by_name(ctx, u_conn, config) == -1) { + flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name); + goto conn_error; + } } -http_error: - flb_plg_debug(ctx->ins, "freeing http client in fleet collect"); - flb_http_client_destroy(client); -client_error: - flb_upstream_conn_release(u_conn); + ret = get_calyptia_fleet_config(ctx, u_conn); + conn_error: FLB_INPUT_RETURN(ret); } From e8eea97a833da7e6f4f5f9e238f5e5848f087079 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Fri, 6 Oct 2023 15:41:42 -0300 Subject: [PATCH 03/20] fleet: fix return value handling for fwrite. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index c8325c5681d..fc66405d9aa 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -936,14 +936,14 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, if (hdr != NULL) { len = fwrite(hdr, strlen(hdr), 1, fp); - if (len < strlen(hdr)) { + if (len < 1) { flb_plg_error(ctx->ins, "truncated write: %s", dst); goto file_error; } } len = fwrite(client->resp.payload, client->resp.payload_size, 1, fp); - if (len < client->resp.payload_size) { + if (len < 1) { flb_plg_error(ctx->ins, "truncated write: %s", dst); goto file_error; } From b067a8531a412b3549d2b3694974376a978d09bb Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Fri, 6 Oct 2023 15:46:09 -0300 Subject: [PATCH 04/20] fleet: correctly generate the new configuration file name. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 86 ++++++++++--------- 1 file changed, 45 insertions(+), 41 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index fc66405d9aa..5fcc71c56e8 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -184,6 +184,16 @@ struct reload_ctx { flb_sds_t cfg_path; }; +static flb_sds_t generate_fleet_directory(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t *fleet_dir) +{ + if (ctx->fleet_name != NULL) { + return flb_sds_printf(fleet_dir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", + ctx->config_dir, ctx->machine_id, ctx->fleet_name); + } + return flb_sds_printf(fleet_dir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", + ctx->config_dir, ctx->machine_id, ctx->fleet_id); +} + static flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, char *fname) { flb_sds_t cfgname; @@ -845,41 +855,6 @@ static int __mkdir(const char *dir, int perms) { return _mkdir(tmp, perms); } -static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) -{ - flb_sds_t myfleetdir; - - flb_plg_debug(ctx->ins, "checking for configuration directory=%s", ctx->config_dir); - if (access(ctx->config_dir, F_OK) != 0) { - if (__mkdir(ctx->config_dir, 0700) != 0) { - flb_plg_error(ctx->ins, "unable to create fleet config directory"); - return -1; - } - } - - myfleetdir = flb_sds_create_size(256); - - if (ctx->fleet_name != NULL) { - flb_sds_printf(&myfleetdir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", - ctx->config_dir, ctx->machine_id, ctx->fleet_name); - } - else { - flb_sds_printf(&myfleetdir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", - ctx->config_dir, ctx->machine_id, ctx->fleet_id); - } - - flb_plg_debug(ctx->ins, "checking for fleet directory=%s", myfleetdir); - if (access(myfleetdir, F_OK) != 0) { - if (__mkdir(myfleetdir, 0700) !=0) { - flb_plg_error(ctx->ins, "unable to create fleet specific directory"); - return -1; - } - } - - flb_sds_destroy(myfleetdir); - return 0; -} - /* cb_collect callback */ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, struct flb_connection *u_conn, @@ -889,7 +864,6 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, time_t *time_last_modified) { struct flb_http_client *client; - char fname[4096] = { 0 }; size_t len; FILE *fp; int ret = -1; @@ -897,6 +871,7 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, struct flb_tm tm_last_modified = { 0 }; int fbit_last_modified_len; time_t last_modified; + flb_sds_t fname; if (ctx == NULL || u_conn == NULL || url == NULL || dst == NULL) { return -1; @@ -918,13 +893,14 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified); last_modified = mktime(&tm_last_modified.tm); - /* skip the second PATH_SEPARATOR to allow creating files in the base - * fleet_config directory. */ - snprintf(fname, sizeof(fname)-1, "%s/%d%s", ctx->config_dir, (int)last_modified, dst); + fname = time_fleet_config_filename(ctx, last_modified); - errno = 0; - if (access(fname, F_OK) != -1 || errno == ENOENT) { + if (access(fname, F_OK) == 0) { ret = 0; + goto file_exists; + } + + if (fname == NULL) { goto client_error; } @@ -957,6 +933,7 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, file_error: fclose(fp); client_error: +file_exists: flb_http_client_destroy(client); return ret; } @@ -1154,6 +1131,33 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, FLB_INPUT_RETURN(ret); } +static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) +{ + flb_sds_t myfleetdir; + + if (access(ctx->config_dir, F_OK) != 0) { + if (__mkdir(ctx->config_dir, 0700) != 0) { + return -1; + } + } + + myfleetdir = flb_sds_create_size(256); + + if (generate_fleet_directory(ctx, &myfleetdir) == NULL) { + flb_sds_destroy(myfleetdir); + return -1; + } + + if (access(myfleetdir, F_OK) != 0) { + if (__mkdir(myfleetdir, 0700) !=0) { + return -1; + } + } + + flb_sds_destroy(myfleetdir); + return 0; +} + static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) { flb_ctx_t *flb_ctx = flb_context_get(); From a6da4a879a6be6e4cdcbcb756d5f65c144ef216b Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Fri, 6 Oct 2023 15:49:31 -0300 Subject: [PATCH 05/20] fleet: symlink the timestamped file. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 5fcc71c56e8..3f349c75fc4 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -953,7 +953,7 @@ static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, flb_sds_destroy(cfgoldname); } - link(cfgname, cfgnewname); + symlink(cfgname, cfgnewname); flb_sds_destroy(cfgnewname); return 0; @@ -965,10 +965,12 @@ static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cfgnewname; flb_sds_t cfgcurname; flb_sds_t cfgoldname; + flb_sds_t cfgtimename; cfgnewname = new_fleet_config_filename(ctx); cfgcurname = cur_fleet_config_filename(ctx); cfgoldname = old_fleet_config_filename(ctx); + cfgtimename = time_fleet_config_filename(ctx, ctx->config_timestamp); if (exists_new_fleet_config(ctx) == FLB_TRUE) { unlink(cfgnewname); @@ -982,11 +984,12 @@ static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx, unlink(cfgcurname); } - link(cfgname, cfgcurname); + symlink(cfgtimename, cfgcurname); flb_sds_destroy(cfgnewname); flb_sds_destroy(cfgcurname); flb_sds_destroy(cfgoldname); + flb_sds_destroy(cfgtimename); return 0; } From 0049222080ba246ed8e24b19b3a0b55f2417b504 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Fri, 6 Oct 2023 15:50:54 -0300 Subject: [PATCH 06/20] fleet: delete old configuration files during commit. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 234 +++++++++++++++++- 1 file changed, 229 insertions(+), 5 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 3f349c75fc4..53fdebba8b7 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -37,6 +37,16 @@ #include #include +// Glob support +#ifndef _MSC_VER +#include +#endif + +#ifdef _WIN32 +#include +#include +#define PATH_MAX MAX_PATH +#endif #define CALYPTIA_H_PROJECT "X-Project-Token" #define CALYPTIA_H_CTYPE "Content-Type" @@ -938,6 +948,152 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, return ret; } +#ifndef _WIN32 +static struct cfl_array *read_glob(const char *path) +{ + int ret = -1; + int ret_glb = -1; + glob_t glb; + size_t idx; + struct cfl_array *list; + + + ret_glb = glob(path, GLOB_NOSORT, NULL, &glb); + + if (ret_glb != 0) { + switch(ret_glb){ + case GLOB_NOSPACE: + flb_warn("[%s] glob: [%s] no space", __FUNCTION__, path); + break; + case GLOB_NOMATCH: + flb_warn("[%s] glob: [%s] no match", __FUNCTION__, path); + break; + case GLOB_ABORTED: + flb_warn("[%s] glob: [%s] aborted", __FUNCTION__, path); + break; + default: + flb_warn("[%s] glob: [%s] other error", __FUNCTION__, path); + } + return NULL; + } + + list = cfl_array_create(glb.gl_pathc); + for (idx = 0; idx < glb.gl_pathc; idx++) { + ret = cfl_array_append_string(list, glb.gl_pathv[idx]); + if (ret < 0) { + cfl_array_destroy(list); + return NULL; + } + } + + globfree(&glb); + return list; +} +#else +static char *dirname(char *path) +{ + char *ptr; + + + ptr = strrchr(path, '\\'); + + if (ptr == NULL) { + return path; + } + *ptr++='\0'; + return path; +} + +static struct cfl_array *read_glob(const char *path) +{ + char *star, *p0, *p1; + char pattern[MAX_PATH]; + char buf[MAX_PATH]; + int ret; + struct stat st; + HANDLE hnd; + WIN32_FIND_DATA data; + + if (strlen(path) > MAX_PATH - 1) { + return -1; + } + + star = strchr(path, '*'); + + if (star == NULL) { + return -1; + } + + /* + * C:\data\tmp\input_*.conf + * 0<-----| + */ + p0 = star; + while (path <= p0 && *p0 != '\\') { + p0--; + } + + /* + * C:\data\tmp\input_*.conf + * |---->1 + */ + p1 = star; + while (*p1 && *p1 != '\\') { + p1++; + } + + memcpy(pattern, path, (p1 - path)); + pattern[p1 - path] = '\0'; + + hnd = FindFirstFileA(pattern, &data); + + if (hnd == INVALID_HANDLE_VALUE) { + return 0; + } + + do { + /* Ignore the current and parent dirs */ + if (!strcmp(".", data.cFileName) || !strcmp("..", data.cFileName)) { + continue; + } + + /* Avoid an infinite loop */ + if (strchr(data.cFileName, '*')) { + continue; + } + + /* Create a path (prefix + filename + suffix) */ + memcpy(buf, path, p0 - path + 1); + buf[p0 - path + 1] = '\0'; + + if (FAILED(StringCchCatA(buf, MAX_PATH, data.cFileName))) { + continue; + } + + if (FAILED(StringCchCatA(buf, MAX_PATH, p1))) { + continue; + } + + if (strchr(p1, '*')) { + read_glob(conf, ctx, state, buf); /* recursive */ + continue; + } + + ret = stat(buf, &st); + + if (ret == 0 && (st.st_mode & S_IFMT) == S_IFREG) { + + if (read_config(conf, ctx, state, buf) < 0) { + return -1; + } + } + } while (FindNextFileA(hnd, &data) != 0); + + FindClose(hnd); + return 0; +} +#endif + static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, const char *cfgname) { @@ -959,8 +1115,73 @@ static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, return 0; } -static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx, - const char *cfgname) +static int cfl_array_qsort_ini(const void *arg_a, const void *arg_b) +{ + struct cfl_variant *var_a = (struct cfl_variant *)*(void **)arg_a; + struct cfl_variant *var_b = (struct cfl_variant *)*(void **)arg_b; + + if (var_a == NULL && var_b == NULL) { + return 0; + } + else if (var_a == NULL) { + return -1; + } + else if (var_b == NULL) { + return 1; + } + else if (var_a->type != CFL_VARIANT_STRING && + var_b->type != CFL_VARIANT_STRING) { + return 0; + } + else if (var_a->type != CFL_VARIANT_STRING) { + return -1; + } + else if (var_b->type != CFL_VARIANT_STRING) { + return 1; + } + + return strcmp(var_a->data.as_string, var_b->data.as_string); +} + +static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) +{ + struct cfl_array *inis; + flb_sds_t glob_ini; + int idx; + + glob_ini = flb_sds_create_size(128); + if (glob_ini == NULL) { + return -1; + } + + if (generate_fleet_directory(ctx, &glob_ini) == NULL) { + flb_sds_destroy(glob_ini); + return -1; + } + + if (flb_sds_cat_safe(&glob_ini, "/*.ini", strlen("/*.ini")) != 0) { + flb_sds_destroy(glob_ini); + return -1; + } + + inis = read_glob(glob_ini); + if (inis == NULL) { + flb_sds_destroy(glob_ini); + return -1; + } + + qsort(inis->entries, inis->entry_count, + sizeof(struct cfl_variant *), + cfl_array_qsort_ini); + + for (idx = 0; idx < (((ssize_t)inis->entry_count) -1 -3);idx++) { + unlink(inis->entries[idx]->data.as_string); + } + + return 0; +} + +static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx) { flb_sds_t cfgnewname; flb_sds_t cfgcurname; @@ -991,6 +1212,8 @@ static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx, flb_sds_destroy(cfgoldname); flb_sds_destroy(cfgtimename); + calyptia_config_delete_old(ctx); + return 0; } @@ -1094,9 +1317,6 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, calyptia_config_rollback(ctx, cfgname); flb_sds_destroy(cfgnewname); return -1; - } else { - calyptia_config_commit(ctx, cfgname); - flb_sds_destroy(cfgnewname); } } @@ -1316,6 +1536,10 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, return 0; } + if (exists_new_fleet_config(ctx) == FLB_TRUE) { + calyptia_config_commit(ctx); + } + /* Set our collector based on time */ ret = flb_input_set_collector_time(in, in_calyptia_fleet_collect, From 291cee0956f9a2c61cd0200585012afad1605f0d Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Fri, 6 Oct 2023 15:51:14 -0300 Subject: [PATCH 07/20] fleet: remove white space. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 53fdebba8b7..bd501eb342a 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -129,7 +129,7 @@ static char *find_case_header(struct flb_http_client *cli, const char *header) if (strncasecmp(ptr, header, strlen(header)) == 0) { if (ptr[strlen(header)] == ':' && ptr[strlen(header)+1] == ' ') { - return ptr; + return ptr; } } } @@ -417,7 +417,7 @@ static void *do_reload(void *data) #ifndef FLB_SYSTEM_WINDOWS kill(getpid(), SIGHUP); #else - GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0); + GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0); #endif return NULL; } @@ -438,7 +438,7 @@ static int test_config_is_valid(flb_sds_t cfgpath) if (conf == NULL) { goto cf_create_from_file_error; - } + } ret = FLB_TRUE; @@ -692,7 +692,7 @@ static flb_sds_t get_project_id_from_api_key(struct flb_in_calyptia_fleet_config memcpy(encoded, ctx->api_key, api_token_sep-ctx->api_key); ret = flb_base64_decode(token, sizeof(token)-1, &tlen, - encoded, elen); + encoded, elen); if (ret != 0) { return NULL; @@ -764,7 +764,7 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct } url = flb_sds_create_size(4096); - flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s", + flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s", project_id, ctx->fleet_name); client = fleet_http_do(ctx, u_conn, url); @@ -872,7 +872,7 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, const char *hdr, const char *dst, time_t *time_last_modified) - { +{ struct flb_http_client *client; size_t len; FILE *fp; @@ -1303,7 +1303,7 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, } /* create the base file. */ - ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header, + ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header, ".ini", &time_last_modified); /* new file created! */ @@ -1325,7 +1325,7 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, /* cb_collect callback */ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, - struct flb_config *config, + struct flb_config *config, void *in_context) { struct flb_in_calyptia_fleet_config *ctx = in_context; From 3cb8302e9406a901da07168b734029fff3d52b57 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 11 Oct 2023 16:23:39 -0300 Subject: [PATCH 08/20] in_calyptia_fleet: generate_base_fleet_directory now allocates the flb_sds_t for the caller. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index bd501eb342a..f59b52a7e90 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -194,8 +194,13 @@ struct reload_ctx { flb_sds_t cfg_path; }; -static flb_sds_t generate_fleet_directory(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t *fleet_dir) +static flb_sds_t generate_base_fleet_directory(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t *fleet_dir) { + *fleet_dir = flb_sds_create_size(4096); + if (*fleet_dir == NULL) { + return NULL; + } + if (ctx->fleet_name != NULL) { return flb_sds_printf(fleet_dir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", ctx->config_dir, ctx->machine_id, ctx->fleet_name); @@ -1149,12 +1154,7 @@ static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) flb_sds_t glob_ini; int idx; - glob_ini = flb_sds_create_size(128); - if (glob_ini == NULL) { - return -1; - } - - if (generate_fleet_directory(ctx, &glob_ini) == NULL) { + if (generate_base_fleet_directory(ctx, &glob_ini) == NULL) { flb_sds_destroy(glob_ini); return -1; } @@ -1178,6 +1178,7 @@ static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) unlink(inis->entries[idx]->data.as_string); } + flb_sds_destroy(glob_ini); return 0; } @@ -1364,9 +1365,7 @@ static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) } } - myfleetdir = flb_sds_create_size(256); - - if (generate_fleet_directory(ctx, &myfleetdir) == NULL) { + if (generate_base_fleet_directory(ctx, &myfleetdir) == NULL) { flb_sds_destroy(myfleetdir); return -1; } From daf43f86f0e2704f1a30922417fee11558255e15 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Wed, 11 Oct 2023 16:41:59 -0300 Subject: [PATCH 09/20] in_calyptia_fleet: download and load fleet files. Download fleet files to a sub-directory with the timestamp of the relevant configuration and allow it to be loaded by changing to that directory when fluent-bit is reloaded. * Recurse through fleet files and download each one. * Change to the fleet files directory on reload so they can be referred to from the main fleet configuration file. * Delete old fleet file directories when the configuration is deleted. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 377 ++++++++++++++++-- 1 file changed, 339 insertions(+), 38 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index f59b52a7e90..8c4a11c50c1 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -89,6 +89,7 @@ struct flb_in_calyptia_fleet_config { flb_sds_t cloud_port; flb_sds_t fleet_url; + flb_sds_t fleet_files_url; struct flb_input_instance *ins; /* plugin instance */ struct flb_config *config; /* Fluent Bit context */ @@ -99,6 +100,13 @@ struct flb_in_calyptia_fleet_config { int collect_fd; }; +static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + const char *url, + time_t timestamp); + +static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx); + static char *find_case_header(struct flb_http_client *cli, const char *header) { char *ptr; @@ -453,6 +461,59 @@ static int test_config_is_valid(flb_sds_t cfgpath) return ret; } +static int parse_config_name_timestamp(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgpath, + long *config_timestamp) +{ + char *ext = NULL; + long timestamp; + char realname[4096]; + char *fname; + ssize_t len; + + if (config_timestamp == NULL || cfgpath == NULL) { + return FLB_FALSE; + } + + len = readlink(cfgpath, realname, sizeof(realname)); + + if (len > sizeof(realname)) { + return FLB_FALSE; + } + + fname = basename(realname); + + errno = 0; + timestamp = strtol(fname, &ext, 10); + + if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) || + (errno != 0 && timestamp == 0)) { + flb_errno(); + return FLB_FALSE; + } + + /* unable to parse the timstamp */ + if (errno == ERANGE) { + return FLB_FALSE; + } + + *config_timestamp = timestamp; + + return FLB_TRUE; +} + +static int parse_config_timestamp(struct flb_in_calyptia_fleet_config *ctx, + long *config_timestamp) +{ + flb_ctx_t *flb_ctx = flb_context_get(); + + if (config_timestamp == NULL) { + return FLB_FALSE; + } + + return parse_config_name_timestamp(ctx, flb_ctx->config->conf_path_file, config_timestamp); +} + static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cfgpath) { struct reload_ctx *reload; @@ -460,6 +521,9 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf pthread_attr_t ptha; flb_ctx_t *flb = flb_context_get(); + + parse_config_name_timestamp(ctx, cfgpath, &ctx->config_timestamp); + if (ctx->collect_fd > 0) { flb_input_collector_pause(ctx->collect_fd, ctx->ins); } @@ -496,6 +560,8 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf reload->flb = flb; reload->cfg_path = cfgpath; + fleet_cur_chdir(ctx); + pthread_attr_init(&ptha); pthread_attr_setdetachstate(&ptha, PTHREAD_CREATE_DETACHED); pthread_create(&pth, &ptha, do_reload, reload); @@ -795,6 +861,7 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct #ifdef FLB_SYSTEM_WINDOWS #define link(a, b) CreateHardLinkA(b, a, 0) +#define symlink(a, b) CreateSymLinkA(b, a, 0) ssize_t readlink(const char *path, char *realpath, size_t srealpath) { HANDLE hFile; @@ -870,10 +937,9 @@ static int __mkdir(const char *dir, int perms) { return _mkdir(tmp, perms); } -/* cb_collect callback */ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, struct flb_connection *u_conn, - const char *url, + flb_sds_t url, const char *hdr, const char *dst, time_t *time_last_modified) @@ -888,11 +954,11 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, time_t last_modified; flb_sds_t fname; - if (ctx == NULL || u_conn == NULL || url == NULL || dst == NULL) { + if (ctx == NULL || u_conn == NULL || url == NULL) { return -1; } - client = fleet_http_do(ctx, u_conn, ctx->fleet_url); + client = fleet_http_do(ctx, u_conn, url); if (client == NULL) { return -1; @@ -905,10 +971,14 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, goto client_error; } - flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified); - last_modified = mktime(&tm_last_modified.tm); + if (dst == NULL) { + flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified); + last_modified = mktime(&tm_last_modified.tm); - fname = time_fleet_config_filename(ctx, last_modified); + fname = time_fleet_config_filename(ctx, last_modified); + } else { + fname = flb_sds_create_len(dst, strlen(dst)); + } if (access(fname, F_OK) == 0) { ret = 0; @@ -948,6 +1018,7 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, file_error: fclose(fp); client_error: + flb_sds_destroy(fname); file_exists: flb_http_client_destroy(client); return ret; @@ -1148,6 +1219,43 @@ static int cfl_array_qsort_ini(const void *arg_a, const void *arg_b) return strcmp(var_a->data.as_string, var_b->data.as_string); } +static int calyptia_config_delete_old_dir(const char *cfgpath) +{ + flb_sds_t cfg_glob; + char *ext; + struct cfl_array *files; + int idx; + + ext = strrchr(cfgpath, '.'); + if (ext == NULL) { + return FLB_FALSE; + } + + cfg_glob = flb_sds_create_len(cfgpath, ext - cfgpath); + if (cfg_glob == NULL) { + return FLB_FALSE; + } + + if (flb_sds_cat_safe(&cfg_glob, "/*", strlen("/*")) != 0) { + flb_sds_destroy(cfg_glob); + return FLB_FALSE; + } + + files = read_glob(cfg_glob); + if (files == NULL) { + flb_sds_destroy(cfg_glob); + return FLB_FALSE; + } + + for (idx = 0; idx < ((ssize_t)files->entry_count); idx++) { + unlink(files->entries[idx]->data.as_string); + } + + flb_sds_destroy(cfg_glob); + cfl_array_destroy(files); + return FLB_TRUE; +} + static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) { struct cfl_array *inis; @@ -1174,10 +1282,12 @@ static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) sizeof(struct cfl_variant *), cfl_array_qsort_ini); - for (idx = 0; idx < (((ssize_t)inis->entry_count) -1 -3);idx++) { + for (idx = 0; idx < (((ssize_t)inis->entry_count) -1 - 3); idx++) { unlink(inis->entries[idx]->data.as_string); + calyptia_config_delete_old_dir(inis->entries[idx]->data.as_string); } + cfl_array_destroy(inis); flb_sds_destroy(glob_ini); return 0; } @@ -1258,6 +1368,11 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, flb_sds_printf(&ctx->fleet_url, "/v1/fleets/%s/config?format=ini", ctx->fleet_id); } + if (ctx->fleet_files_url == NULL) { + ctx->fleet_files_url = flb_sds_create_size(4096); + flb_sds_printf(&ctx->fleet_files_url, "/v1/fleets/%s/files", ctx->fleet_id); + } + header = flb_sds_create_size(4096); if (ctx->fleet_name == NULL) { @@ -1305,10 +1420,12 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, /* create the base file. */ ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header, - ".ini", &time_last_modified); + NULL, &time_last_modified); /* new file created! */ if (ret == 1) { + get_calyptia_files(ctx, u_conn, ctx->fleet_files_url, time_last_modified); + cfgname = time_fleet_config_filename(ctx, time_last_modified); calyptia_config_add(ctx, cfgname); flb_sds_destroy(cfgname); @@ -1380,14 +1497,71 @@ static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) return 0; } +static flb_sds_t fleet_gendir(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp) +{ + flb_sds_t fleetdir; + flb_sds_t fleetcurdir; + + + if (generate_base_fleet_directory(ctx, &fleetdir) == NULL) { + return NULL; + } + + fleetcurdir = flb_sds_create_size(strlen(fleetdir) + 32); + + if (fleetcurdir == NULL) { + flb_sds_destroy(fleetdir); + return NULL; + } + + if (flb_sds_printf(&fleetcurdir, "%s/%ld", fleetdir, timestamp) == NULL) { + flb_sds_destroy(fleetdir); + flb_sds_destroy(fleetcurdir); + return NULL; + } + + flb_sds_destroy(fleetdir); + + return fleetcurdir; +} + +static int fleet_mkdir(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp) +{ + flb_sds_t fleetcurdir; + + fleetcurdir = fleet_gendir(ctx, timestamp); + + if (fleetcurdir == NULL) { + return -1; + } + + __mkdir(fleetcurdir, 0700); + flb_sds_destroy(fleetcurdir); + + return 0; +} + +static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx) +{ + flb_sds_t fleetcurdir; + int ret; + + fleetcurdir = fleet_gendir(ctx, ctx->config_timestamp); + flb_plg_info(ctx->ins, "changing to config dir: %s", fleetcurdir); + + if (fleetcurdir == NULL) { + return -1; + } + + ret = chdir(fleetcurdir); + flb_sds_destroy(fleetcurdir); + + return ret; +} + static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) { flb_ctx_t *flb_ctx = flb_context_get(); - char *fname; - char *ext; - long timestamp; - char realname[4096]; - ssize_t len; if (create_fleet_directory(ctx) != 0) { flb_plg_error(ctx->ins, "unable to create fleet directories"); @@ -1405,41 +1579,168 @@ static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) } } else { - if (is_new_fleet_config(ctx, flb_ctx->config) || is_cur_fleet_config(ctx, flb_ctx->config)) { - len = readlink(flb_ctx->config->conf_path_file, realname, sizeof(realname)); + parse_config_timestamp(ctx, &ctx->config_timestamp); + } - if (len > sizeof(realname)) { - return FLB_FALSE; - } + return FLB_FALSE; +} - fname = basename(realname); - } - else { - fname = basename(flb_ctx->config->conf_path_file); - } +static int create_fleet_file(flb_sds_t fleetdir, + const char *name, + int nlen, + const char *b64_content, + int blen) +{ + flb_sds_t fname; + flb_sds_t dst; + size_t dlen = 2 * blen; + FILE *fp; + int ret; - if (fname == NULL) { - return FLB_FALSE; - } + fname = flb_sds_create_size(strlen(fleetdir) + nlen + 2); + if (fname == NULL) { + return -1; + } + + if (flb_sds_cat_safe(&fname, fleetdir, strlen(fleetdir)) < 0) { + flb_sds_destroy(fname); + return -1; + } + + if (flb_sds_cat_safe(&fname, "/", 1) < 0) { + flb_sds_destroy(fname); + return -1; + } + + if (flb_sds_cat_safe(&fname, name, nlen) < 0) { + flb_sds_destroy(fname); + return -1; + } + + fp = fopen(fname, "w+"); + if (fp == NULL) { + return -1; + } + + dst = flb_sds_create_size(dlen); + ret = flb_base64_decode((unsigned char *)dst, dlen, &dlen, + (unsigned char *)b64_content, blen); + + if (ret != 0) { + return -1; + } - errno = 0; - timestamp = strtol(fname, &ext, 10); + fwrite(dst, dlen, 1, fp); - if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) || - (errno != 0 && timestamp == 0)) { - flb_errno(); - return FLB_FALSE; + fclose(fp); + flb_sds_destroy(dst); + flb_sds_destroy(fname); + + return 0; +} + +static int create_fleet_files(struct flb_in_calyptia_fleet_config *ctx, + char *payload, size_t size, time_t timestamp) +{ + int ret; + int out_size; + char *pack; + struct flb_pack_state pack_state; + size_t off = 0; + int idx; + flb_sds_t fleetdir; + msgpack_unpacked result; + msgpack_object *map; + msgpack_object *name; + msgpack_object *contents; + + /* Initialize packer */ + flb_pack_state_init(&pack_state); + + /* Pack JSON as msgpack */ + ret = flb_pack_json_state(payload, size, + &pack, &out_size, &pack_state); + flb_pack_state_reset(&pack_state); + + /* Handle exceptions */ + if (ret == FLB_ERR_JSON_PART || ret == FLB_ERR_JSON_INVAL || ret == -1) { + flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); + return -1; + } + + fleetdir = fleet_gendir(ctx, timestamp); + + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) { + if (result.data.type != MSGPACK_OBJECT_ARRAY) { + continue; } + for (idx = 0; idx < result.data.via.array.size; idx++) { + map = msgpack_lookup_array_offset(&result.data, idx); - /* unable to parse the timstamp */ - if (errno == ERANGE) { - return FLB_FALSE; + if (map == NULL) { + return -1; + } + + name = msgpack_lookup_map_key(map, "name"); + if (name == NULL) { + return -1; + } + if (name->type != MSGPACK_OBJECT_STR) { + return -1; + } + + contents = msgpack_lookup_map_key(map, "contents"); + if (contents == NULL) { + return -1; + } + if (contents->type != MSGPACK_OBJECT_STR) { + return -1; + } + + create_fleet_file(fleetdir, + name->via.str.ptr, + name->via.str.size, + contents->via.str.ptr, + contents->via.str.size); } + } - ctx->config_timestamp = timestamp; + msgpack_unpacked_destroy(&result); + flb_free(pack); + + return 0; +} + +static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + const char *url, + time_t timestamp) +{ + struct flb_http_client *client; + int ret = -1; + + if (ctx == NULL || u_conn == NULL || url == NULL) { + return -1; } - return FLB_FALSE; + client = fleet_http_do(ctx, u_conn, ctx->fleet_files_url); + + if (client == NULL) { + return -1; + } + + fleet_mkdir(ctx, timestamp); + ret = create_fleet_files(ctx, client->resp.payload, client->resp.payload_size, timestamp); + if (ret != 0) { + goto file_error; + } + + ret = 1; + +file_error: + flb_http_client_destroy(client); + return ret; } static int in_calyptia_fleet_init(struct flb_input_instance *in, From 6ad1ef6388a1384243871a4404d969b759a10175 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 26 Oct 2023 11:17:44 -0300 Subject: [PATCH 10/20] in_calyptia_fleet: improve internal API used for handling configuration file paths. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 57 ++++++++----------- 1 file changed, 25 insertions(+), 32 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 8c4a11c50c1..2e3eb3a20b3 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -204,53 +204,46 @@ struct reload_ctx { static flb_sds_t generate_base_fleet_directory(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t *fleet_dir) { - *fleet_dir = flb_sds_create_size(4096); - if (*fleet_dir == NULL) { + if (fleet_dir == NULL) { return NULL; } + if (*fleet_dir == NULL) { + *fleet_dir = flb_sds_create_size(4096); + if (*fleet_dir == NULL) { + return NULL; + } + } + if (ctx->fleet_name != NULL) { return flb_sds_printf(fleet_dir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", - ctx->config_dir, ctx->machine_id, ctx->fleet_name); + ctx->config_dir, ctx->machine_id, ctx->fleet_name); } return flb_sds_printf(fleet_dir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", - ctx->config_dir, ctx->machine_id, ctx->fleet_id); + ctx->config_dir, ctx->machine_id, ctx->fleet_id); } static flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, char *fname) { - flb_sds_t cfgname; - - cfgname = flb_sds_create_size(4096); + flb_sds_t cfgname = NULL; + flb_sds_t ret; - if (ctx->fleet_name != NULL) { - flb_sds_printf(&cfgname, - "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s.ini", - ctx->config_dir, ctx->machine_id, ctx->fleet_name, fname); + if (generate_base_fleet_directory(ctx, &cfgname) == NULL) { + return NULL; } - else { - flb_sds_printf(&cfgname, - "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s.ini", - ctx->config_dir, ctx->machine_id, ctx->fleet_id, fname); + + ret = flb_sds_printf(&cfgname, PATH_SEPARATOR "%s.ini", fname); + if (ret == NULL) { + flb_sds_destroy(cfgname); + return NULL; } return cfgname; } -static flb_sds_t new_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx) -{ - return fleet_config_filename(ctx, "new"); -} - -static flb_sds_t cur_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx) -{ - return fleet_config_filename(ctx, "cur"); -} - -static flb_sds_t old_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx) -{ - return fleet_config_filename(ctx, "old"); -} +#define new_fleet_config_filename(a) fleet_config_filename((a), "new") +#define cur_fleet_config_filename(a) fleet_config_filename((a), "cur") +#define old_fleet_config_filename(a) fleet_config_filename((a), "old") static flb_sds_t time_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, time_t t) { @@ -1259,7 +1252,7 @@ static int calyptia_config_delete_old_dir(const char *cfgpath) static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) { struct cfl_array *inis; - flb_sds_t glob_ini; + flb_sds_t glob_ini = NULL; int idx; if (generate_base_fleet_directory(ctx, &glob_ini) == NULL) { @@ -1474,7 +1467,7 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) { - flb_sds_t myfleetdir; + flb_sds_t myfleetdir = NULL; if (access(ctx->config_dir, F_OK) != 0) { if (__mkdir(ctx->config_dir, 0700) != 0) { @@ -1499,7 +1492,7 @@ static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) static flb_sds_t fleet_gendir(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp) { - flb_sds_t fleetdir; + flb_sds_t fleetdir = NULL; flb_sds_t fleetcurdir; From 2a02f639aa733222c56fd64c8b95392ca58908c0 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 26 Oct 2023 11:28:33 -0300 Subject: [PATCH 11/20] in_calyptia_fleet: improve return value and parameter checking. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 93 ++++++++++++++++++- 1 file changed, 89 insertions(+), 4 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 2e3eb3a20b3..393e03b1a51 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -228,6 +228,10 @@ static flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cfgname = NULL; flb_sds_t ret; + if (ctx == NULL || fname == NULL) { + return NULL; + } + if (generate_base_fleet_directory(ctx, &cfgname) == NULL) { return NULL; } @@ -259,11 +263,19 @@ static int is_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct int ret = FLB_FALSE; + if (cfg == NULL) { + return FLB_FALSE; + } + if (cfg->conf_path_file == NULL) { return FLB_FALSE; } cfgnewname = new_fleet_config_filename(ctx); + if (cfgnewname == NULL) { + flb_plg_error(ctx->ins, "unable to allocate configuration name"); + return FLB_FALSE; + } if (strcmp(cfgnewname, cfg->conf_path_file) == 0) { ret = FLB_TRUE; @@ -279,12 +291,19 @@ static int is_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_sds_t cfgcurname; int ret = FLB_FALSE; + if (cfg == NULL) { + return FLB_FALSE; + } if (cfg->conf_path_file == NULL) { return FLB_FALSE; } cfgcurname = cur_fleet_config_filename(ctx); + if (cfgcurname == NULL) { + flb_plg_error(ctx->ins, "unable to allocate configuration name"); + return FLB_FALSE; + } if (strcmp(cfgcurname, cfg->conf_path_file) == 0) { ret = FLB_TRUE; @@ -330,6 +349,10 @@ static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, char *end; long val; + if (cfg == NULL) { + return FLB_FALSE; + } + if (cfg->conf_path_file == NULL) { return FLB_FALSE; } @@ -360,6 +383,10 @@ static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg) { + if (cfg == NULL) { + return FLB_FALSE; + } + if (cfg->conf_path_file == NULL) { return FLB_FALSE; } @@ -377,6 +404,11 @@ static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx) cfgnewname = new_fleet_config_filename(ctx); + if (cfgnewname == NULL) { + flb_plg_error(ctx->ins, "unable to allocate configuration name"); + return FLB_FALSE; + } + ret = access(cfgnewname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE; flb_sds_destroy(cfgnewname); @@ -390,6 +422,11 @@ static int exists_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx) cfgcurname = cur_fleet_config_filename(ctx); + if (cfgcurname == NULL) { + flb_plg_error(ctx->ins, "unable to allocate configuration name"); + return FLB_FALSE; + } + ret = access(cfgcurname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE; flb_sds_destroy(cfgcurname); @@ -403,6 +440,11 @@ static int exists_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx) cfgoldname = old_fleet_config_filename(ctx); + if (cfgoldname == NULL) { + flb_plg_error(ctx->ins, "unable to allocate configuration name"); + return FLB_FALSE; + } + ret = access(cfgoldname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE; flb_sds_destroy(cfgoldname); @@ -413,6 +455,10 @@ static void *do_reload(void *data) { struct reload_ctx *reload = (struct reload_ctx *)data; + if (reload == NULL) { + return NULL; + } + /* avoid reloading the current configuration... just use our new one! */ flb_context_set(reload->flb); reload->flb->config->enable_hot_reload = FLB_TRUE; @@ -433,6 +479,9 @@ static int test_config_is_valid(flb_sds_t cfgpath) struct flb_cf *conf; int ret = FLB_FALSE; + if (cfgpath == NULL) { + return FLB_FALSE; + } conf = flb_cf_create(); @@ -464,7 +513,7 @@ static int parse_config_name_timestamp(struct flb_in_calyptia_fleet_config *ctx, char *fname; ssize_t len; - if (config_timestamp == NULL || cfgpath == NULL) { + if (ctx == NULL || config_timestamp == NULL || cfgpath == NULL) { return FLB_FALSE; } @@ -500,7 +549,7 @@ static int parse_config_timestamp(struct flb_in_calyptia_fleet_config *ctx, { flb_ctx_t *flb_ctx = flb_context_get(); - if (config_timestamp == NULL) { + if (ctx == NULL || config_timestamp == NULL) { return FLB_FALSE; } @@ -515,7 +564,9 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf flb_ctx_t *flb = flb_context_get(); - parse_config_name_timestamp(ctx, cfgpath, &ctx->config_timestamp); + if (parse_config_name_timestamp(ctx, cfgpath, &ctx->config_timestamp) != FLB_TRUE) { + return FLB_FALSE; + } if (ctx->collect_fd > 0) { flb_input_collector_pause(ctx->collect_fd, ctx->ins); @@ -630,6 +681,10 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx, msgpack_object *projectID; flb_sds_t project_id = NULL; + if (ctx == NULL || payload == NULL) { + return NULL; + } + /* Initialize packer */ flb_pack_state_init(&pack_state); @@ -683,6 +738,10 @@ static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx, msgpack_object *map; msgpack_object *fleet; + if (ctx == NULL || payload == NULL) { + return -1; + } + /* Initialize packer */ flb_pack_state_init(&pack_state); @@ -738,6 +797,10 @@ static flb_sds_t get_project_id_from_api_key(struct flb_in_calyptia_fleet_config size_t elen; int ret; + if (ctx == NULL) { + return NULL; + } + api_token_sep = strchr(ctx->api_key, '.'); if (api_token_sep == NULL) { @@ -773,6 +836,10 @@ static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config size_t b_sent; int ret = -1; + if (ctx == NULL || u_conn == NULL || url == NULL) { + return NULL; + } + client = flb_http_client(u_conn, FLB_HTTP_GET, url, NULL, 0, ctx->ins->host.name, ctx->ins->host.port, NULL, 0); @@ -821,6 +888,10 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct flb_sds_t url; flb_sds_t project_id; + if (ctx == NULL || u_conn == NULL || config == NULL) { + return -1; + } + project_id = get_project_id_from_api_key(ctx); if (project_id == NULL) { @@ -1170,6 +1241,9 @@ static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cfgoldname; cfgnewname = new_fleet_config_filename(ctx); + if (cfgnewname == NULL) { + return -1; + } if (exists_new_fleet_config(ctx) == FLB_TRUE) { cfgoldname = old_fleet_config_filename(ctx); @@ -1178,7 +1252,10 @@ static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, flb_sds_destroy(cfgoldname); } - symlink(cfgname, cfgnewname); + if (symlink(cfgname, cfgnewname)) { + flb_sds_destroy(cfgnewname); + return -1; + } flb_sds_destroy(cfgnewname); return 0; @@ -1219,6 +1296,10 @@ static int calyptia_config_delete_old_dir(const char *cfgpath) struct cfl_array *files; int idx; + if (cfgpath == NULL) { + return FLB_FALSE; + } + ext = strrchr(cfgpath, '.'); if (ext == NULL) { return FLB_FALSE; @@ -1255,6 +1336,10 @@ static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) flb_sds_t glob_ini = NULL; int idx; + if (ctx == NULL) { + return -1; + } + if (generate_base_fleet_directory(ctx, &glob_ini) == NULL) { flb_sds_destroy(glob_ini); return -1; From 2e835c928d30760403e0a889e2ba62b921707795 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 26 Oct 2023 11:38:00 -0300 Subject: [PATCH 12/20] in_calyptia_fleet: improve config_commit/add/rollback. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 137 +++++++++++++----- 1 file changed, 104 insertions(+), 33 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 393e03b1a51..f1eea5b1478 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -504,12 +504,12 @@ static int test_config_is_valid(flb_sds_t cfgpath) } static int parse_config_name_timestamp(struct flb_in_calyptia_fleet_config *ctx, - const char *cfgpath, + const char *cfgpath, long *config_timestamp) { char *ext = NULL; long timestamp; - char realname[4096]; + char realname[4096] = {0}; char *fname; ssize_t len; @@ -1237,28 +1237,53 @@ static struct cfl_array *read_glob(const char *path) static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, const char *cfgname) { - flb_sds_t cfgnewname; - flb_sds_t cfgoldname; + int rc = FLB_FALSE; + + flb_sds_t cfgnewname = NULL; + flb_sds_t cfgoldname = NULL; + flb_sds_t cfgcurname = NULL; cfgnewname = new_fleet_config_filename(ctx); - if (cfgnewname == NULL) { - return -1; + cfgcurname = cur_fleet_config_filename(ctx); + cfgoldname = old_fleet_config_filename(ctx); + + if (cfgnewname == NULL || cfgcurname == NULL || cfgoldname == NULL) { + goto error; } if (exists_new_fleet_config(ctx) == FLB_TRUE) { - cfgoldname = old_fleet_config_filename(ctx); - rename(cfgnewname, cfgoldname); - unlink(cfgnewname); - flb_sds_destroy(cfgoldname); + + if (rename(cfgnewname, cfgoldname)) { + goto error; + } + } + else if (exists_cur_fleet_config(ctx) == FLB_TRUE) { + + if (rename(cfgcurname, cfgoldname)) { + goto error; + } } if (symlink(cfgname, cfgnewname)) { + goto error; + } + + rc = FLB_TRUE; + +error: + if (cfgnewname) { flb_sds_destroy(cfgnewname); - return -1; } - flb_sds_destroy(cfgnewname); - return 0; + if (cfgcurname) { + flb_sds_destroy(cfgcurname); + } + + if (cfgoldname) { + flb_sds_destroy(cfgoldname); + } + + return rc; } static int cfl_array_qsort_ini(const void *arg_a, const void *arg_b) @@ -1325,8 +1350,16 @@ static int calyptia_config_delete_old_dir(const char *cfgpath) unlink(files->entries[idx]->data.as_string); } + /* attempt to delete the main directory */ + ext = strrchr(cfg_glob, '/'); + if (ext) { + *ext = '\0'; + rmdir(cfg_glob); + } + flb_sds_destroy(cfg_glob); cfl_array_destroy(files); + return FLB_TRUE; } @@ -1367,23 +1400,25 @@ static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) cfl_array_destroy(inis); flb_sds_destroy(glob_ini); + return 0; } static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx) { - flb_sds_t cfgnewname; - flb_sds_t cfgcurname; - flb_sds_t cfgoldname; - flb_sds_t cfgtimename; + int rc = FLB_FALSE; + flb_sds_t cfgnewname = NULL; + flb_sds_t cfgcurname = NULL; + flb_sds_t cfgoldname = NULL; cfgnewname = new_fleet_config_filename(ctx); cfgcurname = cur_fleet_config_filename(ctx); cfgoldname = old_fleet_config_filename(ctx); - cfgtimename = time_fleet_config_filename(ctx, ctx->config_timestamp); - if (exists_new_fleet_config(ctx) == FLB_TRUE) { - unlink(cfgnewname); + if (cfgnewname == NULL || + cfgcurname == NULL || + cfgoldname == NULL) { + goto error; } if (exists_old_fleet_config(ctx) == FLB_TRUE) { @@ -1391,24 +1426,40 @@ static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx) } if (exists_cur_fleet_config(ctx) == FLB_TRUE) { - unlink(cfgcurname); + if (rename(cfgcurname, cfgoldname)) { + goto error; + } } - symlink(cfgtimename, cfgcurname); - - flb_sds_destroy(cfgnewname); - flb_sds_destroy(cfgcurname); - flb_sds_destroy(cfgoldname); - flb_sds_destroy(cfgtimename); + if (exists_new_fleet_config(ctx) == FLB_TRUE) { + if (rename(cfgnewname, cfgcurname)) { + goto error; + } + } calyptia_config_delete_old(ctx); + rc = FLB_TRUE; - return 0; +error: + if (cfgnewname) { + flb_sds_destroy(cfgnewname); + } + + if (cfgcurname) { + flb_sds_destroy(cfgcurname); + } + + if (cfgoldname) { + flb_sds_destroy(cfgoldname); + } + + return rc; } static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx, const char *cfgname) { + int rc = FLB_TRUE; flb_sds_t cfgnewname; flb_sds_t cfgcurname; flb_sds_t cfgoldname; @@ -1417,6 +1468,10 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx, cfgcurname = cur_fleet_config_filename(ctx); cfgoldname = old_fleet_config_filename(ctx); + if (cfgnewname == NULL || cfgcurname == NULL || cfgoldname == NULL) { + goto error; + } + if (exists_new_fleet_config(ctx) == FLB_TRUE) { unlink(cfgnewname); } @@ -1425,11 +1480,22 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx, rename(cfgoldname, cfgcurname); } - flb_sds_destroy(cfgnewname); - flb_sds_destroy(cfgcurname); - flb_sds_destroy(cfgoldname); + rc = FLB_TRUE; - return 0; +error: + if (cfgnewname) { + flb_sds_destroy(cfgnewname); + } + + if (cfgcurname) { + flb_sds_destroy(cfgcurname); + } + + if (cfgoldname) { + flb_sds_destroy(cfgoldname); + } + + return rc; } static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, @@ -1505,7 +1571,12 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, get_calyptia_files(ctx, u_conn, ctx->fleet_files_url, time_last_modified); cfgname = time_fleet_config_filename(ctx, time_last_modified); - calyptia_config_add(ctx, cfgname); + + if (calyptia_config_add(ctx, cfgname) == FLB_FALSE) { + flb_sds_destroy(cfgname); + return -1; + } + flb_sds_destroy(cfgname); cfgnewname = new_fleet_config_filename(ctx); From 8a0fac62f1a10f37eb5967950c0473c5a54eda27 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 26 Oct 2023 11:38:46 -0300 Subject: [PATCH 13/20] in_calyptia_fleet: finish read_glob for files on windows. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 359 +++++++++++++----- 1 file changed, 271 insertions(+), 88 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index f1eea5b1478..f1b2dfbe174 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -107,6 +107,52 @@ static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx); +#ifndef FLB_SYSTEM_WINDOWS + +static int is_link(const char *path) { + struct stat st = { 0 }; + + if (lstat(path, &st) != 0) { + return -1; + } + + if ((st.st_mode & S_IFMT) == S_IFLNK) { + return FLB_TRUE; + } + + return FLB_FALSE; +} +#else +/* symlinks are too difficult to use on win32 so we skip their use entirely. */ +static int is_link(const char *path) { + return FLB_FALSE; +} + +/* we include readlink just in case... and also so it can link. */ +ssize_t readlink(const char *path, char *realpath, size_t srealpath) { + HANDLE hFile; + DWORD ret; + + hFile = CreateFile(path, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, NULL); + + if (hFile == INVALID_HANDLE_VALUE) { + return -1; + } + + ret = GetFinalPathNameByHandleA(hFile, realpath, srealpath, VOLUME_NAME_NT); + + if (ret < srealpath) { + CloseHandle(hFile); + return -1; + } + + CloseHandle(hFile); + return ret; +} +#endif + + static char *find_case_header(struct flb_http_client *cli, const char *header) { char *ptr; @@ -236,7 +282,7 @@ static flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, return NULL; } - ret = flb_sds_printf(&cfgname, PATH_SEPARATOR "%s.ini", fname); + ret = flb_sds_printf(&cfgname, PATH_SEPARATOR "%s.conf", fname); if (ret == NULL) { flb_sds_destroy(cfgname); return NULL; @@ -374,7 +420,7 @@ static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, return FLB_FALSE; } - if (strcmp(end, ".ini") == 0) { + if (strcmp(end, ".conf") == 0) { return FLB_TRUE; } @@ -474,7 +520,8 @@ static void *do_reload(void *data) return NULL; } -static int test_config_is_valid(flb_sds_t cfgpath) +static int test_config_is_valid(struct flb_in_calyptia_fleet_config *ctx, + flb_sds_t cfgpath) { struct flb_cf *conf; int ret = FLB_FALSE; @@ -486,12 +533,17 @@ static int test_config_is_valid(flb_sds_t cfgpath) conf = flb_cf_create(); if (conf == NULL) { + flb_plg_debug(ctx->ins, "unable to create conf during validation test: %s", + cfgpath); goto config_init_error; } conf = flb_cf_create_from_file(conf, cfgpath); if (conf == NULL) { + flb_plg_debug(ctx->ins, + "unable to create conf from file during validation test: %s", + cfgpath); goto cf_create_from_file_error; } @@ -517,14 +569,26 @@ static int parse_config_name_timestamp(struct flb_in_calyptia_fleet_config *ctx, return FLB_FALSE; } - len = readlink(cfgpath, realname, sizeof(realname)); + switch (is_link(cfgpath)) { + case FLB_TRUE: + len = readlink(cfgpath, realname, sizeof(realname)); - if (len > sizeof(realname)) { + if (len > sizeof(realname)) { + return FLB_FALSE; + } + break; + case FLB_FALSE: + strncpy(realname, cfgpath, sizeof(realname)-1); + break; + default: + flb_errno(); return FLB_FALSE; } fname = basename(realname); + flb_plg_debug(ctx->ins, "parsing configuration timestamp from path: %s", fname); + errno = 0; timestamp = strtol(fname, &ext, 10); @@ -563,11 +627,14 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf pthread_attr_t ptha; flb_ctx_t *flb = flb_context_get(); - if (parse_config_name_timestamp(ctx, cfgpath, &ctx->config_timestamp) != FLB_TRUE) { return FLB_FALSE; } + reload = flb_calloc(1, sizeof(struct reload_ctx)); + reload->flb = flb; + reload->cfg_path = flb_sds_create(cfgpath); + if (ctx->collect_fd > 0) { flb_input_collector_pause(ctx->collect_fd, ctx->ins); } @@ -587,9 +654,9 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf * otherwise flb_reload errors out with: * [error] [reload] given flb context is NULL */ - flb_plg_info(ctx->ins, "loading configuration from %s.", cfgpath); + flb_plg_info(ctx->ins, "loading configuration from %s.", reload->cfg_path); - if (test_config_is_valid(cfgpath) == FLB_FALSE) { + if (test_config_is_valid(ctx, reload->cfg_path) == FLB_FALSE) { flb_plg_error(ctx->ins, "unable to load configuration."); if (ctx->collect_fd > 0) { @@ -600,11 +667,10 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf return FLB_FALSE; } - reload = flb_calloc(1, sizeof(struct reload_ctx)); - reload->flb = flb; - reload->cfg_path = cfgpath; - - fleet_cur_chdir(ctx); + if (fleet_cur_chdir(ctx) == -1) { + flb_errno(); + flb_plg_error(ctx->ins, "unable to change to configuration directory"); + } pthread_attr_init(&ptha); pthread_attr_setdetachstate(&ptha, PTHREAD_CREATE_DETACHED); @@ -1144,7 +1210,7 @@ static char *dirname(char *path) return path; } -static struct cfl_array *read_glob(const char *path) +static struct cfl_array *read_glob_win(const char *path, struct cfl_array *list) { char *star, *p0, *p1; char pattern[MAX_PATH]; @@ -1155,13 +1221,15 @@ static struct cfl_array *read_glob(const char *path) WIN32_FIND_DATA data; if (strlen(path) > MAX_PATH - 1) { - return -1; + flb_error("path too long: %s", path); + return NULL; } star = strchr(path, '*'); if (star == NULL) { - return -1; + flb_error("path has no wild card: %s", path); + return NULL; } /* @@ -1188,7 +1256,26 @@ static struct cfl_array *read_glob(const char *path) hnd = FindFirstFileA(pattern, &data); if (hnd == INVALID_HANDLE_VALUE) { - return 0; + flb_error("unable to open valid handle for: %s", path); + return NULL; + } + + if (list == NULL) { + list = cfl_array_create(3); + + if (list == NULL) { + flb_error("unable to allocate array"); + FindClose(hnd); + return NULL; + } + + /* cfl_array_resizable is hardcoded to return 0. */ + if (cfl_array_resizable(list, FLB_TRUE) != 0) { + flb_error("unable to make array resizable"); + FindClose(hnd); + cfl_array_destroy(list); + return NULL; + } } do { @@ -1215,78 +1302,33 @@ static struct cfl_array *read_glob(const char *path) } if (strchr(p1, '*')) { - read_glob(conf, ctx, state, buf); /* recursive */ + if (read_glob_win(path, list) == NULL) { + cfl_array_destroy(list); + FindClose(hnd); + return NULL; + } continue; } ret = stat(buf, &st); if (ret == 0 && (st.st_mode & S_IFMT) == S_IFREG) { - - if (read_config(conf, ctx, state, buf) < 0) { - return -1; - } + cfl_array_append_string(list, buf); } } while (FindNextFileA(hnd, &data) != 0); FindClose(hnd); - return 0; + return list; } -#endif -static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, - const char *cfgname) +static struct cfl_array *read_glob(const char *path) { - int rc = FLB_FALSE; - - flb_sds_t cfgnewname = NULL; - flb_sds_t cfgoldname = NULL; - flb_sds_t cfgcurname = NULL; - - cfgnewname = new_fleet_config_filename(ctx); - cfgcurname = cur_fleet_config_filename(ctx); - cfgoldname = old_fleet_config_filename(ctx); - - if (cfgnewname == NULL || cfgcurname == NULL || cfgoldname == NULL) { - goto error; - } - - if (exists_new_fleet_config(ctx) == FLB_TRUE) { - - if (rename(cfgnewname, cfgoldname)) { - goto error; - } - } - else if (exists_cur_fleet_config(ctx) == FLB_TRUE) { - - if (rename(cfgcurname, cfgoldname)) { - goto error; - } - } - - if (symlink(cfgname, cfgnewname)) { - goto error; - } - - rc = FLB_TRUE; - -error: - if (cfgnewname) { - flb_sds_destroy(cfgnewname); - } - - if (cfgcurname) { - flb_sds_destroy(cfgcurname); - } - - if (cfgoldname) { - flb_sds_destroy(cfgoldname); - } - - return rc; + return read_glob_win(path, NULL); } -static int cfl_array_qsort_ini(const void *arg_a, const void *arg_b) +#endif + +static int cfl_array_qsort_conf_files(const void *arg_a, const void *arg_b) { struct cfl_variant *var_a = (struct cfl_variant *)*(void **)arg_a; struct cfl_variant *var_b = (struct cfl_variant *)*(void **)arg_b; @@ -1335,7 +1377,7 @@ static int calyptia_config_delete_old_dir(const char *cfgpath) return FLB_FALSE; } - if (flb_sds_cat_safe(&cfg_glob, "/*", strlen("/*")) != 0) { + if (flb_sds_cat_safe(&cfg_glob, PATH_SEPARATOR "*", strlen(PATH_SEPARATOR "*")) != 0) { flb_sds_destroy(cfg_glob); return FLB_FALSE; } @@ -1366,32 +1408,32 @@ static int calyptia_config_delete_old_dir(const char *cfgpath) static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) { struct cfl_array *inis; - flb_sds_t glob_ini = NULL; + flb_sds_t glob_files = NULL; int idx; if (ctx == NULL) { return -1; } - if (generate_base_fleet_directory(ctx, &glob_ini) == NULL) { - flb_sds_destroy(glob_ini); + if (generate_base_fleet_directory(ctx, &glob_files) == NULL) { + flb_sds_destroy(glob_files); return -1; } - if (flb_sds_cat_safe(&glob_ini, "/*.ini", strlen("/*.ini")) != 0) { - flb_sds_destroy(glob_ini); + if (flb_sds_cat_safe(&glob_files, PATH_SEPARATOR "*.conf", strlen(PATH_SEPARATOR "*.conf")) != 0) { + flb_sds_destroy(glob_files); return -1; } - inis = read_glob(glob_ini); + inis = read_glob(glob_files); if (inis == NULL) { - flb_sds_destroy(glob_ini); + flb_sds_destroy(glob_files); return -1; } qsort(inis->entries, inis->entry_count, sizeof(struct cfl_variant *), - cfl_array_qsort_ini); + cfl_array_qsort_conf_files); for (idx = 0; idx < (((ssize_t)inis->entry_count) -1 - 3); idx++) { unlink(inis->entries[idx]->data.as_string); @@ -1399,11 +1441,109 @@ static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) } cfl_array_destroy(inis); - flb_sds_destroy(glob_ini); + flb_sds_destroy(glob_files); return 0; } +static flb_sds_t calyptia_config_get_newest(struct flb_in_calyptia_fleet_config *ctx) +{ + struct cfl_array *inis; + flb_sds_t glob_conf_files = NULL; + flb_sds_t cfgnewname = NULL; + + if (ctx == NULL) { + return NULL; + } + + if (generate_base_fleet_directory(ctx, &glob_conf_files) == NULL) { + flb_plg_error(ctx->ins, "unable to generate fleet directory name"); + flb_sds_destroy(glob_conf_files); + return NULL; + } + + if (flb_sds_cat_safe(&glob_conf_files, PATH_SEPARATOR "*.conf", strlen(PATH_SEPARATOR "*.conf")) != 0) { + flb_plg_error(ctx->ins, "unable to concatenate fleet glob"); + flb_sds_destroy(glob_conf_files); + return NULL; + } + + inis = read_glob(glob_conf_files); + if (inis == NULL) { + flb_plg_error(ctx->ins, "unable to read fleet directory for conf files: %s", + glob_conf_files); + flb_sds_destroy(glob_conf_files); + return NULL; + } + + qsort(inis->entries, inis->entry_count, + sizeof(struct cfl_variant *), + cfl_array_qsort_conf_files); + + cfgnewname = flb_sds_create_len(inis->entries[inis->entry_count-1]->data.as_string, + strlen(inis->entries[inis->entry_count-1]->data.as_string)); + + cfl_array_destroy(inis); + flb_sds_destroy(glob_conf_files); + + return cfgnewname; +} + +#ifndef FLB_SYSTEM_WINDOWS + +static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgname) +{ + int rc = FLB_FALSE; + + flb_sds_t cfgnewname = NULL; + flb_sds_t cfgoldname = NULL; + flb_sds_t cfgcurname = NULL; + + cfgnewname = new_fleet_config_filename(ctx); + cfgcurname = cur_fleet_config_filename(ctx); + cfgoldname = old_fleet_config_filename(ctx); + + if (cfgnewname == NULL || cfgcurname == NULL || cfgoldname == NULL) { + goto error; + } + + if (exists_new_fleet_config(ctx) == FLB_TRUE) { + + if (rename(cfgnewname, cfgoldname)) { + goto error; + } + } + else if (exists_cur_fleet_config(ctx) == FLB_TRUE) { + + if (rename(cfgcurname, cfgoldname)) { + goto error; + } + } + + if (symlink(cfgname, cfgnewname)) { + flb_plg_error(ctx->ins, "unable to create new configuration symlink."); + goto error; + } + + rc = FLB_TRUE; + +error: + if (cfgnewname) { + flb_sds_destroy(cfgnewname); + } + + if (cfgcurname) { + flb_sds_destroy(cfgcurname); + } + + if (cfgoldname) { + flb_sds_destroy(cfgoldname); + } + + return rc; +} + static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx) { int rc = FLB_FALSE; @@ -1497,6 +1637,26 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx, return rc; } +#else +static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgname) +{ + return FLB_TRUE; +} + +static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx) +{ + calyptia_config_delete_old(ctx); + return FLB_TRUE; +} + +static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgname) +{ + unlink(cfgname); + return FLB_TRUE; +} +#endif static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_connection *u_conn) @@ -1573,18 +1733,26 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, cfgname = time_fleet_config_filename(ctx, time_last_modified); if (calyptia_config_add(ctx, cfgname) == FLB_FALSE) { + flb_plg_error(ctx->ins, "unable to add config: %s", cfgname); flb_sds_destroy(cfgname); return -1; } flb_sds_destroy(cfgname); +#ifndef FLB_SYSTEM_WINDOWS cfgnewname = new_fleet_config_filename(ctx); if (execute_reload(ctx, cfgnewname) == FLB_FALSE) { calyptia_config_rollback(ctx, cfgname); - flb_sds_destroy(cfgnewname); + flb_sds_destroy(cfgname); + return -1; + } +#else + if (execute_reload(ctx, cfgname) == FLB_FALSE) { + calyptia_config_rollback(ctx, cfgname); return -1; } +#endif } return 0; @@ -1663,7 +1831,7 @@ static flb_sds_t fleet_gendir(struct flb_in_calyptia_fleet_config *ctx, time_t t return NULL; } - if (flb_sds_printf(&fleetcurdir, "%s/%ld", fleetdir, timestamp) == NULL) { + if (flb_sds_printf(&fleetcurdir, "%s" PATH_SEPARATOR "%ld", fleetdir, timestamp) == NULL) { flb_sds_destroy(fleetdir); flb_sds_destroy(fleetcurdir); return NULL; @@ -1711,6 +1879,7 @@ static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx) static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) { flb_ctx_t *flb_ctx = flb_context_get(); + flb_sds_t cfgnewname = NULL; if (create_fleet_directory(ctx) != 0) { flb_plg_error(ctx->ins, "unable to create fleet directories"); @@ -1719,6 +1888,7 @@ static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) /* check if we are already using the fleet configuration file. */ if (is_fleet_config(ctx, flb_ctx->config) == FLB_FALSE) { + flb_plg_debug(ctx->ins, "loading configuration file"); /* check which one and load it */ if (exists_cur_fleet_config(ctx) == FLB_TRUE) { return execute_reload(ctx, cur_fleet_config_filename(ctx)); @@ -1726,8 +1896,21 @@ static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) else if (exists_new_fleet_config(ctx) == FLB_TRUE) { return execute_reload(ctx, new_fleet_config_filename(ctx)); } + else { + cfgnewname = calyptia_config_get_newest(ctx); + + if (cfgnewname != NULL) { + flb_plg_debug(ctx->ins, "loading newest configuration: %s", cfgnewname); + return execute_reload(ctx, cfgnewname); + } + else { + flb_plg_warn(ctx->ins, "unable to find latest configuration file"); + } + } } else { + flb_plg_debug(ctx->ins, "we are already using a configuration file: %s", + flb_ctx->config->conf_path_file); parse_config_timestamp(ctx, &ctx->config_timestamp); } @@ -1847,7 +2030,7 @@ static int create_fleet_files(struct flb_in_calyptia_fleet_config *ctx, return -1; } - create_fleet_file(fleetdir, + create_fleet_file(fleetdir, name->via.str.ptr, name->via.str.size, contents->via.str.ptr, @@ -1985,7 +2168,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, return 0; } - if (exists_new_fleet_config(ctx) == FLB_TRUE) { + if (exists_new_fleet_config(ctx) == FLB_TRUE || is_fleet_config(ctx, ctx->config)) { calyptia_config_commit(ctx); } From e5e293a4bec0b6ad02e4c04acc26516ea3093bdb Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Mon, 6 Nov 2023 12:36:22 -0300 Subject: [PATCH 14/20] in_calyptia_fleet: fix memory leak from configuration file name. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index f1b2dfbe174..310c697ae18 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -633,7 +633,7 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf reload = flb_calloc(1, sizeof(struct reload_ctx)); reload->flb = flb; - reload->cfg_path = flb_sds_create(cfgpath); + reload->cfg_path = cfgpath; if (ctx->collect_fd > 0) { flb_input_collector_pause(ctx->collect_fd, ctx->ins); From ce2c2a7cb67138f092af5964da3f0a2adfd54150 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 7 Nov 2023 10:12:46 -0300 Subject: [PATCH 15/20] in_calyptia_fleet: remove readklink for win32. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 51 ------------------- 1 file changed, 51 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 310c697ae18..ee2e5cfa0b7 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -127,29 +127,6 @@ static int is_link(const char *path) { static int is_link(const char *path) { return FLB_FALSE; } - -/* we include readlink just in case... and also so it can link. */ -ssize_t readlink(const char *path, char *realpath, size_t srealpath) { - HANDLE hFile; - DWORD ret; - - hFile = CreateFile(path, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, NULL); - - if (hFile == INVALID_HANDLE_VALUE) { - return -1; - } - - ret = GetFinalPathNameByHandleA(hFile, realpath, srealpath, VOLUME_NAME_NT); - - if (ret < srealpath) { - CloseHandle(hFile); - return -1; - } - - CloseHandle(hFile); - return ret; -} #endif @@ -989,34 +966,6 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct return 0; } -#ifdef FLB_SYSTEM_WINDOWS -#define link(a, b) CreateHardLinkA(b, a, 0) -#define symlink(a, b) CreateSymLinkA(b, a, 0) - -ssize_t readlink(const char *path, char *realpath, size_t srealpath) { - HANDLE hFile; - DWORD ret; - - hFile = CreateFile(path, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, NULL); - - if (hFile == INVALID_HANDLE_VALUE) { - return -1; - } - - ret = GetFinalPathNameByHandleA(hFile, realpath, srealpath, VOLUME_NAME_NT); - - if (ret < srealpath) { - CloseHandle(hFile); - return -1; - } - - CloseHandle(hFile); - return ret; -} - -#endif - #ifdef FLB_SYSTEM_WINDOWS #define _mkdir(a, b) mkdir(a) #else From b6c0786bc0207cf5c5b350db60ec9bd4498c303e Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Mon, 13 Nov 2023 09:22:24 -0300 Subject: [PATCH 16/20] in_calyptia_fleet: fix use-after-free when reloading fleet configuration on windows. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index ee2e5cfa0b7..8b53816dfd8 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -1687,9 +1687,8 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, return -1; } - flb_sds_destroy(cfgname); - #ifndef FLB_SYSTEM_WINDOWS + flb_sds_destroy(cfgname); cfgnewname = new_fleet_config_filename(ctx); if (execute_reload(ctx, cfgnewname) == FLB_FALSE) { calyptia_config_rollback(ctx, cfgname); From 2a2aa52bfcf688401ad6758f852af7d7a841dc64 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 14 Nov 2023 15:55:39 -0300 Subject: [PATCH 17/20] in_calyptia_fleet: remove config from flb_in_calyptia_fleet_config. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 8b53816dfd8..58e08482244 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -92,7 +92,6 @@ struct flb_in_calyptia_fleet_config { flb_sds_t fleet_files_url; struct flb_input_instance *ins; /* plugin instance */ - struct flb_config *config; /* Fluent Bit context */ /* Networking */ struct flb_upstream *u; @@ -2116,7 +2115,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, return 0; } - if (exists_new_fleet_config(ctx) == FLB_TRUE || is_fleet_config(ctx, ctx->config)) { + if (exists_new_fleet_config(ctx) == FLB_TRUE || is_fleet_config(ctx, config)) { calyptia_config_commit(ctx); } From 471d7e7db26d0d410fcf80b1297843476a02daa7 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 14 Nov 2023 15:56:53 -0300 Subject: [PATCH 18/20] in_calyptia_fleet: commit to new configuration without checking if the 'new' symlink exists. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 58e08482244..37608ba3d19 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -2115,7 +2115,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, return 0; } - if (exists_new_fleet_config(ctx) == FLB_TRUE || is_fleet_config(ctx, config)) { + if (is_fleet_config(ctx, config)) { calyptia_config_commit(ctx); } From 7d2097df9066484be09008b6736a133c4ea62d9d Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Tue, 14 Nov 2023 16:35:37 -0300 Subject: [PATCH 19/20] in_calyptia_fleet: remove configuration file directories for fleets without files. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 37608ba3d19..fb321a972bf 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -1331,24 +1331,22 @@ static int calyptia_config_delete_old_dir(const char *cfgpath) } files = read_glob(cfg_glob); - if (files == NULL) { - flb_sds_destroy(cfg_glob); - return FLB_FALSE; - } - for (idx = 0; idx < ((ssize_t)files->entry_count); idx++) { - unlink(files->entries[idx]->data.as_string); + if (files != NULL) { + for (idx = 0; idx < ((ssize_t)files->entry_count); idx++) { + unlink(files->entries[idx]->data.as_string); + } + cfl_array_destroy(files); } /* attempt to delete the main directory */ - ext = strrchr(cfg_glob, '/'); + ext = strrchr(cfg_glob, PATH_SEPARATOR[0]); if (ext) { *ext = '\0'; rmdir(cfg_glob); } flb_sds_destroy(cfg_glob); - cfl_array_destroy(files); return FLB_TRUE; } From afbb19601649a8d271e8d20eaafacef13f6e3732 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 23 Nov 2023 10:48:56 -0300 Subject: [PATCH 20/20] in_calyptia_fleet: preserve machine_id across reloads. Signed-off-by: Phillip Whelan --- plugins/in_calyptia_fleet/in_calyptia_fleet.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index fb321a972bf..c3b729ce8f1 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -1632,6 +1632,7 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, " api_key %s\n" " fleet_id %s\n" " add_label fleet_id %s\n" + " machine_id %s\n" " fleet.config_dir %s\n" " calyptia_host %s\n" " calyptia_port %d\n" @@ -1639,6 +1640,7 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, ctx->api_key, ctx->fleet_id, ctx->fleet_id, + ctx->machine_id, ctx->config_dir, ctx->ins->host.name, ctx->ins->host.port, @@ -1653,6 +1655,7 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, " fleet_name %s\n" " fleet_id %s\n" " add_label fleet_id %s\n" + " machine_id %s\n" " fleet.config_dir %s\n" " calyptia_host %s\n" " calyptia_port %d\n" @@ -1661,6 +1664,7 @@ static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, ctx->fleet_name, ctx->fleet_id, ctx->fleet_id, + ctx->machine_id, ctx->config_dir, ctx->ins->host.name, ctx->ins->host.port,