mirror of
https://github.com/kerberos-io/agent.git
synced 2026-04-22 15:47:06 +08:00
Merge pull request #255 from kerberos-io/feature/improve-mqtt-concurrency
feature/improve-mqtt-concurrency
This commit is contained in:
+148
-45
@@ -31,6 +31,7 @@ const (
|
||||
// Timeouts and intervals
|
||||
keepAliveTimeout = 15 * time.Second
|
||||
defaultTimeout = 10 * time.Second
|
||||
maxLivePacketAge = 1500 * time.Millisecond
|
||||
|
||||
// Track identifiers
|
||||
trackStreamID = "kerberos-stream"
|
||||
@@ -89,22 +90,41 @@ func (cm *ConnectionManager) CloseCandidateChannel(sessionKey string) {
|
||||
}
|
||||
|
||||
// AddPeerConnection adds a peer connection to the manager
|
||||
func (cm *ConnectionManager) AddPeerConnection(sessionID string, wrapper *peerConnectionWrapper) {
|
||||
func (cm *ConnectionManager) AddPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
cm.peerConnections[sessionID] = wrapper
|
||||
cm.peerConnections[sessionKey] = wrapper
|
||||
}
|
||||
|
||||
// RemovePeerConnection removes a peer connection from the manager
|
||||
func (cm *ConnectionManager) RemovePeerConnection(sessionID string) {
|
||||
func (cm *ConnectionManager) RemovePeerConnection(sessionKey string) {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
if wrapper, exists := cm.peerConnections[sessionID]; exists {
|
||||
if wrapper, exists := cm.peerConnections[sessionKey]; exists {
|
||||
if wrapper.cancelCtx != nil {
|
||||
wrapper.cancelCtx()
|
||||
}
|
||||
delete(cm.peerConnections, sessionID)
|
||||
delete(cm.peerConnections, sessionKey)
|
||||
}
|
||||
}
|
||||
|
||||
// QueueCandidate safely queues a candidate for a session without racing with channel closure.
|
||||
func (cm *ConnectionManager) QueueCandidate(sessionKey string, candidate string) bool {
|
||||
cm.mu.Lock()
|
||||
defer cm.mu.Unlock()
|
||||
|
||||
ch, exists := cm.candidateChannels[sessionKey]
|
||||
if !exists {
|
||||
ch = make(chan string, candidateChannelBuffer)
|
||||
cm.candidateChannels[sessionKey] = ch
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- candidate:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,7 +143,7 @@ func (cm *ConnectionManager) DecrementPeerCount() int64 {
|
||||
return atomic.AddInt64(&cm.peerConnectionCount, -1)
|
||||
}
|
||||
|
||||
func cleanupPeerConnection(sessionID string, sessionKey string, wrapper *peerConnectionWrapper) {
|
||||
func cleanupPeerConnection(sessionKey string, wrapper *peerConnectionWrapper) {
|
||||
wrapper.closeOnce.Do(func() {
|
||||
if wrapper.connected.Swap(false) {
|
||||
count := globalConnectionManager.DecrementPeerCount()
|
||||
@@ -138,7 +158,7 @@ func cleanupPeerConnection(sessionID string, sessionKey string, wrapper *peerCon
|
||||
}
|
||||
}
|
||||
|
||||
globalConnectionManager.RemovePeerConnection(sessionID)
|
||||
globalConnectionManager.RemovePeerConnection(sessionKey)
|
||||
close(wrapper.done)
|
||||
})
|
||||
}
|
||||
@@ -182,16 +202,27 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
|
||||
}
|
||||
|
||||
func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {
|
||||
ch := globalConnectionManager.GetOrCreateCandidateChannel(key)
|
||||
|
||||
log.Log.Info("webrtc.main.RegisterCandidates(): " + candidate.Candidate)
|
||||
select {
|
||||
case ch <- candidate.Candidate:
|
||||
default:
|
||||
if !globalConnectionManager.QueueCandidate(key, candidate.Candidate) {
|
||||
log.Log.Info("webrtc.main.RegisterCandidates(): channel is full, dropping candidate")
|
||||
}
|
||||
}
|
||||
|
||||
func decodeICECandidate(candidate string) (pionWebRTC.ICECandidateInit, error) {
|
||||
if candidate == "" {
|
||||
return pionWebRTC.ICECandidateInit{}, io.EOF
|
||||
}
|
||||
|
||||
var candidateInit pionWebRTC.ICECandidateInit
|
||||
if err := json.Unmarshal([]byte(candidate), &candidateInit); err == nil {
|
||||
if candidateInit.Candidate != "" {
|
||||
return candidateInit, nil
|
||||
}
|
||||
}
|
||||
|
||||
return pionWebRTC.ICECandidateInit{Candidate: candidate}, nil
|
||||
}
|
||||
|
||||
func RegisterDefaultInterceptors(mediaEngine *pionWebRTC.MediaEngine, interceptorRegistry *interceptor.Registry) error {
|
||||
if err := pionWebRTC.ConfigureNack(mediaEngine, interceptorRegistry); err != nil {
|
||||
return err
|
||||
@@ -294,7 +325,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
if videoTrack != nil {
|
||||
if videoSender, err = peerConnection.AddTrack(videoTrack); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding video track: " + err.Error())
|
||||
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
@@ -327,7 +358,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
if audioTrack != nil {
|
||||
if audioSender, err = peerConnection.AddTrack(audioTrack); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding audio track: " + err.Error())
|
||||
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
@@ -361,7 +392,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
|
||||
switch connectionState {
|
||||
case pionWebRTC.PeerConnectionStateDisconnected, pionWebRTC.PeerConnectionStateClosed, pionWebRTC.PeerConnectionStateFailed:
|
||||
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
|
||||
case pionWebRTC.PeerConnectionStateConnected:
|
||||
if wrapper.connected.CompareAndSwap(false, true) {
|
||||
@@ -386,33 +417,21 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
return
|
||||
}
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): Received candidate from channel: " + candidate)
|
||||
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: candidate}); candidateErr != nil {
|
||||
candidateInit, decodeErr := decodeICECandidate(candidate)
|
||||
if decodeErr != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error decoding candidate: " + decodeErr.Error())
|
||||
continue
|
||||
}
|
||||
if candidateErr := peerConnection.AddICECandidate(candidateInit); candidateErr != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): error adding candidate: " + candidateErr.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
offer := w.CreateOffer(sd)
|
||||
if err = peerConnection.SetRemoteDescription(offer); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error())
|
||||
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
|
||||
answer, err := peerConnection.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
|
||||
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
|
||||
return
|
||||
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error())
|
||||
cleanupPeerConnection(handshake.SessionID, sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
|
||||
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
|
||||
// The other peer will add this candidate by calling AddICECandidate
|
||||
// The other peer will add this candidate by calling AddICECandidate.
|
||||
// This handler must be registered before setting the local description, otherwise early candidates can be missed.
|
||||
var hasRelayCandidates bool
|
||||
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
|
||||
|
||||
@@ -457,8 +476,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
candateBinary, err := json.Marshal(candidateJSON)
|
||||
if err == nil {
|
||||
valueMap["candidate"] = string(candateBinary)
|
||||
// SDP is not needed to be send..
|
||||
//valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))
|
||||
valueMap["session_id"] = handshake.SessionID
|
||||
log.Log.Info("webrtc.main.InitializeWebRTCConnection(): sending " + candidateType + " candidate to hub")
|
||||
} else {
|
||||
@@ -482,8 +499,26 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
}
|
||||
})
|
||||
|
||||
offer := w.CreateOffer(sd)
|
||||
if err = peerConnection.SetRemoteDescription(offer); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting remote description: " + err.Error())
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
|
||||
answer, err := peerConnection.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while creating answer: " + err.Error())
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): something went wrong while setting local description: " + err.Error())
|
||||
cleanupPeerConnection(sessionKey, wrapper)
|
||||
return
|
||||
}
|
||||
|
||||
// Store peer connection in manager
|
||||
globalConnectionManager.AddPeerConnection(handshake.SessionID, wrapper)
|
||||
globalConnectionManager.AddPeerConnection(sessionKey, wrapper)
|
||||
|
||||
// Create a config map
|
||||
valueMap := make(map[string]interface{})
|
||||
@@ -509,7 +544,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
|
||||
}
|
||||
} else {
|
||||
globalConnectionManager.CloseCandidateChannel(sessionKey)
|
||||
log.Log.Error("Initializwebrtc.main.InitializeWebRTCConnection()eWebRTCConnection: NewPeerConnection failed: " + err.Error())
|
||||
log.Log.Error("webrtc.main.InitializeWebRTCConnection(): failed to decode remote session description: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -551,6 +586,7 @@ type streamState struct {
|
||||
lastKeepAlive int64
|
||||
peerCount int64
|
||||
start bool
|
||||
catchingUp bool
|
||||
receivedKeyFrame bool
|
||||
lastAudioSample *pionMedia.Sample
|
||||
lastVideoSample *pionMedia.Sample
|
||||
@@ -639,6 +675,41 @@ func writeFinalSamples(state *streamState, videoTrack, audioTrack *pionWebRTC.Tr
|
||||
}
|
||||
}
|
||||
|
||||
func sampleTimestamp(pkt packets.Packet) uint32 {
|
||||
if pkt.TimeLegacy > 0 {
|
||||
return uint32(pkt.TimeLegacy.Milliseconds())
|
||||
}
|
||||
|
||||
if pkt.Time > 0 {
|
||||
return uint32(pkt.Time)
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func sampleDuration(current packets.Packet, previousTimestamp uint32, fallback time.Duration) time.Duration {
|
||||
if current.TimeLegacy > 0 {
|
||||
currentDurationMs := current.TimeLegacy.Milliseconds()
|
||||
previousDurationMs := int64(previousTimestamp)
|
||||
if currentDurationMs > previousDurationMs {
|
||||
duration := time.Duration(currentDurationMs-previousDurationMs) * time.Millisecond
|
||||
if duration > 0 {
|
||||
return duration
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
currentTimestamp := sampleTimestamp(current)
|
||||
if currentTimestamp > previousTimestamp {
|
||||
duration := time.Duration(currentTimestamp-previousTimestamp) * time.Millisecond
|
||||
if duration > 0 {
|
||||
return duration
|
||||
}
|
||||
}
|
||||
|
||||
return fallback
|
||||
}
|
||||
|
||||
// processVideoPacket processes a video packet and writes samples to the track
|
||||
func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pionWebRTC.TrackLocalStaticSample, config models.Config) {
|
||||
if videoTrack == nil {
|
||||
@@ -654,7 +725,7 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
|
||||
return
|
||||
}
|
||||
|
||||
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
|
||||
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
|
||||
|
||||
if config.Capture.ForwardWebRTC == "true" {
|
||||
// Remote forwarding not yet implemented
|
||||
@@ -663,8 +734,7 @@ func processVideoPacket(pkt packets.Packet, state *streamState, videoTrack *pion
|
||||
}
|
||||
|
||||
if state.lastVideoSample != nil {
|
||||
duration := sample.PacketTimestamp - state.lastVideoSample.PacketTimestamp
|
||||
state.lastVideoSample.Duration = time.Duration(duration) * time.Millisecond
|
||||
state.lastVideoSample.Duration = sampleDuration(pkt, state.lastVideoSample.PacketTimestamp, 33*time.Millisecond)
|
||||
|
||||
if err := videoTrack.WriteSample(*state.lastVideoSample); err != nil && err != io.ErrClosedPipe {
|
||||
log.Log.Error("webrtc.main.processVideoPacket(): error writing video sample: " + err.Error())
|
||||
@@ -686,11 +756,10 @@ func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pion
|
||||
return
|
||||
}
|
||||
|
||||
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: uint32(pkt.Time)}
|
||||
sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: sampleTimestamp(pkt)}
|
||||
|
||||
if state.lastAudioSample != nil {
|
||||
duration := sample.PacketTimestamp - state.lastAudioSample.PacketTimestamp
|
||||
state.lastAudioSample.Duration = time.Duration(duration) * time.Millisecond
|
||||
state.lastAudioSample.Duration = sampleDuration(pkt, state.lastAudioSample.PacketTimestamp, 20*time.Millisecond)
|
||||
|
||||
if err := audioTrack.WriteSample(*state.lastAudioSample); err != nil && err != io.ErrClosedPipe {
|
||||
log.Log.Error("webrtc.main.processAudioPacket(): error writing audio sample: " + err.Error())
|
||||
@@ -700,6 +769,15 @@ func processAudioPacket(pkt packets.Packet, state *streamState, audioTrack *pion
|
||||
state.lastAudioSample = &sample
|
||||
}
|
||||
|
||||
func shouldDropPacketForLatency(pkt packets.Packet) bool {
|
||||
if pkt.CurrentTime == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
age := time.Since(time.UnixMilli(pkt.CurrentTime))
|
||||
return age > maxLivePacketAge
|
||||
}
|
||||
|
||||
func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, rtspClient capture.RTSPClient) {
|
||||
|
||||
config := configuration.Config
|
||||
@@ -759,6 +837,31 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C
|
||||
continue
|
||||
}
|
||||
|
||||
// Keep live WebRTC close to realtime.
|
||||
// If audio+video load makes this consumer fall behind, skip old packets and
|
||||
// wait for a recent keyframe before resuming video.
|
||||
if shouldDropPacketForLatency(pkt) {
|
||||
if !state.catchingUp {
|
||||
log.Log.Warning("webrtc.main.WriteToTrack(): stream is lagging behind, dropping old packets until the next recent keyframe")
|
||||
}
|
||||
state.catchingUp = true
|
||||
state.start = false
|
||||
state.receivedKeyFrame = false
|
||||
state.lastAudioSample = nil
|
||||
state.lastVideoSample = nil
|
||||
continue
|
||||
}
|
||||
|
||||
if state.catchingUp {
|
||||
if !(pkt.IsVideo && pkt.IsKeyFrame) {
|
||||
continue
|
||||
}
|
||||
state.catchingUp = false
|
||||
state.start = false
|
||||
state.receivedKeyFrame = false
|
||||
log.Log.Info("webrtc.main.WriteToTrack(): caught up with live stream at a recent keyframe")
|
||||
}
|
||||
|
||||
// Wait for first keyframe before processing
|
||||
if !state.receivedKeyFrame {
|
||||
if pkt.IsKeyFrame {
|
||||
|
||||
Reference in New Issue
Block a user