Files
gortsplib/server_stream_format.go
T
Alessandro Ros 4c54b29a94 allocate a rtp sender for each server session (#1021)
this produces more realistic statistics and in the future will
allow to implement per-session statistics and packet retransmission.
2026-03-16 19:46:09 +01:00

158 lines
3.3 KiB
Go

package gortsplib
import (
"crypto/rand"
"sync"
"sync/atomic"
"time"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v5/pkg/format"
"github.com/bluenviron/gortsplib/v5/pkg/headers"
)
func randUint32() (uint32, error) {
var b [4]byte
_, err := rand.Read(b[:])
if err != nil {
return 0, err
}
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
}
type serverStreamFormat struct {
ssm *serverStreamMedia
format format.Format
localSSRC uint32
multicastWriter *serverMulticastWriterFormat
mutex sync.RWMutex
firstSent bool
rtpPacketsSent uint64
lastSequenceNumber uint16
lastRTP uint32
lastNTP time.Time
}
func (ssf *serverStreamFormat) initialize() {
}
func (ssf *serverStreamFormat) rtpInfoEntry(now time.Time) *headers.RTPInfoEntry {
clockRate := ssf.format.ClockRate()
if clockRate == 0 {
return nil
}
ssf.mutex.RLock()
defer ssf.mutex.RUnlock()
if !ssf.firstSent {
return nil
}
// sequence number of the first packet of the stream
seqNum := ssf.lastSequenceNumber + 1
// RTP timestamp corresponding to the time value in
// the Range response header.
// remove a small quantity in order to avoid DTS > PTS
ts := uint32(uint64(ssf.lastRTP) +
uint64(now.Sub(ssf.lastNTP).Seconds()*float64(clockRate)) -
uint64(clockRate)/10)
return &headers.RTPInfoEntry{
SequenceNumber: &seqNum,
Timestamp: &ts,
}
}
func (ssf *serverStreamFormat) stats() ServerStreamStatsFormat {
ssf.mutex.RLock()
defer ssf.mutex.RUnlock()
return ServerStreamStatsFormat{
RTPPacketsSent: ssf.rtpPacketsSent,
LocalSSRC: ssf.localSSRC,
}
}
func (ssf *serverStreamFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error {
pkt.SSRC = ssf.localSSRC
maxPlainPacketSize := ssf.ssm.st.Server.MaxPacketSize
if ssf.ssm.srtpOutCtx != nil {
maxPlainPacketSize -= srtpOverhead
}
plain := make([]byte, maxPlainPacketSize)
n, err := pkt.MarshalTo(plain)
if err != nil {
return err
}
plain = plain[:n]
var encr []byte
if ssf.ssm.srtpOutCtx != nil {
encr = make([]byte, ssf.ssm.st.Server.MaxPacketSize)
encr, err = ssf.ssm.srtpOutCtx.encryptRTP(encr, plain, &pkt.Header)
if err != nil {
return err
}
}
ptsEqualsDTS := ssf.format.PTSEqualsDTS(pkt)
encrReaders := uint64(0)
plainReaders := uint64(0)
// send unicast
for r := range ssf.ssm.st.activeUnicastReaders {
if rsm, ok := r.setuppedMedias[ssf.ssm.media]; ok {
rsf := rsm.formats[pkt.PayloadType]
var buf []byte
if rsm.srtpOutCtx != nil {
buf = encr
encrReaders++
} else {
buf = plain
plainReaders++
}
err = rsf.writePacketRTPEncoded(pkt, ntp, ptsEqualsDTS, buf)
if err != nil {
r.onStreamWriteError(err)
continue
}
}
}
// send multicast
if ssf.ssm.multicastWriter != nil {
var buf []byte
if ssf.ssm.srtpOutCtx != nil {
buf = encr
encrReaders++
} else {
buf = plain
plainReaders++
}
err = ssf.multicastWriter.writePacketRTPEncoded(pkt, ntp, ptsEqualsDTS, buf)
if err != nil {
return err
}
}
atomic.AddUint64(ssf.ssm.bytesSent, uint64(len(encr))*encrReaders+uint64(len(plain))*plainReaders)
ssf.mutex.Lock()
ssf.firstSent = true
ssf.rtpPacketsSent += encrReaders + plainReaders
ssf.lastSequenceNumber = pkt.SequenceNumber
ssf.lastRTP = pkt.Timestamp
ssf.lastNTP = ntp
ssf.mutex.Unlock()
return nil
}