Skip to content

Commit

Permalink
Merge pull request #24 from Canva/mathew.v-add-ischeckpointed
Browse files Browse the repository at this point in the history
  • Loading branch information
mallyx3 authored Aug 15, 2023
2 parents f534329 + c48245f commit c60fdb4
Showing 1 changed file with 46 additions and 7 deletions.
53 changes: 46 additions & 7 deletions internal/impl/aws/input_kinesis_checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
// Common errors that might occur throughout checkpointing.
var (
ErrLeaseNotAcquired = errors.New("the shard could not be leased due to a collision")
ErrNoSequenceNum = errors.New("sequence ID was not found in checkpoint")
)

// awsKinesisCheckpointer manages the shard checkpointing for a given client
Expand Down Expand Up @@ -130,7 +131,7 @@ func (k *awsKinesisCheckpointer) getCheckpoint(ctx context.Context, streamID, sh
if s, ok := rawItem.Item["SequenceNumber"]; ok && s.S != nil {
c.SequenceNumber = *s.S
} else {
return nil, errors.New("sequence ID was not found in checkpoint")
return nil, ErrNoSequenceNum
}

if s, ok := rawItem.Item["ClientID"]; ok && s.S != nil {
Expand Down Expand Up @@ -282,17 +283,37 @@ func (k *awsKinesisCheckpointer) Claim(ctx context.Context, streamID, shardID, f
// This allows the victim client to update the checkpoint with the final
// sequence as it yields the shard.
if len(fromClientID) > 0 && time.Since(currentLease) < k.leaseDuration {
// Wait for the estimated next checkpoint time plus a grace period of
// The maximum duration we'll wait for is the estimated next checkpoint time plus a grace period of
// one second.
waitFor := k.leaseDuration - time.Since(currentLease) + time.Second
select {
case <-time.After(waitFor):
case <-ctx.Done():
return "", ctx.Err()
timer := time.NewTimer(waitFor)
FOR_LOOP:
for {
checkPointed, err := k.isCheckpointed(ctx, streamID, shardID, startingSequence)
if err != nil {
return "", err
}

if checkPointed {
break
}

// If the shard has not been checkpointed yet, check again in 30 seconds, or just break and
// steal the shard when the max waitFor timer expires
select {
case <-time.After(time.Second * 30):
case <-timer.C:
break FOR_LOOP
case <-ctx.Done():
return "", ctx.Err()
}
}

cp, err := k.getCheckpoint(ctx, streamID, shardID)
if err != nil {
switch {
case err == ErrNoSequenceNum:
return "", nil
case err != nil:
return "", err
}
startingSequence = cp.SequenceNumber
Expand All @@ -301,6 +322,24 @@ func (k *awsKinesisCheckpointer) Claim(ctx context.Context, streamID, shardID, f
return startingSequence, nil
}

// Attempts to determine whether a shard has been checkpointed depending on the starting sequenceNumber
// If there has been no checkpoint on that shard, then resume from start of shard.
func (k *awsKinesisCheckpointer) isCheckpointed(ctx context.Context, streamID, shardID, sequenceNumber string) (bool, error) {
item, err := k.getCheckpoint(ctx, streamID, shardID)
switch {
case err == ErrNoSequenceNum:
return true, nil
case err != nil:
return false, err
}

if item.SequenceNumber != sequenceNumber {
return true, nil
}

return false, nil
}

// Checkpoint attempts to set a sequence number for a stream shard. Returns a
// boolean indicating whether this shard is still owned by the client.
//
Expand Down

0 comments on commit c60fdb4

Please sign in to comment.