Skip to content

Commit

Permalink
Remove exponential backoff code (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
PettitWesley authored Jun 1, 2020
1 parent 3268b34 commit 65f77ac
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 29 deletions.
12 changes: 4 additions & 8 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
}
count++
}
err := kinesisOutput.Flush()
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", kinesisOutput.PluginID, err)
return output.FLB_ERROR
retCode := kinesisOutput.Flush()
if retCode != output.FLB_OK {
return retCode
}
logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag)

Expand All @@ -158,10 +157,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
func FLBPluginExit() int {
// Before final exit, call Flush() for all the instances of the Output Plugin
for i := range pluginInstances {
err := pluginInstances[i].Flush()
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", pluginInstances[i].PluginID, err)
}
pluginInstances[i].Flush()
}

return output.FLB_OK
Expand Down
36 changes: 18 additions & 18 deletions kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ type OutputPlugin struct {
client PutRecordsClient
records []*kinesis.PutRecordsRequestEntry
dataLength int
backoff *plugins.Backoff
timer *plugins.Timeout
PluginID int
random *random
Expand Down Expand Up @@ -134,7 +133,6 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint,
timeKey: timeKey,
fmtStrftime: timeFormatter,
lastInvalidPartitionKeyIndex: -1,
backoff: plugins.NewBackoff(),
timer: timer,
PluginID: pluginID,
random: random,
Expand Down Expand Up @@ -199,11 +197,12 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{},
newRecordSize := len(data) + len(partitionKey)

if len(outputPlugin.records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize {
err = outputPlugin.sendCurrentBatch()
retCode, err := outputPlugin.sendCurrentBatch()
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
// If FluentBit fails to send logs, it will retry rather than discarding the logs
return fluentbit.FLB_RETRY
}
if retCode != fluentbit.FLB_OK {
return retCode
}
}

Expand All @@ -216,8 +215,13 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{},
}

// Flush sends the current buffer of log records
func (outputPlugin *OutputPlugin) Flush() error {
return outputPlugin.sendCurrentBatch()
// Returns FLB_OK, FLB_RETRY, FLB_ERROR
func (outputPlugin *OutputPlugin) Flush() int {
retCode, err := outputPlugin.sendCurrentBatch()
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
}
return retCode
}

func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKey string) ([]byte, error) {
Expand Down Expand Up @@ -251,12 +255,11 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
return data, nil
}

func (outputPlugin *OutputPlugin) sendCurrentBatch() error {
func (outputPlugin *OutputPlugin) sendCurrentBatch() (int, error) {
if outputPlugin.lastInvalidPartitionKeyIndex >= 0 {
logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, outputPlugin.records[outputPlugin.lastInvalidPartitionKeyIndex].Data)
outputPlugin.lastInvalidPartitionKeyIndex = -1
}
outputPlugin.backoff.Wait()
outputPlugin.timer.Check()

response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{
Expand All @@ -269,11 +272,9 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() error {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException {
logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID)
// Backoff and Retry
outputPlugin.backoff.StartBackoff()
}
}
return err
return fluentbit.FLB_RETRY, err
}
logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(outputPlugin.records))

Expand All @@ -282,12 +283,12 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() error {

// processAPIResponse processes the successful and failed records
// it returns an error iff no records succeeded (i.e.) no progress has been made
func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) error {
func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) (int, error) {
if aws.Int64Value(response.FailedRecordCount) > 0 {
// start timer if all records failed (no progress has been made)
if aws.Int64Value(response.FailedRecordCount) == int64(len(outputPlugin.records)) {
outputPlugin.timer.Start()
return fmt.Errorf("PutRecords request returned with no records successfully recieved")
return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved")
}

logrus.Warnf("[kinesis %d] %d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount))
Expand All @@ -299,8 +300,8 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord
failedRecords = append(failedRecords, outputPlugin.records[i])
}
if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException {
// Backoff and Retry
outputPlugin.backoff.StartBackoff()
logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID)
return fluentbit.FLB_RETRY, nil
}
}

Expand All @@ -313,11 +314,10 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord
} else {
// request fully succeeded
outputPlugin.timer.Reset()
outputPlugin.backoff.Reset()
outputPlugin.records = outputPlugin.records[:0]
outputPlugin.dataLength = 0
}
return nil
return fluentbit.FLB_OK, nil
}

// randomString generates a random string of length 8
Expand Down
5 changes: 2 additions & 3 deletions kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlug
dataKeys: "",
partitionKey: "",
lastInvalidPartitionKeyIndex: -1,
backoff: plugins.NewBackoff(),
timer: timer,
PluginID: 0,
random: random,
Expand Down Expand Up @@ -98,6 +97,6 @@ func TestAddRecordAndFlush(t *testing.T) {
retCode := outputPlugin.AddRecord(record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")

err := outputPlugin.Flush()
assert.NoError(t, err, "Unexpected error calling flush")
retCode = outputPlugin.Flush()
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
}

0 comments on commit 65f77ac

Please sign in to comment.