Merge pull request #262 from kerberos-io/feature/concurrency-webrtc

feature/concurrency-webrtc
This commit is contained in:
Cédric Verstraeten
2026-03-09 21:37:04 +01:00
committed by GitHub
14 changed files with 1030 additions and 167 deletions
+9 -1
View File
@@ -21,6 +21,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/packets"
routers "github.com/kerberos-io/agent/machinery/src/routers/mqtt"
"github.com/kerberos-io/agent/machinery/src/utils"
"github.com/kerberos-io/agent/machinery/src/webrtc"
"github.com/tevino/abool"
)
@@ -303,7 +304,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
}
// Handle livestream HD (high resolution over WEBRTC)
communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 10)
communication.HandleLiveHDHandshake = make(chan models.LiveHDHandshake, 100)
if subStreamEnabled {
livestreamHDCursor := subQueue.Latest()
go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspSubClient)
@@ -552,6 +553,11 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models.
// The total number of recordings stored in the directory.
recordingDirectory := configDirectory + "/data/recordings"
numberOfRecordings := utils.NumberOfMP4sInDirectory(recordingDirectory)
activeWebRTCReaders := webrtc.GetActivePeerConnectionCount()
pendingWebRTCHandshakes := 0
if communication.HandleLiveHDHandshake != nil {
pendingWebRTCHandshakes = len(communication.HandleLiveHDHandshake)
}
// All days stored in this agent.
days := []string{}
@@ -574,6 +580,8 @@ func GetDashboard(c *gin.Context, configDirectory string, configuration *models.
"cameraOnline": cameraIsOnline,
"cloudOnline": cloudIsOnline,
"numberOfRecordings": numberOfRecordings,
"webrtcReaders": activeWebRTCReaders,
"webrtcPending": pendingWebRTCHandshakes,
"days": days,
"latestEvents": latestEvents,
})
+12 -1
View File
@@ -8,6 +8,17 @@ import (
"github.com/tevino/abool"
)
type LiveHDSignalingCallbacks struct {
SendAnswer func(sessionID string, sdp string) error
SendCandidate func(sessionID string, candidate string) error
SendError func(sessionID string, message string) error
}
type LiveHDHandshake struct {
Payload RequestHDStreamPayload
Signaling *LiveHDSignalingCallbacks
}
// The communication struct that is managing
// all the communication between the different goroutines.
type Communication struct {
@@ -27,7 +38,7 @@ type Communication struct {
HandleHeartBeat chan string
HandleLiveSD chan int64
HandleLiveHDKeepalive chan string
HandleLiveHDHandshake chan RequestHDStreamPayload
HandleLiveHDHandshake chan LiveHDHandshake
HandleLiveHDPeers chan string
HandleONVIF chan OnvifAction
IsConfiguring *abool.AtomicBool
+11 -6
View File
@@ -90,9 +90,10 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration,
// Some extra options to make sure the connection behaves
// properly. More information here: github.com/eclipse/paho.mqtt.golang.
opts.SetCleanSession(true)
//opts.SetResumeSubs(true)
//opts.SetStore(mqtt.NewMemoryStore())
//opts.SetCleanSession(true)
opts.SetCleanSession(false)
opts.SetResumeSubs(true)
opts.SetStore(mqtt.NewMemoryStore())
opts.SetConnectRetry(true)
opts.SetAutoReconnect(true)
opts.SetConnectRetryInterval(5 * time.Second)
@@ -537,9 +538,13 @@ func HandleRequestHDStream(mqttClient mqtt.Client, hubKey string, payload models
if communication.CameraConnected {
// Set the Hub key, so we can send back the answer.
requestHDStreamPayload.HubKey = hubKey
select {
case communication.HandleLiveHDHandshake <- requestHDStreamPayload:
default:
if communication.HandleLiveHDHandshake == nil {
log.Log.Error("routers.mqtt.main.HandleRequestHDStream(): handshake channel is nil, dropping request")
return
}
communication.HandleLiveHDHandshake <- models.LiveHDHandshake{
Payload: requestHDStreamPayload,
}
log.Log.Info("routers.mqtt.main.HandleRequestHDStream(): received request to setup webrtc.")
} else {
+95
View File
@@ -6,6 +6,7 @@ import (
"image"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
@@ -14,6 +15,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/packets"
"github.com/kerberos-io/agent/machinery/src/utils"
"github.com/kerberos-io/agent/machinery/src/webrtc"
)
type Message struct {
@@ -28,6 +30,23 @@ type Connection struct {
Cancels map[string]context.CancelFunc
}
func writeWebRTCError(connection *Connection, clientID string, sessionID string, errorMessage string) {
if connection == nil {
return
}
if err := connection.WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-error",
Message: map[string]string{
"session_id": sessionID,
"message": errorMessage,
},
}); err != nil {
log.Log.Error("routers.websocket.main.writeWebRTCError(): " + err.Error())
}
}
// Concurrency handling - sending messages
func (c *Connection) WriteJson(message Message) error {
c.mu.Lock()
@@ -115,6 +134,82 @@ func WebsocketHandler(c *gin.Context, configuration *models.Configuration, commu
go ForwardSDStream(ctx, clientID, sockets[clientID], configuration, communication, captureDevice)
}
}
case "stream-hd":
sessionID := message.Message["session_id"]
sessionDescription := message.Message["sdp"]
if sessionID == "" || sessionDescription == "" {
writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or sdp")
break
}
if !communication.CameraConnected {
writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected")
break
}
if communication.HandleLiveHDHandshake == nil {
writeWebRTCError(sockets[clientID], clientID, sessionID, "webrtc liveview is not available")
break
}
handshake := models.LiveHDHandshake{
Payload: models.RequestHDStreamPayload{
Timestamp: time.Now().Unix(),
SessionID: sessionID,
SessionDescription: sessionDescription,
},
Signaling: &models.LiveHDSignalingCallbacks{
SendAnswer: func(callbackSessionID string, sdp string) error {
return sockets[clientID].WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-answer",
Message: map[string]string{
"session_id": callbackSessionID,
"sdp": sdp,
},
})
},
SendCandidate: func(callbackSessionID string, candidate string) error {
return sockets[clientID].WriteJson(Message{
ClientID: clientID,
MessageType: "webrtc-candidate",
Message: map[string]string{
"session_id": callbackSessionID,
"candidate": candidate,
},
})
},
SendError: func(callbackSessionID string, errorMessage string) error {
writeWebRTCError(sockets[clientID], clientID, callbackSessionID, errorMessage)
return nil
},
},
}
communication.HandleLiveHDHandshake <- handshake
case "webrtc-candidate":
sessionID := message.Message["session_id"]
candidate := message.Message["candidate"]
if sessionID == "" || candidate == "" {
writeWebRTCError(sockets[clientID], clientID, sessionID, "missing session_id or candidate")
break
}
if !communication.CameraConnected {
writeWebRTCError(sockets[clientID], clientID, sessionID, "camera is not connected")
break
}
key := configuration.Config.Key + "/" + sessionID
go webrtc.RegisterCandidates(key, models.ReceiveHDCandidatesPayload{
Timestamp: time.Now().Unix(),
SessionID: sessionID,
Candidate: candidate,
})
}
err = conn.ReadJSON(&message)
+9 -1
View File
@@ -27,7 +27,8 @@ import (
// VERSION is the agent version. It defaults to "0.0.0" for local dev builds
// and is overridden at build time via:
// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3"
//
// go build -ldflags "-X github.com/kerberos-io/agent/machinery/src/utils.VERSION=v1.2.3"
var VERSION = "0.0.0"
const letterBytes = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
@@ -198,6 +199,13 @@ func GetMediaFormatted(files []os.FileInfo, recordingDirectory string, configura
timestampInt, err := strconv.ParseInt(timestamp, 10, 64)
if err == nil {
if eventFilter.TimestampOffsetStart > 0 {
// TimestampOffsetStart represents the newest lower bound to include.
if timestampInt < eventFilter.TimestampOffsetStart {
continue
}
}
// If we have an offset we will check if we should skip or not
if eventFilter.TimestampOffsetEnd > 0 {
// Medias are sorted from new to older. TimestampOffsetEnd holds the oldest
+54
View File
@@ -0,0 +1,54 @@
package utils
import (
"os"
"testing"
"time"
"github.com/kerberos-io/agent/machinery/src/models"
)
type stubFileInfo struct {
name string
}
func (s stubFileInfo) Name() string { return s.name }
func (s stubFileInfo) Size() int64 { return 0 }
func (s stubFileInfo) Mode() os.FileMode { return 0 }
func (s stubFileInfo) ModTime() time.Time { return time.Unix(0, 0) }
func (s stubFileInfo) IsDir() bool { return false }
func (s stubFileInfo) Sys() interface{} { return nil }
func TestGetMediaFormattedHonorsTimestampRange(t *testing.T) {
configuration := &models.Configuration{}
configuration.Config.Timezone = "UTC"
configuration.Config.Name = "Front Door"
configuration.Config.Key = "camera-1"
files := []os.FileInfo{
stubFileInfo{name: "1700000200_6_7_8_9_10.mp4"},
stubFileInfo{name: "1700000100_6_7_8_9_10.mp4"},
stubFileInfo{name: "1700000000_6_7_8_9_10.mp4"},
}
media := GetMediaFormatted(files, "/tmp/recordings", configuration, models.EventFilter{
TimestampOffsetStart: 1700000050,
TimestampOffsetEnd: 1700000200,
NumberOfElements: 10,
})
if len(media) != 1 {
t.Fatalf("expected 1 media item in time range, got %d", len(media))
}
if media[0].Timestamp != "1700000100" {
t.Fatalf("expected timestamp 1700000100, got %s", media[0].Timestamp)
}
if media[0].CameraName != "Front Door" {
t.Fatalf("expected camera name to be preserved, got %s", media[0].CameraName)
}
if media[0].CameraKey != "camera-1" {
t.Fatalf("expected camera key to be preserved, got %s", media[0].CameraKey)
}
}
+119 -71
View File
@@ -140,6 +140,11 @@ func (cm *ConnectionManager) GetPeerConnectionCount() int64 {
return atomic.LoadInt64(&cm.peerConnectionCount)
}
// GetActivePeerConnectionCount returns the current number of connected WebRTC readers.
func GetActivePeerConnectionCount() int64 {
return globalConnectionManager.GetPeerConnectionCount()
}
// IncrementPeerCount atomically increments the peer connection count
func (cm *ConnectionManager) IncrementPeerCount() int64 {
return atomic.AddInt64(&cm.peerConnectionCount, 1)
@@ -252,7 +257,79 @@ func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, intercepto
return nil
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.RequestHDStreamPayload) {
func publishSignalingMessageAsync(mqttClient mqtt.Client, topic string, payload []byte, description string) {
if mqttClient == nil {
log.Log.Error("webrtc.main.publishSignalingMessageAsync(): mqtt client is nil for " + description)
return
}
token := mqttClient.Publish(topic, 2, false, payload)
go func() {
if !token.WaitTimeout(5 * time.Second) {
log.Log.Warning("webrtc.main.publishSignalingMessageAsync(): timed out publishing " + description)
return
}
if err := token.Error(); err != nil {
log.Log.Error("webrtc.main.publishSignalingMessageAsync(): failed publishing " + description + ": " + err.Error())
}
}()
}
func sendCandidateSignal(configuration *models.Configuration, mqttClient mqtt.Client, hubKey string, handshake models.LiveHDHandshake, candidateJSON []byte) {
if handshake.Signaling != nil && handshake.Signaling.SendCandidate != nil {
if err := handshake.Signaling.SendCandidate(handshake.Payload.SessionID, string(candidateJSON)); err != nil {
log.Log.Error("webrtc.main.sendCandidateSignal(): " + err.Error())
}
return
}
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-candidates",
DeviceId: configuration.Config.Key,
Value: map[string]interface{}{
"candidate": string(candidateJSON),
"session_id": handshake.Payload.SessionID,
},
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "ICE candidate for session "+handshake.Payload.SessionID)
} else {
log.Log.Info("webrtc.main.sendCandidateSignal(): while packaging mqtt message: " + err.Error())
}
}
func sendAnswerSignal(configuration *models.Configuration, mqttClient mqtt.Client, hubKey string, handshake models.LiveHDHandshake, answer pionWebRTC.SessionDescription) {
encodedAnswer := base64.StdEncoding.EncodeToString([]byte(answer.SDP))
if handshake.Signaling != nil && handshake.Signaling.SendAnswer != nil {
if err := handshake.Signaling.SendAnswer(handshake.Payload.SessionID, encodedAnswer); err != nil {
log.Log.Error("webrtc.main.sendAnswerSignal(): " + err.Error())
}
return
}
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: map[string]interface{}{
"sdp": []byte(encodedAnswer),
"session_id": handshake.Payload.SessionID,
},
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
publishSignalingMessageAsync(mqttClient, "kerberos/hub/"+hubKey, payload, "SDP answer for session "+handshake.Payload.SessionID)
} else {
log.Log.Info("webrtc.main.sendAnswerSignal(): while packaging mqtt message: " + err.Error())
}
}
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoBroadcaster *TrackBroadcaster, audioBroadcaster *TrackBroadcaster, handshake models.LiveHDHandshake) {
config := configuration.Config
deviceKey := config.Key
@@ -260,14 +337,15 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
turnServers := []string{config.TURNURI}
turnServersUsername := config.TURNUsername
turnServersCredential := config.TURNPassword
handshakePayload := handshake.Payload
// We create a channel which will hold the candidates for this session.
sessionKey := config.Key + "/" + handshake.SessionID
sessionKey := config.Key + "/" + handshakePayload.SessionID
candidateChannel := globalConnectionManager.GetOrCreateCandidateChannel(sessionKey)
// Set variables
hubKey := handshake.HubKey
sessionDescription := handshake.SessionDescription
hubKey := handshakePayload.HubKey
sessionDescription := handshakePayload.SessionDescription
// Create WebRTC object
w := CreateWebRTC(deviceKey, stunServers, turnServers, turnServersUsername, turnServersCredential)
@@ -424,12 +502,12 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// Log ICE connection state changes for diagnostics
peerConnection.OnICEConnectionStateChange(func(iceState pionWebRTC.ICEConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): ICE connection state changed to: " + iceState.String() +
" (session: " + handshake.SessionID + ")")
" (session: " + handshakePayload.SessionID + ")")
})
peerConnection.OnConnectionStateChange(func(connectionState pionWebRTC.PeerConnectionState) {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection state changed to: " + connectionState.String() +
" (session: " + handshake.SessionID + ")")
" (session: " + handshakePayload.SessionID + ")")
switch connectionState {
case pionWebRTC.PeerConnectionStateDisconnected:
@@ -438,9 +516,9 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
wrapper.disconnectMu.Lock()
if wrapper.disconnectTimer == nil {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): peer disconnected, waiting " +
disconnectGracePeriod.String() + " for recovery (session: " + handshake.SessionID + ")")
disconnectGracePeriod.String() + " for recovery (session: " + handshakePayload.SessionID + ")")
wrapper.disconnectTimer = time.AfterFunc(disconnectGracePeriod, func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshake.SessionID + ")")
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): disconnect grace period expired, closing connection (session: " + handshakePayload.SessionID + ")")
cleanupPeerConnection(sessionKey, wrapper)
})
}
@@ -472,7 +550,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
if wrapper.disconnectTimer != nil {
wrapper.disconnectTimer.Stop()
wrapper.disconnectTimer = nil
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshake.SessionID + ")")
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): connection recovered from disconnected state (session: " + handshakePayload.SessionID + ")")
}
wrapper.disconnectMu.Unlock()
@@ -483,33 +561,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
}
})
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshake.SessionID)
}()
// Iterate over the candidates and send them to the remote client
for {
select {
case <-ctx.Done():
return
case candidate, ok := <-candidateChannel:
if !ok {
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
candidateInit, decodeErr := decodeICECandidate(candidate)
if decodeErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
continue
}
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
}
}()
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
// The other peer will add this candidate by calling AddICECandidate.
// This handler must be registered before setting the local description, otherwise early candidates can be missed.
@@ -557,27 +608,13 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
candateBinary, err := json.Marshal(candidateJSON)
if err == nil {
valueMap["candidate"] = string(candateBinary)
valueMap["session_id"] = handshake.SessionID
valueMap["session_id"] = handshakePayload.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub")
} else {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to marshal candidate: " + err.Error())
}
// We'll send the candidate to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-candidates",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
sendCandidateSignal(configuration, mqttClient, hubKey, handshake, candateBinary)
})
offer := w.CreateOffer(sd)
@@ -587,6 +624,35 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
return
}
go func() {
defer func() {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): candidate processor stopped for session: " + handshakePayload.SessionID)
}()
// Process remote candidates only after the remote description is set.
// MQTT can deliver candidates before the SDP offer handling completes,
// and Pion rejects AddICECandidate calls until SetRemoteDescription succeeds.
for {
select {
case <-ctx.Done():
return
case candidate, ok := <-candidateChannel:
if !ok {
return
}
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
candidateInit, decodeErr := decodeICECandidate(candidate)
if decodeErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
continue
}
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
}
}
}
}()
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
@@ -601,27 +667,9 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
// Store peer connection in manager
globalConnectionManager.AddPeerConnection(sessionKey, wrapper)
// Create a config map
valueMap := make(map[string]interface{})
valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
valueMap["session_id"] = handshake.SessionID
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Send SDP answer")
// We'll send the candidate to the hub
message := models.Message{
Payload: models.Payload{
Action: "receive-hd-answer",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
token := mqttClient.Publish("kerberos/hub/"+hubKey, 2, false, payload)
token.Wait()
} else {
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): while packaging mqtt message: " + err.Error())
}
sendAnswerSignal(configuration, mqttClient, hubKey, handshake, answer)
}
} else {
globalConnectionManager.CloseCandidateChannel(sessionKey)
+3 -3
View File
@@ -45,9 +45,9 @@
"crypto": false
},
"scripts": {
"start": "react-scripts start",
"build": "GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www",
"test": "react-scripts test",
"start": "DISABLE_ESLINT_PLUGIN=true react-scripts start",
"build": "DISABLE_ESLINT_PLUGIN=true GENERATE_SOURCEMAP=false REACT_APP_ENVIRONMENT=production react-scripts build && rm -rf ../machinery/www && mv build ../machinery/www",
"test": "DISABLE_ESLINT_PLUGIN=true react-scripts test",
"eject": "react-scripts eject",
"lint": "eslint --debug 'src/**/*.{js,jsx,ts,tsx}'",
"format": "prettier --write \"**/*.{js,jsx,json,md}\""
+1 -1
View File
@@ -237,4 +237,4 @@
"remove_after_upload_enabled": "Enable delete on upload"
}
}
}
}
+2 -1
View File
@@ -194,7 +194,7 @@ export const getDashboardInformation = (onSuccess, onError) => {
};
};
export const getEvents = (eventfilter, onSuccess, onError) => {
export const getEvents = (eventfilter, onSuccess, onError, append = false) => {
return (dispatch) => {
doGetEvents(
eventfilter,
@@ -203,6 +203,7 @@ export const getEvents = (eventfilter, onSuccess, onError) => {
type: 'GET_EVENTS',
events: data.events,
filter: eventfilter,
append,
});
if (onSuccess) {
onSuccess();
+436 -55
View File
@@ -26,6 +26,23 @@ import {
import './Dashboard.scss';
import ReactTooltip from 'react-tooltip';
import config from '../../config';
import { getConfig } from '../../actions/agent';
function createUUID() {
if (
typeof window !== 'undefined' &&
window.crypto &&
typeof window.crypto.randomUUID === 'function'
) {
return window.crypto.randomUUID();
}
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (char) => {
const random = Math.floor(Math.random() * 16);
const value = char === 'x' ? random : 8 + Math.floor(random / 4);
return value.toString(16);
});
}
// eslint-disable-next-line react/prefer-stateless-function
class Dashboard extends React.Component {
@@ -33,48 +50,55 @@ class Dashboard extends React.Component {
super();
this.state = {
liveviewLoaded: false,
liveviewMode: 'webrtc',
open: false,
currentRecording: '',
initialised: false,
};
this.videoRef = React.createRef();
this.pendingRemoteCandidates = [];
this.initialiseLiveview = this.initialiseLiveview.bind(this);
this.handleLiveviewLoad = this.handleLiveviewLoad.bind(this);
this.initialiseSDLiveview = this.initialiseSDLiveview.bind(this);
this.startWebRTCLiveview = this.startWebRTCLiveview.bind(this);
this.handleWebRTCSignalMessage = this.handleWebRTCSignalMessage.bind(this);
this.fallbackToSDLiveview = this.fallbackToSDLiveview.bind(this);
}
componentDidMount() {
const liveview = document.getElementsByClassName('videocard-video');
if (liveview && liveview.length > 0) {
[this.liveviewElement] = liveview;
this.liveviewElement.addEventListener('load', this.handleLiveviewLoad);
}
const { dispatchGetConfig } = this.props;
dispatchGetConfig(() => this.initialiseLiveview());
this.initialiseLiveview();
}
componentDidUpdate() {
this.initialiseLiveview();
componentDidUpdate(prevProps) {
const { images, dashboard } = this.props;
const { liveviewLoaded, liveviewMode } = this.state;
const configLoaded = this.hasAgentConfig(this.props);
const prevConfigLoaded = this.hasAgentConfig(prevProps);
if (!prevConfigLoaded && configLoaded) {
this.initialiseLiveview();
}
if (
liveviewMode === 'sd' &&
!liveviewLoaded &&
prevProps.images !== images &&
images.length > 0
) {
this.setState({
liveviewLoaded: true,
});
}
if (!prevProps.dashboard.cameraOnline && dashboard.cameraOnline) {
this.initialiseLiveview();
}
}
componentWillUnmount() {
if (this.liveviewElement) {
this.liveviewElement.removeEventListener('load', this.handleLiveviewLoad);
this.liveviewElement = null;
}
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
const message = {
message_type: 'stop-sd',
};
dispatchSend(message);
}
handleLiveviewLoad() {
this.setState({
liveviewLoaded: true,
});
this.stopSDLiveview();
this.stopWebRTCLiveview();
}
handleClose() {
@@ -84,32 +108,378 @@ class Dashboard extends React.Component {
});
}
getCurrentTimestamp() {
return Math.round(Date.now() / 1000);
// eslint-disable-next-line react/sort-comp
hasAgentConfig(props) {
const currentProps = props || this.props;
const { config: configResponse } = currentProps;
return !!(configResponse && configResponse.config);
}
browserSupportsWebRTC() {
return (
typeof window !== 'undefined' &&
typeof window.RTCPeerConnection !== 'undefined'
);
}
buildPeerConnectionConfig() {
const { config: configResponse } = this.props;
const agentConfig =
configResponse && configResponse.config ? configResponse.config : {};
const iceServers = [];
if (agentConfig.stunuri) {
iceServers.push({
urls: [agentConfig.stunuri],
});
}
if (agentConfig.turnuri) {
const turnServer = {
urls: [agentConfig.turnuri],
};
if (agentConfig.turn_username) {
turnServer.username = agentConfig.turn_username;
}
if (agentConfig.turn_password) {
turnServer.credential = agentConfig.turn_password;
}
iceServers.push(turnServer);
}
return {
iceServers,
iceTransportPolicy: agentConfig.turn_force === 'true' ? 'relay' : 'all',
};
}
initialiseLiveview() {
const { initialised } = this.state;
if (!initialised) {
const message = {
message_type: 'stream-sd',
};
const { connected, dispatchSend } = this.props;
if (connected) {
const { dashboard } = this.props;
if (initialised || !dashboard.cameraOnline) {
return;
}
if (!this.hasAgentConfig()) {
return;
}
if (this.browserSupportsWebRTC()) {
this.startWebRTCLiveview();
} else {
this.fallbackToSDLiveview('WebRTC is not supported in this browser.');
}
}
initialiseSDLiveview() {
if (this.requestStreamSubscription) {
return;
}
const message = {
message_type: 'stream-sd',
};
const { connected, dispatchSend } = this.props;
if (connected) {
dispatchSend(message);
}
const requestStreamInterval = interval(2000);
this.requestStreamSubscription = requestStreamInterval.subscribe(() => {
const { connected: isConnected } = this.props;
if (isConnected) {
dispatchSend(message);
}
});
}
const requestStreamInterval = interval(2000);
this.requestStreamSubscription = requestStreamInterval.subscribe(() => {
const { connected: isConnected } = this.props;
if (isConnected) {
dispatchSend(message);
}
});
this.setState({
initialised: true,
});
stopSDLiveview() {
if (this.requestStreamSubscription) {
this.requestStreamSubscription.unsubscribe();
this.requestStreamSubscription = null;
}
const { dispatchSend } = this.props;
dispatchSend({
message_type: 'stop-sd',
});
}
stopWebRTCLiveview() {
if (this.webrtcTimeout) {
window.clearTimeout(this.webrtcTimeout);
this.webrtcTimeout = null;
}
if (this.webrtcSocket) {
this.webrtcSocket.onopen = null;
this.webrtcSocket.onmessage = null;
this.webrtcSocket.onerror = null;
this.webrtcSocket.onclose = null;
this.webrtcSocket.close();
this.webrtcSocket = null;
}
if (this.webrtcPeerConnection) {
this.webrtcPeerConnection.ontrack = null;
this.webrtcPeerConnection.onicecandidate = null;
this.webrtcPeerConnection.onconnectionstatechange = null;
this.webrtcPeerConnection.close();
this.webrtcPeerConnection = null;
}
this.pendingRemoteCandidates = [];
this.webrtcOfferStarted = false;
this.webrtcSessionId = null;
this.webrtcClientId = null;
if (this.videoRef.current) {
this.videoRef.current.srcObject = null;
}
}
sendWebRTCMessage(messageType, message = {}) {
if (!this.webrtcSocket || this.webrtcSocket.readyState !== WebSocket.OPEN) {
return;
}
this.webrtcSocket.send(
JSON.stringify({
client_id: this.webrtcClientId,
message_type: messageType,
message,
})
);
}
async handleWebRTCSignalMessage(event) {
let data;
try {
data = JSON.parse(event.data);
} catch (error) {
return;
}
const { message_type: messageType, message = {} } = data;
const { session_id: sessionID, sdp, candidate } = message;
if (messageType === 'hello-back') {
await this.beginWebRTCLiveview();
return;
}
if (sessionID && sessionID !== this.webrtcSessionId) {
return;
}
switch (messageType) {
case 'webrtc-answer':
try {
await this.webrtcPeerConnection.setRemoteDescription({
type: 'answer',
sdp: window.atob(sdp),
});
await this.flushPendingRemoteCandidates();
} catch (error) {
this.fallbackToSDLiveview(
`Unable to apply WebRTC answer: ${error.message}`
);
}
break;
case 'webrtc-candidate': {
try {
const candidateInit = JSON.parse(candidate);
if (
this.webrtcPeerConnection.remoteDescription &&
this.webrtcPeerConnection.remoteDescription.type
) {
await this.webrtcPeerConnection.addIceCandidate(candidateInit);
} else {
this.pendingRemoteCandidates.push(candidateInit);
}
} catch (error) {
this.fallbackToSDLiveview(
`Unable to apply WebRTC candidate: ${error.message}`
);
}
break;
}
case 'webrtc-error':
this.fallbackToSDLiveview(
message.message || 'The agent could not start the WebRTC liveview.'
);
break;
default:
break;
}
}
async beginWebRTCLiveview() {
if (!this.webrtcPeerConnection || this.webrtcOfferStarted) {
return;
}
try {
this.webrtcOfferStarted = true;
const offer = await this.webrtcPeerConnection.createOffer({
offerToReceiveAudio: true,
offerToReceiveVideo: true,
});
await this.webrtcPeerConnection.setLocalDescription(offer);
this.sendWebRTCMessage('stream-hd', {
session_id: this.webrtcSessionId,
sdp: window.btoa(this.webrtcPeerConnection.localDescription.sdp),
});
} catch (error) {
this.fallbackToSDLiveview(
`Unable to initialise WebRTC liveview: ${error.message}`,
);
}
}
async flushPendingRemoteCandidates() {
if (
!this.webrtcPeerConnection ||
!this.webrtcPeerConnection.remoteDescription
) {
return;
}
while (this.pendingRemoteCandidates.length > 0) {
const candidateInit = this.pendingRemoteCandidates.shift();
try {
// eslint-disable-next-line no-await-in-loop
await this.webrtcPeerConnection.addIceCandidate(candidateInit);
} catch (error) {
this.fallbackToSDLiveview(
`Unable to add remote ICE candidate: ${error.message}`,
);
return;
}
}
}
startWebRTCLiveview() {
if (this.webrtcPeerConnection || this.webrtcSocket) {
return;
}
this.stopSDLiveview();
this.webrtcClientId = createUUID();
this.webrtcSessionId = createUUID();
this.pendingRemoteCandidates = [];
this.webrtcPeerConnection = new window.RTCPeerConnection(
this.buildPeerConnectionConfig()
);
this.webrtcPeerConnection.ontrack = (event) => {
const [remoteStream] = event.streams;
if (this.videoRef.current && remoteStream) {
this.videoRef.current.srcObject = remoteStream;
const playPromise = this.videoRef.current.play();
if (playPromise && playPromise.catch) {
playPromise.catch(() => {});
}
}
this.setState({
liveviewLoaded: true,
});
};
this.webrtcPeerConnection.onicecandidate = (event) => {
if (!event.candidate) {
return;
}
this.sendWebRTCMessage('webrtc-candidate', {
session_id: this.webrtcSessionId,
candidate: JSON.stringify(event.candidate.toJSON()),
});
};
this.webrtcPeerConnection.onconnectionstatechange = () => {
const { connectionState } = this.webrtcPeerConnection;
if (connectionState === 'connected') {
this.setState({
liveviewLoaded: true,
});
}
if (
connectionState === 'failed' ||
connectionState === 'disconnected' ||
connectionState === 'closed'
) {
this.fallbackToSDLiveview(
`WebRTC connection ${connectionState}, falling back to SD liveview.`,
);
}
};
this.webrtcSocket = new WebSocket(config.WS_URL);
this.webrtcSocket.onopen = () => {
this.sendWebRTCMessage('hello', {});
};
this.webrtcSocket.onmessage = this.handleWebRTCSignalMessage;
this.webrtcSocket.onerror = () => {
this.fallbackToSDLiveview('Unable to open the WebRTC signaling channel.');
};
this.webrtcSocket.onclose = () => {
const { liveviewLoaded } = this.state;
if (!liveviewLoaded) {
this.fallbackToSDLiveview('WebRTC signaling channel closed early.');
}
};
this.webrtcTimeout = window.setTimeout(() => {
const { liveviewLoaded } = this.state;
if (!liveviewLoaded) {
this.fallbackToSDLiveview(
'WebRTC connection timed out, falling back to SD liveview.'
);
}
}, 10000);
this.setState({
initialised: true,
liveviewLoaded: false,
liveviewMode: 'webrtc',
});
}
fallbackToSDLiveview(errorMessage) {
const { liveviewMode } = this.state;
if (liveviewMode === 'sd' && this.requestStreamSubscription) {
return;
}
this.stopWebRTCLiveview();
if (errorMessage) {
// eslint-disable-next-line no-console
console.warn(errorMessage);
}
this.setState(
{
initialised: true,
liveviewLoaded: false,
liveviewMode: 'sd',
},
() => {
this.initialiseSDLiveview();
}
);
}
openModal(file) {
@@ -121,7 +491,8 @@ class Dashboard extends React.Component {
render() {
const { dashboard, t, images } = this.props;
const { liveviewLoaded, open, currentRecording } = this.state;
const { liveviewLoaded, liveviewMode, open, currentRecording } = this.state;
const listenerCount = dashboard.webrtcReaders ? dashboard.webrtcReaders : 0;
// We check if the camera was getting a valid frame
// during the last 5 seconds, otherwise we assume the camera is offline.
@@ -175,7 +546,6 @@ class Dashboard extends React.Component {
divider="0"
footer={t('dashboard.total_recordings')}
/>
<Link to="/settings">
<Card
title="IP Camera"
@@ -314,7 +684,9 @@ class Dashboard extends React.Component {
)}
</div>
<div>
<h2>{t('dashboard.live_view')}</h2>
<h2>
{t('dashboard.live_view')} ({listenerCount})
</h2>
{(!liveviewLoaded || !isCameraOnline) && (
<SetupBox
btnicon="preferences"
@@ -331,12 +703,16 @@ class Dashboard extends React.Component {
liveviewLoaded && isCameraOnline ? 'visible' : 'hidden',
}}
>
<ImageCard
imageSrc={`data:image/png;base64, ${
images.length ? images[0] : ''
}`}
onerror=""
/>
{liveviewMode === 'webrtc' ? (
<video ref={this.videoRef} autoPlay muted playsInline />
) : (
<ImageCard
imageSrc={`data:image/png;base64, ${
images.length ? images[0] : ''
}`}
onerror=""
/>
)}
</div>
</div>
</div>
@@ -348,20 +724,25 @@ class Dashboard extends React.Component {
const mapStateToProps = (state /* , ownProps */) => ({
dashboard: state.agent.dashboard,
config: state.agent.config,
connected: state.wss.connected,
images: state.wss.images,
});
const mapDispatchToProps = (dispatch) => ({
dispatchSend: (message) => dispatch(send(message)),
dispatchGetConfig: (onSuccess, onError) =>
dispatch(getConfig(onSuccess, onError)),
});
Dashboard.propTypes = {
dashboard: PropTypes.object.isRequired,
config: PropTypes.object.isRequired,
connected: PropTypes.bool.isRequired,
images: PropTypes.array.isRequired,
t: PropTypes.func.isRequired,
dispatchSend: PropTypes.func.isRequired,
dispatchGetConfig: PropTypes.func.isRequired,
};
export default withTranslation()(
+178 -21
View File
@@ -3,6 +3,7 @@ import PropTypes from 'prop-types';
import { withTranslation } from 'react-i18next';
import {
Breadcrumb,
ControlBar,
VideoCard,
Button,
Modal,
@@ -16,14 +17,53 @@ import { getEvents } from '../../actions/agent';
import config from '../../config';
import './Media.scss';
function formatDateTimeLocal(date) {
const year = date.getFullYear();
const month = String(date.getMonth() + 1).padStart(2, '0');
const day = String(date.getDate()).padStart(2, '0');
const hours = String(date.getHours()).padStart(2, '0');
const minutes = String(date.getMinutes()).padStart(2, '0');
return `${year}-${month}-${day}T${hours}:${minutes}`;
}
function getDefaultTimeWindow() {
const endDate = new Date();
const startDate = new Date(endDate.getTime() - 60 * 60 * 1000);
return {
startDateTime: formatDateTimeLocal(startDate),
endDateTime: formatDateTimeLocal(endDate),
timestamp_offset_start: Math.floor(startDate.getTime() / 1000),
timestamp_offset_end: Math.floor(endDate.getTime() / 1000) + 59,
};
}
function normalizeInputValue(valueOrEvent) {
if (valueOrEvent && valueOrEvent.target) {
return valueOrEvent.target.value;
}
return valueOrEvent;
}
// eslint-disable-next-line react/prefer-stateless-function
class Media extends React.Component {
constructor() {
super();
this.state = {
timestamp_offset_start: 0,
timestamp_offset_end: 0,
const defaultTimeWindow = getDefaultTimeWindow();
const initialFilter = {
timestamp_offset_start: defaultTimeWindow.timestamp_offset_start,
timestamp_offset_end: defaultTimeWindow.timestamp_offset_end,
number_of_elements: 12,
};
this.state = {
appliedFilter: initialFilter,
startDateTime: defaultTimeWindow.startDateTime,
endDateTime: defaultTimeWindow.endDateTime,
isScrolling: false,
open: false,
currentRecording: '',
@@ -32,7 +72,8 @@ class Media extends React.Component {
componentDidMount() {
const { dispatchGetEvents } = this.props;
dispatchGetEvents(this.state);
const { appliedFilter } = this.state;
dispatchGetEvents(appliedFilter);
document.addEventListener('scroll', this.trackScrolling);
}
@@ -49,29 +90,107 @@ class Media extends React.Component {
trackScrolling = () => {
const { events, dispatchGetEvents } = this.props;
const { isScrolling } = this.state;
const { isScrolling, appliedFilter } = this.state;
const wrappedElement = document.getElementById('loader');
if (!isScrolling && this.isBottom(wrappedElement)) {
this.setState({
isScrolling: true,
});
// Get last element
const lastElement = events[events.length - 1];
if (lastElement) {
this.setState({
if (!wrappedElement || isScrolling || !this.isBottom(wrappedElement)) {
return;
}
this.setState({
isScrolling: true,
});
// Get last element
const lastElement = events[events.length - 1];
if (lastElement) {
dispatchGetEvents(
{
...appliedFilter,
timestamp_offset_end: parseInt(lastElement.timestamp, 10),
});
dispatchGetEvents(this.state, () => {
},
() => {
setTimeout(() => {
this.setState({
isScrolling: false,
});
}, 1000);
});
}
},
() => {
this.setState({
isScrolling: false,
});
},
true
);
} else {
this.setState({
isScrolling: false,
});
}
};
buildEventFilter(startDateTime, endDateTime) {
const { appliedFilter } = this.state;
return {
timestamp_offset_start: this.getTimestampFromInput(
startDateTime,
'start'
),
timestamp_offset_end: this.getTimestampFromInput(endDateTime, 'end'),
number_of_elements: appliedFilter.number_of_elements,
};
}
handleDateFilterChange(field, value) {
const { dispatchGetEvents } = this.props;
const { startDateTime, endDateTime } = this.state;
const normalizedValue = normalizeInputValue(value);
const nextStartDateTime =
field === 'startDateTime' ? normalizedValue : startDateTime;
const nextEndDateTime =
field === 'endDateTime' ? normalizedValue : endDateTime;
const nextFilter = this.buildEventFilter(
nextStartDateTime,
nextEndDateTime
);
const shouldApplyFilter =
(nextStartDateTime === '' || nextStartDateTime.length === 16) &&
(nextEndDateTime === '' || nextEndDateTime.length === 16);
this.setState(
{
[field]: normalizedValue,
appliedFilter: shouldApplyFilter
? nextFilter
: this.state.appliedFilter,
isScrolling: false,
},
() => {
if (shouldApplyFilter) {
dispatchGetEvents(nextFilter);
}
}
);
}
getTimestampFromInput(value, boundary) {
if (!value) {
return 0;
}
const date = new Date(value);
if (Number.isNaN(date.getTime())) {
return 0;
}
const seconds = Math.floor(date.getTime() / 1000);
if (boundary === 'end') {
return seconds + 59;
}
return seconds;
}
isBottom(el) {
return el.getBoundingClientRect().bottom + 50 <= window.innerHeight;
}
@@ -85,7 +204,9 @@ class Media extends React.Component {
render() {
const { events, eventsLoaded, t } = this.props;
const { isScrolling, open, currentRecording } = this.state;
const { isScrolling, open, currentRecording, startDateTime, endDateTime } =
this.state;
return (
<div id="media">
<Breadcrumb
@@ -102,6 +223,37 @@ class Media extends React.Component {
</Link>
</Breadcrumb>
<div className="media-control-bar">
<ControlBar>
<div className="media-filters">
<div className="media-filters__field">
<label htmlFor="recordings-start-time">Start time</label>
<input
className="media-filters__input"
id="recordings-start-time"
type="datetime-local"
value={startDateTime}
onChange={(value) =>
this.handleDateFilterChange('startDateTime', value)
}
/>
</div>
<div className="media-filters__field">
<label htmlFor="recordings-end-time">End time</label>
<input
className="media-filters__input"
id="recordings-end-time"
type="datetime-local"
value={endDateTime}
onChange={(value) =>
this.handleDateFilterChange('endDateTime', value)
}
/>
</div>
</div>
</ControlBar>
</div>
<div className="stats grid-container --four-columns">
{events.map((event) => (
<div
@@ -123,6 +275,11 @@ class Media extends React.Component {
</div>
))}
</div>
{events.length === 0 && eventsLoaded === 0 && (
<div className="media-empty-state">
No recordings found in the selected time range.
</div>
)}
{open && (
<Modal>
<ModalHeader
@@ -182,13 +339,13 @@ const mapStateToProps = (state /* , ownProps */) => ({
});
const mapDispatchToProps = (dispatch) => ({
dispatchGetEvents: (eventFilter, success, error) =>
dispatch(getEvents(eventFilter, success, error)),
dispatchGetEvents: (eventFilter, success, error, append) =>
dispatch(getEvents(eventFilter, success, error, append)),
});
Media.propTypes = {
t: PropTypes.func.isRequired,
events: PropTypes.objectOf(PropTypes.object).isRequired,
events: PropTypes.arrayOf(PropTypes.object).isRequired,
eventsLoaded: PropTypes.number.isRequired,
dispatchGetEvents: PropTypes.func.isRequired,
};
+99
View File
@@ -1,4 +1,103 @@
#media {
.media-control-bar {
.control-bar {
display: block;
padding: 0 var(--main-content-gutter);
}
.control-bar .filtering {
display: block;
}
.control-bar .filtering > * {
border-right: 0;
flex: 1 1 100% !important;
max-width: none;
width: 100%;
}
}
.media-filters {
align-items: stretch;
display: grid;
gap: 0;
grid-template-columns: repeat(2, minmax(0, 1fr));
min-width: 0;
width: 100%;
}
.media-filters__field {
border-right: 1px solid var(--bg-muted);
min-width: 0;
padding: 16px 24px;
label {
display: block;
font-size: 14px;
font-weight: 600;
margin-bottom: 8px;
white-space: nowrap;
}
}
.media-filters__field:first-child {
padding-left: 0;
}
.media-filters__input {
appearance: none;
background: var(--white);
border: 1px solid var(--grey-light);
border-radius: 8px;
box-sizing: border-box;
color: var(--black);
font-size: 16px;
min-height: 48px;
padding: 0 14px 0 0;
width: 100%;
}
.media-filters__input::-webkit-datetime-edit,
.media-filters__input::-webkit-datetime-edit-fields-wrapper {
padding: 0;
}
.media-filters__input:focus {
border-color: var(--oss);
outline: 0;
}
.media-filters__field:first-child .media-filters__input {
padding-left: 0;
}
.media-filters__field:last-child {
border-right: 0;
padding-right: 0;
}
@media (max-width: 700px) {
.media-filters {
grid-template-columns: 1fr;
}
.media-filters__field {
border-right: 0;
border-bottom: 1px solid var(--bg-muted);
padding-left: 0;
padding-right: 0;
}
.media-filters__field:last-child {
border-bottom: 0;
}
}
.media-empty-state {
margin: 24px 0;
opacity: 0.8;
text-align: center;
}
#loader {
display: flex;
+2 -6
View File
@@ -123,16 +123,12 @@ const agent = (
};
case 'GET_EVENTS':
const { timestamp_offset_end } = action.filter;
const { events } = action;
return {
...state,
eventsLoaded: events.length,
events:
timestamp_offset_end === 0
? [...events]
: [...state.events, ...events],
eventfilter: action.eventfilter,
events: action.append ? [...state.events, ...events] : [...events],
eventfilter: action.filter,
};
default: