Skip to content

Commit

Permalink
Renew expired shard iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Oct 10, 2024
1 parent 4e19415 commit 54a5b63
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions shard_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func getShardIterator(k kinesisiface.KinesisAPI, streamName string, shardID stri
}

// getRecords returns the next records and shard iterator from the given shard iterator
func getRecords(k kinesisiface.KinesisAPI, iterator string) (records []*kinesis.Record, nextIterator string, lag time.Duration, err error) {
func getRecords(k kinesisiface.KinesisAPI, streamName string, shardID string, iterator string) (records []*kinesis.Record, nextIterator string, lag time.Duration, err error) {
params := &kinesis.GetRecordsInput{
Limit: aws.Int64(getRecordsLimit),
ShardIterator: aws.String(iterator),
Expand All @@ -65,6 +65,19 @@ func getRecords(k kinesisiface.KinesisAPI, iterator string) (records []*kinesis.
output, err := k.GetRecords(params)

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case kinesis.ErrCodeExpiredIteratorException:
fmt.Printf("Renewing expired shard [id: %s] iterator\n", shardID)
it, ierr := getShardIterator(k, streamName, shardID, iterator, nil)
if ierr != nil {
fmt.Printf("Failed to renew shard [id: %s] iterator\n", shardID)
return nil, "", 0, ierr
}
fmt.Printf("Renewed expired shard [id: %s] iterator successfully\n", shardID)
return getRecords(k, streamName, shardID, it)
}
}
return nil, "", 0, err
}

Expand Down Expand Up @@ -194,7 +207,7 @@ mainloop:
}

// Get records from kinesis
records, next, lag, err := getRecords(k.kinesis, iterator)
records, next, lag, err := getRecords(k.kinesis, k.streamName, shardID, iterator)

if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
Expand Down

0 comments on commit 54a5b63

Please sign in to comment.