Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into add-merge-upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Nov 13, 2024
2 parents 8cc8e5a + 9a48088 commit dd87bce
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Config struct {
dynamoWriteCapacity int64
// Time to wait between attempts to verify tables were created/deleted completely
dynamoWaiterDelay time.Duration

// use ListShards to avoid LimitExceedException from DescribeStream
useListShardsForKinesisStreamReady bool
}

// NewConfig returns a default Config struct
Expand Down Expand Up @@ -148,6 +151,12 @@ func (c Config) WithLogger(logger Logger) Config {
return c
}

// WithUseListShardsForKinesisStreamReady returns a config with a modified useListShardsForKinesisStreamReady toggle
func (c Config) WithUseListShardsForKinesisStreamReady(shouldUse bool) Config {
c.useListShardsForKinesisStreamReady = shouldUse
return c
}

// Verify that a config struct has sane and valid values
func validateConfig(c *Config) error {
if c.throttleDelay < 200*time.Millisecond {
Expand Down
4 changes: 3 additions & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func TestConfigWithMethods(t *testing.T) {
WithLeaderActionFrequency(1 * time.Second).
WithThrottleDelay(1 * time.Second).
WithStats(stats).
WithIteratorStartTimestamp(&tstamp)
WithIteratorStartTimestamp(&tstamp).
WithUseListShardsForKinesisStreamReady(true)

err := validateConfig(&config)
require.NoError(t, err)
Expand All @@ -76,4 +77,5 @@ func TestConfigWithMethods(t *testing.T) {
require.Equal(t, 1*time.Second, config.leaderActionFrequency)
require.Equal(t, stats, config.stats)
require.Equal(t, &tstamp, config.iteratorStartTimestamp)
require.Equal(t, true, config.useListShardsForKinesisStreamReady)
}
10 changes: 9 additions & 1 deletion kinsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,15 @@ func (k *Kinsumer) dynamoDeleteTableIfExists(name string) error {

// kinesisStreamReady returns an error if the given stream is not ACTIVE
func (k *Kinsumer) kinesisStreamReady() error {
if k.config.useListShardsForKinesisStreamReady {
_, err := k.kinesis.ListShards(&kinesis.ListShardsInput{
StreamName: aws.String(k.streamName),
})
if err != nil {
return fmt.Errorf("error listing shards for stream %s: %v", k.streamName, err)
}
return nil
}
out, err := k.kinesis.DescribeStream(&kinesis.DescribeStreamInput{
StreamName: aws.String(k.streamName),
})
Expand All @@ -336,7 +345,6 @@ func (k *Kinsumer) kinesisStreamReady() error {
if status != "ACTIVE" && status != "UPDATING" {
return fmt.Errorf("stream %s exists but state '%s' is not 'ACTIVE' or 'UPDATING'", k.streamName, status)
}

return nil
}

Expand Down

0 comments on commit dd87bce

Please sign in to comment.