Skip to content

Commit

Permalink
Merge branch 'main' into transport-comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Erikvv authored Feb 23, 2022
2 parents 7decd61 + f6e3694 commit 5ac9cad
Show file tree
Hide file tree
Showing 48 changed files with 336 additions and 111 deletions.
2 changes: 2 additions & 0 deletions cmd/protoc-gen-go-drpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"

"google.golang.org/protobuf/compiler/protogen"
"google.golang.org/protobuf/types/pluginpb"
)

type config struct {
Expand All @@ -33,6 +34,7 @@ func main() {
}
generateFile(plugin, f, conf)
}
plugin.SupportedFeatures = uint64(pluginpb.CodeGeneratorResponse_FEATURE_PROTO3_OPTIONAL)
return nil
})
}
Expand Down
1 change: 1 addition & 0 deletions drpcdebug/log_disabled.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

//go:build !debug
// +build !debug

package drpcdebug
Expand Down
5 changes: 5 additions & 0 deletions drpcserver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ Package drpcserver allows one to execute registered rpcs.
type Options struct {
// Manager controls the options we pass to the managers this server creates.
Manager drpcmanager.Options

// Log is called when errors happen that can not be returned up, like
// temporary network errors when accepting connections, or errors
// handling individual clients. It is not called if nil.
Log func(error)
}
```

Expand Down
40 changes: 29 additions & 11 deletions drpcserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package drpcserver
import (
"context"
"net"
"time"

"github.com/zeebo/errs"

Expand All @@ -20,6 +21,11 @@ import (
type Options struct {
// Manager controls the options we pass to the managers this server creates.
Manager drpcmanager.Options

// Log is called when errors happen that can not be returned up, like
// temporary network errors when accepting connections, or errors
// handling individual clients. It is not called if nil.
Log func(error)
}

// Server is an implementation of drpc.Server to serve drpc connections.
Expand Down Expand Up @@ -67,6 +73,7 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) {
// on new connections.
func (s *Server) Serve(ctx context.Context, lis net.Listener) (err error) {
tracker := drpcctx.NewTracker(ctx)
defer tracker.Wait()
defer tracker.Cancel()

tracker.Run(func(ctx context.Context) {
Expand All @@ -77,24 +84,35 @@ func (s *Server) Serve(ctx context.Context, lis net.Listener) (err error) {
for {
conn, err := lis.Accept()
if err != nil {
if ctx.Err() != nil {
return nil
}

if isTemporary(err) {
if s.opts.Log != nil {
s.opts.Log(err)
}

t := time.NewTimer(500 * time.Millisecond)
select {
case <-t.C:
case <-ctx.Done():
t.Stop()
return nil
}

continue
}
select {
case <-ctx.Done():
tracker.Wait()
return nil
default:
tracker.Cancel()
tracker.Wait()
return errs.Wrap(err)
}

return errs.Wrap(err)
}

// TODO(jeff): connection limits?
tracker.Run(func(ctx context.Context) {
// TODO(jeff): handle this error?
_ = s.ServeOne(ctx, conn)
err := s.ServeOne(ctx, conn)
if err != nil && s.opts.Log != nil {
s.opts.Log(err)
}
})
}
}
Expand Down
48 changes: 48 additions & 0 deletions drpcserver/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package drpcserver

import (
"context"
"net"
"testing"

"github.com/zeebo/assert"

"storj.io/drpc/drpcctx"
)

func TestServerTemporarySleep(t *testing.T) {
ctx := drpcctx.NewTracker(context.Background())
defer ctx.Wait()
defer ctx.Cancel()

calls := 0
l := listener(func() (net.Conn, error) {
calls++
switch calls {
case 1:
case 2:
ctx.Cancel()
default:
panic("spinning on temporary error")
}

return nil, new(temporaryError)
})

assert.NoError(t, New(nil).Serve(ctx, l))
}

type listener func() (net.Conn, error)

func (l listener) Accept() (net.Conn, error) { return l() }
func (l listener) Close() error { return nil }
func (l listener) Addr() net.Addr { return nil }

type temporaryError struct{}

func (temporaryError) Error() string { return "temporary error" }
func (temporaryError) Timeout() bool { return false }
func (temporaryError) Temporary() bool { return true }
1 change: 1 addition & 0 deletions drpcserver/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

//go:build !windows
// +build !windows

package drpcserver
Expand Down
5 changes: 5 additions & 0 deletions drpcstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ type Options struct {
// call RawFlush dynamically.
ManualFlush bool

// MaximumBufferSize causes the Stream to drop any internal buffers that
// are larger than this amount to control maximum memory usage at the
// expense of more allocations. 0 is unlimited.
MaximumBufferSize int

// Internal contains options that are for internal use only.
Internal drpcopts.Stream
}
Expand Down
16 changes: 12 additions & 4 deletions drpcstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type Options struct {
// call RawFlush dynamically.
ManualFlush bool

// MaximumBufferSize causes the Stream to drop any internal buffers that
// are larger than this amount to control maximum memory usage at the
// expense of more allocations. 0 is unlimited.
MaximumBufferSize int

// Internal contains options that are for internal use only.
Internal drpcopts.Stream
}
Expand Down Expand Up @@ -385,11 +390,14 @@ func (s *Stream) MsgSend(msg drpc.Message, enc drpc.Encoding) (err error) {
s.write.Lock()
defer s.write.Unlock()

s.wbuf, err = drpcenc.MarshalAppend(msg, enc, s.wbuf[:0])
wbuf, err := drpcenc.MarshalAppend(msg, enc, s.wbuf[:0])
if err != nil {
return errs.Wrap(err)
}
if err := s.rawWriteLocked(drpcwire.KindMessage, s.wbuf); err != nil {
if s.opts.MaximumBufferSize == 0 || len(wbuf) < s.opts.MaximumBufferSize {
s.wbuf = wbuf
}
if err := s.rawWriteLocked(drpcwire.KindMessage, wbuf); err != nil {
return err
}
if !s.opts.ManualFlush {
Expand Down Expand Up @@ -426,7 +434,7 @@ var (
// SendError terminates the stream and sends the error to the remote. It is a no-op if
// the stream is already terminated.
func (s *Stream) SendError(serr error) (err error) {
s.log("CALL", func() string { return fmt.Sprintf("SendError(%v)", err) })
s.log("CALL", func() string { return fmt.Sprintf("SendError(%v)", serr) })

s.mu.Lock()
if s.sigs.term.IsSet() {
Expand Down Expand Up @@ -498,7 +506,7 @@ func (s *Stream) Cancel(err error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.sigs.term.IsSet() {
if s.sigs.term.IsSet() && s.write.Unlocked() && s.read.Unlocked() {
return
}

Expand Down
26 changes: 26 additions & 0 deletions drpcstream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,29 @@ func TestStream_ContextCancel(t *testing.T) {
<-st.Context().Done()
<-child.Done()
}

func TestStream_ConcurrentCloseCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pr, pw := io.Pipe()
defer func() { _ = pr.Close() }()
defer func() { _ = pw.Close() }()

st := New(ctx, 0, drpcwire.NewWriter(pw, 0))

// start the Close call
errch := make(chan error, 1)
go func() { errch <- st.Close() }()

// wait for the close to begin writing
_, err := pr.Read(make([]byte, 1))
assert.NoError(t, err)

// cancel the context and close the transport
st.Cancel(context.Canceled)
assert.NoError(t, pw.Close())

// we should always receive the canceled error
assert.That(t, errors.Is(<-errch, context.Canceled))
}
6 changes: 4 additions & 2 deletions examples/drpc/go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
module storj.io/drpc/examples/drpc

go 1.16
go 1.17

require (
google.golang.org/protobuf v1.26.0
google.golang.org/protobuf v1.27.1
storj.io/drpc v0.0.17
)

require github.com/zeebo/errs v1.2.2 // indirect

replace storj.io/drpc => ../..
4 changes: 2 additions & 2 deletions examples/drpc/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtC
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
4 changes: 2 additions & 2 deletions examples/drpc/pb/sesamestreet.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions examples/drpc_and_http/go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
module storj.io/drpc/examples/drpc_and_http

go 1.16
go 1.17

require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/protobuf v1.26.0
google.golang.org/protobuf v1.27.1
storj.io/drpc v0.0.17
)

require github.com/zeebo/errs v1.2.2 // indirect

replace storj.io/drpc => ../..
4 changes: 2 additions & 2 deletions examples/drpc_and_http/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
4 changes: 2 additions & 2 deletions examples/drpc_and_http/pb/sesamestreet.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion examples/grpc/go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
module storj.io/drpc/examples/grpc

go 1.16
go 1.17

require (
google.golang.org/grpc v1.36.0
google.golang.org/protobuf v1.26.0
)

require (
github.com/golang/protobuf v1.5.0 // indirect
golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a // indirect
golang.org/x/text v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
)
4 changes: 2 additions & 2 deletions examples/grpc/pb/sesamestreet.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions examples/grpc_and_drpc/go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
module storj.io/drpc/examples/grpc_and_drpc

go 1.16
go 1.17

require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.36.0
google.golang.org/protobuf v1.26.0
google.golang.org/protobuf v1.27.1
storj.io/drpc v0.0.17
)

require (
github.com/golang/protobuf v1.5.0 // indirect
github.com/zeebo/errs v1.2.2 // indirect
golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a // indirect
golang.org/x/text v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
)

replace storj.io/drpc => ../..
4 changes: 2 additions & 2 deletions examples/grpc_and_drpc/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
Loading

0 comments on commit 5ac9cad

Please sign in to comment.