mirror of
https://github.com/Monibuca/engine.git
synced 2026-04-22 15:57:03 +08:00
105 lines
2.9 KiB
Go
105 lines
2.9 KiB
Go
package track
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/pion/rtp"
|
|
"go.uber.org/zap"
|
|
. "m7s.live/engine/v4/common"
|
|
"m7s.live/engine/v4/util"
|
|
)
|
|
|
|
const (
	// RTPMTU is a size limit of 1400 used for RTP packets.
	// NOTE(review): presumably the maximum RTP packet/payload size in bytes;
	// it is not referenced in this part of the file — confirm against callers.
	RTPMTU = 1400
)
|
|
|
|
// WriteRTPPack 写入已反序列化的RTP包,已经排序过了的
|
|
func (av *Media) WriteRTPPack(p *rtp.Packet) {
|
|
var frame RTPFrame
|
|
p.SSRC = av.SSRC
|
|
p.Padding = false
|
|
p.PaddingSize = 0
|
|
frame.Packet = p
|
|
av.Value.BytesIn += len(frame.Payload) + 12
|
|
av.lastSeq2 = av.lastSeq
|
|
av.lastSeq = frame.SequenceNumber
|
|
av.DropCount += int(av.lastSeq - av.lastSeq2 - 1)
|
|
if len(p.Payload) > 0 {
|
|
av.WriteRTPFrame(util.NewListItem(frame))
|
|
}
|
|
}
|
|
|
|
// WriteRTPFrame 写入未反序列化的RTP包, 未排序的
|
|
func (av *Media) WriteRTP(raw *LIRTP) {
|
|
for frame := av.recorderRTP(raw); frame != nil; frame = av.nextRTPFrame() {
|
|
frame.Value.SSRC = av.SSRC
|
|
av.Value.BytesIn += len(frame.Value.Payload) + 12
|
|
av.DropCount += int(av.lastSeq - av.lastSeq2 - 1)
|
|
if len(frame.Value.Payload) > 0 {
|
|
av.WriteRTPFrame(frame)
|
|
// av.Info("rtp", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Int("len", len(frame.Value.Payload)), zap.Bool("marker", frame.Value.Marker), zap.Uint16("seq", frame.Value.SequenceNumber))
|
|
} else {
|
|
av.Debug("rtp payload is empty", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Any("ext", frame.Value.GetExtensionIDs()), zap.Uint16("seq", frame.Value.SequenceNumber))
|
|
frame.Recycle()
|
|
}
|
|
}
|
|
}
|
|
|
|
// https://www.cnblogs.com/moonwalk/p/15903760.html
|
|
// Packetize packetizes the payload of an RTP packet and returns one or more RTP packets
|
|
func (av *Media) PacketizeRTP(payloads ...[][]byte) {
|
|
var rtpItem *LIRTP
|
|
for _, pp := range payloads {
|
|
rtpItem = av.GetRTPFromPool()
|
|
packet := &rtpItem.Value
|
|
br := util.LimitBuffer{Buffer: packet.Payload}
|
|
if av.SampleRate != 90000 {
|
|
packet.Timestamp = uint32(time.Duration(av.SampleRate) * av.Value.PTS / 90000)
|
|
} else {
|
|
packet.Timestamp = uint32(av.Value.PTS)
|
|
}
|
|
packet.Marker = false
|
|
for _, p := range pp {
|
|
if _, err := br.Write(p); err != nil {
|
|
av.Error("rtp payload write error", zap.Error(err))
|
|
for i, pp := range payloads {
|
|
for j, p := range pp {
|
|
av.Error("rtp payload", zap.Int("i", i), zap.Int("j", j), zap.Int("len", len(p)))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}
|
|
packet.Payload = br.Bytes()
|
|
av.Value.RTP.Push(rtpItem)
|
|
}
|
|
// 最后一个rtp包标记为true
|
|
rtpItem.Value.Marker = true
|
|
}
|
|
|
|
type RTPDemuxer struct {
|
|
lastSeq uint16 //上一个rtp包的序号
|
|
lastSeq2 uint16 //上上一个rtp包的序号
|
|
乱序重排 util.RTPReorder[*LIRTP]
|
|
}
|
|
|
|
// 获取缓存中下一个rtpFrame
|
|
func (av *RTPDemuxer) nextRTPFrame() (frame *LIRTP) {
|
|
frame = av.乱序重排.Pop()
|
|
if frame == nil {
|
|
return
|
|
}
|
|
av.lastSeq2 = av.lastSeq
|
|
av.lastSeq = frame.Value.SequenceNumber
|
|
return
|
|
}
|
|
|
|
// 对RTP包乱序重排
|
|
func (av *RTPDemuxer) recorderRTP(item *LIRTP) (frame *LIRTP) {
|
|
frame = av.乱序重排.Push(item.Value.SequenceNumber, item)
|
|
if frame == nil {
|
|
return
|
|
}
|
|
av.lastSeq2 = av.lastSeq
|
|
av.lastSeq = frame.Value.SequenceNumber
|
|
return
|
|
}
|