From 9a48088063ecac42effe6e5b54faeaac11480229 Mon Sep 17 00:00:00 2001 From: Steven McVicker <33295381+StevenMcVicker@users.noreply.github.com> Date: Fri, 15 Mar 2024 13:15:29 -0600 Subject: [PATCH] Allow users to use ListShards to check Kinesis stream readiness (#43) Co-authored-by: Steven McVicker Co-authored-by: Gareth Lewin Co-authored-by: Sean Lydon --- config.go | 9 +++++++++ config_test.go | 4 +++- kinsumer.go | 10 +++++++++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/config.go b/config.go index 6880ef3..c9cd749 100644 --- a/config.go +++ b/config.go @@ -41,6 +41,9 @@ type Config struct { dynamoWriteCapacity int64 // Time to wait between attempts to verify tables were created/deleted completely dynamoWaiterDelay time.Duration + + // use ListShards to avoid LimitExceedException from DescribeStream + useListShardsForKinesisStreamReady bool } // NewConfig returns a default Config struct @@ -119,6 +122,12 @@ func (c Config) WithLogger(logger Logger) Config { return c } +// WithUseListShardsForKinesisStreamReady returns a config with a modified useListShardsForKinesisStreamReady toggle +func (c Config) WithUseListShardsForKinesisStreamReady(shouldUse bool) Config { + c.useListShardsForKinesisStreamReady = shouldUse + return c +} + // Verify that a config struct has sane and valid values func validateConfig(c *Config) error { if c.throttleDelay < 200*time.Millisecond { diff --git a/config_test.go b/config_test.go index 4300303..4b9ab17 100644 --- a/config_test.go +++ b/config_test.go @@ -61,7 +61,8 @@ func TestConfigWithMethods(t *testing.T) { WithShardCheckFrequency(1 * time.Second). WithLeaderActionFrequency(1 * time.Second). WithThrottleDelay(1 * time.Second). - WithStats(stats) + WithStats(stats). + WithUseListShardsForKinesisStreamReady(true) err := validateConfig(&config) require.NoError(t, err) @@ -72,4 +73,5 @@ func TestConfigWithMethods(t *testing.T) { require.Equal(t, 1*time.Second, config.shardCheckFrequency) require.Equal(t, 1*time.Second, config.leaderActionFrequency) require.Equal(t, stats, config.stats) + require.Equal(t, true, config.useListShardsForKinesisStreamReady) } diff --git a/kinsumer.go b/kinsumer.go index b1dcace..1f4043b 100644 --- a/kinsumer.go +++ b/kinsumer.go @@ -316,6 +316,15 @@ func (k *Kinsumer) dynamoDeleteTableIfExists(name string) error { // kinesisStreamReady returns an error if the given stream is not ACTIVE func (k *Kinsumer) kinesisStreamReady() error { + if k.config.useListShardsForKinesisStreamReady { + _, err := k.kinesis.ListShards(&kinesis.ListShardsInput{ + StreamName: aws.String(k.streamName), + }) + if err != nil { + return fmt.Errorf("error listing shards for stream %s: %v", k.streamName, err) + } + return nil + } out, err := k.kinesis.DescribeStream(&kinesis.DescribeStreamInput{ StreamName: aws.String(k.streamName), }) @@ -327,7 +336,6 @@ func (k *Kinsumer) kinesisStreamReady() error { if status != "ACTIVE" && status != "UPDATING" { return fmt.Errorf("stream %s exists but state '%s' is not 'ACTIVE' or 'UPDATING'", k.streamName, status) } - return nil }