Enhance WebRTC signaling robustness

Increase HD handshake channel buffer and harden signaling flow: enlarge HandleLiveHDHandshake buffer from 10 to 100 and add a nil-check to drop and log requests when the channel is not initialized. Add publishSignalingMessageAsync to publish MQTT messages with timeout and error logging, and replace blocking Publish().Wait() calls for ICE candidates and SDP answers with the async publisher. Reintroduce the remote-candidate processor goroutine after remote description handling to avoid AddICECandidate races. These changes reduce blocking, improve error handling, and make WebRTC/MQTT signaling more resilient.
This commit is contained in:
Cédric Verstraeten
2026-03-09 20:05:00 +01:00
parent 4fbee60e9f
commit 4efc80fecb
3 changed files with 55 additions and 35 deletions
+1 -1
View File
@@ -303,7 +303,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
}
// Handle livestream HD (high resolution over WEBRTC)
communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 10)
communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 100)
if subStreamEnabled {
livestreamHDCursor := subQueue.Latest()
go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspSubClient)
+5 -3
View File
@@ -537,10 +537,12 @@ func HandleRequestHDStream(mqttClient mqtt.Client, hubKey string, payload models
if communication.CameraConnected {
// Set the Hub key, so we can send back the answer.
requestHDStreamPayload.HubKey = hubKey
select {
case communication.HandleLiveHDHandshake <- requestHDStreamPayload:
default:
if communication.HandleLiveHDHandshake == nil {
log.Log.Error("routers.mqtt.main.HandleRequestHDStream(): handshake channel is nil, dropping request")
return
}
communication.HandleLiveHDHandshake <- requestHDStreamPayload
log.Log.Info("routers.mqtt.main.HandleRequestHDStream(): received request to setup webrtc.")
} else {
log.Log.Info("routers.mqtt.main.HandleRequestHDStream(): received request to setup webrtc, but camera is not connected.")
+49 -31
View File
@@ -252,6 +252,24 @@ func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, intercepto
return nil
}
func publishSignalingMessageAsync(mqttClient mqtt.Client, topic string, payload []byte, description string) {
if mqttClient == nil {
log.Log.Error("webrtc.main.publishSignalingMessageAsync(): mqtt client is nil for " + description)
return
}
token := mqttClient.Publish(topic, 2, false, payload)
go func() {
if !token.WaitTimeout(5 * time.Second) {
log.Log.Warning("webrtc.main.publishSignalingMessageAsync(): timed out publishing " + description)
return
}
if err := token.Error(); err != nil {
log.Log.Error("webrtc.main.publishSignalingMessageAsync(): failed publishing " + description + ": " + err.Error())
}
}()
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.RequestHDStreamPayload) {
config := configuration.Config
@@ -483,33 +501,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}
})
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshake.SessionID)
}()
// Iterate over the candidates and send them to the remote client
for {
select {
case <-ctx.Done():
return
case candidate, ok := <-candidateChannel:
if !ok {
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
candidateInit, decodeErr := decodeICECandidate(candidate)
if decodeErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
continue
}
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
}
}()
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
// The other peer will add this candidate by calling AddICECandidate.
// This handler must be registered before setting the local description, otherwise early candidates can be missed.
@@ -573,8 +564,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "ICE candidate for session "+handshake.SessionID)
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
@@ -587,6 +577,35 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
return
}
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshake.SessionID)
}()
// Process remote candidates only after the remote description is set.
// MQTT can deliver candidates before the SDP offer handling completes,
// and Pion rejects AddICECandidate calls until SetRemoteDescription succeeds.
for {
select {
case <-ctx.Done():
return
case candidate, ok := <-candidateChannel:
if !ok {
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
candidateInit, decodeErr := decodeICECandidate(candidate)
if decodeErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
continue
}
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
}
}()
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
@@ -617,8 +636,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "SDP answer for session "+handshake.SessionID)
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}