Skip to content

Commit

Permalink
drpcstream: set cancel state more often
Browse files Browse the repository at this point in the history
if a Close has started, and we Cancel, we should still
set the cancelled state while Close is still executing.
this is so that we don't surface the underying transport
being closed errors.

Change-Id: I2f3924f7b03c3952ab58b9952e4ae18ef1a5a88f
  • Loading branch information
zeebo committed Jan 26, 2022
1 parent ca0e3b1 commit f6e3694
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
4 changes: 2 additions & 2 deletions drpcstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ var (
// SendError terminates the stream and sends the error to the remote. It is a no-op if
// the stream is already terminated.
func (s *Stream) SendError(serr error) (err error) {
s.log("CALL", func() string { return fmt.Sprintf("SendError(%v)", err) })
s.log("CALL", func() string { return fmt.Sprintf("SendError(%v)", serr) })

s.mu.Lock()
if s.sigs.term.IsSet() {
Expand Down Expand Up @@ -506,7 +506,7 @@ func (s *Stream) Cancel(err error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.sigs.term.IsSet() {
if s.sigs.term.IsSet() && s.write.Unlocked() && s.read.Unlocked() {
return
}

Expand Down
26 changes: 26 additions & 0 deletions drpcstream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,29 @@ func TestStream_ContextCancel(t *testing.T) {
<-st.Context().Done()
<-child.Done()
}

func TestStream_ConcurrentCloseCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pr, pw := io.Pipe()
defer func() { _ = pr.Close() }()
defer func() { _ = pw.Close() }()

st := New(ctx, 0, drpcwire.NewWriter(pw, 0))

// start the Close call
errch := make(chan error, 1)
go func() { errch <- st.Close() }()

// wait for the close to begin writing
_, err := pr.Read(make([]byte, 1))
assert.NoError(t, err)

// cancel the context and close the transport
st.Cancel(context.Canceled)
assert.NoError(t, pw.Close())

// we should always receive the canceled error
assert.That(t, errors.Is(<-errch, context.Canceled))
}

0 comments on commit f6e3694

Please sign in to comment.