From 8ebbaf48bdff443e65c4136d4910734d2bbd7faa Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Sun, 24 Apr 2022 13:54:30 -0500 Subject: [PATCH] drpcmanager: cancel stream with error when terminated Terminations were canceling streams with `context.Canceled` instead of surfacing the error, causing unexpected behavior. --- drpcmanager/manager.go | 3 ++- internal/integration/transport_test.go | 11 ++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/drpcmanager/manager.go b/drpcmanager/manager.go index 5723abe..9eb3812 100644 --- a/drpcmanager/manager.go +++ b/drpcmanager/manager.go @@ -291,7 +291,8 @@ func (m *Manager) manageStream(ctx context.Context, stream *drpcstream.Stream) { select { case <-m.sigs.term.Signal(): - stream.Cancel(context.Canceled) + err, _ := m.sigs.term.Get() + stream.Cancel(err) <-m.sterm return diff --git a/internal/integration/transport_test.go b/internal/integration/transport_test.go index d86675b..c72ff27 100644 --- a/internal/integration/transport_test.go +++ b/internal/integration/transport_test.go @@ -5,6 +5,8 @@ package integration import ( "context" + "errors" + "io" "testing" "github.com/zeebo/assert" @@ -80,13 +82,15 @@ func TestTransport_ErrorCausesCancel(t *testing.T) { // create a channel to signal when the rpc has started started := make(chan struct{}) - errs := make(chan error, 2) + errs := make(chan error, 1) + closed := make(chan struct{}) // create a server that signals then waits for the context to die cli, close := createConnection(impl{ Method2Fn: func(stream DRPCService_Method2Stream) error { started <- struct{}{} errs <- stream.MsgRecv(nil, Encoding) + closed <- struct{}{} return nil }, }) @@ -96,6 +100,7 @@ func TestTransport_ErrorCausesCancel(t *testing.T) { ctx.Run(func(ctx context.Context) { stream, _ := cli.Method2(ctx) started <- struct{}{} + <-closed errs <- stream.MsgRecv(nil, Encoding) }) @@ -111,6 +116,6 @@ func TestTransport_ErrorCausesCancel(t *testing.T) { assert.NoError(t, cli.DRPCConn().(*drpcconn.Conn).Transport().Close()) // ensure both of the errors we sent are canceled - assert.Equal(t, <-errs, context.Canceled) - assert.Equal(t, <-errs, context.Canceled) + assert.True(t, errors.Is(<-errs, io.EOF)) + assert.True(t, errors.Is(<-errs, io.ErrClosedPipe)) }