Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make All Socket Write Operations Atomic #243

Merged
merged 1 commit into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/client/interfaces/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func (o *ClientOptions) Parse() error {
return nil
}

func (c *ClientOptions) InspectMessage() bool {
return c.AutoFlushReplyDelta != 0
func (o *ClientOptions) InspectMessage() bool {
return o.AutoFlushReplyDelta != 0
}

func (o *PreRecordedTranscriptionOptions) Check() error {
Expand Down
203 changes: 128 additions & 75 deletions pkg/client/live/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces"
)

type controlMessage struct {
Type string `json:"type"`
}

/*
NewForDemo creates a new websocket connection with all default options

Expand Down Expand Up @@ -120,34 +124,34 @@ func (c *Client) Connect() bool {
if c.retryCnt == 0 {
c.retryCnt = DefaultConnectRetry
}
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt)) != nil
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), true) != nil
}

// ConnectWithCancel performs a websocket connection with specified number of retries and providing a
// cancel function to stop the connection
func (c *Client) ConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) bool {
return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt) != nil
return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt, true) != nil
}

// AttemptReconnect performs a reconnect after failing retries
func (c *Client) AttemptReconnect(ctx context.Context, retries int64) bool {
c.retry = true
c.ctx, c.ctxCancel = context.WithCancel(ctx)
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries)) != nil
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries), true) != nil
}

// AttemptReconnect performs a reconnect after failing retries and providing a cancel function
func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool {
c.retry = true
return c.internalConnectWithCancel(ctx, ctxCancel, int(retries)) != nil
return c.internalConnectWithCancel(ctx, ctxCancel, int(retries), true) != nil
}

func (c *Client) internalConnect() *websocket.Conn {
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt))
return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), false)
}

//nolint:funlen // this is a complex function. keep as is
func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn {
func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn {
klog.V(7).Infof("live.Connect() ENTER\n")

// set the context
Expand All @@ -162,6 +166,13 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
return nil
}

// lock conn access
if lock {
klog.V(3).Infof("Locking connection mutex\n")
c.muConn.Lock()
defer c.muConn.Unlock()
}

// if the connection is good, return it otherwise, attempt reconnect
if c.wsconn != nil {
select {
Expand All @@ -170,6 +181,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n")
return nil
default:

klog.V(7).Infof("Connection is good. Return object.")
klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n")
return c.wsconn
Expand Down Expand Up @@ -228,10 +240,6 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
}
klog.V(5).Infof("Connecting to %s\n", url)

// a single connection attempt
// Note: not using defer here because we arent leaving the scope of the function
c.muConn.Lock()

// perform the websocket connection
ws, res, err := dialer.DialContext(c.ctx, url, myHeader)
if res != nil {
Expand All @@ -241,17 +249,13 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
if err != nil {
klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host)
klog.V(1).Infof("Dialer failed. Err: %v\n", err)
c.muConn.Unlock()
continue
}

// set the object to allow threads to function
c.wsconn = ws
c.retry = true

// unlock the connection
c.muConn.Unlock()

// kick off threads to listen for messages and ping/keepalive
go c.listen()
if c.cOptions.EnableKeepAlive {
Expand Down Expand Up @@ -282,6 +286,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex
return nil
}

//nolint:funlen,gocyclo // this is a complex function. keep as is
func (c *Client) listen() {
klog.V(6).Infof("live.listen() ENTER\n")

Expand Down Expand Up @@ -312,6 +317,20 @@ func (c *Client) listen() {
// graceful close
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, UseOfClosedSocket):
klog.V(3).Infof("Probable graceful websocket close: %v\n", err)

// send error on callback
sendErr := c.sendError(err)
if sendErr != nil {
klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr)
}

// fatal close
c.closeWs(false)

klog.V(6).Infof("live.listen() LEAVE\n")
return
case strings.Contains(errStr, FatalReadSocketErr):
Expand Down Expand Up @@ -456,19 +475,19 @@ func (c *Client) WriteBinary(byData []byte) error {
klog.V(7).Infof("live.WriteBinary() ENTER\n")

// doing a write, need to lock
c.muConn.Lock()
defer c.muConn.Unlock()

// get the connection
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteBinary() LEAVE\n")

return err
}

// doing a write, need to lock
c.muConn.Lock()
defer c.muConn.Unlock()

if err := ws.WriteMessage(
websocket.BinaryMessage,
byData,
Expand All @@ -492,16 +511,6 @@ managing the live transcription session on the Deepgram server.
func (c *Client) WriteJSON(payload interface{}) error {
klog.V(7).Infof("live.WriteJSON() ENTER\n")

// doing a write, need to lock
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteJSON() LEAVE\n")

return err
}

byData, err := json.Marshal(payload)
if err != nil {
klog.V(1).Infof("WriteJSON json.Marshal failed. Err: %v\n", err)
Expand All @@ -513,6 +522,16 @@ func (c *Client) WriteJSON(payload interface{}) error {
c.muConn.Lock()
defer c.muConn.Unlock()

// doing a write, need to lock
ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.WriteJSON() LEAVE\n")

return err
}

if err := ws.WriteMessage(
websocket.TextMessage,
byData,
Expand Down Expand Up @@ -548,27 +567,92 @@ func (c *Client) Write(p []byte) (int, error) {
return byteLen, nil
}

func (c *Client) KeepAlive() error {
klog.V(7).Infof("live.KeepAlive() ENTER\n")

err := c.WriteJSON(controlMessage{Type: "KeepAlive"})
if err != nil {
klog.V(1).Infof("Finalize failed. Err: %v\n", err)
klog.V(7).Infof("live.KeepAlive() LEAVE\n")

return err
}

klog.V(4).Infof("KeepAlive Succeeded\n")
klog.V(7).Infof("live.KeepAlive() LEAVE\n")

return err
}

func (c *Client) Finalize() error {
klog.V(7).Infof("live.Finalize() ENTER\n")
klog.V(7).Infof("live.KeepAlive() ENTER\n")

err := c.WriteJSON(controlMessage{Type: "Finalize"})
if err != nil {
klog.V(1).Infof("Finalize failed. Err: %v\n", err)
klog.V(7).Infof("live.Finalize() LEAVE\n")

return err
}

klog.V(4).Infof("Finalize Succeeded\n")
klog.V(7).Infof("live.Finalize() LEAVE\n")

return err
}

func (c *Client) closeStream(lock bool) error {
klog.V(7).Infof("live.closeStream() ENTER\n")

// doing a write, need to lock
if lock {
c.muConn.Lock()
defer c.muConn.Unlock()
}

err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"CloseStream\" }"))
if err != nil {
klog.V(1).Infof("WriteMessage failed. Err: %v\n", err)
klog.V(7).Infof("live.closeStream() LEAVE\n")

return err
}

klog.V(4).Infof("closeStream Succeeded\n")
klog.V(7).Infof("live.closeStream() LEAVE\n")

return err
}

func (c *Client) normalClosure(lock bool) error {
klog.V(7).Infof("live.normalClosure() ENTER\n")

// doing a write, need to lock
if lock {
c.muConn.Lock()
defer c.muConn.Unlock()
}

ws := c.internalConnect()
if ws == nil {
err := ErrInvalidConnection
klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.Finalize() LEAVE\n")
klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err)
klog.V(7).Infof("live.normalClosure() LEAVE\n")

return err
}

// doing a write, need to lock
c.muConn.Lock()
defer c.muConn.Unlock()

err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Finalize\" }"))
err := c.wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
switch err {
case websocket.ErrCloseSent:
klog.V(3).Infof("ErrCloseSent was sent. Err: %v\n", err)
case nil:
klog.V(4).Infof("normalClosure Succeeded\n")
default:
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", err)
}

klog.V(4).Infof("Finalize Succeeded\n")
klog.V(7).Infof("live.Finalize() LEAVE\n")
klog.V(7).Infof("live.normalClosure() LEAVE\n")

return err
}
Expand All @@ -592,21 +676,11 @@ func (c *Client) closeWs(fatal bool) {

if c.wsconn != nil && !fatal {
// deepgram requires a close message to be sent
errDg := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"CloseStream\" }"))
if errDg == websocket.ErrCloseSent {
klog.V(3).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg)
} else if errDg != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg)
}
_ = c.closeStream(false)
time.Sleep(TerminationSleep) // allow time for server to register closure

// websocket protocol message
errProto := c.wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if errProto == websocket.ErrCloseSent {
klog.V(3).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto)
} else if errProto != nil {
klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto)
}
_ = c.normalClosure(false)
time.Sleep(TerminationSleep) // allow time for server to register closure
}

Expand Down Expand Up @@ -650,28 +724,14 @@ func (c *Client) ping() {
klog.V(5).Infof("Starting ping...")
counter++

ws := c.internalConnect()
if ws == nil {
klog.V(1).Infof("ping Connection is not valid\n")
klog.V(6).Infof("live.ping() LEAVE\n")
return
}

// doing a write, need to lock.
// Note: not using defer here because we arent leaving the scope of the function
c.muConn.Lock()

// deepgram keepalive message
klog.V(5).Infof("Sending Deepgram KeepAlive message...\n")
err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"KeepAlive\" }"))
err := c.KeepAlive()
if err == nil {
klog.V(5).Infof("Ping sent!")
} else {
klog.V(1).Infof("Failed to send Deepgram KeepAlive. Err: %v\n", err)
}

// release
c.muConn.Unlock()
}
}
}
Expand All @@ -692,13 +752,6 @@ func (c *Client) flush() {
klog.V(6).Infof("live.flush() LEAVE\n")
return
case <-ticker.C:
ws := c.internalConnect()
if ws == nil {
klog.V(1).Infof("flush Connection is not valid\n")
klog.V(6).Infof("live.flush() LEAVE\n")
return
}

// doing a read, need to lock.
c.muFinal.Lock()

Expand Down Expand Up @@ -762,7 +815,7 @@ func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse {
} else {
errorCode = UnknownDeepgramErr
errorNum = UnknownDeepgramErr
errorDesc = UnknownDeepgramErr
errorDesc = err.Error()
}

response := &msginterfaces.ErrorResponse{
Expand Down Expand Up @@ -818,7 +871,7 @@ func (c *Client) inspectMessage(mr *msginterfaces.MessageResponse) error {
klog.V(7).Infof("client.inspectMessage() ENTER\n")

sentence := strings.TrimSpace(mr.Channel.Alternatives[0].Transcript)
if len(mr.Channel.Alternatives) == 0 || len(sentence) == 0 {
if len(mr.Channel.Alternatives) == 0 || sentence == "" {
klog.V(7).Info("inspectMessage is empty\n")
klog.V(7).Infof("client.inspectMessage() LEAVE\n")
return nil
Expand Down
Loading
Loading