Skip to content

Commit

Permalink
Writing unit tests for Kinesis output plugin Write method (influxdata…
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffAshton authored Mar 10, 2021
1 parent ed468f4 commit 06e9775
Show file tree
Hide file tree
Showing 2 changed files with 354 additions and 2 deletions.
6 changes: 4 additions & 2 deletions plugins/outputs/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"github.com/influxdata/telegraf/plugins/serializers"
)

// Limit set by AWS (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html)
const maxRecordsPerRequest uint32 = 500

type (
KinesisOutput struct {
Region string `toml:"region"`
Expand Down Expand Up @@ -243,8 +246,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {

r = append(r, &d)

if sz == 500 {
// Max Messages Per PutRecordRequest is 500
if sz == maxRecordsPerRequest {
elapsed := k.writeKinesis(r)
k.Log.Debugf("Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
sz = 0
Expand Down
350 changes: 350 additions & 0 deletions plugins/outputs/kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/gofrs/uuid"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const zero int64 = 0
Expand Down Expand Up @@ -227,6 +231,272 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) {
})
}

func TestWrite_NoMetrics(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
svc := &mockKinesisPutRecords{}

k := KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: "partitionKey",
},
StreamName: "stream",
serializer: serializer,
svc: svc,
}

err := k.Write([]telegraf.Metric{})
assert.Nil(err, "Should not return error")

svc.AssertRequests(assert, []*kinesis.PutRecordsInput{})
}

func TestWrite_SingleMetric(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"

svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(1, 0)

k := KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
},
StreamName: streamName,
serializer: serializer,
svc: svc,
}

metric, metricData := createTestMetric(t, "metric1", serializer)
err := k.Write([]telegraf.Metric{metric})
assert.Nil(err, "Should not return error")

svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
Records: []*kinesis.PutRecordsRequestEntry{
{
PartitionKey: &partitionKey,
Data: metricData,
},
},
},
})
}

func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"

svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(3, 0)

k := KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
},
StreamName: streamName,
serializer: serializer,
svc: svc,
}

metrics, metricsData := createTestMetrics(t, 3, serializer)
err := k.Write(metrics)
assert.Nil(err, "Should not return error")

svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
Records: createPutRecordsRequestEntries(
metricsData,
&partitionKey,
),
},
})
}

func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"

svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(maxRecordsPerRequest, 0)

k := KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
},
StreamName: streamName,
serializer: serializer,
svc: svc,
}

metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest, serializer)
err := k.Write(metrics)
assert.Nil(err, "Should not return error")

svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
Records: createPutRecordsRequestEntries(
metricsData,
&partitionKey,
),
},
})
}

func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"

svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
svc.SetupGenericResponse(1, 0)

k := KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
},
StreamName: streamName,
serializer: serializer,
svc: svc,
}

metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest+1, serializer)
err := k.Write(metrics)
assert.Nil(err, "Should not return error")

svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
Records: createPutRecordsRequestEntries(
metricsData[0:maxRecordsPerRequest],
&partitionKey,
),
},
{
StreamName: &streamName,
Records: createPutRecordsRequestEntries(
metricsData[maxRecordsPerRequest:],
&partitionKey,
),
},
})
}

func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"

svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
svc.SetupGenericResponse(maxRecordsPerRequest, 0)

k := KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
},
StreamName: streamName,
serializer: serializer,
svc: svc,
}

metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest*2, serializer)
err := k.Write(metrics)
assert.Nil(err, "Should not return error")

svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
Records: createPutRecordsRequestEntries(
metricsData[0:maxRecordsPerRequest],
&partitionKey,
),
},
{
StreamName: &streamName,
Records: createPutRecordsRequestEntries(
metricsData[maxRecordsPerRequest:],
&partitionKey,
),
},
})
}

func TestWrite_SerializerError(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"

svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(2, 0)

k := KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
},
StreamName: streamName,
serializer: serializer,
svc: svc,
}

metric1, metric1Data := createTestMetric(t, "metric1", serializer)
metric2, metric2Data := createTestMetric(t, "metric2", serializer)

// metric is invalid because of empty name
invalidMetric := testutil.TestMetric(3, "")

err := k.Write([]telegraf.Metric{
metric1,
invalidMetric,
metric2,
})
assert.Nil(err, "Should not return error")

// remaining valid metrics should still get written
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
Records: []*kinesis.PutRecordsRequestEntry{
{
PartitionKey: &partitionKey,
Data: metric1Data,
},
{
PartitionKey: &partitionKey,
Data: metric2Data,
},
},
},
})
}

type mockKinesisPutRecordsResponse struct {
Output *kinesis.PutRecordsOutput
Err error
Expand All @@ -253,6 +523,35 @@ func (m *mockKinesisPutRecords) SetupResponse(
})
}

func (m *mockKinesisPutRecords) SetupGenericResponse(
successfulRecordCount uint32,
failedRecordCount uint32,
) {

errorCode := "InternalFailure"
errorMessage := "Internal Service Failure"
shard := "shardId-000000000003"

records := []*kinesis.PutRecordsResultEntry{}

for i := uint32(0); i < successfulRecordCount; i++ {
sequenceNumber := fmt.Sprintf("%d", i)
records = append(records, &kinesis.PutRecordsResultEntry{
SequenceNumber: &sequenceNumber,
ShardId: &shard,
})
}

for i := uint32(0); i < failedRecordCount; i++ {
records = append(records, &kinesis.PutRecordsResultEntry{
ErrorCode: &errorCode,
ErrorMessage: &errorMessage,
})
}

m.SetupResponse(int64(failedRecordCount), records)
}

func (m *mockKinesisPutRecords) SetupErrorResponse(err error) {

m.responses = append(m.responses, &mockKinesisPutRecordsResponse{
Expand Down Expand Up @@ -323,3 +622,54 @@ func (m *mockKinesisPutRecords) AssertRequests(
}
}
}

func createTestMetric(
t *testing.T,
name string,
serializer serializers.Serializer,
) (telegraf.Metric, []byte) {

metric := testutil.TestMetric(1, name)

data, err := serializer.Serialize(metric)
require.NoError(t, err)

return metric, data
}

func createTestMetrics(
t *testing.T,
count uint32,
serializer serializers.Serializer,
) ([]telegraf.Metric, [][]byte) {

metrics := make([]telegraf.Metric, count)
metricsData := make([][]byte, count)

for i := uint32(0); i < count; i++ {
name := fmt.Sprintf("metric%d", i)
metric, data := createTestMetric(t, name, serializer)
metrics[i] = metric
metricsData[i] = data
}

return metrics, metricsData
}

func createPutRecordsRequestEntries(
metricsData [][]byte,
partitionKey *string,
) []*kinesis.PutRecordsRequestEntry {

count := len(metricsData)
records := make([]*kinesis.PutRecordsRequestEntry, count)

for i := 0; i < count; i++ {
records[i] = &kinesis.PutRecordsRequestEntry{
PartitionKey: partitionKey,
Data: metricsData[i],
}
}

return records
}

0 comments on commit 06e9775

Please sign in to comment.