Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drpcmetadata: encode and decode metadata #2

Merged
merged 15 commits into from
Mar 20, 2020
59 changes: 31 additions & 28 deletions drpcconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,37 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, in, out drpc.Message) (er
mon.Event("outgoing_requests")
mon.Event("outgoing_invokes")

stream, err := c.man.NewClientStream(ctx)
if err != nil {
return err
}
defer func() { err = errs.Combine(err, stream.Close()) }()

invokeMetadata := make([]byte, 0)
metadata, ok := drpcmetadata.Get(ctx)
if ok {
invokeMetadata, err = metadata.Encode(invokeMetadata)
var metadata []byte
if md, ok := drpcmetadata.Get(ctx); ok {
metadata, err = drpcmetadata.Encode(metadata, md)
if err != nil {
return err
}
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, invokeMetadata); err != nil {
return err
}
}

data, err := proto.Marshal(in)
if err != nil {
return errs.Wrap(err)
}

if err := c.doInvoke(stream, []byte(rpc), data, out); err != nil {
stream, err := c.man.NewClientStream(ctx)
if err != nil {
return err
}
defer func() { err = errs.Combine(err, stream.Close()) }()

if err := c.doInvoke(stream, []byte(rpc), data, metadata, out); err != nil {
return err
}
return nil
}

func (c *Conn) doInvoke(stream *drpcstream.Stream, rpc, data []byte, out drpc.Message) (err error) {
func (c *Conn) doInvoke(stream *drpcstream.Stream, rpc, data []byte, metadata []byte, out drpc.Message) (err error) {
if len(metadata) > 0 {
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil {
return err
}
}
if err := stream.RawWrite(drpcwire.KindInvoke, rpc); err != nil {
return err
}
Expand All @@ -120,30 +121,32 @@ func (c *Conn) NewStream(ctx context.Context, rpc string) (_ drpc.Stream, err er
mon.Event("outgoing_requests")
mon.Event("outgoing_streams")

stream, err := c.man.NewClientStream(ctx)
if err != nil {
return nil, err
}

invokeMsg := make([]byte, 0)
metadata, ok := drpcmetadata.Get(ctx)
var metadata []byte
md, ok := drpcmetadata.Get(ctx)
if ok {
invokeMsg, err = metadata.Encode(invokeMsg)
metadata, err = drpcmetadata.Encode(metadata, md)
if err != nil {
return nil, err
}
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, invokeMsg); err != nil {
return nil, err
}
}

if err := c.doNewStream(stream, []byte(rpc)); err != nil {
stream, err := c.man.NewClientStream(ctx)
if err != nil {
return nil, err
}

if err := c.doNewStream(stream, []byte(rpc), metadata); err != nil {
return nil, errs.Combine(err, stream.Close())
}
return stream, nil
}

func (c *Conn) doNewStream(stream *drpcstream.Stream, rpc []byte) error {
func (c *Conn) doNewStream(stream *drpcstream.Stream, rpc []byte, metadata []byte) error {
if len(metadata) > 0 {
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil {
return err
}
}
if err := stream.RawWrite(drpcwire.KindInvoke, rpc); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea
}

var metadata drpcwire.Packet
streamCtx := m.ctx

for {
select {
Expand All @@ -191,12 +190,13 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea
continue

case drpcwire.KindInvoke:
streamCtx := m.ctx
if metadata.ID.Stream == pkt.ID.Stream {
msg, err := drpcmetadata.Decode(metadata.Data)
if err != nil {
return nil, "", err
}
streamCtx = drpcmetadata.New(msg.GetData()).AddPairs(streamCtx)
streamCtx = drpcmetadata.AddPairs(streamCtx, msg.Data)
}

stream = drpcstream.NewWithOptions(streamCtx, pkt.ID.Stream, m.wr, m.opts.Stream)
Expand Down
34 changes: 34 additions & 0 deletions drpcmetadata/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
# package drpcmetadata

`import "storj.io/drpc/drpcmetadata"`

Package drpcmetadata define the structure of the metadata supported by drpc
library.

## Usage

#### func Add

```go
func Add(ctx context.Context, key, value string) context.Context
```
Add associates a key/value pair on the context.

#### func AddPairs

```go
func AddPairs(ctx context.Context, md map[string]string) context.Context
```
AddPairs attaches metadata onto a context and return the context.

#### func Decode

```go
func Decode(data []byte) (*invoke.InvokeMetadata, error)
```
Decode translate byte form of metadata into metadata struct defined by protobuf.

#### func Encode

```go
func Encode(buffer []byte) ([]byte, error)
```
Encode generates byte form of the metadata and appends it onto the passed in
buffer.
80 changes: 80 additions & 0 deletions drpcmetadata/invoke/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# package invoke

`import "storj.io/drpc/drpcmetadata/invoke"`

Package invoke defines the proto messages exposed by drpc for sending metadata
across the wire.

## Usage

#### type InvokeMetadata

```go
type InvokeMetadata struct {
Data map[string]string `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
```


#### func (*InvokeMetadata) Descriptor

```go
func (*InvokeMetadata) Descriptor() ([]byte, []int)
```

#### func (*InvokeMetadata) GetData

```go
func (m *InvokeMetadata) GetData() map[string]string
```

#### func (*InvokeMetadata) ProtoMessage

```go
func (*InvokeMetadata) ProtoMessage()
```

#### func (*InvokeMetadata) Reset

```go
func (m *InvokeMetadata) Reset()
```

#### func (*InvokeMetadata) String

```go
func (m *InvokeMetadata) String() string
```

#### func (*InvokeMetadata) XXX_DiscardUnknown

```go
func (m *InvokeMetadata) XXX_DiscardUnknown()
```

#### func (*InvokeMetadata) XXX_Marshal

```go
func (m *InvokeMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
```

#### func (*InvokeMetadata) XXX_Merge

```go
func (m *InvokeMetadata) XXX_Merge(src proto.Message)
```

#### func (*InvokeMetadata) XXX_Size

```go
func (m *InvokeMetadata) XXX_Size() int
```

#### func (*InvokeMetadata) XXX_Unmarshal

```go
func (m *InvokeMetadata) XXX_Unmarshal(b []byte) error
```
4 changes: 2 additions & 2 deletions drpcmetadata/proto/doc.go → drpcmetadata/invoke/doc.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

// Package proto defines the proto messages exposed by drpc for
// Package invoke defines the proto messages exposed by drpc for
// sending metadata across the wire.
package proto
package invoke

//go:generate bash -c "go install storj.io/drpc/cmd/protoc-gen-drpc && protoc --drpc_out=plugins=drpc:. metadata.proto"
79 changes: 79 additions & 0 deletions drpcmetadata/invoke/metadata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
// See LICENSE for copying information.

syntax = "proto3";
option go_package = "proto";
option go_package = "invoke";

package metadata;

message InvokeMetadata {
uint32 version = 1;
map<string, string> data = 2;
message Metadata {
map<string, string> data = 1;
}
Loading