diff --git a/machinery/src/components/Kerberos.go b/machinery/src/components/Kerberos.go index a8d115c..6958a62 100644 --- a/machinery/src/components/Kerberos.go +++ b/machinery/src/components/Kerberos.go @@ -21,6 +21,7 @@ import ( "github.com/kerberos-io/agent/machinery/src/packets" routers "github.com/kerberos-io/agent/machinery/src/routers/mqtt" "github.com/kerberos-io/agent/machinery/src/utils" + "github.com/kerberos-io/agent/machinery/src/webrtc" "github.com/tevino/abool" ) @@ -303,7 +304,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu } // Handle livestream HD (high resolution over WEBRTC) - communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 10) + communication.HandleLiveHDHandshake = make(chan models.LiveHDHandshake, 100) if subStreamEnabled { livestreamHDCursor := subQueue.Latest() go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspSubClient) @@ -552,6 +553,11 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models. // The total number of recordings stored in the directory. recordingDirectory := configDirectory + "/data/recordings" numberOfRecordings := utils.NumberOfMP4sInDirectory(recordingDirectory) + activeWebRTCReaders := webrtc.GetActivePeerConnectionCount() + pendingWebRTCHandshakes := 0 + if communication.HandleLiveHDHandshake != nil { + pendingWebRTCHandshakes = len(communication.HandleLiveHDHandshake) + } // All days stored in this agent. days := []string{} @@ -574,6 +580,8 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models. "cameraOnline": cameraIsOnline, "cloudOnline": cloudIsOnline, "numberOfRecordings": numberOfRecordings, + "webrtcReaders": activeWebRTCReaders, + "webrtcPending": pendingWebRTCHandshakes, "days": days, "latestEvents": latestEvents, }) diff --git a/machinery/src/models/Communication.go b/machinery/src/models/Communication.go index 262a86f..afb7b25 100644 --- a/machinery/src/models/Communication.go +++ b/machinery/src/models/Communication.go @@ -8,6 +8,17 @@ import ( "github.com/tevino/abool" ) +type LiveHDSignalingCallbacks struct { + SendAnswer func(sessionID string, sdp string) error + SendCandidate func(sessionID string, candidate string) error + SendError func(sessionID string, message string) error +} + +type LiveHDHandshake struct { + Payload RequestHDStreamPayload + Signaling *LiveHDSignalingCallbacks +} + // The communication struct that is managing // all the communication between the different goroutines. type Communication struct { @@ -27,7 +38,7 @@ type Communication struct { HandleHeartBeat chan string HandleLiveSD chan int64 HandleLiveHDKeepalive chan string - HandleLiveHDHandshake chan RequestHDStreamPayload + HandleLiveHDHandshake chan LiveHDHandshake HandleLiveHDPeers chan string HandleONVIF chan OnvifAction IsConfiguring *abool.AtomicBool diff --git a/machinery/src/routers/mqtt/main.go b/machinery/src/routers/mqtt/main.go index 14dc782..ea07c8f 100644 --- a/machinery/src/routers/mqtt/main.go +++ b/machinery/src/routers/mqtt/main.go @@ -90,9 +90,10 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration, // Some extra options to make sure the connection behaves // properly. More information here: github.com/eclipse/paho.mqtt.golang. - opts.SetCleanSession(true) - //opts.SetResumeSubs(true) - //opts.SetStore(mqtt.NewMemoryStore()) + //opts.SetCleanSession(true) + opts.SetCleanSession(false) + opts.SetResumeSubs(true) + opts.SetStore(mqtt.NewMemoryStore()) opts.SetConnectRetry(true) opts.SetAutoReconnect(true) opts.SetConnectRetryInterval(5 * time.Second) @@ -537,9 +538,13 @@ func HandleRequestHDStream(mqttClient mqtt.Client, hubKey string, payload models if communication.CameraConnected { // Set the Hub key, so we can send back the answer. requestHDStreamPayload.HubKey = hubKey - select { - case communication.HandleLiveHDHandshake <- requestHDStreamPayload: - default: + if communication.HandleLiveHDHandshake == nil { + log.Log.Error("routers.mqtt.main.HandleRequestHDStream(): handshake channel is nil, dropping request") + return + } + + communication.HandleLiveHDHandshake <- models.LiveHDHandshake{ + Payload: requestHDStreamPayload, } log.Log.Info("routers.mqtt.main.HandleRequestHDStream(): received request to setup webrtc.") } else { diff --git a/machinery/src/routers/websocket/main.go b/machinery/src/routers/websocket/main.go index 3541b7a..0ae2e4a 100644 --- a/machinery/src/routers/websocket/main.go +++ b/machinery/src/routers/websocket/main.go @@ -6,6 +6,7 @@ import ( "image" "net/http" "sync" + "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" @@ -14,6 +15,7 @@ import ( "github.com/kerberos-io/agent/machinery/src/models" "github.com/kerberos-io/agent/machinery/src/packets" "github.com/kerberos-io/agent/machinery/src/utils" + "github.com/kerberos-io/agent/machinery/src/webrtc" ) type Message struct { @@ -28,6 +30,23 @@ type Connection struct { Cancels map[string]context.CancelFunc } +func writeWebRTCError(connection *Connection, clientID string, sessionID string, errorMessage string) { + if connection == nil { + return + } + + if err := connection.WriteJson(Message{ + ClientID: clientID, + MessageType: "webrtc-error", + Message: map[string]string{ + "session_id": sessionID, + "message": errorMessage, + }, + }); err != nil { + log.Log.Error("routers.websocket.main.writeWebRTCError(): " + err.Error()) + } +} + // Concurrency handling - sending messages func (c *Connection) WriteJson(message Message) error { c.mu.Lock() @@ -115,6 +134,82 @@ func WebsocketHandler(c *gin.Context, configuration *models.Configuration, commu go ForwardSDStream(ctx, clientID, sockets[clientID], configuration, communication, captureDevice) } } + + case "stream-hd": + sessionID := message.Message["session_id"] + sessionDescription := message.Message["sdp"] + + if sessionID == "" || sessionDescription == "" { + writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or sdp") + break + } + + if !communication.CameraConnected { + writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected") + break + } + + if communication.HandleLiveHDHandshake == nil { + writeWebRTCError(sockets[clientID], clientID, sessionID, "webrtc liveview is not available") + break + } + + handshake := models.LiveHDHandshake{ + Payload: models.RequestHDStreamPayload{ + Timestamp: time.Now().Unix(), + SessionID: sessionID, + SessionDescription: sessionDescription, + }, + Signaling: &models.LiveHDSignalingCallbacks{ + SendAnswer: func(callbackSessionID string, sdp string) error { + return sockets[clientID].WriteJson(Message{ + ClientID: clientID, + MessageType: "webrtc-answer", + Message: map[string]string{ + "session_id": callbackSessionID, + "sdp": sdp, + }, + }) + }, + SendCandidate: func(callbackSessionID string, candidate string) error { + return sockets[clientID].WriteJson(Message{ + ClientID: clientID, + MessageType: "webrtc-candidate", + Message: map[string]string{ + "session_id": callbackSessionID, + "candidate": candidate, + }, + }) + }, + SendError: func(callbackSessionID string, errorMessage string) error { + writeWebRTCError(sockets[clientID], clientID, callbackSessionID, errorMessage) + return nil + }, + }, + } + + communication.HandleLiveHDHandshake <- handshake + + case "webrtc-candidate": + sessionID := message.Message["session_id"] + candidate := message.Message["candidate"] + + if sessionID == "" || candidate == "" { + writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or candidate") + break + } + + if !communication.CameraConnected { + writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected") + break + } + + key := configuration.Config.Key + "/" + sessionID + go webrtc.RegisterCandidates(key, models.ReceiveHDCandidatesPayload{ + Timestamp: time.Now().Unix(), + SessionID: sessionID, + Candidate: candidate, + }) } err = conn.ReadJSON(&message) diff --git a/machinery/src/utils/main.go b/machinery/src/utils/main.go index 8044297..c746e0c 100644 --- a/machinery/src/utils/main.go +++ b/machinery/src/utils/main.go @@ -27,7 +27,8 @@ import ( // VERSION is the agent version. It defaults to "0.0.0" for local dev builds // and is overridden at build time via: -// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3" +// +// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3" var VERSION = "0.0.0" const letterBytes = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" @@ -198,6 +199,13 @@ func GetMediaFormatted(files []os.FileInfo, recordingDirectory string, configura timestampInt, err := strconv.ParseInt(timestamp, 10, 64) if err == nil { + if eventFilter.TimestampOffsetStart > 0 { + // TimestampOffsetStart represents the newest lower bound to include. + if timestampInt < eventFilter.TimestampOffsetStart { + continue + } + } + // If we have an offset we will check if we should skip or not if eventFilter.TimestampOffsetEnd > 0 { // Medias are sorted from new to older. TimestampOffsetEnd holds the oldest diff --git a/machinery/src/utils/main_test.go b/machinery/src/utils/main_test.go new file mode 100644 index 0000000..9243c0a --- /dev/null +++ b/machinery/src/utils/main_test.go @@ -0,0 +1,54 @@ +package utils + +import ( + "os" + "testing" + "time" + + "github.com/kerberos-io/agent/machinery/src/models" +) + +type stubFileInfo struct { + name string +} + +func (s stubFileInfo) Name() string { return s.name } +func (s stubFileInfo) Size() int64 { return 0 } +func (s stubFileInfo) Mode() os.FileMode { return 0 } +func (s stubFileInfo) ModTime() time.Time { return time.Unix(0, 0) } +func (s stubFileInfo) IsDir() bool { return false } +func (s stubFileInfo) Sys() interface{} { return nil } + +func TestGetMediaFormattedHonorsTimestampRange(t *testing.T) { + configuration := &models.Configuration{} + configuration.Config.Timezone = "UTC" + configuration.Config.Name = "Front Door" + configuration.Config.Key = "camera-1" + + files := []os.FileInfo{ + stubFileInfo{name: "1700000200_6_7_8_9_10.mp4"}, + stubFileInfo{name: "1700000100_6_7_8_9_10.mp4"}, + stubFileInfo{name: "1700000000_6_7_8_9_10.mp4"}, + } + + media := GetMediaFormatted(files, "/tmp/recordings", configuration, models.EventFilter{ + TimestampOffsetStart: 1700000050, + TimestampOffsetEnd: 1700000200, + NumberOfElements: 10, + }) + + if len(media) != 1 { + t.Fatalf("expected 1 media item in time range, got %d", len(media)) + } + + if media[0].Timestamp != "1700000100" { + t.Fatalf("expected timestamp 1700000100, got %s", media[0].Timestamp) + } + + if media[0].CameraName != "Front Door" { + t.Fatalf("expected camera name to be preserved, got %s", media[0].CameraName) + } + if media[0].CameraKey != "camera-1" { + t.Fatalf("expected camera key to be preserved, got %s", media[0].CameraKey) + } +} diff --git a/machinery/src/webrtc/main.go b/machinery/src/webrtc/main.go index a9201df..ceceb91 100644 --- a/machinery/src/webrtc/main.go +++ b/machinery/src/webrtc/main.go @@ -140,6 +140,11 @@ func (cm *ConnectionManager) GetPeerConnectionCount() int64 { return atomic.LoadInt64(&cm.peerConnectionCount) } +// GetActivePeerConnectionCount returns the current number of connected WebRTC readers. +func GetActivePeerConnectionCount() int64 { + return globalConnectionManager.GetPeerConnectionCount() +} + // IncrementPeerCount atomically increments the peer connection count func (cm *ConnectionManager) IncrementPeerCount() int64 { return atomic.AddInt64(&cm.peerConnectionCount, 1) @@ -252,7 +257,79 @@ func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, intercepto return nil } -func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.RequestHDStreamPayload) { +func publishSignalingMessageAsync(mqttClient mqtt.Client, topic string, payload []byte, description string) { + if mqttClient == nil { + log.Log.Error("webrtc.main.publishSignalingMessageAsync(): mqtt client is nil for " + description) + return + } + + token := mqttClient.Publish(topic, 2, false, payload) + go func() { + if !token.WaitTimeout(5 * time.Second) { + log.Log.Warning("webrtc.main.publishSignalingMessageAsync(): timed out publishing " + description) + return + } + if err := token.Error(); err != nil { + log.Log.Error("webrtc.main.publishSignalingMessageAsync(): failed publishing " + description + ": " + err.Error()) + } + }() +} + +func sendCandidateSignal(configuration *models.Configuration, mqttClient mqtt.Client, hubKey string, handshake models.LiveHDHandshake, candidateJSON []byte) { + if handshake.Signaling != nil && handshake.Signaling.SendCandidate != nil { + if err := handshake.Signaling.SendCandidate(handshake.Payload.SessionID, string(candidateJSON)); err != nil { + log.Log.Error("webrtc.main.sendCandidateSignal(): " + err.Error()) + } + return + } + + message := models.Message{ + Payload: models.Payload{ + Action: "receive-hd-candidates", + DeviceId: configuration.Config.Key, + Value: map[string]interface{}{ + "candidate": string(candidateJSON), + "session_id": handshake.Payload.SessionID, + }, + }, + } + payload, err := models.PackageMQTTMessage(configuration, message) + if err == nil { + publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "ICE candidate for session "+handshake.Payload.SessionID) + } else { + log.Log.Info("webrtc.main.sendCandidateSignal(): while packaging mqtt message: " + err.Error()) + } +} + +func sendAnswerSignal(configuration *models.Configuration, mqttClient mqtt.Client, hubKey string, handshake models.LiveHDHandshake, answer pionWebRTC.SessionDescription) { + encodedAnswer := base64.StdEncoding.EncodeToString([]byte(answer.SDP)) + + if handshake.Signaling != nil && handshake.Signaling.SendAnswer != nil { + if err := handshake.Signaling.SendAnswer(handshake.Payload.SessionID, encodedAnswer); err != nil { + log.Log.Error("webrtc.main.sendAnswerSignal(): " + err.Error()) + } + return + } + + message := models.Message{ + Payload: models.Payload{ + Action: "receive-hd-answer", + DeviceId: configuration.Config.Key, + Value: map[string]interface{}{ + "sdp": []byte(encodedAnswer), + "session_id": handshake.Payload.SessionID, + }, + }, + } + payload, err := models.PackageMQTTMessage(configuration, message) + if err == nil { + publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "SDP answer for session "+handshake.Payload.SessionID) + } else { + log.Log.Info("webrtc.main.sendAnswerSignal(): while packaging mqtt message: " + err.Error()) + } +} + +func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.LiveHDHandshake) { config := configuration.Config deviceKey := config.Key @@ -260,14 +337,15 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati turnServers := []string{config.TURNURI} turnServersUsername := config.TURNUsername turnServersCredential := config.TURNPassword + handshakePayload := handshake.Payload // We create a channel which will hold the candidates for this session. - sessionKey := config.Key + "/" + handshake.SessionID + sessionKey := config.Key + "/" + handshakePayload.SessionID candidateChannel := globalConnectionManager.GetOrCreateCandidateChannel(sessionKey) // Set variables - hubKey := handshake.HubKey - sessionDescription := handshake.SessionDescription + hubKey := handshakePayload.HubKey + sessionDescription := handshakePayload.SessionDescription // Create WebRTC object w := CreateWebRTC(deviceKey, stunServers, turnServers, turnServersUsername, turnServersCredential) @@ -424,12 +502,12 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati // Log ICE connection state changes for diagnostics peerConnection.OnICEConnectionStateChange(func(iceState pionWebRTC.ICEConnectionState) { log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection state changed to: " + iceState.String() + - " (session: " + handshake.SessionID + ")") + " (session: " + handshakePayload.SessionID + ")") }) peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) { log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String() + - " (session: " + handshake.SessionID + ")") + " (session: " + handshakePayload.SessionID + ")") switch connectionState { case pionWebRTC.PeerConnectionStateDisconnected: @@ -438,9 +516,9 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati wrapper.disconnectMu.Lock() if wrapper.disconnectTimer == nil { log.Log.Info("webrtc.main.InitializeWebRTCConnection(): peer disconnected, waiting " + - disconnectGracePeriod.String() + " for recovery (session: " + handshake.SessionID + ")") + disconnectGracePeriod.String() + " for recovery (session: " + handshakePayload.SessionID + ")") wrapper.disconnectTimer = time.AfterFunc(disconnectGracePeriod, func() { - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshake.SessionID + ")") + log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshakePayload.SessionID + ")") cleanupPeerConnection(sessionKey, wrapper) }) } @@ -472,7 +550,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati if wrapper.disconnectTimer != nil { wrapper.disconnectTimer.Stop() wrapper.disconnectTimer = nil - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshake.SessionID + ")") + log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshakePayload.SessionID + ")") } wrapper.disconnectMu.Unlock() @@ -483,33 +561,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati } }) - go func() { - defer func() { - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshake.SessionID) - }() - - // Iterate over the candidates and send them to the remote client - for { - select { - case <-ctx.Done(): - return - case candidate, ok := <-candidateChannel: - if !ok { - return - } - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate) - candidateInit, decodeErr := decodeICECandidate(candidate) - if decodeErr != nil { - log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error()) - continue - } - if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil { - log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error()) - } - } - } - }() - // When an ICE candidate is available send to the other peer using the signaling server (MQTT). // The other peer will add this candidate by calling AddICECandidate. // This handler must be registered before setting the local description, otherwise early candidates can be missed. @@ -557,27 +608,13 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati candateBinary, err := json.Marshal(candidateJSON) if err == nil { valueMap["candidate"] = string(candateBinary) - valueMap["session_id"] = handshake.SessionID + valueMap["session_id"] = handshakePayload.SessionID log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub") } else { log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to marshal candidate: " + err.Error()) } - // We'll send the candidate to the hub - message := models.Message{ - Payload: models.Payload{ - Action: "receive-hd-candidates", - DeviceId: configuration.Config.Key, - Value: valueMap, - }, - } - payload, err := models.PackageMQTTMessage(configuration, message) - if err == nil { - token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload) - token.Wait() - } else { - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error()) - } + sendCandidateSignal(configuration, mqttClient, hubKey, handshake, candateBinary) }) offer := w.CreateOffer(sd) @@ -587,6 +624,35 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati return } + go func() { + defer func() { + log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshakePayload.SessionID) + }() + + // Process remote candidates only after the remote description is set. + // MQTT can deliver candidates before the SDP offer handling completes, + // and Pion rejects AddICECandidate calls until SetRemoteDescription succeeds. + for { + select { + case <-ctx.Done(): + return + case candidate, ok := <-candidateChannel: + if !ok { + return + } + log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate) + candidateInit, decodeErr := decodeICECandidate(candidate) + if decodeErr != nil { + log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error()) + continue + } + if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil { + log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error()) + } + } + } + }() + answer, err := peerConnection.CreateAnswer(nil) if err != nil { log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error()) @@ -601,27 +667,9 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati // Store peer connection in manager globalConnectionManager.AddPeerConnection(sessionKey, wrapper) - // Create a config map - valueMap := make(map[string]interface{}) - valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP))) - valueMap["session_id"] = handshake.SessionID log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer") - // We'll send the candidate to the hub - message := models.Message{ - Payload: models.Payload{ - Action: "receive-hd-answer", - DeviceId: configuration.Config.Key, - Value: valueMap, - }, - } - payload, err := models.PackageMQTTMessage(configuration, message) - if err == nil { - token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload) - token.Wait() - } else { - log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error()) - } + sendAnswerSignal(configuration, mqttClient, hubKey, handshake, answer) } } else { globalConnectionManager.CloseCandidateChannel(sessionKey) diff --git a/ui/package.json b/ui/package.json index 3b3c09c..acc465b 100644 --- a/ui/package.json +++ b/ui/package.json @@ -45,9 +45,9 @@ "crypto": false }, "scripts": { - "start": "react-scripts start", - "build": "GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www", - "test": "react-scripts test", + "start": "DISABLE_ESLINT_PLUGIN=true react-scripts start", + "build": "DISABLE_ESLINT_PLUGIN=true GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www", + "test": "DISABLE_ESLINT_PLUGIN=true react-scripts test", "eject": "react-scripts eject", "lint": "eslint --debug 'src/**/*.{js,jsx,ts,tsx}'", "format": "prettier --write \"**/*.{js,jsx,json,md}\"" diff --git a/ui/public/locales/en/translation.json b/ui/public/locales/en/translation.json index 8d03564..f05f08e 100644 --- a/ui/public/locales/en/translation.json +++ b/ui/public/locales/en/translation.json @@ -237,4 +237,4 @@ "remove_after_upload_enabled": "Enable delete on upload" } } -} \ No newline at end of file +} diff --git a/ui/src/actions/agent.js b/ui/src/actions/agent.js index 71fc1c1..0ef7786 100644 --- a/ui/src/actions/agent.js +++ b/ui/src/actions/agent.js @@ -194,7 +194,7 @@ export const getDashboardInformation = (onSuccess, onError) => { }; }; -export const getEvents = (eventfilter, onSuccess, onError) => { +export const getEvents = (eventfilter, onSuccess, onError, append = false) => { return (dispatch) => { doGetEvents( eventfilter, @@ -203,6 +203,7 @@ export const getEvents = (eventfilter, onSuccess, onError) => { type: 'GET_EVENTS', events: data.events, filter: eventfilter, + append, }); if (onSuccess) { onSuccess(); diff --git a/ui/src/pages/Dashboard/Dashboard.jsx b/ui/src/pages/Dashboard/Dashboard.jsx index 385b875..50877ba 100644 --- a/ui/src/pages/Dashboard/Dashboard.jsx +++ b/ui/src/pages/Dashboard/Dashboard.jsx @@ -26,6 +26,23 @@ import { import './Dashboard.scss'; import ReactTooltip from 'react-tooltip'; import config from '../../config'; +import { getConfig } from '../../actions/agent'; + +function createUUID() { + if ( + typeof window !== 'undefined' && + window.crypto && + typeof window.crypto.randomUUID === 'function' + ) { + return window.crypto.randomUUID(); + } + + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (char) => { + const random = Math.floor(Math.random() * 16); + const value = char === 'x' ? random : 8 + Math.floor(random / 4); + return value.toString(16); + }); +} // eslint-disable-next-line react/prefer-stateless-function class Dashboard extends React.Component { @@ -33,48 +50,55 @@ class Dashboard extends React.Component { super(); this.state = { liveviewLoaded: false, + liveviewMode: 'webrtc', open: false, currentRecording: '', initialised: false, }; + this.videoRef = React.createRef(); + this.pendingRemoteCandidates = []; this.initialiseLiveview = this.initialiseLiveview.bind(this); - this.handleLiveviewLoad = this.handleLiveviewLoad.bind(this); + this.initialiseSDLiveview = this.initialiseSDLiveview.bind(this); + this.startWebRTCLiveview = this.startWebRTCLiveview.bind(this); + this.handleWebRTCSignalMessage = this.handleWebRTCSignalMessage.bind(this); + this.fallbackToSDLiveview = this.fallbackToSDLiveview.bind(this); } componentDidMount() { - const liveview = document.getElementsByClassName('videocard-video'); - if (liveview && liveview.length > 0) { - [this.liveviewElement] = liveview; - this.liveviewElement.addEventListener('load', this.handleLiveviewLoad); - } + const { dispatchGetConfig } = this.props; + dispatchGetConfig(() => this.initialiseLiveview()); this.initialiseLiveview(); } - componentDidUpdate() { - this.initialiseLiveview(); + componentDidUpdate(prevProps) { + const { images, dashboard } = this.props; + const { liveviewLoaded, liveviewMode } = this.state; + const configLoaded = this.hasAgentConfig(this.props); + const prevConfigLoaded = this.hasAgentConfig(prevProps); + + if (!prevConfigLoaded && configLoaded) { + this.initialiseLiveview(); + } + + if ( + liveviewMode === 'sd' && + !liveviewLoaded && + prevProps.images !== images && + images.length > 0 + ) { + this.setState({ + liveviewLoaded: true, + }); + } + + if (!prevProps.dashboard.cameraOnline && dashboard.cameraOnline) { + this.initialiseLiveview(); + } } componentWillUnmount() { - if (this.liveviewElement) { - this.liveviewElement.removeEventListener('load', this.handleLiveviewLoad); - this.liveviewElement = null; - } - - if (this.requestStreamSubscription) { - this.requestStreamSubscription.unsubscribe(); - this.requestStreamSubscription = null; - } - const { dispatchSend } = this.props; - const message = { - message_type: 'stop-sd', - }; - dispatchSend(message); - } - - handleLiveviewLoad() { - this.setState({ - liveviewLoaded: true, - }); + this.stopSDLiveview(); + this.stopWebRTCLiveview(); } handleClose() { @@ -84,32 +108,378 @@ class Dashboard extends React.Component { }); } - getCurrentTimestamp() { - return Math.round(Date.now() / 1000); + // eslint-disable-next-line react/sort-comp + hasAgentConfig(props) { + const currentProps = props || this.props; + const { config: configResponse } = currentProps; + return !!(configResponse && configResponse.config); + } + + browserSupportsWebRTC() { + return ( + typeof window !== 'undefined' && + typeof window.RTCPeerConnection !== 'undefined' + ); + } + + buildPeerConnectionConfig() { + const { config: configResponse } = this.props; + const agentConfig = + configResponse && configResponse.config ? configResponse.config : {}; + const iceServers = []; + + if (agentConfig.stunuri) { + iceServers.push({ + urls: [agentConfig.stunuri], + }); + } + + if (agentConfig.turnuri) { + const turnServer = { + urls: [agentConfig.turnuri], + }; + + if (agentConfig.turn_username) { + turnServer.username = agentConfig.turn_username; + } + + if (agentConfig.turn_password) { + turnServer.credential = agentConfig.turn_password; + } + + iceServers.push(turnServer); + } + + return { + iceServers, + iceTransportPolicy: agentConfig.turn_force === 'true' ? 'relay' : 'all', + }; } initialiseLiveview() { const { initialised } = this.state; - if (!initialised) { - const message = { - message_type: 'stream-sd', - }; - const { connected, dispatchSend } = this.props; - if (connected) { + const { dashboard } = this.props; + + if (initialised || !dashboard.cameraOnline) { + return; + } + + if (!this.hasAgentConfig()) { + return; + } + + if (this.browserSupportsWebRTC()) { + this.startWebRTCLiveview(); + } else { + this.fallbackToSDLiveview('WebRTC is not supported in this browser.'); + } + } + + initialiseSDLiveview() { + if (this.requestStreamSubscription) { + return; + } + + const message = { + message_type: 'stream-sd', + }; + const { connected, dispatchSend } = this.props; + if (connected) { + dispatchSend(message); + } + + const requestStreamInterval = interval(2000); + this.requestStreamSubscription = requestStreamInterval.subscribe(() => { + const { connected: isConnected } = this.props; + if (isConnected) { dispatchSend(message); } + }); + } - const requestStreamInterval = interval(2000); - this.requestStreamSubscription = requestStreamInterval.subscribe(() => { - const { connected: isConnected } = this.props; - if (isConnected) { - dispatchSend(message); - } - }); - this.setState({ - initialised: true, - }); + stopSDLiveview() { + if (this.requestStreamSubscription) { + this.requestStreamSubscription.unsubscribe(); + this.requestStreamSubscription = null; } + + const { dispatchSend } = this.props; + dispatchSend({ + message_type: 'stop-sd', + }); + } + + stopWebRTCLiveview() { + if (this.webrtcTimeout) { + window.clearTimeout(this.webrtcTimeout); + this.webrtcTimeout = null; + } + + if (this.webrtcSocket) { + this.webrtcSocket.onopen = null; + this.webrtcSocket.onmessage = null; + this.webrtcSocket.onerror = null; + this.webrtcSocket.onclose = null; + this.webrtcSocket.close(); + this.webrtcSocket = null; + } + + if (this.webrtcPeerConnection) { + this.webrtcPeerConnection.ontrack = null; + this.webrtcPeerConnection.onicecandidate = null; + this.webrtcPeerConnection.onconnectionstatechange = null; + this.webrtcPeerConnection.close(); + this.webrtcPeerConnection = null; + } + + this.pendingRemoteCandidates = []; + this.webrtcOfferStarted = false; + this.webrtcSessionId = null; + this.webrtcClientId = null; + + if (this.videoRef.current) { + this.videoRef.current.srcObject = null; + } + } + + sendWebRTCMessage(messageType, message = {}) { + if (!this.webrtcSocket || this.webrtcSocket.readyState !== WebSocket.OPEN) { + return; + } + + this.webrtcSocket.send( + JSON.stringify({ + client_id: this.webrtcClientId, + message_type: messageType, + message, + }) + ); + } + + async handleWebRTCSignalMessage(event) { + let data; + try { + data = JSON.parse(event.data); + } catch (error) { + return; + } + + const { message_type: messageType, message = {} } = data; + const { session_id: sessionID, sdp, candidate } = message; + + if (messageType === 'hello-back') { + await this.beginWebRTCLiveview(); + return; + } + + if (sessionID && sessionID !== this.webrtcSessionId) { + return; + } + + switch (messageType) { + case 'webrtc-answer': + try { + await this.webrtcPeerConnection.setRemoteDescription({ + type: 'answer', + sdp: window.atob(sdp), + }); + await this.flushPendingRemoteCandidates(); + } catch (error) { + this.fallbackToSDLiveview( + `Unable to apply WebRTC answer: ${error.message}` + ); + } + break; + + case 'webrtc-candidate': { + try { + const candidateInit = JSON.parse(candidate); + if ( + this.webrtcPeerConnection.remoteDescription && + this.webrtcPeerConnection.remoteDescription.type + ) { + await this.webrtcPeerConnection.addIceCandidate(candidateInit); + } else { + this.pendingRemoteCandidates.push(candidateInit); + } + } catch (error) { + this.fallbackToSDLiveview( + `Unable to apply WebRTC candidate: ${error.message}` + ); + } + break; + } + + case 'webrtc-error': + this.fallbackToSDLiveview( + message.message || 'The agent could not start the WebRTC liveview.' + ); + break; + + default: + break; + } + } + + async beginWebRTCLiveview() { + if (!this.webrtcPeerConnection || this.webrtcOfferStarted) { + return; + } + + try { + this.webrtcOfferStarted = true; + const offer = await this.webrtcPeerConnection.createOffer({ + offerToReceiveAudio: true, + offerToReceiveVideo: true, + }); + await this.webrtcPeerConnection.setLocalDescription(offer); + this.sendWebRTCMessage('stream-hd', { + session_id: this.webrtcSessionId, + sdp: window.btoa(this.webrtcPeerConnection.localDescription.sdp), + }); + } catch (error) { + this.fallbackToSDLiveview( + `Unable to initialise WebRTC liveview: ${error.message}`, + ); + } + } + + async flushPendingRemoteCandidates() { + if ( + !this.webrtcPeerConnection || + !this.webrtcPeerConnection.remoteDescription + ) { + return; + } + + while (this.pendingRemoteCandidates.length > 0) { + const candidateInit = this.pendingRemoteCandidates.shift(); + try { + // eslint-disable-next-line no-await-in-loop + await this.webrtcPeerConnection.addIceCandidate(candidateInit); + } catch (error) { + this.fallbackToSDLiveview( + `Unable to add remote ICE candidate: ${error.message}`, + ); + return; + } + } + } + + startWebRTCLiveview() { + if (this.webrtcPeerConnection || this.webrtcSocket) { + return; + } + + this.stopSDLiveview(); + + this.webrtcClientId = createUUID(); + this.webrtcSessionId = createUUID(); + this.pendingRemoteCandidates = []; + + this.webrtcPeerConnection = new window.RTCPeerConnection( + this.buildPeerConnectionConfig() + ); + + this.webrtcPeerConnection.ontrack = (event) => { + const [remoteStream] = event.streams; + if (this.videoRef.current && remoteStream) { + this.videoRef.current.srcObject = remoteStream; + const playPromise = this.videoRef.current.play(); + if (playPromise && playPromise.catch) { + playPromise.catch(() => {}); + } + } + + this.setState({ + liveviewLoaded: true, + }); + }; + + this.webrtcPeerConnection.onicecandidate = (event) => { + if (!event.candidate) { + return; + } + + this.sendWebRTCMessage('webrtc-candidate', { + session_id: this.webrtcSessionId, + candidate: JSON.stringify(event.candidate.toJSON()), + }); + }; + + this.webrtcPeerConnection.onconnectionstatechange = () => { + const { connectionState } = this.webrtcPeerConnection; + if (connectionState === 'connected') { + this.setState({ + liveviewLoaded: true, + }); + } + + if ( + connectionState === 'failed' || + connectionState === 'disconnected' || + connectionState === 'closed' + ) { + this.fallbackToSDLiveview( + `WebRTC connection ${connectionState}, falling back to SD liveview.`, + ); + } + }; + + this.webrtcSocket = new WebSocket(config.WS_URL); + this.webrtcSocket.onopen = () => { + this.sendWebRTCMessage('hello', {}); + }; + this.webrtcSocket.onmessage = this.handleWebRTCSignalMessage; + this.webrtcSocket.onerror = () => { + this.fallbackToSDLiveview('Unable to open the WebRTC signaling channel.'); + }; + this.webrtcSocket.onclose = () => { + const { liveviewLoaded } = this.state; + if (!liveviewLoaded) { + this.fallbackToSDLiveview('WebRTC signaling channel closed early.'); + } + }; + + this.webrtcTimeout = window.setTimeout(() => { + const { liveviewLoaded } = this.state; + if (!liveviewLoaded) { + this.fallbackToSDLiveview( + 'WebRTC connection timed out, falling back to SD liveview.' + ); + } + }, 10000); + + this.setState({ + initialised: true, + liveviewLoaded: false, + liveviewMode: 'webrtc', + }); + } + + fallbackToSDLiveview(errorMessage) { + const { liveviewMode } = this.state; + + if (liveviewMode === 'sd' && this.requestStreamSubscription) { + return; + } + + this.stopWebRTCLiveview(); + if (errorMessage) { + // eslint-disable-next-line no-console + console.warn(errorMessage); + } + + this.setState( + { + initialised: true, + liveviewLoaded: false, + liveviewMode: 'sd', + }, + () => { + this.initialiseSDLiveview(); + } + ); } openModal(file) { @@ -121,7 +491,8 @@ class Dashboard extends React.Component { render() { const { dashboard, t, images } = this.props; - const { liveviewLoaded, open, currentRecording } = this.state; + const { liveviewLoaded, liveviewMode, open, currentRecording } = this.state; + const listenerCount = dashboard.webrtcReaders ? dashboard.webrtcReaders : 0; // We check if the camera was getting a valid frame // during the last 5 seconds, otherwise we assume the camera is offline. @@ -175,7 +546,6 @@ class Dashboard extends React.Component { divider="0" footer={t('dashboard.total_recordings')} /> - - {t('dashboard.live_view')} + + {t('dashboard.live_view')} ({listenerCount}) + {(!liveviewLoaded || !isCameraOnline) && ( - + {liveviewMode === 'webrtc' ? ( + + ) : ( + + )} @@ -348,20 +724,25 @@ class Dashboard extends React.Component { const mapStateToProps = (state /* , ownProps */) => ({ dashboard: state.agent.dashboard, + config: state.agent.config, connected: state.wss.connected, images: state.wss.images, }); const mapDispatchToProps = (dispatch) => ({ dispatchSend: (message) => dispatch(send(message)), + dispatchGetConfig: (onSuccess, onError) => + dispatch(getConfig(onSuccess, onError)), }); Dashboard.propTypes = { dashboard: PropTypes.object.isRequired, + config: PropTypes.object.isRequired, connected: PropTypes.bool.isRequired, images: PropTypes.array.isRequired, t: PropTypes.func.isRequired, dispatchSend: PropTypes.func.isRequired, + dispatchGetConfig: PropTypes.func.isRequired, }; export default withTranslation()( diff --git a/ui/src/pages/Media/Media.jsx b/ui/src/pages/Media/Media.jsx index e32ae64..7d90c92 100644 --- a/ui/src/pages/Media/Media.jsx +++ b/ui/src/pages/Media/Media.jsx @@ -3,6 +3,7 @@ import PropTypes from 'prop-types'; import { withTranslation } from 'react-i18next'; import { Breadcrumb, + ControlBar, VideoCard, Button, Modal, @@ -16,14 +17,53 @@ import { getEvents } from '../../actions/agent'; import config from '../../config'; import './Media.scss'; +function formatDateTimeLocal(date) { + const year = date.getFullYear(); + const month = String(date.getMonth() + 1).padStart(2, '0'); + const day = String(date.getDate()).padStart(2, '0'); + const hours = String(date.getHours()).padStart(2, '0'); + const minutes = String(date.getMinutes()).padStart(2, '0'); + + return `${year}-${month}-${day}T${hours}:${minutes}`; +} + +function getDefaultTimeWindow() { + const endDate = new Date(); + const startDate = new Date(endDate.getTime() - 60 * 60 * 1000); + + return { + startDateTime: formatDateTimeLocal(startDate), + endDateTime: formatDateTimeLocal(endDate), + timestamp_offset_start: Math.floor(startDate.getTime() / 1000), + timestamp_offset_end: Math.floor(endDate.getTime() / 1000) + 59, + }; +} + +function normalizeInputValue(valueOrEvent) { + if (valueOrEvent && valueOrEvent.target) { + return valueOrEvent.target.value; + } + + return valueOrEvent; +} + // eslint-disable-next-line react/prefer-stateless-function class Media extends React.Component { constructor() { super(); - this.state = { - timestamp_offset_start: 0, - timestamp_offset_end: 0, + + const defaultTimeWindow = getDefaultTimeWindow(); + + const initialFilter = { + timestamp_offset_start: defaultTimeWindow.timestamp_offset_start, + timestamp_offset_end: defaultTimeWindow.timestamp_offset_end, number_of_elements: 12, + }; + + this.state = { + appliedFilter: initialFilter, + startDateTime: defaultTimeWindow.startDateTime, + endDateTime: defaultTimeWindow.endDateTime, isScrolling: false, open: false, currentRecording: '', @@ -32,7 +72,8 @@ class Media extends React.Component { componentDidMount() { const { dispatchGetEvents } = this.props; - dispatchGetEvents(this.state); + const { appliedFilter } = this.state; + dispatchGetEvents(appliedFilter); document.addEventListener('scroll', this.trackScrolling); } @@ -49,29 +90,107 @@ class Media extends React.Component { trackScrolling = () => { const { events, dispatchGetEvents } = this.props; - const { isScrolling } = this.state; + const { isScrolling, appliedFilter } = this.state; const wrappedElement = document.getElementById('loader'); - if (!isScrolling && this.isBottom(wrappedElement)) { - this.setState({ - isScrolling: true, - }); - // Get last element - const lastElement = events[events.length - 1]; - if (lastElement) { - this.setState({ + if (!wrappedElement || isScrolling || !this.isBottom(wrappedElement)) { + return; + } + + this.setState({ + isScrolling: true, + }); + + // Get last element + const lastElement = events[events.length - 1]; + if (lastElement) { + dispatchGetEvents( + { + ...appliedFilter, timestamp_offset_end: parseInt(lastElement.timestamp, 10), - }); - dispatchGetEvents(this.state, () => { + }, + () => { setTimeout(() => { this.setState({ isScrolling: false, }); }, 1000); - }); - } + }, + () => { + this.setState({ + isScrolling: false, + }); + }, + true + ); + } else { + this.setState({ + isScrolling: false, + }); } }; + buildEventFilter(startDateTime, endDateTime) { + const { appliedFilter } = this.state; + + return { + timestamp_offset_start: this.getTimestampFromInput( + startDateTime, + 'start' + ), + timestamp_offset_end: this.getTimestampFromInput(endDateTime, 'end'), + number_of_elements: appliedFilter.number_of_elements, + }; + } + + handleDateFilterChange(field, value) { + const { dispatchGetEvents } = this.props; + const { startDateTime, endDateTime } = this.state; + const normalizedValue = normalizeInputValue(value); + const nextStartDateTime = + field === 'startDateTime' ? normalizedValue : startDateTime; + const nextEndDateTime = + field === 'endDateTime' ? normalizedValue : endDateTime; + const nextFilter = this.buildEventFilter( + nextStartDateTime, + nextEndDateTime + ); + const shouldApplyFilter = + (nextStartDateTime === '' || nextStartDateTime.length === 16) && + (nextEndDateTime === '' || nextEndDateTime.length === 16); + + this.setState( + { + [field]: normalizedValue, + appliedFilter: shouldApplyFilter + ? nextFilter + : this.state.appliedFilter, + isScrolling: false, + }, + () => { + if (shouldApplyFilter) { + dispatchGetEvents(nextFilter); + } + } + ); + } + + getTimestampFromInput(value, boundary) { + if (!value) { + return 0; + } + + const date = new Date(value); + if (Number.isNaN(date.getTime())) { + return 0; + } + + const seconds = Math.floor(date.getTime() / 1000); + if (boundary === 'end') { + return seconds + 59; + } + return seconds; + } + isBottom(el) { return el.getBoundingClientRect().bottom + 50 <= window.innerHeight; } @@ -85,7 +204,9 @@ class Media extends React.Component { render() { const { events, eventsLoaded, t } = this.props; - const { isScrolling, open, currentRecording } = this.state; + const { isScrolling, open, currentRecording, startDateTime, endDateTime } = + this.state; + return ( + + + + + Start time + + this.handleDateFilterChange('startDateTime', value) + } + /> + + + End time + + this.handleDateFilterChange('endDateTime', value) + } + /> + + + + + {events.map((event) => ( ))} + {events.length === 0 && eventsLoaded === 0 && ( + + No recordings found in the selected time range. + + )} {open && ( ({ }); const mapDispatchToProps = (dispatch) => ({ - dispatchGetEvents: (eventFilter, success, error) => - dispatch(getEvents(eventFilter, success, error)), + dispatchGetEvents: (eventFilter, success, error, append) => + dispatch(getEvents(eventFilter, success, error, append)), }); Media.propTypes = { t: PropTypes.func.isRequired, - events: PropTypes.objectOf(PropTypes.object).isRequired, + events: PropTypes.arrayOf(PropTypes.object).isRequired, eventsLoaded: PropTypes.number.isRequired, dispatchGetEvents: PropTypes.func.isRequired, }; diff --git a/ui/src/pages/Media/Media.scss b/ui/src/pages/Media/Media.scss index 7776f09..17883b8 100644 --- a/ui/src/pages/Media/Media.scss +++ b/ui/src/pages/Media/Media.scss @@ -1,4 +1,103 @@ #media { + .media-control-bar { + .control-bar { + display: block; + padding: 0 var(--main-content-gutter); + } + + .control-bar .filtering { + display: block; + } + + .control-bar .filtering > * { + border-right: 0; + flex: 1 1 100% !important; + max-width: none; + width: 100%; + } + } + + .media-filters { + align-items: stretch; + display: grid; + gap: 0; + grid-template-columns: repeat(2, minmax(0, 1fr)); + min-width: 0; + width: 100%; + } + + .media-filters__field { + border-right: 1px solid var(--bg-muted); + min-width: 0; + padding: 16px 24px; + + label { + display: block; + font-size: 14px; + font-weight: 600; + margin-bottom: 8px; + white-space: nowrap; + } + } + + .media-filters__field:first-child { + padding-left: 0; + } + + .media-filters__input { + appearance: none; + background: var(--white); + border: 1px solid var(--grey-light); + border-radius: 8px; + box-sizing: border-box; + color: var(--black); + font-size: 16px; + min-height: 48px; + padding: 0 14px 0 0; + width: 100%; + } + + .media-filters__input::-webkit-datetime-edit, + .media-filters__input::-webkit-datetime-edit-fields-wrapper { + padding: 0; + } + + .media-filters__input:focus { + border-color: var(--oss); + outline: 0; + } + + .media-filters__field:first-child .media-filters__input { + padding-left: 0; + } + + .media-filters__field:last-child { + border-right: 0; + padding-right: 0; + } + + @media (max-width: 700px) { + .media-filters { + grid-template-columns: 1fr; + } + + .media-filters__field { + border-right: 0; + border-bottom: 1px solid var(--bg-muted); + padding-left: 0; + padding-right: 0; + } + + .media-filters__field:last-child { + border-bottom: 0; + } + } + + .media-empty-state { + margin: 24px 0; + opacity: 0.8; + text-align: center; + } #loader { display: flex; diff --git a/ui/src/reducers/agent.js b/ui/src/reducers/agent.js index 478f8b7..d41d5d5 100644 --- a/ui/src/reducers/agent.js +++ b/ui/src/reducers/agent.js @@ -123,16 +123,12 @@ const agent = ( }; case 'GET_EVENTS': - const { timestamp_offset_end } = action.filter; const { events } = action; return { ...state, eventsLoaded: events.length, - events: - timestamp_offset_end === 0 - ? [...events] - : [...state.events, ...events], - eventfilter: action.eventfilter, + events: action.append ? [...state.events, ...events] : [...events], + eventfilter: action.filter, }; default: