Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drpcmetadata: encode and decode metadata #2

Merged
merged 15 commits into from
Mar 20, 2020
36 changes: 32 additions & 4 deletions drpcconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"storj.io/drpc"
"storj.io/drpc/drpcmanager"
"storj.io/drpc/drpcmetadata"
"storj.io/drpc/drpcstream"
"storj.io/drpc/drpcwire"
)
Expand Down Expand Up @@ -66,6 +67,14 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, in, out drpc.Message) (er
mon.Event("outgoing_requests")
mon.Event("outgoing_invokes")

var metadata []byte
if md, ok := drpcmetadata.Get(ctx); ok {
metadata, err = drpcmetadata.Encode(metadata, md)
if err != nil {
return err
}
}

data, err := proto.Marshal(in)
if err != nil {
return errs.Wrap(err)
Expand All @@ -77,13 +86,18 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, in, out drpc.Message) (er
}
defer func() { err = errs.Combine(err, stream.Close()) }()

if err := c.doInvoke(stream, []byte(rpc), data, out); err != nil {
if err := c.doInvoke(stream, []byte(rpc), data, metadata, out); err != nil {
return err
}
return nil
}

func (c *Conn) doInvoke(stream *drpcstream.Stream, rpc, data []byte, out drpc.Message) (err error) {
func (c *Conn) doInvoke(stream *drpcstream.Stream, rpc, data []byte, metadata []byte, out drpc.Message) (err error) {
if len(metadata) > 0 {
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil {
return err
}
}
if err := stream.RawWrite(drpcwire.KindInvoke, rpc); err != nil {
return err
}
Expand All @@ -107,18 +121,32 @@ func (c *Conn) NewStream(ctx context.Context, rpc string) (_ drpc.Stream, err er
mon.Event("outgoing_requests")
mon.Event("outgoing_streams")

var metadata []byte
md, ok := drpcmetadata.Get(ctx)
if ok {
metadata, err = drpcmetadata.Encode(metadata, md)
if err != nil {
return nil, err
}
}

stream, err := c.man.NewClientStream(ctx)
if err != nil {
return nil, err
}

if err := c.doNewStream(stream, []byte(rpc)); err != nil {
if err := c.doNewStream(stream, []byte(rpc), metadata); err != nil {
return nil, errs.Combine(err, stream.Close())
}
return stream, nil
}

func (c *Conn) doNewStream(stream *drpcstream.Stream, rpc []byte) error {
func (c *Conn) doNewStream(stream *drpcstream.Stream, rpc []byte, metadata []byte) error {
if len(metadata) > 0 {
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil {
return err
}
}
if err := stream.RawWrite(drpcwire.KindInvoke, rpc); err != nil {
return err
}
Expand Down
13 changes: 8 additions & 5 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"sync"

"github.com/zeebo/errs"

"storj.io/drpc"
"storj.io/drpc/drpcctx"
"storj.io/drpc/drpcdebug"
"storj.io/drpc/drpcmetadata"
"storj.io/drpc/drpcsignal"
"storj.io/drpc/drpcstream"
"storj.io/drpc/drpcwire"
Expand Down Expand Up @@ -190,15 +190,18 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea
continue

case drpcwire.KindInvoke:
streamCtx := m.ctx
if metadata.ID.Stream == pkt.ID.Stream {
// TODO: parse and attach metadata from metadata.Data
_ = metadata.Data
msg, err := drpcmetadata.Decode(metadata.Data)
if err != nil {
return nil, "", err
}
streamCtx = drpcmetadata.AddPairs(streamCtx, msg.Data)
}

stream = drpcstream.NewWithOptions(m.ctx, pkt.ID.Stream, m.wr, m.opts.Stream)
stream = drpcstream.NewWithOptions(streamCtx, pkt.ID.Stream, m.wr, m.opts.Stream)
go m.manageStream(ctx, stream)
return stream, string(pkt.Data), nil

default:
// we ignore packets that arent invokes because perhaps older streams have
// messages in the queue sent concurrently with our notification to them
Expand Down
37 changes: 37 additions & 0 deletions drpcmetadata/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# package drpcmetadata

`import "storj.io/drpc/drpcmetadata"`

Package drpcmetadata define the structure of the metadata supported by drpc
library.

## Usage

#### func Add

```go
func Add(ctx context.Context, key, value string) context.Context
```
Add associates a key/value pair on the context.

#### func AddPairs

```go
func AddPairs(ctx context.Context, md map[string]string) context.Context
```
AddPairs attaches metadata onto a context and return the context.

#### func Decode

```go
func Decode(data []byte) (*invoke.InvokeMetadata, error)
```
Decode translate byte form of metadata into metadata struct defined by protobuf.

#### func Encode

```go
func Encode(buffer []byte) ([]byte, error)
```
Encode generates byte form of the metadata and appends it onto the passed in
buffer.
5 changes: 5 additions & 0 deletions drpcmetadata/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

// Package drpcmetadata define the structure of the metadata supported by drpc library.
package drpcmetadata
80 changes: 80 additions & 0 deletions drpcmetadata/invoke/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# package invoke

`import "storj.io/drpc/drpcmetadata/invoke"`

Package invoke defines the proto messages exposed by drpc for sending metadata
across the wire.

## Usage

#### type InvokeMetadata

```go
type InvokeMetadata struct {
Data map[string]string `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
```


#### func (*InvokeMetadata) Descriptor

```go
func (*InvokeMetadata) Descriptor() ([]byte, []int)
```

#### func (*InvokeMetadata) GetData

```go
func (m *InvokeMetadata) GetData() map[string]string
```

#### func (*InvokeMetadata) ProtoMessage

```go
func (*InvokeMetadata) ProtoMessage()
```

#### func (*InvokeMetadata) Reset

```go
func (m *InvokeMetadata) Reset()
```

#### func (*InvokeMetadata) String

```go
func (m *InvokeMetadata) String() string
```

#### func (*InvokeMetadata) XXX_DiscardUnknown

```go
func (m *InvokeMetadata) XXX_DiscardUnknown()
```

#### func (*InvokeMetadata) XXX_Marshal

```go
func (m *InvokeMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
```

#### func (*InvokeMetadata) XXX_Merge

```go
func (m *InvokeMetadata) XXX_Merge(src proto.Message)
```

#### func (*InvokeMetadata) XXX_Size

```go
func (m *InvokeMetadata) XXX_Size() int
```

#### func (*InvokeMetadata) XXX_Unmarshal

```go
func (m *InvokeMetadata) XXX_Unmarshal(b []byte) error
```
8 changes: 8 additions & 0 deletions drpcmetadata/invoke/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

// Package invoke defines the proto messages exposed by drpc for
// sending metadata across the wire.
package invoke

//go:generate bash -c "go install storj.io/drpc/cmd/protoc-gen-drpc && protoc --drpc_out=plugins=drpc:. metadata.proto"
79 changes: 79 additions & 0 deletions drpcmetadata/invoke/metadata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions drpcmetadata/invoke/metadata.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

syntax = "proto3";
option go_package = "invoke";

package metadata;

message Metadata {
map<string, string> data = 1;
}
Loading