Skip to content

Commit

Permalink
Renew expired shard iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Oct 7, 2024
1 parent 4e19415 commit df0c7f2
Showing 1 changed file with 20 additions and 2 deletions.
22 changes: 20 additions & 2 deletions shard_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,33 @@ func getShardIterator(k kinesisiface.KinesisAPI, streamName string, shardID stri
}

// getRecords returns the next records and shard iterator from the given shard iterator
func getRecords(k kinesisiface.KinesisAPI, iterator string) (records []*kinesis.Record, nextIterator string, lag time.Duration, err error) {
func getRecords(k kinesisiface.KinesisAPI, streamName string, shardID string, iterator string) (records []*kinesis.Record, nextIterator string, lag time.Duration, err error) {
params := &kinesis.GetRecordsInput{
Limit: aws.Int64(getRecordsLimit),
ShardIterator: aws.String(iterator),
}

// sleep for 5 minutes + 5 sec, to make sure the shard iterator expires
fmt.Println("Sleeping for 305 seconds")
fmt.Printf("pre-sleep time.Now: %s\n", time.Now().Format(time.RFC3339))
time.Sleep(305 * time.Second)
fmt.Printf("post-sleep time.Now: %s\n", time.Now().Format(time.RFC3339))

output, err := k.GetRecords(params)

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case kinesis.ErrCodeExpiredIteratorException:
fmt.Printf("Renewing expired shard [id: %s] iterator\n", shardID)
it, ierr := getShardIterator(k, streamName, shardID, iterator, nil)
if ierr != nil {
return nil, "", 0, ierr
}
fmt.Printf("Renewed expired shard [id: %s] iterator successfully\n", shardID)
return getRecords(k, streamName, shardID, it)
}
}
return nil, "", 0, err
}

Expand Down Expand Up @@ -194,7 +212,7 @@ mainloop:
}

// Get records from kinesis
records, next, lag, err := getRecords(k.kinesis, iterator)
records, next, lag, err := getRecords(k.kinesis, k.streamName, shardID, iterator)

if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
Expand Down

0 comments on commit df0c7f2

Please sign in to comment.