Skip to content

Commit

Permalink
Renew expired shard iterator
Browse files Browse the repository at this point in the history
A Kinesis shard iterator expires 5 minutes after it is returned and requires renewal.
  • Loading branch information
oguzhanunlu committed Oct 11, 2024
1 parent 61d8da7 commit 9fba69e
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions shard_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,19 @@ mainloop:
k.config.logger.Log("Got error: %s %s %sretry count is %d / %d", awsErr.Code(), awsErr.Message(), origErrStr, retryCount, maxErrorRetries)
// Only retry for errors that should be retried; notably, don't retry serialization errors because something bad is happening
shouldRetry := request.IsErrorRetryable(err) || request.IsErrorThrottle(err)

switch awsErr.Code() {
case kinesis.ErrCodeExpiredIteratorException:
newIterator, ierr := getShardIterator(k.kinesis, k.streamName, shardID, lastSeqToCheckp, nil)
if ierr != nil {
k.shardErrors <- shardConsumerError{shardID: shardID, action: "getShardIterator", err: err}
return
}
iterator = newIterator
// should retry after expired iterator is renewed successfully
shouldRetry = true
}

if shouldRetry && retryCount < maxErrorRetries {
retryCount++

Expand Down

0 comments on commit 9fba69e

Please sign in to comment.