From 9206537a4db76809da6ec768a0c5e45ddb618ef5 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Wed, 22 Jun 2022 14:12:28 -0400 Subject: [PATCH] drpcmanager: expose error from transport failures A cancel can sometimes only be performed by closing the underlying transport because that is the only way to interrupt a read/write syscall. But, rather than assume that every error from the transport failing is due to a cancel, only assume that an io.EOF means that. This can help with debugging when the remote side dies due things besides a "clean" close of the underlying transport. Closes #31 Change-Id: Ia655d07984f6abe0492e7f9300d2638a1c82b4d4 --- drpcmanager/manager.go | 8 +++++++- internal/integration/transport_test.go | 13 ++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/drpcmanager/manager.go b/drpcmanager/manager.go index de9edc1..57926f0 100644 --- a/drpcmanager/manager.go +++ b/drpcmanager/manager.go @@ -5,7 +5,9 @@ package drpcmanager import ( "context" + "errors" "fmt" + "io" "time" "github.com/zeebo/errs" @@ -294,7 +296,11 @@ func (m *Manager) manageStream(ctx context.Context, stream *drpcstream.Stream) { select { case <-m.sigs.term.Signal(): - stream.Cancel(context.Canceled) + err := m.sigs.term.Err() + if errors.Is(err, io.EOF) { + err = context.Canceled + } + stream.Cancel(err) <-m.sterm return diff --git a/internal/integration/transport_test.go b/internal/integration/transport_test.go index d86675b..55f6a42 100644 --- a/internal/integration/transport_test.go +++ b/internal/integration/transport_test.go @@ -5,6 +5,8 @@ package integration import ( "context" + "errors" + "io" "testing" "github.com/zeebo/assert" @@ -80,13 +82,14 @@ func TestTransport_ErrorCausesCancel(t *testing.T) { // create a channel to signal when the rpc has started started := make(chan struct{}) - errs := make(chan error, 2) + serr := make(chan error, 1) + cerr := make(chan error, 1) // create a server that signals then waits for the context to die cli, close := createConnection(impl{ Method2Fn: func(stream DRPCService_Method2Stream) error { started <- struct{}{} - errs <- stream.MsgRecv(nil, Encoding) + serr <- stream.MsgRecv(nil, Encoding) return nil }, }) @@ -96,7 +99,7 @@ func TestTransport_ErrorCausesCancel(t *testing.T) { ctx.Run(func(ctx context.Context) { stream, _ := cli.Method2(ctx) started <- struct{}{} - errs <- stream.MsgRecv(nil, Encoding) + cerr <- stream.MsgRecv(nil, Encoding) }) // wait for it to be started. it is important to wait for @@ -111,6 +114,6 @@ func TestTransport_ErrorCausesCancel(t *testing.T) { assert.NoError(t, cli.DRPCConn().(*drpcconn.Conn).Transport().Close()) // ensure both of the errors we sent are canceled - assert.Equal(t, <-errs, context.Canceled) - assert.Equal(t, <-errs, context.Canceled) + assert.That(t, errors.Is(<-serr, context.Canceled)) + assert.That(t, errors.Is(<-cerr, io.ErrClosedPipe)) }