From 868befcb5f67d5d72b13e1a78aed1009a8bfe2f6 Mon Sep 17 00:00:00 2001 From: David Bennett <71459415+Jagularr@users.noreply.github.com> Date: Tue, 6 Apr 2021 17:33:35 -0400 Subject: [PATCH] Add support for Logstash 7 'queue' stats from the Pipelines API (#9080) * LAdd support for logstash 7 'queue' stats for its pipelines stats API * appease the linter * Update samples_logstash7.go --- plugins/inputs/logstash/logstash.go | 26 +++- plugins/inputs/logstash/logstash_test.go | 69 ++++++++++ plugins/inputs/logstash/samples_logstash7.go | 137 +++++++++++++++++++ 3 files changed, 227 insertions(+), 5 deletions(-) create mode 100644 plugins/inputs/logstash/samples_logstash7.go diff --git a/plugins/inputs/logstash/logstash.go b/plugins/inputs/logstash/logstash.go index 92b392d67c36d..c9833f028654d 100644 --- a/plugins/inputs/logstash/logstash.go +++ b/plugins/inputs/logstash/logstash.go @@ -138,10 +138,13 @@ type PipelinePlugins struct { } type PipelineQueue struct { - Events float64 `json:"events"` - Type string `json:"type"` - Capacity interface{} `json:"capacity"` - Data interface{} `json:"data"` + Events float64 `json:"events"` + EventsCount *float64 `json:"events_count"` + Type string `json:"type"` + Capacity interface{} `json:"capacity"` + Data interface{} `json:"data"` + QueueSizeInBytes *float64 `json:"queue_size_in_bytes"` + MaxQueueSizeInBytes *float64 `json:"max_queue_size_in_bytes"` } const jvmStats = "/_node/stats/jvm" @@ -304,8 +307,13 @@ func (logstash *Logstash) gatherQueueStats( queueTags[tag] = value } + events := queue.Events + if queue.EventsCount != nil { + events = *queue.EventsCount + } + queueFields := map[string]interface{}{ - "events": queue.Events, + "events": events, } if queue.Type != "memory" { @@ -321,6 +329,14 @@ func (logstash *Logstash) gatherQueueStats( for field, value := range flattener.Fields { queueFields[field] = value } + + if queue.MaxQueueSizeInBytes != nil { + queueFields["max_queue_size_in_bytes"] = *queue.MaxQueueSizeInBytes + } + + if queue.QueueSizeInBytes != nil { + queueFields["queue_size_in_bytes"] = *queue.QueueSizeInBytes + } } accumulator.AddFields("logstash_queue", queueFields, queueTags) diff --git a/plugins/inputs/logstash/logstash_test.go b/plugins/inputs/logstash/logstash_test.go index b0d020b487003..931af66b23fd6 100644 --- a/plugins/inputs/logstash/logstash_test.go +++ b/plugins/inputs/logstash/logstash_test.go @@ -16,6 +16,7 @@ var logstashTest = NewLogstash() var ( logstash5accPipelineStats testutil.Accumulator logstash6accPipelinesStats testutil.Accumulator + logstash7accPipelinesStats testutil.Accumulator logstash5accProcessStats testutil.Accumulator logstash6accProcessStats testutil.Accumulator logstash5accJVMStats testutil.Accumulator @@ -686,3 +687,71 @@ func Test_Logstash6GatherJVMStats(test *testing.T) { }, ) } + +func Test_Logstash7GatherPipelinesQueueStats(test *testing.T) { + fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.Header().Set("Content-Type", "application/json") + _, err := fmt.Fprintf(writer, "%s", string(logstash7PipelinesJSON)) + if err != nil { + test.Logf("Can't print test json") + } + })) + requestURL, err := url.Parse(logstashTest.URL) + if err != nil { + test.Logf("Can't connect to: %s", logstashTest.URL) + } + fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + fakeServer.Start() + defer fakeServer.Close() + + if logstashTest.client == nil { + client, err := logstashTest.createHTTPClient() + + if err != nil { + test.Logf("Can't createHTTPClient") + } + logstashTest.client = client + } + + if err := logstashTest.gatherPipelinesStats(logstashTest.URL+pipelineStats, &logstash7accPipelinesStats); err != nil { + test.Logf("Can't gather Pipeline stats") + } + + fields := make(map[string]interface{}) + fields["duration_in_millis"] = float64(3032875.0) + fields["queue_push_duration_in_millis"] = float64(13300.0) + fields["in"] = float64(2665549.0) + fields["filtered"] = float64(2665549.0) + fields["out"] = float64(2665549.0) + + logstash7accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_events", + fields, + map[string]string{ + "node_id": string("28580380-ad2c-4032-934b-76359125edca"), + "node_name": string("HOST01.local"), + "source": string("HOST01.local"), + "node_version": string("7.4.2"), + "pipeline": string("infra"), + }, + ) + + logstash7accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_queue", + map[string]interface{}{ + "events": float64(0), + "max_queue_size_in_bytes": float64(4294967296), + "queue_size_in_bytes": float64(32028566), + }, + map[string]string{ + "node_id": string("28580380-ad2c-4032-934b-76359125edca"), + "node_name": string("HOST01.local"), + "source": string("HOST01.local"), + "node_version": string("7.4.2"), + "pipeline": string("infra"), + "queue_type": string("persisted"), + }, + ) +} diff --git a/plugins/inputs/logstash/samples_logstash7.go b/plugins/inputs/logstash/samples_logstash7.go new file mode 100644 index 0000000000000..fe05712909c81 --- /dev/null +++ b/plugins/inputs/logstash/samples_logstash7.go @@ -0,0 +1,137 @@ +package logstash + +const logstash7PipelinesJSON = ` +{ + "host" : "HOST01.local", + "version" : "7.4.2", + "http_address" : "127.0.0.1:9600", + "id" : "28580380-ad2c-4032-934b-76359125edca", + "name" : "HOST01.local", + "ephemeral_id" : "bd95ff6b-3fa8-42ae-be32-098a4e4ea1ec", + "status" : "green", + "snapshot" : true, + "pipeline" : { + "workers" : 8, + "batch_size" : 125, + "batch_delay" : 50 + }, + "pipelines" : { + "infra" : { + "events" : { + "in" : 2665549, + "out" : 2665549, + "duration_in_millis" : 3032875, + "filtered" : 2665549, + "queue_push_duration_in_millis" : 13300 + }, + "plugins" : { + "inputs" : [ { + "id" : "8526dc80bc2257ab08f96018f96b0c68dd03abc5695bb22fb9e96339a8dfb4f86", + "events" : { + "out" : 2665549, + "queue_push_duration_in_millis" : 13300 + }, + "peak_connections" : 1, + "name" : "beats", + "current_connections" : 1 + } ], + "codecs" : [ { + "id" : "plain_7312c097-1e7f-41db-983b-4f5a87a9eba2", + "encode" : { + "duration_in_millis" : 0, + "writes_in" : 0 + }, + "name" : "plain", + "decode" : { + "out" : 0, + "duration_in_millis" : 0, + "writes_in" : 0 + } + }, { + "id" : "rubydebug_e958e3dc-10f6-4dd6-b7c5-ae3de2892afb", + "encode" : { + "duration_in_millis" : 0, + "writes_in" : 0 + }, + "name" : "rubydebug", + "decode" : { + "out" : 0, + "duration_in_millis" : 0, + "writes_in" : 0 + } + }, { + "id" : "plain_addb97be-fb77-4cbc-b45c-0424cd5d0ac7", + "encode" : { + "duration_in_millis" : 0, + "writes_in" : 0 + }, + "name" : "plain", + "decode" : { + "out" : 0, + "duration_in_millis" : 0, + "writes_in" : 0 + } + } ], + "filters" : [ { + "id" : "9e8297a6ee7b61864f77853317dccde83d29952ef869010c385dcfc9064ab8b8", + "events" : { + "in" : 2665549, + "out" : 2665549, + "duration_in_millis" : 8648 + }, + "name" : "date", + "matches" : 2665549 + }, { + "id" : "bec0c77b3f53a78c7878449c72ec59f97be31c1f12f9621f61ed2d4563bad869", + "events" : { + "in" : 2665549, + "out" : 2665549, + "duration_in_millis" : 195138 + }, + "name" : "fingerprint" + } ], + "outputs" : [ { + "id" : "df59066a933f038354c1845ba44de692f70dbd0d2009ab07a12b98b776be7e3f", + "events" : { + "in" : 0, + "out" : 0, + "duration_in_millis" : 25 + }, + "name" : "stdout" + }, { + "id" : "38967f09bbd2647a95aa00702b6b557bdbbab31da6a04f991d38abe5629779e3", + "events" : { + "in" : 2665549, + "out" : 2665549, + "duration_in_millis" : 2802177 + }, + "name" : "elasticsearch", + "bulk_requests" : { + "successes" : 2870, + "responses" : { + "200" : 2870 + } + }, + "documents" : { + "successes" : 2665549 + } + } ] + }, + "reloads" : { + "successes" : 4, + "last_error" : null, + "failures" : 0, + "last_success_timestamp" : "2020-06-05T08:06:12.538Z", + "last_failure_timestamp" : null + }, + "queue" : { + "type" : "persisted", + "events_count" : 0, + "queue_size_in_bytes" : 32028566, + "max_queue_size_in_bytes" : 4294967296 + }, + "hash" : "5bc589ae4b02cb3e436626429b50928b9d99360639c84dc7fc69268ac01a9fd0", + "ephemeral_id" : "4bcacefa-6cbf-461e-b14e-184edd9ebdf3" + } + } +}`