Add support for Logstash 7 'queue' stats from the Pipelines API (influxdata#9080)

* Add support for logstash 7 'queue' stats for its pipelines stats API

* appease the linter

* Update samples_logstash7.go
ivorybilled authored Apr 6, 2021
1 parent 5524acf commit 868befc
Showing 3 changed files with 227 additions and 5 deletions.
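Logstash 7 reports pipeline queue statistics as events_count, queue_size_in_bytes, and max_queue_size_in_bytes, while the plugin previously read only the events field. The changes below add those keys to PipelineQueue as optional pointer fields, prefer events_count over events when it is present, and emit the two size fields on the logstash_queue measurement for non-memory queues when the API reports them. The queue object from the new Logstash 7 sample fixture (samples_logstash7.go, below) looks like this:

  "queue" : {
    "type" : "persisted",
    "events_count" : 0,
    "queue_size_in_bytes" : 32028566,
    "max_queue_size_in_bytes" : 4294967296
  }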
26 changes: 21 additions & 5 deletions plugins/inputs/logstash/logstash.go
@@ -138,10 +138,13 @@ type PipelinePlugins struct {
}

type PipelineQueue struct {
-	Events   float64     `json:"events"`
-	Type     string      `json:"type"`
-	Capacity interface{} `json:"capacity"`
-	Data     interface{} `json:"data"`
+	Events              float64     `json:"events"`
+	EventsCount         *float64    `json:"events_count"`
+	Type                string      `json:"type"`
+	Capacity            interface{} `json:"capacity"`
+	Data                interface{} `json:"data"`
+	QueueSizeInBytes    *float64    `json:"queue_size_in_bytes"`
+	MaxQueueSizeInBytes *float64    `json:"max_queue_size_in_bytes"`
}

const jvmStats = "/_node/stats/jvm"
@@ -304,8 +307,13 @@ func (logstash *Logstash) gatherQueueStats(
		queueTags[tag] = value
	}

+	events := queue.Events
+	if queue.EventsCount != nil {
+		events = *queue.EventsCount
+	}
+
	queueFields := map[string]interface{}{
-		"events": queue.Events,
+		"events": events,
	}

	if queue.Type != "memory" {
@@ -321,6 +329,14 @@
		for field, value := range flattener.Fields {
			queueFields[field] = value
		}
+
+		if queue.MaxQueueSizeInBytes != nil {
+			queueFields["max_queue_size_in_bytes"] = *queue.MaxQueueSizeInBytes
+		}
+
+		if queue.QueueSizeInBytes != nil {
+			queueFields["queue_size_in_bytes"] = *queue.QueueSizeInBytes
+		}
	}

	accumulator.AddFields("logstash_queue", queueFields, queueTags)
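The new fields are pointers because encoding/json leaves a *float64 nil when the corresponding key is absent from the response. That lets gatherQueueStats tell a Logstash 7 payload that reports events_count (possibly zero) apart from an older payload that omits it and should fall back to the legacy events value. A minimal standalone sketch of that decoding behavior follows; it is not part of the commit, and the shortened struct and helper are illustrative only:

package main

import (
	"encoding/json"
	"fmt"
)

// Sketch only: mirrors the pointer-vs-value distinction used by PipelineQueue.
type queueStats struct {
	Events      float64  `json:"events"`       // Logstash 5/6 style field
	EventsCount *float64 `json:"events_count"` // Logstash 7 style field, nil when absent
}

func pickEvents(q queueStats) float64 {
	// Same fallback logic as gatherQueueStats: prefer events_count when present.
	if q.EventsCount != nil {
		return *q.EventsCount
	}
	return q.Events
}

func main() {
	var v6, v7 queueStats
	// Older payload: only "events" is present, so EventsCount stays nil.
	_ = json.Unmarshal([]byte(`{"events": 12}`), &v6)
	// Logstash 7 payload: "events_count" is present even when it is zero.
	_ = json.Unmarshal([]byte(`{"events_count": 0}`), &v7)

	fmt.Println(pickEvents(v6)) // 12: fell back to the legacy field
	fmt.Println(pickEvents(v7)) // 0: used the Logstash 7 value
}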
69 changes: 69 additions & 0 deletions plugins/inputs/logstash/logstash_test.go
@@ -16,6 +16,7 @@ var logstashTest = NewLogstash()
var (
	logstash5accPipelineStats  testutil.Accumulator
	logstash6accPipelinesStats testutil.Accumulator
+	logstash7accPipelinesStats testutil.Accumulator
	logstash5accProcessStats   testutil.Accumulator
	logstash6accProcessStats   testutil.Accumulator
	logstash5accJVMStats       testutil.Accumulator
@@ -686,3 +687,71 @@ func Test_Logstash6GatherJVMStats(test *testing.T) {
		},
	)
}

func Test_Logstash7GatherPipelinesQueueStats(test *testing.T) {
	fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
		writer.Header().Set("Content-Type", "application/json")
		_, err := fmt.Fprintf(writer, "%s", string(logstash7PipelinesJSON))
		if err != nil {
			test.Logf("Can't print test json")
		}
	}))
	requestURL, err := url.Parse(logstashTest.URL)
	if err != nil {
		test.Logf("Can't connect to: %s", logstashTest.URL)
	}
	fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
	fakeServer.Start()
	defer fakeServer.Close()

	if logstashTest.client == nil {
		client, err := logstashTest.createHTTPClient()

		if err != nil {
			test.Logf("Can't createHTTPClient")
		}
		logstashTest.client = client
	}

	if err := logstashTest.gatherPipelinesStats(logstashTest.URL+pipelineStats, &logstash7accPipelinesStats); err != nil {
		test.Logf("Can't gather Pipeline stats")
	}

	fields := make(map[string]interface{})
	fields["duration_in_millis"] = float64(3032875.0)
	fields["queue_push_duration_in_millis"] = float64(13300.0)
	fields["in"] = float64(2665549.0)
	fields["filtered"] = float64(2665549.0)
	fields["out"] = float64(2665549.0)

	logstash7accPipelinesStats.AssertContainsTaggedFields(
		test,
		"logstash_events",
		fields,
		map[string]string{
			"node_id":      string("28580380-ad2c-4032-934b-76359125edca"),
			"node_name":    string("HOST01.local"),
			"source":       string("HOST01.local"),
			"node_version": string("7.4.2"),
			"pipeline":     string("infra"),
		},
	)

	logstash7accPipelinesStats.AssertContainsTaggedFields(
		test,
		"logstash_queue",
		map[string]interface{}{
			"events":                  float64(0),
			"max_queue_size_in_bytes": float64(4294967296),
			"queue_size_in_bytes":     float64(32028566),
		},
		map[string]string{
			"node_id":      string("28580380-ad2c-4032-934b-76359125edca"),
			"node_name":    string("HOST01.local"),
			"source":       string("HOST01.local"),
			"node_version": string("7.4.2"),
			"pipeline":     string("infra"),
			"queue_type":   string("persisted"),
		},
	)
}
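The new test mirrors the existing Logstash 5 and 6 pipeline tests: it starts an httptest server that serves the canned logstash7PipelinesJSON payload, runs gatherPipelinesStats against it, and asserts both the logstash_events fields and the new logstash_queue fields (events, queue_size_in_bytes, max_queue_size_in_bytes) tagged with queue_type=persisted. It can be run on its own with the standard Go tooling, for example:

go test -run Test_Logstash7GatherPipelinesQueueStats ./plugins/inputs/logstash/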
137 changes: 137 additions & 0 deletions plugins/inputs/logstash/samples_logstash7.go
@@ -0,0 +1,137 @@
package logstash

const logstash7PipelinesJSON = `
{
  "host" : "HOST01.local",
  "version" : "7.4.2",
  "http_address" : "127.0.0.1:9600",
  "id" : "28580380-ad2c-4032-934b-76359125edca",
  "name" : "HOST01.local",
  "ephemeral_id" : "bd95ff6b-3fa8-42ae-be32-098a4e4ea1ec",
  "status" : "green",
  "snapshot" : true,
  "pipeline" : {
    "workers" : 8,
    "batch_size" : 125,
    "batch_delay" : 50
  },
  "pipelines" : {
    "infra" : {
      "events" : {
        "in" : 2665549,
        "out" : 2665549,
        "duration_in_millis" : 3032875,
        "filtered" : 2665549,
        "queue_push_duration_in_millis" : 13300
      },
      "plugins" : {
        "inputs" : [ {
          "id" : "8526dc80bc2257ab08f96018f96b0c68dd03abc5695bb22fb9e96339a8dfb4f86",
          "events" : {
            "out" : 2665549,
            "queue_push_duration_in_millis" : 13300
          },
          "peak_connections" : 1,
          "name" : "beats",
          "current_connections" : 1
        } ],
        "codecs" : [ {
          "id" : "plain_7312c097-1e7f-41db-983b-4f5a87a9eba2",
          "encode" : {
            "duration_in_millis" : 0,
            "writes_in" : 0
          },
          "name" : "plain",
          "decode" : {
            "out" : 0,
            "duration_in_millis" : 0,
            "writes_in" : 0
          }
        }, {
          "id" : "rubydebug_e958e3dc-10f6-4dd6-b7c5-ae3de2892afb",
          "encode" : {
            "duration_in_millis" : 0,
            "writes_in" : 0
          },
          "name" : "rubydebug",
          "decode" : {
            "out" : 0,
            "duration_in_millis" : 0,
            "writes_in" : 0
          }
        }, {
          "id" : "plain_addb97be-fb77-4cbc-b45c-0424cd5d0ac7",
          "encode" : {
            "duration_in_millis" : 0,
            "writes_in" : 0
          },
          "name" : "plain",
          "decode" : {
            "out" : 0,
            "duration_in_millis" : 0,
            "writes_in" : 0
          }
        } ],
        "filters" : [ {
          "id" : "9e8297a6ee7b61864f77853317dccde83d29952ef869010c385dcfc9064ab8b8",
          "events" : {
            "in" : 2665549,
            "out" : 2665549,
            "duration_in_millis" : 8648
          },
          "name" : "date",
          "matches" : 2665549
        }, {
          "id" : "bec0c77b3f53a78c7878449c72ec59f97be31c1f12f9621f61ed2d4563bad869",
          "events" : {
            "in" : 2665549,
            "out" : 2665549,
            "duration_in_millis" : 195138
          },
          "name" : "fingerprint"
        } ],
        "outputs" : [ {
          "id" : "df59066a933f038354c1845ba44de692f70dbd0d2009ab07a12b98b776be7e3f",
          "events" : {
            "in" : 0,
            "out" : 0,
            "duration_in_millis" : 25
          },
          "name" : "stdout"
        }, {
          "id" : "38967f09bbd2647a95aa00702b6b557bdbbab31da6a04f991d38abe5629779e3",
          "events" : {
            "in" : 2665549,
            "out" : 2665549,
            "duration_in_millis" : 2802177
          },
          "name" : "elasticsearch",
          "bulk_requests" : {
            "successes" : 2870,
            "responses" : {
              "200" : 2870
            }
          },
          "documents" : {
            "successes" : 2665549
          }
        } ]
      },
      "reloads" : {
        "successes" : 4,
        "last_error" : null,
        "failures" : 0,
        "last_success_timestamp" : "2020-06-05T08:06:12.538Z",
        "last_failure_timestamp" : null
      },
      "queue" : {
        "type" : "persisted",
        "events_count" : 0,
        "queue_size_in_bytes" : 32028566,
        "max_queue_size_in_bytes" : 4294967296
      },
      "hash" : "5bc589ae4b02cb3e436626429b50928b9d99360639c84dc7fc69268ac01a9fd0",
      "ephemeral_id" : "4bcacefa-6cbf-461e-b14e-184edd9ebdf3"
    }
  }
}`
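Given the fields and tags asserted in the new test, the logstash_queue point produced from this sample would look roughly like the following in InfluxDB line protocol (tag and field ordering illustrative):

logstash_queue,node_id=28580380-ad2c-4032-934b-76359125edca,node_name=HOST01.local,node_version=7.4.2,pipeline=infra,queue_type=persisted,source=HOST01.local events=0,max_queue_size_in_bytes=4294967296,queue_size_in_bytes=32028566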
