diff --git a/shard_consumer.go b/shard_consumer.go index d7c2d9d..9cec015 100644 --- a/shard_consumer.go +++ b/shard_consumer.go @@ -56,15 +56,33 @@ func getShardIterator(k kinesisiface.KinesisAPI, streamName string, shardID stri } // getRecords returns the next records and shard iterator from the given shard iterator -func getRecords(k kinesisiface.KinesisAPI, iterator string) (records []*kinesis.Record, nextIterator string, lag time.Duration, err error) { +func getRecords(k kinesisiface.KinesisAPI, streamName string, shardID string, iterator string) (records []*kinesis.Record, nextIterator string, lag time.Duration, err error) { params := &kinesis.GetRecordsInput{ Limit: aws.Int64(getRecordsLimit), ShardIterator: aws.String(iterator), } + // sleep for 5 minutes + 5 sec, to make sure the shard iterator expires + fmt.Println("Sleeping for 305 seconds") + fmt.Printf("pre-sleep time.Now: %s\n", time.Now().Format(time.RFC3339)) + time.Sleep(305 * time.Second) + fmt.Printf("post-sleep time.Now: %s\n", time.Now().Format(time.RFC3339)) + output, err := k.GetRecords(params) if err != nil { + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case kinesis.ErrCodeExpiredIteratorException: + fmt.Printf("Renewing expired shard [id: %s] iterator\n", shardID) + it, ierr := getShardIterator(k, streamName, shardID, iterator, nil) + if ierr != nil { + return nil, "", 0, ierr + } + fmt.Printf("Renewed expired shard [id: %s] iterator successfully\n", shardID) + return getRecords(k, streamName, shardID, it) + } + } return nil, "", 0, err } @@ -194,7 +212,7 @@ mainloop: } // Get records from kinesis - records, next, lag, err := getRecords(k.kinesis, iterator) + records, next, lag, err := getRecords(k.kinesis, k.streamName, shardID, iterator) if err != nil { if awsErr, ok := err.(awserr.Error); ok {