Skip to content

Commit

Permalink
Merge branch 'main' into fix/multiline_doc
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored Feb 5, 2025
2 parents 5045416 + e9a408d commit f96ccb5
Show file tree
Hide file tree
Showing 13 changed files with 298 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
uses: jandelgado/gcov2lcov-action@v1

- name: Coveralls
uses: coverallsapp/[email protected].4
uses: coverallsapp/[email protected].6
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: coverage.lcov
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,6 @@ additional features. This will not be considered a breaking change.

## Who uses hamba/avro?

- [Apache Arrow for Go](https://github.com/apache/arrow/tree/main/go)
- [Apache Arrow for Go](https://github.com/apache/arrow-go)
- [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go)
- [pulsar-client-go](https://github.com/apache/pulsar-client-go)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/modern-go/reflect2 v1.0.2
github.com/stretchr/testify v1.9.0
golang.org/x/tools v0.28.0
golang.org/x/tools v0.29.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4=
golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE=
golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
22 changes: 16 additions & 6 deletions ocf/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,29 @@ const (
ZStandard CodecName = "zstandard"
)

func resolveCodec(name CodecName, lvl int) (Codec, error) {
// codecOptions collects the per-codec settings that resolveCodec hands to the
// codec it constructs.
type codecOptions struct {
// DeflateCompressionLevel is used as the compression level of the Deflate codec.
DeflateCompressionLevel int
// ZStandardOptions carries the encoder and decoder options for the ZStandard codec.
ZStandardOptions zstdOptions
}

// zstdOptions groups the zstd options used when the ZStandard codec is created.
type zstdOptions struct {
// EOptions are passed to zstd.NewWriter when the codec's encoder is built.
EOptions []zstd.EOption
// DOptions are passed to zstd.NewReader when the codec's decoder is built.
DOptions []zstd.DOption
}

func resolveCodec(name CodecName, codecOpts codecOptions) (Codec, error) {
switch name {
case Null, "":
return &NullCodec{}, nil

case Deflate:
return &DeflateCodec{compLvl: lvl}, nil
return &DeflateCodec{compLvl: codecOpts.DeflateCompressionLevel}, nil

case Snappy:
return &SnappyCodec{}, nil

case ZStandard:
return newZStandardCodec(), nil
return newZStandardCodec(codecOpts.ZStandardOptions), nil

default:
return nil, fmt.Errorf("unknown codec %s", name)
Expand Down Expand Up @@ -132,9 +142,9 @@ type ZStandardCodec struct {
encoder *zstd.Encoder
}

func newZStandardCodec() *ZStandardCodec {
decoder, _ := zstd.NewReader(nil)
encoder, _ := zstd.NewWriter(nil)
func newZStandardCodec(opts zstdOptions) *ZStandardCodec {
decoder, _ := zstd.NewReader(nil, opts.DOptions...)
encoder, _ := zstd.NewWriter(nil, opts.EOptions...)
return &ZStandardCodec{
decoder: decoder,
encoder: encoder,
Expand Down
6 changes: 3 additions & 3 deletions ocf/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func BenchmarkZstdEncodeDecodeLowEntropyLong(b *testing.B) {

input := makeTestData(8762, func() byte { return 'a' })

codec, err := resolveCodec(ZStandard, 0)
codec, err := resolveCodec(ZStandard, codecOptions{})
require.NoError(b, err)

b.ReportAllocs()
Expand All @@ -74,7 +74,7 @@ func BenchmarkZstdEncodeDecodeLowEntropyLong(b *testing.B) {
func BenchmarkZstdEncodeDecodeHighEntropyLong(b *testing.B) {
input := makeTestData(8762, func() byte { return byte(rand.Uint32()) })

codec, err := resolveCodec(ZStandard, 0)
codec, err := resolveCodec(ZStandard, codecOptions{})
require.NoError(b, err)

b.ReportAllocs()
Expand All @@ -87,7 +87,7 @@ func BenchmarkZstdEncodeDecodeHighEntropyLong(b *testing.B) {
}

func verifyZstdEncodeDecode(t *testing.T, input []byte) {
codec, err := resolveCodec(ZStandard, 0)
codec, err := resolveCodec(ZStandard, codecOptions{})
require.NoError(t, err)

compressed := codec.Encode(input)
Expand Down
2 changes: 1 addition & 1 deletion ocf/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func ExampleNewDecoder() {
// Do something with the data
}

if dec.Error() != nil {
if err := dec.Error(); err != nil {
log.Fatal(err)
}
}
Expand Down
72 changes: 50 additions & 22 deletions ocf/ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package ocf

import (
"bytes"
"compress/flate"
"crypto/rand"
"encoding/json"
"errors"
Expand All @@ -14,6 +15,7 @@ import (

"github.com/hamba/avro/v2"
"github.com/hamba/avro/v2/internal/bytesx"
"github.com/klauspost/compress/zstd"
)

const (
Expand Down Expand Up @@ -54,6 +56,7 @@ type Header struct {
// decoderConfig holds the settings that DecoderFunc options adjust before
// NewDecoder reads the container file header.
type decoderConfig struct {
// DecoderConfig defaults to avro.DefaultConfig in NewDecoder.
DecoderConfig avro.API
// SchemaCache defaults to avro.DefaultSchemaCache and is passed to readHeader.
SchemaCache *avro.SchemaCache
// CodecOptions is passed to readHeader, which forwards it to resolveCodec.
CodecOptions codecOptions
}

// DecoderFunc represents a configuration function for Decoder.
Expand All @@ -74,6 +77,13 @@ func WithDecoderSchemaCache(cache *avro.SchemaCache) DecoderFunc {
}
}

// WithZStandardDecoderOptions returns a DecoderFunc that adds opts to the
// ZStandard decoder options held in the decoder configuration. Options are
// appended to any that are already present, so repeated use accumulates them.
func WithZStandardDecoderOptions(opts ...zstd.DOption) DecoderFunc {
	return func(c *decoderConfig) {
		zstdOpts := &c.CodecOptions.ZStandardOptions
		zstdOpts.DOptions = append(zstdOpts.DOptions, opts...)
	}
}

// Decoder reads and decodes Avro values from a container file.
type Decoder struct {
reader *avro.Reader
Expand All @@ -93,14 +103,17 @@ func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
cfg := decoderConfig{
DecoderConfig: avro.DefaultConfig,
SchemaCache: avro.DefaultSchemaCache,
CodecOptions: codecOptions{
DeflateCompressionLevel: flate.DefaultCompression,
},
}
for _, opt := range opts {
opt(&cfg)
}

reader := avro.NewReader(r, 1024)

h, err := readHeader(reader, cfg.SchemaCache)
h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
if err != nil {
return nil, fmt.Errorf("decoder: %w", err)
}
Expand Down Expand Up @@ -174,7 +187,8 @@ func (d *Decoder) readBlock() int64 {
size := d.reader.ReadLong()

// Read the blocks data
if count > 0 {
switch {
case count > 0:
data := make([]byte, size)
d.reader.Read(data)

Expand All @@ -184,6 +198,11 @@ func (d *Decoder) readBlock() int64 {
}

d.resetReader.Reset(data)

case size > 0:
// Skip the block data when count is 0
data := make([]byte, size)
d.reader.Read(data)
}

// Read the sync.
Expand All @@ -197,14 +216,14 @@ func (d *Decoder) readBlock() int64 {
}

type encoderConfig struct {
BlockLength int
CodecName CodecName
CodecCompression int
Metadata map[string][]byte
Sync [16]byte
EncodingConfig avro.API
SchemaCache *avro.SchemaCache
SchemaMarshaler func(avro.Schema) ([]byte, error)
BlockLength int
CodecName CodecName
CodecOptions codecOptions
Metadata map[string][]byte
Sync [16]byte
EncodingConfig avro.API
SchemaCache *avro.SchemaCache
SchemaMarshaler func(avro.Schema) ([]byte, error)
}

// EncoderFunc represents a configuration function for Encoder.
Expand All @@ -229,7 +248,14 @@ func WithCodec(codec CodecName) EncoderFunc {
func WithCompressionLevel(compLvl int) EncoderFunc {
return func(cfg *encoderConfig) {
cfg.CodecName = Deflate
cfg.CodecCompression = compLvl
cfg.CodecOptions.DeflateCompressionLevel = compLvl
}
}

// WithZStandardEncoderOptions returns an EncoderFunc that adds opts to the
// ZStandard encoder options held in the encoder configuration. Options are
// appended to any that are already present, so repeated use accumulates them.
func WithZStandardEncoderOptions(opts ...zstd.EOption) EncoderFunc {
	return func(c *encoderConfig) {
		zstdOpts := &c.CodecOptions.ZStandardOptions
		zstdOpts.EOptions = append(zstdOpts.EOptions, opts...)
	}
}

Expand Down Expand Up @@ -316,7 +342,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e

if info.Size() > 0 {
reader := avro.NewReader(file, 1024)
h, err := readHeader(reader, cfg.SchemaCache)
h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -354,7 +380,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e
_, _ = rand.Read(header.Sync[:])
}

codec, err := resolveCodec(cfg.CodecName, cfg.CodecCompression)
codec, err := resolveCodec(cfg.CodecName, cfg.CodecOptions)
if err != nil {
return nil, err
}
Expand All @@ -379,13 +405,15 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e

func computeEncoderConfig(opts []EncoderFunc) encoderConfig {
cfg := encoderConfig{
BlockLength: 100,
CodecName: Null,
CodecCompression: -1,
Metadata: map[string][]byte{},
EncodingConfig: avro.DefaultConfig,
SchemaCache: avro.DefaultSchemaCache,
SchemaMarshaler: DefaultSchemaMarshaler,
BlockLength: 100,
CodecName: Null,
CodecOptions: codecOptions{
DeflateCompressionLevel: flate.DefaultCompression,
},
Metadata: map[string][]byte{},
EncodingConfig: avro.DefaultConfig,
SchemaCache: avro.DefaultSchemaCache,
SchemaMarshaler: DefaultSchemaMarshaler,
}
for _, opt := range opts {
opt(&cfg)
Expand Down Expand Up @@ -469,7 +497,7 @@ type ocfHeader struct {
Sync [16]byte
}

func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache) (*ocfHeader, error) {
func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts codecOptions) (*ocfHeader, error) {
var h Header
reader.ReadVal(HeaderSchema, &h)
if reader.Error != nil {
Expand All @@ -484,7 +512,7 @@ func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache) (*ocfHeader,
return nil, err
}

codec, err := resolveCodec(CodecName(h.Meta[codecKey]), -1)
codec, err := resolveCodec(CodecName(h.Meta[codecKey]), codecOpts)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit f96ccb5

Please sign in to comment.