diff --git a/machinery/src/routers/mqtt/main.go b/machinery/src/routers/mqtt/main.go index 5b628e4..75a71b8 100644 --- a/machinery/src/routers/mqtt/main.go +++ b/machinery/src/routers/mqtt/main.go @@ -92,8 +92,22 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration, // properly. More information here: github.com/eclipse/paho.mqtt.golang. opts.SetCleanSession(true) opts.SetConnectRetry(true) - //opts.SetAutoReconnect(true) + opts.SetAutoReconnect(true) + opts.SetConnectRetryInterval(5 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetPingTimeout(10 * time.Second) + opts.SetOrderMatters(false) opts.SetConnectTimeout(30 * time.Second) + opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { + if err != nil { + log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost: " + err.Error()) + } else { + log.Log.Error("routers.mqtt.main.ConfigureMQTT(): MQTT connection lost") + } + }) + opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) { + log.Log.Warning("routers.mqtt.main.ConfigureMQTT(): reconnecting to MQTT broker") + }) hubKey := "" // This is the old way ;) @@ -133,10 +147,12 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration, } } mqc := mqtt.NewClient(opts) - if token := mqc.Connect(); token.WaitTimeout(3 * time.Second) { + if token := mqc.Connect(); token.WaitTimeout(30 * time.Second) { if token.Error() != nil { log.Log.Error("routers.mqtt.main.ConfigureMQTT(): unable to establish mqtt broker connection, error was: " + token.Error().Error()) } + } else { + log.Log.Error("routers.mqtt.main.ConfigureMQTT(): timed out while establishing mqtt broker connection") } return mqc } @@ -149,7 +165,7 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): no hub key provided, not subscribing to kerberos/hub/{hubkey}") } else { agentListener := fmt.Sprintf("kerberos/agent/%s", hubKey) - mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) { + token := mqttClient.Subscribe(agentListener, 1, func(c mqtt.Client, msg mqtt.Message) { // Decode the message, we are expecting following format. // { @@ -276,6 +292,16 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory } }) + + if token.WaitTimeout(10 * time.Second) { + if token.Error() != nil { + log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): failed to subscribe to " + agentListener + ": " + token.Error().Error()) + } else { + log.Log.Info("routers.mqtt.main.MQTTListenerHandler(): subscribed to " + agentListener) + } + } else { + log.Log.Error("routers.mqtt.main.MQTTListenerHandler(): timed out while subscribing to " + agentListener) + } } } diff --git a/machinery/src/webrtc/main.go b/machinery/src/webrtc/main.go index 7424c03..4764964 100644 --- a/machinery/src/webrtc/main.go +++ b/machinery/src/webrtc/main.go @@ -50,6 +50,7 @@ type peerConnectionWrapper struct { cancelCtx context.CancelFunc done chan struct{} closeOnce sync.Once + connected atomic.Bool } var globalConnectionManager = NewConnectionManager() @@ -122,6 +123,26 @@ func (cm *ConnectionManager) DecrementPeerCount() int64 { return atomic.AddInt64(&cm.peerConnectionCount, -1) } +func cleanupPeerConnection(sessionID string, sessionKey string, wrapper *peerConnectionWrapper) { + wrapper.closeOnce.Do(func() { + if wrapper.connected.Swap(false) { + count := globalConnectionManager.DecrementPeerCount() + log.Log.Info("webrtc.main.cleanupPeerConnection(): Peer disconnected. Active peers: " + strconv.FormatInt(count, 10)) + } + + globalConnectionManager.CloseCandidateChannel(sessionKey) + + if wrapper.conn != nil { + if err := wrapper.conn.Close(); err != nil { + log.Log.Error("webrtc.main.cleanupPeerConnection(): error closing peer connection: " + err.Error()) + } + } + + globalConnectionManager.RemovePeerConnection(sessionID) + close(wrapper.done) + }) +} + type WebRTC struct { Name string StunServers []string @@ -273,7 +294,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati if videoTrack != nil { if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil { log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error()) - cancel() + cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper) return } } else { @@ -306,7 +327,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati if audioTrack != nil { if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil { log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error()) - cancel() + cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper) return } } else { @@ -339,28 +360,14 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String()) switch connectionState { - case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed: - wrapper.closeOnce.Do(func() { - count := globalConnectionManager.DecrementPeerCount() - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer disconnected. Active peers: " + string(rune(count))) - - // Clean up resources - globalConnectionManager.CloseCandidateChannel(sessionKey) - - if err := peerConnection.Close(); err != nil { - log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error closing peer connection: " + err.Error()) - } - - globalConnectionManager.RemovePeerConnection(handshake.SessionID) - close(wrapper.done) - }) + case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed, pionWebRTC.PeerConnectionStateFailed: + cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper) case pionWebRTC.PeerConnectionStateConnected: - count := globalConnectionManager.IncrementPeerCount() - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + string(rune(count))) - - case pionWebRTC.PeerConnectionStateFailed: - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection failed") + if wrapper.connected.CompareAndSwap(false, true) { + count := globalConnectionManager.IncrementPeerCount() + log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Peer connected. Active peers: " + strconv.FormatInt(count, 10)) + } } }) @@ -389,13 +396,19 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati offer := w.CreateOffer(sd) if err = peerConnection.SetRemoteDescription(offer); err != nil { log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error()) + cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper) + return } answer, err := peerConnection.CreateAnswer(nil) if err != nil { log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error()) + cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper) + return } else if err = peerConnection.SetLocalDescription(answer); err != nil { log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error()) + cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper) + return } // When an ICE candidate is available send to the other peer using the signaling server (MQTT). @@ -472,31 +485,30 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati // Store peer connection in manager globalConnectionManager.AddPeerConnection(handshake.SessionID, wrapper) - if err == nil { - // Create a config map - valueMap := make(map[string]interface{}) - valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP))) - valueMap["session_id"] = handshake.SessionID - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer") + // Create a config map + valueMap := make(map[string]interface{}) + valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP))) + valueMap["session_id"] = handshake.SessionID + log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer") - // We'll send the candidate to the hub - message := models.Message{ - Payload: models.Payload{ - Action: "receive-hd-answer", - DeviceId: configuration.Config.Key, - Value: valueMap, - }, - } - payload, err := models.PackageMQTTMessage(configuration, message) - if err == nil { - token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload) - token.Wait() - } else { - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error()) - } + // We'll send the candidate to the hub + message := models.Message{ + Payload: models.Payload{ + Action: "receive-hd-answer", + DeviceId: configuration.Config.Key, + Value: valueMap, + }, + } + payload, err := models.PackageMQTTMessage(configuration, message) + if err == nil { + token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload) + token.Wait() + } else { + log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error()) } } } else { + globalConnectionManager.CloseCandidateChannel(sessionKey) log.Log.Error("Initializwebrtc.main.InitializeWebRTCConnection()eWebRTCConnection: NewPeerConnection failed: " + err.Error()) } } diff --git a/ui/src/components/ImageCanvas/ImageCanvas.jsx b/ui/src/components/ImageCanvas/ImageCanvas.jsx index a871388..58f7803 100644 --- a/ui/src/components/ImageCanvas/ImageCanvas.jsx +++ b/ui/src/components/ImageCanvas/ImageCanvas.jsx @@ -7,6 +7,7 @@ import './ImageCanvas.css'; class ImageCanvas extends React.Component { componentDidMount() { + this.isUnmounted = false; this.width = 0; this.height = 0; @@ -58,6 +59,9 @@ class ImageCanvas extends React.Component { const { image } = this.props; this.loadImage(image, (img) => { + if (this.isUnmounted || !this.editor) { + return; + } if (this.width !== img.width || this.height !== img.height) { this.width = img.width; this.height = img.height; @@ -71,6 +75,9 @@ class ImageCanvas extends React.Component { componentDidUpdate() { const { image } = this.props; this.loadImage(image, (img) => { + if (this.isUnmounted || !this.editor) { + return; + } if (this.width !== img.width || this.height !== img.height) { this.width = img.width; this.height = img.height; @@ -82,11 +89,57 @@ class ImageCanvas extends React.Component { }); } + componentWillUnmount() { + this.isUnmounted = true; + + if (this.pendingImage) { + this.pendingImage.onload = null; + this.pendingImage.src = ''; + this.pendingImage = null; + } + + if (this.editor) { + this.editor.onSelectionEnd = null; + this.editor.onRegionMoveEnd = null; + this.editor.onRegionDelete = null; + + if (this.editor.RM) { + this.editor.RM.deleteAllRegions(); + } + + if (typeof this.editor.dispose === 'function') { + this.editor.dispose(); + } else if (typeof this.editor.destroy === 'function') { + this.editor.destroy(); + } + + this.editor = null; + } + + if (this.toolbarContainer) { + this.toolbarContainer.innerHTML = ''; + this.toolbarContainer = null; + } + + if (this.editorContainer) { + this.editorContainer.innerHTML = ''; + this.editorContainer = null; + } + } + loadData = (image) => { + if (!this.editor) { + return; + } + const w = image.width; const h = image.height; this.editor.addContentSource(image).then(() => { + if (this.isUnmounted || !this.editor) { + return; + } + // Add exisiting polygons this.editor.RM.deleteAllRegions(); const { polygons } = this.props; @@ -152,11 +205,19 @@ class ImageCanvas extends React.Component { // eslint-disable-next-line class-methods-use-this loadImage = (path, onready) => { + if (this.pendingImage) { + this.pendingImage.onload = null; + } + const image = new Image(); - image.src = path; - image.addEventListener('load', (e) => { + this.pendingImage = image; + image.onload = (e) => { + if (this.pendingImage === image) { + this.pendingImage = null; + } onready(e.target); - }); + }; + image.src = path; }; // eslint-disable-next-line class-methods-use-this diff --git a/ui/src/pages/Dashboard/Dashboard.jsx b/ui/src/pages/Dashboard/Dashboard.jsx index 1100aa2..b6cbbad 100644 --- a/ui/src/pages/Dashboard/Dashboard.jsx +++ b/ui/src/pages/Dashboard/Dashboard.jsx @@ -38,16 +38,20 @@ class Dashboard extends React.Component { initialised: false, }; this.initialiseLiveview = this.initialiseLiveview.bind(this); + this.handleLiveviewLoad = this.handleLiveviewLoad.bind(this); + } + + handleLiveviewLoad() { + this.setState({ + liveviewLoaded: true, + }); } componentDidMount() { const liveview = document.getElementsByClassName('videocard-video'); if (liveview && liveview.length > 0) { - liveview[0].addEventListener('load', () => { - this.setState({ - liveviewLoaded: true, - }); - }); + this.liveviewElement = liveview[0]; + this.liveviewElement.addEventListener('load', this.handleLiveviewLoad); } this.initialiseLiveview(); } @@ -57,13 +61,14 @@ class Dashboard extends React.Component { } componentWillUnmount() { - const liveview = document.getElementsByClassName('videocard-video'); - if (liveview && liveview.length > 0) { - liveview[0].remove(); + if (this.liveviewElement) { + this.liveviewElement.removeEventListener('load', this.handleLiveviewLoad); + this.liveviewElement = null; } if (this.requestStreamSubscription) { this.requestStreamSubscription.unsubscribe(); + this.requestStreamSubscription = null; } const { dispatchSend } = this.props; const message = { diff --git a/ui/src/pages/Settings/Settings.jsx b/ui/src/pages/Settings/Settings.jsx index 65f1803..fceb4f6 100644 --- a/ui/src/pages/Settings/Settings.jsx +++ b/ui/src/pages/Settings/Settings.jsx @@ -159,7 +159,10 @@ class Settings extends React.Component { componentWillUnmount() { document.removeEventListener('keydown', this.escFunction, false); - clearInterval(this.interval); + if (this.requestStreamSubscription) { + this.requestStreamSubscription.unsubscribe(); + this.requestStreamSubscription = null; + } const { dispatchSend } = this.props; const message = {