diff --git a/pkg/client/interfaces/options.go b/pkg/client/interfaces/options.go index 9764e02d..5be8ed22 100644 --- a/pkg/client/interfaces/options.go +++ b/pkg/client/interfaces/options.go @@ -75,8 +75,8 @@ func (o *ClientOptions) Parse() error { return nil } -func (c *ClientOptions) InspectMessage() bool { - return c.AutoFlushReplyDelta != 0 +func (o *ClientOptions) InspectMessage() bool { + return o.AutoFlushReplyDelta != 0 } func (o *PreRecordedTranscriptionOptions) Check() error { diff --git a/pkg/client/live/client.go b/pkg/client/live/client.go index a4b2d774..5f540a03 100644 --- a/pkg/client/live/client.go +++ b/pkg/client/live/client.go @@ -27,6 +27,10 @@ import ( interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" ) +type controlMessage struct { + Type string `json:"type"` +} + /* NewForDemo creates a new websocket connection with all default options @@ -120,34 +124,34 @@ func (c *Client) Connect() bool { if c.retryCnt == 0 { c.retryCnt = DefaultConnectRetry } - return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt)) != nil + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), true) != nil } // ConnectWithCancel performs a websocket connection with specified number of retries and providing a // cancel function to stop the connection func (c *Client) ConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) bool { - return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt) != nil + return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt, true) != nil } // AttemptReconnect performs a reconnect after failing retries func (c *Client) AttemptReconnect(ctx context.Context, retries int64) bool { c.retry = true c.ctx, c.ctxCancel = context.WithCancel(ctx) - return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries)) != nil + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries), true) != nil } // AttemptReconnect performs a reconnect after failing retries and providing a cancel function func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool { c.retry = true - return c.internalConnectWithCancel(ctx, ctxCancel, int(retries)) != nil + return c.internalConnectWithCancel(ctx, ctxCancel, int(retries), true) != nil } func (c *Client) internalConnect() *websocket.Conn { - return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt)) + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt), false) } //nolint:funlen // this is a complex function. keep as is -func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn { +func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int, lock bool) *websocket.Conn { klog.V(7).Infof("live.Connect() ENTER\n") // set the context @@ -162,6 +166,13 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex return nil } + // lock conn access + if lock { + klog.V(3).Infof("Locking connection mutex\n") + c.muConn.Lock() + defer c.muConn.Unlock() + } + // if the connection is good, return it otherwise, attempt reconnect if c.wsconn != nil { select { @@ -170,6 +181,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n") return nil default: + klog.V(7).Infof("Connection is good. Return object.") klog.V(7).Infof("live.ConnectWithRetry() LEAVE\n") return c.wsconn @@ -228,10 +240,6 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex } klog.V(5).Infof("Connecting to %s\n", url) - // a single connection attempt - // Note: not using defer here because we arent leaving the scope of the function - c.muConn.Lock() - // perform the websocket connection ws, res, err := dialer.DialContext(c.ctx, url, myHeader) if res != nil { @@ -241,7 +249,6 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex if err != nil { klog.V(1).Infof("Cannot connect to websocket: %s\n", c.cOptions.Host) klog.V(1).Infof("Dialer failed. Err: %v\n", err) - c.muConn.Unlock() continue } @@ -249,9 +256,6 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex c.wsconn = ws c.retry = true - // unlock the connection - c.muConn.Unlock() - // kick off threads to listen for messages and ping/keepalive go c.listen() if c.cOptions.EnableKeepAlive { @@ -282,6 +286,7 @@ func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel contex return nil } +//nolint:funlen,gocyclo // this is a complex function. keep as is func (c *Client) listen() { klog.V(6).Infof("live.listen() ENTER\n") @@ -312,6 +317,20 @@ func (c *Client) listen() { // graceful close c.closeWs(false) + klog.V(6).Infof("live.listen() LEAVE\n") + return + case strings.Contains(errStr, UseOfClosedSocket): + klog.V(3).Infof("Probable graceful websocket close: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr) + } + + // fatal close + c.closeWs(false) + klog.V(6).Infof("live.listen() LEAVE\n") return case strings.Contains(errStr, FatalReadSocketErr): @@ -456,19 +475,19 @@ func (c *Client) WriteBinary(byData []byte) error { klog.V(7).Infof("live.WriteBinary() ENTER\n") // doing a write, need to lock + c.muConn.Lock() + defer c.muConn.Unlock() + + // get the connection ws := c.internalConnect() if ws == nil { err := ErrInvalidConnection - klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err) + klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err) klog.V(7).Infof("live.WriteBinary() LEAVE\n") return err } - // doing a write, need to lock - c.muConn.Lock() - defer c.muConn.Unlock() - if err := ws.WriteMessage( websocket.BinaryMessage, byData, @@ -492,16 +511,6 @@ managing the live transcription session on the Deepgram server. func (c *Client) WriteJSON(payload interface{}) error { klog.V(7).Infof("live.WriteJSON() ENTER\n") - // doing a write, need to lock - ws := c.internalConnect() - if ws == nil { - err := ErrInvalidConnection - klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err) - klog.V(7).Infof("live.WriteJSON() LEAVE\n") - - return err - } - byData, err := json.Marshal(payload) if err != nil { klog.V(1).Infof("WriteJSON json.Marshal failed. Err: %v\n", err) @@ -513,6 +522,16 @@ func (c *Client) WriteJSON(payload interface{}) error { c.muConn.Lock() defer c.muConn.Unlock() + // doing a write, need to lock + ws := c.internalConnect() + if ws == nil { + err := ErrInvalidConnection + klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err) + klog.V(7).Infof("live.WriteJSON() LEAVE\n") + + return err + } + if err := ws.WriteMessage( websocket.TextMessage, byData, @@ -548,27 +567,92 @@ func (c *Client) Write(p []byte) (int, error) { return byteLen, nil } +func (c *Client) KeepAlive() error { + klog.V(7).Infof("live.KeepAlive() ENTER\n") + + err := c.WriteJSON(controlMessage{Type: "KeepAlive"}) + if err != nil { + klog.V(1).Infof("Finalize failed. Err: %v\n", err) + klog.V(7).Infof("live.KeepAlive() LEAVE\n") + + return err + } + + klog.V(4).Infof("KeepAlive Succeeded\n") + klog.V(7).Infof("live.KeepAlive() LEAVE\n") + + return err +} + func (c *Client) Finalize() error { - klog.V(7).Infof("live.Finalize() ENTER\n") + klog.V(7).Infof("live.KeepAlive() ENTER\n") + + err := c.WriteJSON(controlMessage{Type: "Finalize"}) + if err != nil { + klog.V(1).Infof("Finalize failed. Err: %v\n", err) + klog.V(7).Infof("live.Finalize() LEAVE\n") + + return err + } + + klog.V(4).Infof("Finalize Succeeded\n") + klog.V(7).Infof("live.Finalize() LEAVE\n") + + return err +} + +func (c *Client) closeStream(lock bool) error { + klog.V(7).Infof("live.closeStream() ENTER\n") + + // doing a write, need to lock + if lock { + c.muConn.Lock() + defer c.muConn.Unlock() + } + + err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"CloseStream\" }")) + if err != nil { + klog.V(1).Infof("WriteMessage failed. Err: %v\n", err) + klog.V(7).Infof("live.closeStream() LEAVE\n") + + return err + } + + klog.V(4).Infof("closeStream Succeeded\n") + klog.V(7).Infof("live.closeStream() LEAVE\n") + + return err +} + +func (c *Client) normalClosure(lock bool) error { + klog.V(7).Infof("live.normalClosure() ENTER\n") // doing a write, need to lock + if lock { + c.muConn.Lock() + defer c.muConn.Unlock() + } + ws := c.internalConnect() if ws == nil { err := ErrInvalidConnection - klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err) - klog.V(7).Infof("live.Finalize() LEAVE\n") + klog.V(4).Infof("c.internalConnect() is nil. Err: %v\n", err) + klog.V(7).Infof("live.normalClosure() LEAVE\n") return err } - // doing a write, need to lock - c.muConn.Lock() - defer c.muConn.Unlock() - - err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"Finalize\" }")) + err := c.wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + switch err { + case websocket.ErrCloseSent: + klog.V(3).Infof("ErrCloseSent was sent. Err: %v\n", err) + case nil: + klog.V(4).Infof("normalClosure Succeeded\n") + default: + klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", err) + } - klog.V(4).Infof("Finalize Succeeded\n") - klog.V(7).Infof("live.Finalize() LEAVE\n") + klog.V(7).Infof("live.normalClosure() LEAVE\n") return err } @@ -592,21 +676,11 @@ func (c *Client) closeWs(fatal bool) { if c.wsconn != nil && !fatal { // deepgram requires a close message to be sent - errDg := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"CloseStream\" }")) - if errDg == websocket.ErrCloseSent { - klog.V(3).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg) - } else if errDg != nil { - klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errDg) - } + _ = c.closeStream(false) time.Sleep(TerminationSleep) // allow time for server to register closure // websocket protocol message - errProto := c.wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - if errProto == websocket.ErrCloseSent { - klog.V(3).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto) - } else if errProto != nil { - klog.V(1).Infof("Failed to send CloseNormalClosure. Err: %v\n", errProto) - } + _ = c.normalClosure(false) time.Sleep(TerminationSleep) // allow time for server to register closure } @@ -650,28 +724,14 @@ func (c *Client) ping() { klog.V(5).Infof("Starting ping...") counter++ - ws := c.internalConnect() - if ws == nil { - klog.V(1).Infof("ping Connection is not valid\n") - klog.V(6).Infof("live.ping() LEAVE\n") - return - } - - // doing a write, need to lock. - // Note: not using defer here because we arent leaving the scope of the function - c.muConn.Lock() - // deepgram keepalive message klog.V(5).Infof("Sending Deepgram KeepAlive message...\n") - err := c.wsconn.WriteMessage(websocket.TextMessage, []byte("{ \"type\": \"KeepAlive\" }")) + err := c.KeepAlive() if err == nil { klog.V(5).Infof("Ping sent!") } else { klog.V(1).Infof("Failed to send Deepgram KeepAlive. Err: %v\n", err) } - - // release - c.muConn.Unlock() } } } @@ -692,13 +752,6 @@ func (c *Client) flush() { klog.V(6).Infof("live.flush() LEAVE\n") return case <-ticker.C: - ws := c.internalConnect() - if ws == nil { - klog.V(1).Infof("flush Connection is not valid\n") - klog.V(6).Infof("live.flush() LEAVE\n") - return - } - // doing a read, need to lock. c.muFinal.Lock() @@ -762,7 +815,7 @@ func (c *Client) errorToResponse(err error) *msginterfaces.ErrorResponse { } else { errorCode = UnknownDeepgramErr errorNum = UnknownDeepgramErr - errorDesc = UnknownDeepgramErr + errorDesc = err.Error() } response := &msginterfaces.ErrorResponse{ @@ -818,7 +871,7 @@ func (c *Client) inspectMessage(mr *msginterfaces.MessageResponse) error { klog.V(7).Infof("client.inspectMessage() ENTER\n") sentence := strings.TrimSpace(mr.Channel.Alternatives[0].Transcript) - if len(mr.Channel.Alternatives) == 0 || len(sentence) == 0 { + if len(mr.Channel.Alternatives) == 0 || sentence == "" { klog.V(7).Info("inspectMessage is empty\n") klog.V(7).Infof("client.inspectMessage() LEAVE\n") return nil diff --git a/pkg/client/live/constants.go b/pkg/client/live/constants.go index fd9d1939..d1d13ed0 100644 --- a/pkg/client/live/constants.go +++ b/pkg/client/live/constants.go @@ -12,8 +12,7 @@ import ( // internal constants for retry, waits, back-off, etc. const ( - flushPeriod = 500 * time.Millisecond - flashInitialDelay = 3 * time.Second + flushPeriod = 500 * time.Millisecond pingPeriod = 5 * time.Second @@ -30,6 +29,7 @@ const ( // socket errors FatalReadSocketErr string = "read: can't assign requested address" FatalWriteSocketErr string = "write: broken pipe" + UseOfClosedSocket string = "use of closed network connection" UnknownDeepgramErr string = "unknown deepgram error" // socket successful close error