Skip to content

Commit

Permalink
avro: fix timestamp overflows
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored Oct 29, 2020
1 parent 6888cf0 commit 06bedbc
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 3 deletions.
10 changes: 7 additions & 3 deletions codec_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,19 +383,23 @@ type timestampMillisCodec struct{}

func (c *timestampMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) {
i := r.ReadLong()
*((*time.Time)(ptr)) = time.Unix(0, i*int64(time.Millisecond)).UTC()
sec := i / 1e3
nsec := (i - sec*1e3) * 1e6
*((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC()
}

func (c *timestampMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) {
t := *((*time.Time)(ptr))
w.WriteLong(t.UnixNano() / int64(time.Millisecond))
w.WriteLong(t.Unix()*1e3 + int64(t.Nanosecond()/1e6))
}

type timestampMicrosCodec struct{}

func (c *timestampMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) {
i := r.ReadLong()
*((*time.Time)(ptr)) = time.Unix(0, i*int64(time.Microsecond)).UTC()
sec := i / 1e6
nsec := (i - sec*1e6) * 1e3
*((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC()
}

func (c *timestampMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) {
Expand Down
60 changes: 60 additions & 0 deletions decoder_native_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,36 @@ func TestDecoder_Time_TimestampMillis(t *testing.T) {
assert.Equal(t, time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC), got)
}

func TestDecoder_Time_TimestampMillisZero(t *testing.T) {
defer ConfigTeardown()

data := []byte{0xff, 0xdf, 0xe6, 0xa2, 0xe2, 0xa0, 0x1c}
schema := `{"type":"long","logicalType":"timestamp-millis"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
assert.NoError(t, err)

var got time.Time
err = dec.Decode(&got)

assert.NoError(t, err)
assert.Equal(t, time.Time{}, got)
}

func TestDecoder_Time_TimestampMillisOneMillis(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x02}
schema := `{"type":"long","logicalType":"timestamp-millis"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
assert.NoError(t, err)

var got time.Time
err = dec.Decode(&got)

assert.NoError(t, err)
assert.Equal(t, time.Date(1970, 1, 1, 0, 0, 0, 1e6, time.UTC), got)
}

func TestDecoder_Time_TimestampMicros(t *testing.T) {
defer ConfigTeardown()

Expand All @@ -359,6 +389,36 @@ func TestDecoder_Time_TimestampMicros(t *testing.T) {
assert.Equal(t, time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC), got)
}

func TestDecoder_Time_TimestampMicrosZero(t *testing.T) {
defer ConfigTeardown()

data := []byte{0xff, 0xff, 0xdd, 0xf2, 0xdf, 0xff, 0xdf, 0xdc, 0x1}
schema := `{"type":"long","logicalType":"timestamp-micros"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
assert.NoError(t, err)

var got time.Time
err = dec.Decode(&got)

assert.NoError(t, err)
assert.Equal(t, time.Time{}, got)
}

func TestDecoder_Time_TimestampMillisOneMicros(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x02}
schema := `{"type":"long","logicalType":"timestamp-micros"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
assert.NoError(t, err)

var got time.Time
err = dec.Decode(&got)

assert.NoError(t, err)
assert.Equal(t, time.Date(1970, 1, 1, 0, 0, 0, 1e3, time.UTC), got)
}

func TestDecoder_TimeInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down
56 changes: 56 additions & 0 deletions encoder_native_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,34 @@ func TestEncoder_Time_TimestampMillis(t *testing.T) {
assert.Equal(t, []byte{0x90, 0xB2, 0xAE, 0xC3, 0xEC, 0x5B}, buf.Bytes())
}

func TestEncoder_Time_TimestampMillisZero(t *testing.T) {
defer ConfigTeardown()

schema := `{"type":"long","logicalType":"timestamp-millis"}`
buf := bytes.NewBuffer([]byte{})
enc, err := avro.NewEncoder(schema, buf)
assert.NoError(t, err)

err = enc.Encode(time.Time{})

assert.NoError(t, err)
assert.Equal(t, []byte{0xff, 0xdf, 0xe6, 0xa2, 0xe2, 0xa0, 0x1c}, buf.Bytes())
}

func TestEncoder_Time_TimestampMillisOneMillis(t *testing.T) {
defer ConfigTeardown()

schema := `{"type":"long","logicalType":"timestamp-millis"}`
buf := bytes.NewBuffer([]byte{})
enc, err := avro.NewEncoder(schema, buf)
assert.NoError(t, err)

err = enc.Encode(time.Date(1970, 1, 1, 0, 0, 0, 1e6, time.UTC))

assert.NoError(t, err)
assert.Equal(t, []byte{0x2}, buf.Bytes())
}

func TestEncoder_Time_TimestampMicros(t *testing.T) {
defer ConfigTeardown()

Expand All @@ -363,6 +391,34 @@ func TestEncoder_Time_TimestampMicros(t *testing.T) {
assert.Equal(t, []byte{0x80, 0xCD, 0xB7, 0xA2, 0xEE, 0xC7, 0xCD, 0x05}, buf.Bytes())
}

func TestEncoder_Time_TimestampMicrosZero(t *testing.T) {
defer ConfigTeardown()

schema := `{"type":"long","logicalType":"timestamp-micros"}`
buf := bytes.NewBuffer([]byte{})
enc, err := avro.NewEncoder(schema, buf)
assert.NoError(t, err)

err = enc.Encode(time.Time{})

assert.NoError(t, err)
assert.Equal(t, []byte{0xff, 0xff, 0xdd, 0xf2, 0xdf, 0xff, 0xdf, 0xdc, 0x1}, buf.Bytes())
}

func TestEncoder_Time_TimestampMillisOneMicros(t *testing.T) {
defer ConfigTeardown()

schema := `{"type":"long","logicalType":"timestamp-micros"}`
buf := bytes.NewBuffer([]byte{})
enc, err := avro.NewEncoder(schema, buf)
assert.NoError(t, err)

err = enc.Encode(time.Date(1970, 1, 1, 0, 0, 0, 1e3, time.UTC))

assert.NoError(t, err)
assert.Equal(t, []byte{0x2}, buf.Bytes())
}

func TestEncoder_TimeInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down

0 comments on commit 06bedbc

Please sign in to comment.