Skip to content

Commit

Permalink
drpcmigrate: close default listener properly
Browse files Browse the repository at this point in the history
Closing dprcmux can hang forever, if default Listener is used (.Accept).

It should be closed similat to all other routers.

Change-Id: I51dd56f5c319d18b09df5db218d14b7dd2b93697
  • Loading branch information
elek authored and Elek, Márton committed Sep 13, 2022
1 parent 9206537 commit f85532f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 1 deletion.
3 changes: 2 additions & 1 deletion drpcmigrate/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func (m *ListenMux) Run(ctx context.Context) error {
for _, lis := range m.routes {
<-lis.done
}

m.def.err = Closed
close(m.def.done)
return m.err
}

Expand Down
76 changes: 76 additions & 0 deletions drpcmigrate/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net"
"testing"
"time"

"github.com/zeebo/assert"
"github.com/zeebo/errs"
Expand Down Expand Up @@ -62,6 +63,81 @@ func TestMux(t *testing.T) {
}
}

func TestMuxLoopClose(t *testing.T) {
timeout := time.NewTimer(5 * time.Second)
processed := make(chan struct{})

run := func(lis net.Listener) error {
for {
_, err := lis.Accept()
if err != nil {
return err
}
processed <- struct{}{}
// usually conn is passed here to a new go routine
// assuming it's done without error
}
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lis := newFakeListener(
newPrefixConn([]byte("prefix1data1"), nil),
newPrefixConn([]byte("prefix2data2"), nil),
newPrefixConn([]byte("prefix3data3"), nil),
)

mux := NewListenMux(lis, len("prefixN"))
lis1 := mux.Route("prefix1")
lis2 := mux.Route("prefix2")

expectedErrs := make(chan error, 3)
muxErr := make(chan error, 1)
go func() {
expectedErrs <- run(lis1)
}()
go func() {
expectedErrs <- run(lis2)

}()
go func() {
expectedErrs <- run(mux.Default())
}()

go func() {
muxErr <- mux.Run(ctx)
}()

for i := 0; i < 3; i++ {
select {
case <-processed:
case <-timeout.C:
t.Fatal("test is timed out")
}

}

// stopping the mux
cancel()

select {
case err := <-muxErr:
assert.NoError(t, err)
case <-timeout.C:
t.Fatal("test is timed out")
}

for i := 0; i < 3; i++ {
select {
case err := <-expectedErrs:
assert.Error(t, err)
case <-timeout.C:
t.Fatal("test is timed out")
}
}
}

func TestMuxAcceptError(t *testing.T) {
err := errs.New("problem")
mux := NewListenMux(newErrorListener(err), 0)
Expand Down

0 comments on commit f85532f

Please sign in to comment.