From dfc0423c4ba56ba5107f122f33818e24ea1b15cd Mon Sep 17 00:00:00 2001 From: Mathew Vo Date: Fri, 11 Aug 2023 12:09:18 +1000 Subject: [PATCH 1/2] done --- .../impl/aws/input_kinesis_checkpointer.go | 53 ++++++++++++++++--- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/internal/impl/aws/input_kinesis_checkpointer.go b/internal/impl/aws/input_kinesis_checkpointer.go index 32ae53011d..c0527990de 100644 --- a/internal/impl/aws/input_kinesis_checkpointer.go +++ b/internal/impl/aws/input_kinesis_checkpointer.go @@ -20,6 +20,7 @@ import ( // Common errors that might occur throughout checkpointing. var ( ErrLeaseNotAcquired = errors.New("the shard could not be leased due to a collision") + ErrNoSequenceNum = errors.New("sequence ID was not found in checkpoint") ) // awsKinesisCheckpointer manages the shard checkpointing for a given client @@ -130,7 +131,7 @@ func (k *awsKinesisCheckpointer) getCheckpoint(ctx context.Context, streamID, sh if s, ok := rawItem.Item["SequenceNumber"]; ok && s.S != nil { c.SequenceNumber = *s.S } else { - return nil, errors.New("sequence ID was not found in checkpoint") + return nil, ErrNoSequenceNum } if s, ok := rawItem.Item["ClientID"]; ok && s.S != nil { @@ -282,17 +283,35 @@ func (k *awsKinesisCheckpointer) Claim(ctx context.Context, streamID, shardID, f // This allows the victim client to update the checkpoint with the final // sequence as it yields the shard. if len(fromClientID) > 0 && time.Since(currentLease) < k.leaseDuration { - // Wait for the estimated next checkpoint time plus a grace period of - // one second. waitFor := k.leaseDuration - time.Since(currentLease) + time.Second - select { - case <-time.After(waitFor): - case <-ctx.Done(): - return "", ctx.Err() + timer := time.NewTimer(waitFor) + FOR_LOOP: + for { + checkPointed, err := k.isCheckpointed(ctx, streamID, shardID, startingSequence) + if err != nil { + return "", err + } + + if checkPointed { + break + } + + // Wait for the estimated next checkpoint time plus a grace period of + // one second. + select { + case <-time.After(time.Second * 30): + case <-timer.C: + break FOR_LOOP + case <-ctx.Done(): + return "", ctx.Err() + } } cp, err := k.getCheckpoint(ctx, streamID, shardID) - if err != nil { + switch { + case err == ErrNoSequenceNum: + return "", nil + case err != nil: return "", err } startingSequence = cp.SequenceNumber @@ -301,6 +320,24 @@ func (k *awsKinesisCheckpointer) Claim(ctx context.Context, streamID, shardID, f return startingSequence, nil } +// Attempts to determine whether a shard has been checkpointed depending on the starting sequenceNumber +// If there has been no checkpoint on that shard, then resume from start of shard. +func (k *awsKinesisCheckpointer) isCheckpointed(ctx context.Context, streamID, shardID, sequenceNumber string) (bool, error) { + item, err := k.getCheckpoint(ctx, streamID, shardID) + switch { + case err == ErrNoSequenceNum: + return true, nil + case err != nil: + return false, err + } + + if item.SequenceNumber != sequenceNumber { + return true, nil + } + + return false, nil +} + // Checkpoint attempts to set a sequence number for a stream shard. Returns a // boolean indicating whether this shard is still owned by the client. // From c48245f683a2134ecac81c00d13f0b6efdaed85d Mon Sep 17 00:00:00 2001 From: Mathew Vo Date: Tue, 15 Aug 2023 10:25:29 +1000 Subject: [PATCH 2/2] add comment --- internal/impl/aws/input_kinesis_checkpointer.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/impl/aws/input_kinesis_checkpointer.go b/internal/impl/aws/input_kinesis_checkpointer.go index c0527990de..d4b01426da 100644 --- a/internal/impl/aws/input_kinesis_checkpointer.go +++ b/internal/impl/aws/input_kinesis_checkpointer.go @@ -283,6 +283,8 @@ func (k *awsKinesisCheckpointer) Claim(ctx context.Context, streamID, shardID, f // This allows the victim client to update the checkpoint with the final // sequence as it yields the shard. if len(fromClientID) > 0 && time.Since(currentLease) < k.leaseDuration { + // The maximum duration we'll wait for is the estimated next checkpoint time plus a grace period of + // one second. waitFor := k.leaseDuration - time.Since(currentLease) + time.Second timer := time.NewTimer(waitFor) FOR_LOOP: @@ -296,8 +298,8 @@ func (k *awsKinesisCheckpointer) Claim(ctx context.Context, streamID, shardID, f break } - // Wait for the estimated next checkpoint time plus a grace period of - // one second. + // If the shard has not been checkpointed yet, check again in 30 seconds, or just break and + // steal the shard when the max waitFor timer expires select { case <-time.After(time.Second * 30): case <-timer.C: