From 9fba69e398e1d63b6d8f25e8cd04ae592838b005 Mon Sep 17 00:00:00 2001 From: Oguzhan Unlu Date: Thu, 10 Oct 2024 17:08:30 +0300 Subject: [PATCH] Renew expired shard iterator A Kinesis shard iterator expires 5 minutes after it is returned and requires renewal. --- shard_consumer.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/shard_consumer.go b/shard_consumer.go index d7c2d9d..d92dab3 100644 --- a/shard_consumer.go +++ b/shard_consumer.go @@ -205,6 +205,19 @@ mainloop: k.config.logger.Log("Got error: %s %s %sretry count is %d / %d", awsErr.Code(), awsErr.Message(), origErrStr, retryCount, maxErrorRetries) // Only retry for errors that should be retried; notably, don't retry serialization errors because something bad is happening shouldRetry := request.IsErrorRetryable(err) || request.IsErrorThrottle(err) + + switch awsErr.Code() { + case kinesis.ErrCodeExpiredIteratorException: + newIterator, ierr := getShardIterator(k.kinesis, k.streamName, shardID, lastSeqToCheckp, nil) + if ierr != nil { + k.shardErrors <- shardConsumerError{shardID: shardID, action: "getShardIterator", err: err} + return + } + iterator = newIterator + // should retry after expired iterator is renewed successfully + shouldRetry = true + } + if shouldRetry && retryCount < maxErrorRetries { retryCount++