Skip to content

Commit

Permalink
drpcmanager: expose error from transport failures
Browse files Browse the repository at this point in the history
A cancel can sometimes only be performed by closing the underlying
transport because that is the only way to interrupt a read/write
syscall. But, rather than assume that every error from the transport
failing is due to a cancel, only assume that an io.EOF means that.
This can help with debugging when the remote side dies due things
besides a "clean" close of the underlying transport.

Closes #31

Change-Id: Ia655d07984f6abe0492e7f9300d2638a1c82b4d4
  • Loading branch information
zeebo committed Jun 22, 2022
1 parent 0a6ae7b commit 9206537
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
8 changes: 7 additions & 1 deletion drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package drpcmanager

import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/zeebo/errs"
Expand Down Expand Up @@ -294,7 +296,11 @@ func (m *Manager) manageStream(ctx context.Context, stream *drpcstream.Stream) {

select {
case <-m.sigs.term.Signal():
stream.Cancel(context.Canceled)
err := m.sigs.term.Err()
if errors.Is(err, io.EOF) {
err = context.Canceled
}
stream.Cancel(err)
<-m.sterm
return

Expand Down
13 changes: 8 additions & 5 deletions internal/integration/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package integration

import (
"context"
"errors"
"io"
"testing"

"github.com/zeebo/assert"
Expand Down Expand Up @@ -80,13 +82,14 @@ func TestTransport_ErrorCausesCancel(t *testing.T) {

// create a channel to signal when the rpc has started
started := make(chan struct{})
errs := make(chan error, 2)
serr := make(chan error, 1)
cerr := make(chan error, 1)

// create a server that signals then waits for the context to die
cli, close := createConnection(impl{
Method2Fn: func(stream DRPCService_Method2Stream) error {
started <- struct{}{}
errs <- stream.MsgRecv(nil, Encoding)
serr <- stream.MsgRecv(nil, Encoding)
return nil
},
})
Expand All @@ -96,7 +99,7 @@ func TestTransport_ErrorCausesCancel(t *testing.T) {
ctx.Run(func(ctx context.Context) {
stream, _ := cli.Method2(ctx)
started <- struct{}{}
errs <- stream.MsgRecv(nil, Encoding)
cerr <- stream.MsgRecv(nil, Encoding)
})

// wait for it to be started. it is important to wait for
Expand All @@ -111,6 +114,6 @@ func TestTransport_ErrorCausesCancel(t *testing.T) {
assert.NoError(t, cli.DRPCConn().(*drpcconn.Conn).Transport().Close())

// ensure both of the errors we sent are canceled
assert.Equal(t, <-errs, context.Canceled)
assert.Equal(t, <-errs, context.Canceled)
assert.That(t, errors.Is(<-serr, context.Canceled))
assert.That(t, errors.Is(<-cerr, io.ErrClosedPipe))
}

1 comment on commit 9206537

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/introducing-drpc-our-replacement-for-grpc/13486/13

Please sign in to comment.