Skip to content

Commit

Permalink
Parse offset if provided, otherwise use constants
Browse files Browse the repository at this point in the history
  • Loading branch information
mostafa committed Dec 18, 2024
1 parent efefb69 commit ac97393
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"encoding/json"
"errors"
"io"
"strconv"
"time"

"github.com/grafana/sobek"
kafkago "github.com/segmentio/kafka-go"
"github.com/sirupsen/logrus"
"go.k6.io/k6/js/common"
"go.k6.io/k6/metrics"
)
Expand All @@ -28,9 +30,17 @@ var (
IsolationLevels map[string]kafkago.IsolationLevel

// Start offsets.
lastOffset = "start_offset_last_offset"
firstOffset = "start_offset_first_offset"

lastOffset = "start_offsets_last_offset"
firstOffset = "start_offsets_first_offset"

// StartOffset determines from whence the consumer group should begin
// consuming when it finds a partition without a committed offset. If
// non-zero, it must be set to one of FirstOffset or LastOffset.
//
// Default: FirstOffset
//
// Only used when GroupID is set
// Ref: https://github.com/segmentio/kafka-go/blob/a8e5eabf4a90025a4ad2c28e929324d18db21103/reader.go#L481-L488
StartOffsets map[string]int64

RebalanceTimeout = time.Second * 5
Expand Down Expand Up @@ -239,11 +249,25 @@ func (k *Kafka) reader(readerConfig *ReaderConfig) *kafkago.Reader {
isolationLevel = IsolationLevels[readerConfig.IsolationLevel]
}

var startOffset int64
var startOffset int64 // Will be set if GroupID is specified and valid StartOffset is provided
if readerConfig.GroupID != "" && readerConfig.StartOffset != "" {
startOffset = StartOffsets[firstOffset] // Default to FirstOffset
if s, ok := StartOffsets[readerConfig.StartOffset]; ok {
startOffset = s
// Check if StartOffset is a predefined value
if predefinedOffset, exists := StartOffsets[readerConfig.StartOffset]; exists {
startOffset = predefinedOffset
} else {
// Attempt to parse StartOffset as an integer
parsedOffset, err := strconv.ParseInt(readerConfig.StartOffset, 10, 64)
if err != nil {
// Log the error and default to FirstOffset
logger.WithFields(logrus.Fields{
"error": err,
"start_offset": readerConfig.StartOffset,
}).Error("Invalid StartOffset, defaulting to FirstOffset")
startOffset = StartOffsets[firstOffset]
} else {
// Use the parsed offset if valid
startOffset = parsedOffset
}
}
}

Expand Down

0 comments on commit ac97393

Please sign in to comment.