diff --git a/plugins/outputs/kinesis/kinesis_test.go b/plugins/outputs/kinesis/kinesis_test.go index 24de7413c1718..22b8e83e48e24 100644 --- a/plugins/outputs/kinesis/kinesis_test.go +++ b/plugins/outputs/kinesis/kinesis_test.go @@ -4,6 +4,7 @@ import ( "fmt" "testing" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" @@ -16,6 +17,10 @@ import ( "github.com/stretchr/testify/require" ) +const testPartitionKey = "partitionKey" +const testShardID = "shardId-000000000003" +const testSequenceNumber = "49543463076570308322303623326179887152428262250726293588" +const testStreamName = "streamName" const zero int64 = 0 func TestPartitionKey(t *testing.T) { @@ -105,14 +110,9 @@ func TestPartitionKey(t *testing.T) { func TestWriteKinesis_WhenSuccess(t *testing.T) { assert := assert.New(t) - partitionKey := "partitionKey" - shard := "shard" - sequenceNumber := "sequenceNumber" - streamName := "stream" - records := []*kinesis.PutRecordsRequestEntry{ { - PartitionKey: &partitionKey, + PartitionKey: aws.String(testPartitionKey), Data: []byte{0x65}, }, } @@ -122,26 +122,24 @@ func TestWriteKinesis_WhenSuccess(t *testing.T) { 0, []*kinesis.PutRecordsResultEntry{ { - ErrorCode: nil, - ErrorMessage: nil, - SequenceNumber: &sequenceNumber, - ShardId: &shard, + SequenceNumber: aws.String(testSequenceNumber), + ShardId: aws.String(testShardID), }, }, ) k := KinesisOutput{ Log: testutil.Logger{}, - StreamName: streamName, + StreamName: testStreamName, svc: svc, } elapsed := k.writeKinesis(records) assert.GreaterOrEqual(elapsed.Nanoseconds(), zero) - svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: records, }, }) @@ -150,14 +148,9 @@ func TestWriteKinesis_WhenSuccess(t *testing.T) { func TestWriteKinesis_WhenRecordErrors(t *testing.T) { assert := assert.New(t) - errorCode := "InternalFailure" - errorMessage := "Internal Service Failure" - partitionKey := "partitionKey" - streamName := "stream" - records := []*kinesis.PutRecordsRequestEntry{ { - PartitionKey: &partitionKey, + PartitionKey: aws.String(testPartitionKey), Data: []byte{0x66}, }, } @@ -167,26 +160,24 @@ func TestWriteKinesis_WhenRecordErrors(t *testing.T) { 1, []*kinesis.PutRecordsResultEntry{ { - ErrorCode: &errorCode, - ErrorMessage: &errorMessage, - SequenceNumber: nil, - ShardId: nil, + ErrorCode: aws.String("InternalFailure"), + ErrorMessage: aws.String("Internal Service Failure"), }, }, ) k := KinesisOutput{ Log: testutil.Logger{}, - StreamName: streamName, + StreamName: testStreamName, svc: svc, } elapsed := k.writeKinesis(records) assert.GreaterOrEqual(elapsed.Nanoseconds(), zero) - svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: records, }, }) @@ -195,12 +186,9 @@ func TestWriteKinesis_WhenRecordErrors(t *testing.T) { func TestWriteKinesis_WhenServiceError(t *testing.T) { assert := assert.New(t) - partitionKey := "partitionKey" - streamName := "stream" - records := []*kinesis.PutRecordsRequestEntry{ { - PartitionKey: &partitionKey, + PartitionKey: aws.String(testPartitionKey), Data: []byte{}, }, } @@ -212,16 +200,16 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) { k := KinesisOutput{ Log: testutil.Logger{}, - StreamName: streamName, + StreamName: testStreamName, svc: svc, } elapsed := k.writeKinesis(records) assert.GreaterOrEqual(elapsed.Nanoseconds(), zero) - svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: records, }, }) @@ -246,14 +234,12 @@ func TestWrite_NoMetrics(t *testing.T) { err := k.Write([]telegraf.Metric{}) assert.Nil(err, "Should not return error") - svc.AssertRequests(assert, []*kinesis.PutRecordsInput{}) + svc.AssertRequests(t, []*kinesis.PutRecordsInput{}) } func TestWrite_SingleMetric(t *testing.T) { assert := assert.New(t) serializer := influx.NewSerializer() - partitionKey := "partitionKey" - streamName := "stream" svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(1, 0) @@ -262,9 +248,9 @@ func TestWrite_SingleMetric(t *testing.T) { Log: testutil.Logger{}, Partition: &Partition{ Method: "static", - Key: partitionKey, + Key: testPartitionKey, }, - StreamName: streamName, + StreamName: testStreamName, serializer: serializer, svc: svc, } @@ -273,12 +259,12 @@ func TestWrite_SingleMetric(t *testing.T) { err := k.Write([]telegraf.Metric{metric}) assert.Nil(err, "Should not return error") - svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: []*kinesis.PutRecordsRequestEntry{ { - PartitionKey: &partitionKey, + PartitionKey: aws.String(testPartitionKey), Data: metricData, }, }, @@ -289,8 +275,6 @@ func TestWrite_SingleMetric(t *testing.T) { func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) { assert := assert.New(t) serializer := influx.NewSerializer() - partitionKey := "partitionKey" - streamName := "stream" svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(3, 0) @@ -299,9 +283,9 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) { Log: testutil.Logger{}, Partition: &Partition{ Method: "static", - Key: partitionKey, + Key: testPartitionKey, }, - StreamName: streamName, + StreamName: testStreamName, serializer: serializer, svc: svc, } @@ -310,12 +294,11 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) { err := k.Write(metrics) assert.Nil(err, "Should not return error") - svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: createPutRecordsRequestEntries( metricsData, - &partitionKey, ), }, }) @@ -324,8 +307,6 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) { func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) { assert := assert.New(t) serializer := influx.NewSerializer() - partitionKey := "partitionKey" - streamName := "stream" svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(maxRecordsPerRequest, 0) @@ -334,9 +315,9 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) { Log: testutil.Logger{}, Partition: &Partition{ Method: "static", - Key: partitionKey, + Key: testPartitionKey, }, - StreamName: streamName, + StreamName: testStreamName, serializer: serializer, svc: svc, } @@ -345,12 +326,11 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) { err := k.Write(metrics) assert.Nil(err, "Should not return error") - svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: createPutRecordsRequestEntries( metricsData, - &partitionKey, ), }, }) @@ -359,8 +339,6 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) { func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) { assert := assert.New(t) serializer := influx.NewSerializer() - partitionKey := "partitionKey" - streamName := "stream" svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(maxRecordsPerRequest, 0) @@ -370,9 +348,9 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) { Log: testutil.Logger{}, Partition: &Partition{ Method: "static", - Key: partitionKey, + Key: testPartitionKey, }, - StreamName: streamName, + StreamName: testStreamName, serializer: serializer, svc: svc, } @@ -381,19 +359,17 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) { err := k.Write(metrics) assert.Nil(err, "Should not return error") - svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: createPutRecordsRequestEntries( metricsData[0:maxRecordsPerRequest], - &partitionKey, ), }, { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: createPutRecordsRequestEntries( metricsData[maxRecordsPerRequest:], - &partitionKey, ), }, }) @@ -402,8 +378,6 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) { func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) { assert := assert.New(t) serializer := influx.NewSerializer() - partitionKey := "partitionKey" - streamName := "stream" svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(maxRecordsPerRequest, 0) @@ -413,9 +387,9 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) { Log: testutil.Logger{}, Partition: &Partition{ Method: "static", - Key: partitionKey, + Key: testPartitionKey, }, - StreamName: streamName, + StreamName: testStreamName, serializer: serializer, svc: svc, } @@ -424,19 +398,17 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) { err := k.Write(metrics) assert.Nil(err, "Should not return error") - svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: createPutRecordsRequestEntries( metricsData[0:maxRecordsPerRequest], - &partitionKey, ), }, { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: createPutRecordsRequestEntries( metricsData[maxRecordsPerRequest:], - &partitionKey, ), }, }) @@ -445,8 +417,6 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) { func TestWrite_SerializerError(t *testing.T) { assert := assert.New(t) serializer := influx.NewSerializer() - partitionKey := "partitionKey" - streamName := "stream" svc := &mockKinesisPutRecords{} svc.SetupGenericResponse(2, 0) @@ -455,9 +425,9 @@ func TestWrite_SerializerError(t *testing.T) { Log: testutil.Logger{}, Partition: &Partition{ Method: "static", - Key: partitionKey, + Key: testPartitionKey, }, - StreamName: streamName, + StreamName: testStreamName, serializer: serializer, svc: svc, } @@ -476,16 +446,16 @@ func TestWrite_SerializerError(t *testing.T) { assert.Nil(err, "Should not return error") // remaining valid metrics should still get written - svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { - StreamName: &streamName, + StreamName: aws.String(testStreamName), Records: []*kinesis.PutRecordsRequestEntry{ { - PartitionKey: &partitionKey, + PartitionKey: aws.String(testPartitionKey), Data: metric1Data, }, { - PartitionKey: &partitionKey, + PartitionKey: aws.String(testPartitionKey), Data: metric2Data, }, }, @@ -512,7 +482,7 @@ func (m *mockKinesisPutRecords) SetupResponse( m.responses = append(m.responses, &mockKinesisPutRecordsResponse{ Err: nil, Output: &kinesis.PutRecordsOutput{ - FailedRecordCount: &failedRecordCount, + FailedRecordCount: aws.Int64(failedRecordCount), Records: records, }, }) @@ -522,24 +492,19 @@ func (m *mockKinesisPutRecords) SetupGenericResponse( successfulRecordCount uint32, failedRecordCount uint32, ) { - errorCode := "InternalFailure" - errorMessage := "Internal Service Failure" - shard := "shardId-000000000003" - records := []*kinesis.PutRecordsResultEntry{} for i := uint32(0); i < successfulRecordCount; i++ { - sequenceNumber := fmt.Sprintf("%d", i) records = append(records, &kinesis.PutRecordsResultEntry{ - SequenceNumber: &sequenceNumber, - ShardId: &shard, + SequenceNumber: aws.String(testSequenceNumber), + ShardId: aws.String(testShardID), }) } for i := uint32(0); i < failedRecordCount; i++ { records = append(records, &kinesis.PutRecordsResultEntry{ - ErrorCode: &errorCode, - ErrorMessage: &errorMessage, + ErrorCode: aws.String("InternalFailure"), + ErrorMessage: aws.String("Internal Service Failure"), }) } @@ -566,49 +531,49 @@ func (m *mockKinesisPutRecords) PutRecords(input *kinesis.PutRecordsInput) (*kin } func (m *mockKinesisPutRecords) AssertRequests( - assert *assert.Assertions, + t *testing.T, expected []*kinesis.PutRecordsInput, ) { - assert.Equal( + require.Equalf(t, len(expected), len(m.requests), - fmt.Sprintf("Expected %v requests", len(expected)), + "Expected %v requests", len(expected), ) for i, expectedInput := range expected { actualInput := m.requests[i] - assert.Equal( + require.Equalf(t, expectedInput.StreamName, actualInput.StreamName, - fmt.Sprintf("Expected request %v to have correct StreamName", i), + "Expected request %v to have correct StreamName", i, ) - assert.Equal( + require.Equalf(t, len(expectedInput.Records), len(actualInput.Records), - fmt.Sprintf("Expected request %v to have %v Records", i, len(expectedInput.Records)), + "Expected request %v to have %v Records", i, len(expectedInput.Records), ) for r, expectedRecord := range expectedInput.Records { actualRecord := actualInput.Records[r] - assert.Equal( - &expectedRecord.PartitionKey, - &actualRecord.PartitionKey, - fmt.Sprintf("Expected (request %v, record %v) to have correct PartitionKey", i, r), + require.Equalf(t, + expectedRecord.PartitionKey, + actualRecord.PartitionKey, + "Expected (request %v, record %v) to have correct PartitionKey", i, r, ) - assert.Equal( - &expectedRecord.ExplicitHashKey, - &actualRecord.ExplicitHashKey, - fmt.Sprintf("Expected (request %v, record %v) to have correct ExplicitHashKey", i, r), + require.Equalf(t, + expectedRecord.ExplicitHashKey, + actualRecord.ExplicitHashKey, + "Expected (request %v, record %v) to have correct ExplicitHashKey", i, r, ) - assert.Equal( + require.Equalf(t, expectedRecord.Data, actualRecord.Data, - fmt.Sprintf("Expected (request %v, record %v) to have correct Data", i, r), + "Expected (request %v, record %v) to have correct Data", i, r, ) } } @@ -647,14 +612,13 @@ func createTestMetrics( func createPutRecordsRequestEntries( metricsData [][]byte, - partitionKey *string, ) []*kinesis.PutRecordsRequestEntry { count := len(metricsData) records := make([]*kinesis.PutRecordsRequestEntry, count) for i := 0; i < count; i++ { records[i] = &kinesis.PutRecordsRequestEntry{ - PartitionKey: partitionKey, + PartitionKey: aws.String(testPartitionKey), Data: metricsData[i], } }