From fdde9084c106b6dde757edfa3c2cf6e962d782d7 Mon Sep 17 00:00:00 2001 From: i-prudnikov Date: Fri, 26 Mar 2021 18:02:42 +0200 Subject: [PATCH] AWS Cloudwatch log output (#8639) * Cloudwatch log output * Fixes based on @sspaink review * Make linter happy * iMake LGTM happy, add new tests --- plugins/outputs/all/all.go | 1 + plugins/outputs/cloudwatch_logs/README.md | 78 +++ .../cloudwatch_logs/cloudwatch_logs.go | 440 +++++++++++++++ .../cloudwatch_logs/cloudwatch_logs_test.go | 528 ++++++++++++++++++ 4 files changed, 1047 insertions(+) create mode 100644 plugins/outputs/cloudwatch_logs/README.md create mode 100644 plugins/outputs/cloudwatch_logs/cloudwatch_logs.go create mode 100644 plugins/outputs/cloudwatch_logs/cloudwatch_logs_test.go diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index e183242b91343..61270d5ad412e 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -8,6 +8,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/azure_monitor" _ "github.com/influxdata/telegraf/plugins/outputs/cloud_pubsub" _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" + _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch_logs" _ "github.com/influxdata/telegraf/plugins/outputs/cratedb" _ "github.com/influxdata/telegraf/plugins/outputs/datadog" _ "github.com/influxdata/telegraf/plugins/outputs/discard" diff --git a/plugins/outputs/cloudwatch_logs/README.md b/plugins/outputs/cloudwatch_logs/README.md new file mode 100644 index 0000000000000..26dd3cfafc9a3 --- /dev/null +++ b/plugins/outputs/cloudwatch_logs/README.md @@ -0,0 +1,78 @@ +## Amazon CloudWatch Logs Output for Telegraf + +This plugin will send logs to Amazon CloudWatch. + +## Amazon Authentication + +This plugin uses a credential chain for Authentication with the CloudWatch Logs +API endpoint. In the following order the plugin will attempt to authenticate. +1. Assumed credentials via STS if `role_arn` attribute is specified (source credentials are evaluated from subsequent rules) +2. Explicit credentials from `access_key`, `secret_key`, and `token` attributes +3. Shared profile from `profile` attribute +4. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#environment-variables) +5. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#shared-credentials-file) +6. [EC2 Instance Profile](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html) + +The IAM user needs the following permissions ( https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/permissions-reference-cwl.html): +- `logs:DescribeLogGroups` - required for check if configured log group exist +- `logs:DescribeLogStreams` - required to view all log streams associated with a log group. +- `logs:CreateLogStream` - required to create a new log stream in a log group.) +- `logs:PutLogEvents` - required to upload a batch of log events into log stream. + +## Config +```toml +[[outputs.cloudwatch_logs]] + ## The region is the Amazon region that you wish to connect to. + ## Examples include but are not limited to: + ## - us-west-1 + ## - us-west-2 + ## - us-east-1 + ## - ap-southeast-1 + ## - ap-southeast-2 + ## ... + region = "us-east-1" + + ## Amazon Credentials + ## Credentials are loaded in the following order + ## 1) Assumed credentials via STS if role_arn is specified + ## 2) explicit credentials from 'access_key' and 'secret_key' + ## 3) shared profile from 'profile' + ## 4) environment variables + ## 5) shared credentials file + ## 6) EC2 Instance Profile + #access_key = "" + #secret_key = "" + #token = "" + #role_arn = "" + #profile = "" + #shared_credential_file = "" + + ## Endpoint to make request against, the correct endpoint is automatically + ## determined and this option should only be set if you wish to override the + ## default. + ## ex: endpoint_url = "http://localhost:8000" + # endpoint_url = "" + + ## Cloud watch log group. Must be created in AWS cloudwatch logs upfront! + ## For example, you can specify the name of the k8s cluster here to group logs from all cluster in oine place + log_group = "my-group-name" + + ## Log stream in log group + ## Either log group name or reference to metric attribute, from which it can be parsed: + ## tag: or field:. If log stream is not exist, it will be created. + ## Since AWS is not automatically delete logs streams with expired logs entries (i.e. empty log stream) + ## you need to put in place appropriate house-keeping (https://forums.aws.amazon.com/thread.jspa?threadID=178855) + log_stream = "tag:location" + + ## Source of log data - metric name + ## specify the name of the metric, from which the log data should be retrieved. + ## I.e., if you are using docker_log plugin to stream logs from container, then + ## specify log_data_metric_name = "docker_log" + log_data_metric_name = "docker_log" + + ## Specify from which metric attribute the log data should be retrieved: + ## tag: or field:. + ## I.e., if you are using docker_log plugin to stream logs from container, then + ## specify log_data_source = "field:message" + log_data_source = "field:message" +``` \ No newline at end of file diff --git a/plugins/outputs/cloudwatch_logs/cloudwatch_logs.go b/plugins/outputs/cloudwatch_logs/cloudwatch_logs.go new file mode 100644 index 0000000000000..d1d96b0b33951 --- /dev/null +++ b/plugins/outputs/cloudwatch_logs/cloudwatch_logs.go @@ -0,0 +1,440 @@ +package cloudwatch_logs + +import ( + "fmt" + "sort" + "strings" + "time" + + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/influxdata/telegraf" + internalaws "github.com/influxdata/telegraf/config/aws" + "github.com/influxdata/telegraf/plugins/outputs" +) + +type messageBatch struct { + logEvents []*cloudwatchlogs.InputLogEvent + messageCount int +} +type logStreamContainer struct { + currentBatchSizeBytes int + currentBatchIndex int + messageBatches []messageBatch + sequenceToken string +} + +//Cloudwatch Logs service interface +type cloudWatchLogs interface { + DescribeLogGroups(*cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) + DescribeLogStreams(*cloudwatchlogs.DescribeLogStreamsInput) (*cloudwatchlogs.DescribeLogStreamsOutput, error) + CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) + PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) +} + +// CloudWatchLogs plugin object definition +type CloudWatchLogs struct { + Region string `toml:"region"` + AccessKey string `toml:"access_key"` + SecretKey string `toml:"secret_key"` + RoleARN string `toml:"role_arn"` + Profile string `toml:"profile"` + Filename string `toml:"shared_credential_file"` + Token string `toml:"token"` + EndpointURL string `toml:"endpoint_url"` + + LogGroup string `toml:"log_group"` + lg *cloudwatchlogs.LogGroup //log group data + + LogStream string `toml:"log_stream"` + lsKey string //log stream source: tag or field + lsSource string //log stream source tag or field name + ls map[string]*logStreamContainer //log stream info + + LDMetricName string `toml:"log_data_metric_name"` + + LDSource string `toml:"log_data_source"` + logDatKey string //log data source (tag or field) + logDataSource string //log data source tag or field name + + svc cloudWatchLogs //cloudwatch logs service + + Log telegraf.Logger `toml:"-"` +} + +const ( + // Log events must comply with the following + // (https://docs.aws.amazon.com/sdk-for-go/api/service/cloudwatchlogs/#CloudWatchLogs.PutLogEvents): + maxLogMessageLength = 262144 - awsOverheadPerLogMessageBytes //In bytes + maxBatchSizeBytes = 1048576 // The sum of all event messages in UTF-8, plus 26 bytes for each log event + awsOverheadPerLogMessageBytes = 26 + maxFutureLogEventTimeOffset = time.Hour * 2 // None of the log events in the batch can be more than 2 hours in the future. + + maxPastLogEventTimeOffset = time.Hour * 24 * 14 // None of the log events in the batch can be older than 14 days or older + // than the retention period of the log group. + + maxItemsInBatch = 10000 // The maximum number of log events in a batch is 10,000. + + //maxTimeSpanInBatch = time.Hour * 24 // A batch of log events in a single request cannot span more than 24 hours. + // Otherwise, the operation fails. +) + +var sampleConfig = ` +## The region is the Amazon region that you wish to connect to. +## Examples include but are not limited to: +## - us-west-1 +## - us-west-2 +## - us-east-1 +## - ap-southeast-1 +## - ap-southeast-2 +## ... +region = "us-east-1" + +## Amazon Credentials +## Credentials are loaded in the following order +## 1) Assumed credentials via STS if role_arn is specified +## 2) explicit credentials from 'access_key' and 'secret_key' +## 3) shared profile from 'profile' +## 4) environment variables +## 5) shared credentials file +## 6) EC2 Instance Profile +#access_key = "" +#secret_key = "" +#token = "" +#role_arn = "" +#profile = "" +#shared_credential_file = "" + +## Endpoint to make request against, the correct endpoint is automatically +## determined and this option should only be set if you wish to override the +## default. +## ex: endpoint_url = "http://localhost:8000" +# endpoint_url = "" + +## Cloud watch log group. Must be created in AWS cloudwatch logs upfront! +## For example, you can specify the name of the k8s cluster here to group logs from all cluster in oine place +log_group = "my-group-name" + +## Log stream in log group +## Either log group name or reference to metric attribute, from which it can be parsed: +## tag: or field:. If log stream is not exist, it will be created. +## Since AWS is not automatically delete logs streams with expired logs entries (i.e. empty log stream) +## you need to put in place appropriate house-keeping (https://forums.aws.amazon.com/thread.jspa?threadID=178855) +log_stream = "tag:location" + +## Source of log data - metric name +## specify the name of the metric, from which the log data should be retrieved. +## I.e., if you are using docker_log plugin to stream logs from container, then +## specify log_data_metric_name = "docker_log" +log_data_metric_name = "docker_log" + +## Specify from which metric attribute the log data should be retrieved: +## tag: or field:. +## I.e., if you are using docker_log plugin to stream logs from container, then +## specify log_data_source = "field:message" +log_data_source = "field:message" +` + +// SampleConfig returns sample config description for plugin +func (c *CloudWatchLogs) SampleConfig() string { + return sampleConfig +} + +// Description returns one-liner description for plugin +func (c *CloudWatchLogs) Description() string { + return "Configuration for AWS CloudWatchLogs output." +} + +// Init initialize plugin with checking configuration parameters +func (c *CloudWatchLogs) Init() error { + if c.LogGroup == "" { + return fmt.Errorf("log group is not set") + } + + if c.LogStream == "" { + return fmt.Errorf("log stream is not set") + } + + if c.LDMetricName == "" { + return fmt.Errorf("log data metrics name is not set") + } + + if c.LDSource == "" { + return fmt.Errorf("log data source is not set") + } + lsSplitArray := strings.Split(c.LDSource, ":") + if len(lsSplitArray) != 2 { + return fmt.Errorf("log data source is not properly formatted, ':' is missed.\n" + + "Should be 'tag:' or 'field:'") + } + + if lsSplitArray[0] != "tag" && lsSplitArray[0] != "field" { + return fmt.Errorf("log data source is not properly formatted.\n" + + "Should be 'tag:' or 'field:'") + } + + c.logDatKey = lsSplitArray[0] + c.logDataSource = lsSplitArray[1] + c.Log.Debugf("Log data: key '%s', source '%s'...", c.logDatKey, c.logDataSource) + + if c.lsSource == "" { + c.lsSource = c.LogStream + c.Log.Debugf("Log stream '%s'...", c.lsSource) + } + + return nil +} + +// Connect connects plugin with to receiver of metrics +func (c *CloudWatchLogs) Connect() error { + var queryToken *string + var dummyToken = "dummy" + var logGroupsOutput = &cloudwatchlogs.DescribeLogGroupsOutput{NextToken: &dummyToken} + var err error + + credentialConfig := &internalaws.CredentialConfig{ + Region: c.Region, + AccessKey: c.AccessKey, + SecretKey: c.SecretKey, + RoleARN: c.RoleARN, + Profile: c.Profile, + Filename: c.Filename, + Token: c.Token, + EndpointURL: c.EndpointURL, + } + configProvider := credentialConfig.Credentials() + + c.svc = cloudwatchlogs.New(configProvider) + if c.svc == nil { + return fmt.Errorf("can't create cloudwatch logs service endpoint") + } + + //Find log group with name 'c.LogGroup' + if c.lg == nil { //In case connection is not retried, first time + for logGroupsOutput.NextToken != nil { + logGroupsOutput, err = c.svc.DescribeLogGroups( + &cloudwatchlogs.DescribeLogGroupsInput{ + LogGroupNamePrefix: &c.LogGroup, + NextToken: queryToken}) + + if err != nil { + return err + } + queryToken = logGroupsOutput.NextToken + + for _, logGroup := range logGroupsOutput.LogGroups { + if *(logGroup.LogGroupName) == c.LogGroup { + c.Log.Debugf("Found log group %q", c.LogGroup) + c.lg = logGroup + } + } + } + + if c.lg == nil { + return fmt.Errorf("can't find log group %q", c.LogGroup) + } + + lsSplitArray := strings.Split(c.LogStream, ":") + if len(lsSplitArray) > 1 { + if lsSplitArray[0] == "tag" || lsSplitArray[0] == "field" { + c.lsKey = lsSplitArray[0] + c.lsSource = lsSplitArray[1] + c.Log.Debugf("Log stream: key %q, source %q...", c.lsKey, c.lsSource) + } + } + + if c.lsSource == "" { + c.lsSource = c.LogStream + c.Log.Debugf("Log stream %q...", c.lsSource) + } + + c.ls = map[string]*logStreamContainer{} + } + + return nil +} + +// Close closes plugin connection with remote receiver +func (c *CloudWatchLogs) Close() error { + return nil +} + +// Write perform metrics write to receiver of metrics +func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error { + minTime := time.Now() + if c.lg.RetentionInDays != nil { + minTime = minTime.Add(-time.Hour * 24 * time.Duration(*c.lg.RetentionInDays)) + } else { + minTime = minTime.Add(-maxPastLogEventTimeOffset) + } + + maxTime := time.Now().Add(maxFutureLogEventTimeOffset) + + for _, m := range metrics { + //Filtering metrics + if m.Name() != c.LDMetricName { + continue + } + + if m.Time().After(maxTime) || m.Time().Before(minTime) { + c.Log.Debugf("Processing metric '%v': Metric is filtered based on TS!", m) + continue + } + + tags := m.Tags() + fields := m.Fields() + + logStream := "" + logData := "" + lsContainer := &logStreamContainer{ + currentBatchSizeBytes: 0, + currentBatchIndex: 0, + messageBatches: []messageBatch{{}}} + + switch c.lsKey { + case "tag": + logStream = tags[c.lsSource] + case "field": + if fields[c.lsSource] != nil { + logStream = fields[c.lsSource].(string) + } + default: + logStream = c.lsSource + } + + if logStream == "" { + c.Log.Errorf("Processing metric '%v': log stream: key %q, source %q, not found!", m, c.lsKey, c.lsSource) + continue + } + + switch c.logDatKey { + case "tag": + logData = tags[c.logDataSource] + case "field": + if fields[c.logDataSource] != nil { + logData = fields[c.logDataSource].(string) + } + } + + if logData == "" { + c.Log.Errorf("Processing metric '%v': log data: key %q, source %q, not found!", m, c.logDatKey, c.logDataSource) + continue + } + + //Check if message size is not fit to batch + if len(logData) > maxLogMessageLength { + metricStr := fmt.Sprintf("%v", m) + c.Log.Errorf("Processing metric '%s...', message is too large to fit to aws max log message size: %d (bytes) !", metricStr[0:maxLogMessageLength/1000], maxLogMessageLength) + continue + } + //Batching log messages + //awsOverheadPerLogMessageBytes - is mandatory aws overhead per each log message + messageSizeInBytesForAWS := len(logData) + awsOverheadPerLogMessageBytes + + //Pick up existing or prepare new log stream container. + //Log stream container stores logs per log stream in + //the AWS Cloudwatch logs API friendly structure + if val, ok := c.ls[logStream]; ok { + lsContainer = val + } else { + lsContainer.messageBatches[0].messageCount = 0 + lsContainer.messageBatches[0].logEvents = []*cloudwatchlogs.InputLogEvent{} + c.ls[logStream] = lsContainer + } + + if lsContainer.currentBatchSizeBytes+messageSizeInBytesForAWS > maxBatchSizeBytes || + lsContainer.messageBatches[lsContainer.currentBatchIndex].messageCount >= maxItemsInBatch { + //Need to start new batch, and reset counters + lsContainer.currentBatchIndex++ + lsContainer.messageBatches = append(lsContainer.messageBatches, + messageBatch{ + logEvents: []*cloudwatchlogs.InputLogEvent{}, + messageCount: 0}) + lsContainer.currentBatchSizeBytes = messageSizeInBytesForAWS + } else { + lsContainer.currentBatchSizeBytes += messageSizeInBytesForAWS + lsContainer.messageBatches[lsContainer.currentBatchIndex].messageCount++ + } + + //AWS need time in milliseconds. time.UnixNano() returns time in nanoseconds since epoch + //we store here TS with nanosec precision iun order to have proper ordering, later ts will be reduced to milliseconds + metricTime := m.Time().UnixNano() + //Adding metring to batch + lsContainer.messageBatches[lsContainer.currentBatchIndex].logEvents = + append(lsContainer.messageBatches[lsContainer.currentBatchIndex].logEvents, + &cloudwatchlogs.InputLogEvent{ + Message: &logData, + Timestamp: &metricTime}) + } + + // Sorting out log events by TS and sending them to cloud watch logs + for logStream, elem := range c.ls { + for index, batch := range elem.messageBatches { + if len(batch.logEvents) == 0 { //can't push empty batch + //c.Log.Warnf("Empty batch detected, skipping...") + continue + } + //Sorting + sort.Slice(batch.logEvents[:], func(i, j int) bool { + return *batch.logEvents[i].Timestamp < *batch.logEvents[j].Timestamp + }) + + putLogEvents := cloudwatchlogs.PutLogEventsInput{LogGroupName: &c.LogGroup, LogStreamName: &logStream} + if elem.sequenceToken == "" { + //This is the first attempt to write to log stream, + //need to check log stream existence and create it if necessary + describeLogStreamOutput, err := c.svc.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ + LogGroupName: &c.LogGroup, + LogStreamNamePrefix: &logStream}) + if err == nil && len(describeLogStreamOutput.LogStreams) == 0 { + _, err := c.svc.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ + LogGroupName: &c.LogGroup, + LogStreamName: &logStream}) + if err != nil { + c.Log.Errorf("Can't create log stream %q in log group. Reason: %v %q.", logStream, c.LogGroup, err) + continue + } + putLogEvents.SequenceToken = nil + } else if err == nil && len(describeLogStreamOutput.LogStreams) == 1 { + putLogEvents.SequenceToken = describeLogStreamOutput.LogStreams[0].UploadSequenceToken + } else if err == nil && len(describeLogStreamOutput.LogStreams) > 1 { //Ambiguity + c.Log.Errorf("More than 1 log stream found with prefix %q in log group %q.", logStream, c.LogGroup) + continue + } else { + c.Log.Errorf("Error describing log streams in log group %q. Reason: %v", c.LogGroup, err) + continue + } + } else { + putLogEvents.SequenceToken = &elem.sequenceToken + } + + //Upload log events + //Adjusting TS to be in align with cloudwatch logs requirements + for _, event := range batch.logEvents { + *event.Timestamp = *event.Timestamp / 1000000 + } + putLogEvents.LogEvents = batch.logEvents + + //There is a quota of 5 requests per second per log stream. Additional + //requests are throttled. This quota can't be changed. + putLogEventsOutput, err := c.svc.PutLogEvents(&putLogEvents) + if err != nil { + c.Log.Errorf("Can't push logs batch to AWS. Reason: %v", err) + continue + } + //Cleanup batch + elem.messageBatches[index] = messageBatch{ + logEvents: []*cloudwatchlogs.InputLogEvent{}, + messageCount: 0} + + elem.sequenceToken = *putLogEventsOutput.NextSequenceToken + } + } + + return nil +} + +func init() { + outputs.Add("cloudwatch_logs", func() telegraf.Output { + return &CloudWatchLogs{} + }) +} diff --git a/plugins/outputs/cloudwatch_logs/cloudwatch_logs_test.go b/plugins/outputs/cloudwatch_logs/cloudwatch_logs_test.go new file mode 100644 index 0000000000000..66378969f2ac2 --- /dev/null +++ b/plugins/outputs/cloudwatch_logs/cloudwatch_logs_test.go @@ -0,0 +1,528 @@ +package cloudwatch_logs + +import ( + "fmt" + "math/rand" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +type mockCloudWatchLogs struct { + logStreamName string + pushedLogEvents []cloudwatchlogs.InputLogEvent +} + +func (c *mockCloudWatchLogs) Init(lsName string) { + c.logStreamName = lsName + c.pushedLogEvents = make([]cloudwatchlogs.InputLogEvent, 0) +} + +func (c *mockCloudWatchLogs) DescribeLogGroups(*cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) { + return nil, nil +} + +func (c *mockCloudWatchLogs) DescribeLogStreams(*cloudwatchlogs.DescribeLogStreamsInput) (*cloudwatchlogs.DescribeLogStreamsOutput, error) { + arn := "arn" + creationTime := time.Now().Unix() + sequenceToken := "arbitraryToken" + output := &cloudwatchlogs.DescribeLogStreamsOutput{ + LogStreams: []*cloudwatchlogs.LogStream{ + { + Arn: &arn, + CreationTime: &creationTime, + FirstEventTimestamp: &creationTime, + LastEventTimestamp: &creationTime, + LastIngestionTime: &creationTime, + LogStreamName: &c.logStreamName, + UploadSequenceToken: &sequenceToken, + }}, + NextToken: &sequenceToken, + } + return output, nil +} +func (c *mockCloudWatchLogs) CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + return nil, nil +} +func (c *mockCloudWatchLogs) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + sequenceToken := "arbitraryToken" + output := &cloudwatchlogs.PutLogEventsOutput{NextSequenceToken: &sequenceToken} + //Saving messages + for _, event := range input.LogEvents { + c.pushedLogEvents = append(c.pushedLogEvents, *event) + } + + return output, nil +} + +//Ensure mockCloudWatchLogs implement cloudWatchLogs interface +var _ cloudWatchLogs = (*mockCloudWatchLogs)(nil) + +func RandStringBytes(n int) string { + const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + b := make([]byte, n) + for i := range b { + b[i] = letterBytes[rand.Intn(len(letterBytes))] + } + return string(b) +} +func TestInit(t *testing.T) { + tests := []struct { + name string + expectedErrorString string + plugin *CloudWatchLogs + }{ + { + name: "log group is not set", + expectedErrorString: "log group is not set", + plugin: &CloudWatchLogs{ + Region: "eu-central-1", + AccessKey: "dummy", + SecretKey: "dummy", + LogGroup: "", + LogStream: "tag:source", + LDMetricName: "docker_log", + LDSource: "field:message", + Log: testutil.Logger{ + Name: "outputs.cloudwatch_logs", + }, + }, + }, + { + name: "log stream is not set", + expectedErrorString: "log stream is not set", + plugin: &CloudWatchLogs{ + Region: "eu-central-1", + AccessKey: "dummy", + SecretKey: "dummy", + LogGroup: "TestLogGroup", + LogStream: "", + LDMetricName: "docker_log", + LDSource: "field:message", + Log: testutil.Logger{ + Name: "outputs.cloudwatch_logs", + }, + }, + }, + { + name: "log data metrics name is not set", + expectedErrorString: "log data metrics name is not set", + plugin: &CloudWatchLogs{ + Region: "eu-central-1", + AccessKey: "dummy", + SecretKey: "dummy", + LogGroup: "TestLogGroup", + LogStream: "tag:source", + LDMetricName: "", + LDSource: "field:message", + Log: testutil.Logger{ + Name: "outputs.cloudwatch_logs", + }, + }, + }, + { + name: "log data source is not set", + expectedErrorString: "log data source is not set", + plugin: &CloudWatchLogs{ + Region: "eu-central-1", + AccessKey: "dummy", + SecretKey: "dummy", + LogGroup: "TestLogGroup", + LogStream: "tag:source", + LDMetricName: "docker_log", + LDSource: "", + Log: testutil.Logger{ + Name: "outputs.cloudwatch_logs", + }, + }, + }, + { + name: "log data source is not properly formatted (no divider)", + expectedErrorString: "log data source is not properly formatted, ':' is missed.\n" + + "Should be 'tag:' or 'field:'", + plugin: &CloudWatchLogs{ + Region: "eu-central-1", + AccessKey: "dummy", + SecretKey: "dummy", + LogGroup: "TestLogGroup", + LogStream: "tag:source", + LDMetricName: "docker_log", + LDSource: "field_message", + Log: testutil.Logger{ + Name: "outputs.cloudwatch_logs", + }, + }, + }, + { + name: "log data source is not properly formatted (inappropriate fields)", + expectedErrorString: "log data source is not properly formatted.\n" + + "Should be 'tag:' or 'field:'", + plugin: &CloudWatchLogs{ + Region: "eu-central-1", + AccessKey: "dummy", + SecretKey: "dummy", + LogGroup: "TestLogGroup", + LogStream: "tag:source", + LDMetricName: "docker_log", + LDSource: "bla:bla", + Log: testutil.Logger{ + Name: "outputs.cloudwatch_logs", + }, + }, + }, + { + name: "valid config", + plugin: &CloudWatchLogs{ + Region: "eu-central-1", + AccessKey: "dummy", + SecretKey: "dummy", + LogGroup: "TestLogGroup", + LogStream: "tag:source", + LDMetricName: "docker_log", + LDSource: "tag:location", + Log: testutil.Logger{ + Name: "outputs.cloudwatch_logs", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.expectedErrorString != "" { + require.EqualError(t, tt.plugin.Init(), tt.expectedErrorString) + } else { + require.Nil(t, tt.plugin.Init()) + } + }) + } +} + +func TestConnect(t *testing.T) { + //mock cloudwatch logs endpoint that is used only in plugin.Connect + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = fmt.Fprintln(w, + `{ + "logGroups": [ + { + "arn": "string", + "creationTime": 123456789, + "kmsKeyId": "string", + "logGroupName": "TestLogGroup", + "metricFilterCount": 1, + "retentionInDays": 10, + "storedBytes": 0 + } + ] + }`) + })) + defer ts.Close() + + plugin := &CloudWatchLogs{ + Region: "eu-central-1", + AccessKey: "dummy", + SecretKey: "dummy", + EndpointURL: ts.URL, + LogGroup: "TestLogGroup", + LogStream: "tag:source", + LDMetricName: "docker_log", + LDSource: "field:message", + Log: testutil.Logger{ + Name: "outputs.cloudwatch_logs", + }, + } + + require.Nil(t, plugin.Init()) + require.Nil(t, plugin.Connect()) +} + +func TestWrite(t *testing.T) { + //mock cloudwatch logs endpoint that is used only in plugin.Connect + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = fmt.Fprintln(w, + `{ + "logGroups": [ + { + "arn": "string", + "creationTime": 123456789, + "kmsKeyId": "string", + "logGroupName": "TestLogGroup", + "metricFilterCount": 1, + "retentionInDays": 1, + "storedBytes": 0 + } + ] + }`) + })) + defer ts.Close() + + plugin := &CloudWatchLogs{ + Region: "eu-central-1", + AccessKey: "dummy", + SecretKey: "dummy", + EndpointURL: ts.URL, + LogGroup: "TestLogGroup", + LogStream: "tag:source", + LDMetricName: "docker_log", + LDSource: "field:message", + Log: testutil.Logger{ + Name: "outputs.cloudwatch_logs", + }, + } + require.Nil(t, plugin.Init()) + require.Nil(t, plugin.Connect()) + + tests := []struct { + name string + logStreamName string + metrics []telegraf.Metric + expectedMetricsOrder map[int]int //map[] + expectedMetricsCount int + }{ + { + name: "Sorted by timestamp log entries", + logStreamName: "deadbeef", + expectedMetricsOrder: map[int]int{0: 0, 1: 1}, + expectedMetricsCount: 2, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + "message": "Sorted: message #1", + }, + time.Now().Add(-time.Minute), + ), + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + "message": "Sorted: message #2", + }, + time.Now(), + ), + }, + }, + { + name: "Unsorted log entries", + logStreamName: "deadbeef", + expectedMetricsOrder: map[int]int{0: 1, 1: 0}, + expectedMetricsCount: 2, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + "message": "Unsorted: message #1", + }, + time.Now(), + ), + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + "message": "Unsorted: message #2", + }, + time.Now().Add(-time.Minute), + ), + }, + }, + { + name: "Too old log entry & log entry in the future", + logStreamName: "deadbeef", + expectedMetricsCount: 0, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + "message": "message #1", + }, + time.Now().Add(-maxPastLogEventTimeOffset).Add(-time.Hour), + ), + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + "message": "message #2", + }, + time.Now().Add(maxFutureLogEventTimeOffset).Add(time.Hour), + ), + }, + }, + { + name: "Oversized log entry", + logStreamName: "deadbeef", + expectedMetricsCount: 0, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + //Here comes very long message + "message": RandStringBytes(maxLogMessageLength + 1), + }, + time.Now().Add(-time.Minute), + ), + }, + }, + { + name: "Batching log entries", + logStreamName: "deadbeef", + expectedMetricsOrder: map[int]int{0: 0, 1: 1, 2: 2, 3: 3, 4: 4}, + expectedMetricsCount: 5, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + //Here comes very long message to cause message batching + "message": "batch1 message1:" + RandStringBytes(maxLogMessageLength-16), + }, + time.Now().Add(-4*time.Minute), + ), + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + //Here comes very long message to cause message batching + "message": "batch1 message2:" + RandStringBytes(maxLogMessageLength-16), + }, + time.Now().Add(-3*time.Minute), + ), + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + //Here comes very long message to cause message batching + "message": "batch1 message3:" + RandStringBytes(maxLogMessageLength-16), + }, + time.Now().Add(-2*time.Minute), + ), + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + //Here comes very long message to cause message batching + "message": "batch1 message4:" + RandStringBytes(maxLogMessageLength-16), + }, + time.Now().Add(-time.Minute), + ), + testutil.MustMetric( + "docker_log", + map[string]string{ + "container_name": "telegraf", + "container_image": "influxdata/telegraf", + "container_version": "1.11.0", + "stream": "tty", + "source": "deadbeef", + }, + map[string]interface{}{ + "container_id": "deadbeef", + "message": "batch2 message1", + }, + time.Now(), + ), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + //Overwrite cloud watch log endpoint + mockCwl := &mockCloudWatchLogs{} + mockCwl.Init(tt.logStreamName) + plugin.svc = mockCwl + require.Nil(t, plugin.Write(tt.metrics)) + require.Equal(t, tt.expectedMetricsCount, len(mockCwl.pushedLogEvents)) + + for index, elem := range mockCwl.pushedLogEvents { + require.Equal(t, *elem.Message, tt.metrics[tt.expectedMetricsOrder[index]].Fields()["message"]) + require.Equal(t, *elem.Timestamp, tt.metrics[tt.expectedMetricsOrder[index]].Time().UnixNano()/1000000) + } + }) + } +}