Skip to content

Commit

Permalink
add InvokeMetadata message and reserve control bit
Browse files Browse the repository at this point in the history
We need a way to send Metadata for an invoke such that older
servers will be able to read newer clients, so they must ignore
the metadata. Foruntately, when reading for a new Invoke, we
ignore any packets that are not of kind Invoke. That means that
newer clients can send whatever they want before the Invoke like
metadata.

It also reserves a bit of the control byte so that after everything
is all rolled out, we can use it if necessary to extend the
protocol. The current implementation will drop any frames that
have the control bit set, so that we can send arbitrary data
without future older clients breaking.

Change-Id: I51f007f9ef71e5f4e3f5ec2a83d1f61c69528454
  • Loading branch information
zeebo committed Mar 16, 2020
1 parent b92391a commit d44aecc
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 36 deletions.
31 changes: 23 additions & 8 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea
return nil, "", err
}

var metadata drpcwire.Packet

for {
select {
case <-ctx.Done():
Expand All @@ -180,16 +182,29 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea
return nil, "", m.term.Err()

case pkt := <-m.queue:
// we ignore packets that arent invokes because perhaps older streams have
// messages in the queue sent concurrently with our notification to them
// that the stream they were sent for is done.
if pkt.Kind != drpcwire.KindInvoke {
switch pkt.Kind {
case drpcwire.KindInvokeMetadata:
// keep track of any metadata being sent before an invoke so that we can
// include it if the stream id matches the eventual invoke.
metadata = pkt
continue
}

stream = drpcstream.NewWithOptions(m.ctx, pkt.ID.Stream, m.wr, m.opts.Stream)
go m.manageStream(ctx, stream)
return stream, string(pkt.Data), nil
case drpcwire.KindInvoke:
if metadata.ID.Stream == pkt.ID.Stream {
// TODO: parse and attach metadata from metadata.Data
_ = metadata.Data
}

stream = drpcstream.NewWithOptions(m.ctx, pkt.ID.Stream, m.wr, m.opts.Stream)
go m.manageStream(ctx, stream)
return stream, string(pkt.Data), nil

default:
// we ignore packets that arent invokes because perhaps older streams have
// messages in the queue sent concurrently with our notification to them
// that the stream they were sent for is done.
continue
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions drpcwire/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ const (
// KindCloseSend is used to inform that no more messages will be sent.
// It has no body.
KindCloseSend Kind = 6 // body must be empty

// KindInvokeMetadata includes metadata about the next Invoke packet.
KindInvokeMetadata Kind = 7
)
```

Expand Down
16 changes: 13 additions & 3 deletions drpcwire/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
// KindCloseSend is used to inform that no more messages will be sent.
// It has no body.
KindCloseSend Kind = 6 // body must be empty

// KindInvokeMetadata includes metadata about the next Invoke packet.
KindInvokeMetadata Kind = 7
)

//
Expand Down Expand Up @@ -75,6 +78,9 @@ type Frame struct {

// Done is true if this is the last frame for the ID.
Done bool

// Control is true if the frame has the control bit set.
Control bool
}

// ParseFrame attempts to parse a frame at the beginning of buf. If successful
Expand All @@ -90,8 +96,9 @@ func ParseFrame(buf []byte) (rem []byte, fr Frame, ok bool, err error) {
}

rem, control = buf[1:], buf[0]
fr.Done = control&1 > 0
fr.Kind = Kind(control >> 1)
fr.Done = (control & 0b00000001) > 0
fr.Control = (control & 0b10000000) > 0
fr.Kind = Kind((control & 0b01111110) >> 1)
rem, fr.ID.Stream, ok, err = ReadVarint(rem)
if !ok || err != nil {
goto bad
Expand All @@ -115,7 +122,10 @@ bad:
func AppendFrame(buf []byte, fr Frame) []byte {
control := byte(fr.Kind << 1)
if fr.Done {
control |= 1
control |= 0b00000001
}
if fr.Control {
control |= 0b10000000
}

out := buf
Expand Down
5 changes: 3 additions & 2 deletions drpcwire/packet_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions drpcwire/rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ func RandKind() Kind {
}

var payloadSize = map[Kind]func() int{
KindInvoke: func() int { return rand.Intn(1023) + 1 },
KindMessage: func() int { return rand.Intn(1023) + 1 },
KindError: func() int { return rand.Intn(1023) + 1 },
KindClose: func() int { return 0 },
KindCloseSend: func() int { return 0 },
KindInvoke: func() int { return rand.Intn(1023) + 1 },
KindMessage: func() int { return rand.Intn(1023) + 1 },
KindError: func() int { return rand.Intn(1023) + 1 },
KindClose: func() int { return 0 },
KindCloseSend: func() int { return 0 },
KindInvokeMetadata: func() int { return rand.Intn(1023) + 1 },
}

func RandFrame() Frame {
Expand Down
5 changes: 5 additions & 0 deletions drpcwire/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func (s *Reader) ReadPacket() (pkt Packet, err error) {
return Packet{}, drpc.ProtocolError.Wrap(err)
case !ok, len(rem) > 0:
return Packet{}, drpc.InternalError.New("problem with scanner")
case fr.Control:
// Ignore any frames with the control bit set so that we can
// use it in the future to mean things to people who understand
// it.
continue
case fr.ID.Less(s.id):
return Packet{}, drpc.ProtocolError.New("id monotonicity violation")
case s.id.Less(fr.ID):
Expand Down
46 changes: 29 additions & 17 deletions drpcwire/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ func TestReader(t *testing.T) {
}
}

f := func(kind Kind, id uint64, data string, done bool) Frame {
f := func(kind Kind, id uint64, data string, done, control bool) Frame {
return Frame{
Data: []byte(data),
ID: ID{Stream: 1, Message: id},
Kind: kind,
Done: done,
Data: []byte(data),
ID: ID{Stream: 1, Message: id},
Kind: kind,
Done: done,
Control: control,
}
}

Expand All @@ -66,36 +67,36 @@ func TestReader(t *testing.T) {

megaFrames := make([]Frame, 0, 10*1024)
for i := 0; i < 10*1024; i++ {
megaFrames = append(megaFrames, f(KindMessage, 1, strings.Repeat("X", 1024), false))
megaFrames = append(megaFrames, f(KindMessage, 1, strings.Repeat("X", 1024), false, false))
}
megaFrames = append(megaFrames, f(KindMessage, 1, "", true))
megaFrames = append(megaFrames, f(KindMessage, 1, "", true, false))

cases := []testCase{
m(p(KindMessage, 1, "hello world"),
f(KindMessage, 1, "hello", false),
f(KindMessage, 1, " ", false),
f(KindMessage, 1, "world", true)),
f(KindMessage, 1, "hello", false, false),
f(KindMessage, 1, " ", false, false),
f(KindMessage, 1, "world", true, false)),

m(p(KindClose, 2, ""),
f(KindMessage, 1, "hello", false),
f(KindMessage, 1, " ", false),
f(KindClose, 2, "", true)),
f(KindMessage, 1, "hello", false, false),
f(KindMessage, 1, " ", false, false),
f(KindClose, 2, "", true, false)),

{
Packets: []Packet{
p(KindClose, 2, ""),
},
Frames: []Frame{
f(KindMessage, 1, "1", false),
f(KindClose, 2, "", true),
f(KindMessage, 1, "1", true),
f(KindMessage, 1, "1", false, false),
f(KindClose, 2, "", true, false),
f(KindMessage, 1, "1", true, false),
},
Error: "id monotonicity violation",
},

{ // a single frame that's too large
Packets: []Packet{},
Frames: []Frame{f(KindMessage, 1, strings.Repeat("X", 2<<20), true)},
Frames: []Frame{f(KindMessage, 1, strings.Repeat("X", 2<<20), true, false)},
Error: "token too long",
},

Expand All @@ -104,6 +105,17 @@ func TestReader(t *testing.T) {
Frames: megaFrames,
Error: "data overflow",
},

{ // Control bit is ignored
Packets: []Packet{
p(KindClose, 2, ""),
},
Frames: []Frame{
f(KindMessage, 1, "1", false, false),
f(KindClose, 2, "", true, false),
f(KindMessage, 1, "1", true, true),
},
},
}

for _, tc := range cases {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module storj.io/drpc

go 1.12
go 1.14

require (
github.com/gogo/protobuf v1.2.1
Expand Down

0 comments on commit d44aecc

Please sign in to comment.