Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

es_out: support Upstream Servers #1560

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
es_out: support multiple output nodes in round-robin by using upstrea…
…m servers configuration

Signed-off-by: Clara Pohland <54847419+claralui@users.noreply.github.com>
claralp committed Sep 9, 2019
commit b08b568a13d24000f5ca3aaec4b1a35573807a44
164 changes: 113 additions & 51 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_network.h>
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_upstream_ha.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_time.h>
#include <msgpack.h>
@@ -38,7 +40,7 @@ struct flb_output_plugin out_es_plugin;

static inline int es_pack_map_content(msgpack_packer *tmp_pck,
msgpack_object map,
struct flb_elasticsearch *ctx)
struct flb_elasticsearch_config *ec)
{
int i;
char *ptr_key = NULL;
@@ -87,7 +89,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck,
*
* https://goo.gl/R5NMTr
*/
if (ctx->replace_dots == FLB_TRUE) {
if (ec->replace_dots == FLB_TRUE) {
char *p = ptr_key;
char *end = ptr_key + key_size;
while (p != end) {
@@ -112,7 +114,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck,
*/
if (v->type == MSGPACK_OBJECT_MAP) {
msgpack_pack_map(tmp_pck, v->via.map.size);
es_pack_map_content(tmp_pck, *v, ctx);
es_pack_map_content(tmp_pck, *v, ec);
}
else {
msgpack_pack_object(tmp_pck, *v);
@@ -130,7 +132,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck,
*/
static char *elasticsearch_format(const void *data, size_t bytes,
const char *tag, int tag_len, int *out_size,
struct flb_elasticsearch *ctx)
struct flb_elasticsearch_config *ec)
{
int ret;
int len;
@@ -200,9 +202,9 @@ static char *elasticsearch_format(const void *data, size_t bytes,
msgpack_unpacked_init(&result);

/* Copy logstash prefix if logstash format is enabled */
if (ctx->logstash_format == FLB_TRUE) {
memcpy(logstash_index, ctx->logstash_prefix, ctx->logstash_prefix_len);
logstash_index[ctx->logstash_prefix_len] = '\0';
if (ec->logstash_format == FLB_TRUE) {
memcpy(logstash_index, ec->logstash_prefix, ec->logstash_prefix_len);
logstash_index[ec->logstash_prefix_len] = '\0';
}

/*
@@ -212,17 +214,17 @@ static char *elasticsearch_format(const void *data, size_t bytes,
* The header stored in 'j_index' will be used for the all records on
* this payload.
*/
if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
if (ec->logstash_format == FLB_FALSE && ec->generate_id == FLB_FALSE) {
flb_time_get(&tms);
gmtime_r(&tms.tm.tv_sec, &tm);
s = strftime(index_formatted, sizeof(index_formatted) - 1,
ctx->index, &tm);
ec->index, &tm);
es_index = index_formatted;

index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
es_index, ctx->type);
es_index, ec->type);
}

/*
@@ -231,7 +233,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
* in order to prevent generating millions of indexes
* we can set to always use current time for index generation
*/
if (ctx->current_time_index == FLB_TRUE) {
if (ec->current_time_index == FLB_TRUE) {
flb_time_get(&tms);
}

@@ -248,7 +250,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
}

/* Only pop time from record if current_time_index is disabled */
if (ctx->current_time_index == FLB_FALSE) {
if (ec->current_time_index == FLB_FALSE) {
flb_time_pop_from_msgpack(&tms, &result, &obj);
}

@@ -263,16 +265,16 @@ static char *elasticsearch_format(const void *data, size_t bytes,
map_size = map.via.map.size;

es_index_custom_len = 0;
if (ctx->logstash_prefix_key_len != 0) {
if (ec->logstash_prefix_key_len != 0) {
for (i = 0; i < map_size; i++) {
key = map.via.map.ptr[i].key;
if (key.type != MSGPACK_OBJECT_STR) {
continue;
}
if (key.via.str.size != ctx->logstash_prefix_key_len) {
if (key.via.str.size != ec->logstash_prefix_key_len) {
continue;
}
if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key, ctx->logstash_prefix_key_len) != 0) {
if (strncmp(key.via.str.ptr, ec->logstash_prefix_key, ec->logstash_prefix_key_len) != 0) {
continue;
}
val = map.via.map.ptr[i].val;
@@ -294,62 +296,62 @@ static char *elasticsearch_format(const void *data, size_t bytes,
msgpack_sbuffer_init(&tmp_sbuf);
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);

if (ctx->include_tag_key == FLB_TRUE) {
if (ec->include_tag_key == FLB_TRUE) {
map_size++;
}

/* Set the new map size */
msgpack_pack_map(&tmp_pck, map_size + 1);

/* Append the time key */
msgpack_pack_str(&tmp_pck, ctx->time_key_len);
msgpack_pack_str_body(&tmp_pck, ctx->time_key, ctx->time_key_len);
msgpack_pack_str(&tmp_pck, ec->time_key_len);
msgpack_pack_str_body(&tmp_pck, ec->time_key, ec->time_key_len);

/* Format the time */
gmtime_r(&tms.tm.tv_sec, &tm);
s = strftime(time_formatted, sizeof(time_formatted) - 1,
ctx->time_key_format, &tm);
ec->time_key_format, &tm);
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
".%03" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);

s += len;
msgpack_pack_str(&tmp_pck, s);
msgpack_pack_str_body(&tmp_pck, time_formatted, s);

es_index = ctx->index;
if (ctx->logstash_format == FLB_TRUE) {
es_index = ec->index;
if (ec->logstash_format == FLB_TRUE) {
/* Compose Index header */
if (es_index_custom_len > 0) {
p = logstash_index + es_index_custom_len;
} else {
p = logstash_index + ctx->logstash_prefix_len;
p = logstash_index + ec->logstash_prefix_len;
}
*p++ = '-';

len = p - logstash_index;
s = strftime(p, sizeof(logstash_index) - len - 1,
ctx->logstash_dateformat, &tm);
ec->logstash_dateformat, &tm);
p += s;
*p++ = '\0';
es_index = logstash_index;
if (ctx->generate_id == FLB_FALSE) {
if (ec->generate_id == FLB_FALSE) {
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT,
es_index, ctx->type);
es_index, ec->type);
}
}
else if (ctx->current_time_index == FLB_TRUE) {
else if (ec->current_time_index == FLB_TRUE) {
/* Make sure we handle index time format for index */
strftime(index_formatted, sizeof(index_formatted) - 1,
ctx->index, &tm);
ec->index, &tm);
es_index = index_formatted;
}

/* Tag Key */
if (ctx->include_tag_key == FLB_TRUE) {
msgpack_pack_str(&tmp_pck, ctx->tag_key_len);
msgpack_pack_str_body(&tmp_pck, ctx->tag_key, ctx->tag_key_len);
if (ec->include_tag_key == FLB_TRUE) {
msgpack_pack_str(&tmp_pck, ec->tag_key_len);
msgpack_pack_str_body(&tmp_pck, ec->tag_key, ec->tag_key_len);
msgpack_pack_str(&tmp_pck, tag_len);
msgpack_pack_str_body(&tmp_pck, tag, tag_len);
}
@@ -361,15 +363,15 @@ static char *elasticsearch_format(const void *data, size_t bytes,
* Elasticsearch have a restriction that key names cannot contain
* a dot; if some dot is found, it's replaced with an underscore.
*/
ret = es_pack_map_content(&tmp_pck, map, ctx);
ret = es_pack_map_content(&tmp_pck, map, ec);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_sbuffer_destroy(&tmp_sbuf);
es_bulk_destroy(bulk);
return NULL;
}

if (ctx->generate_id == FLB_TRUE) {
if (ec->generate_id == FLB_TRUE) {
MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash);
snprintf(es_uuid, sizeof(es_uuid),
"%04x%04x-%04x-%04x-%04x-%04x%04x%04x",
@@ -378,7 +380,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
index_len = snprintf(j_index,
ES_BULK_HEADER,
ES_BULK_INDEX_FMT_ID,
es_index, ctx->type, es_uuid);
es_index, ec->type, es_uuid);
}

/* Convert msgpack to JSON */
@@ -412,7 +414,7 @@ static char *elasticsearch_format(const void *data, size_t bytes,
* return the bulk->ptr buffer
*/
flb_free(bulk);
if (ctx->trace_output) {
if (ec->trace_output) {
fwrite(buf, 1, *out_size, stdout);
fflush(stdout);
}
@@ -423,20 +425,30 @@ int cb_es_init(struct flb_output_instance *ins,
struct flb_config *config,
void *data)
{
int ret;
const char *tmp;
struct flb_elasticsearch *ctx;
(void) data;

ctx = flb_es_conf_create(ins, config);
ctx = flb_calloc(1, sizeof(struct flb_elasticsearch));
if (!ctx) {
flb_error("[out_es] cannot initialize plugin");
flb_errno();
return -1;
}

flb_debug("[out_es] host=%s port=%i uri=%s index=%s type=%s",
ins->host.name, ins->host.port, ctx->uri,
ctx->index, ctx->type);

mk_list_init(&ctx->configs);
flb_output_set_context(ins, ctx);
return 0;

/* Configure HA or simple mode ? */
tmp = flb_output_get_property("upstream", ins);
if (tmp) {
ret = es_config_ha(tmp, ctx, config);
}
else {
ret = es_config_simple(ins, ctx, config);
}

return ret;
}

static int elasticsearch_error_check(struct flb_http_client *c)
@@ -546,46 +558,72 @@ void cb_es_flush(const void *data, size_t bytes,
char *pack;
size_t b_sent;
struct flb_elasticsearch *ctx = out_context;
struct flb_elasticsearch_config *ec = NULL;
struct flb_upstream_conn *u_conn;
struct flb_http_client *c;
struct flb_upstream_node *node;
(void) i_ins;
(void) tag;
(void) tag_len;

if (ctx->ha_mode == FLB_TRUE) {
node = flb_upstream_ha_node_get(ctx->ha);
if (!node) {
flb_error("[out_es] cannot get an Upstream HA node");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* Get forward_config stored in node opaque data */
ec = flb_upstream_node_get_data(node);
}
else {
ec = mk_list_entry_first(&ctx->configs,
struct flb_elasticsearch_config,
_head);
}

flb_debug("[out_es] trying node %s", node->name);

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->u);
if (ctx->ha_mode == FLB_TRUE) {
u_conn = flb_upstream_conn_get(node->u);
}
else {
u_conn = flb_upstream_conn_get(ctx->u);
}
if (!u_conn) {
flb_error("[out_es] no upstream connections available");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

/* Convert format */
pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ctx);
pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ec);
if (!pack) {
flb_upstream_conn_release(u_conn);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/* Compose HTTP Client request */
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri,
c = flb_http_client(u_conn, FLB_HTTP_POST, ec->uri,
pack, bytes_out, NULL, 0, NULL, 0);

flb_http_buffer_size(c, ctx->buffer_size);
flb_http_buffer_size(c, ec->buffer_size);

flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20);

if (ctx->http_user && ctx->http_passwd) {
flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd);
if (ec->http_user && ec->http_passwd) {
flb_http_basic_auth(c, ec->http_user, ec->http_passwd);
}

ret = flb_http_do(c, &b_sent);
if (ret != 0) {
flb_warn("[out_es] http_do=%i URI=%s", ret, ctx->uri);
flb_warn("[out_es] http_do=%i URI=%s", ret, ec->uri);
goto retry;
}
else {
/* The request was issued successfully, validate the 'error' field */
flb_debug("[out_es] HTTP Status=%i URI=%s", c->resp.status, ctx->uri);
flb_debug("[out_es] HTTP Status=%i URI=%s", c->resp.status, ec->uri);
if (c->resp.status != 200 && c->resp.status != 201) {
goto retry;
}
@@ -598,7 +636,7 @@ void cb_es_flush(const void *data, size_t bytes,
ret = elasticsearch_error_check(c);
if (ret == FLB_TRUE) {
/* we got an error */
if (ctx->trace_error) {
if (ec->trace_error) {
/*
* If trace_error is set, trace the actual
* input/output to Elasticsearch that caused the problem.
@@ -635,8 +673,32 @@ void cb_es_flush(const void *data, size_t bytes,
int cb_es_exit(void *data, struct flb_config *config)
{
struct flb_elasticsearch *ctx = data;
struct flb_elasticsearch_config *ec;
struct mk_list *head;
struct mk_list *tmp;
(void) config;

if (!ctx) {
return 0;
}

/* Destroy elasticsearch_config contexts */
mk_list_foreach_safe(head, tmp, &ctx->configs) {
ec = mk_list_entry(head, struct flb_elasticsearch_config, _head);
mk_list_del(&ec->_head);
flb_es_conf_destroy(ec);
}

if (ctx->ha_mode == FLB_TRUE) {
if (ctx->ha) {
flb_upstream_ha_destroy(ctx->ha);
}
}
else {
flb_upstream_destroy(ctx->u);
}
flb_free(ctx);

flb_es_conf_destroy(ctx);
return 0;
}

Loading