diff --git a/drpcconn/conn.go b/drpcconn/conn.go index 9a81685..1c7c5df 100644 --- a/drpcconn/conn.go +++ b/drpcconn/conn.go @@ -11,6 +11,7 @@ import ( "storj.io/drpc" "storj.io/drpc/drpcmanager" + "storj.io/drpc/drpcmetadata" "storj.io/drpc/drpcstream" "storj.io/drpc/drpcwire" ) @@ -66,6 +67,14 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, in, out drpc.Message) (er mon.Event("outgoing_requests") mon.Event("outgoing_invokes") + var metadata []byte + if md, ok := drpcmetadata.Get(ctx); ok { + metadata, err = drpcmetadata.Encode(metadata, md) + if err != nil { + return err + } + } + data, err := proto.Marshal(in) if err != nil { return errs.Wrap(err) @@ -77,13 +86,18 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, in, out drpc.Message) (er } defer func() { err = errs.Combine(err, stream.Close()) }() - if err := c.doInvoke(stream, []byte(rpc), data, out); err != nil { + if err := c.doInvoke(stream, []byte(rpc), data, metadata, out); err != nil { return err } return nil } -func (c *Conn) doInvoke(stream *drpcstream.Stream, rpc, data []byte, out drpc.Message) (err error) { +func (c *Conn) doInvoke(stream *drpcstream.Stream, rpc, data []byte, metadata []byte, out drpc.Message) (err error) { + if len(metadata) > 0 { + if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil { + return err + } + } if err := stream.RawWrite(drpcwire.KindInvoke, rpc); err != nil { return err } @@ -107,18 +121,31 @@ func (c *Conn) NewStream(ctx context.Context, rpc string) (_ drpc.Stream, err er mon.Event("outgoing_requests") mon.Event("outgoing_streams") + var metadata []byte + if md, ok := drpcmetadata.Get(ctx); ok { + metadata, err = drpcmetadata.Encode(metadata, md) + if err != nil { + return nil, err + } + } + stream, err := c.man.NewClientStream(ctx) if err != nil { return nil, err } - if err := c.doNewStream(stream, []byte(rpc)); err != nil { + if err := c.doNewStream(stream, []byte(rpc), metadata); err != nil { return nil, errs.Combine(err, stream.Close()) } return stream, nil } -func (c *Conn) doNewStream(stream *drpcstream.Stream, rpc []byte) error { +func (c *Conn) doNewStream(stream *drpcstream.Stream, rpc []byte, metadata []byte) error { + if len(metadata) > 0 { + if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil { + return err + } + } if err := stream.RawWrite(drpcwire.KindInvoke, rpc); err != nil { return err } diff --git a/drpcmanager/manager.go b/drpcmanager/manager.go index 544e054..e4cd3de 100644 --- a/drpcmanager/manager.go +++ b/drpcmanager/manager.go @@ -9,10 +9,10 @@ import ( "sync" "github.com/zeebo/errs" - "storj.io/drpc" "storj.io/drpc/drpcctx" "storj.io/drpc/drpcdebug" + "storj.io/drpc/drpcmetadata" "storj.io/drpc/drpcsignal" "storj.io/drpc/drpcstream" "storj.io/drpc/drpcwire" @@ -190,15 +190,18 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea continue case drpcwire.KindInvoke: + streamCtx := m.ctx if metadata.ID.Stream == pkt.ID.Stream { - // TODO: parse and attach metadata from metadata.Data - _ = metadata.Data + md, err := drpcmetadata.Decode(metadata.Data) + if err != nil { + return nil, "", err + } + streamCtx = drpcmetadata.AddPairs(streamCtx, md) } - stream = drpcstream.NewWithOptions(m.ctx, pkt.ID.Stream, m.wr, m.opts.Stream) + stream = drpcstream.NewWithOptions(streamCtx, pkt.ID.Stream, m.wr, m.opts.Stream) go m.manageStream(ctx, stream) return stream, string(pkt.Data), nil - default: // we ignore packets that arent invokes because perhaps older streams have // messages in the queue sent concurrently with our notification to them diff --git a/drpcmetadata/README.md b/drpcmetadata/README.md new file mode 100644 index 0000000..70cfcef --- /dev/null +++ b/drpcmetadata/README.md @@ -0,0 +1,37 @@ +# package drpcmetadata + +`import "storj.io/drpc/drpcmetadata"` + +Package drpcmetadata define the structure of the metadata supported by drpc +library. + +## Usage + +#### func Add + +```go +func Add(ctx context.Context, key, value string) context.Context +``` +Add associates a key/value pair on the context. + +#### func AddPairs + +```go +func AddPairs(ctx context.Context, md map[string]string) context.Context +``` +AddPairs attaches metadata onto a context and return the context. + +#### func Decode + +```go +func Decode(data []byte) (*invoke.InvokeMetadata, error) +``` +Decode translate byte form of metadata into metadata struct defined by protobuf. + +#### func Encode + +```go +func Encode(buffer []byte) ([]byte, error) +``` +Encode generates byte form of the metadata and appends it onto the passed in +buffer. diff --git a/drpcmetadata/doc.go b/drpcmetadata/doc.go new file mode 100644 index 0000000..61f9a21 --- /dev/null +++ b/drpcmetadata/doc.go @@ -0,0 +1,5 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +// Package drpcmetadata define the structure of the metadata supported by drpc library. +package drpcmetadata diff --git a/drpcmetadata/invoke/README.md b/drpcmetadata/invoke/README.md new file mode 100644 index 0000000..e416ee5 --- /dev/null +++ b/drpcmetadata/invoke/README.md @@ -0,0 +1,80 @@ +# package invoke + +`import "storj.io/drpc/drpcmetadata/invoke"` + +Package invoke defines the proto messages exposed by drpc for sending metadata +across the wire. + +## Usage + +#### type InvokeMetadata + +```go +type InvokeMetadata struct { + Data map[string]string `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} +``` + + +#### func (*InvokeMetadata) Descriptor + +```go +func (*InvokeMetadata) Descriptor() ([]byte, []int) +``` + +#### func (*InvokeMetadata) GetData + +```go +func (m *InvokeMetadata) GetData() map[string]string +``` + +#### func (*InvokeMetadata) ProtoMessage + +```go +func (*InvokeMetadata) ProtoMessage() +``` + +#### func (*InvokeMetadata) Reset + +```go +func (m *InvokeMetadata) Reset() +``` + +#### func (*InvokeMetadata) String + +```go +func (m *InvokeMetadata) String() string +``` + +#### func (*InvokeMetadata) XXX_DiscardUnknown + +```go +func (m *InvokeMetadata) XXX_DiscardUnknown() +``` + +#### func (*InvokeMetadata) XXX_Marshal + +```go +func (m *InvokeMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) +``` + +#### func (*InvokeMetadata) XXX_Merge + +```go +func (m *InvokeMetadata) XXX_Merge(src proto.Message) +``` + +#### func (*InvokeMetadata) XXX_Size + +```go +func (m *InvokeMetadata) XXX_Size() int +``` + +#### func (*InvokeMetadata) XXX_Unmarshal + +```go +func (m *InvokeMetadata) XXX_Unmarshal(b []byte) error +``` diff --git a/drpcmetadata/invoke/doc.go b/drpcmetadata/invoke/doc.go new file mode 100644 index 0000000..183dafc --- /dev/null +++ b/drpcmetadata/invoke/doc.go @@ -0,0 +1,8 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +// Package invoke defines the proto messages exposed by drpc for +// sending metadata across the wire. +package invoke + +//go:generate bash -c "go install storj.io/drpc/cmd/protoc-gen-drpc && protoc --drpc_out=plugins=drpc:. metadata.proto" diff --git a/drpcmetadata/invoke/metadata.pb.go b/drpcmetadata/invoke/metadata.pb.go new file mode 100644 index 0000000..df224b5 --- /dev/null +++ b/drpcmetadata/invoke/metadata.pb.go @@ -0,0 +1,79 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: metadata.proto + +package invoke + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type Metadata struct { + Data map[string]string `protobuf:"bytes,1,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Metadata) Reset() { *m = Metadata{} } +func (m *Metadata) String() string { return proto.CompactTextString(m) } +func (*Metadata) ProtoMessage() {} +func (*Metadata) Descriptor() ([]byte, []int) { + return fileDescriptor_56d9f74966f40d04, []int{0} +} +func (m *Metadata) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Metadata.Unmarshal(m, b) +} +func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Metadata.Marshal(b, m, deterministic) +} +func (m *Metadata) XXX_Merge(src proto.Message) { + xxx_messageInfo_Metadata.Merge(m, src) +} +func (m *Metadata) XXX_Size() int { + return xxx_messageInfo_Metadata.Size(m) +} +func (m *Metadata) XXX_DiscardUnknown() { + xxx_messageInfo_Metadata.DiscardUnknown(m) +} + +var xxx_messageInfo_Metadata proto.InternalMessageInfo + +func (m *Metadata) GetData() map[string]string { + if m != nil { + return m.Data + } + return nil +} + +func init() { + proto.RegisterType((*Metadata)(nil), "metadata.Metadata") + proto.RegisterMapType((map[string]string)(nil), "metadata.Metadata.DataEntry") +} + +func init() { proto.RegisterFile("metadata.proto", fileDescriptor_56d9f74966f40d04) } + +var fileDescriptor_56d9f74966f40d04 = []byte{ + // 138 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0x4d, 0x2d, 0x49, + 0x4c, 0x49, 0x2c, 0x49, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0x95, 0x4a, + 0xb9, 0x38, 0x7c, 0xa1, 0x6c, 0x21, 0x03, 0x2e, 0x16, 0x10, 0x2d, 0xc1, 0xa8, 0xc0, 0xac, 0xc1, + 0x6d, 0x24, 0xa3, 0x07, 0xd7, 0x04, 0x53, 0xa1, 0xe7, 0x92, 0x58, 0x92, 0xe8, 0x9a, 0x57, 0x52, + 0x54, 0x19, 0x04, 0x56, 0x29, 0x65, 0xce, 0xc5, 0x09, 0x17, 0x12, 0x12, 0xe0, 0x62, 0xce, 0x4e, + 0xad, 0x94, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x02, 0x31, 0x85, 0x44, 0xb8, 0x58, 0xcb, 0x12, + 0x73, 0x4a, 0x53, 0x25, 0x98, 0xc0, 0x62, 0x10, 0x8e, 0x15, 0x93, 0x05, 0xa3, 0x13, 0x47, 0x14, + 0x5b, 0x66, 0x5e, 0x59, 0x7e, 0x76, 0x6a, 0x12, 0x1b, 0xd8, 0x45, 0xc6, 0x80, 0x00, 0x00, 0x00, + 0xff, 0xff, 0xc2, 0xda, 0x43, 0xa9, 0xa3, 0x00, 0x00, 0x00, +} diff --git a/drpcmetadata/invoke/metadata.proto b/drpcmetadata/invoke/metadata.proto new file mode 100644 index 0000000..0c73ad0 --- /dev/null +++ b/drpcmetadata/invoke/metadata.proto @@ -0,0 +1,11 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +syntax = "proto3"; +option go_package = "invoke"; + +package metadata; + +message Metadata { + map data = 1; +} diff --git a/drpcmetadata/metadata.go b/drpcmetadata/metadata.go new file mode 100644 index 0000000..5918838 --- /dev/null +++ b/drpcmetadata/metadata.go @@ -0,0 +1,78 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package drpcmetadata + +import ( + "context" + + "github.com/gogo/protobuf/proto" + "storj.io/drpc/drpcmetadata/invoke" +) + +// AddPairs attaches metadata onto a context and return the context. +func AddPairs(ctx context.Context, md map[string]string) context.Context { + if len(md) < 1 { + return ctx + } + + for key, val := range md { + ctx = Add(ctx, key, val) + } + + return ctx +} + +// Encode generates byte form of the metadata and appends it onto the passed in buffer. +func Encode(buffer []byte, md map[string]string) ([]byte, error) { + if len(md) < 1 { + return buffer, nil + } + + msg := invoke.Metadata{ + Data: md, + } + + msgBytes, err := proto.Marshal(&msg) + if err != nil { + return buffer, err + } + + buffer = append(buffer, msgBytes...) + + return buffer, nil +} + +// Decode translate byte form of metadata into key/value metadata. +func Decode(data []byte) (map[string]string, error) { + if len(data) < 1 { + return map[string]string{}, nil + } + + msg := invoke.Metadata{} + err := proto.Unmarshal(data, &msg) + if err != nil { + return nil, err + } + + return msg.Data, nil +} + +type metadataKey struct{} + +// Add associates a key/value pair on the context. +func Add(ctx context.Context, key, value string) context.Context { + metadata, ok := Get(ctx) + if !ok { + metadata = make(map[string]string) + ctx = context.WithValue(ctx, metadataKey{}, metadata) + } + metadata[key] = value + return ctx +} + +// Get returns all key/value pairs on the given context. +func Get(ctx context.Context) (map[string]string, bool) { + metadata, ok := ctx.Value(metadataKey{}).(map[string]string) + return metadata, ok +} diff --git a/drpcmetadata/metadata_test.go b/drpcmetadata/metadata_test.go new file mode 100644 index 0000000..9268627 --- /dev/null +++ b/drpcmetadata/metadata_test.go @@ -0,0 +1,55 @@ +// Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package drpcmetadata + +import ( + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/zeebo/assert" + "storj.io/drpc/drpcmetadata/invoke" +) + +func TestEncode(t *testing.T) { + t.Run("Empty Metadata", func(t *testing.T) { + var buffer []byte + var metadata map[string]string + buf, err := Encode(buffer, metadata) + assert.Equal(t, len(buffer), len(buf)) + assert.NoError(t, err) + }) + + t.Run("With Metadata", func(t *testing.T) { + var buffer []byte + metadata := map[string]string{ + "test1": "a", + "test2": "b", + } + buf, err := Encode(buffer, metadata) + assert.NoError(t, err) + assert.That(t, len(buf) > 0) + }) +} + +func TestDecode(t *testing.T) { + t.Run("Empty Metadata", func(t *testing.T) { + var data []byte + metadata, err := Decode(data) + assert.NoError(t, err) + assert.Equal(t, 0, len(metadata)) + }) + + t.Run("With Metadata", func(t *testing.T) { + msg := invoke.Metadata{ + Data: map[string]string{ + "test": "a", + }, + } + data, err := proto.Marshal(&msg) + assert.NoError(t, err) + metadata, err := Decode(data) + assert.NoError(t, err) + assert.DeepEqual(t, msg.Data, metadata) + }) +} diff --git a/drpcwire/README.md b/drpcwire/README.md index e82e72d..04f4f41 100644 --- a/drpcwire/README.md +++ b/drpcwire/README.md @@ -74,6 +74,9 @@ type Frame struct { // Done is true if this is the last frame for the ID. Done bool + + // Control is true if the frame has the control bit set. + Control bool } ``` diff --git a/go.mod b/go.mod index 11a61bf..4c560eb 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.14 require ( github.com/gogo/protobuf v1.2.1 - github.com/spacemonkeygo/monkit/v3 v3.0.1 + github.com/spacemonkeygo/monkit/v3 v3.0.4 github.com/zeebo/assert v1.1.0 github.com/zeebo/errs v1.2.2 ) diff --git a/go.sum b/go.sum index 7dae0c7..ac65c59 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/spacemonkeygo/monkit/v3 v3.0.1 h1:mSZQU+LOFuN5KSUvE1EiU1lxlFcOz/r0N5Tz8z+TwN0= -github.com/spacemonkeygo/monkit/v3 v3.0.1/go.mod h1:JcK1pCbReQsOsMKF/POFSZCq7drXFybgGmbc27tuwes= +github.com/spacemonkeygo/monkit/v3 v3.0.4 h1:Ay+PZirv+qfd4sqcT+X/U3BnC7AcIaqp/IXh0oV36k8= +github.com/spacemonkeygo/monkit/v3 v3.0.4/go.mod h1:JcK1pCbReQsOsMKF/POFSZCq7drXFybgGmbc27tuwes= github.com/spacemonkeygo/monotime v0.0.0-20180824235756-e3f48a95f98a h1:8+cCjxhToanKmxLIbuyBNe2EnpgwhiivsIaRJstDRFA= github.com/spacemonkeygo/monotime v0.0.0-20180824235756-e3f48a95f98a/go.mod h1:ul4bvvnCOPZgq8w0nTkSmWVg/hauVpFS97Am1YM1XXo= github.com/zeebo/assert v1.1.0 h1:hU1L1vLTHsnO8x8c9KAR5GmM5QscxHg5RNU5z5qbUWY= diff --git a/internal/grpccompat/go.sum b/internal/grpccompat/go.sum index bd0b3ca..ce73e99 100644 --- a/internal/grpccompat/go.sum +++ b/internal/grpccompat/go.sum @@ -16,8 +16,8 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/spacemonkeygo/monkit/v3 v3.0.1 h1:mSZQU+LOFuN5KSUvE1EiU1lxlFcOz/r0N5Tz8z+TwN0= -github.com/spacemonkeygo/monkit/v3 v3.0.1/go.mod h1:JcK1pCbReQsOsMKF/POFSZCq7drXFybgGmbc27tuwes= +github.com/spacemonkeygo/monkit/v3 v3.0.4 h1:Ay+PZirv+qfd4sqcT+X/U3BnC7AcIaqp/IXh0oV36k8= +github.com/spacemonkeygo/monkit/v3 v3.0.4/go.mod h1:JcK1pCbReQsOsMKF/POFSZCq7drXFybgGmbc27tuwes= github.com/spacemonkeygo/monotime v0.0.0-20180824235756-e3f48a95f98a h1:8+cCjxhToanKmxLIbuyBNe2EnpgwhiivsIaRJstDRFA= github.com/spacemonkeygo/monotime v0.0.0-20180824235756-e3f48a95f98a/go.mod h1:ul4bvvnCOPZgq8w0nTkSmWVg/hauVpFS97Am1YM1XXo= github.com/zeebo/assert v1.1.0 h1:hU1L1vLTHsnO8x8c9KAR5GmM5QscxHg5RNU5z5qbUWY=