Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drpcwire: expose buffer size on Reader #30

Merged
merged 1 commit into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Options struct {
// flushing. Normal writes to streams typically issue a flush explicitly.
WriterBufferSize int

// Reader are passed to any readers the manager creates.
Reader drpcwire.ReaderOptions

// Stream are passed to any streams the manager creates.
Stream drpcstream.Options

Expand Down Expand Up @@ -78,7 +81,7 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
m := &Manager{
tr: tr,
wr: drpcwire.NewWriter(tr, opts.WriterBufferSize),
rd: drpcwire.NewReader(tr),
rd: drpcwire.NewReaderWithOptions(tr, opts.Reader),
opts: opts,

pkts: make(chan drpcwire.Packet),
Expand Down
21 changes: 20 additions & 1 deletion drpcwire/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@ import (
"storj.io/drpc"
)

// ReaderOptions controls configuration settings for a reader.
type ReaderOptions struct {
// MaximumBufferSize controls the maximum size of buffered
// packet data.
MaximumBufferSize int
}

// Reader reconstructs packets from frames read from an io.Reader.
type Reader struct {
opts ReaderOptions
r io.Reader
curr []byte
buf []byte
Expand All @@ -21,7 +29,18 @@ const maximumFrameSize = 4<<20 + 1 + 9 + 9 + 9

// NewReader constructs a Reader to read Packets from the io.Reader.
func NewReader(r io.Reader) *Reader {
return NewReaderWithOptions(r, ReaderOptions{})
}

// NewReaderWithOptions constructs a Reader to read Packets from
// the io.Reader. It uses the provided options to manage buffering.
func NewReaderWithOptions(r io.Reader, opts ReaderOptions) *Reader {
if opts.MaximumBufferSize == 0 {
// Default to 4MB.
opts.MaximumBufferSize = 4 << 20
}
return &Reader{
opts: opts,
r: r,
curr: make([]byte, 0, 64*1024),
id: ID{Stream: 1, Message: 1},
Expand Down Expand Up @@ -116,7 +135,7 @@ func (r *Reader) ReadPacketUsing(buf []byte) (pkt Packet, err error) {
pkt.Data = append(pkt.Data, fr.Data...)

switch {
case len(pkt.Data) > 4<<20:
case len(pkt.Data) > r.opts.MaximumBufferSize:
return Packet{}, drpc.ProtocolError.New("data overflow")

case fr.Done:
Expand Down