mirror of
https://github.com/Monibuca/engine.git
synced 2026-04-23 00:07:06 +08:00
feat: remove ps to ps plugin,add idletimeout
decs: 将ps处理逻辑移入ps插件重,增加idletimeout配置,用于首次发布空闲超时断开连接,增加对订阅者消费过慢的处理,增加dataTrack中元素的回收
This commit is contained in:
@@ -1,112 +0,0 @@
|
||||
package mpegps
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
type IOBuffer struct {
|
||||
buf []byte // contents are the bytes buf[off : len(buf)]
|
||||
off int // read at &buf[off], write at &buf[len(buf)]
|
||||
}
|
||||
|
||||
func (b *IOBuffer) Next(n int) []byte {
|
||||
m := b.Len()
|
||||
if n > m {
|
||||
n = m
|
||||
}
|
||||
data := b.buf[b.off : b.off+n]
|
||||
b.off += n
|
||||
return data
|
||||
}
|
||||
func (b *IOBuffer) Uint16() (uint16, error) {
|
||||
if b.Len() > 1 {
|
||||
|
||||
return binary.BigEndian.Uint16(b.Next(2)), nil
|
||||
}
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
func (b *IOBuffer) Skip(n int) (err error) {
|
||||
_, err = b.ReadN(n)
|
||||
return
|
||||
}
|
||||
|
||||
func (b *IOBuffer) Uint32() (uint32, error) {
|
||||
if b.Len() > 3 {
|
||||
return binary.BigEndian.Uint32(b.Next(4)), nil
|
||||
}
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
func (b *IOBuffer) ReadN(length int) ([]byte, error) {
|
||||
if b.Len() >= length {
|
||||
return b.Next(length), nil
|
||||
}
|
||||
return nil, io.EOF
|
||||
}
|
||||
|
||||
//func (b *IOBuffer) Read(buf []byte) (n int, err error) {
|
||||
// var ret []byte
|
||||
// ret, err = b.ReadN(len(buf))
|
||||
// copy(buf, ret)
|
||||
// return len(ret), err
|
||||
//}
|
||||
|
||||
// empty reports whether the unread portion of the buffer is empty.
|
||||
func (b *IOBuffer) empty() bool { return b.Len() <= b.off }
|
||||
|
||||
func (b *IOBuffer) ReadByte() (byte, error) {
|
||||
if b.empty() {
|
||||
// Buffer is empty, reset to recover space.
|
||||
b.Reset()
|
||||
return 0, io.EOF
|
||||
}
|
||||
c := b.buf[b.off]
|
||||
b.off++
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (b *IOBuffer) Reset() {
|
||||
b.buf = b.buf[:0]
|
||||
b.off = 0
|
||||
}
|
||||
|
||||
func (b *IOBuffer) Len() int { return len(b.buf) - b.off }
|
||||
|
||||
// tryGrowByReslice is a inlineable version of grow for the fast-case where the
|
||||
// internal buffer only needs to be resliced.
|
||||
// It returns the index where bytes should be written and whether it succeeded.
|
||||
func (b *IOBuffer) tryGrowByReslice(n int) (int, bool) {
|
||||
if l := len(b.buf); n <= cap(b.buf)-l {
|
||||
b.buf = b.buf[:l+n]
|
||||
return l, true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
var ErrTooLarge = errors.New("IOBuffer: too large")
|
||||
|
||||
func (b *IOBuffer) Write(p []byte) (n int, err error) {
|
||||
l := copy(b.buf, b.buf[b.off:])
|
||||
b.buf = append(b.buf[:l], p...)
|
||||
b.off = 0
|
||||
// println(b.buf, b.off, b.buf[b.off], b.buf[b.off+1], b.buf[b.off+2], b.buf[b.off+3])
|
||||
return len(p), nil
|
||||
// defer func() {
|
||||
// if recover() != nil {
|
||||
// panic(ErrTooLarge)
|
||||
// }
|
||||
// }()
|
||||
// l := len(p)
|
||||
// oldLen := len(b.buf)
|
||||
// m, ok := b.tryGrowByReslice(l)
|
||||
// if !ok {
|
||||
// m = oldLen - b.off
|
||||
// buf := append(append(([]byte)(nil), b.buf[b.off:]...), p...)
|
||||
// b.off = 0
|
||||
// b.buf = buf
|
||||
// }
|
||||
// return copy(b.buf[m:], p), nil
|
||||
}
|
||||
@@ -1,250 +0,0 @@
|
||||
package mpegps
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNotFoundStartCode = errors.New("not found the need start code flag")
|
||||
ErrMarkerBit = errors.New("marker bit value error")
|
||||
ErrFormatPack = errors.New("not package standard")
|
||||
ErrParsePakcet = errors.New("parse ps packet error")
|
||||
)
|
||||
|
||||
/*
|
||||
This implement from VLC source code
|
||||
notes: https://github.com/videolan/vlc/blob/master/modules/mux/mpeg/bits.h
|
||||
*/
|
||||
|
||||
/*
|
||||
https://github.com/videolan/vlc/blob/master/modules/demux/mpeg
|
||||
*/
|
||||
type DecPSPackage struct {
|
||||
systemClockReferenceBase uint64
|
||||
systemClockReferenceExtension uint64
|
||||
programMuxRate uint32
|
||||
IOBuffer
|
||||
Payload []byte
|
||||
PTS uint32
|
||||
DTS uint32
|
||||
EsHandler
|
||||
audio MpegPsEsStream
|
||||
video MpegPsEsStream
|
||||
}
|
||||
|
||||
func (dec *DecPSPackage) clean() {
|
||||
dec.systemClockReferenceBase = 0
|
||||
dec.systemClockReferenceExtension = 0
|
||||
dec.programMuxRate = 0
|
||||
dec.Payload = nil
|
||||
dec.PTS = 0
|
||||
dec.DTS = 0
|
||||
}
|
||||
|
||||
func (dec *DecPSPackage) ReadPayload() (payload []byte, err error) {
|
||||
payloadlen, err := dec.Uint16()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return dec.ReadN(int(payloadlen))
|
||||
}
|
||||
func (dec *DecPSPackage) Feed(ps []byte) {
|
||||
if len(ps) >= 4 && util.BigEndian.Uint32(ps) == StartCodePS {
|
||||
if dec.Len() > 0 {
|
||||
dec.Skip(4)
|
||||
dec.Read(0)
|
||||
dec.Reset()
|
||||
}
|
||||
dec.Write(ps)
|
||||
} else if dec.Len() > 0 {
|
||||
dec.Write(ps)
|
||||
}
|
||||
}
|
||||
|
||||
// read the buffer and push video or audio
|
||||
func (dec *DecPSPackage) Read(ts uint32) error {
|
||||
again:
|
||||
dec.clean()
|
||||
if err := dec.Skip(9); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
psl, err := dec.ReadByte()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
psl &= 0x07
|
||||
if err = dec.Skip(int(psl)); err != nil {
|
||||
return err
|
||||
}
|
||||
var video []byte
|
||||
var nextStartCode, videoTs, videoCts uint32
|
||||
loop:
|
||||
for err == nil {
|
||||
if nextStartCode, err = dec.Uint32(); err != nil {
|
||||
break
|
||||
}
|
||||
switch nextStartCode {
|
||||
case StartCodeSYS:
|
||||
dec.ReadPayload()
|
||||
//err = dec.decSystemHeader()
|
||||
case StartCodeMAP:
|
||||
err = dec.decProgramStreamMap()
|
||||
case StartCodeVideo:
|
||||
// var cts uint32
|
||||
if err = dec.decPESPacket(); err == nil {
|
||||
if len(video) == 0 {
|
||||
dec.video.PTS = dec.PTS
|
||||
dec.video.DTS = dec.DTS
|
||||
// if dec.PTS == 0 {
|
||||
// dec.PTS = ts
|
||||
// }
|
||||
// if dec.DTS != 0 {
|
||||
// cts = dec.PTS - dec.DTS
|
||||
// } else {
|
||||
// dec.DTS = dec.PTS
|
||||
// }
|
||||
// videoTs = dec.DTS / 90
|
||||
// videoCts = cts / 90
|
||||
}
|
||||
video = append(video, dec.Payload...)
|
||||
} else {
|
||||
// utils.Println("video", err)
|
||||
}
|
||||
case StartCodeAudio:
|
||||
if err = dec.decPESPacket(); err == nil {
|
||||
// ts := ts / 90
|
||||
// if dec.PTS != 0 {
|
||||
// ts = dec.PTS / 90
|
||||
// }
|
||||
dec.audio.PTS = dec.PTS
|
||||
dec.audio.Buffer = dec.Payload
|
||||
dec.ReceiveAudio(dec.audio)
|
||||
// pusher.PushAudio(ts, dec.Payload)
|
||||
} else {
|
||||
// utils.Println("audio", err)
|
||||
}
|
||||
case StartCodePS:
|
||||
break loop
|
||||
default:
|
||||
dec.ReadPayload()
|
||||
}
|
||||
}
|
||||
if len(video) > 0 {
|
||||
dec.video.Buffer = video
|
||||
dec.ReceiveVideo(dec.video)
|
||||
if false {
|
||||
println("video", videoTs, videoCts, len(video))
|
||||
}
|
||||
// pusher.PushVideo(videoTs, videoCts, video)
|
||||
}
|
||||
if nextStartCode == StartCodePS {
|
||||
// utils.Println(aurora.Red("StartCodePS recursion..."), err)
|
||||
goto again
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
/*
|
||||
func (dec *DecPSPackage) decSystemHeader() error {
|
||||
syslens, err := dec.Uint16()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// drop rate video audio bound and lock flag
|
||||
syslens -= 6
|
||||
if err = dec.Skip(6); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// ONE WAY: do not to parse the stream and skip the buffer
|
||||
//br.Skip(syslen * 8)
|
||||
|
||||
// TWO WAY: parse every stream info
|
||||
for syslens > 0 {
|
||||
if nextbits, err := dec.Uint8(); err != nil {
|
||||
return err
|
||||
} else if (nextbits&0x80)>>7 != 1 {
|
||||
break
|
||||
}
|
||||
if err = dec.Skip(2); err != nil {
|
||||
return err
|
||||
}
|
||||
syslens -= 3
|
||||
}
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
func (dec *DecPSPackage) decProgramStreamMap() error {
|
||||
psm, err := dec.ReadPayload()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l := len(psm)
|
||||
index := 2
|
||||
programStreamInfoLen := util.BigEndian.Uint16(psm[index:])
|
||||
index += 2
|
||||
index += int(programStreamInfoLen)
|
||||
programStreamMapLen := util.BigEndian.Uint16(psm[index:])
|
||||
index += 2
|
||||
for programStreamMapLen > 0 {
|
||||
if l <= index+1 {
|
||||
break
|
||||
}
|
||||
streamType := psm[index]
|
||||
index++
|
||||
elementaryStreamID := psm[index]
|
||||
index++
|
||||
if elementaryStreamID >= 0xe0 && elementaryStreamID <= 0xef {
|
||||
dec.video.Type = streamType
|
||||
} else if elementaryStreamID >= 0xc0 && elementaryStreamID <= 0xdf {
|
||||
dec.audio.Type = streamType
|
||||
}
|
||||
if l <= index+1 {
|
||||
break
|
||||
}
|
||||
elementaryStreamInfoLength := util.BigEndian.Uint16(psm[index:])
|
||||
index += 2
|
||||
index += int(elementaryStreamInfoLength)
|
||||
programStreamMapLen -= 4 + elementaryStreamInfoLength
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dec *DecPSPackage) decPESPacket() error {
|
||||
payload, err := dec.ReadPayload()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(payload) < 4 {
|
||||
return errors.New("not enough data")
|
||||
}
|
||||
//data_alignment_indicator := (payload[0]&0b0001_0000)>>4 == 1
|
||||
flag := payload[1]
|
||||
ptsFlag := flag>>7 == 1
|
||||
dtsFlag := (flag&0b0100_0000)>>6 == 1
|
||||
var pts, dts uint32
|
||||
pesHeaderDataLen := payload[2]
|
||||
payload = payload[3:]
|
||||
extraData := payload[:pesHeaderDataLen]
|
||||
if ptsFlag && len(extraData) > 4 {
|
||||
pts = uint32(extraData[0]&0b0000_1110) << 29
|
||||
pts += uint32(extraData[1]) << 22
|
||||
pts += uint32(extraData[2]&0b1111_1110) << 14
|
||||
pts += uint32(extraData[3]) << 7
|
||||
pts += uint32(extraData[4]) >> 1
|
||||
if dtsFlag && len(extraData) > 9 {
|
||||
dts = uint32(extraData[5]&0b0000_1110) << 29
|
||||
dts += uint32(extraData[6]) << 22
|
||||
dts += uint32(extraData[7]&0b1111_1110) << 14
|
||||
dts += uint32(extraData[8]) << 7
|
||||
dts += uint32(extraData[9]) >> 1
|
||||
}
|
||||
}
|
||||
dec.PTS = pts
|
||||
dec.DTS = dts
|
||||
dec.Payload = payload[pesHeaderDataLen:]
|
||||
return err
|
||||
}
|
||||
@@ -1,159 +0,0 @@
|
||||
package mpegps
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
const (
|
||||
StartCodePS = 0x000001ba
|
||||
StartCodeSYS = 0x000001bb
|
||||
StartCodeMAP = 0x000001bc
|
||||
StartCodeVideo = 0x000001e0
|
||||
StartCodeAudio = 0x000001c0
|
||||
PrivateStreamCode = 0x000001bd
|
||||
MEPGProgramEndCode = 0x000001b9
|
||||
)
|
||||
|
||||
type EsHandler interface {
|
||||
ReceiveAudio(MpegPsEsStream)
|
||||
ReceiveVideo(MpegPsEsStream)
|
||||
}
|
||||
|
||||
type MpegPsEsStream struct {
|
||||
Type byte
|
||||
util.Buffer
|
||||
PTS uint32
|
||||
DTS uint32
|
||||
}
|
||||
|
||||
type MpegPsStream struct {
|
||||
buffer util.Buffer
|
||||
EsHandler
|
||||
audio MpegPsEsStream
|
||||
video MpegPsEsStream
|
||||
}
|
||||
|
||||
func (ps *MpegPsStream) Reset() {
|
||||
ps.buffer.Reset()
|
||||
ps.audio.Reset()
|
||||
if ps.video.Buffer.CanRead() {
|
||||
ps.ReceiveVideo(ps.video)
|
||||
ps.video.Buffer = make(util.Buffer, 0)
|
||||
} else {
|
||||
ps.video.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *MpegPsStream) Feed(data util.Buffer) (err error) {
|
||||
reader := &data
|
||||
if ps.buffer.CanRead() {
|
||||
ps.buffer.Write(data)
|
||||
reader = &ps.buffer
|
||||
}
|
||||
var begin util.Buffer
|
||||
var payload []byte
|
||||
defer func() {
|
||||
if err != nil && begin.CanRead() {
|
||||
ps.buffer.Reset()
|
||||
ps.buffer.Write(begin)
|
||||
}
|
||||
}()
|
||||
for err == nil && reader.CanReadN(4) {
|
||||
begin = *reader
|
||||
code := reader.ReadUint32()
|
||||
switch code {
|
||||
case StartCodePS:
|
||||
if ps.audio.Buffer.CanRead() {
|
||||
ps.ReceiveAudio(ps.audio)
|
||||
ps.audio.Buffer = make(util.Buffer, 0)
|
||||
}
|
||||
if ps.video.Buffer.CanRead() {
|
||||
ps.ReceiveVideo(ps.video)
|
||||
ps.video.Buffer = make(util.Buffer, 0)
|
||||
}
|
||||
if reader.CanReadN(9) {
|
||||
reader.ReadN(9)
|
||||
if reader.CanRead() {
|
||||
psl := reader.ReadByte() & 0x07
|
||||
if reader.CanReadN(int(psl)) {
|
||||
reader.ReadN(int(psl))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
err = io.ErrShortBuffer
|
||||
case StartCodeSYS, PrivateStreamCode:
|
||||
_, err = ps.ReadPayload(reader)
|
||||
case StartCodeMAP:
|
||||
err = ps.decProgramStreamMap(reader)
|
||||
case StartCodeVideo:
|
||||
payload, err = ps.ReadPayload(reader)
|
||||
if err == nil {
|
||||
err = ps.video.parsePESPacket(payload)
|
||||
}
|
||||
case StartCodeAudio:
|
||||
payload, err = ps.ReadPayload(reader)
|
||||
if err == nil {
|
||||
err = ps.audio.parsePESPacket(payload)
|
||||
}
|
||||
case MEPGProgramEndCode:
|
||||
return
|
||||
default:
|
||||
err = errors.New("start code error")
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *MpegPsStream) ReadPayload(data *util.Buffer) (payload []byte, err error) {
|
||||
if !data.CanReadN(2) {
|
||||
return nil, io.ErrShortBuffer
|
||||
}
|
||||
payloadlen := data.ReadUint16()
|
||||
if data.CanReadN(int(payloadlen)) {
|
||||
payload = data.ReadN(int(payloadlen))
|
||||
} else {
|
||||
err = io.ErrShortBuffer
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *MpegPsStream) decProgramStreamMap(data *util.Buffer) error {
|
||||
psm, err := ps.ReadPayload(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l := len(psm)
|
||||
index := 2
|
||||
programStreamInfoLen := binary.BigEndian.Uint16(psm[index:])
|
||||
index += 2
|
||||
index += int(programStreamInfoLen)
|
||||
programStreamMapLen := binary.BigEndian.Uint16(psm[index:])
|
||||
index += 2
|
||||
for programStreamMapLen > 0 {
|
||||
if l <= index+1 {
|
||||
break
|
||||
}
|
||||
streamType := psm[index]
|
||||
index++
|
||||
elementaryStreamID := psm[index]
|
||||
index++
|
||||
if elementaryStreamID >= 0xe0 && elementaryStreamID <= 0xef {
|
||||
ps.video.Type = streamType
|
||||
} else if elementaryStreamID >= 0xc0 && elementaryStreamID <= 0xdf {
|
||||
ps.audio.Type = streamType
|
||||
}
|
||||
if l <= index+1 {
|
||||
break
|
||||
}
|
||||
elementaryStreamInfoLength := binary.BigEndian.Uint16(psm[index:])
|
||||
index += 2
|
||||
index += int(elementaryStreamInfoLength)
|
||||
programStreamMapLen -= 4 + elementaryStreamInfoLength
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
package mpegps
|
||||
|
||||
import "io"
|
||||
|
||||
func (es *MpegPsEsStream) parsePESPacket(payload []byte) (err error) {
|
||||
if len(payload) < 4 {
|
||||
return io.ErrShortBuffer
|
||||
}
|
||||
//data_alignment_indicator := (payload[0]&0b0001_0000)>>4 == 1
|
||||
flag := payload[1]
|
||||
ptsFlag := flag>>7 == 1
|
||||
dtsFlag := (flag&0b0100_0000)>>6 == 1
|
||||
pesHeaderDataLen := payload[2]
|
||||
if len(payload) < int(pesHeaderDataLen) {
|
||||
return io.ErrShortBuffer
|
||||
}
|
||||
payload = payload[3:]
|
||||
extraData := payload[:pesHeaderDataLen]
|
||||
if ptsFlag && len(extraData) > 4 {
|
||||
es.PTS = uint32(extraData[0]&0b0000_1110) << 29
|
||||
es.PTS += uint32(extraData[1]) << 22
|
||||
es.PTS += uint32(extraData[2]&0b1111_1110) << 14
|
||||
es.PTS += uint32(extraData[3]) << 7
|
||||
es.PTS += uint32(extraData[4]) >> 1
|
||||
if dtsFlag && len(extraData) > 9 {
|
||||
es.DTS = uint32(extraData[5]&0b0000_1110) << 29
|
||||
es.DTS += uint32(extraData[6]) << 22
|
||||
es.DTS += uint32(extraData[7]&0b1111_1110) << 14
|
||||
es.DTS += uint32(extraData[8]) << 7
|
||||
es.DTS += uint32(extraData[9]) >> 1
|
||||
}
|
||||
}
|
||||
es.Write(payload[pesHeaderDataLen:])
|
||||
return
|
||||
}
|
||||
@@ -1,283 +0,0 @@
|
||||
package mpegps
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/yapingcat/gomedia/go-codec"
|
||||
"github.com/yapingcat/gomedia/go-mpeg2"
|
||||
)
|
||||
|
||||
type psstream struct {
|
||||
sid uint8
|
||||
cid mpeg2.PS_STREAM_TYPE
|
||||
pts uint64
|
||||
dts uint64
|
||||
streamBuf []byte
|
||||
}
|
||||
|
||||
func newpsstream(sid uint8, cid mpeg2.PS_STREAM_TYPE) *psstream {
|
||||
return &psstream{
|
||||
sid: sid,
|
||||
cid: cid,
|
||||
streamBuf: make([]byte, 0, 4096),
|
||||
}
|
||||
}
|
||||
|
||||
type PSDemuxer struct {
|
||||
streamMap map[uint8]*psstream
|
||||
pkg *mpeg2.PSPacket
|
||||
mpeg1 bool
|
||||
cache []byte
|
||||
OnFrame func(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64, dts uint64)
|
||||
//解ps包过程中,解码回调psm,system header,pes包等
|
||||
//decodeResult 解码ps包时的产生的错误
|
||||
//这个回调主要用于debug,查看是否ps包存在问题
|
||||
OnPacket func(pkg mpeg2.Display, decodeResult error)
|
||||
}
|
||||
|
||||
func NewPSDemuxer() *PSDemuxer {
|
||||
return &PSDemuxer{
|
||||
streamMap: make(map[uint8]*psstream),
|
||||
pkg: new(mpeg2.PSPacket),
|
||||
cache: make([]byte, 0, 256),
|
||||
OnFrame: nil,
|
||||
OnPacket: nil,
|
||||
}
|
||||
}
|
||||
|
||||
func (psdemuxer *PSDemuxer) Feed(data []byte) error {
|
||||
var bs *codec.BitStream
|
||||
if len(psdemuxer.cache) > 0 {
|
||||
psdemuxer.cache = append(psdemuxer.cache, data...)
|
||||
bs = codec.NewBitStream(psdemuxer.cache)
|
||||
} else {
|
||||
bs = codec.NewBitStream(data)
|
||||
}
|
||||
|
||||
saveReseved := func() {
|
||||
tmpcache := make([]byte, bs.RemainBytes())
|
||||
copy(tmpcache, bs.RemainData())
|
||||
psdemuxer.cache = tmpcache
|
||||
}
|
||||
|
||||
var ret error = nil
|
||||
for !bs.EOS() {
|
||||
if mpegerr, ok := ret.(mpeg2.Error); ok {
|
||||
if mpegerr.NeedMore() {
|
||||
saveReseved()
|
||||
}
|
||||
break
|
||||
}
|
||||
if bs.RemainBits() < 32 {
|
||||
ret = io.ErrShortBuffer
|
||||
saveReseved()
|
||||
break
|
||||
}
|
||||
prefix_code := bs.NextBits(32)
|
||||
switch prefix_code {
|
||||
case 0x000001BA: //pack header
|
||||
if psdemuxer.pkg.Header == nil {
|
||||
psdemuxer.pkg.Header = new(mpeg2.PSPackHeader)
|
||||
}
|
||||
ret = psdemuxer.pkg.Header.Decode(bs)
|
||||
psdemuxer.mpeg1 = psdemuxer.pkg.Header.IsMpeg1
|
||||
if psdemuxer.OnPacket != nil {
|
||||
psdemuxer.OnPacket(psdemuxer.pkg.Header, ret)
|
||||
}
|
||||
case 0x000001BB: //system header
|
||||
if psdemuxer.pkg.Header == nil {
|
||||
panic("psdemuxer.pkg.Header must not be nil")
|
||||
}
|
||||
if psdemuxer.pkg.System == nil {
|
||||
psdemuxer.pkg.System = new(mpeg2.System_header)
|
||||
}
|
||||
ret = psdemuxer.pkg.System.Decode(bs)
|
||||
if psdemuxer.OnPacket != nil {
|
||||
psdemuxer.OnPacket(psdemuxer.pkg.System, ret)
|
||||
}
|
||||
case 0x000001BC: //program stream map
|
||||
if psdemuxer.pkg.Psm == nil {
|
||||
psdemuxer.pkg.Psm = new(mpeg2.Program_stream_map)
|
||||
}
|
||||
if ret = psdemuxer.pkg.Psm.Decode(bs); ret == nil {
|
||||
for _, streaminfo := range psdemuxer.pkg.Psm.Stream_map {
|
||||
if _, found := psdemuxer.streamMap[streaminfo.Elementary_stream_id]; !found {
|
||||
stream := newpsstream(streaminfo.Elementary_stream_id, mpeg2.PS_STREAM_TYPE(streaminfo.Stream_type))
|
||||
psdemuxer.streamMap[stream.sid] = stream
|
||||
}
|
||||
}
|
||||
}
|
||||
if psdemuxer.OnPacket != nil {
|
||||
psdemuxer.OnPacket(psdemuxer.pkg.Psm, ret)
|
||||
}
|
||||
case 0x000001BD, 0x000001BE, 0x000001BF, 0x000001F0, 0x000001F1,
|
||||
0x000001F2, 0x000001F3, 0x000001F4, 0x000001F5, 0x000001F6,
|
||||
0x000001F7, 0x000001F8, 0x000001F9, 0x000001FA, 0x000001FB:
|
||||
if psdemuxer.pkg.CommPes == nil {
|
||||
psdemuxer.pkg.CommPes = new(mpeg2.CommonPesPacket)
|
||||
}
|
||||
ret = psdemuxer.pkg.CommPes.Decode(bs)
|
||||
case 0x000001FF: //program stream directory
|
||||
if psdemuxer.pkg.Psd == nil {
|
||||
psdemuxer.pkg.Psd = new(mpeg2.Program_stream_directory)
|
||||
}
|
||||
ret = psdemuxer.pkg.Psd.Decode(bs)
|
||||
case 0x000001B9: //MPEG_program_end_code
|
||||
continue
|
||||
default:
|
||||
if prefix_code&0xFFFFFFE0 == 0x000001C0 || prefix_code&0xFFFFFFE0 == 0x000001E0 {
|
||||
if psdemuxer.pkg.Pes == nil {
|
||||
psdemuxer.pkg.Pes = mpeg2.NewPesPacket()
|
||||
}
|
||||
if psdemuxer.mpeg1 {
|
||||
ret = psdemuxer.pkg.Pes.DecodeMpeg1(bs)
|
||||
} else {
|
||||
ret = psdemuxer.pkg.Pes.Decode(bs)
|
||||
}
|
||||
if psdemuxer.OnPacket != nil {
|
||||
psdemuxer.OnPacket(psdemuxer.pkg.Pes, ret)
|
||||
}
|
||||
if ret == nil {
|
||||
if stream, found := psdemuxer.streamMap[psdemuxer.pkg.Pes.Stream_id]; found {
|
||||
if psdemuxer.mpeg1 && stream.cid == mpeg2.PS_STREAM_UNKNOW {
|
||||
psdemuxer.guessCodecid(stream)
|
||||
}
|
||||
psdemuxer.demuxPespacket(stream, psdemuxer.pkg.Pes)
|
||||
} else {
|
||||
if psdemuxer.mpeg1 {
|
||||
stream := newpsstream(psdemuxer.pkg.Pes.Stream_id, mpeg2.PS_STREAM_UNKNOW)
|
||||
psdemuxer.streamMap[stream.sid] = stream
|
||||
stream.streamBuf = append(stream.streamBuf, psdemuxer.pkg.Pes.Pes_payload...)
|
||||
stream.pts = psdemuxer.pkg.Pes.Pts
|
||||
stream.dts = psdemuxer.pkg.Pes.Dts
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
bs.SkipBits(8)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ret == nil && len(psdemuxer.cache) > 0 {
|
||||
psdemuxer.cache = nil
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func (psdemuxer *PSDemuxer) Reset() {
|
||||
psdemuxer.cache = psdemuxer.cache[:0]
|
||||
for _, stream := range psdemuxer.streamMap {
|
||||
if len(stream.streamBuf) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (psdemuxer *PSDemuxer) guessCodecid(stream *psstream) {
|
||||
if stream.sid&0xE0 == uint8(mpeg2.PES_STREAM_AUDIO) {
|
||||
stream.cid = mpeg2.PS_STREAM_AAC
|
||||
} else if stream.sid&0xE0 == uint8(mpeg2.PES_STREAM_VIDEO) {
|
||||
h264score := 0
|
||||
h265score := 0
|
||||
codec.SplitFrame(stream.streamBuf, func(nalu []byte) bool {
|
||||
h264nalutype := codec.H264NaluTypeWithoutStartCode(nalu)
|
||||
h265nalutype := codec.H265NaluTypeWithoutStartCode(nalu)
|
||||
if h264nalutype == codec.H264_NAL_PPS ||
|
||||
h264nalutype == codec.H264_NAL_SPS ||
|
||||
h264nalutype == codec.H264_NAL_I_SLICE {
|
||||
h264score += 2
|
||||
} else if h264nalutype < 5 {
|
||||
h264score += 1
|
||||
} else if h264nalutype > 20 {
|
||||
h264score -= 1
|
||||
}
|
||||
|
||||
if h265nalutype == codec.H265_NAL_PPS ||
|
||||
h265nalutype == codec.H265_NAL_SPS ||
|
||||
h265nalutype == codec.H265_NAL_VPS ||
|
||||
(h265nalutype >= codec.H265_NAL_SLICE_BLA_W_LP && h265nalutype <= codec.H265_NAL_SLICE_CRA) {
|
||||
h265score += 2
|
||||
} else if h265nalutype >= codec.H265_NAL_Slice_TRAIL_N && h265nalutype <= codec.H265_NAL_SLICE_RASL_R {
|
||||
h265score += 1
|
||||
} else if h265nalutype > 40 {
|
||||
h265score -= 1
|
||||
}
|
||||
if h264score > h265score && h264score >= 4 {
|
||||
stream.cid = mpeg2.PS_STREAM_H264
|
||||
} else if h264score < h265score && h265score >= 4 {
|
||||
stream.cid = mpeg2.PS_STREAM_H265
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (psdemuxer *PSDemuxer) demuxPespacket(stream *psstream, pes *mpeg2.PesPacket) error {
|
||||
switch stream.cid {
|
||||
case mpeg2.PS_STREAM_AAC, mpeg2.PS_STREAM_G711A, mpeg2.PS_STREAM_G711U:
|
||||
return psdemuxer.demuxAudio(stream, pes)
|
||||
case mpeg2.PS_STREAM_H264, mpeg2.PS_STREAM_H265:
|
||||
return psdemuxer.demuxAudio(stream, pes)
|
||||
case mpeg2.PS_STREAM_UNKNOW:
|
||||
if stream.pts != pes.Pts {
|
||||
stream.streamBuf = nil
|
||||
}
|
||||
stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...)
|
||||
stream.pts = pes.Pts
|
||||
stream.dts = pes.Dts
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (psdemuxer *PSDemuxer) demuxAudio(stream *psstream, pes *mpeg2.PesPacket) error {
|
||||
if stream.pts != pes.Pts && len(stream.streamBuf) > 0 {
|
||||
if psdemuxer.OnFrame != nil {
|
||||
psdemuxer.OnFrame(stream.streamBuf, stream.cid, stream.pts, stream.dts)
|
||||
}
|
||||
stream.streamBuf = nil
|
||||
// stream.streamBuf = stream.streamBuf[:0]
|
||||
}
|
||||
stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...)
|
||||
stream.pts = pes.Pts
|
||||
stream.dts = pes.Dts
|
||||
return nil
|
||||
}
|
||||
|
||||
// func (psdemuxer *PSDemuxer) demuxH26x(stream *psstream, pes *mpeg2.PesPacket) error {
|
||||
// if len(stream.streamBuf) == 0 {
|
||||
// stream.pts = pes.Pts
|
||||
// stream.dts = pes.Dts
|
||||
// }
|
||||
// stream.streamBuf = append(stream.streamBuf, pes.Pes_payload...)
|
||||
// start, sc := codec.FindStartCode(stream.streamBuf, 0)
|
||||
// for start >= 0 {
|
||||
// end, sc2 := codec.FindStartCode(stream.streamBuf, start+int(sc))
|
||||
// if end < 0 {
|
||||
// break
|
||||
// }
|
||||
// if stream.cid == mpeg2.PS_STREAM_H264 {
|
||||
// naluType := codec.H264NaluType(stream.streamBuf[start:])
|
||||
// if naluType != codec.H264_NAL_AUD {
|
||||
// if psdemuxer.OnFrame != nil {
|
||||
// psdemuxer.OnFrame(stream.streamBuf[start:end], stream.cid, stream.pts/90, stream.dts/90)
|
||||
// }
|
||||
// }
|
||||
// } else if stream.cid == mpeg2.PS_STREAM_H265 {
|
||||
// naluType := codec.H265NaluType(stream.streamBuf[start:])
|
||||
// if naluType != codec.H265_NAL_AUD {
|
||||
// if psdemuxer.OnFrame != nil {
|
||||
// psdemuxer.OnFrame(stream.streamBuf[start:end], stream.cid, stream.pts/90, stream.dts/90)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// start = end
|
||||
// sc = sc2
|
||||
// }
|
||||
// stream.streamBuf = stream.streamBuf[start:]
|
||||
// stream.pts = pes.Pts
|
||||
// stream.dts = pes.Dts
|
||||
// return nil
|
||||
// }
|
||||
+4
-3
@@ -32,9 +32,10 @@ type Publish struct {
|
||||
PubVideo bool `default:"true"`
|
||||
KickExist bool // 是否踢掉已经存在的发布者
|
||||
PublishTimeout time.Duration `default:"10s"` // 发布无数据超时
|
||||
WaitCloseTimeout time.Duration `default:"0s"` // 延迟自动关闭(等待重连)
|
||||
DelayCloseTimeout time.Duration `default:"0s"` // 延迟自动关闭(无订阅时)
|
||||
BufferTime time.Duration `default:"0s"` // 缓冲长度(单位:秒),0代表取最近关键帧
|
||||
WaitCloseTimeout time.Duration // 延迟自动关闭(等待重连)
|
||||
DelayCloseTimeout time.Duration // 延迟自动关闭(无订阅时)
|
||||
IdleTimeout time.Duration // 空闲(无订阅)超时
|
||||
BufferTime time.Duration // 缓冲长度(单位:秒),0代表取最近关键帧
|
||||
Key string // 发布鉴权key
|
||||
SecretArgName string `default:"secret"` // 发布鉴权参数名
|
||||
ExpireArgName string `default:"expire"` // 发布鉴权失效时间参数名
|
||||
|
||||
@@ -331,32 +331,4 @@ func (conf *GlobalConfig) API_replay_mp4(w http.ResponseWriter, r *http.Request)
|
||||
w.Write([]byte("ok"))
|
||||
go pub.ReadMP4Data(f)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GlobalConfig) API_replay_ps(w http.ResponseWriter, r *http.Request) {
|
||||
dump := r.URL.Query().Get("dump")
|
||||
streamPath := r.URL.Query().Get("streamPath")
|
||||
if dump == "" {
|
||||
dump = "dump/ps"
|
||||
}
|
||||
f, err := os.OpenFile(dump, os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
if streamPath == "" {
|
||||
if strings.HasPrefix(dump, "/") {
|
||||
streamPath = "replay" + dump
|
||||
} else {
|
||||
streamPath = "replay/" + dump
|
||||
}
|
||||
}
|
||||
var pub PSPublisher
|
||||
pub.SetIO(f)
|
||||
if err = Engine.Publish(streamPath, &pub); err == nil {
|
||||
go pub.Replay(f)
|
||||
w.Write([]byte("ok"))
|
||||
} else {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -122,6 +122,7 @@ func (io *IO) Stop() {
|
||||
var (
|
||||
ErrBadStreamName = errors.New("Stream Already Exist")
|
||||
ErrBadTrackName = errors.New("Track Already Exist")
|
||||
ErrTrackMute = errors.New("Track Mute")
|
||||
ErrStreamIsClosed = errors.New("Stream Is Closed")
|
||||
ErrPublisherLost = errors.New("Publisher Lost")
|
||||
ErrAuth = errors.New("Auth Failed")
|
||||
@@ -197,6 +198,7 @@ func (io *IO) receive(streamPath string, specific IIO) error {
|
||||
}
|
||||
s.PublishTimeout = conf.PublishTimeout
|
||||
s.DelayCloseTimeout = conf.DelayCloseTimeout
|
||||
s.IdleTimeout = conf.IdleTimeout
|
||||
defer func() {
|
||||
if err == nil {
|
||||
if oldPublisher == nil {
|
||||
|
||||
@@ -41,13 +41,16 @@ var (
|
||||
plugins []*Plugin //插件列表
|
||||
EngineConfig = &GlobalConfig{}
|
||||
Engine = InstallPlugin(EngineConfig)
|
||||
settingDir = filepath.Join(ExecDir, ".m7s") //配置缓存目录,该目录按照插件名称作为文件名存储修改过的配置
|
||||
SettingDir = filepath.Join(ExecDir, ".m7s") //配置缓存目录,该目录按照插件名称作为文件名存储修改过的配置
|
||||
MergeConfigs = []string{"Publish", "Subscribe", "HTTP"} //需要合并配置的属性项,插件若没有配置则使用全局配置
|
||||
EventBus chan any
|
||||
apiList []string //注册到引擎的API接口列表
|
||||
)
|
||||
|
||||
func init() {
|
||||
if setting_dir := os.Getenv("M7S_SETTING_DIR"); setting_dir != "" {
|
||||
SettingDir = setting_dir
|
||||
}
|
||||
if conn, err := net.Dial("udp", "114.114.114.114:80"); err == nil {
|
||||
SysInfo.LocalIP, _, _ = strings.Cut(conn.LocalAddr().String(), ":")
|
||||
}
|
||||
@@ -68,7 +71,7 @@ func Run(ctx context.Context, configFile string) (err error) {
|
||||
if ConfigRaw, err = ioutil.ReadFile(configFile); err != nil {
|
||||
log.Warn("read config file error:", err.Error())
|
||||
}
|
||||
if err = os.MkdirAll(settingDir, 0766); err != nil {
|
||||
if err = os.MkdirAll(SettingDir, 0766); err != nil {
|
||||
log.Error("create dir .m7s error:", err)
|
||||
return
|
||||
}
|
||||
@@ -174,8 +177,7 @@ func Run(ctx context.Context, configFile string) (err error) {
|
||||
fmt.Println()
|
||||
fmt.Println(Bold(Cyan("官网地址: ")), Yellow("https://m7s.live"))
|
||||
fmt.Println(Bold(Cyan("启动工程: ")), Yellow("https://github.com/langhuihui/monibuca"))
|
||||
fmt.Println(Bold(Cyan("使用文档: ")), Yellow("https://m7s.live/guide/introduction.html"))
|
||||
fmt.Println(Bold(Cyan("开发文档: ")), Yellow("https://m7s.live/devel/startup.html"))
|
||||
fmt.Println(Bold(Cyan("文档地址: ")), Yellow("https://docs.m7s.live"))
|
||||
fmt.Println(Bold(Cyan("视频教程: ")), Yellow("https://space.bilibili.com/328443019/channel/collectiondetail?sid=514619"))
|
||||
fmt.Println(Bold(Cyan("远程界面: ")), Yellow("https://console.monibuca.com"))
|
||||
rp := struct {
|
||||
|
||||
@@ -190,7 +190,7 @@ func (opt *Plugin) registerHandler() {
|
||||
}
|
||||
|
||||
func (opt *Plugin) settingPath() string {
|
||||
return filepath.Join(settingDir, strings.ToLower(opt.Name)+".yaml")
|
||||
return filepath.Join(SettingDir, strings.ToLower(opt.Name)+".yaml")
|
||||
}
|
||||
|
||||
func (opt *Plugin) Save() error {
|
||||
|
||||
-216
@@ -1,216 +0,0 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtp"
|
||||
"github.com/yapingcat/gomedia/go-mpeg2"
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4/codec"
|
||||
"m7s.live/engine/v4/codec/mpegps"
|
||||
"m7s.live/engine/v4/codec/mpegts"
|
||||
. "m7s.live/engine/v4/track"
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
type cacheItem struct {
|
||||
Seq uint16
|
||||
*util.ListItem[util.Buffer]
|
||||
}
|
||||
|
||||
type PSPublisher struct {
|
||||
Publisher
|
||||
DisableReorder bool //是否禁用rtp重排序,TCP模式下应当禁用
|
||||
// mpegps.MpegPsStream `json:"-" yaml:"-"`
|
||||
// *mpegps.PSDemuxer `json:"-" yaml:"-"`
|
||||
mpegps.DecPSPackage `json:"-" yaml:"-"`
|
||||
reorder util.RTPReorder[*cacheItem]
|
||||
pool util.BytesPool
|
||||
lastSeq uint16
|
||||
}
|
||||
|
||||
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
|
||||
func (p *PSPublisher) PushPS(rtp *rtp.Packet) {
|
||||
if p.Stream == nil {
|
||||
return
|
||||
}
|
||||
if p.pool == nil {
|
||||
// p.PSDemuxer = mpegps.NewPSDemuxer()
|
||||
// p.PSDemuxer.OnPacket = p.OnPacket
|
||||
// p.PSDemuxer.OnFrame = p.OnFrame
|
||||
p.EsHandler = p
|
||||
p.lastSeq = rtp.SequenceNumber - 1
|
||||
p.pool = make(util.BytesPool, 17)
|
||||
}
|
||||
if p.DisableReorder {
|
||||
p.Feed(rtp.Payload)
|
||||
p.lastSeq = rtp.SequenceNumber
|
||||
} else {
|
||||
item := p.pool.Get(len(rtp.Payload))
|
||||
copy(item.Value, rtp.Payload)
|
||||
for rtpPacket := p.reorder.Push(rtp.SequenceNumber, &cacheItem{rtp.SequenceNumber, item}); rtpPacket != nil; rtpPacket = p.reorder.Pop() {
|
||||
if rtpPacket.Seq != p.lastSeq+1 {
|
||||
p.Debug("drop", zap.Uint16("seq", rtpPacket.Seq), zap.Uint16("lastSeq", p.lastSeq))
|
||||
p.Reset()
|
||||
if p.VideoTrack != nil {
|
||||
p.SetLostFlag()
|
||||
}
|
||||
}
|
||||
p.Feed(rtpPacket.Value)
|
||||
p.lastSeq = rtpPacket.Seq
|
||||
rtpPacket.Recycle()
|
||||
}
|
||||
}
|
||||
}
|
||||
func (p *PSPublisher) OnFrame(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64, dts uint64) {
|
||||
switch cid {
|
||||
case mpeg2.PS_STREAM_AAC:
|
||||
if p.AudioTrack != nil {
|
||||
p.AudioTrack.WriteADTS(uint32(pts), frame)
|
||||
} else {
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool)
|
||||
}
|
||||
case mpeg2.PS_STREAM_G711A:
|
||||
if p.AudioTrack != nil {
|
||||
p.AudioTrack.WriteRaw(uint32(pts), frame)
|
||||
} else {
|
||||
p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool)
|
||||
}
|
||||
case mpeg2.PS_STREAM_G711U:
|
||||
if p.AudioTrack != nil {
|
||||
p.AudioTrack.WriteRaw(uint32(pts), frame)
|
||||
} else {
|
||||
p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool)
|
||||
}
|
||||
case mpeg2.PS_STREAM_H264:
|
||||
if p.VideoTrack != nil {
|
||||
// p.WriteNalu(uint32(pts), uint32(dts), frame)
|
||||
p.WriteAnnexB(uint32(pts), uint32(dts), frame)
|
||||
} else {
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
|
||||
}
|
||||
case mpeg2.PS_STREAM_H265:
|
||||
if p.VideoTrack != nil {
|
||||
// p.WriteNalu(uint32(pts), uint32(dts), frame)
|
||||
p.WriteAnnexB(uint32(pts), uint32(dts), frame)
|
||||
} else {
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PSPublisher) OnPacket(pkg mpeg2.Display, decodeResult error) {
|
||||
// switch value := pkg.(type) {
|
||||
// case *mpeg2.PSPackHeader:
|
||||
// // fd3.WriteString("--------------PS Pack Header--------------\n")
|
||||
// if decodeResult == nil {
|
||||
// // value.PrettyPrint(fd3)
|
||||
// } else {
|
||||
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
|
||||
// }
|
||||
// case *mpeg2.System_header:
|
||||
// // fd3.WriteString("--------------System Header--------------\n")
|
||||
// if decodeResult == nil {
|
||||
// // value.PrettyPrint(fd3)
|
||||
// } else {
|
||||
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
|
||||
// }
|
||||
// case *mpeg2.Program_stream_map:
|
||||
// // fd3.WriteString("--------------------PSM-------------------\n")
|
||||
// if decodeResult == nil {
|
||||
// // value.PrettyPrint(fd3)
|
||||
// } else {
|
||||
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
|
||||
// }
|
||||
// case *mpeg2.PesPacket:
|
||||
// // fd3.WriteString("-------------------PES--------------------\n")
|
||||
// if decodeResult == nil {
|
||||
// // value.PrettyPrint(fd3)
|
||||
// } else {
|
||||
// // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
|
||||
if p.VideoTrack == nil {
|
||||
switch es.Type {
|
||||
case mpegts.STREAM_TYPE_H264:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
|
||||
case mpegts.STREAM_TYPE_H265:
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
|
||||
default:
|
||||
//推测编码类型
|
||||
var maybe264 codec.H264NALUType
|
||||
maybe264 = maybe264.Parse(es.Buffer[4])
|
||||
switch maybe264 {
|
||||
case codec.NALU_Non_IDR_Picture,
|
||||
codec.NALU_IDR_Picture,
|
||||
codec.NALU_SEI,
|
||||
codec.NALU_SPS,
|
||||
codec.NALU_PPS,
|
||||
codec.NALU_Access_Unit_Delimiter:
|
||||
p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
|
||||
default:
|
||||
p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
|
||||
p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
|
||||
}
|
||||
}
|
||||
}
|
||||
payload, pts, dts := es.Buffer, es.PTS, es.DTS
|
||||
if dts == 0 {
|
||||
dts = pts
|
||||
}
|
||||
// if binary.BigEndian.Uint32(payload) != 1 {
|
||||
// panic("not annexb")
|
||||
// }
|
||||
p.WriteAnnexB(pts, dts, payload)
|
||||
}
|
||||
|
||||
func (p *PSPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) {
|
||||
ts, payload := es.PTS, es.Buffer
|
||||
if p.AudioTrack == nil {
|
||||
switch es.Type {
|
||||
case mpegts.STREAM_TYPE_G711A:
|
||||
p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool)
|
||||
case mpegts.STREAM_TYPE_G711U:
|
||||
p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool)
|
||||
case mpegts.STREAM_TYPE_AAC:
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool)
|
||||
p.WriteADTS(ts, payload)
|
||||
case 0: //推测编码类型
|
||||
if payload[0] == 0xff && payload[1]>>4 == 0xf {
|
||||
p.AudioTrack = NewAAC(p.Publisher.Stream)
|
||||
p.WriteADTS(ts, payload)
|
||||
}
|
||||
default:
|
||||
p.Error("audio type not supported yet", zap.Uint8("type", es.Type))
|
||||
}
|
||||
} else if es.Type == mpegts.STREAM_TYPE_AAC {
|
||||
p.WriteADTS(ts, payload)
|
||||
} else {
|
||||
p.WriteRaw(ts, payload)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PSPublisher) Replay(f *os.File) (err error) {
|
||||
var rtpPacket rtp.Packet
|
||||
defer f.Close()
|
||||
var t uint16
|
||||
for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
|
||||
_, err = f.Read(l)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
payload := make([]byte, util.ReadBE[int](l[:4]))
|
||||
t = util.ReadBE[uint16](l[4:])
|
||||
_, err = f.Read(payload)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
rtpPacket.Unmarshal(payload)
|
||||
p.PushPS(&rtpPacket)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -124,7 +124,8 @@ func FilterStreams[T IPublisher]() (ss []*Stream) {
|
||||
|
||||
type StreamTimeoutConfig struct {
|
||||
PublishTimeout time.Duration //发布者无数据后超时
|
||||
DelayCloseTimeout time.Duration //发布者丢失后等待
|
||||
DelayCloseTimeout time.Duration //无订阅者后超时,必须先有一次订阅才会激活
|
||||
IdleTimeout time.Duration //无订阅者后超时,不需要订阅即可激活
|
||||
}
|
||||
type Tracks struct {
|
||||
util.Map[string, Track]
|
||||
@@ -297,10 +298,18 @@ func (r *Stream) action(action StreamAction) (ok bool) {
|
||||
stateEvent = SEpublish{event}
|
||||
}
|
||||
r.Subscribers.Broadcast(stateEvent)
|
||||
r.timeout.Reset(r.PublishTimeout) // 5秒心跳,检测track的存活度
|
||||
if r.IdleTimeout > 0 && r.Subscribers.Len() == 0 {
|
||||
return r.action(ACTION_LASTLEAVE)
|
||||
} else {
|
||||
r.timeout.Reset(r.PublishTimeout) // 5秒心跳,检测track的存活度
|
||||
}
|
||||
case STATE_WAITCLOSE:
|
||||
stateEvent = SEwaitClose{event}
|
||||
r.timeout.Reset(r.DelayCloseTimeout)
|
||||
if r.IdleTimeout > 0 {
|
||||
r.timeout.Reset(r.IdleTimeout)
|
||||
} else {
|
||||
r.timeout.Reset(r.DelayCloseTimeout)
|
||||
}
|
||||
case STATE_CLOSED:
|
||||
for !r.actionChan.Close() {
|
||||
// 等待channel发送完毕,伪自旋锁
|
||||
@@ -361,7 +370,7 @@ func (s *Stream) onSuberClose(sub ISubscriber) {
|
||||
if s.Publisher != nil {
|
||||
s.Publisher.OnEvent(sub) // 通知Publisher有订阅者离开,在回调中可以去获取订阅者数量
|
||||
}
|
||||
if s.DelayCloseTimeout > 0 && s.Subscribers.Len() == 0 {
|
||||
if (s.DelayCloseTimeout > 0 || s.IdleTimeout > 0) && s.Subscribers.Len() == 0 {
|
||||
s.action(ACTION_LASTLEAVE)
|
||||
}
|
||||
}
|
||||
@@ -514,14 +523,23 @@ func (s *Stream) run() {
|
||||
if s.State == STATE_WAITPUBLISH {
|
||||
s.action(ACTION_PUBLISH)
|
||||
}
|
||||
pubConfig := s.GetPublisherConfig()
|
||||
name := v.Value.GetBase().Name
|
||||
if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo {
|
||||
v.Reject(ErrTrackMute)
|
||||
return
|
||||
}
|
||||
if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubAudio {
|
||||
v.Reject(ErrTrackMute)
|
||||
return
|
||||
}
|
||||
if s.Tracks.Add(name, v.Value) {
|
||||
v.Resolve()
|
||||
s.Subscribers.OnTrack(v.Value)
|
||||
if _, ok := v.Value.(*track.Video); ok && !s.GetPublisherConfig().PubAudio {
|
||||
if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubAudio {
|
||||
s.Subscribers.AbortWait()
|
||||
}
|
||||
if _, ok := v.Value.(*track.Audio); ok && !s.GetPublisherConfig().PubVideo {
|
||||
if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo {
|
||||
s.Subscribers.AbortWait()
|
||||
}
|
||||
// 这里重置的目的是当PublishTimeout设置很大的情况下,需要及时取消订阅者的等待
|
||||
|
||||
+5
-6
@@ -107,7 +107,7 @@ type ISubscriber interface {
|
||||
type TrackPlayer struct {
|
||||
context.Context
|
||||
context.CancelFunc
|
||||
AudioReader, VideoReader track.AVRingReader
|
||||
AudioReader, VideoReader *track.AVRingReader
|
||||
Audio *track.Audio
|
||||
Video *track.Video
|
||||
}
|
||||
@@ -142,9 +142,8 @@ func (s *Subscriber) OnEvent(event any) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) CreateTrackReader(t *track.Media) (result track.AVRingReader) {
|
||||
result.Poll = s.Config.Poll
|
||||
result.Track = t
|
||||
func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader) {
|
||||
result = track.NewAVRingReader(t, s.Config.Poll)
|
||||
result.Logger = s.With(zap.String("track", t.Name))
|
||||
return
|
||||
}
|
||||
@@ -152,13 +151,13 @@ func (s *Subscriber) CreateTrackReader(t *track.Media) (result track.AVRingReade
|
||||
func (s *Subscriber) AddTrack(t Track) bool {
|
||||
switch v := t.(type) {
|
||||
case *track.Video:
|
||||
if s.VideoReader.Track != nil || !s.Config.SubVideo {
|
||||
if s.VideoReader != nil || !s.Config.SubVideo {
|
||||
return false
|
||||
}
|
||||
s.VideoReader = s.CreateTrackReader(&v.Media)
|
||||
s.Video = v
|
||||
case *track.Audio:
|
||||
if s.AudioReader.Track != nil || !s.Config.SubAudio {
|
||||
if s.AudioReader != nil || !s.Config.SubAudio {
|
||||
return false
|
||||
}
|
||||
s.AudioReader = s.CreateTrackReader(&v.Media)
|
||||
|
||||
@@ -33,6 +33,10 @@ func (dt *Data) Push(data any) {
|
||||
dt.Lock()
|
||||
defer dt.Unlock()
|
||||
}
|
||||
switch d := data.(type) {
|
||||
case util.Recyclable:
|
||||
d.Recycle()
|
||||
}
|
||||
dt.Value.WriteTime = time.Now()
|
||||
dt.Write(data)
|
||||
}
|
||||
|
||||
+18
-6
@@ -21,7 +21,7 @@ type AVRingReader struct {
|
||||
ctx context.Context
|
||||
Track *Media
|
||||
*util.Ring[common.AVFrame]
|
||||
Poll time.Duration
|
||||
wait func()
|
||||
State byte
|
||||
FirstSeq uint32
|
||||
FirstTs time.Duration
|
||||
@@ -39,17 +39,29 @@ func (r *AVRingReader) DecConfChanged() bool {
|
||||
return r.ConfSeq != r.Track.SequenceHeadSeq
|
||||
}
|
||||
|
||||
func (r *AVRingReader) wait() {
|
||||
if r.Poll == 0 {
|
||||
runtime.Gosched()
|
||||
} else {
|
||||
time.Sleep(r.Poll)
|
||||
func NewAVRingReader(t *Media, poll time.Duration) *AVRingReader {
|
||||
r := &AVRingReader{
|
||||
Track: t,
|
||||
}
|
||||
if poll == 0 {
|
||||
r.wait = runtime.Gosched
|
||||
} else {
|
||||
r.wait = func() {
|
||||
time.Sleep(poll)
|
||||
}
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *AVRingReader) ReadFrame() *common.AVFrame {
|
||||
for r.Frame = &r.Value; r.ctx.Err() == nil && !r.Frame.CanRead; r.wait() {
|
||||
}
|
||||
// 超过一半的缓冲区大小,说明Reader太慢,需要丢帧
|
||||
if r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Frame.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Frame.Sequence {
|
||||
r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Frame.Sequence))
|
||||
r.Ring = r.Track.IDRing
|
||||
return r.ReadFrame()
|
||||
}
|
||||
return r.Frame
|
||||
}
|
||||
|
||||
|
||||
@@ -72,6 +72,23 @@ func (r *BLLReader) ReadN(n int) (result net.Buffers) {
|
||||
return
|
||||
}
|
||||
|
||||
func (r *BLLReader) WriteNTo(n int, result *net.Buffers) (actual int) {
|
||||
actual = n
|
||||
for r.CanRead() {
|
||||
l := r.Value.Len() - r.pos
|
||||
if l > n {
|
||||
*result = append(*result, r.Value[r.pos:r.pos+n])
|
||||
r.pos += n
|
||||
return
|
||||
}
|
||||
*result = append(*result, r.Value[r.pos:])
|
||||
n -= l
|
||||
r.ListItem = r.Next
|
||||
r.pos = 0
|
||||
}
|
||||
return actual - n
|
||||
}
|
||||
|
||||
func (r *BLLReader) GetOffset() int {
|
||||
return r.pos
|
||||
}
|
||||
|
||||
+8
-2
@@ -6,15 +6,21 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func init() {
|
||||
logFile, err := os.OpenFile("./fatal.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
|
||||
fatal_log := "./fatal.log"
|
||||
if _fatal_log := os.Getenv("M7S_FATAL_LOG"); _fatal_log != "" {
|
||||
fatal_log = _fatal_log
|
||||
}
|
||||
logFile, err := os.OpenFile(fatal_log, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
log.Println("服务启动出错", "打开异常日志文件失败", err)
|
||||
return
|
||||
}
|
||||
// 将进程标准出错重定向至文件,进程崩溃时运行时将向该文件记录协程调用栈信息
|
||||
syscall.Dup2(int(logFile.Fd()), int(os.Stderr.Fd()))
|
||||
os.Stderr.WriteString("\n--------------------------------\n")
|
||||
|
||||
os.Stderr.WriteString("\n" + time.Now().Format("2006-01-02 15:04:05") + "--------------------------------\n")
|
||||
}
|
||||
|
||||
@@ -6,15 +6,20 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func init() {
|
||||
logFile, err := os.OpenFile("./fatal.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
|
||||
fatal_log := "./fatal.log"
|
||||
if _fatal_log := os.Getenv("M7S_FATAL_LOG"); _fatal_log != "" {
|
||||
fatal_log = _fatal_log
|
||||
}
|
||||
logFile, err := os.OpenFile(fatal_log, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
log.Println("服务启动出错", "打开异常日志文件失败", err)
|
||||
return
|
||||
}
|
||||
// 将进程标准出错重定向至文件,进程崩溃时运行时将向该文件记录协程调用栈信息
|
||||
syscall.Dup3(int(logFile.Fd()), int(os.Stderr.Fd()), 0)
|
||||
os.Stderr.WriteString("\n--------------------------------\n")
|
||||
os.Stderr.WriteString("\n" + time.Now().Format("2006-01-02 15:04:05") + "--------------------------------\n")
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -26,12 +27,16 @@ func setStdHandle(stdhandle int32, handle syscall.Handle) error {
|
||||
|
||||
// redirectStderr to the file passed in
|
||||
func init() {
|
||||
logFile, err := os.OpenFile("./fatal.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
|
||||
fatal_log := "./fatal.log"
|
||||
if _fatal_log := os.Getenv("M7S_FATAL_LOG"); _fatal_log != "" {
|
||||
fatal_log = _fatal_log
|
||||
}
|
||||
logFile, err := os.OpenFile(fatal_log, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
|
||||
err = setStdHandle(syscall.STD_ERROR_HANDLE, syscall.Handle(logFile.Fd()))
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to redirect stderr to file: %v", err)
|
||||
}
|
||||
// SetStdHandle does not affect prior references to stderr
|
||||
os.Stderr = logFile
|
||||
os.Stderr.WriteString("\n--------------------------------\n")
|
||||
os.Stderr.WriteString("\n" + time.Now().Format("2006-01-02 15:04:05") + "--------------------------------\n")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user