Enhance MQTT reconnection handling and improve WebRTC connection cleanup

- Enable automatic reconnection for MQTT with configurable intervals and timeouts.
- Add logging for connection loss and reconnection attempts.
- Refactor WebRTC connection cleanup to ensure proper resource management on disconnection.
- Improve event handling in ImageCanvas and Dashboard components for better performance and reliability.
This commit is contained in:
Cédric Verstraeten
2026-03-09 11:04:10 +00:00
parent ccf4034cc8
commit dbcf4e242c
5 changed files with 165 additions and 58 deletions
+29 -3
View File
@@ -92,8 +92,22 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
opts.SetCleanSession(true)
opts.SetConnectRetry(true)
//opts.SetAutoReconnect(true)
opts.SetAutoReconnect(true)
opts.SetConnectRetryInterval(5 * time.Second)
opts.SetKeepAlive(30 * time.Second)
opts.SetPingTimeout(10 * time.Second)
opts.SetOrderMatters(false)
opts.SetConnectTimeout(30 * time.Second)
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
if err != nil {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost: " + err.Error())
} else {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost")
}
})
opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) {
log.Log.Warning("routers.mqtt.main.ConfigureMQTT(): reconnecting to MQTT broker")
})
hubKey := ""
// This is the old way ;)
@@ -133,10 +147,12 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
}
}
mqc := mqtt.NewClient(opts)
if token := mqc.Connect(); token.WaitTimeout(3 * time.Second) {
if token := mqc.Connect(); token.WaitTimeout(30 * time.Second) {
if token.Error() != nil {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): unable to establish mqtt broker connection, error was: " + token.Error().Error())
}
} else {
log.Log.Error("routers.mqtt.main.ConfigureMQTT(): timed out while establishing mqtt broker connection")
}
return mqc
}
@@ -149,7 +165,7 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): no hub key provided, not subscribing to kerberos/hub/{hubkey}")
} else {
agentListener := fmt.Sprintf("kerberos/agent/%s", hubKey)
mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
token := mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) {
// Decode the message, we are expecting following format.
// {
@@ -276,6 +292,16 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
}
})
if token.WaitTimeout(10 * time.Second) {
if token.Error() != nil {
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): failed to subscribe to " + agentListener + ": " + token.Error().Error())
} else {
log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): subscribed to " + agentListener)
}
} else {
log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): timed out while subscribing to " + agentListener)
}
}
}
+55 -43
View File
@@ -50,6 +50,7 @@ type peerConnectionWrapper struct {
cancelCtx context.CancelFunc
done chan struct{}
closeOnce sync.Once
connected atomic.Bool
}
var globalConnectionManager = NewConnectionManager()
@@ -122,6 +123,26 @@ func (cm *ConnectionManager) DecrementPeerCount() int64 {
return atomic.AddInt64(&cm.peerConnectionCount, -1)
}
func cleanupPeerConnection(sessionID string, sessionKey string, wrapper *peerConnectionWrapper) {
wrapper.closeOnce.Do(func() {
if wrapper.connected.Swap(false) {
count := globalConnectionManager.DecrementPeerCount()
log.Log.Info("webrtc.main.cleanupPeerConnection(): Peer disconnected. Active peers: " + strconv.FormatInt(count, 10))
}
globalConnectionManager.CloseCandidateChannel(sessionKey)
if wrapper.conn != nil {
if err := wrapper.conn.Close(); err != nil {
log.Log.Error("webrtc.main.cleanupPeerConnection(): error closing peer connection: " + err.Error())
}
}
globalConnectionManager.RemovePeerConnection(sessionID)
close(wrapper.done)
})
}
type WebRTC struct {
Name string
StunServers []string
@@ -273,7 +294,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
if videoTrack != nil {
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error())
cancel()
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
return
}
} else {
@@ -306,7 +327,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
if audioTrack != nil {
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error())
cancel()
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
return
}
} else {
@@ -339,28 +360,14 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String())
switch connectionState {
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed:
wrapper.closeOnce.Do(func() {
count := globalConnectionManager.DecrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer disconnected. Active peers: " + string(rune(count)))
// Clean up resources
globalConnectionManager.CloseCandidateChannel(sessionKey)
if err := peerConnection.Close(); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error closing peer connection: " + err.Error())
}
globalConnectionManager.RemovePeerConnection(handshake.SessionID)
close(wrapper.done)
})
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed, pionWebRTC.PeerConnectionStateFailed:
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
case pionWebRTC.PeerConnectionStateConnected:
count := globalConnectionManager.IncrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + string(rune(count)))
case pionWebRTC.PeerConnectionStateFailed:
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection failed")
if wrapper.connected.CompareAndSwap(false, true) {
count := globalConnectionManager.IncrementPeerCount()
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + strconv.FormatInt(count, 10))
}
}
})
@@ -389,13 +396,19 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
offer := w.CreateOffer(sd)
if err = peerConnection.SetRemoteDescription(offer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error())
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
return
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
return
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error())
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
return
}
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
@@ -472,31 +485,30 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// Store peer connection in manager
globalConnectionManager.AddPeerConnection(handshake.SessionID, wrapper)
if err == nil {
// Create a config map
valueMap := make(map[string]interface{})
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
// Create a config map
valueMap := make(map[string]interface{})
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
// We'll send the candidate to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
// We'll send the candidate to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
}
} else {
globalConnectionManager.CloseCandidateChannel(sessionKey)
log.Log.Error("Initializwebrtc.main.InitializeWebRTCConnection()eWebRTCConnection: NewPeerConnection failed: " + err.Error())
}
}
+64 -3
View File
@@ -7,6 +7,7 @@ import './ImageCanvas.css';
class ImageCanvas extends React.Component {
componentDidMount() {
this.isUnmounted = false;
this.width = 0;
this.height = 0;
@@ -58,6 +59,9 @@ class ImageCanvas extends React.Component {
const { image } = this.props;
this.loadImage(image, (img) => {
if (this.isUnmounted || !this.editor) {
return;
}
if (this.width !== img.width || this.height !== img.height) {
this.width = img.width;
this.height = img.height;
@@ -71,6 +75,9 @@ class ImageCanvas extends React.Component {
componentDidUpdate() {
const { image } = this.props;
this.loadImage(image, (img) => {
if (this.isUnmounted || !this.editor) {
return;
}
if (this.width !== img.width || this.height !== img.height) {
this.width = img.width;
this.height = img.height;
@@ -82,11 +89,57 @@ class ImageCanvas extends React.Component {
});
}
componentWillUnmount() {
this.isUnmounted = true;
if (this.pendingImage) {
this.pendingImage.onload = null;
this.pendingImage.src = '';
this.pendingImage = null;
}
if (this.editor) {
this.editor.onSelectionEnd = null;
this.editor.onRegionMoveEnd = null;
this.editor.onRegionDelete = null;
if (this.editor.RM) {
this.editor.RM.deleteAllRegions();
}
if (typeof this.editor.dispose === 'function') {
this.editor.dispose();
} else if (typeof this.editor.destroy === 'function') {
this.editor.destroy();
}
this.editor = null;
}
if (this.toolbarContainer) {
this.toolbarContainer.innerHTML = '';
this.toolbarContainer = null;
}
if (this.editorContainer) {
this.editorContainer.innerHTML = '';
this.editorContainer = null;
}
}
loadData = (image) => {
if (!this.editor) {
return;
}
const w = image.width;
const h = image.height;
this.editor.addContentSource(image).then(() => {
if (this.isUnmounted || !this.editor) {
return;
}
// Add exisiting polygons
this.editor.RM.deleteAllRegions();
const { polygons } = this.props;
@@ -152,11 +205,19 @@ class ImageCanvas extends React.Component {
// eslint-disable-next-line class-methods-use-this
loadImage = (path, onready) => {
if (this.pendingImage) {
this.pendingImage.onload = null;
}
const image = new Image();
image.src = path;
image.addEventListener('load', (e) => {
this.pendingImage = image;
image.onload = (e) => {
if (this.pendingImage === image) {
this.pendingImage = null;
}
onready(e.target);
});
};
image.src = path;
};
// eslint-disable-next-line class-methods-use-this
+13 -8
View File
@@ -38,16 +38,20 @@ class Dashboard extends React.Component {
initialised: false,
};
this.initialiseLiveview = this.initialiseLiveview.bind(this);
this.handleLiveviewLoad = this.handleLiveviewLoad.bind(this);
}
handleLiveviewLoad() {
this.setState({
liveviewLoaded: true,
});
}
componentDidMount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
liveview[0].addEventListener('load', () => {
this.setState({
liveviewLoaded: true,
});
});
this.liveviewElement = liveview[0];
this.liveviewElement.addEventListener('load', this.handleLiveviewLoad);
}
this.initialiseLiveview();
}
@@ -57,13 +61,14 @@ class Dashboard extends React.Component {
}
componentWillUnmount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
liveview[0].remove();
if (this.liveviewElement) {
this.liveviewElement.removeEventListener('load', this.handleLiveviewLoad);
this.liveviewElement = null;
}
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
const message = {
+4 -1
View File
@@ -159,7 +159,10 @@ class Settings extends React.Component {
componentWillUnmount() {
document.removeEventListener('keydown', this.escFunction, false);
clearInterval(this.interval);
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
const message = {