feat: add av1

This commit is contained in:
langhuihui
2023-11-15 15:05:02 +08:00
parent 0f2869b866
commit 2b658ec786
6 changed files with 171 additions and 4 deletions
+70
View File
@@ -0,0 +1,70 @@
package codec
import (
"errors"
"io"
)
var (
ErrInvalidMarker = errors.New("invalid marker value found in AV1CodecConfigurationRecord")
ErrInvalidVersion = errors.New("unsupported AV1CodecConfigurationRecord version")
ErrNonZeroReservedBits = errors.New("non-zero reserved bits found in AV1CodecConfigurationRecord")
)
type AV1CodecConfigurationRecord struct {
Version byte
SeqProfile byte
SeqLevelIdx0 byte
SeqTier0 byte
HighBitdepth byte
TwelveBit byte
MonoChrome byte
ChromaSubsamplingX byte
ChromaSubsamplingY byte
ChromaSamplePosition byte
InitialPresentationDelayPresent byte
InitialPresentationDelayMinusOne byte
ConfigOBUs []byte
}
func (p *AV1CodecConfigurationRecord) Unmarshal(data []byte) (n int, err error) {
l := len(data)
if l < 4 {
err = io.ErrShortWrite
return
}
Marker := data[0] >> 7
if Marker != 1 {
return 0, ErrInvalidMarker
}
p.Version = data[0] & 0x7F
if p.Version != 1 {
return 1, ErrInvalidVersion
}
p.SeqProfile = data[1] >> 5
p.SeqLevelIdx0 = data[1] & 0x1F
p.SeqTier0 = data[2] >> 7
p.HighBitdepth = (data[2] >> 6) & 0x01
p.TwelveBit = (data[2] >> 5) & 0x01
p.MonoChrome = (data[2] >> 4) & 0x01
p.ChromaSubsamplingX = (data[2] >> 3) & 0x01
p.ChromaSubsamplingY = (data[2] >> 2) & 0x01
p.ChromaSamplePosition = data[2] & 0x03
if data[3]>>5 != 0 {
return 3, ErrNonZeroReservedBits
}
p.InitialPresentationDelayPresent = (data[3] >> 4) & 0x01
if p.InitialPresentationDelayPresent == 1 {
p.InitialPresentationDelayMinusOne = data[3] & 0x0F
} else {
if data[3]&0x0F != 0 {
return 3, ErrNonZeroReservedBits
}
p.InitialPresentationDelayMinusOne = 0
}
if l > 4 {
p.ConfigOBUs = data[4:]
}
return l, nil
}
+2 -2
View File
@@ -95,8 +95,8 @@ const (
var AudNalu = []byte{0x00, 0x00, 0x00, 0x01, 0x46, 0x01, 0x10}
var ErrHevc = errors.New("hevc parse config error")
var FourCC_H265 = []byte{'h', 'v', 'c', '1'}
var FourCC_H265_32 = util.BigEndian.Uint32(FourCC_H265)
var FourCC_H265_32 = util.BigEndian.Uint32([]byte{'h', 'v', 'c', '1'})
var FourCC_AV1_32 = util.BigEndian.Uint32([]byte{'a', 'v', '0', '1'})
// HVCC
type HVCDecoderConfigurationRecord struct {
PicWidthInLumaSamples uint32 // sps
+5 -1
View File
@@ -72,9 +72,13 @@ func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPo
// https://github.com/veovera/enhanced-rtmp/blob/main/enhanced-rtmp-v1.pdf
if isExtHeader := b0 & 0b1000_0000; isExtHeader != 0 {
fourCC := frame.GetUintN(1, 4)
if fourCC == codec.FourCC_H265_32 {
switch fourCC {
case codec.FourCC_H265_32:
p.VideoTrack = track.NewH265(p.Stream, pool)
p.VideoTrack.WriteAVCC(ts, frame)
case codec.FourCC_AV1_32:
p.VideoTrack = track.NewAV1(p.Stream, pool)
p.VideoTrack.WriteAVCC(ts, frame)
}
} else {
if frame.GetByte(1) == 0 {
+3
View File
@@ -52,6 +52,9 @@ func (pub *Puller) startPull(puller IPuller) {
stream = pub.Stream
if stream != nil {
puller.Error("puller already exists", zap.Int8("streamState", int8(stream.State)))
if stream.State == STATE_CLOSED {
oldPuller.(IPuller).Stop(zap.String("reason", "dead puller"))
}
} else {
puller.Error("puller already exists", zap.Time("createAt", pub.StartTime))
}
+6 -1
View File
@@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
@@ -196,10 +197,13 @@ func (tracks *Tracks) MarshalJSON() ([]byte, error) {
return json.Marshal(trackList)
}
var streamIdGen atomic.Uint32
// Stream 流定义
type Stream struct {
timeout *time.Timer //当前状态的超时定时器
actionChan util.SafeChan[any]
ID uint32 // 流ID
*log.Logger
StartTime time.Time //创建时间
StreamTimeoutConfig
@@ -276,13 +280,14 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream
AppName: p[0],
StreamName: strings.Join(p[1:], "/"),
StartTime: time.Now(),
Logger: log.LocaleLogger.With(zap.String("stream", streamPath)),
timeout: time.NewTimer(waitTimeout),
})
if s := actual.(*Stream); loaded {
s.Debug("Stream Found")
return s, false
} else {
s.ID = streamIdGen.Add(1)
s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath), zap.Uint32("id", s.ID))
s.Subscribers.Init()
s.actionChan.Init(10)
s.Info("created")
+85
View File
@@ -1,6 +1,10 @@
package track
import (
"io"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1"
"go.uber.org/zap"
"m7s.live/engine/v4/codec"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/util"
@@ -10,6 +14,7 @@ var _ SpesificTrack = (*AV1)(nil)
type AV1 struct {
Video
decoder rtpav1.Decoder
}
func NewAV1(stream IStream, stuff ...any) (vt *AV1) {
@@ -23,3 +28,83 @@ func NewAV1(stream IStream, stuff ...any) (vt *AV1) {
vt.dtsEst = NewDTSEstimator()
return
}
func (vt *AV1) writeSequenceHead(head []byte) (err error) {
vt.WriteSequenceHead(head)
var info codec.AV1CodecConfigurationRecord
info.Unmarshal(head[5:])
vt.ParamaterSets[0] = info.ConfigOBUs
return
}
func (vt *AV1) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
if l := frame.ByteLength; l < 6 {
vt.Error("AVCC data too short", zap.Int("len", l))
return io.ErrShortWrite
}
b0 := frame.GetByte(0)
if isExtHeader := (b0 >> 4) & 0b1000; isExtHeader != 0 {
firstBuffer := frame.Next.Value
packetType := b0 & 0b1111
switch packetType {
case codec.PacketTypeSequenceStart:
header := frame.ToBytes()
header[0] = 0x1d
header[1] = 0x00
header[2] = 0x00
header[3] = 0x00
header[4] = 0x00
err = vt.writeSequenceHead(header)
frame.Recycle()
return
case codec.PacketTypeCodedFrames:
firstBuffer[0] = b0 & 0b0111_1111 & 0xFD
firstBuffer[1] = 0x01
copy(firstBuffer[2:], firstBuffer[5:])
frame.Next.Value = firstBuffer[:firstBuffer.Len()-3]
frame.ByteLength -= 3
return vt.Video.WriteAVCC(ts, frame)
case codec.PacketTypeCodedFramesX:
firstBuffer[0] = b0 & 0b0111_1111 & 0xFD
firstBuffer[1] = 0x01
firstBuffer[2] = 0
firstBuffer[3] = 0
firstBuffer[4] = 0
return vt.Video.WriteAVCC(ts, frame)
}
} else {
if frame.GetByte(1) == 0 {
err = vt.writeSequenceHead(frame.ToBytes())
frame.Recycle()
return
} else {
return vt.Video.WriteAVCC(ts, frame)
}
}
return
}
func (vt *AV1) WriteRTPFrame(rtpItem *util.ListItem[RTPFrame]) {
defer func() {
err := recover()
if err != nil {
vt.Error("WriteRTPFrame panic", zap.Any("err", err))
vt.Stream.Close()
}
}()
if vt.lastSeq != vt.lastSeq2+1 && vt.lastSeq2 != 0 {
vt.lostFlag = true
vt.Warn("lost rtp packet", zap.Uint16("lastSeq", vt.lastSeq), zap.Uint16("lastSeq2", vt.lastSeq2))
}
frame := &rtpItem.Value
rv := vt.Value
rv.RTP.Push(rtpItem)
obus, err := vt.decoder.Decode(frame.Packet)
for _, obu := range obus {
rv.AUList.Push(vt.BytesPool.GetShell(obu))
}
if err == nil {
vt.generateTimestamp(frame.Timestamp)
vt.Flush()
}
}