Skip to content

Commit

Permalink
drpcmetadata: encode and decode metadata (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yingrong Zhao authored Mar 20, 2020
1 parent d44aecc commit 3311ebb
Show file tree
Hide file tree
Showing 14 changed files with 400 additions and 14 deletions.
35 changes: 31 additions & 4 deletions drpcconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"storj.io/drpc"
"storj.io/drpc/drpcmanager"
"storj.io/drpc/drpcmetadata"
"storj.io/drpc/drpcstream"
"storj.io/drpc/drpcwire"
)
Expand Down Expand Up @@ -66,6 +67,14 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, in, out drpc.Message) (er
mon.Event("outgoing_requests")
mon.Event("outgoing_invokes")

var metadata []byte
if md, ok := drpcmetadata.Get(ctx); ok {
metadata, err = drpcmetadata.Encode(metadata, md)
if err != nil {
return err
}
}

data, err := proto.Marshal(in)
if err != nil {
return errs.Wrap(err)
Expand All @@ -77,13 +86,18 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, in, out drpc.Message) (er
}
defer func() { err = errs.Combine(err, stream.Close()) }()

if err := c.doInvoke(stream, []byte(rpc), data, out); err != nil {
if err := c.doInvoke(stream, []byte(rpc), data, metadata, out); err != nil {
return err
}
return nil
}

func (c *Conn) doInvoke(stream *drpcstream.Stream, rpc, data []byte, out drpc.Message) (err error) {
func (c *Conn) doInvoke(stream *drpcstream.Stream, rpc, data []byte, metadata []byte, out drpc.Message) (err error) {
if len(metadata) > 0 {
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil {
return err
}
}
if err := stream.RawWrite(drpcwire.KindInvoke, rpc); err != nil {
return err
}
Expand All @@ -107,18 +121,31 @@ func (c *Conn) NewStream(ctx context.Context, rpc string) (_ drpc.Stream, err er
mon.Event("outgoing_requests")
mon.Event("outgoing_streams")

var metadata []byte
if md, ok := drpcmetadata.Get(ctx); ok {
metadata, err = drpcmetadata.Encode(metadata, md)
if err != nil {
return nil, err
}
}

stream, err := c.man.NewClientStream(ctx)
if err != nil {
return nil, err
}

if err := c.doNewStream(stream, []byte(rpc)); err != nil {
if err := c.doNewStream(stream, []byte(rpc), metadata); err != nil {
return nil, errs.Combine(err, stream.Close())
}
return stream, nil
}

func (c *Conn) doNewStream(stream *drpcstream.Stream, rpc []byte) error {
func (c *Conn) doNewStream(stream *drpcstream.Stream, rpc []byte, metadata []byte) error {
if len(metadata) > 0 {
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil {
return err
}
}
if err := stream.RawWrite(drpcwire.KindInvoke, rpc); err != nil {
return err
}
Expand Down
13 changes: 8 additions & 5 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"sync"

"github.com/zeebo/errs"

"storj.io/drpc"
"storj.io/drpc/drpcctx"
"storj.io/drpc/drpcdebug"
"storj.io/drpc/drpcmetadata"
"storj.io/drpc/drpcsignal"
"storj.io/drpc/drpcstream"
"storj.io/drpc/drpcwire"
Expand Down Expand Up @@ -190,15 +190,18 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea
continue

case drpcwire.KindInvoke:
streamCtx := m.ctx
if metadata.ID.Stream == pkt.ID.Stream {
// TODO: parse and attach metadata from metadata.Data
_ = metadata.Data
md, err := drpcmetadata.Decode(metadata.Data)
if err != nil {
return nil, "", err
}
streamCtx = drpcmetadata.AddPairs(streamCtx, md)
}

stream = drpcstream.NewWithOptions(m.ctx, pkt.ID.Stream, m.wr, m.opts.Stream)
stream = drpcstream.NewWithOptions(streamCtx, pkt.ID.Stream, m.wr, m.opts.Stream)
go m.manageStream(ctx, stream)
return stream, string(pkt.Data), nil

default:
// we ignore packets that arent invokes because perhaps older streams have
// messages in the queue sent concurrently with our notification to them
Expand Down
37 changes: 37 additions & 0 deletions drpcmetadata/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# package drpcmetadata

`import "storj.io/drpc/drpcmetadata"`

Package drpcmetadata define the structure of the metadata supported by drpc
library.

## Usage

#### func Add

```go
func Add(ctx context.Context, key, value string) context.Context
```
Add associates a key/value pair on the context.

#### func AddPairs

```go
func AddPairs(ctx context.Context, md map[string]string) context.Context
```
AddPairs attaches metadata onto a context and return the context.

#### func Decode

```go
func Decode(data []byte) (*invoke.InvokeMetadata, error)
```
Decode translate byte form of metadata into metadata struct defined by protobuf.

#### func Encode

```go
func Encode(buffer []byte) ([]byte, error)
```
Encode generates byte form of the metadata and appends it onto the passed in
buffer.
5 changes: 5 additions & 0 deletions drpcmetadata/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

// Package drpcmetadata define the structure of the metadata supported by drpc library.
package drpcmetadata
80 changes: 80 additions & 0 deletions drpcmetadata/invoke/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# package invoke

`import "storj.io/drpc/drpcmetadata/invoke"`

Package invoke defines the proto messages exposed by drpc for sending metadata
across the wire.

## Usage

#### type InvokeMetadata

```go
type InvokeMetadata struct {
Data map[string]string `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
```


#### func (*InvokeMetadata) Descriptor

```go
func (*InvokeMetadata) Descriptor() ([]byte, []int)
```

#### func (*InvokeMetadata) GetData

```go
func (m *InvokeMetadata) GetData() map[string]string
```

#### func (*InvokeMetadata) ProtoMessage

```go
func (*InvokeMetadata) ProtoMessage()
```

#### func (*InvokeMetadata) Reset

```go
func (m *InvokeMetadata) Reset()
```

#### func (*InvokeMetadata) String

```go
func (m *InvokeMetadata) String() string
```

#### func (*InvokeMetadata) XXX_DiscardUnknown

```go
func (m *InvokeMetadata) XXX_DiscardUnknown()
```

#### func (*InvokeMetadata) XXX_Marshal

```go
func (m *InvokeMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
```

#### func (*InvokeMetadata) XXX_Merge

```go
func (m *InvokeMetadata) XXX_Merge(src proto.Message)
```

#### func (*InvokeMetadata) XXX_Size

```go
func (m *InvokeMetadata) XXX_Size() int
```

#### func (*InvokeMetadata) XXX_Unmarshal

```go
func (m *InvokeMetadata) XXX_Unmarshal(b []byte) error
```
8 changes: 8 additions & 0 deletions drpcmetadata/invoke/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

// Package invoke defines the proto messages exposed by drpc for
// sending metadata across the wire.
package invoke

//go:generate bash -c "go install storj.io/drpc/cmd/protoc-gen-drpc && protoc --drpc_out=plugins=drpc:. metadata.proto"
79 changes: 79 additions & 0 deletions drpcmetadata/invoke/metadata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions drpcmetadata/invoke/metadata.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

syntax = "proto3";
option go_package = "invoke";

package metadata;

message Metadata {
map<string, string> data = 1;
}
Loading

0 comments on commit 3311ebb

Please sign in to comment.