增加对PS流的支持

This commit is contained in:
langhuihui
2020-09-20 15:51:26 +08:00
parent 1b12ef72f2
commit cee1064660
6 changed files with 385 additions and 20 deletions
+1 -1
View File
@@ -1,3 +1,3 @@
# plugin-rtp
rtp 插件用于给rtsp、webrtc 提供基础功能
rtp 插件用于给rtsp、webrtc、gb28181 提供基础功能
主要作用是将RTP包解码传入引擎中
+110
View File
@@ -0,0 +1,110 @@
package plugin_rtp
import "errors"
//
const (
UDPTransfer int = 0
TCPTransferActive int = 1
TCPTransferPassive int = 2
LocalCache int = 3
)
//
const (
StreamTypeH264 = 0x1b
StreamTypeH265 = 0x24
G711Mu = 0x90
G7221AUDIOTYPE = 0x92
G7231AUDIOTYPE = 0x93
G729AUDIOTYPE = 0x99
)
//
const (
StreamIDVideo = 0xe0
StreamIDAudio = 0xc0
)
//
const (
StartCodePS = 0x000001ba
StartCodeSYS = 0x000001bb
StartCodeMAP = 0x000001bc
StartCodeVideo = 0x000001e0
StartCodeAudio = 0x000001c0
HaiKangCode = 0x000001bd
MEPGProgramEndCode = 0x000001b9
)
//... len limit
const (
RTPHeaderLength int = 12
PSHeaderLength int = 14
SystemHeaderLength int = 18
MAPHeaderLength int = 24
PESHeaderLength int = 19
RtpLoadLength int = 1460
PESLoadLength int = 0xFFFF
MAXFrameLen int = 1024 * 1024 * 2
)
var (
ErrNotFoundStartCode = errors.New("not found the need start code flag")
ErrMarkerBit = errors.New("marker bit value error")
ErrFormatPack = errors.New("not package standard")
ErrParsePakcet = errors.New("parse ps packet error")
)
/*
This implement from VLC source code
notes: https://github.com/videolan/vlc/blob/master/modules/mux/mpeg/bits.h
*/
//bitsBuffer bits buffer
type bitsBuffer struct {
iSize int
iData int
iMask uint8
pData []byte
}
func bitsInit(isize int, buffer []byte) *bitsBuffer {
bits := &bitsBuffer{
iSize: isize,
iData: 0,
iMask: 0x80,
pData: buffer,
}
if bits.pData == nil {
bits.pData = make([]byte, isize)
}
return bits
}
func bitsAlign(bits *bitsBuffer) {
if bits.iMask != 0x80 && bits.iData < bits.iSize {
bits.iMask = 0x80
bits.iData++
bits.pData[bits.iData] = 0x00
}
}
func bitsWrite(bits *bitsBuffer, count int, src uint64) *bitsBuffer {
for count > 0 {
count--
if ((src >> uint(count)) & 0x01) != 0 {
bits.pData[bits.iData] |= bits.iMask
} else {
bits.pData[bits.iData] &= ^bits.iMask
}
bits.iMask >>= 1
if bits.iMask == 0 {
bits.iData++
bits.iMask = 0x80
}
}
return bits
}
+2 -1
View File
@@ -3,6 +3,7 @@ module github.com/Monibuca/plugin-rtp
go 1.13
require (
github.com/Monibuca/engine/v2 v2.1.0
github.com/Monibuca/engine/v2 v2.2.0
github.com/mask-pp/rtp-ps v1.0.0
github.com/pion/rtp v1.5.4
)
+4 -2
View File
@@ -1,7 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Monibuca/engine/v2 v2.1.0 h1:pHeDCEFDusKFsZLpconYj8U5LCaWApnjd+yQRHYgQsQ=
github.com/Monibuca/engine/v2 v2.1.0/go.mod h1:34EYjjV15G6myuHOKaJkO7y5tJ1Arq/NfC9Weacr2mc=
github.com/Monibuca/engine/v2 v2.2.0 h1:A4SyWwzVLegd8Oa6LfSW3LpNfBmWq+MHJJLO55gvaYI=
github.com/Monibuca/engine/v2 v2.2.0/go.mod h1:34EYjjV15G6myuHOKaJkO7y5tJ1Arq/NfC9Weacr2mc=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
@@ -14,6 +14,8 @@ github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 h1:bqDmpDG49ZRnB5PcgP0RXtQvnMSgIF14M7CBd2shtXs=
github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mask-pp/rtp-ps v1.0.0 h1:JFxuJL9N+gD1ldgJlAy3b7rYfY8wAVHi9ODNmdP4+EE=
github.com/mask-pp/rtp-ps v1.0.0/go.mod h1:jCxsZ2G7z/jX+aqFypEWMePnhNrfnUiXUEKm6Xp0vgU=
github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
+62 -16
View File
@@ -2,6 +2,8 @@ package plugin_rtp
import (
. "github.com/Monibuca/engine/v2"
"github.com/Monibuca/engine/v2/avformat"
"github.com/Monibuca/engine/v2/util"
"github.com/pion/rtp"
)
@@ -36,26 +38,70 @@ func (rt RTPType) String() string {
type RTP struct {
NALU
}
type RTP_PS struct {
RTP
rtp.Packet
psPacket []byte
parser DecPSPackage
}
func (rtp *RTP_PS) PushPS (ps []byte) {
if err := rtp.Unmarshal(ps); err != nil {
Println(err)
}
if len(rtp.Payload) >= 4 && util.BigEndian.Uint32(rtp.Payload) == StartCodePS {
if rtp.psPacket != nil{
if err := rtp.parser.Read(rtp.psPacket); err == nil {
for _, payload := range avformat.SplitH264(rtp.parser.VideoPayload) {
rtp.WriteNALU(rtp.Timestamp, payload)
}
if rtp.parser.AudioPayload != nil{
//TODO: 需要增加一个字节的头
//rtpPublisher.PushAudio(psRtp.Timestamp, parser.AudioPayload)
}
} else {
Print(err)
}
rtp.psPacket = nil
}
rtp.psPacket = append(rtp.psPacket, rtp.Payload...)
} else if rtp.psPacket != nil {
rtp.psPacket = append(rtp.psPacket, rtp.Payload...)
}
}
func (rtp *RTP) PushPack(pack *RTPPack) {
switch pack.Type {
case RTP_TYPE_AUDIO:
payload := pack.Payload
auHeaderLen := (int16(payload[0]) << 8) + int16(payload[1])
auHeaderLen = auHeaderLen >> 3
auHeaderCount := int(auHeaderLen / 2)
var auLenArray []int
for iIndex := 0; iIndex < int(auHeaderCount); iIndex++ {
auHeaderInfo := (int16(payload[2+2*iIndex]) << 8) + int16(payload[2+2*iIndex+1])
auLen := auHeaderInfo >> 3
auLenArray = append(auLenArray, int(auLen))
}
startOffset := 2 + 2*auHeaderCount
for _, auLen := range auLenArray {
endOffset := startOffset + auLen
addHead := []byte{0xAF, 0x01}
rtp.PushAudio(0, append(addHead, payload[startOffset:endOffset]...))
startOffset = startOffset + auLen
switch rtp.AudioInfo.SoundFormat {
case 10:
auHeaderLen := (int16(payload[0]) << 8) + int16(payload[1])
auHeaderLen = auHeaderLen >> 3
auHeaderCount := int(auHeaderLen / 2)
var auLenArray []int
for iIndex := 0; iIndex < int(auHeaderCount); iIndex++ {
auHeaderInfo := (int16(payload[2+2*iIndex]) << 8) + int16(payload[2+2*iIndex+1])
auLen := auHeaderInfo >> 3
auLenArray = append(auLenArray, int(auLen))
}
startOffset := 2 + 2*auHeaderCount
for _, auLen := range auLenArray {
endOffset := startOffset + auLen
addHead := []byte{0xAF, 0x01}
rtp.PushAudio(pack.Timestamp, append(addHead, payload[startOffset:endOffset]...))
startOffset = startOffset + auLen
}
case 7,8:
asc := rtp.AudioInfo.SoundFormat<<4
switch {
case rtp.AudioInfo.SoundRate>=44000:
asc = asc + (3<<2)
case rtp.AudioInfo.SoundRate>=22000:
asc = asc + (2<<2)
case rtp.AudioInfo.SoundRate>=11000:
asc = asc + (1<<2)
}
asc = asc+ 1<<1
rtp.PushAudio(pack.Timestamp,append([]byte{asc},payload...))
}
case RTP_TYPE_VIDEO:
rtp.WriteNALU(pack.Timestamp, pack.Payload)
+206
View File
@@ -0,0 +1,206 @@
package plugin_rtp
import (
"github.com/mask-pp/rtp-ps/buffer"
)
/*
https://github.com/videolan/vlc/blob/master/modules/demux/mpeg
*/
type DecPSPackage struct {
systemClockReferenceBase uint64
systemClockReferenceExtension uint64
programMuxRate uint32
VideoStreamType uint32
AudioStreamType uint32
buffer.RawBuffer
VideoPayload []byte
AudioPayload []byte
}
// data包含 接受到完整一帧数据后,所有的payload, 解析出去后是一阵完整的raw数据
func (dec *DecPSPackage) Read(data []byte) error{
return dec.decPackHeader(append(data, 0x00, 0x00, 0x01, 0xb9))
}
func (dec *DecPSPackage) clean() {
dec.systemClockReferenceBase = 0
dec.systemClockReferenceExtension = 0
dec.programMuxRate = 0
dec.VideoPayload = nil
dec.AudioPayload = nil
}
func (dec *DecPSPackage) decPackHeader(data []byte) error {
dec.clean()
// 加载数据
dec.LoadBuffer(data)
if startcode, err := dec.Uint32(); err != nil {
return err
} else if startcode != StartCodePS {
return ErrNotFoundStartCode
}
if err := dec.Skip(9); err != nil {
return err
}
psl, err := dec.Uint8()
if err != nil {
return err
}
psl &= 0x07
if err = dec.Skip(int(psl)); err != nil {
return err
}
for {
nextStartCode, err := dec.Uint32()
if err != nil {
return err
}
switch nextStartCode {
case StartCodeSYS:
if err := dec.decSystemHeader(); err != nil {
return err
}
case StartCodeMAP:
if err := dec.decProgramStreamMap(); err != nil {
return err
}
case StartCodeVideo,StartCodeAudio:
if err := dec.decPESPacket(nextStartCode); err != nil {
return err
}
case HaiKangCode, MEPGProgramEndCode:
return nil
}
}
}
func (dec *DecPSPackage) decSystemHeader() error {
syslens, err := dec.Uint16()
if err != nil {
return err
}
// drop rate video audio bound and lock flag
syslens -= 6
if err = dec.Skip(6); err != nil {
return err
}
// ONE WAY: do not to parse the stream and skip the buffer
//br.Skip(syslen * 8)
// TWO WAY: parse every stream info
for syslens > 0 {
if nextbits, err := dec.Uint8(); err != nil {
return err
} else if (nextbits&0x80)>>7 != 1 {
break
}
if err = dec.Skip(2); err != nil {
return err
}
syslens -= 3
}
return nil
}
func (dec *DecPSPackage) decProgramStreamMap() error {
psm, err := dec.Uint16()
if err != nil {
return err
}
//drop psm version infor
if err = dec.Skip(2); err != nil {
return err
}
psm -= 2
programStreamInfoLen, err := dec.Uint16()
if err != nil {
return err
}
if err = dec.Skip(int(programStreamInfoLen)); err != nil {
return err
}
psm -= programStreamInfoLen + 2
programStreamMapLen, err := dec.Uint16()
if err != nil {
return err
}
psm -= 2 + programStreamMapLen
for programStreamMapLen > 0 {
streamType, err := dec.Uint8()
if err != nil {
return err
}
elementaryStreamID, err := dec.Uint8()
if err != nil {
return err
}
if elementaryStreamID >= 0xe0 && elementaryStreamID <= 0xef {
dec.VideoStreamType = uint32(streamType)
} else if elementaryStreamID >= 0xc0 && elementaryStreamID <= 0xdf {
dec.AudioStreamType = uint32(streamType)
}
elementaryStreamInfoLength, err := dec.Uint16()
if err != nil {
return err
}
if err = dec.Skip(int(elementaryStreamInfoLength)); err != nil {
return err
}
programStreamMapLen -= 4 + elementaryStreamInfoLength
}
// crc 32
if psm != 4 {
return ErrFormatPack
}
if err = dec.Skip(4); err != nil {
return err
}
return nil
}
func (dec *DecPSPackage) decPESPacket(t uint32) error {
payloadlen, err := dec.Uint16()
if err != nil {
return err
}
if err = dec.Skip(2); err != nil {
return err
}
payloadlen -= 2
pesHeaderDataLen, err := dec.Uint8()
if err != nil {
return err
}
payloadlen -= uint16(pesHeaderDataLen) + 1
if err = dec.Skip(int(pesHeaderDataLen)); err != nil {
return err
}
if payload, err := dec.Bytes(int(payloadlen)); err != nil {
return err
} else {
if StartCodeVideo == t {
dec.VideoPayload = append(dec.VideoPayload, payload...)
}else{
dec.AudioPayload = append(dec.AudioPayload, payload...)
}
}
return nil
}