From cd5ffbe8d19465ea391cbf44b24a1ccf9ba771b2 Mon Sep 17 00:00:00 2001 From: Allan Guwatudde Date: Thu, 5 Dec 2024 18:48:55 +0300 Subject: [PATCH] Fix DecodeSignalMsg method --- clientcore/egress_consumer.go | 1 + common/resource.go | 52 +++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/clientcore/egress_consumer.go b/clientcore/egress_consumer.go index 86eed4a..1aa5be2 100644 --- a/clientcore/egress_consumer.go +++ b/clientcore/egress_consumer.go @@ -36,6 +36,7 @@ func NewEgressConsumerWebSocket(options *EgressOptions, wg *sync.WaitGroup) *Wor // like ($, 1)... OR just disallow just-in-time strategies, and make egress consumers // pre-establish N websocket connections + options.ConnectTimeout = 15 * time.Second ctx, cancel := context.WithTimeout(context.Background(), options.ConnectTimeout) defer cancel() diff --git a/common/resource.go b/common/resource.go index 1c629b8..2c40114 100644 --- a/common/resource.go +++ b/common/resource.go @@ -2,6 +2,7 @@ package common import ( "encoding/json" + "errors" "net" "github.com/pion/webrtc/v3" @@ -87,6 +88,52 @@ type SignalMsg struct { Payload string } +type ICECandidate struct { + statsID string + Foundation string `json:"foundation"` + Priority uint32 `json:"priority"` + Address string `json:"address"` + Protocol webrtc.ICEProtocol `json:"protocol"` + Port uint16 `json:"port"` + Typ int `json:"type"` + Component uint16 `json:"component"` + RelatedAddress string `json:"relatedAddress"` + RelatedPort uint16 `json:"relatedPort"` + TCPType string `json:"tcpType"` +} + +// we did an upgrade of pion/webrtc from 3.2.6 to 3.3.4, however marshaling and unmarshaling goes hand in hand +// and this broke the decoding, because some clients/consumers out there were still on 3.2.6 before pion/webrtc implemented +// encoding.TextMarshaler and encoding.TextUnmarshaler interfaces on ICECandidateType. This method will be a fallback to help unmarshal +// older messages sent by older clients +func fallBackIceCandidatesDecoder(raw []byte) ([]webrtc.ICECandidate, error) { + var candidates []ICECandidate + var webRTCCandidates []webrtc.ICECandidate + err := json.Unmarshal(raw, &candidates) + if err != nil { + return webRTCCandidates, err + } + + for _, c := range candidates { + new := webrtc.ICECandidate{ + Foundation: c.Foundation, + Priority: c.Priority, + Address: c.Address, + Protocol: c.Protocol, + Typ: webrtc.ICECandidateType(c.Typ), + Port: c.Port, + Component: c.Component, + RelatedAddress: c.RelatedAddress, + RelatedPort: c.RelatedPort, + TCPType: c.TCPType, + } + + webRTCCandidates = append(webRTCCandidates, new) + } + + return webRTCCandidates, nil +} + func DecodeSignalMsg(raw []byte) (string, interface{}, error) { var err error var msg SignalMsg @@ -109,7 +156,12 @@ func DecodeSignalMsg(raw []byte) (string, interface{}, error) { return msg.ReplyTo, answer, err case SignalMsgICE: var candidates []webrtc.ICECandidate + var unMarshalTypeErr *json.UnmarshalTypeError err := json.Unmarshal([]byte(msg.Payload), &candidates) + if errors.As(err, &unMarshalTypeErr) { + candidates, err = fallBackIceCandidatesDecoder([]byte(msg.Payload)) + return msg.ReplyTo, candidates, err + } return msg.ReplyTo, candidates, err } }