From 30830c2ec254fb29a2751c7abeaa8fa321faead1 Mon Sep 17 00:00:00 2001 From: Dominic Tootell Date: Thu, 18 Mar 2021 20:43:39 +0000 Subject: [PATCH] Add content_encoding option to kinesis_consumer input (#8891) --- plugins/inputs/kinesis_consumer/README.md | 9 + .../kinesis_consumer/kinesis_consumer.go | 65 ++++++- .../kinesis_consumer/kinesis_consumer_test.go | 177 ++++++++++++++++++ 3 files changed, 250 insertions(+), 1 deletion(-) create mode 100644 plugins/inputs/kinesis_consumer/kinesis_consumer_test.go diff --git a/plugins/inputs/kinesis_consumer/README.md b/plugins/inputs/kinesis_consumer/README.md index 7896557ac6cf5..ad25940d58541 100644 --- a/plugins/inputs/kinesis_consumer/README.md +++ b/plugins/inputs/kinesis_consumer/README.md @@ -54,6 +54,15 @@ and creates metrics using one of the supported [input data formats][]. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" + ## + ## The content encoding of the data from kinesis + ## If you are processing a cloudwatch logs kinesis stream then set this to "gzip" + ## as AWS compresses cloudwatch log data before it is sent to kinesis (aws + ## also base64 encodes the zip byte data before pushing to the stream. 
The base64 decoding + ## is done automatically by the golang sdk, as data is read from kinesis) + ## + # content_encoding = "identity" + ## Optional ## Configuration for a dynamodb checkpoint [inputs.kinesis_consumer.checkpoint_dynamodb] diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer.go b/plugins/inputs/kinesis_consumer/kinesis_consumer.go index 6a3b1c8301a48..0a57955ce7f7b 100644 --- a/plugins/inputs/kinesis_consumer/kinesis_consumer.go +++ b/plugins/inputs/kinesis_consumer/kinesis_consumer.go @@ -1,8 +1,12 @@ package kinesis_consumer import ( + "bytes" + "compress/gzip" + "compress/zlib" "context" "fmt" + "io/ioutil" "math/big" "strings" "sync" @@ -38,6 +42,7 @@ type ( ShardIteratorType string `toml:"shard_iterator_type"` DynamoDB *DynamoDB `toml:"checkpoint_dynamodb"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` + ContentEncoding string `toml:"content_encoding"` Log telegraf.Logger @@ -55,6 +60,8 @@ type ( recordsTex sync.Mutex wg sync.WaitGroup + processContentEncodingFunc processContent + lastSeqNum *big.Int } @@ -68,6 +75,8 @@ const ( defaultMaxUndeliveredMessages = 1000 ) +type processContent func([]byte) ([]byte, error) + // this is the largest sequence number allowed - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html var maxSeq = strToBint(strings.Repeat("9", 129)) @@ -118,6 +127,15 @@ var sampleConfig = ` ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" + ## + ## The content encoding of the data from kinesis + ## If you are processing a cloudwatch logs kinesis stream then set this to "gzip" + ## as AWS compresses cloudwatch log data before it is sent to kinesis (aws + ## also base64 encodes the zip byte data before pushing to the stream. 
The base64 decoding
+  ## is done automatically by the golang sdk, as data is read from kinesis)
+  ##
+  # content_encoding = "identity"
+
   ## Optional
   ## Configuration for a dynamodb checkpoint
   [inputs.kinesis_consumer.checkpoint_dynamodb]
@@ -239,7 +257,11 @@ func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
 }
 
 func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
-	metrics, err := k.parser.Parse(r.Data)
+	data, err := k.processContentEncodingFunc(r.Data)
+	if err != nil {
+		return err
+	}
+	metrics, err := k.parser.Parse(data)
 	if err != nil {
 		return err
 	}
@@ -334,6 +356,71 @@ func (k *KinesisConsumer) Set(streamName, shardID, sequenceNumber string) error
 	return nil
 }
 
+// processGzip decompresses a gzip-encoded record payload.
+//
+// Errors are wrapped with the failing step so that a decompression problem
+// can be told apart from a parser error further down in onMessage. No
+// partially decompressed data is returned alongside an error.
+func processGzip(data []byte) ([]byte, error) {
+	zipData, err := gzip.NewReader(bytes.NewReader(data))
+	if err != nil {
+		return nil, fmt.Errorf("creating gzip reader: %w", err)
+	}
+	defer zipData.Close()
+
+	decoded, err := ioutil.ReadAll(zipData)
+	if err != nil {
+		return nil, fmt.Errorf("decompressing gzip data: %w", err)
+	}
+	return decoded, nil
+}
+
+// processZlib decompresses a zlib-encoded record payload.
+//
+// Error handling mirrors processGzip: failures are wrapped with the failing
+// step and no partially decompressed data is returned.
+func processZlib(data []byte) ([]byte, error) {
+	zlibData, err := zlib.NewReader(bytes.NewReader(data))
+	if err != nil {
+		return nil, fmt.Errorf("creating zlib reader: %w", err)
+	}
+	defer zlibData.Close()
+
+	decoded, err := ioutil.ReadAll(zlibData)
+	if err != nil {
+		return nil, fmt.Errorf("decompressing zlib data: %w", err)
+	}
+	return decoded, nil
+}
+
+// processNoOp returns data unchanged; it is used when no decoding is configured.
+func processNoOp(data []byte) ([]byte, error) {
+	return data, nil
+}
+
+// configureProcessContentEncodingFunc selects the decoder matching the
+// ContentEncoding option: "gzip", "zlib", or no decoding for "none",
+// "identity" and the empty string. Any other value is rejected with an error.
+func (k *KinesisConsumer) configureProcessContentEncodingFunc() error {
+	switch k.ContentEncoding {
+	case "gzip":
+		k.processContentEncodingFunc = processGzip
+	case "zlib":
+		k.processContentEncodingFunc = processZlib
+	case "none", "identity", "":
+		k.processContentEncodingFunc = processNoOp
+	default:
+		return fmt.Errorf("unknown content encoding %q", k.ContentEncoding)
+	}
+	return nil
+}
+
+// Init validates the content_encoding setting and installs the matching
+// decoder; onMessage relies on it having been called.
+func (k *KinesisConsumer) Init() error {
+	return k.configureProcessContentEncodingFunc()
+}
+
 type noopCheckpoint struct{}
 
 func (n noopCheckpoint) Set(string, string, string) error   { return nil }
@@ -347,6 +434,7 @@ func init() {
 			ShardIteratorType:      "TRIM_HORIZON",
 			MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
 			lastSeqNum:             maxSeq,
+			ContentEncoding:        "identity",
 		}
}) } diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go b/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go new file mode 100644 index 0000000000000..b8becece054fc --- /dev/null +++ b/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go @@ -0,0 +1,177 @@ +package kinesis_consumer + +import ( + "encoding/base64" + "github.com/aws/aws-sdk-go/aws" + consumer "github.com/harlow/kinesis-consumer" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestKinesisConsumer_onMessage(t *testing.T) { + zlibBytpes, _ := base64.StdEncoding.DecodeString("eF5FjlFrgzAUhf9KuM+2aNB2zdsQ2xe3whQGW8qIeqdhaiSJK0P874u1Y4+Hc/jON0GHxoga858BgUF8fs5fzunHU5Jlj6cEPFDXHvXStGqsrsKWTapq44pW1SetxsF1a8qsRtGt0YyFKbUcrFT9UbYWtQH2frntkm/s7RInkNU6t9JpWNE5WBAFPo3CcHeg+9D703OziUOhCg6MQ/yakrspuZsyEjdYfsm+Jg2K1jZEfZLKQWUvFglylBobZXDLwSP8//EGpD4NNj7dUJpT6hQY3W33h/AhCt84zDBf5l/MDl08") + gzippedBytes, _ := base64.StdEncoding.DecodeString("H4sIAAFXNGAAA0WOUWuDMBSF/0q4z7Zo0HbN2xDbF7fCFAZbyoh6p2FqJIkrQ/zvi7Vjj4dz+M43QYfGiBrznwGBQXx+zl/O6cdTkmWPpwQ8UNce9dK0aqyuwpZNqmrjilbVJ63GwXVryqxG0a3RjIUptRysVP1Rtha1AfZ+ue2Sb+ztEieQ1Tq30mlY0TlYEAU+jcJwd6D70PvTc7OJQ6EKDoxD/JqSuym5mzISN1h+yb4mDYrWNkR9kspBZS8WCXKUGhtlcMvBI/z/8QakPg02Pt1QmlPqFBjdbfeH8CEK3zjMMF/mX0TaxZUpAQAA") + notZippedBytes := []byte(`{"messageType":"CONTROL_MESSAGE","owner":"CloudwatchLogs","logGroup":"","logStream":"", +"subscriptionFilters":[],"logEvents":[ + {"id":"","timestamp":1510254469274,"message":"{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"},"}, + {"id":"","timestamp":1510254469274,"message":"{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"}"} +]}`) + parser, _ := 
json.New(&json.Config{ + MetricName: "json_test", + Query: "logEvents", + StringFields: []string{"message"}, + }) + + type fields struct { + ContentEncoding string + parser parsers.Parser + records map[telegraf.TrackingID]string + } + type args struct { + r *consumer.Record + } + type expected struct { + numberOfMetrics int + messageContains string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + expected expected + }{ + { + name: "test no compression", + fields: fields{ + ContentEncoding: "none", + parser: parser, + records: make(map[telegraf.TrackingID]string), + }, + args: args{ + r: &consumer.Record{Data: notZippedBytes, SequenceNumber: aws.String("anything")}, + }, + wantErr: false, + expected: expected{ + messageContains: "bob", + numberOfMetrics: 2, + }, + }, + { + name: "test no compression via empty string for ContentEncoding", + fields: fields{ + ContentEncoding: "", + parser: parser, + records: make(map[telegraf.TrackingID]string), + }, + args: args{ + r: &consumer.Record{Data: notZippedBytes, SequenceNumber: aws.String("anything")}, + }, + wantErr: false, + expected: expected{ + messageContains: "bob", + numberOfMetrics: 2, + }, + }, + { + name: "test no compression via identity ContentEncoding", + fields: fields{ + ContentEncoding: "identity", + parser: parser, + records: make(map[telegraf.TrackingID]string), + }, + args: args{ + r: &consumer.Record{Data: notZippedBytes, SequenceNumber: aws.String("anything")}, + }, + wantErr: false, + expected: expected{ + messageContains: "bob", + numberOfMetrics: 2, + }, + }, + { + name: "test no compression via no ContentEncoding", + fields: fields{ + parser: parser, + records: make(map[telegraf.TrackingID]string), + }, + args: args{ + r: &consumer.Record{Data: notZippedBytes, SequenceNumber: aws.String("anything")}, + }, + wantErr: false, + expected: expected{ + messageContains: "bob", + numberOfMetrics: 2, + }, + }, + { + name: "test gzip compression", + fields: fields{ + 
ContentEncoding: "gzip", + parser: parser, + records: make(map[telegraf.TrackingID]string), + }, + args: args{ + r: &consumer.Record{Data: gzippedBytes, SequenceNumber: aws.String("anything")}, + }, + wantErr: false, + expected: expected{ + messageContains: "bob", + numberOfMetrics: 1, + }, + }, + { + name: "test zlib compression", + fields: fields{ + ContentEncoding: "zlib", + parser: parser, + records: make(map[telegraf.TrackingID]string), + }, + args: args{ + r: &consumer.Record{Data: zlibBytpes, SequenceNumber: aws.String("anything")}, + }, + wantErr: false, + expected: expected{ + messageContains: "bob", + numberOfMetrics: 1, + }, + }, + } + + k := &KinesisConsumer{ + ContentEncoding: "notsupported", + } + err := k.Init() + assert.NotNil(t, err) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + k := &KinesisConsumer{ + ContentEncoding: tt.fields.ContentEncoding, + parser: tt.fields.parser, + records: tt.fields.records, + } + err := k.Init() + assert.Nil(t, err) + + acc := testutil.Accumulator{} + if err := k.onMessage(acc.WithTracking(tt.expected.numberOfMetrics), tt.args.r); (err != nil) != tt.wantErr { + t.Errorf("onMessage() error = %v, wantErr %v", err, tt.wantErr) + } + + assert.Equal(t, tt.expected.numberOfMetrics, len(acc.Metrics)) + + for _, metric := range acc.Metrics { + if logEventMessage, ok := metric.Fields["message"]; ok { + assert.Contains(t, logEventMessage.(string), tt.expected.messageContains) + } else { + t.Errorf("Expect logEvents to be present") + } + } + }) + } +}