Skip to content

Commit

Permalink
Merge pull request #18 from Canva/sasan-s3-sink-add-kinesis-read-metrics
Browse files Browse the repository at this point in the history
Add time and count metrics
  • Loading branch information
sasanrose authored Apr 28, 2023
2 parents ba585e3 + 4e011a2 commit ee1673b
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion internal/impl/aws/input_kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ func (k *kinesisReader) runConsumer(wg *sync.WaitGroup, streamID, shardID, start
recordBatcher.Close(context.Background(), state == awsKinesisConsumerFinished)
boff.Reset()
k.boffPool.Put(boff)

reason := ""
switch state {
case awsKinesisConsumerFinished:
Expand Down Expand Up @@ -413,6 +412,12 @@ func (k *kinesisReader) runConsumer(wg *sync.WaitGroup, streamID, shardID, start
}
}

kinesisReadTime := time.Now()
metricKVectorReadTime := k.mgr.Metrics().GetTimerVec("kinesis_reader_time", "stream", "shardId")
metricKReadTime := metricKVectorReadTime.With(streamID, shardID)
metricKVectorReadCount := k.mgr.Metrics().GetCounterVec("kinesis_reader_count", "stream", "shardId")
metricKReadCount := metricKVectorReadCount.With(streamID, shardID)

for {
var err error
if state == awsKinesisConsumerConsuming && len(pending) == 0 && nextPullChan == unblockedChan {
Expand All @@ -435,6 +440,9 @@ func (k *kinesisReader) runConsumer(wg *sync.WaitGroup, streamID, shardID, start
} else if len(pending) == 0 {
nextPullChan = time.After(boff.NextBackOff())
} else {
metricKReadTime.Timing(time.Since(kinesisReadTime).Nanoseconds())
kinesisReadTime = time.Now()
metricKReadCount.Incr(int64(len(pending)))
boff.Reset()
nextPullChan = blockedChan
}
Expand Down

0 comments on commit ee1673b

Please sign in to comment.