diff --git a/client.go b/client.go index 08bbade7..c4f39839 100644 --- a/client.go +++ b/client.go @@ -24,9 +24,9 @@ import ( "github.com/pion/rtp" "github.com/bluenviron/gortsplib/v5/internal/asyncprocessor" + "github.com/bluenviron/gortsplib/v5/internal/bytecounter" "github.com/bluenviron/gortsplib/v5/pkg/auth" "github.com/bluenviron/gortsplib/v5/pkg/base" - "github.com/bluenviron/gortsplib/v5/pkg/bytecounter" "github.com/bluenviron/gortsplib/v5/pkg/conn" "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/format" @@ -562,7 +562,7 @@ type Client struct { lastRange *headers.Range checkTimeoutTimer *time.Timer checkTimeoutInitial bool - tcpLastFrameTime *int64 + tcpLastFrameTime atomic.Int64 keepAlivePeriod time.Duration keepAliveTimer *time.Timer closeError error @@ -573,8 +573,8 @@ type Client struct { mustClose bool tcpFrame *base.InterleavedFrame tcpBuffer []byte - bytesReceived *uint64 - bytesSent *uint64 + bytesReceived atomic.Uint64 + bytesSent atomic.Uint64 // in chOptions chan optionsReq @@ -695,8 +695,6 @@ func (c *Client) Start() error { c.checkTimeoutTimer = emptyTimer() c.keepAlivePeriod = 30 * time.Second c.keepAliveTimer = emptyTimer() - c.bytesReceived = new(uint64) - c.bytesSent = new(uint64) c.chOptions = make(chan optionsReq) c.chDescribe = make(chan describeReq) @@ -1068,7 +1066,7 @@ func (c *Client) startTransportRoutines() { default: // TCP c.checkTimeoutTimer = time.NewTimer(c.checkTimeoutPeriod) - c.tcpLastFrameTime = ptrOf(c.timeNow().Unix()) + c.tcpLastFrameTime.Store(c.timeNow().Unix()) } } @@ -1189,7 +1187,7 @@ func (c *Client) connOpen() error { } c.nconn = nconn - bc := bytecounter.New(c.nconn, c.bytesReceived, c.bytesSent) + bc := bytecounter.New(c.nconn, &c.bytesReceived, &c.bytesSent) c.conn = conn.NewConn(bufio.NewReader(bc), bc) c.reader = &clientReader{ c: c, @@ -1284,12 +1282,12 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool { for _, ct := range c.setuppedMedias { - lft := atomic.LoadInt64(ct.udpRTPListener.lastPacketTime) + lft := ct.udpRTPListener.lastPacketTime.Load() if lft != 0 { return true } - lft = atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime) + lft = ct.udpRTCPListener.lastPacketTime.Load() if lft != 0 { return true } @@ -1300,12 +1298,12 @@ func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool { func (c *Client) isInUDPTimeout() bool { now := c.timeNow() for _, ct := range c.setuppedMedias { - lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0) + lft := time.Unix(ct.udpRTPListener.lastPacketTime.Load(), 0) if now.Sub(lft) < c.ReadTimeout { return false } - lft = time.Unix(atomic.LoadInt64(ct.udpRTCPListener.lastPacketTime), 0) + lft = time.Unix(ct.udpRTCPListener.lastPacketTime.Load(), 0) if now.Sub(lft) < c.ReadTimeout { return false } @@ -1315,7 +1313,7 @@ func (c *Client) isInUDPTimeout() bool { func (c *Client) isInTCPTimeout() bool { now := c.timeNow() - lft := time.Unix(atomic.LoadInt64(c.tcpLastFrameTime), 0) + lft := time.Unix(c.tcpLastFrameTime.Load(), 0) return now.Sub(lft) >= c.ReadTimeout } @@ -2468,10 +2466,10 @@ func (c *Client) Stats() *ClientStats { return &ClientStats{ Conn: ConnStats{ - InboundBytes: atomic.LoadUint64(c.bytesReceived), - OutboundBytes: atomic.LoadUint64(c.bytesSent), - BytesReceived: atomic.LoadUint64(c.bytesReceived), - BytesSent: atomic.LoadUint64(c.bytesSent), + InboundBytes: c.bytesReceived.Load(), + OutboundBytes: c.bytesSent.Load(), + BytesReceived: c.bytesReceived.Load(), + BytesSent: c.bytesSent.Load(), }, Session: sessionStatsFromMedias(mediaStats), } diff --git a/client_format.go b/client_format.go index 320140cc..14ab33ed 100644 --- a/client_format.go +++ b/client_format.go @@ -3,7 +3,6 @@ package gortsplib import ( "fmt" "sync" - "sync/atomic" "time" "github.com/pion/rtcp" @@ -305,7 +304,7 @@ func (cf *clientFormat) writePacketRTPEncoded( ) error { cf.rtpSender.ProcessPacket(pkt, ntp, ptsEqualsDTS) - atomic.AddUint64(cf.cm.bytesSent, uint64(len(payload))) + cf.cm.bytesSent.Add(uint64(len(payload))) cf.cm.c.writerMutex.RLock() defer cf.cm.c.writerMutex.RUnlock() diff --git a/client_media.go b/client_media.go index d9ccfc54..5a2a0017 100644 --- a/client_media.go +++ b/client_media.go @@ -126,22 +126,16 @@ type clientMedia struct { onPacketRTCP OnPacketRTCPFunc formats map[uint8]*clientFormat writePacketRTCPInQueue func([]byte) error - bytesReceived *uint64 - bytesSent *uint64 - rtpPacketsInError *uint64 - rtcpPacketsReceived *uint64 - rtcpPacketsSent *uint64 - rtcpPacketsInError *uint64 + bytesReceived atomic.Uint64 + bytesSent atomic.Uint64 + rtpPacketsInError atomic.Uint64 + rtcpPacketsReceived atomic.Uint64 + rtcpPacketsSent atomic.Uint64 + rtcpPacketsInError atomic.Uint64 } func (cm *clientMedia) initialize() { cm.onPacketRTCP = func(rtcp.Packet) {} - cm.bytesReceived = new(uint64) - cm.bytesSent = new(uint64) - cm.rtpPacketsInError = new(uint64) - cm.rtcpPacketsReceived = new(uint64) - cm.rtcpPacketsSent = new(uint64) - cm.rtcpPacketsInError = new(uint64) cm.formats = make(map[uint8]*clientFormat) @@ -207,12 +201,12 @@ func (cm *clientMedia) stop() { func (cm *clientMedia) stats() SessionStatsMedia { //nolint:dupl return SessionStatsMedia{ - InboundBytes: atomic.LoadUint64(cm.bytesReceived), - InboundRTPPacketsInError: atomic.LoadUint64(cm.rtpPacketsInError), - InboundRTCPPackets: atomic.LoadUint64(cm.rtcpPacketsReceived), - InboundRTCPPacketsInError: atomic.LoadUint64(cm.rtcpPacketsInError), - OutboundBytes: atomic.LoadUint64(cm.bytesSent), - OutboundRTCPPackets: atomic.LoadUint64(cm.rtcpPacketsSent), + InboundBytes: cm.bytesReceived.Load(), + InboundRTPPacketsInError: cm.rtpPacketsInError.Load(), + InboundRTCPPackets: cm.rtcpPacketsReceived.Load(), + InboundRTCPPacketsInError: cm.rtcpPacketsInError.Load(), + OutboundBytes: cm.bytesSent.Load(), + OutboundRTCPPackets: cm.rtcpPacketsSent.Load(), Formats: func() map[format.Format]SessionStatsFormat { ret := make(map[format.Format]SessionStatsFormat, len(cm.formats)) for _, fo := range cm.formats { @@ -221,12 +215,12 @@ func (cm *clientMedia) stats() SessionStatsMedia { //nolint:dupl return ret }(), // deprecated - BytesReceived: atomic.LoadUint64(cm.bytesReceived), - BytesSent: atomic.LoadUint64(cm.bytesSent), - RTPPacketsInError: atomic.LoadUint64(cm.rtpPacketsInError), - RTCPPacketsReceived: atomic.LoadUint64(cm.rtcpPacketsReceived), - RTCPPacketsSent: atomic.LoadUint64(cm.rtcpPacketsSent), - RTCPPacketsInError: atomic.LoadUint64(cm.rtcpPacketsInError), + BytesReceived: cm.bytesReceived.Load(), + BytesSent: cm.bytesSent.Load(), + RTPPacketsInError: cm.rtpPacketsInError.Load(), + RTCPPacketsReceived: cm.rtcpPacketsReceived.Load(), + RTCPPacketsSent: cm.rtcpPacketsSent.Load(), + RTCPPacketsInError: cm.rtcpPacketsInError.Load(), } } @@ -303,7 +297,7 @@ func (cm *clientMedia) readPacketRTCPPlay(payload []byte) bool { now := cm.c.timeNow() - atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets))) + cm.rtcpPacketsReceived.Add(uint64(len(packets))) for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { @@ -326,7 +320,7 @@ func (cm *clientMedia) readPacketRTCPRecord(payload []byte) bool { return false } - atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets))) + cm.rtcpPacketsReceived.Add(uint64(len(packets))) for _, pkt := range packets { if rr, ok := pkt.(*rtcp.ReceiverReport); ok { @@ -345,19 +339,19 @@ func (cm *clientMedia) readPacketRTCPRecord(payload []byte) bool { } func (cm *clientMedia) readPacketRTPTCPPlay(payload []byte) bool { - atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) + cm.bytesReceived.Add(uint64(len(payload))) now := cm.c.timeNow() - atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) + cm.c.tcpLastFrameTime.Store(now.Unix()) return cm.readPacketRTP(payload, now) } func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool { - atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) + cm.bytesReceived.Add(uint64(len(payload))) now := cm.c.timeNow() - atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix()) + cm.c.tcpLastFrameTime.Store(now.Unix()) if len(payload) > udpMaxPayloadSize { cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) @@ -372,7 +366,7 @@ func (cm *clientMedia) readPacketRTPTCPRecord(_ []byte) bool { } func (cm *clientMedia) readPacketRTCPTCPRecord(payload []byte) bool { - atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) + cm.bytesReceived.Add(uint64(len(payload))) if len(payload) > udpMaxPayloadSize { cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) @@ -383,7 +377,7 @@ func (cm *clientMedia) readPacketRTCPTCPRecord(payload []byte) bool { } func (cm *clientMedia) readPacketRTPUDPPlay(payload []byte) bool { - atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) + cm.bytesReceived.Add(uint64(len(payload))) if len(payload) == (udpMaxPayloadSize + 1) { cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketTooBigUDP{}) @@ -394,7 +388,7 @@ func (cm *clientMedia) readPacketRTPUDPPlay(payload []byte) bool { } func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool { - atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) + cm.bytesReceived.Add(uint64(len(payload))) if len(payload) == (udpMaxPayloadSize + 1) { cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{}) @@ -409,7 +403,7 @@ func (cm *clientMedia) readPacketRTPUDPRecord(_ []byte) bool { } func (cm *clientMedia) readPacketRTCPUDPRecord(payload []byte) bool { - atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) + cm.bytesReceived.Add(uint64(len(payload))) if len(payload) == (udpMaxPayloadSize + 1) { cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{}) @@ -420,12 +414,12 @@ func (cm *clientMedia) readPacketRTCPUDPRecord(payload []byte) bool { } func (cm *clientMedia) onPacketRTPDecodeError(err error) { - atomic.AddUint64(cm.rtpPacketsInError, 1) + cm.rtpPacketsInError.Add(1) cm.c.OnDecodeError(err) } func (cm *clientMedia) onPacketRTCPDecodeError(err error) { - atomic.AddUint64(cm.rtcpPacketsInError, 1) + cm.rtcpPacketsInError.Add(1) cm.c.OnDecodeError(err) } @@ -476,8 +470,8 @@ func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error { return err } - atomic.AddUint64(cm.bytesSent, uint64(len(payload))) - atomic.AddUint64(cm.rtcpPacketsSent, 1) + cm.bytesSent.Add(uint64(len(payload))) + cm.rtcpPacketsSent.Add(1) return nil } @@ -490,7 +484,7 @@ func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error { return err } - atomic.AddUint64(cm.bytesSent, uint64(len(payload))) - atomic.AddUint64(cm.rtcpPacketsSent, 1) + cm.bytesSent.Add(uint64(len(payload))) + cm.rtcpPacketsSent.Add(1) return nil } diff --git a/client_play_test.go b/client_play_test.go index 9f4266b4..4b87e49a 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -2405,7 +2405,7 @@ func TestClientPlayPausePlay(t *testing.T) { require.NoError(t, err2) }() - firstFrame := int32(0) + var firstFrame atomic.Int32 packetRecv := make(chan struct{}) c := Client{ @@ -2421,7 +2421,7 @@ func TestClientPlayPausePlay(t *testing.T) { err = readAll(&c, "rtsp://localhost:8554/teststream", func(_ *description.Media, _ format.Format, _ *rtp.Packet) { - if atomic.SwapInt32(&firstFrame, 1) == 0 { + if firstFrame.Swap(1) == 0 { close(packetRecv) } }) @@ -2433,7 +2433,7 @@ func TestClientPlayPausePlay(t *testing.T) { _, err = c.Pause() require.NoError(t, err) - firstFrame = int32(0) + firstFrame.Store(0) packetRecv = make(chan struct{}) _, err = c.Play(nil) diff --git a/client_udp_listener.go b/client_udp_listener.go index 005ffc8a..36902416 100644 --- a/client_udp_listener.go +++ b/client_udp_listener.go @@ -44,7 +44,7 @@ type clientUDPListener struct { writeAddr *net.UDPAddr running bool - lastPacketTime *int64 + lastPacketTime atomic.Int64 done chan struct{} } @@ -72,7 +72,6 @@ func (u *clientUDPListener) initialize() error { } } - u.lastPacketTime = ptrOf(int64(0)) return nil } @@ -134,7 +133,7 @@ func (u *clientUDPListener) run() { } now := u.c.timeNow() - atomic.StoreInt64(u.lastPacketTime, now.Unix()) + u.lastPacketTime.Store(now.Unix()) if u.readFunc(buf[:n]) { createNewBuffer() diff --git a/internal/bytecounter/bytecounter.go b/internal/bytecounter/bytecounter.go new file mode 100644 index 00000000..3c2dc418 --- /dev/null +++ b/internal/bytecounter/bytecounter.go @@ -0,0 +1,54 @@ +// Package bytecounter contains a io.ReadWriter wrapper that allows to count read and written bytes. +package bytecounter + +import ( + "io" + "sync/atomic" +) + +// ByteCounter is a io.ReadWriter wrapper that allows to count read and written bytes. +type ByteCounter struct { + rw io.ReadWriter + received *atomic.Uint64 + sent *atomic.Uint64 +} + +// New allocates a ByteCounter. +func New(rw io.ReadWriter, received *atomic.Uint64, sent *atomic.Uint64) *ByteCounter { + if received == nil { + received = new(atomic.Uint64) + } + if sent == nil { + sent = new(atomic.Uint64) + } + + return &ByteCounter{ + rw: rw, + received: received, + sent: sent, + } +} + +// Read implements io.ReadWriter. +func (bc *ByteCounter) Read(p []byte) (int, error) { + n, err := bc.rw.Read(p) + bc.received.Add(uint64(n)) + return n, err +} + +// Write implements io.ReadWriter. +func (bc *ByteCounter) Write(p []byte) (int, error) { + n, err := bc.rw.Write(p) + bc.sent.Add(uint64(n)) + return n, err +} + +// BytesReceived returns the number of bytes received. +func (bc *ByteCounter) BytesReceived() uint64 { + return bc.received.Load() +} + +// BytesSent returns the number of bytes sent. +func (bc *ByteCounter) BytesSent() uint64 { + return bc.sent.Load() +} diff --git a/internal/bytecounter/bytecounter_test.go b/internal/bytecounter/bytecounter_test.go new file mode 100644 index 00000000..2285cd85 --- /dev/null +++ b/internal/bytecounter/bytecounter_test.go @@ -0,0 +1,22 @@ +package bytecounter + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestByteCounter(t *testing.T) { + bc := New(bytes.NewBuffer(nil), nil, nil) + + _, err := bc.Write([]byte{0x01, 0x02, 0x03, 0x04}) + require.NoError(t, err) + + buf := make([]byte, 2) + _, err = bc.Read(buf) + require.NoError(t, err) + + require.Equal(t, uint64(4), bc.BytesSent()) + require.Equal(t, uint64(2), bc.BytesReceived()) +} diff --git a/pkg/bytecounter/bytecounter.go b/pkg/bytecounter/bytecounter.go index abd59d97..8f0c0f16 100644 --- a/pkg/bytecounter/bytecounter.go +++ b/pkg/bytecounter/bytecounter.go @@ -7,6 +7,8 @@ import ( ) // ByteCounter is a io.ReadWriter wrapper that allows to count read and written bytes. +// +// Deprecated: not exposed anymore. will be removed in next version. type ByteCounter struct { rw io.ReadWriter received *uint64 @@ -14,6 +16,8 @@ type ByteCounter struct { } // New allocates a ByteCounter. +// +// Deprecated: not exposed anymore. will be removed in next version. func New(rw io.ReadWriter, received *uint64, sent *uint64) *ByteCounter { if received == nil { received = new(uint64) diff --git a/server_conn.go b/server_conn.go index ef29c564..341756c1 100644 --- a/server_conn.go +++ b/server_conn.go @@ -13,9 +13,9 @@ import ( "sync" "time" + "github.com/bluenviron/gortsplib/v5/internal/bytecounter" "github.com/bluenviron/gortsplib/v5/pkg/auth" "github.com/bluenviron/gortsplib/v5/pkg/base" - "github.com/bluenviron/gortsplib/v5/pkg/bytecounter" "github.com/bluenviron/gortsplib/v5/pkg/conn" "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/headers" diff --git a/server_play_test.go b/server_play_test.go index acf79f92..8fee2679 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -491,13 +491,14 @@ func TestServerPlaySetupErrors(t *testing.T) { func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) { var stream *ServerStream - first := int32(1) + var first atomic.Int32 + first.Store(1) errorRecv := make(chan struct{}) s := &Server{ Handler: &testServerHandler{ onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) { - if atomic.SwapInt32(&first, 0) == 1 { + if first.Swap(0) == 1 { require.EqualError(t, ctx.Error, "UDP ports 35466 and 35467 are already assigned to another reader with the same IP") close(errorRecv) @@ -622,7 +623,7 @@ func TestServerPlay(t *testing.T) { sessionOpened := make(chan struct{}) sessionClosed := make(chan struct{}) framesReceived := make(chan struct{}) - counter := uint64(0) + var counter atomic.Uint64 listenIP := multicastCapableIP(t) @@ -734,7 +735,7 @@ func TestServerPlay(t *testing.T) { ctx.Session.OnPacketRTCPAny(func(medi *description.Media, pkt rtcp.Packet) { // ignore multicast loopback - if ca.secure == "unsecure" && ca.transport == "multicast" && atomic.AddUint64(&counter, 1) > 1 { + if ca.secure == "unsecure" && ca.transport == "multicast" && counter.Add(1) > 1 { return } diff --git a/server_session.go b/server_session.go index 48df8d3c..4242c514 100644 --- a/server_session.go +++ b/server_session.go @@ -314,7 +314,7 @@ type ServerSession struct { lastRequestTime time.Time tcpConn *ServerConn announcedDesc *description.Session // record - udpLastPacketTime *int64 // record + udpLastPacketTime atomic.Int64 // record udpCheckStreamTimer *time.Timer writerMutex sync.RWMutex writer *asyncprocessor.Processor @@ -659,7 +659,7 @@ func (ss *ServerSession) runInner() error { case <-ss.udpCheckStreamTimer.C: now := ss.s.timeNow() - lft := atomic.LoadInt64(ss.udpLastPacketTime) + lft := ss.udpLastPacketTime.Load() // in case of RECORD, timeout happens when no RTP or RTCP packets are being received if ss.state == ServerSessionStateRecord { @@ -1231,7 +1231,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( ss.state = ServerSessionStatePlay ss.propsMutex.Unlock() - ss.udpLastPacketTime = ptrOf(ss.s.timeNow().Unix()) + ss.udpLastPacketTime.Store(ss.s.timeNow().Unix()) ss.timeDecoder = &rtptime.GlobalDecoder{} ss.timeDecoder.Initialize() @@ -1323,7 +1323,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( if res.StatusCode == base.StatusOK { ss.state = ServerSessionStateRecord - ss.udpLastPacketTime = ptrOf(ss.s.timeNow().Unix()) + ss.udpLastPacketTime.Store(ss.s.timeNow().Unix()) ss.timeDecoder = &rtptime.GlobalDecoder{} ss.timeDecoder.Initialize() diff --git a/server_session_format.go b/server_session_format.go index 50b50e28..910fa55b 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -4,7 +4,6 @@ import ( "fmt" "log" "sync" - "sync/atomic" "time" "github.com/pion/rtcp" @@ -323,7 +322,7 @@ func (ssf *serverSessionFormat) writePacketRTPEncoded( ) error { ssf.rtpSender.ProcessPacket(pkt, ntp, ptsEqualsDTS) - atomic.AddUint64(ssf.ssm.bytesSent, uint64(len(payload))) + ssf.ssm.bytesSent.Add(uint64(len(payload))) ssf.ssm.ss.writerMutex.RLock() defer ssf.ssm.ss.writerMutex.RUnlock() diff --git a/server_session_media.go b/server_session_media.go index 7631825b..80684026 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -30,22 +30,15 @@ type serverSessionMedia struct { formats map[uint8]*serverSessionFormat // record only writePacketRTCPInQueue func([]byte) error - bytesReceived *uint64 - bytesSent *uint64 - rtpPacketsInError *uint64 - rtcpPacketsReceived *uint64 - rtcpPacketsSent *uint64 - rtcpPacketsInError *uint64 + bytesReceived atomic.Uint64 + bytesSent atomic.Uint64 + rtpPacketsInError atomic.Uint64 + rtcpPacketsReceived atomic.Uint64 + rtcpPacketsSent atomic.Uint64 + rtcpPacketsInError atomic.Uint64 } func (ssm *serverSessionMedia) initialize() { - ssm.bytesReceived = new(uint64) - ssm.bytesSent = new(uint64) - ssm.rtpPacketsInError = new(uint64) - ssm.rtcpPacketsReceived = new(uint64) - ssm.rtcpPacketsSent = new(uint64) - ssm.rtcpPacketsInError = new(uint64) - ssm.formats = make(map[uint8]*serverSessionFormat) for _, forma := range ssm.media.Formats { @@ -145,12 +138,12 @@ func (ssm *serverSessionMedia) stop() { func (ssm *serverSessionMedia) stats() SessionStatsMedia { //nolint:dupl return SessionStatsMedia{ - InboundBytes: atomic.LoadUint64(ssm.bytesReceived), - InboundRTPPacketsInError: atomic.LoadUint64(ssm.rtpPacketsInError), - InboundRTCPPackets: atomic.LoadUint64(ssm.rtcpPacketsReceived), - InboundRTCPPacketsInError: atomic.LoadUint64(ssm.rtcpPacketsInError), - OutboundBytes: atomic.LoadUint64(ssm.bytesSent), - OutboundRTCPPackets: atomic.LoadUint64(ssm.rtcpPacketsSent), + InboundBytes: ssm.bytesReceived.Load(), + InboundRTPPacketsInError: ssm.rtpPacketsInError.Load(), + InboundRTCPPackets: ssm.rtcpPacketsReceived.Load(), + InboundRTCPPacketsInError: ssm.rtcpPacketsInError.Load(), + OutboundBytes: ssm.bytesSent.Load(), + OutboundRTCPPackets: ssm.rtcpPacketsSent.Load(), Formats: func() map[format.Format]SessionStatsFormat { ret := make(map[format.Format]SessionStatsFormat, len(ssm.formats)) for _, ssf := range ssm.formats { @@ -159,12 +152,12 @@ func (ssm *serverSessionMedia) stats() SessionStatsMedia { //nolint:dupl return ret }(), // deprecated - BytesReceived: atomic.LoadUint64(ssm.bytesReceived), - BytesSent: atomic.LoadUint64(ssm.bytesSent), - RTPPacketsInError: atomic.LoadUint64(ssm.rtpPacketsInError), - RTCPPacketsReceived: atomic.LoadUint64(ssm.rtcpPacketsReceived), - RTCPPacketsSent: atomic.LoadUint64(ssm.rtcpPacketsSent), - RTCPPacketsInError: atomic.LoadUint64(ssm.rtcpPacketsInError), + BytesReceived: ssm.bytesReceived.Load(), + BytesSent: ssm.bytesSent.Load(), + RTPPacketsInError: ssm.rtpPacketsInError.Load(), + RTCPPacketsReceived: ssm.rtcpPacketsReceived.Load(), + RTCPPacketsSent: ssm.rtcpPacketsSent.Load(), + RTCPPacketsInError: ssm.rtcpPacketsInError.Load(), } } @@ -239,7 +232,7 @@ func (ssm *serverSessionMedia) readPacketRTCPPlay(payload []byte) bool { return false } - atomic.AddUint64(ssm.rtcpPacketsReceived, uint64(len(packets))) + ssm.rtcpPacketsReceived.Add(uint64(len(packets))) for _, pkt := range packets { if rr, ok := pkt.(*rtcp.ReceiverReport); ok { @@ -266,7 +259,7 @@ func (ssm *serverSessionMedia) readPacketRTCPRecord(payload []byte) bool { now := ssm.ss.s.timeNow() - atomic.AddUint64(ssm.rtcpPacketsReceived, uint64(len(packets))) + ssm.rtcpPacketsReceived.Add(uint64(len(packets))) for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { @@ -283,10 +276,10 @@ func (ssm *serverSessionMedia) readPacketRTCPRecord(payload []byte) bool { } func (ssm *serverSessionMedia) readPacketRTPUDPPlay(payload []byte) bool { - atomic.AddUint64(ssm.bytesReceived, uint64(len(payload))) + ssm.bytesReceived.Add(uint64(len(payload))) now := ssm.ss.s.timeNow() - atomic.StoreInt64(ssm.ss.udpLastPacketTime, now.Unix()) + ssm.ss.udpLastPacketTime.Store(now.Unix()) if len(payload) == (udpMaxPayloadSize + 1) { ssm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{}) @@ -297,10 +290,10 @@ func (ssm *serverSessionMedia) readPacketRTPUDPPlay(payload []byte) bool { } func (ssm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool { - atomic.AddUint64(ssm.bytesReceived, uint64(len(payload))) + ssm.bytesReceived.Add(uint64(len(payload))) now := ssm.ss.s.timeNow() - atomic.StoreInt64(ssm.ss.udpLastPacketTime, now.Unix()) + ssm.ss.udpLastPacketTime.Store(now.Unix()) if len(payload) == (udpMaxPayloadSize + 1) { ssm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{}) @@ -311,10 +304,10 @@ func (ssm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool { } func (ssm *serverSessionMedia) readPacketRTPUDPRecord(payload []byte) bool { - atomic.AddUint64(ssm.bytesReceived, uint64(len(payload))) + ssm.bytesReceived.Add(uint64(len(payload))) now := ssm.ss.s.timeNow() - atomic.StoreInt64(ssm.ss.udpLastPacketTime, now.Unix()) + ssm.ss.udpLastPacketTime.Store(now.Unix()) if len(payload) == (udpMaxPayloadSize + 1) { ssm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{}) @@ -325,10 +318,10 @@ func (ssm *serverSessionMedia) readPacketRTPUDPRecord(payload []byte) bool { } func (ssm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool { - atomic.AddUint64(ssm.bytesReceived, uint64(len(payload))) + ssm.bytesReceived.Add(uint64(len(payload))) now := ssm.ss.s.timeNow() - atomic.StoreInt64(ssm.ss.udpLastPacketTime, now.Unix()) + ssm.ss.udpLastPacketTime.Store(now.Unix()) if len(payload) == (udpMaxPayloadSize + 1) { ssm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{}) @@ -343,13 +336,13 @@ func (ssm *serverSessionMedia) readPacketRTPTCPPlay(payload []byte) bool { return false } - atomic.AddUint64(ssm.bytesReceived, uint64(len(payload))) + ssm.bytesReceived.Add(uint64(len(payload))) return ssm.readPacketRTP(payload, ssm.ss.s.timeNow()) } func (ssm *serverSessionMedia) readPacketRTCPTCPPlay(payload []byte) bool { - atomic.AddUint64(ssm.bytesReceived, uint64(len(payload))) + ssm.bytesReceived.Add(uint64(len(payload))) if len(payload) > udpMaxPayloadSize { ssm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) @@ -360,13 +353,13 @@ func (ssm *serverSessionMedia) readPacketRTCPTCPPlay(payload []byte) bool { } func (ssm *serverSessionMedia) readPacketRTPTCPRecord(payload []byte) bool { - atomic.AddUint64(ssm.bytesReceived, uint64(len(payload))) + ssm.bytesReceived.Add(uint64(len(payload))) return ssm.readPacketRTP(payload, ssm.ss.s.timeNow()) } func (ssm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool { - atomic.AddUint64(ssm.bytesReceived, uint64(len(payload))) + ssm.bytesReceived.Add(uint64(len(payload))) if len(payload) > udpMaxPayloadSize { ssm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize}) @@ -377,7 +370,7 @@ func (ssm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool { } func (ssm *serverSessionMedia) onPacketRTPDecodeError(err error) { - atomic.AddUint64(ssm.rtpPacketsInError, 1) + ssm.rtpPacketsInError.Add(1) if h, ok := ssm.ss.s.Handler.(ServerHandlerOnDecodeError); ok { h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ @@ -390,7 +383,7 @@ func (ssm *serverSessionMedia) onPacketRTPDecodeError(err error) { } func (ssm *serverSessionMedia) onPacketRTCPDecodeError(err error) { - atomic.AddUint64(ssm.rtcpPacketsInError, 1) + ssm.rtcpPacketsInError.Add(1) if h, ok := ssm.ss.s.Handler.(ServerHandlerOnDecodeError); ok { h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ @@ -456,8 +449,8 @@ func (ssm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error { return err } - atomic.AddUint64(ssm.bytesSent, uint64(len(payload))) - atomic.AddUint64(ssm.rtcpPacketsSent, 1) + ssm.bytesSent.Add(uint64(len(payload))) + ssm.rtcpPacketsSent.Add(1) return nil } @@ -470,7 +463,7 @@ func (ssm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error { return err } - atomic.AddUint64(ssm.bytesSent, uint64(len(payload))) - atomic.AddUint64(ssm.rtcpPacketsSent, 1) + ssm.bytesSent.Add(uint64(len(payload))) + ssm.rtcpPacketsSent.Add(1) return nil } diff --git a/server_stream_format.go b/server_stream_format.go index 5ecdc380..07146142 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -3,7 +3,6 @@ package gortsplib import ( "crypto/rand" "sync" - "sync/atomic" "time" "github.com/pion/rtp" @@ -146,7 +145,7 @@ func (ssf *serverStreamFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) er } } - atomic.AddUint64(ssf.ssm.bytesSent, uint64(len(encr))*encrReaders+uint64(len(plain))*plainReaders) + ssf.ssm.bytesSent.Add(uint64(len(encr))*encrReaders + uint64(len(plain))*plainReaders) ssf.mutex.Lock() ssf.firstSent = true diff --git a/server_stream_media.go b/server_stream_media.go index 6bda8848..e5d9fb8d 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -20,14 +20,11 @@ type serverStreamMedia struct { formats map[uint8]*serverStreamFormat multicastWriter *serverMulticastWriterMedia - bytesSent *uint64 - rtcpPacketsSent *uint64 + bytesSent atomic.Uint64 + rtcpPacketsSent atomic.Uint64 } func (ssm *serverStreamMedia) initialize() { - ssm.bytesSent = new(uint64) - ssm.rtcpPacketsSent = new(uint64) - ssm.formats = make(map[uint8]*serverStreamFormat) for _, forma := range ssm.media.Formats { @@ -53,8 +50,8 @@ func (ssm *serverStreamMedia) rtpInfoEntry(now time.Time) *headers.RTPInfoEntry } func (ssm *serverStreamMedia) stats() ServerStreamStatsMedia { - bytesSent := atomic.LoadUint64(ssm.bytesSent) - rtcpPacketsSent := atomic.LoadUint64(ssm.rtcpPacketsSent) + bytesSent := ssm.bytesSent.Load() + rtcpPacketsSent := ssm.rtcpPacketsSent.Load() return ServerStreamStatsMedia{ OutboundBytes: bytesSent, @@ -109,7 +106,7 @@ func (ssm *serverStreamMedia) writePacketRTCP(pkt rtcp.Packet) error { continue } - atomic.AddUint64(ssm.bytesSent, encrLen) + ssm.bytesSent.Add(encrLen) } else { err = sm.writePacketRTCPEncoded(plain) if err != nil { @@ -117,10 +114,10 @@ func (ssm *serverStreamMedia) writePacketRTCP(pkt rtcp.Packet) error { continue } - atomic.AddUint64(ssm.bytesSent, plainLen) + ssm.bytesSent.Add(plainLen) } - atomic.AddUint64(ssm.rtcpPacketsSent, 1) + ssm.rtcpPacketsSent.Add(1) } } @@ -132,17 +129,17 @@ func (ssm *serverStreamMedia) writePacketRTCP(pkt rtcp.Packet) error { return err } - atomic.AddUint64(ssm.bytesSent, encrLen) + ssm.bytesSent.Add(encrLen) } else { err = ssm.multicastWriter.writePacketRTCPEncoded(plain) if err != nil { return err } - atomic.AddUint64(ssm.bytesSent, plainLen) + ssm.bytesSent.Add(plainLen) } - atomic.AddUint64(ssm.rtcpPacketsSent, 1) + ssm.rtcpPacketsSent.Add(1) } return nil diff --git a/server_test.go b/server_test.go index b9c84bcd..b4bfe1fc 100644 --- a/server_test.go +++ b/server_test.go @@ -1314,12 +1314,12 @@ func TestServerTunnelHTTP(t *testing.T) { for _, ca := range []string{"http", "https"} { t.Run(ca, func(t *testing.T) { done := make(chan struct{}) - n := new(uint64) + var n atomic.Uint64 s := &Server{ Handler: &testServerHandler{ onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) { - switch atomic.AddUint64(n, 1) { + switch n.Add(1) { case 1: require.EqualError(t, ctx.Error, "upgraded to HTTP conn")