diff --git a/README.md b/README.md index a92a2f1a..9548895b 100644 --- a/README.md +++ b/README.md @@ -22,8 +22,8 @@ Features: * Read selected media streams * Pause or seek without disconnecting from the server * Write to ONVIF back channels - * Get PTS (presentation timestamp) of incoming packets - * Get NTP (absolute timestamp) of incoming packets + * Get PTS (presentation timestamp) of inbound packets + * Get NTP (absolute timestamp) of inbound packets * Write media streams to a server ("record") * Write streams with the UDP or TCP transport protocol * Switch transport protocol automatically @@ -35,8 +35,8 @@ Features: * Validate client credentials * Read media streams from clients ("record") * Read streams with the UDP or TCP transport protocol - * Get PTS (presentation timestamp) of incoming packets - * Get NTP (absolute timestamp) of incoming packets + * Get PTS (presentation timestamp) of inbound packets + * Get NTP (absolute timestamp) of inbound packets * Serve media streams to clients ("play") * Write streams with the UDP, UDP-multicast or TCP transport protocol * Compute and provide SSRC, RTP-Info to clients diff --git a/client.go b/client.go index fc91b63d..44fd8875 100644 --- a/client.go +++ b/client.go @@ -481,10 +481,10 @@ type Client struct { // This can be increased to reduce packet losses. // It defaults to the operating system default value. UDPReadBufferSize int - // Size of the queue of outgoing packets. + // Size of the queue of outbound packets. // It defaults to 256. WriteQueueSize int - // maximum size of outgoing RTP / RTCP packets. + // maximum size of outbound RTP / RTCP packets. // This must be less than the IPv4/UDP MTU (1472 bytes). // It defaults to 1472. MaxPacketSize int @@ -495,7 +495,7 @@ type Client struct { DisableRTCPSenderReports bool // explicitly request back channels to the server. RequestBackChannels bool - // Range of ports used as source port in outgoing UDP packets. + // Range of ports used as source port in outbound UDP packets. // It defaults to [10000, 65535]. UDPSourcePortRange [2]uint16 @@ -2424,7 +2424,7 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error return cm.writePacketRTCP(pkt) } -// PacketPTS returns the PTS (presentation timestamp) of an incoming RTP packet. +// PacketPTS returns the PTS (presentation timestamp) of an inbound RTP packet. // It is computed by decoding the packet timestamp and sychronizing it with other tracks. func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (int64, bool) { cm := c.setuppedMedias[medi] @@ -2432,7 +2432,7 @@ func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (int64, boo return c.timeDecoder.Decode(ct.format, pkt) } -// PacketNTP returns the NTP (absolute timestamp) of an incoming RTP packet. +// PacketNTP returns the NTP (absolute timestamp) of an inbound RTP packet. // The NTP is computed from RTCP sender reports. func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) { cm := c.setuppedMedias[medi] @@ -2468,94 +2468,11 @@ func (c *Client) Stats() *ClientStats { return &ClientStats{ Conn: ConnStats{ + InboundBytes: atomic.LoadUint64(c.bytesReceived), + OutboundBytes: atomic.LoadUint64(c.bytesSent), BytesReceived: atomic.LoadUint64(c.bytesReceived), BytesSent: atomic.LoadUint64(c.bytesSent), }, - Session: SessionStats{ //nolint:dupl - BytesReceived: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.BytesReceived - } - return v - }(), - BytesSent: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.BytesSent - } - return v - }(), - RTPPacketsReceived: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - for _, f := range ms.Formats { - v += f.RTPPacketsReceived - } - } - return v - }(), - RTPPacketsSent: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - for _, f := range ms.Formats { - v += f.RTPPacketsSent - } - } - return v - }(), - RTPPacketsLost: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - for _, f := range ms.Formats { - v += f.RTPPacketsLost - } - } - return v - }(), - RTPPacketsInError: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.RTPPacketsInError - } - return v - }(), - RTPPacketsJitter: func() float64 { - v := float64(0) - n := float64(0) - for _, ms := range mediaStats { - for _, f := range ms.Formats { - v += f.RTPPacketsJitter - n++ - } - } - if n != 0 { - return v / n - } - return 0 - }(), - RTCPPacketsReceived: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.RTCPPacketsReceived - } - return v - }(), - RTCPPacketsSent: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.RTCPPacketsSent - } - return v - }(), - RTCPPacketsInError: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.RTCPPacketsInError - } - return v - }(), - Medias: mediaStats, - }, + Session: sessionStatsFromMedias(mediaStats), } } diff --git a/client_format.go b/client_format.go index 3cad3fb4..c54ba5f8 100644 --- a/client_format.go +++ b/client_format.go @@ -91,24 +91,66 @@ func (cf *clientFormat) stats() SessionStatsFormat { //nolint:dupl } return SessionStatsFormat{ - RTPPacketsReceived: func() uint64 { + InboundRTPPackets: func() uint64 { if recvStats != nil { - return recvStats.TotalReceived + return recvStats.Received } return 0 }(), - RTPPacketsSent: func() uint64 { + InboundRTPPacketsLost: func() uint64 { + if recvStats != nil { + return recvStats.Lost + } + return 0 + }(), + InboundRTPPacketsJitter: func() float64 { + if recvStats != nil { + return recvStats.Jitter + } + return 0 + }(), + InboundRTPPacketsLastSequenceNumber: func() uint16 { + if recvStats != nil { + return recvStats.LastSequenceNumber + } + return 0 + }(), + InboundRTPPacketsLastRTP: func() uint32 { + if recvStats != nil { + return recvStats.LastRTP + } + return 0 + }(), + InboundRTPPacketsLastNTP: func() time.Time { + if recvStats != nil { + return recvStats.LastNTP + } + return time.Time{} + }(), + OutboundRTPPackets: func() uint64 { if sentStats != nil { - return sentStats.TotalSent + return sentStats.Sent } return 0 }(), - RTPPacketsLost: func() uint64 { - if recvStats != nil { - return recvStats.TotalLost + OutboundRTPPacketsLastSequenceNumber: func() uint16 { + if sentStats != nil { + return sentStats.LastSequenceNumber } return 0 }(), + OutboundRTPPacketsLastRTP: func() uint32 { + if sentStats != nil { + return sentStats.LastRTP + } + return 0 + }(), + OutboundRTPPacketsLastNTP: func() time.Time { + if sentStats != nil { + return sentStats.LastNTP + } + return time.Time{} + }(), LocalSSRC: cf.localSSRC, RemoteSSRC: func() uint32 { if v, ok := cf.remoteSSRC(); ok { @@ -116,6 +158,25 @@ func (cf *clientFormat) stats() SessionStatsFormat { //nolint:dupl } return 0 }(), + // deprecated + RTPPacketsReceived: func() uint64 { + if recvStats != nil { + return recvStats.Received + } + return 0 + }(), + RTPPacketsSent: func() uint64 { + if sentStats != nil { + return sentStats.Sent + } + return 0 + }(), + RTPPacketsLost: func() uint64 { + if recvStats != nil { + return recvStats.Lost + } + return 0 + }(), RTPPacketsLastSequenceNumber: func() uint16 { if recvStats != nil { return recvStats.LastSequenceNumber diff --git a/client_media.go b/client_media.go index 71adcfec..58667824 100644 --- a/client_media.go +++ b/client_media.go @@ -205,14 +205,14 @@ func (cm *clientMedia) stop() { } } -func (cm *clientMedia) stats() SessionStatsMedia { +func (cm *clientMedia) stats() SessionStatsMedia { //nolint:dupl return SessionStatsMedia{ - BytesReceived: atomic.LoadUint64(cm.bytesReceived), - BytesSent: atomic.LoadUint64(cm.bytesSent), - RTPPacketsInError: atomic.LoadUint64(cm.rtpPacketsInError), - RTCPPacketsReceived: atomic.LoadUint64(cm.rtcpPacketsReceived), - RTCPPacketsSent: atomic.LoadUint64(cm.rtcpPacketsSent), - RTCPPacketsInError: atomic.LoadUint64(cm.rtcpPacketsInError), + InboundBytes: atomic.LoadUint64(cm.bytesReceived), + InboundRTPPacketsInError: atomic.LoadUint64(cm.rtpPacketsInError), + InboundRTCPPackets: atomic.LoadUint64(cm.rtcpPacketsReceived), + InboundRTCPPacketsInError: atomic.LoadUint64(cm.rtcpPacketsInError), + OutboundBytes: atomic.LoadUint64(cm.bytesSent), + OutboundRTCPPackets: atomic.LoadUint64(cm.rtcpPacketsSent), Formats: func() map[format.Format]SessionStatsFormat { ret := make(map[format.Format]SessionStatsFormat, len(cm.formats)) for _, fo := range cm.formats { @@ -220,6 +220,13 @@ func (cm *clientMedia) stats() SessionStatsMedia { } return ret }(), + // deprecated + BytesReceived: atomic.LoadUint64(cm.bytesReceived), + BytesSent: atomic.LoadUint64(cm.bytesSent), + RTPPacketsInError: atomic.LoadUint64(cm.rtpPacketsInError), + RTCPPacketsReceived: atomic.LoadUint64(cm.rtcpPacketsReceived), + RTCPPacketsSent: atomic.LoadUint64(cm.rtcpPacketsSent), + RTCPPacketsInError: atomic.LoadUint64(cm.rtcpPacketsInError), } } diff --git a/client_play_test.go b/client_play_test.go index 7243293e..b0b0573d 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -652,47 +652,36 @@ func TestClientPlay(t *testing.T) { // test that stats are available after client is closed s := c.Stats() - require.Equal(t, &ClientStats{ - Conn: ConnStats{ - BytesReceived: s.Conn.BytesReceived, - BytesSent: s.Conn.BytesSent, - }, - Session: SessionStats{ - BytesReceived: s.Session.BytesReceived, - BytesSent: s.Session.BytesSent, - RTPPacketsReceived: s.Session.RTPPacketsReceived, - RTCPPacketsReceived: s.Session.RTCPPacketsReceived, - RTCPPacketsSent: s.Session.RTCPPacketsSent, - Medias: map[*description.Media]SessionStatsMedia{ - sd.Medias[0]: { //nolint:dupl - BytesReceived: s.Session.Medias[sd.Medias[0]].BytesReceived, - BytesSent: s.Session.Medias[sd.Medias[0]].BytesSent, - RTCPPacketsReceived: s.Session.Medias[sd.Medias[0]].RTCPPacketsReceived, - RTCPPacketsSent: s.Session.Medias[sd.Medias[0]].RTCPPacketsSent, - Formats: map[format.Format]SessionStatsFormat{ - sd.Medias[0].Formats[0]: { - RTPPacketsReceived: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].RTPPacketsReceived, - LocalSSRC: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].LocalSSRC, - RemoteSSRC: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].RemoteSSRC, - }, - }, - }, - sd.Medias[1]: { //nolint:dupl - BytesReceived: s.Session.Medias[sd.Medias[1]].BytesReceived, - BytesSent: s.Session.Medias[sd.Medias[1]].BytesSent, - RTCPPacketsReceived: s.Session.Medias[sd.Medias[1]].RTCPPacketsReceived, - RTCPPacketsSent: s.Session.Medias[sd.Medias[1]].RTCPPacketsSent, - Formats: map[format.Format]SessionStatsFormat{ - sd.Medias[1].Formats[0]: { - RTPPacketsReceived: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].RTPPacketsReceived, - LocalSSRC: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].LocalSSRC, - RemoteSSRC: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].RemoteSSRC, - }, - }, - }, - }, - }, - }, s) + require.Equal(t, s.Conn.InboundBytes, s.Conn.BytesReceived) + require.Equal(t, s.Conn.OutboundBytes, s.Conn.BytesSent) + require.Equal(t, s.Session.InboundBytes, s.Session.BytesReceived) + require.Equal(t, s.Session.OutboundBytes, s.Session.BytesSent) + require.Equal(t, s.Session.InboundRTPPackets, s.Session.RTPPacketsReceived) + require.Equal(t, s.Session.InboundRTCPPackets, s.Session.RTCPPacketsReceived) + require.Equal(t, s.Session.OutboundRTCPPackets, s.Session.RTCPPacketsSent) + require.Len(t, s.Session.Medias, 2) + + media0 := s.Session.Medias[sd.Medias[0]] + require.Equal(t, media0.InboundBytes, media0.BytesReceived) + require.Equal(t, media0.OutboundBytes, media0.BytesSent) + require.Equal(t, media0.InboundRTCPPackets, media0.RTCPPacketsReceived) + require.Equal(t, media0.OutboundRTCPPackets, media0.RTCPPacketsSent) + format0 := media0.Formats[sd.Medias[0].Formats[0]] + require.Equal(t, format0.InboundRTPPackets, format0.RTPPacketsReceived) + require.Equal(t, format0.InboundRTPPacketsLost, format0.RTPPacketsLost) + require.Equal(t, format0.InboundRTPPacketsJitter, format0.RTPPacketsJitter) + require.Equal(t, format0.InboundRTPPacketsLastSequenceNumber, format0.RTPPacketsLastSequenceNumber) + + media1 := s.Session.Medias[sd.Medias[1]] + require.Equal(t, media1.InboundBytes, media1.BytesReceived) + require.Equal(t, media1.OutboundBytes, media1.BytesSent) + require.Equal(t, media1.InboundRTCPPackets, media1.RTCPPacketsReceived) + require.Equal(t, media1.OutboundRTCPPackets, media1.RTCPPacketsSent) + format1 := media1.Formats[sd.Medias[1].Formats[0]] + require.Equal(t, format1.InboundRTPPackets, format1.RTPPacketsReceived) + require.Equal(t, format1.InboundRTPPacketsLost, format1.RTPPacketsLost) + require.Equal(t, format1.InboundRTPPacketsJitter, format1.RTPPacketsJitter) + require.Equal(t, format1.InboundRTPPacketsLastSequenceNumber, format1.RTPPacketsLastSequenceNumber) require.Greater(t, s.Session.BytesSent, uint64(19)) require.Less(t, s.Session.BytesSent, uint64(70)) diff --git a/client_record_test.go b/client_record_test.go index 60710eea..dfc8d50e 100644 --- a/client_record_test.go +++ b/client_record_test.go @@ -451,37 +451,29 @@ func TestClientRecord(t *testing.T) { // test that stats are available after client is closed s := c.Stats() - require.Equal(t, &ClientStats{ - Conn: ConnStats{ - BytesReceived: s.Conn.BytesReceived, - BytesSent: s.Conn.BytesSent, - }, - Session: SessionStats{ - BytesReceived: s.Session.BytesReceived, - BytesSent: s.Session.BytesSent, - RTPPacketsSent: s.Session.RTPPacketsSent, - RTPPacketsReceived: s.Session.RTPPacketsReceived, - RTCPPacketsReceived: s.Session.RTCPPacketsReceived, - RTCPPacketsSent: s.Session.RTCPPacketsSent, - Medias: map[*description.Media]SessionStatsMedia{ - medias[0]: { - BytesReceived: s.Session.Medias[medias[0]].BytesReceived, - BytesSent: s.Session.Medias[medias[0]].BytesSent, - RTCPPacketsReceived: s.Session.Medias[medias[0]].RTCPPacketsReceived, - RTCPPacketsSent: s.Session.Medias[medias[0]].RTCPPacketsSent, - Formats: map[format.Format]SessionStatsFormat{ - medias[0].Formats[0]: { - RTPPacketsSent: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsSent, - RTPPacketsReceived: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsReceived, - LocalSSRC: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].LocalSSRC, - RemoteSSRC: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RemoteSSRC, - RTPPacketsLastNTP: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsLastNTP, - }, - }, - }, - }, - }, - }, s) + require.Equal(t, s.Conn.InboundBytes, s.Conn.BytesReceived) + require.Equal(t, s.Conn.OutboundBytes, s.Conn.BytesSent) + require.Equal(t, s.Session.InboundBytes, s.Session.BytesReceived) + require.Equal(t, s.Session.OutboundBytes, s.Session.BytesSent) + require.Equal(t, s.Session.InboundRTPPackets, s.Session.RTPPacketsReceived) + require.Equal(t, s.Session.OutboundRTPPackets, s.Session.RTPPacketsSent) + require.Equal(t, s.Session.InboundRTCPPackets, s.Session.RTCPPacketsReceived) + require.Equal(t, s.Session.OutboundRTCPPackets, s.Session.RTCPPacketsSent) + require.Len(t, s.Session.Medias, 1) + + mediaStats := s.Session.Medias[medias[0]] + require.Equal(t, mediaStats.InboundBytes, mediaStats.BytesReceived) + require.Equal(t, mediaStats.OutboundBytes, mediaStats.BytesSent) + require.Equal(t, mediaStats.InboundRTCPPackets, mediaStats.RTCPPacketsReceived) + require.Equal(t, mediaStats.OutboundRTCPPackets, mediaStats.RTCPPacketsSent) + + formatStats := mediaStats.Formats[medias[0].Formats[0]] + require.Equal(t, formatStats.InboundRTPPackets, formatStats.RTPPacketsReceived) + require.Equal(t, formatStats.OutboundRTPPackets, formatStats.RTPPacketsSent) + require.Equal(t, formatStats.InboundRTPPacketsLost, formatStats.RTPPacketsLost) + require.Equal(t, formatStats.InboundRTPPacketsJitter, formatStats.RTPPacketsJitter) + require.Equal(t, formatStats.InboundRTPPacketsLastSequenceNumber, formatStats.RTPPacketsLastSequenceNumber) + require.Equal(t, formatStats.OutboundRTPPacketsLastNTP, formatStats.RTPPacketsLastNTP) require.Greater(t, s.Session.BytesSent, uint64(15)) require.Less(t, s.Session.BytesSent, uint64(30)) diff --git a/conn_stats.go b/conn_stats.go index c6c728b5..1834ac84 100644 --- a/conn_stats.go +++ b/conn_stats.go @@ -2,8 +2,13 @@ package gortsplib // ConnStats are connection statistics. type ConnStats struct { - // received bytes + // inbound bytes + InboundBytes uint64 + // outbound bytes + OutboundBytes uint64 + + // Deprecated: use InboundBytes. BytesReceived uint64 - // sent bytes + // Deprecated: use OutboundBytes. BytesSent uint64 } diff --git a/examples/client-play-timestamp/main.go b/examples/client-play-timestamp/main.go index 73fa1b42..eb2e76c3 100644 --- a/examples/client-play-timestamp/main.go +++ b/examples/client-play-timestamp/main.go @@ -14,7 +14,7 @@ import ( // This example shows how to: // 1. connect to a RTSP server. // 2. read all media streams on a path. -// 3. Get PTS and NTP of incoming RTP packets. +// 3. Get PTS and NTP of inbound RTP packets. func main() { // parse URL diff --git a/examples/proxy-backchannel/client.go b/examples/proxy-backchannel/client.go index a746be64..f0d0e75b 100644 --- a/examples/proxy-backchannel/client.go +++ b/examples/proxy-backchannel/client.go @@ -102,7 +102,7 @@ func (c *client) read() error { rc.OnPacketRTPAny(func(medi *description.Media, _ format.Format, pkt *rtp.Packet) { log.Printf("received RTP packet from the client, routing to readers") - // route incoming packets to the server stream + // route inbound packets to the server stream err2 := stream.WritePacketRTP(medi, pkt) if err2 != nil { log.Printf("ERR: %v", err2) diff --git a/examples/proxy/client.go b/examples/proxy/client.go index 2f678293..469f5805 100644 --- a/examples/proxy/client.go +++ b/examples/proxy/client.go @@ -73,7 +73,7 @@ func (c *client) read() error { // called when a RTP packet arrives rc.OnPacketRTPAny(func(medi *description.Media, _ format.Format, pkt *rtp.Packet) { - // route incoming packets to the server stream + // route inbound packets to the server stream err2 := stream.WritePacketRTP(medi, pkt) if err2 != nil { log.Printf("ERR: %v", err2) diff --git a/examples/server-play-backchannel/main.go b/examples/server-play-backchannel/main.go index a0774612..dce7510e 100644 --- a/examples/server-play-backchannel/main.go +++ b/examples/server-play-backchannel/main.go @@ -83,7 +83,7 @@ func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Re return } - log.Printf("incoming RTP packet with PTS=%v size=%v", pts, len(pkt.Payload)) + log.Printf("inbound RTP packet with PTS=%v size=%v", pts, len(pkt.Payload)) }) return &base.Response{ diff --git a/pkg/rtpreceiver/receiver.go b/pkg/rtpreceiver/receiver.go index 3c47c39b..9038dd40 100644 --- a/pkg/rtpreceiver/receiver.go +++ b/pkg/rtpreceiver/receiver.go @@ -44,21 +44,21 @@ type Receiver struct { mutex sync.RWMutex // data from RTP packets - firstRTPPacketReceived bool - timeInitialized bool - buffer []*rtp.Packet - absPos uint16 - negativeCount int - sequenceNumberCycles uint16 - lastValidSeqNum uint16 - remoteSSRC uint32 - lastRTP uint32 - lastSystem time.Time - totalLost uint64 - totalLostSinceReport uint64 - totalReceived uint64 - totalReceivedAndLostSinceReport uint64 - jitter float64 + firstRTPPacketReceived bool + timeInitialized bool + buffer []*rtp.Packet + absPos uint16 + negativeCount int + sequenceNumberCycles uint16 + lastValidSeqNum uint16 + remoteSSRC uint32 + lastRTP uint32 + lastSystem time.Time + lost uint64 + lostSinceReport uint64 + received uint64 + receivedAndLostSinceReport uint64 + jitter float64 // data from RTCP sender reports firstSenderReportReceived bool @@ -133,10 +133,10 @@ func (rr *Receiver) report() rtcp.Packet { system := rr.TimeNow() var fractionLost uint8 - if rr.totalReceivedAndLostSinceReport != 0 { + if rr.receivedAndLostSinceReport != 0 { // equivalent to taking the integer part after multiplying the // loss fraction by 256 - fractionLost = uint8((min(rr.totalLostSinceReport, 0xFFFFFF) * 256) / rr.totalReceivedAndLostSinceReport) + fractionLost = uint8((min(rr.lostSinceReport, 0xFFFFFF) * 256) / rr.receivedAndLostSinceReport) } report := &rtcp.ReceiverReport{ @@ -146,7 +146,7 @@ func (rr *Receiver) report() rtcp.Packet { SSRC: rr.remoteSSRC, LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastValidSeqNum), FractionLost: fractionLost, - TotalLost: uint32(min(rr.totalLost, 0xFFFFFF)), // allow up to 24 bits + TotalLost: uint32(min(rr.lost, 0xFFFFFF)), // allow up to 24 bits Jitter: uint32(rr.jitter), }, }, @@ -162,13 +162,13 @@ func (rr *Receiver) report() rtcp.Packet { report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536) } - rr.totalLostSinceReport = 0 - rr.totalReceivedAndLostSinceReport = 0 + rr.lostSinceReport = 0 + rr.receivedAndLostSinceReport = 0 return report } -// ProcessPacket processes an incoming RTP packet. +// ProcessPacket processes an inbound RTP packet. // It returns reordered packets and number of lost packets. // // Deprecated: replaced by ProcessPacket2. @@ -181,7 +181,7 @@ func (rr *Receiver) ProcessPacket( return pkts, lost, nil } -// ProcessPacket2 processes an incoming RTP packet. +// ProcessPacket2 processes an inbound RTP packet. // It returns reordered packets and number of lost packets. func (rr *Receiver) ProcessPacket2( pkt *rtp.Packet, @@ -194,8 +194,8 @@ func (rr *Receiver) ProcessPacket2( // first packet if !rr.firstRTPPacketReceived { rr.firstRTPPacketReceived = true - rr.totalReceived = 1 - rr.totalReceivedAndLostSinceReport = 1 + rr.received = 1 + rr.receivedAndLostSinceReport = 1 rr.lastValidSeqNum = pkt.SequenceNumber rr.remoteSSRC = pkt.SSRC @@ -218,10 +218,10 @@ func (rr *Receiver) ProcessPacket2( lost = uint64(pkt.SequenceNumber - rr.lastValidSeqNum - 1) } - rr.totalLost += lost - rr.totalLostSinceReport += lost - rr.totalReceived += uint64(len(pkts)) - rr.totalReceivedAndLostSinceReport += uint64(len(pkts)) + lost + rr.lost += lost + rr.lostSinceReport += lost + rr.received += uint64(len(pkts)) + rr.receivedAndLostSinceReport += uint64(len(pkts)) + lost for _, pkt := range pkts { // overflow @@ -348,7 +348,7 @@ func (rr *Receiver) reorder(pkt *rtp.Packet) ([]*rtp.Packet, uint64) { return ret, 0 } -// ProcessSenderReport processes an incoming RTCP sender report. +// ProcessSenderReport processes an inbound RTCP sender report. func (rr *Receiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) { rr.mutex.Lock() defer rr.mutex.Unlock() @@ -380,15 +380,25 @@ func (rr *Receiver) PacketNTP(ts uint32) (time.Time, bool) { // Stats are statistics. type Stats struct { + // number of inbound RTP packets correctly received and processed. + Received uint64 + // number of lost inbound RTP packets. + Lost uint64 + // Jitter of inbound RTP packets. + Jitter float64 + // Last sequence number of inbound RTP packets. + LastSequenceNumber uint16 + // Last RTP time of inbound RTP packets. + LastRTP uint32 + // Last NTP time of inbound RTP packets. + LastNTP time.Time + + // Deprecated: use Received. + TotalReceived uint64 + // Deprecated: use Lost. + TotalLost uint64 // Deprecated: will be removed in next version. RemoteSSRC uint32 - - LastSequenceNumber uint16 - LastRTP uint32 - LastNTP time.Time - Jitter float64 - TotalReceived uint64 - TotalLost uint64 } // Stats returns statistics. @@ -403,12 +413,15 @@ func (rr *Receiver) Stats() *Stats { ntp, _ := rr.packetNTPUnsafe(rr.lastRTP) return &Stats{ - RemoteSSRC: rr.remoteSSRC, + Received: rr.received, + Lost: rr.lost, + Jitter: rr.jitter, LastSequenceNumber: rr.lastValidSeqNum, LastRTP: rr.lastRTP, LastNTP: ntp, - Jitter: rr.jitter, - TotalReceived: rr.totalReceived, - TotalLost: rr.totalLost, + // deprecated + TotalReceived: rr.received, + TotalLost: rr.lost, + RemoteSSRC: rr.remoteSSRC, } } diff --git a/pkg/rtpreceiver/receiver_test.go b/pkg/rtpreceiver/receiver_test.go index 897c3cc2..7f9c4ba9 100644 --- a/pkg/rtpreceiver/receiver_test.go +++ b/pkg/rtpreceiver/receiver_test.go @@ -114,10 +114,11 @@ func TestStandard(t *testing.T) { stats := rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, + Received: 1, LastRTP: 0xafb45733, LastSequenceNumber: 945, TotalReceived: 1, + RemoteSSRC: 0xba9da416, }, stats) srPkt := rtcp.SenderReport{ @@ -136,11 +137,12 @@ func TestStandard(t *testing.T) { stats = rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, + Received: 1, LastRTP: 0xafb45733, LastSequenceNumber: 945, LastNTP: time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC).Local(), TotalReceived: 1, + RemoteSSRC: 0xba9da416, }, stats) rtpPkt = rtp.Packet{ @@ -188,11 +190,12 @@ func TestStandard(t *testing.T) { stats = rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, + Received: 3, LastRTP: 2947921603, LastSequenceNumber: 947, LastNTP: time.Date(2008, 5, 20, 22, 15, 21, 0, time.UTC).Local(), TotalReceived: 3, + RemoteSSRC: 0xba9da416, }, stats) }) } @@ -276,10 +279,11 @@ func TestZeroClockRate(t *testing.T) { stats = rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, + Received: 1, LastRTP: 0xafb45733, LastSequenceNumber: 945, TotalReceived: 1, + RemoteSSRC: 0xba9da416, }, stats) srPkt := rtcp.SenderReport{ @@ -294,10 +298,11 @@ func TestZeroClockRate(t *testing.T) { stats = rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, + Received: 1, LastRTP: 0xafb45733, LastSequenceNumber: 945, TotalReceived: 1, + RemoteSSRC: 0xba9da416, }, stats) rtpPkt = rtp.Packet{ @@ -338,10 +343,11 @@ func TestZeroClockRate(t *testing.T) { stats = rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, + Received: 3, LastRTP: 2947921603, LastSequenceNumber: 947, TotalReceived: 3, + RemoteSSRC: 0xba9da416, }, stats) } @@ -585,12 +591,14 @@ func TestReliablePacketsLost(t *testing.T) { stats := rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, + Received: 2, + Lost: 1, LastRTP: 0xafb45733, LastSequenceNumber: 290, LastNTP: time.Date(2020, 11, 21, 17, 44, 36, 869277776, time.UTC).Local(), TotalReceived: 2, TotalLost: 1, + RemoteSSRC: 0xba9da416, }, stats) } @@ -672,12 +680,14 @@ func TestReliableOverflowAndPacketsLost(t *testing.T) { stats := rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, + Received: 2, + Lost: 2, LastRTP: 0xafb45733, LastSequenceNumber: 2, LastNTP: time.Date(2020, 11, 21, 17, 44, 36, 869277776, time.UTC).Local(), TotalReceived: 2, TotalLost: 2, + RemoteSSRC: stats.RemoteSSRC, }, stats) } @@ -854,7 +864,7 @@ func TestUnrealiableReorder(t *testing.T) { stats := rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0, + Received: 8, LastRTP: 0, LastSequenceNumber: 1, TotalReceived: 8, @@ -946,7 +956,8 @@ func TestUnrealiableBufferFull(t *testing.T) { stats := rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0, + Received: 32, + Lost: 34, LastRTP: 0, LastSequenceNumber: 1629, TotalReceived: 32, @@ -1017,7 +1028,7 @@ func TestUnrealiableReset(t *testing.T) { stats := rr.Stats() require.Equal(t, &Stats{ - RemoteSSRC: 0, + Received: 2, LastRTP: 0, LastSequenceNumber: 40064, TotalReceived: 2, @@ -1078,6 +1089,8 @@ func TestUnrealiableCustomBufferSize(t *testing.T) { stats := rr.Stats() require.Equal(t, &Stats{ + Received: 2, + Lost: 130, LastSequenceNumber: 181, TotalReceived: 2, TotalLost: 130, diff --git a/pkg/rtpsender/sender.go b/pkg/rtpsender/sender.go index f2c16a69..f09f7a2c 100644 --- a/pkg/rtpsender/sender.go +++ b/pkg/rtpsender/sender.go @@ -29,7 +29,7 @@ type Sender struct { lastSystem time.Time localSSRC uint32 lastSequenceNumber uint16 - totalSent uint64 + sent uint64 octetCount uint32 terminate chan struct{} @@ -90,7 +90,7 @@ func (rs *Sender) report() rtcp.Packet { SSRC: rs.localSSRC, NTPTime: ntp.Encode(ntpTime), RTPTime: rtpTime, - PacketCount: uint32(rs.totalSent), + PacketCount: uint32(rs.sent), OctetCount: rs.octetCount, } } @@ -110,16 +110,23 @@ func (rs *Sender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS boo rs.lastSequenceNumber = pkt.SequenceNumber - rs.totalSent++ + rs.sent++ rs.octetCount += uint32(len(pkt.Payload)) } // Stats are statistics. type Stats struct { + // number of outbound RTP packets. + Sent uint64 + // last sequence number of outbound RTP packets. LastSequenceNumber uint16 - LastRTP uint32 - LastNTP time.Time - TotalSent uint64 + // last RTP time of outbound RTP packets. + LastRTP uint32 + // last NTP time of outbound RTP packets. + LastNTP time.Time + + // Deprecated: use Sent. + TotalSent uint64 } // Stats returns statistics. @@ -132,9 +139,11 @@ func (rs *Sender) Stats() *Stats { } return &Stats{ + Sent: rs.sent, LastSequenceNumber: rs.lastSequenceNumber, LastRTP: rs.lastRTP, LastNTP: rs.lastNTP, - TotalSent: rs.totalSent, + // deprecated + TotalSent: rs.sent, } } diff --git a/pkg/rtpsender/sender_test.go b/pkg/rtpsender/sender_test.go index 2f526ee2..9ed552b7 100644 --- a/pkg/rtpsender/sender_test.go +++ b/pkg/rtpsender/sender_test.go @@ -101,6 +101,7 @@ func TestSender(t *testing.T) { stats = rs.Stats() require.Equal(t, &Stats{ + Sent: 3, LastSequenceNumber: 948, LastRTP: 1287987768, LastNTP: time.Date(2008, time.May, 20, 22, 15, 21, 0, time.UTC), @@ -161,6 +162,7 @@ func TestSenderZeroClockRate(t *testing.T) { stats = rs.Stats() require.Equal(t, &Stats{ + Sent: 1, LastSequenceNumber: 946, LastRTP: 1287987768, LastNTP: time.Date(2008, time.May, 20, 22, 15, 20, 0, time.UTC), diff --git a/server.go b/server.go index 92f7083d..d165d391 100644 --- a/server.go +++ b/server.go @@ -102,10 +102,10 @@ type Server struct { // This can be increased to reduce packet losses. // It defaults to the operating system default value. UDPReadBufferSize int - // Size of the queue of outgoing packets. + // Size of the queue of outbound packets. // It defaults to 256. WriteQueueSize int - // maximum size of outgoing RTP / RTCP packets. + // maximum size of outbound RTP / RTCP packets. // This must be less than the IPv4/UDP MTU (1472 bytes). // It defaults to 1472. MaxPacketSize int diff --git a/server_conn.go b/server_conn.go index d92c73ed..ef29c564 100644 --- a/server_conn.go +++ b/server_conn.go @@ -202,6 +202,8 @@ func (sc *ServerConn) Transport() *ConnTransport { // Stats returns connection statistics. func (sc *ServerConn) Stats() *ConnStats { return &ConnStats{ + InboundBytes: sc.bc.BytesReceived(), + OutboundBytes: sc.bc.BytesSent(), BytesReceived: sc.bc.BytesReceived(), BytesSent: sc.bc.BytesSent(), } diff --git a/server_play_test.go b/server_play_test.go index ff3e7183..9560f855 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -2693,21 +2693,28 @@ func TestServerPlayStreamStats(t *testing.T) { st := stream.Stats() require.Equal(t, &ServerStreamStats{ - BytesSent: 32, - RTPPacketsSent: 2, + OutboundBytes: 32, + OutboundRTPPackets: 2, + OutboundRTCPPackets: 0, Medias: map[*description.Media]ServerStreamStatsMedia{ stream.Desc.Medias[0]: { - BytesSent: 32, - RTCPPacketsSent: 0, + OutboundBytes: 32, + OutboundRTCPPackets: 0, Formats: map[format.Format]ServerStreamStatsFormat{ stream.Desc.Medias[0].Formats[0]: { - RTPPacketsSent: 2, + OutboundRTPPackets: 2, LocalSSRC: st.Medias[stream.Desc.Medias[0]]. Formats[stream.Desc.Medias[0].Formats[0]].LocalSSRC, + RTPPacketsSent: 2, }, }, + BytesSent: 32, + RTCPPacketsSent: 0, }, }, + BytesSent: 32, + RTPPacketsSent: 2, + RTCPPacketsSent: 0, }, st) } diff --git a/server_session.go b/server_session.go index 6946eec1..48df8d3c 100644 --- a/server_session.go +++ b/server_session.go @@ -457,92 +457,8 @@ func (ss *ServerSession) Stats() *SessionStats { return ret }() - return &SessionStats{ //nolint:dupl - BytesReceived: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.BytesReceived - } - return v - }(), - BytesSent: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.BytesSent - } - return v - }(), - RTPPacketsReceived: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - for _, f := range ms.Formats { - v += f.RTPPacketsReceived - } - } - return v - }(), - RTPPacketsSent: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - for _, f := range ms.Formats { - v += f.RTPPacketsSent - } - } - return v - }(), - RTPPacketsLost: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - for _, f := range ms.Formats { - v += f.RTPPacketsLost - } - } - return v - }(), - RTPPacketsInError: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.RTPPacketsInError - } - return v - }(), - RTPPacketsJitter: func() float64 { - v := float64(0) - n := float64(0) - for _, ms := range mediaStats { - for _, f := range ms.Formats { - v += f.RTPPacketsJitter - n++ - } - } - if n != 0 { - return v / n - } - return 0 - }(), - RTCPPacketsReceived: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.RTCPPacketsReceived - } - return v - }(), - RTCPPacketsSent: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.RTCPPacketsSent - } - return v - }(), - RTCPPacketsInError: func() uint64 { - v := uint64(0) - for _, ms := range mediaStats { - v += ms.RTCPPacketsInError - } - return v - }(), - Medias: mediaStats, - } + stats := sessionStatsFromMedias(mediaStats) + return &stats } func (ss *ServerSession) onStreamWriteError(err error) { @@ -1636,7 +1552,7 @@ func (ss *ServerSession) WritePacketRTCP(medi *description.Media, pkt rtcp.Packe return sm.writePacketRTCP(pkt) } -// PacketPTS returns the PTS (presentation timestamp) of an incoming RTP packet. +// PacketPTS returns the PTS (presentation timestamp) of an inbound RTP packet. // It is computed by decoding the packet timestamp and sychronizing it with other tracks. func (ss *ServerSession) PacketPTS(medi *description.Media, pkt *rtp.Packet) (int64, bool) { sm := ss.setuppedMedias[medi] @@ -1644,7 +1560,7 @@ func (ss *ServerSession) PacketPTS(medi *description.Media, pkt *rtp.Packet) (in return ss.timeDecoder.Decode(sf.format, pkt) } -// PacketNTP returns the NTP (absolute timestamp) of an incoming RTP packet. +// PacketNTP returns the NTP (absolute timestamp) of an inbound RTP packet. // The NTP is computed from RTCP sender reports. func (ss *ServerSession) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) { sm := ss.setuppedMedias[medi] diff --git a/server_session_format.go b/server_session_format.go index c0257076..eb034a49 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -95,24 +95,66 @@ func (ssf *serverSessionFormat) stats() SessionStatsFormat { //nolint:dupl } return SessionStatsFormat{ - RTPPacketsReceived: func() uint64 { + InboundRTPPackets: func() uint64 { if recvStats != nil { - return recvStats.TotalReceived + return recvStats.Received } return 0 }(), - RTPPacketsSent: func() uint64 { + InboundRTPPacketsLost: func() uint64 { + if recvStats != nil { + return recvStats.Lost + } + return 0 + }(), + InboundRTPPacketsJitter: func() float64 { + if recvStats != nil { + return recvStats.Jitter + } + return 0 + }(), + InboundRTPPacketsLastSequenceNumber: func() uint16 { + if recvStats != nil { + return recvStats.LastSequenceNumber + } + return 0 + }(), + InboundRTPPacketsLastRTP: func() uint32 { + if recvStats != nil { + return recvStats.LastRTP + } + return 0 + }(), + InboundRTPPacketsLastNTP: func() time.Time { + if recvStats != nil { + return recvStats.LastNTP + } + return time.Time{} + }(), + OutboundRTPPackets: func() uint64 { if sentStats != nil { - return sentStats.TotalSent + return sentStats.Sent } return 0 }(), - RTPPacketsLost: func() uint64 { - if recvStats != nil { - return recvStats.TotalLost + OutboundRTPPacketsLastSequenceNumber: func() uint16 { + if sentStats != nil { + return sentStats.LastSequenceNumber } return 0 }(), + OutboundRTPPacketsLastRTP: func() uint32 { + if sentStats != nil { + return sentStats.LastRTP + } + return 0 + }(), + OutboundRTPPacketsLastNTP: func() time.Time { + if sentStats != nil { + return sentStats.LastNTP + } + return time.Time{} + }(), LocalSSRC: ssf.localSSRC, RemoteSSRC: func() uint32 { if v, ok := ssf.remoteSSRC(); ok { @@ -120,6 +162,25 @@ func (ssf *serverSessionFormat) stats() SessionStatsFormat { //nolint:dupl } return 0 }(), + // deprecated + RTPPacketsReceived: func() uint64 { + if recvStats != nil { + return recvStats.Received + } + return 0 + }(), + RTPPacketsSent: func() uint64 { + if sentStats != nil { + return sentStats.Sent + } + return 0 + }(), + RTPPacketsLost: func() uint64 { + if recvStats != nil { + return recvStats.Lost + } + return 0 + }(), RTPPacketsLastSequenceNumber: func() uint16 { if recvStats != nil { return recvStats.LastSequenceNumber diff --git a/server_session_media.go b/server_session_media.go index 0dfbc2be..ed5d4f62 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -143,14 +143,14 @@ func (ssm *serverSessionMedia) stop() { } } -func (ssm *serverSessionMedia) stats() SessionStatsMedia { +func (ssm *serverSessionMedia) stats() SessionStatsMedia { //nolint:dupl return SessionStatsMedia{ - BytesReceived: atomic.LoadUint64(ssm.bytesReceived), - BytesSent: atomic.LoadUint64(ssm.bytesSent), - RTPPacketsInError: atomic.LoadUint64(ssm.rtpPacketsInError), - RTCPPacketsReceived: atomic.LoadUint64(ssm.rtcpPacketsReceived), - RTCPPacketsSent: atomic.LoadUint64(ssm.rtcpPacketsSent), - RTCPPacketsInError: atomic.LoadUint64(ssm.rtcpPacketsInError), + InboundBytes: atomic.LoadUint64(ssm.bytesReceived), + InboundRTPPacketsInError: atomic.LoadUint64(ssm.rtpPacketsInError), + InboundRTCPPackets: atomic.LoadUint64(ssm.rtcpPacketsReceived), + InboundRTCPPacketsInError: atomic.LoadUint64(ssm.rtcpPacketsInError), + OutboundBytes: atomic.LoadUint64(ssm.bytesSent), + OutboundRTCPPackets: atomic.LoadUint64(ssm.rtcpPacketsSent), Formats: func() map[format.Format]SessionStatsFormat { ret := make(map[format.Format]SessionStatsFormat, len(ssm.formats)) for _, ssf := range ssm.formats { @@ -158,6 +158,13 @@ func (ssm *serverSessionMedia) stats() SessionStatsMedia { } return ret }(), + // deprecated + BytesReceived: atomic.LoadUint64(ssm.bytesReceived), + BytesSent: atomic.LoadUint64(ssm.bytesSent), + RTPPacketsInError: atomic.LoadUint64(ssm.rtpPacketsInError), + RTCPPacketsReceived: atomic.LoadUint64(ssm.rtcpPacketsReceived), + RTCPPacketsSent: atomic.LoadUint64(ssm.rtcpPacketsSent), + RTCPPacketsInError: atomic.LoadUint64(ssm.rtcpPacketsInError), } } diff --git a/server_stream.go b/server_stream.go index 1046054b..5e7ee387 100644 --- a/server_stream.go +++ b/server_stream.go @@ -149,10 +149,35 @@ func (st *ServerStream) Stats() *ServerStreamStats { }() return &ServerStreamStats{ + OutboundBytes: func() uint64 { + v := uint64(0) + for _, ms := range mediaStats { + v += ms.OutboundBytes + } + return v + }(), + OutboundRTPPackets: func() uint64 { + v := uint64(0) + for _, ms := range mediaStats { + for _, f := range ms.Formats { + v += f.OutboundRTPPackets + } + } + return v + }(), + OutboundRTCPPackets: func() uint64 { + v := uint64(0) + for _, ms := range mediaStats { + v += ms.OutboundRTCPPackets + } + return v + }(), + Medias: mediaStats, + // deprecated BytesSent: func() uint64 { v := uint64(0) for _, ms := range mediaStats { - v += ms.BytesSent + v += ms.OutboundBytes } return v }(), @@ -160,7 +185,7 @@ func (st *ServerStream) Stats() *ServerStreamStats { v := uint64(0) for _, ms := range mediaStats { for _, f := range ms.Formats { - v += f.RTPPacketsSent + v += f.OutboundRTPPackets } } return v @@ -168,11 +193,10 @@ func (st *ServerStream) Stats() *ServerStreamStats { RTCPPacketsSent: func() uint64 { v := uint64(0) for _, ms := range mediaStats { - v += ms.RTCPPacketsSent + v += ms.OutboundRTCPPackets } return v }(), - Medias: mediaStats, } } diff --git a/server_stream_format.go b/server_stream_format.go index ec9f7169..5ecdc380 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -71,9 +71,12 @@ func (ssf *serverStreamFormat) stats() ServerStreamStatsFormat { ssf.mutex.RLock() defer ssf.mutex.RUnlock() + rtpPacketsSent := ssf.rtpPacketsSent + return ServerStreamStatsFormat{ - RTPPacketsSent: ssf.rtpPacketsSent, - LocalSSRC: ssf.localSSRC, + OutboundRTPPackets: rtpPacketsSent, + LocalSSRC: ssf.localSSRC, + RTPPacketsSent: rtpPacketsSent, } } diff --git a/server_stream_media.go b/server_stream_media.go index b41a2ffc..6bda8848 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -53,9 +53,12 @@ func (ssm *serverStreamMedia) rtpInfoEntry(now time.Time) *headers.RTPInfoEntry } func (ssm *serverStreamMedia) stats() ServerStreamStatsMedia { + bytesSent := atomic.LoadUint64(ssm.bytesSent) + rtcpPacketsSent := atomic.LoadUint64(ssm.rtcpPacketsSent) + return ServerStreamStatsMedia{ - BytesSent: atomic.LoadUint64(ssm.bytesSent), - RTCPPacketsSent: atomic.LoadUint64(ssm.rtcpPacketsSent), + OutboundBytes: bytesSent, + OutboundRTCPPackets: rtcpPacketsSent, Formats: func() map[format.Format]ServerStreamStatsFormat { ret := make(map[format.Format]ServerStreamStatsFormat) for _, ssf := range ssm.formats { @@ -63,6 +66,9 @@ func (ssm *serverStreamMedia) stats() ServerStreamStatsMedia { } return ret }(), + // deprecated + BytesSent: bytesSent, + RTCPPacketsSent: rtcpPacketsSent, } } diff --git a/server_stream_stats.go b/server_stream_stats.go index a3784ff2..6caf3221 100644 --- a/server_stream_stats.go +++ b/server_stream_stats.go @@ -7,32 +7,47 @@ import ( // ServerStreamStatsFormat are stream format statistics. type ServerStreamStatsFormat struct { - // number of sent RTP packets - RTPPacketsSent uint64 + // number of outbound RTP packets + OutboundRTPPackets uint64 // local SSRC LocalSSRC uint32 + + // Deprecated: use OutboundRTPPackets. + RTPPacketsSent uint64 } // ServerStreamStatsMedia are stream media statistics. type ServerStreamStatsMedia struct { - // sent bytes - BytesSent uint64 - // number of sent RTCP packets - RTCPPacketsSent uint64 + // outbound bytes + OutboundBytes uint64 + // number of outbound RTCP packets + OutboundRTCPPackets uint64 // format statistics Formats map[format.Format]ServerStreamStatsFormat + + // Deprecated: use OutboundBytes. + BytesSent uint64 + // Deprecated: use OutboundRTCPPackets. + RTCPPacketsSent uint64 } // ServerStreamStats are stream statistics. type ServerStreamStats struct { - // sent bytes - BytesSent uint64 - // number of sent RTP packets - RTPPacketsSent uint64 - // number of sent RTCP packets - RTCPPacketsSent uint64 + // outbound bytes + OutboundBytes uint64 + // number of outbound RTP packets + OutboundRTPPackets uint64 + // number of outbound RTCP packets + OutboundRTCPPackets uint64 // media statistics Medias map[*description.Media]ServerStreamStatsMedia + + // Deprecated: use OutboundBytes. + BytesSent uint64 + // Deprecated: use OutboundRTPPackets. + RTPPacketsSent uint64 + // Deprecated: use OutboundRTCPPackets. + RTCPPacketsSent uint64 } diff --git a/session_stats.go b/session_stats.go index a289c832..34669017 100644 --- a/session_stats.go +++ b/session_stats.go @@ -9,68 +9,187 @@ import ( // SessionStatsFormat are session format statistics. type SessionStatsFormat struct { - // number of RTP packets correctly received and processed - RTPPacketsReceived uint64 - // number of sent RTP packets - RTPPacketsSent uint64 - // number of lost RTP packets - RTPPacketsLost uint64 - // mean jitter of received RTP packets - RTPPacketsJitter float64 + // number of inbound RTP packets correctly received and processed + InboundRTPPackets uint64 + // number of lost inbound RTP packets + InboundRTPPacketsLost uint64 + // mean jitter of inbound RTP packets + InboundRTPPacketsJitter float64 + // last sequence number of inbound RTP packets + InboundRTPPacketsLastSequenceNumber uint16 + // last RTP time of inbound RTP packets + InboundRTPPacketsLastRTP uint32 + // last NTP time of inbound RTP packets + InboundRTPPacketsLastNTP time.Time + + // number of outbound RTP packets + OutboundRTPPackets uint64 + // last sequence number of outbound RTP packets + OutboundRTPPacketsLastSequenceNumber uint16 + // last RTP time of outbound RTP packets + OutboundRTPPacketsLastRTP uint32 + // last NTP time of outbound RTP packets + OutboundRTPPacketsLastNTP time.Time + // local SSRC LocalSSRC uint32 // remote SSRC RemoteSSRC uint32 - // last sequence number of incoming/outgoing RTP packets + + // Deprecated: use InboundRTPPacketsLastSequenceNumber or OutboundRTPPacketsLastSequenceNumber. RTPPacketsLastSequenceNumber uint16 - // last RTP time of incoming/outgoing RTP packets + // Deprecated: use InboundRTPPacketsLastRTP or OutboundRTPPacketsLastRTP. RTPPacketsLastRTP uint32 - // last NTP time of incoming/outgoing NTP packets + // Deprecated: use InboundRTPPacketsLastNTP or OutboundRTPPacketsLastNTP. RTPPacketsLastNTP time.Time + // Deprecated: use InboundRTPPackets. + RTPPacketsReceived uint64 + // Deprecated: use OutboundRTPPackets. + RTPPacketsSent uint64 + // Deprecated: use InboundRTPPacketsLost. + RTPPacketsLost uint64 + // Deprecated: use InboundRTPPacketsJitter. + RTPPacketsJitter float64 } // SessionStatsMedia are session media statistics. type SessionStatsMedia struct { - // received bytes - BytesReceived uint64 - // sent bytes - BytesSent uint64 - // number of RTP packets that could not be processed - RTPPacketsInError uint64 - // number of RTCP packets correctly received and processed - RTCPPacketsReceived uint64 - // number of sent RTCP packets - RTCPPacketsSent uint64 - // number of RTCP packets that could not be processed - RTCPPacketsInError uint64 + // inbound bytes + InboundBytes uint64 + // number of inbound RTP packets that could not be processed + InboundRTPPacketsInError uint64 + // number of inbound RTCP packets correctly received and processed + InboundRTCPPackets uint64 + // number of inbound RTCP packets that could not be processed + InboundRTCPPacketsInError uint64 + + // outbound bytes + OutboundBytes uint64 + // number of outbound RTCP packets + OutboundRTCPPackets uint64 // format statistics Formats map[format.Format]SessionStatsFormat + + // Deprecated: use InboundBytes. + BytesReceived uint64 + // Deprecated: use OutboundBytes. + BytesSent uint64 + // Deprecated: use InboundRTPPacketsInError. + RTPPacketsInError uint64 + // Deprecated: use InboundRTCPPackets. + RTCPPacketsReceived uint64 + // Deprecated: use OutboundRTCPPackets. + RTCPPacketsSent uint64 + // Deprecated: use InboundRTCPPacketsInError. + RTCPPacketsInError uint64 } // SessionStats are session statistics. type SessionStats struct { - // received bytes - BytesReceived uint64 - // sent bytes - BytesSent uint64 - // number of RTP packets correctly received and processed - RTPPacketsReceived uint64 - // number of sent RTP packets - RTPPacketsSent uint64 - // number of lost RTP packets - RTPPacketsLost uint64 - // number of RTP packets that could not be processed - RTPPacketsInError uint64 - // mean jitter of received RTP packets - RTPPacketsJitter float64 - // number of RTCP packets correctly received and processed - RTCPPacketsReceived uint64 - // number of sent RTCP packets - RTCPPacketsSent uint64 - // number of RTCP packets that could not be processed - RTCPPacketsInError uint64 + // inbound bytes + InboundBytes uint64 + // number of inbound RTP packets correctly received and processed + InboundRTPPackets uint64 + // number of lost inbound RTP packets + InboundRTPPacketsLost uint64 + // number of inbound RTP packets that could not be processed + InboundRTPPacketsInError uint64 + // mean jitter of inbound RTP packets + InboundRTPPacketsJitter float64 + // number of inbound RTCP packets correctly received and processed + InboundRTCPPackets uint64 + // number of inbound RTCP packets that could not be processed + InboundRTCPPacketsInError uint64 + + // outbound bytes + OutboundBytes uint64 + // number of outbound RTP packets + OutboundRTPPackets uint64 + // number of outbound RTCP packets + OutboundRTCPPackets uint64 // media statistics Medias map[*description.Media]SessionStatsMedia + + // Deprecated: use InboundBytes. + BytesReceived uint64 + // Deprecated: use OutboundBytes. + BytesSent uint64 + // Deprecated: use InboundRTPPackets. + RTPPacketsReceived uint64 + // Deprecated: use OutboundRTPPackets. + RTPPacketsSent uint64 + // Deprecated: use InboundRTPPacketsLost. + RTPPacketsLost uint64 + // Deprecated: use InboundRTPPacketsInError. + RTPPacketsInError uint64 + // Deprecated: use InboundRTPPacketsJitter. + RTPPacketsJitter float64 + // Deprecated: use InboundRTCPPackets. + RTCPPacketsReceived uint64 + // Deprecated: use OutboundRTCPPackets. + RTCPPacketsSent uint64 + // Deprecated: use InboundRTCPPacketsInError. + RTCPPacketsInError uint64 +} + +func sessionStatsFromMedias(mediaStats map[*description.Media]SessionStatsMedia) SessionStats { + inboundBytes := uint64(0) + outboundBytes := uint64(0) + inboundRTPPackets := uint64(0) + outboundRTPPackets := uint64(0) + inboundRTPPacketsLost := uint64(0) + inboundRTPPacketsInError := uint64(0) + inboundRTPPacketsJitter := float64(0) + inboundRTPPacketsJitterCount := float64(0) + inboundRTCPPackets := uint64(0) + outboundRTCPPackets := uint64(0) + inboundRTCPPacketsInError := uint64(0) + + for _, ms := range mediaStats { + inboundBytes += ms.InboundBytes + outboundBytes += ms.OutboundBytes + inboundRTPPacketsInError += ms.InboundRTPPacketsInError + inboundRTCPPackets += ms.InboundRTCPPackets + outboundRTCPPackets += ms.OutboundRTCPPackets + inboundRTCPPacketsInError += ms.InboundRTCPPacketsInError + + for _, fs := range ms.Formats { + inboundRTPPackets += fs.InboundRTPPackets + outboundRTPPackets += fs.OutboundRTPPackets + inboundRTPPacketsLost += fs.InboundRTPPacketsLost + inboundRTPPacketsJitter += fs.InboundRTPPacketsJitter + inboundRTPPacketsJitterCount++ + } + } + + if inboundRTPPacketsJitterCount != 0 { + inboundRTPPacketsJitter /= inboundRTPPacketsJitterCount + } + + return SessionStats{ + InboundBytes: inboundBytes, + InboundRTPPackets: inboundRTPPackets, + InboundRTPPacketsLost: inboundRTPPacketsLost, + InboundRTPPacketsInError: inboundRTPPacketsInError, + InboundRTPPacketsJitter: inboundRTPPacketsJitter, + InboundRTCPPackets: inboundRTCPPackets, + InboundRTCPPacketsInError: inboundRTCPPacketsInError, + OutboundBytes: outboundBytes, + OutboundRTPPackets: outboundRTPPackets, + OutboundRTCPPackets: outboundRTCPPackets, + Medias: mediaStats, + // deprecated + BytesReceived: inboundBytes, + BytesSent: outboundBytes, + RTPPacketsReceived: inboundRTPPackets, + RTPPacketsSent: outboundRTPPackets, + RTPPacketsLost: inboundRTPPacketsLost, + RTPPacketsInError: inboundRTPPacketsInError, + RTPPacketsJitter: inboundRTPPacketsJitter, + RTCPPacketsReceived: inboundRTCPPackets, + RTCPPacketsSent: outboundRTCPPackets, + RTCPPacketsInError: inboundRTCPPacketsInError, + } }