Skip to content

Commit

Permalink
drpcmanager: cancel stream with error when terminated
Browse files Browse the repository at this point in the history
Terminations were canceling streams with `context.Canceled` instead
of surfacing the error, causing unexpected behavior.
  • Loading branch information
kylecarbs committed Apr 24, 2022
1 parent 41e8fa5 commit 8ebbaf4
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
3 changes: 2 additions & 1 deletion drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ func (m *Manager) manageStream(ctx context.Context, stream *drpcstream.Stream) {

select {
case <-m.sigs.term.Signal():
stream.Cancel(context.Canceled)
err, _ := m.sigs.term.Get()
stream.Cancel(err)
<-m.sterm
return

Expand Down
11 changes: 8 additions & 3 deletions internal/integration/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package integration

import (
"context"
"errors"
"io"
"testing"

"github.com/zeebo/assert"
Expand Down Expand Up @@ -80,13 +82,15 @@ func TestTransport_ErrorCausesCancel(t *testing.T) {

// create a channel to signal when the rpc has started
started := make(chan struct{})
errs := make(chan error, 2)
errs := make(chan error, 1)
closed := make(chan struct{})

// create a server that signals then waits for the context to die
cli, close := createConnection(impl{
Method2Fn: func(stream DRPCService_Method2Stream) error {
started <- struct{}{}
errs <- stream.MsgRecv(nil, Encoding)
closed <- struct{}{}
return nil
},
})
Expand All @@ -96,6 +100,7 @@ func TestTransport_ErrorCausesCancel(t *testing.T) {
ctx.Run(func(ctx context.Context) {
stream, _ := cli.Method2(ctx)
started <- struct{}{}
<-closed
errs <- stream.MsgRecv(nil, Encoding)
})

Expand All @@ -111,6 +116,6 @@ func TestTransport_ErrorCausesCancel(t *testing.T) {
assert.NoError(t, cli.DRPCConn().(*drpcconn.Conn).Transport().Close())

// ensure both of the errors we sent are canceled
assert.Equal(t, <-errs, context.Canceled)
assert.Equal(t, <-errs, context.Canceled)
assert.True(t, errors.Is(<-errs, io.EOF))
assert.True(t, errors.Is(<-errs, io.ErrClosedPipe))
}

0 comments on commit 8ebbaf4

Please sign in to comment.