Skip to content

Commit

Permalink
Merge commit 'e3a1aa2' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
kylecarbs committed Apr 24, 2022
2 parents 238feac + e3a1aa2 commit 550b607
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 15 deletions.
2 changes: 1 addition & 1 deletion drpcconn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestConn_InvokeFlushesSendClose(t *testing.T) {

ctx.Run(func(ctx context.Context) {
wr := drpcwire.NewWriter(ps, 64)
rd := drpcwire.NewReader(ps, 0)
rd := drpcwire.NewReader(ps)

_, _ = rd.ReadPacket() // Invoke
_, _ = rd.ReadPacket() // Message
Expand Down
7 changes: 3 additions & 4 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ type Options struct {
// flushing. Normal writes to streams typically issue a flush explicitly.
WriterBufferSize int

// ReaderBufferSize controls the size of the buffer that we accept before
// erroring out the stream.
ReaderBufferSize int
// Reader are passed to any readers the manager creates.
Reader drpcwire.ReaderOptions

// Stream are passed to any streams the manager creates.
Stream drpcstream.Options
Expand Down Expand Up @@ -82,7 +81,7 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
m := &Manager{
tr: tr,
wr: drpcwire.NewWriter(tr, opts.WriterBufferSize),
rd: drpcwire.NewReader(tr, opts.ReaderBufferSize),
rd: drpcwire.NewReaderWithOptions(tr, opts.Reader),
opts: opts,

pkts: make(chan drpcwire.Packet),
Expand Down
28 changes: 20 additions & 8 deletions drpcwire/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@ import (
"storj.io/drpc"
)

// ReaderOptions controls configuration settings for a reader.
type ReaderOptions struct {
// MaximumBufferSize controls the maximum size of buffered
// packet data.
MaximumBufferSize int
}

// Reader reconstructs packets from frames read from an io.Reader.
type Reader struct {
size int
opts ReaderOptions
r io.Reader
curr []byte
buf []byte
Expand All @@ -20,15 +27,20 @@ type Reader struct {

const maximumFrameSize = 4<<20 + 1 + 9 + 9 + 9

// NewReader constructs a Reader to read Packets from the io.Reader
// that will buffer up to size.
func NewReader(r io.Reader, size int) *Reader {
if size == 0 {
// NewReader constructs a Reader to read Packets from the io.Reader.
func NewReader(r io.Reader) *Reader {
return NewReaderWithOptions(r, ReaderOptions{})
}

// NewReaderWithOptions constructs a Reader to read Packets from
// the io.Reader. It uses the provided options to manage buffering.
func NewReaderWithOptions(r io.Reader, opts ReaderOptions) *Reader {
if opts.MaximumBufferSize == 0 {
// Default to 4MB.
size = 4 << 20
opts.MaximumBufferSize = 4 << 20
}
return &Reader{
size: size,
opts: opts,
r: r,
curr: make([]byte, 0, 64*1024),
id: ID{Stream: 1, Message: 1},
Expand Down Expand Up @@ -123,7 +135,7 @@ func (r *Reader) ReadPacketUsing(buf []byte) (pkt Packet, err error) {
pkt.Data = append(pkt.Data, fr.Data...)

switch {
case len(pkt.Data) > r.size:
case len(pkt.Data) > r.opts.MaximumBufferSize:
return Packet{}, drpc.ProtocolError.New("data overflow")

case fr.Done:
Expand Down
4 changes: 2 additions & 2 deletions drpcwire/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestReader(t *testing.T) {
buf = AppendFrame(buf, fr)
}

rd := NewReader(bytes.NewReader(buf), 0)
rd := NewReader(bytes.NewReader(buf))
for _, expPkt := range tc.Packets {
pkt, err := rd.ReadPacket()
assert.NoError(t, err)
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestReaderRandomized(t *testing.T) {
// exact sequence of bytes, so we reset bid to generate
// the sequence again.
bid = 0
r := NewReader(bytes.NewBuffer(buf), 0)
r := NewReader(bytes.NewBuffer(buf))
for i := 1; ; i++ {
pkt, err := r.ReadPacket()
if errors.Is(err, io.EOF) {
Expand Down

0 comments on commit 550b607

Please sign in to comment.