Skip to content

Commit

Permalink
Update for Race Condition on Cancel CTX and Stop
Browse files Browse the repository at this point in the history
  • Loading branch information
davidvonthenen committed Sep 9, 2024
1 parent 4de178e commit e1029a4
Showing 1 changed file with 33 additions and 23 deletions.
56 changes: 33 additions & 23 deletions pkg/client/listen/v1/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,18 @@ func (c *Client) ConnectWithCancel(ctx context.Context, ctxCancel context.Cancel

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnect(ctx context.Context, retries int64) bool {
c.muConn.Lock()
c.retry = true
c.muConn.Unlock()
c.ctx, c.ctxCancel = context.WithCancel(ctx)
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries), true) != nil
}

// AttemptReconnect performs a reconnect after failing retries and providing a cancel function
func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool {
c.muConn.Lock()
c.retry = true
c.muConn.Unlock()
return c.internalConnectWithCancel(ctx, ctxCancel, int(retries), true) != nil
}

Expand All @@ -157,20 +161,20 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
c.ctxCancel = ctxCancel
c.retryCnt = int64(retryCnt)

// we explicitly stopped and should not attempt to reconnect
if !c.retry {
klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
return nil
}

// lock conn access
if lock {
klog.V(3).Infof("Locking connection mutex\n")
c.muConn.Lock()
defer c.muConn.Unlock()
}

// we explicitly stopped and should not attempt to reconnect
if !c.retry {
klog.V(7).Infof("This connection has been terminated. Please either call with AttemptReconnect or create a new Client object using NewWebSocketClient.")
klog.V(7).Infof("live.internalConnectWithCancel() LEAVE\n")
return nil
}

// if the connection is good, return it otherwise, attempt reconnect
if c.wsconn != nil {
select {
Expand Down Expand Up @@ -308,7 +312,7 @@ func (c *Client) listen() {
}

// fatal close
c.closeWs(true)
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
Expand Down Expand Up @@ -343,15 +347,15 @@ func (c *Client) listen() {
klog.V(3).Infof("Graceful websocket close\n")

// graceful close
c.closeWs(false)
c.closeWs(false, false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, UseOfClosedSocket):
klog.V(3).Infof("Probable graceful websocket close: %v\n", err)

// fatal close
c.closeWs(false)
c.closeWs(false, false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
Expand All @@ -365,7 +369,7 @@ func (c *Client) listen() {
}

// fatal close
c.closeWs(true)
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
Expand All @@ -379,11 +383,11 @@ func (c *Client) listen() {
}

// close the connection
c.closeWs(false)
c.closeWs(false, false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
case (err == io.EOF || err == io.ErrUnexpectedEOF):
klog.V(3).Infof("stream object EOF\n")

// send error on callback
Expand All @@ -393,7 +397,7 @@ func (c *Client) listen() {
}

// close the connection
c.closeWs(true)
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
Expand All @@ -407,7 +411,7 @@ func (c *Client) listen() {
}

// close the connection
c.closeWs(true)
c.closeWs(true, false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
Expand Down Expand Up @@ -465,7 +469,7 @@ func (c *Client) Stream(r io.Reader) error {
klog.V(1).Infof("Fatal socket error: %v\n", err)
klog.V(6).Infof("live.Stream() LEAVE\n")
return err
case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry:
case (err == io.EOF || err == io.ErrUnexpectedEOF):
klog.V(3).Infof("stream object EOF\n")
klog.V(6).Infof("live.Stream() LEAVE\n")
return err
Expand Down Expand Up @@ -691,15 +695,16 @@ func (c *Client) normalClosure(lock bool) error {
// Stop will send close message and shutdown websocket connection
func (c *Client) Stop() {
klog.V(3).Infof("Stopping...\n")
c.muConn.Lock()
c.retry = false
c.muConn.Unlock()

// exit gracefully
c.ctxCancel()
c.closeWs(false)
c.closeWs(false, true)
}

// closeWs closes the websocket connection
func (c *Client) closeWs(fatal bool) {
func (c *Client) closeWs(fatal bool, perm bool) {
klog.V(6).Infof("live.closeWs() closing channels...\n")

// doing a write, need to lock
Expand All @@ -716,6 +721,11 @@ func (c *Client) closeWs(fatal bool) {
time.Sleep(TerminationSleep) // allow time for server to register closure
}

// cancel the context because we are exiting exiting...
if perm {
c.ctxCancel()
}

if fatal || c.wsconn != nil {
// fire off close connection
err := c.router.CloseHelper(&msginterfaces.CloseResponse{
Expand Down Expand Up @@ -752,7 +762,7 @@ func (c *Client) ping() {
}

// fatal close
c.closeWs(true)
c.closeWs(true, false)

klog.V(6).Infof("live.ping() LEAVE\n")
return
Expand All @@ -767,7 +777,7 @@ func (c *Client) ping() {
klog.V(3).Infof("live.ping() Exiting\n")

// exit gracefully
c.closeWs(false)
c.closeWs(false, false)

klog.V(6).Infof("live.ping() LEAVE\n")
return
Expand Down Expand Up @@ -802,7 +812,7 @@ func (c *Client) flush() {
}

// fatal close
c.closeWs(true)
c.closeWs(true, false)

klog.V(6).Infof("live.flush() LEAVE\n")
return
Expand All @@ -817,7 +827,7 @@ func (c *Client) flush() {
klog.V(3).Infof("live.flush() Exiting\n")

// exit gracefully
c.closeWs(false)
c.closeWs(false, false)

klog.V(6).Infof("live.flush() LEAVE\n")
return
Expand Down

0 comments on commit e1029a4

Please sign in to comment.