diff --git a/drpcmanager/manager.go b/drpcmanager/manager.go index 5723abe..de9edc1 100644 --- a/drpcmanager/manager.go +++ b/drpcmanager/manager.go @@ -27,6 +27,9 @@ type Options struct { // flushing. Normal writes to streams typically issue a flush explicitly. WriterBufferSize int + // Reader are passed to any readers the manager creates. + Reader drpcwire.ReaderOptions + // Stream are passed to any streams the manager creates. Stream drpcstream.Options @@ -78,7 +81,7 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager { m := &Manager{ tr: tr, wr: drpcwire.NewWriter(tr, opts.WriterBufferSize), - rd: drpcwire.NewReader(tr), + rd: drpcwire.NewReaderWithOptions(tr, opts.Reader), opts: opts, pkts: make(chan drpcwire.Packet), diff --git a/drpcwire/reader.go b/drpcwire/reader.go index 6dd6ec1..8a5cb67 100644 --- a/drpcwire/reader.go +++ b/drpcwire/reader.go @@ -9,8 +9,16 @@ import ( "storj.io/drpc" ) +// ReaderOptions controls configuration settings for a reader. +type ReaderOptions struct { + // MaximumBufferSize controls the maximum size of buffered + // packet data. + MaximumBufferSize int +} + // Reader reconstructs packets from frames read from an io.Reader. type Reader struct { + opts ReaderOptions r io.Reader curr []byte buf []byte @@ -21,7 +29,18 @@ const maximumFrameSize = 4<<20 + 1 + 9 + 9 + 9 // NewReader constructs a Reader to read Packets from the io.Reader. func NewReader(r io.Reader) *Reader { + return NewReaderWithOptions(r, ReaderOptions{}) +} + +// NewReaderWithOptions constructs a Reader to read Packets from +// the io.Reader. It uses the provided options to manage buffering. +func NewReaderWithOptions(r io.Reader, opts ReaderOptions) *Reader { + if opts.MaximumBufferSize == 0 { + // Default to 4MB. + opts.MaximumBufferSize = 4 << 20 + } return &Reader{ + opts: opts, r: r, curr: make([]byte, 0, 64*1024), id: ID{Stream: 1, Message: 1}, @@ -116,7 +135,7 @@ func (r *Reader) ReadPacketUsing(buf []byte) (pkt Packet, err error) { pkt.Data = append(pkt.Data, fr.Data...) switch { - case len(pkt.Data) > 4<<20: + case len(pkt.Data) > r.opts.MaximumBufferSize: return Packet{}, drpc.ProtocolError.New("data overflow") case fr.Done: