Skip to content

Commit

Permalink
drpcwire: expose buffer size on Reader (#30)
Browse files Browse the repository at this point in the history
The maximum was hardcoded at 4MB. This allows it to
be configurable for larger payloads. An option is
exposed in `dprcmanager.Options`.
  • Loading branch information
kylecarbs authored Apr 25, 2022
1 parent 41e8fa5 commit 56b3e4e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
5 changes: 4 additions & 1 deletion drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Options struct {
// flushing. Normal writes to streams typically issue a flush explicitly.
WriterBufferSize int

// Reader are passed to any readers the manager creates.
Reader drpcwire.ReaderOptions

// Stream are passed to any streams the manager creates.
Stream drpcstream.Options

Expand Down Expand Up @@ -78,7 +81,7 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
m := &Manager{
tr: tr,
wr: drpcwire.NewWriter(tr, opts.WriterBufferSize),
rd: drpcwire.NewReader(tr),
rd: drpcwire.NewReaderWithOptions(tr, opts.Reader),
opts: opts,

pkts: make(chan drpcwire.Packet),
Expand Down
21 changes: 20 additions & 1 deletion drpcwire/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@ import (
"storj.io/drpc"
)

// ReaderOptions controls configuration settings for a reader.
type ReaderOptions struct {
// MaximumBufferSize controls the maximum size of buffered
// packet data.
MaximumBufferSize int
}

// Reader reconstructs packets from frames read from an io.Reader.
type Reader struct {
opts ReaderOptions
r io.Reader
curr []byte
buf []byte
Expand All @@ -21,7 +29,18 @@ const maximumFrameSize = 4<<20 + 1 + 9 + 9 + 9

// NewReader constructs a Reader to read Packets from the io.Reader.
func NewReader(r io.Reader) *Reader {
return NewReaderWithOptions(r, ReaderOptions{})
}

// NewReaderWithOptions constructs a Reader to read Packets from
// the io.Reader. It uses the provided options to manage buffering.
func NewReaderWithOptions(r io.Reader, opts ReaderOptions) *Reader {
if opts.MaximumBufferSize == 0 {
// Default to 4MB.
opts.MaximumBufferSize = 4 << 20
}
return &Reader{
opts: opts,
r: r,
curr: make([]byte, 0, 64*1024),
id: ID{Stream: 1, Message: 1},
Expand Down Expand Up @@ -116,7 +135,7 @@ func (r *Reader) ReadPacketUsing(buf []byte) (pkt Packet, err error) {
pkt.Data = append(pkt.Data, fr.Data...)

switch {
case len(pkt.Data) > 4<<20:
case len(pkt.Data) > r.opts.MaximumBufferSize:
return Packet{}, drpc.ProtocolError.New("data overflow")

case fr.Done:
Expand Down

0 comments on commit 56b3e4e

Please sign in to comment.