Skip to content

Commit

Permalink
Revert corrupt records error
Browse files Browse the repository at this point in the history
  • Loading branch information
constanca-m committed Jan 9, 2025
1 parent f350765 commit f1ad0ff
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 13 deletions.
15 changes: 3 additions & 12 deletions receiver/awsfirehosereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression"
"io"
"net"
Expand Down Expand Up @@ -263,23 +264,13 @@ func (fmr *firehoseReceiver) transformRecords(records []firehoseRecord, decompre

data, err := base64.StdEncoding.DecodeString(record.Data)
if err != nil {
fmr.settings.Logger.Error(
"Unable to base64 decode the record data",
zap.Int("index", index),
zap.Error(err),
)
continue
return transformed, fmt.Errorf("unable to base64 decode the record at index %d: %w", index, err)
}

if decompress {
data, err = compression.Unzip(data)
if err != nil {
fmr.settings.Logger.Error(
"Failed to unzip record data",
zap.Int("index", index),
zap.Error(err),
)
continue
return transformed, fmt.Errorf("unable to unzip decoded record at index %d: %w", index, err)
}
}

Expand Down
4 changes: 3 additions & 1 deletion receiver/awsfirehosereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -162,7 +163,8 @@ func TestFirehoseRequest(t *testing.T) {
body: testFirehoseRequest(testFirehoseRequestID, []firehoseRecord{
{Data: "XXXXXaGVsbG8="},
}),
wantStatusCode: http.StatusOK, // having an invalid record does not stop the request
wantStatusCode: http.StatusBadRequest,
wantErr: fmt.Errorf("unable to base64 decode the record at index 0: %w", base64.CorruptInputError(12)),
},
"WithValidRecords": {
body: testFirehoseRequest(testFirehoseRequestID, []firehoseRecord{
Expand Down

0 comments on commit f1ad0ff

Please sign in to comment.