use safer atomic structs instead of atomic functions (#1039)

This commit is contained in:
Alessandro Ros
2026-03-31 11:31:08 +02:00
committed by GitHub
parent df75157370
commit 569226dd3a
16 changed files with 197 additions and 138 deletions
+15 -17
View File
@@ -24,9 +24,9 @@ import (
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v5/internal/asyncprocessor"
"github.com/bluenviron/gortsplib/v5/internal/bytecounter"
"github.com/bluenviron/gortsplib/v5/pkg/auth"
"github.com/bluenviron/gortsplib/v5/pkg/base"
"github.com/bluenviron/gortsplib/v5/pkg/bytecounter"
"github.com/bluenviron/gortsplib/v5/pkg/conn"
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/gortsplib/v5/pkg/format"
@@ -562,7 +562,7 @@ type Client struct {
lastRange *headers.Range
checkTimeoutTimer *time.Timer
checkTimeoutInitial bool
tcpLastFrameTime *int64
tcpLastFrameTime atomic.Int64
keepAlivePeriod time.Duration
keepAliveTimer *time.Timer
closeError error
@@ -573,8 +573,8 @@ type Client struct {
mustClose bool
tcpFrame *base.InterleavedFrame
tcpBuffer []byte
bytesReceived *uint64
bytesSent *uint64
bytesReceived atomic.Uint64
bytesSent atomic.Uint64
// in
chOptions chan optionsReq
@@ -695,8 +695,6 @@ func (c *Client) Start() error {
c.checkTimeoutTimer = emptyTimer()
c.keepAlivePeriod = 30 * time.Second
c.keepAliveTimer = emptyTimer()
c.bytesReceived = new(uint64)
c.bytesSent = new(uint64)
c.chOptions = make(chan optionsReq)
c.chDescribe = make(chan describeReq)
@@ -1068,7 +1066,7 @@ func (c *Client) startTransportRoutines() {
default: // TCP
c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod)
c.tcpLastFrameTime = ptrOf(c.timeNow().Unix())
c.tcpLastFrameTime.Store(c.timeNow().Unix())
}
}
@@ -1189,7 +1187,7 @@ func (c *Client) connOpen() error {
}
c.nconn = nconn
bc := bytecounter.New(c.nconn, c.bytesReceived, c.bytesSent)
bc := bytecounter.New(c.nconn, &c.bytesReceived, &c.bytesSent)
c.conn = conn.NewConn(bufio.NewReader(bc), bc)
c.reader = &clientReader{
c: c,
@@ -1284,12 +1282,12 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error
func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool {
for _, ct := range c.setuppedMedias {
lft := atomic.LoadInt64(ct.udpRTPListener.lastPacketTime)
lft := ct.udpRTPListener.lastPacketTime.Load()
if lft != 0 {
return true
}
lft = atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime)
lft = ct.udpRTCPListener.lastPacketTime.Load()
if lft != 0 {
return true
}
@@ -1300,12 +1298,12 @@ func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool {
func (c *Client) isInUDPTimeout() bool {
now := c.timeNow()
for _, ct := range c.setuppedMedias {
lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0)
lft := time.Unix(ct.udpRTPListener.lastPacketTime.Load(), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
lft = time.Unix(atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime), 0)
lft = time.Unix(ct.udpRTCPListener.lastPacketTime.Load(), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
}
@@ -1315,7 +1313,7 @@ func (c *Client) isInUDPTimeout() bool {
func (c *Client) isInTCPTimeout() bool {
now := c.timeNow()
lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0)
lft := time.Unix(c.tcpLastFrameTime.Load(), 0)
return now.Sub(lft) >= c.ReadTimeout
}
@@ -2468,10 +2466,10 @@ func (c *Client) Stats() *ClientStats {
return &ClientStats{
Conn: ConnStats{
InboundBytes: atomic.LoadUint64(c.bytesReceived),
OutboundBytes: atomic.LoadUint64(c.bytesSent),
BytesReceived: atomic.LoadUint64(c.bytesReceived),
BytesSent: atomic.LoadUint64(c.bytesSent),
InboundBytes: c.bytesReceived.Load(),
OutboundBytes: c.bytesSent.Load(),
BytesReceived: c.bytesReceived.Load(),
BytesSent: c.bytesSent.Load(),
},
Session: sessionStatsFromMedias(mediaStats),
}
+1 -2
View File
@@ -3,7 +3,6 @@ package gortsplib
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/pion/rtcp"
@@ -305,7 +304,7 @@ func (cf *clientFormat) writePacketRTPEncoded(
) error {
cf.rtpSender.ProcessPacket(pkt, ntp, ptsEqualsDTS)
atomic.AddUint64(cf.cm.bytesSent, uint64(len(payload)))
cf.cm.bytesSent.Add(uint64(len(payload)))
cf.cm.c.writerMutex.RLock()
defer cf.cm.c.writerMutex.RUnlock()
+34 -40
View File
@@ -126,22 +126,16 @@ type clientMedia struct {
onPacketRTCP OnPacketRTCPFunc
formats map[uint8]*clientFormat
writePacketRTCPInQueue func([]byte) error
bytesReceived *uint64
bytesSent *uint64
rtpPacketsInError *uint64
rtcpPacketsReceived *uint64
rtcpPacketsSent *uint64
rtcpPacketsInError *uint64
bytesReceived atomic.Uint64
bytesSent atomic.Uint64
rtpPacketsInError atomic.Uint64
rtcpPacketsReceived atomic.Uint64
rtcpPacketsSent atomic.Uint64
rtcpPacketsInError atomic.Uint64
}
func (cm *clientMedia) initialize() {
cm.onPacketRTCP = func(rtcp.Packet) {}
cm.bytesReceived = new(uint64)
cm.bytesSent = new(uint64)
cm.rtpPacketsInError = new(uint64)
cm.rtcpPacketsReceived = new(uint64)
cm.rtcpPacketsSent = new(uint64)
cm.rtcpPacketsInError = new(uint64)
cm.formats = make(map[uint8]*clientFormat)
@@ -207,12 +201,12 @@ func (cm *clientMedia) stop() {
func (cm *clientMedia) stats() SessionStatsMedia { //nolint:dupl
return SessionStatsMedia{
InboundBytes: atomic.LoadUint64(cm.bytesReceived),
InboundRTPPacketsInError: atomic.LoadUint64(cm.rtpPacketsInError),
InboundRTCPPackets: atomic.LoadUint64(cm.rtcpPacketsReceived),
InboundRTCPPacketsInError: atomic.LoadUint64(cm.rtcpPacketsInError),
OutboundBytes: atomic.LoadUint64(cm.bytesSent),
OutboundRTCPPackets: atomic.LoadUint64(cm.rtcpPacketsSent),
InboundBytes: cm.bytesReceived.Load(),
InboundRTPPacketsInError: cm.rtpPacketsInError.Load(),
InboundRTCPPackets: cm.rtcpPacketsReceived.Load(),
InboundRTCPPacketsInError: cm.rtcpPacketsInError.Load(),
OutboundBytes: cm.bytesSent.Load(),
OutboundRTCPPackets: cm.rtcpPacketsSent.Load(),
Formats: func() map[format.Format]SessionStatsFormat {
ret := make(map[format.Format]SessionStatsFormat, len(cm.formats))
for _, fo := range cm.formats {
@@ -221,12 +215,12 @@ func (cm *clientMedia) stats() SessionStatsMedia { //nolint:dupl
return ret
}(),
// deprecated
BytesReceived: atomic.LoadUint64(cm.bytesReceived),
BytesSent: atomic.LoadUint64(cm.bytesSent),
RTPPacketsInError: atomic.LoadUint64(cm.rtpPacketsInError),
RTCPPacketsReceived: atomic.LoadUint64(cm.rtcpPacketsReceived),
RTCPPacketsSent: atomic.LoadUint64(cm.rtcpPacketsSent),
RTCPPacketsInError: atomic.LoadUint64(cm.rtcpPacketsInError),
BytesReceived: cm.bytesReceived.Load(),
BytesSent: cm.bytesSent.Load(),
RTPPacketsInError: cm.rtpPacketsInError.Load(),
RTCPPacketsReceived: cm.rtcpPacketsReceived.Load(),
RTCPPacketsSent: cm.rtcpPacketsSent.Load(),
RTCPPacketsInError: cm.rtcpPacketsInError.Load(),
}
}
@@ -303,7 +297,7 @@ func (cm *clientMedia) readPacketRTCPPlay(payload []byte) bool {
now := cm.c.timeNow()
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
cm.rtcpPacketsReceived.Add(uint64(len(packets)))
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
@@ -326,7 +320,7 @@ func (cm *clientMedia) readPacketRTCPRecord(payload []byte) bool {
return false
}
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
cm.rtcpPacketsReceived.Add(uint64(len(packets)))
for _, pkt := range packets {
if rr, ok := pkt.(*rtcp.ReceiverReport); ok {
@@ -345,19 +339,19 @@ func (cm *clientMedia) readPacketRTCPRecord(payload []byte) bool {
}
func (cm *clientMedia) readPacketRTPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
cm.bytesReceived.Add(uint64(len(payload)))
now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
cm.c.tcpLastFrameTime.Store(now.Unix())
return cm.readPacketRTP(payload, now)
}
func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
cm.bytesReceived.Add(uint64(len(payload)))
now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
cm.c.tcpLastFrameTime.Store(now.Unix())
if len(payload) > udpMaxPayloadSize {
cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
@@ -372,7 +366,7 @@ func (cm *clientMedia) readPacketRTPTCPRecord(_ []byte) bool {
}
func (cm *clientMedia) readPacketRTCPTCPRecord(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
cm.bytesReceived.Add(uint64(len(payload)))
if len(payload) > udpMaxPayloadSize {
cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
@@ -383,7 +377,7 @@ func (cm *clientMedia) readPacketRTCPTCPRecord(payload []byte) bool {
}
func (cm *clientMedia) readPacketRTPUDPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
cm.bytesReceived.Add(uint64(len(payload)))
if len(payload) == (udpMaxPayloadSize + 1) {
cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketTooBigUDP{})
@@ -394,7 +388,7 @@ func (cm *clientMedia) readPacketRTPUDPPlay(payload []byte) bool {
}
func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
cm.bytesReceived.Add(uint64(len(payload)))
if len(payload) == (udpMaxPayloadSize + 1) {
cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{})
@@ -409,7 +403,7 @@ func (cm *clientMedia) readPacketRTPUDPRecord(_ []byte) bool {
}
func (cm *clientMedia) readPacketRTCPUDPRecord(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
cm.bytesReceived.Add(uint64(len(payload)))
if len(payload) == (udpMaxPayloadSize + 1) {
cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{})
@@ -420,12 +414,12 @@ func (cm *clientMedia) readPacketRTCPUDPRecord(payload []byte) bool {
}
func (cm *clientMedia) onPacketRTPDecodeError(err error) {
atomic.AddUint64(cm.rtpPacketsInError, 1)
cm.rtpPacketsInError.Add(1)
cm.c.OnDecodeError(err)
}
func (cm *clientMedia) onPacketRTCPDecodeError(err error) {
atomic.AddUint64(cm.rtcpPacketsInError, 1)
cm.rtcpPacketsInError.Add(1)
cm.c.OnDecodeError(err)
}
@@ -476,8 +470,8 @@ func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error {
return err
}
atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtcpPacketsSent, 1)
cm.bytesSent.Add(uint64(len(payload)))
cm.rtcpPacketsSent.Add(1)
return nil
}
@@ -490,7 +484,7 @@ func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error {
return err
}
atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtcpPacketsSent, 1)
cm.bytesSent.Add(uint64(len(payload)))
cm.rtcpPacketsSent.Add(1)
return nil
}
+3 -3
View File
@@ -2405,7 +2405,7 @@ func TestClientPlayPausePlay(t *testing.T) {
require.NoError(t, err2)
}()
firstFrame := int32(0)
var firstFrame atomic.Int32
packetRecv := make(chan struct{})
c := Client{
@@ -2421,7 +2421,7 @@ func TestClientPlayPausePlay(t *testing.T) {
err = readAll(&c, "rtsp://localhost:8554/teststream",
func(_ *description.Media, _ format.Format, _ *rtp.Packet) {
if atomic.SwapInt32(&firstFrame, 1) == 0 {
if firstFrame.Swap(1) == 0 {
close(packetRecv)
}
})
@@ -2433,7 +2433,7 @@ func TestClientPlayPausePlay(t *testing.T) {
_, err = c.Pause()
require.NoError(t, err)
firstFrame = int32(0)
firstFrame.Store(0)
packetRecv = make(chan struct{})
_, err = c.Play(nil)
+2 -3
View File
@@ -44,7 +44,7 @@ type clientUDPListener struct {
writeAddr *net.UDPAddr
running bool
lastPacketTime *int64
lastPacketTime atomic.Int64
done chan struct{}
}
@@ -72,7 +72,6 @@ func (u *clientUDPListener) initialize() error {
}
}
u.lastPacketTime = ptrOf(int64(0))
return nil
}
@@ -134,7 +133,7 @@ func (u *clientUDPListener) run() {
}
now := u.c.timeNow()
atomic.StoreInt64(u.lastPacketTime, now.Unix())
u.lastPacketTime.Store(now.Unix())
if u.readFunc(buf[:n]) {
createNewBuffer()
+54
View File
@@ -0,0 +1,54 @@
// Package bytecounter contains a io.ReadWriter wrapper that allows to count read and written bytes.
package bytecounter
import (
"io"
"sync/atomic"
)
// ByteCounter is a io.ReadWriter wrapper that allows to count read and written bytes.
type ByteCounter struct {
rw io.ReadWriter
received *atomic.Uint64
sent *atomic.Uint64
}
// New allocates a ByteCounter.
func New(rw io.ReadWriter, received *atomic.Uint64, sent *atomic.Uint64) *ByteCounter {
if received == nil {
received = new(atomic.Uint64)
}
if sent == nil {
sent = new(atomic.Uint64)
}
return &ByteCounter{
rw: rw,
received: received,
sent: sent,
}
}
// Read implements io.ReadWriter.
func (bc *ByteCounter) Read(p []byte) (int, error) {
n, err := bc.rw.Read(p)
bc.received.Add(uint64(n))
return n, err
}
// Write implements io.ReadWriter.
func (bc *ByteCounter) Write(p []byte) (int, error) {
n, err := bc.rw.Write(p)
bc.sent.Add(uint64(n))
return n, err
}
// BytesReceived returns the number of bytes received.
func (bc *ByteCounter) BytesReceived() uint64 {
return bc.received.Load()
}
// BytesSent returns the number of bytes sent.
func (bc *ByteCounter) BytesSent() uint64 {
return bc.sent.Load()
}
+22
View File
@@ -0,0 +1,22 @@
package bytecounter
import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
)
func TestByteCounter(t *testing.T) {
bc := New(bytes.NewBuffer(nil), nil, nil)
_, err := bc.Write([]byte{0x01, 0x02, 0x03, 0x04})
require.NoError(t, err)
buf := make([]byte, 2)
_, err = bc.Read(buf)
require.NoError(t, err)
require.Equal(t, uint64(4), bc.BytesSent())
require.Equal(t, uint64(2), bc.BytesReceived())
}
+4
View File
@@ -7,6 +7,8 @@ import (
)
// ByteCounter is a io.ReadWriter wrapper that allows to count read and written bytes.
//
// Deprecated: not exposed anymore. will be removed in next version.
type ByteCounter struct {
rw io.ReadWriter
received *uint64
@@ -14,6 +16,8 @@ type ByteCounter struct {
}
// New allocates a ByteCounter.
//
// Deprecated: not exposed anymore. will be removed in next version.
func New(rw io.ReadWriter, received *uint64, sent *uint64) *ByteCounter {
if received == nil {
received = new(uint64)
+1 -1
View File
@@ -13,9 +13,9 @@ import (
"sync"
"time"
"github.com/bluenviron/gortsplib/v5/internal/bytecounter"
"github.com/bluenviron/gortsplib/v5/pkg/auth"
"github.com/bluenviron/gortsplib/v5/pkg/base"
"github.com/bluenviron/gortsplib/v5/pkg/bytecounter"
"github.com/bluenviron/gortsplib/v5/pkg/conn"
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/gortsplib/v5/pkg/headers"
+5 -4
View File
@@ -491,13 +491,14 @@ func TestServerPlaySetupErrors(t *testing.T) {
func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) {
var stream *ServerStream
first := int32(1)
var first atomic.Int32
first.Store(1)
errorRecv := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) {
if atomic.SwapInt32(&first, 0) == 1 {
if first.Swap(0) == 1 {
require.EqualError(t, ctx.Error,
"UDP ports 35466 and 35467 are already assigned to another reader with the same IP")
close(errorRecv)
@@ -622,7 +623,7 @@ func TestServerPlay(t *testing.T) {
sessionOpened := make(chan struct{})
sessionClosed := make(chan struct{})
framesReceived := make(chan struct{})
counter := uint64(0)
var counter atomic.Uint64
listenIP := multicastCapableIP(t)
@@ -734,7 +735,7 @@ func TestServerPlay(t *testing.T) {
ctx.Session.OnPacketRTCPAny(func(medi *description.Media, pkt rtcp.Packet) {
// ignore multicast loopback
if ca.secure == "unsecure" && ca.transport == "multicast" && atomic.AddUint64(&counter, 1) > 1 {
if ca.secure == "unsecure" && ca.transport == "multicast" && counter.Add(1) > 1 {
return
}
+4 -4
View File
@@ -314,7 +314,7 @@ type ServerSession struct {
lastRequestTime time.Time
tcpConn *ServerConn
announcedDesc *description.Session // record
udpLastPacketTime *int64 // record
udpLastPacketTime atomic.Int64 // record
udpCheckStreamTimer *time.Timer
writerMutex sync.RWMutex
writer *asyncprocessor.Processor
@@ -659,7 +659,7 @@ func (ss *ServerSession) runInner() error {
case <-ss.udpCheckStreamTimer.C:
now := ss.s.timeNow()
lft := atomic.LoadInt64(ss.udpLastPacketTime)
lft := ss.udpLastPacketTime.Load()
// in case of RECORD, timeout happens when no RTP or RTCP packets are being received
if ss.state == ServerSessionStateRecord {
@@ -1231,7 +1231,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
ss.state = ServerSessionStatePlay
ss.propsMutex.Unlock()
ss.udpLastPacketTime = ptrOf(ss.s.timeNow().Unix())
ss.udpLastPacketTime.Store(ss.s.timeNow().Unix())
ss.timeDecoder = &rtptime.GlobalDecoder{}
ss.timeDecoder.Initialize()
@@ -1323,7 +1323,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
if res.StatusCode == base.StatusOK {
ss.state = ServerSessionStateRecord
ss.udpLastPacketTime = ptrOf(ss.s.timeNow().Unix())
ss.udpLastPacketTime.Store(ss.s.timeNow().Unix())
ss.timeDecoder = &rtptime.GlobalDecoder{}
ss.timeDecoder.Initialize()
+1 -2
View File
@@ -4,7 +4,6 @@ import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"github.com/pion/rtcp"
@@ -323,7 +322,7 @@ func (ssf *serverSessionFormat) writePacketRTPEncoded(
) error {
ssf.rtpSender.ProcessPacket(pkt, ntp, ptsEqualsDTS)
atomic.AddUint64(ssf.ssm.bytesSent, uint64(len(payload)))
ssf.ssm.bytesSent.Add(uint64(len(payload)))
ssf.ssm.ss.writerMutex.RLock()
defer ssf.ssm.ss.writerMutex.RUnlock()
+38 -45
View File
@@ -30,22 +30,15 @@ type serverSessionMedia struct {
formats map[uint8]*serverSessionFormat // record only
writePacketRTCPInQueue func([]byte) error
bytesReceived *uint64
bytesSent *uint64
rtpPacketsInError *uint64
rtcpPacketsReceived *uint64
rtcpPacketsSent *uint64
rtcpPacketsInError *uint64
bytesReceived atomic.Uint64
bytesSent atomic.Uint64
rtpPacketsInError atomic.Uint64
rtcpPacketsReceived atomic.Uint64
rtcpPacketsSent atomic.Uint64
rtcpPacketsInError atomic.Uint64
}
func (ssm *serverSessionMedia) initialize() {
ssm.bytesReceived = new(uint64)
ssm.bytesSent = new(uint64)
ssm.rtpPacketsInError = new(uint64)
ssm.rtcpPacketsReceived = new(uint64)
ssm.rtcpPacketsSent = new(uint64)
ssm.rtcpPacketsInError = new(uint64)
ssm.formats = make(map[uint8]*serverSessionFormat)
for _, forma := range ssm.media.Formats {
@@ -145,12 +138,12 @@ func (ssm *serverSessionMedia) stop() {
func (ssm *serverSessionMedia) stats() SessionStatsMedia { //nolint:dupl
return SessionStatsMedia{
InboundBytes: atomic.LoadUint64(ssm.bytesReceived),
InboundRTPPacketsInError: atomic.LoadUint64(ssm.rtpPacketsInError),
InboundRTCPPackets: atomic.LoadUint64(ssm.rtcpPacketsReceived),
InboundRTCPPacketsInError: atomic.LoadUint64(ssm.rtcpPacketsInError),
OutboundBytes: atomic.LoadUint64(ssm.bytesSent),
OutboundRTCPPackets: atomic.LoadUint64(ssm.rtcpPacketsSent),
InboundBytes: ssm.bytesReceived.Load(),
InboundRTPPacketsInError: ssm.rtpPacketsInError.Load(),
InboundRTCPPackets: ssm.rtcpPacketsReceived.Load(),
InboundRTCPPacketsInError: ssm.rtcpPacketsInError.Load(),
OutboundBytes: ssm.bytesSent.Load(),
OutboundRTCPPackets: ssm.rtcpPacketsSent.Load(),
Formats: func() map[format.Format]SessionStatsFormat {
ret := make(map[format.Format]SessionStatsFormat, len(ssm.formats))
for _, ssf := range ssm.formats {
@@ -159,12 +152,12 @@ func (ssm *serverSessionMedia) stats() SessionStatsMedia { //nolint:dupl
return ret
}(),
// deprecated
BytesReceived: atomic.LoadUint64(ssm.bytesReceived),
BytesSent: atomic.LoadUint64(ssm.bytesSent),
RTPPacketsInError: atomic.LoadUint64(ssm.rtpPacketsInError),
RTCPPacketsReceived: atomic.LoadUint64(ssm.rtcpPacketsReceived),
RTCPPacketsSent: atomic.LoadUint64(ssm.rtcpPacketsSent),
RTCPPacketsInError: atomic.LoadUint64(ssm.rtcpPacketsInError),
BytesReceived: ssm.bytesReceived.Load(),
BytesSent: ssm.bytesSent.Load(),
RTPPacketsInError: ssm.rtpPacketsInError.Load(),
RTCPPacketsReceived: ssm.rtcpPacketsReceived.Load(),
RTCPPacketsSent: ssm.rtcpPacketsSent.Load(),
RTCPPacketsInError: ssm.rtcpPacketsInError.Load(),
}
}
@@ -239,7 +232,7 @@ func (ssm *serverSessionMedia) readPacketRTCPPlay(payload []byte) bool {
return false
}
atomic.AddUint64(ssm.rtcpPacketsReceived, uint64(len(packets)))
ssm.rtcpPacketsReceived.Add(uint64(len(packets)))
for _, pkt := range packets {
if rr, ok := pkt.(*rtcp.ReceiverReport); ok {
@@ -266,7 +259,7 @@ func (ssm *serverSessionMedia) readPacketRTCPRecord(payload []byte) bool {
now := ssm.ss.s.timeNow()
atomic.AddUint64(ssm.rtcpPacketsReceived, uint64(len(packets)))
ssm.rtcpPacketsReceived.Add(uint64(len(packets)))
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
@@ -283,10 +276,10 @@ func (ssm *serverSessionMedia) readPacketRTCPRecord(payload []byte) bool {
}
func (ssm *serverSessionMedia) readPacketRTPUDPPlay(payload []byte) bool {
atomic.AddUint64(ssm.bytesReceived, uint64(len(payload)))
ssm.bytesReceived.Add(uint64(len(payload)))
now := ssm.ss.s.timeNow()
atomic.StoreInt64(ssm.ss.udpLastPacketTime, now.Unix())
ssm.ss.udpLastPacketTime.Store(now.Unix())
if len(payload) == (udpMaxPayloadSize + 1) {
ssm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
@@ -297,10 +290,10 @@ func (ssm *serverSessionMedia) readPacketRTPUDPPlay(payload []byte) bool {
}
func (ssm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool {
atomic.AddUint64(ssm.bytesReceived, uint64(len(payload)))
ssm.bytesReceived.Add(uint64(len(payload)))
now := ssm.ss.s.timeNow()
atomic.StoreInt64(ssm.ss.udpLastPacketTime, now.Unix())
ssm.ss.udpLastPacketTime.Store(now.Unix())
if len(payload) == (udpMaxPayloadSize + 1) {
ssm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
@@ -311,10 +304,10 @@ func (ssm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool {
}
func (ssm *serverSessionMedia) readPacketRTPUDPRecord(payload []byte) bool {
atomic.AddUint64(ssm.bytesReceived, uint64(len(payload)))
ssm.bytesReceived.Add(uint64(len(payload)))
now := ssm.ss.s.timeNow()
atomic.StoreInt64(ssm.ss.udpLastPacketTime, now.Unix())
ssm.ss.udpLastPacketTime.Store(now.Unix())
if len(payload) == (udpMaxPayloadSize + 1) {
ssm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
@@ -325,10 +318,10 @@ func (ssm *serverSessionMedia) readPacketRTPUDPRecord(payload []byte) bool {
}
func (ssm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool {
atomic.AddUint64(ssm.bytesReceived, uint64(len(payload)))
ssm.bytesReceived.Add(uint64(len(payload)))
now := ssm.ss.s.timeNow()
atomic.StoreInt64(ssm.ss.udpLastPacketTime, now.Unix())
ssm.ss.udpLastPacketTime.Store(now.Unix())
if len(payload) == (udpMaxPayloadSize + 1) {
ssm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
@@ -343,13 +336,13 @@ func (ssm *serverSessionMedia) readPacketRTPTCPPlay(payload []byte) bool {
return false
}
atomic.AddUint64(ssm.bytesReceived, uint64(len(payload)))
ssm.bytesReceived.Add(uint64(len(payload)))
return ssm.readPacketRTP(payload, ssm.ss.s.timeNow())
}
func (ssm *serverSessionMedia) readPacketRTCPTCPPlay(payload []byte) bool {
atomic.AddUint64(ssm.bytesReceived, uint64(len(payload)))
ssm.bytesReceived.Add(uint64(len(payload)))
if len(payload) > udpMaxPayloadSize {
ssm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
@@ -360,13 +353,13 @@ func (ssm *serverSessionMedia) readPacketRTCPTCPPlay(payload []byte) bool {
}
func (ssm *serverSessionMedia) readPacketRTPTCPRecord(payload []byte) bool {
atomic.AddUint64(ssm.bytesReceived, uint64(len(payload)))
ssm.bytesReceived.Add(uint64(len(payload)))
return ssm.readPacketRTP(payload, ssm.ss.s.timeNow())
}
func (ssm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool {
atomic.AddUint64(ssm.bytesReceived, uint64(len(payload)))
ssm.bytesReceived.Add(uint64(len(payload)))
if len(payload) > udpMaxPayloadSize {
ssm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
@@ -377,7 +370,7 @@ func (ssm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool {
}
func (ssm *serverSessionMedia) onPacketRTPDecodeError(err error) {
atomic.AddUint64(ssm.rtpPacketsInError, 1)
ssm.rtpPacketsInError.Add(1)
if h, ok := ssm.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
@@ -390,7 +383,7 @@ func (ssm *serverSessionMedia) onPacketRTPDecodeError(err error) {
}
func (ssm *serverSessionMedia) onPacketRTCPDecodeError(err error) {
atomic.AddUint64(ssm.rtcpPacketsInError, 1)
ssm.rtcpPacketsInError.Add(1)
if h, ok := ssm.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
@@ -456,8 +449,8 @@ func (ssm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error {
return err
}
atomic.AddUint64(ssm.bytesSent, uint64(len(payload)))
atomic.AddUint64(ssm.rtcpPacketsSent, 1)
ssm.bytesSent.Add(uint64(len(payload)))
ssm.rtcpPacketsSent.Add(1)
return nil
}
@@ -470,7 +463,7 @@ func (ssm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error {
return err
}
atomic.AddUint64(ssm.bytesSent, uint64(len(payload)))
atomic.AddUint64(ssm.rtcpPacketsSent, 1)
ssm.bytesSent.Add(uint64(len(payload)))
ssm.rtcpPacketsSent.Add(1)
return nil
}
+1 -2
View File
@@ -3,7 +3,6 @@ package gortsplib
import (
"crypto/rand"
"sync"
"sync/atomic"
"time"
"github.com/pion/rtp"
@@ -146,7 +145,7 @@ func (ssf *serverStreamFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) er
}
}
atomic.AddUint64(ssf.ssm.bytesSent, uint64(len(encr))*encrReaders+uint64(len(plain))*plainReaders)
ssf.ssm.bytesSent.Add(uint64(len(encr))*encrReaders + uint64(len(plain))*plainReaders)
ssf.mutex.Lock()
ssf.firstSent = true
+10 -13
View File
@@ -20,14 +20,11 @@ type serverStreamMedia struct {
formats map[uint8]*serverStreamFormat
multicastWriter *serverMulticastWriterMedia
bytesSent *uint64
rtcpPacketsSent *uint64
bytesSent atomic.Uint64
rtcpPacketsSent atomic.Uint64
}
func (ssm *serverStreamMedia) initialize() {
ssm.bytesSent = new(uint64)
ssm.rtcpPacketsSent = new(uint64)
ssm.formats = make(map[uint8]*serverStreamFormat)
for _, forma := range ssm.media.Formats {
@@ -53,8 +50,8 @@ func (ssm *serverStreamMedia) rtpInfoEntry(now time.Time) *headers.RTPInfoEntry
}
func (ssm *serverStreamMedia) stats() ServerStreamStatsMedia {
bytesSent := atomic.LoadUint64(ssm.bytesSent)
rtcpPacketsSent := atomic.LoadUint64(ssm.rtcpPacketsSent)
bytesSent := ssm.bytesSent.Load()
rtcpPacketsSent := ssm.rtcpPacketsSent.Load()
return ServerStreamStatsMedia{
OutboundBytes: bytesSent,
@@ -109,7 +106,7 @@ func (ssm *serverStreamMedia) writePacketRTCP(pkt rtcp.Packet) error {
continue
}
atomic.AddUint64(ssm.bytesSent, encrLen)
ssm.bytesSent.Add(encrLen)
} else {
err = sm.writePacketRTCPEncoded(plain)
if err != nil {
@@ -117,10 +114,10 @@ func (ssm *serverStreamMedia) writePacketRTCP(pkt rtcp.Packet) error {
continue
}
atomic.AddUint64(ssm.bytesSent, plainLen)
ssm.bytesSent.Add(plainLen)
}
atomic.AddUint64(ssm.rtcpPacketsSent, 1)
ssm.rtcpPacketsSent.Add(1)
}
}
@@ -132,17 +129,17 @@ func (ssm *serverStreamMedia) writePacketRTCP(pkt rtcp.Packet) error {
return err
}
atomic.AddUint64(ssm.bytesSent, encrLen)
ssm.bytesSent.Add(encrLen)
} else {
err = ssm.multicastWriter.writePacketRTCPEncoded(plain)
if err != nil {
return err
}
atomic.AddUint64(ssm.bytesSent, plainLen)
ssm.bytesSent.Add(plainLen)
}
atomic.AddUint64(ssm.rtcpPacketsSent, 1)
ssm.rtcpPacketsSent.Add(1)
}
return nil
+2 -2
View File
@@ -1314,12 +1314,12 @@ func TestServerTunnelHTTP(t *testing.T) {
for _, ca := range []string{"http", "https"} {
t.Run(ca, func(t *testing.T) {
done := make(chan struct{})
n := new(uint64)
var n atomic.Uint64
s := &Server{
Handler: &testServerHandler{
onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) {
switch atomic.AddUint64(n, 1) {
switch n.Add(1) {
case 1:
require.EqualError(t, ctx.Error, "upgraded to HTTP conn")