From 938f23955b8de7c728c9a7bf0328403b8573a5cb Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Thu, 4 Jul 2024 19:12:13 +0800 Subject: [PATCH] feat: aac --- api.go | 13 +- example/default/config.yaml | 4 - example/rtsp-push/config2.yaml | 1 - pkg/annexb.go | 54 +++---- pkg/av-reader.go | 7 +- pkg/avframe.go | 71 ++++---- pkg/codec/audio.go | 39 ++++- pkg/codec/av1.go | 9 +- pkg/codec/h264.go | 5 +- pkg/codec/h265.go | 5 +- pkg/codec/index.go | 7 + pkg/config/config.go | 10 +- pkg/config/types.go | 17 -- pkg/track.go | 5 +- pkg/util/buf-reader.go | 5 +- pkg/util/buffer_test.go | 2 +- pkg/util/buffers.go | 27 ++-- pkg/util/mem.go | 10 +- pkg/util/range.go | 21 ++- plugin.go | 10 +- plugin/rtmp/pkg/audio.go | 77 +++++---- plugin/rtmp/pkg/codec.go | 25 +-- plugin/rtmp/pkg/const.go | 4 + plugin/rtmp/pkg/video.go | 103 ++++++------ plugin/rtp/pkg/audio.go | 286 ++++++++++++++++++++++++++++++--- plugin/rtp/pkg/video.go | 175 +++++++++----------- plugin/rtp/pkg/video_test.go | 16 +- plugin/webrtc/index.go | 2 +- publisher.go | 57 +++---- server.go | 13 +- subscriber.go | 148 +++++++++-------- 31 files changed, 723 insertions(+), 505 deletions(-) create mode 100644 pkg/codec/index.go diff --git a/api.go b/api.go index 40c39fd..acfdecc 100644 --- a/api.go +++ b/api.go @@ -72,8 +72,7 @@ func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) { } defer reader.StopRead() if reader.Value.Raw == nil { - reader.Value.Raw, err = reader.Value.Wraps[0].ToRaw(publisher.VideoTrack.ICodecCtx) - if err != nil { + if err = reader.Value.Demux(publisher.VideoTrack.ICodecCtx); err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return } @@ -81,17 +80,13 @@ func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) { var annexb pkg.AnnexB var t pkg.AVTrack - err = annexb.DecodeConfig(&t, publisher.VideoTrack.ICodecCtx) + err = annexb.ConvertCtx(publisher.VideoTrack.ICodecCtx, &t) if t.ICodecCtx == nil { http.Error(rw, "unsupported codec", http.StatusInternalServerError) return } - frame, err := t.CreateFrame(&reader.Value) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - _, err = frame.(*pkg.AnnexB).WriteTo(rw) + annexb.Mux(t.ICodecCtx, &reader.Value) + _, err = annexb.WriteTo(rw) } func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err error) { diff --git a/example/default/config.yaml b/example/default/config.yaml index 03fd6d1..63b5fef 100644 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -3,8 +3,6 @@ global: enableauth: true tcp: listenaddr: :50051 - publish: - pubaudio: false # ringsize: 20-250 # buffertime: 10s # speed: 1 @@ -13,8 +11,6 @@ console: logrotate: level: debug rtsp: - subscribe: - subaudio: false rtmp: # tcp: # listenaddr: :11935 diff --git a/example/rtsp-push/config2.yaml b/example/rtsp-push/config2.yaml index a4a8bc9..02e489b 100644 --- a/example/rtsp-push/config2.yaml +++ b/example/rtsp-push/config2.yaml @@ -12,7 +12,6 @@ rtsp: pushlist: live/test: rtsp://localhost/live/test hdl: - publish: pubaudio: false pull: diff --git a/pkg/annexb.go b/pkg/annexb.go index d6ce162..fa38427 100644 --- a/pkg/annexb.go +++ b/pkg/annexb.go @@ -24,13 +24,8 @@ func (a *AnnexB) Dump(t byte, w io.Writer) { } // DecodeConfig implements pkg.IAVFrame. -func (a *AnnexB) DecodeConfig(t *AVTrack, ctx ICodecCtx) error { - switch c := ctx.(type) { - case codec.IH264Ctx: - var annexb264 Annexb264Ctx - annexb264.H264Ctx = *c.GetH264Ctx() - t.ICodecCtx = &annexb264 - } +func (a *AnnexB) ConvertCtx(ctx codec.ICodecCtx, t *AVTrack) error { + t.ICodecCtx = ctx.GetBase() return nil } @@ -42,9 +37,12 @@ func (a *AnnexB) GetSize() int { func (a *AnnexB) GetTimestamp() time.Duration { return a.DTS * time.Millisecond / 90 } +func (a *AnnexB) GetCTS() time.Duration { + return (a.PTS - a.DTS) * time.Millisecond / 90 +} // Parse implements pkg.IAVFrame. -func (a *AnnexB) Parse(t *AVTrack) (isIDR bool, isSeq bool, raw any, err error) { +func (a *AnnexB) Parse(t *AVTrack) (err error) { panic("unimplemented") } @@ -53,35 +51,25 @@ func (a *AnnexB) String() string { return fmt.Sprintf("%d %d", a.DTS, a.Memory.Size) } -// ToRaw implements pkg.IAVFrame. -func (a *AnnexB) ToRaw(ctx ICodecCtx) (any, error) { - // var nalus Nalus - // nalus.PTS = a.PTS - // nalus.DTS = a.DTS +// Demux implements pkg.IAVFrame. +func (a *AnnexB) Demux(ctx codec.ICodecCtx) (any, error) { panic("unimplemented") } -type Annexb264Ctx struct { - codec.H264Ctx -} - -type Annexb265Ctx struct { - codec.H265Ctx -} - -func (a *Annexb264Ctx) CreateFrame(frame *AVFrame) (IAVFrame, error) { - var annexb AnnexB - // annexb.RecyclableBuffers.ScalableMemoryAllocator = frame.Wraps[0].GetScalableMemoryAllocator() - annexb.Append(codec.NALU_Delimiter2) +func (a *AnnexB) Mux(codecCtx codec.ICodecCtx, frame *AVFrame) { + a.AppendOne(codec.NALU_Delimiter2) if frame.IDR { - annexb.Append(a.SPS[0], codec.NALU_Delimiter2, a.PPS[0], codec.NALU_Delimiter2) - } - var nalus = frame.Raw.(Nalus) - for i, nalu := range nalus.Nalus { - if i > 0 { - annexb.Append(codec.NALU_Delimiter1) + switch ctx := codecCtx.(type) { + case *codec.H264Ctx: + a.Append(ctx.SPS[0], codec.NALU_Delimiter2, ctx.PPS[0], codec.NALU_Delimiter2) + case *codec.H265Ctx: + a.Append(ctx.SPS[0], codec.NALU_Delimiter2, ctx.PPS[0], codec.NALU_Delimiter2, ctx.VPS[0], codec.NALU_Delimiter2) } - annexb.Append(nalu.Buffers...) } - return &annexb, nil + for i, nalu := range frame.Raw.(Nalus) { + if i > 0 { + a.AppendOne(codec.NALU_Delimiter1) + } + a.Append(nalu.Buffers...) + } } diff --git a/pkg/av-reader.go b/pkg/av-reader.go index 1d569b8..92e587f 100644 --- a/pkg/av-reader.go +++ b/pkg/av-reader.go @@ -3,6 +3,7 @@ package pkg import ( "context" "log/slog" + "m7s.live/m7s/v5/pkg/codec" "m7s.live/m7s/v5/pkg/config" "time" ) @@ -27,7 +28,7 @@ type AVRingReader struct { FirstTs time.Duration SkipTs time.Duration //ms beforeJump time.Duration - LastCodecCtx ICodecCtx + LastCodecCtx codec.ICodecCtx startTime time.Time AbsTime uint32 Delay uint32 @@ -143,11 +144,11 @@ func (r *AVRingReader) ReadFrame(conf *config.Subscribe) (err error) { } // func (r *AVRingReader) GetPTS32() uint32 { -// return uint32((r.Value.Raw.PTS - r.SkipTs*90/time.Millisecond)) +// return uint32((r.Value.Raw.Timestamp - r.SkipTs*90/time.Millisecond)) // } // func (r *AVRingReader) GetDTS32() uint32 { -// return uint32((r.Value.DTS - r.SkipTs*90/time.Millisecond)) +// return uint32((r.Value.CTS - r.SkipTs*90/time.Millisecond)) // } func (r *AVRingReader) ResetAbsTime() { diff --git a/pkg/avframe.go b/pkg/avframe.go index dc5f430..e26d9af 100644 --- a/pkg/avframe.go +++ b/pkg/avframe.go @@ -12,47 +12,49 @@ import ( ) type ( - ICodecCtx interface { - CreateFrame(*AVFrame) (IAVFrame, error) - FourCC() codec.FourCC - GetInfo() string - } IAudioCodecCtx interface { - ICodecCtx + codec.ICodecCtx GetSampleRate() int GetChannels() int GetSampleSize() int } IVideoCodecCtx interface { - ICodecCtx + codec.ICodecCtx GetWidth() int GetHeight() int } IDataFrame interface { } + // Source -> Parse -> Demux -> (ConvertCtx) -> Mux(GetAllocator) -> Recycle IAVFrame interface { - GetScalableMemoryAllocator() *util.ScalableMemoryAllocator - Parse(*AVTrack) (bool, bool, any, error) - DecodeConfig(*AVTrack, ICodecCtx) error - ToRaw(ICodecCtx) (any, error) + GetAllocator() *util.ScalableMemoryAllocator + SetAllocator(*util.ScalableMemoryAllocator) + Parse(*AVTrack) error // get codec info, idr + ConvertCtx(codec.ICodecCtx, *AVTrack) error // convert codec from source stream + Demux(codec.ICodecCtx) (any, error) // demux to raw format + Mux(codec.ICodecCtx, *AVFrame) // mux from raw format GetTimestamp() time.Duration + GetCTS() time.Duration GetSize() int Recycle() String() string Dump(byte, io.Writer) } - Nalus struct { - PTS time.Duration - DTS time.Duration - Nalus []util.Memory - } + Nalus []util.Memory + + AudioData = util.Memory + + OBUs AudioData + AVFrame struct { DataFrame IDR bool Timestamp time.Duration // 绝对时间戳 + CTS time.Duration // composition time stamp Wraps []IAVFrame // 封装格式 } + AVRing = util.Ring[AVFrame] DataFrame struct { sync.RWMutex @@ -67,6 +69,7 @@ var _ IAVFrame = (*AnnexB)(nil) func (frame *AVFrame) Reset() { frame.Timestamp = 0 + frame.Raw = nil if len(frame.Wraps) > 0 { for _, wrap := range frame.Wraps { wrap.Recycle() @@ -80,6 +83,11 @@ func (frame *AVFrame) Discard() { frame.Reset() } +func (frame *AVFrame) Demux(codecCtx codec.ICodecCtx) (err error) { + frame.Raw, err = frame.Wraps[0].Demux(codecCtx) + return +} + func (df *DataFrame) StartWrite() bool { if df.TryLock() { return true @@ -95,15 +103,15 @@ func (df *DataFrame) Ready() { } func (nalus *Nalus) H264Type() codec.H264NALUType { - return codec.ParseH264NALUType(nalus.Nalus[0].Buffers[0][0]) + return codec.ParseH264NALUType((*nalus)[0].Buffers[0][0]) } func (nalus *Nalus) H265Type() codec.H265NALUType { - return codec.ParseH265NALUType(nalus.Nalus[0].Buffers[0][0]) + return codec.ParseH265NALUType((*nalus)[0].Buffers[0][0]) } func (nalus *Nalus) Append(bytes []byte) { - nalus.Nalus = append(nalus.Nalus, util.Memory{Buffers: net.Buffers{bytes}, Size: len(bytes)}) + *nalus = append(*nalus, util.Memory{Buffers: net.Buffers{bytes}, Size: len(bytes)}) } func (nalus *Nalus) ParseAVCC(reader *util.MemoryReader, naluSizeLen int) error { @@ -112,27 +120,26 @@ func (nalus *Nalus) ParseAVCC(reader *util.MemoryReader, naluSizeLen int) error if err != nil { return err } - reader.RangeN(l, nalus.Append) + var mem util.Memory + reader.RangeN(int(l), mem.AppendOne) + *nalus = append(*nalus, mem) } return nil } -type OBUs struct { - PTS time.Duration - OBUs []net.Buffers -} - -func (obus *OBUs) Append(bytes ...[]byte) { - obus.OBUs = append(obus.OBUs, bytes) -} - func (obus *OBUs) ParseAVCC(reader *util.MemoryReader) error { var obuHeader av1.OBUHeader startLen := reader.Length for reader.Length > 0 { offset := reader.Size - reader.Length - b, _ := reader.ReadByte() - obuHeader.Unmarshal([]byte{b}) + b, err := reader.ReadByte() + if err != nil { + return err + } + err = obuHeader.Unmarshal([]byte{b}) + if err != nil { + return err + } // if log.Trace { // vt.Trace("obu", zap.Any("type", obuHeader.Type), zap.Bool("iframe", vt.Value.IFrame)) // } @@ -144,7 +151,7 @@ func (obus *OBUs) ParseAVCC(reader *util.MemoryReader) error { if err != nil { return err } - obus.Append(obu) + (*AudioData)(obus).AppendOne(obu) } return nil } diff --git a/pkg/codec/audio.go b/pkg/codec/audio.go index 5f4417d..5f43e2c 100644 --- a/pkg/codec/audio.go +++ b/pkg/codec/audio.go @@ -1,21 +1,28 @@ package codec +import ( + "fmt" +) + type ( AudioCtx struct { SampleRate int Channels int SampleSize int } - PCMACtx AudioCtx - PCMUCtx AudioCtx - OPUSCtx AudioCtx - AACCtx struct { + PCMACtx struct { + AudioCtx + } + PCMUCtx struct { + AudioCtx + } + OPUSCtx struct { + AudioCtx + } + AACCtx struct { AudioCtx Asc []byte } - IAACCtx interface { - GetAACCtx() *AACCtx - } ) func (ctx *AudioCtx) GetSampleRate() int { @@ -30,7 +37,11 @@ func (ctx *AudioCtx) GetSampleSize() int { return ctx.SampleSize } -func (ctx *AACCtx) GetAACCtx() *AACCtx { +func (ctx *AudioCtx) GetInfo() string { + return fmt.Sprintf("sample rate: %d, channels: %d, sample size: %d", ctx.SampleRate, ctx.Channels, ctx.SampleSize) +} + +func (ctx *AACCtx) GetBase() ICodecCtx { return ctx } @@ -42,6 +53,14 @@ func (*PCMACtx) FourCC() FourCC { return FourCC_ALAW } +func (ctx *PCMACtx) GetBase() ICodecCtx { + return ctx +} + +func (ctx *PCMUCtx) GetBase() ICodecCtx { + return ctx +} + func (*AACCtx) FourCC() FourCC { return FourCC_MP4A } @@ -49,3 +68,7 @@ func (*AACCtx) FourCC() FourCC { func (*OPUSCtx) FourCC() FourCC { return FourCC_OPUS } + +func (ctx *OPUSCtx) GetBase() ICodecCtx { + return ctx +} diff --git a/pkg/codec/av1.go b/pkg/codec/av1.go index 4504648..f0963af 100644 --- a/pkg/codec/av1.go +++ b/pkg/codec/av1.go @@ -13,15 +13,16 @@ const ( ) type ( - IAV1Ctx interface { - GetAV1Ctx() *AV1Ctx - } AV1Ctx struct { ConfigOBUs []byte } ) -func (ctx *AV1Ctx) GetAV1Ctx() *AV1Ctx { +func (ctx *AV1Ctx) GetInfo() string { + return "AV1" +} + +func (ctx *AV1Ctx) GetBase() ICodecCtx { return ctx } diff --git a/pkg/codec/h264.go b/pkg/codec/h264.go index e51724c..7754c71 100644 --- a/pkg/codec/h264.go +++ b/pkg/codec/h264.go @@ -106,9 +106,6 @@ func SplitH264(payload []byte) (nalus [][]byte) { } type ( - IH264Ctx interface { - GetH264Ctx() *H264Ctx - } H264Ctx struct { SPSInfo SPS [][]byte @@ -132,6 +129,6 @@ func (h264 *H264Ctx) GetHeight() int { return int(h264.Height) } -func (h264 *H264Ctx) GetH264Ctx() *H264Ctx { +func (h264 *H264Ctx) GetBase() ICodecCtx { return h264 } diff --git a/pkg/codec/h265.go b/pkg/codec/h265.go index f9bf5a2..04c42bc 100644 --- a/pkg/codec/h265.go +++ b/pkg/codec/h265.go @@ -90,9 +90,6 @@ const ( var AudNalu = []byte{0x00, 0x00, 0x00, 0x01, 0x46, 0x01, 0x10} type ( - IH265Ctx interface { - GetH265Ctx() *H265Ctx - } H265Ctx struct { H264Ctx VPS [][]byte @@ -115,6 +112,6 @@ func (*H265Ctx) FourCC() FourCC { return FourCC_H265 } -func (h265 *H265Ctx) GetH265Ctx() *H265Ctx { +func (h265 *H265Ctx) GetBase() ICodecCtx { return h265 } diff --git a/pkg/codec/index.go b/pkg/codec/index.go new file mode 100644 index 0000000..c78e29f --- /dev/null +++ b/pkg/codec/index.go @@ -0,0 +1,7 @@ +package codec + +type ICodecCtx interface { + FourCC() FourCC + GetInfo() string + GetBase() ICodecCtx +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 280e9a6..2f94c91 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -30,9 +30,10 @@ type Config struct { tag reflect.StructTag } -var durationType = reflect.TypeOf(time.Duration(0)) -var regexpType = reflect.TypeOf(Regexp{}) -var regexpYaml = regexp.MustCompile(`^(.+: )"(.+)"$`) +var ( + durationType = reflect.TypeOf(time.Duration(0)) + regexpType = reflect.TypeOf(Regexp{}) +) func (config *Config) Range(f func(key string, value Config)) { if m, ok := config.GetValue().(map[string]Config); ok { @@ -261,8 +262,7 @@ func (config *Config) valueWithoutModify() any { } func equal(vwm, v any) bool { - ft := reflect.TypeOf(vwm) - switch ft { + switch ft := reflect.TypeOf(vwm); ft { case regexpType: return vwm.(Regexp).String() == v.(Regexp).String() default: diff --git a/pkg/config/types.go b/pkg/config/types.go index 504a632..e3c6d3c 100755 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -156,23 +156,6 @@ func (p *Push) CheckPush(streamPath string) string { return url } -type Console struct { - Server string `default:"console.monibuca.com:44944" desc:"远程控制台地址"` //远程控制台地址 - Secret string `desc:"远程控制台密钥"` //远程控制台密钥 - PublicAddr string `desc:"远程控制台公网地址"` //公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址 - PublicAddrTLS string `desc:"远程控制台公网TLS地址"` -} - -type Engine struct { - EnableSubEvent bool `default:"true" desc:"启用订阅事件,禁用可以提高性能"` //启用订阅事件,禁用可以提高性能 - LogLang string `default:"zh" desc:"日志语言" enum:"zh:中文,en:英文"` //日志语言 - SettingDir string `default:".m7s" desc:""` - EventBusSize int `default:"10" desc:"事件总线大小"` //事件总线大小 - PulseInterval time.Duration `default:"5s" desc:"心跳事件间隔"` //心跳事件间隔 - DisableAll bool `default:"false" desc:"禁用所有插件"` //禁用所有插件 - RTPReorderBufferLen int `default:"50" desc:"RTP重排序缓冲区长度"` //RTP重排序缓冲区长度 -} - type Common struct { PublicIP string LogLevel string `default:"info" enum:"trace:跟踪,debug:调试,info:信息,warn:警告,error:错误"` //日志级别 diff --git a/pkg/track.go b/pkg/track.go index c16c430..24471a7 100644 --- a/pkg/track.go +++ b/pkg/track.go @@ -3,6 +3,7 @@ package pkg import ( "context" "log/slog" + "m7s.live/m7s/v5/pkg/codec" "m7s.live/m7s/v5/pkg/config" "reflect" "time" @@ -29,7 +30,7 @@ type ( AVTrack struct { Track *RingWriter - ICodecCtx + codec.ICodecCtx Allocator *util.ScalableMemoryAllocator SequenceFrame IAVFrame WrapIndex int @@ -42,7 +43,7 @@ func NewAVTrack(args ...any) (t *AVTrack) { switch v := arg.(type) { case IAVFrame: t.FrameType = reflect.TypeOf(v) - t.Allocator = v.GetScalableMemoryAllocator() + t.Allocator = v.GetAllocator() case reflect.Type: t.FrameType = v case *slog.Logger: diff --git a/pkg/util/buf-reader.go b/pkg/util/buf-reader.go index 8a8a773..84d4f2c 100644 --- a/pkg/util/buf-reader.go +++ b/pkg/util/buf-reader.go @@ -146,10 +146,7 @@ func (r *BufReader) ReadString(n int) (s string, err error) { } func (r *BufReader) ReadBytes(n int) (mem Memory, err error) { - err = r.ReadRange(n, func(buf []byte) { - mem.Buffers = append(mem.Buffers, buf) - }) - mem.Size = n + err = r.ReadRange(n, mem.AppendOne) return } diff --git a/pkg/util/buffer_test.go b/pkg/util/buffer_test.go index b7b1f11..4eb1f1d 100644 --- a/pkg/util/buffer_test.go +++ b/pkg/util/buffer_test.go @@ -24,7 +24,7 @@ func TestReadBytesTo(t *testing.T) { s := RandomString(100) t.Logf("s:%s", s) var m Memory - m.Append([]byte(s)) + m.AppendOne([]byte(s)) r := m.NewReader() seededRand := rand.New(rand.NewSource(time.Now().UnixNano())) var total []byte diff --git a/pkg/util/buffers.go b/pkg/util/buffers.go index dbb3642..3a67022 100644 --- a/pkg/util/buffers.go +++ b/pkg/util/buffers.go @@ -19,16 +19,18 @@ type MemoryReader struct { } func NewReadableBuffersFromBytes(b ...[]byte) *MemoryReader { - buf := NewMemory(b) + buf := &Memory{Buffers: b} + for _, level0 := range b { + buf.Size += len(level0) + } return &MemoryReader{Memory: buf, Length: buf.Size} } -func NewMemory(buffers net.Buffers) *Memory { - ret := &Memory{Buffers: buffers} - for _, level0 := range buffers { - ret.Size += len(level0) +func NewMemory(buf []byte) Memory { + return Memory{ + Buffers: net.Buffers{buf}, + Size: len(buf), } - return ret } func (m *Memory) UpdateBuffer(index int, buf []byte) { @@ -42,7 +44,7 @@ func (m *Memory) UpdateBuffer(index int, buf []byte) { func (m *Memory) CopyFrom(b *Memory) { buf := make([]byte, b.Size) b.CopyTo(buf) - m.Append(buf) + m.AppendOne(buf) } func (m *Memory) CopyTo(buf []byte) { @@ -59,6 +61,11 @@ func (m *Memory) ToBytes() []byte { return buf } +func (m *Memory) AppendOne(b []byte) { + m.Buffers = append(m.Buffers, b) + m.Size += len(b) +} + func (m *Memory) Append(b ...[]byte) { m.Buffers = append(m.Buffers, b...) for _, level0 := range b { @@ -230,13 +237,13 @@ func (r *MemoryReader) ReadBytes(n int) ([]byte, error) { return b[:actual], nil } -func (r *MemoryReader) ReadBE(n int) (num int, err error) { +func (r *MemoryReader) ReadBE(n int) (num uint32, err error) { for i := range n { b, err := r.ReadByte() if err != nil { - return -1, err + return 0, err } - num += int(b) << ((n - i - 1) << 3) + num += uint32(b) << ((n - i - 1) << 3) } return } diff --git a/pkg/util/mem.go b/pkg/util/mem.go index 0bc8592..b8391cc 100644 --- a/pkg/util/mem.go +++ b/pkg/util/mem.go @@ -199,7 +199,7 @@ func (sma *ScalableMemoryAllocator) Malloc(size int) (memory []byte) { return } -func (sma *ScalableMemoryAllocator) GetScalableMemoryAllocator() *ScalableMemoryAllocator { +func (sma *ScalableMemoryAllocator) GetAllocator() *ScalableMemoryAllocator { return sma } @@ -253,6 +253,10 @@ type RecyclableMemory struct { RecycleIndexes []int } +func (r *RecyclableMemory) SetAllocator(allocator *ScalableMemoryAllocator) { + r.ScalableMemoryAllocator = allocator +} + func (r *RecyclableMemory) NextN(size int) (memory []byte) { memory = r.ScalableMemoryAllocator.Malloc(size) if memory == nil { @@ -260,7 +264,7 @@ func (r *RecyclableMemory) NextN(size int) (memory []byte) { } else if r.RecycleIndexes != nil { r.RecycleIndexes = append(r.RecycleIndexes, r.Count()) } - r.Append(memory) + r.AppendOne(memory) return } @@ -268,7 +272,7 @@ func (r *RecyclableMemory) AddRecycleBytes(b []byte) { if r.RecycleIndexes != nil { r.RecycleIndexes = append(r.RecycleIndexes, r.Count()) } - r.Append(b) + r.AppendOne(b) } func (r *RecyclableMemory) RemoveRecycleBytes(index int) (buf []byte) { diff --git a/pkg/util/range.go b/pkg/util/range.go index ca48702..b98839d 100644 --- a/pkg/util/range.go +++ b/pkg/util/range.go @@ -1,6 +1,7 @@ package util import ( + "fmt" "gopkg.in/yaml.v3" "strconv" "strings" @@ -21,8 +22,20 @@ func (r *Range[T]) Valid() bool { return r.Size() >= 0 } -func (r *Range[T]) UnmarshalYAML(value *yaml.Node) error { - ss := strings.Split(value.Value, "-") +func (r *Range[T]) Resolve(s string) error { + ss := strings.Split(s, "-") + if len(ss) == 0 { + return fmt.Errorf("invalid range: %s", s) + } + if len(ss) == 1 { + i64, err := strconv.ParseInt(s, 10, 0) + r[0] = T(i64) + if err != nil { + return err + } + r[1] = r[0] + return nil + } i64, err := strconv.ParseInt(ss[0], 10, 0) if err != nil { return err @@ -35,3 +48,7 @@ func (r *Range[T]) UnmarshalYAML(value *yaml.Node) error { r[1] = T(i64) return nil } + +func (r *Range[T]) UnmarshalYAML(value *yaml.Node) error { + return r.Resolve(value.Value) +} diff --git a/plugin.go b/plugin.go index 745ffbc..9434405 100644 --- a/plugin.go +++ b/plugin.go @@ -304,18 +304,18 @@ func (p *Plugin) Pull(streamPath string, url string, options ...any) (puller *Pu puller.Publish = p.config.Publish puller.PublishTimeout = 0 puller.StreamPath = streamPath + var pullHandler PullHandler for _, option := range options { switch v := option.(type) { case PullHandler: - defer func() { - if err == nil { - puller.Start(v) - } - }() + pullHandler = v } } puller.Init(p, streamPath, &puller.Publish, options...) _, err = p.server.Call(puller) + if err == nil && pullHandler != nil { + err = puller.Start(pullHandler) + } return } diff --git a/plugin/rtmp/pkg/audio.go b/plugin/rtmp/pkg/audio.go index ccc6dcc..338b0e6 100644 --- a/plugin/rtmp/pkg/audio.go +++ b/plugin/rtmp/pkg/audio.go @@ -3,13 +3,15 @@ package rtmp import ( . "m7s.live/m7s/v5/pkg" "m7s.live/m7s/v5/pkg/codec" + "m7s.live/m7s/v5/pkg/util" + "time" ) type RTMPAudio struct { RTMPData } -func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { +func (avcc *RTMPAudio) Parse(t *AVTrack) (err error) { reader := avcc.NewReader() var b, b0, b1 byte b, err = reader.ReadByte() @@ -19,7 +21,7 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) switch b & 0b1111_0000 >> 4 { case 7: if t.ICodecCtx == nil { - var ctx PCMACtx + var ctx codec.PCMACtx ctx.SampleRate = 8000 ctx.Channels = 1 ctx.SampleSize = 8 @@ -27,7 +29,7 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) } case 8: if t.ICodecCtx == nil { - var ctx PCMUCtx + var ctx codec.PCMUCtx ctx.SampleRate = 8000 ctx.Channels = 1 ctx.SampleSize = 8 @@ -38,8 +40,7 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) if err != nil { return } - isSeq = b == 0 - if isSeq { + if b == 0 { var ctx AACCtx b0, err = reader.ReadByte() if err != nil { @@ -51,7 +52,7 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) } var cloneFrame RTMPAudio cloneFrame.CopyFrom(&avcc.Memory) - ctx.Asc = cloneFrame.Buffers[0] + ctx.Asc = []byte{b0, b1} ctx.AudioObjectType = b0 >> 3 ctx.SamplingFrequencyIndex = (b0 & 0x07 << 1) | (b1 >> 7) ctx.ChannelConfiguration = (b1 >> 3) & 0x0F @@ -68,52 +69,48 @@ func (avcc *RTMPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) return } -func (avcc *RTMPAudio) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { +func (avcc *RTMPAudio) ConvertCtx(from codec.ICodecCtx, t *AVTrack) (err error) { switch fourCC := from.FourCC(); fourCC { - case codec.FourCC_ALAW: - var ctx PCMACtx - t.ICodecCtx = &ctx - case codec.FourCC_ULAW: - var ctx PCMUCtx - ctx.SampleRate = 8000 - ctx.Channels = 1 - ctx.SampleSize = 8 - t.ICodecCtx = &ctx case codec.FourCC_MP4A: var ctx AACCtx - ctx.SampleRate = 44100 - ctx.Channels = 2 - ctx.SampleSize = 16 + ctx.AACCtx = *from.GetBase().(*codec.AACCtx) + b0, b1 := ctx.Asc[0], ctx.Asc[1] + ctx.AudioObjectType = b0 >> 3 + ctx.SamplingFrequencyIndex = (b0 & 0x07 << 1) | (b1 >> 7) + ctx.ChannelConfiguration = (b1 >> 3) & 0x0F + ctx.FrameLengthFlag = (b1 >> 2) & 0x01 + ctx.DependsOnCoreCoder = (b1 >> 1) & 0x01 + ctx.ExtensionFlag = b1 & 0x01 t.ICodecCtx = &ctx + default: + t.ICodecCtx = from.GetBase() } return } -func (avcc *RTMPAudio) ToRaw(codecCtx ICodecCtx) (any, error) { +func (avcc *RTMPAudio) Demux(codecCtx codec.ICodecCtx) (raw any, err error) { reader := avcc.NewReader() + var result util.Memory if _, ok := codecCtx.(*AACCtx); ok { - err := reader.Skip(2) - return reader.Memory, err + err = reader.Skip(2) + reader.Range(result.AppendOne) + return result, err } else { - err := reader.Skip(1) - return reader.Memory, err + err = reader.Skip(1) + reader.Range(result.AppendOne) + return result, err } } -func (aac *AACCtx) CreateFrame(*AVFrame) (frame IAVFrame, err error) { - var rtmpAudio RTMPAudio - frame = &rtmpAudio - return -} - -func (g711 *PCMACtx) CreateFrame(*AVFrame) (frame IAVFrame, err error) { - var rtmpAudio RTMPAudio - frame = &rtmpAudio - return -} - -func (g711 *PCMUCtx) CreateFrame(*AVFrame) (frame IAVFrame, err error) { - var rtmpAudio RTMPAudio - frame = &rtmpAudio - return +func (avcc *RTMPAudio) Mux(codecCtx codec.ICodecCtx, from *AVFrame) { + avcc.Timestamp = uint32(from.Timestamp / time.Millisecond) + audioData := from.Raw.(AudioData) + switch c := codecCtx.FourCC(); c { + case codec.FourCC_MP4A: + avcc.AppendOne([]byte{0xAF, 0x01}) + avcc.Append(audioData.Buffers...) + case codec.FourCC_ALAW, codec.FourCC_ULAW: + avcc.AppendOne([]byte{byte(ParseAudioCodec(c))<<4 | (1 << 1)}) + avcc.Append(audioData.Buffers...) + } } diff --git a/plugin/rtmp/pkg/codec.go b/plugin/rtmp/pkg/codec.go index 33a37f5..7719413 100644 --- a/plugin/rtmp/pkg/codec.go +++ b/plugin/rtmp/pkg/codec.go @@ -47,12 +47,7 @@ type ( InitialPresentationDelayPresent byte InitialPresentationDelayMinusOne byte } - PCMACtx struct { - codec.PCMACtx - } - PCMUCtx struct { - codec.PCMUCtx - } + AACCtx struct { codec.AACCtx AudioSpecificConfig @@ -217,7 +212,7 @@ func (ctx *H264Ctx) Unmarshal(b *util.MemoryReader) (err error) { if err1 != nil { return err1 } - spsbytes, err2 := b.ReadBytes(spslen) + spsbytes, err2 := b.ReadBytes(int(spslen)) if err2 != nil { return err2 } @@ -239,7 +234,7 @@ func (ctx *H264Ctx) Unmarshal(b *util.MemoryReader) (err error) { if err1 != nil { return err1 } - ppsbytes, err2 := b.ReadBytes(ppslen) + ppsbytes, err2 := b.ReadBytes(int(ppslen)) if err2 != nil { return err2 } @@ -292,7 +287,7 @@ func (ctx *H265Ctx) Unmarshal(b *util.MemoryReader) (err error) { if err != nil { return ErrHevc } - vps, err := b.ReadBytes(vpslen) + vps, err := b.ReadBytes(int(vpslen)) if err != nil { return ErrHevc } @@ -311,7 +306,7 @@ func (ctx *H265Ctx) Unmarshal(b *util.MemoryReader) (err error) { if err != nil { return ErrHevc } - sps, err := b.ReadBytes(spslen) + sps, err := b.ReadBytes(int(spslen)) if err != nil { return ErrHevc } @@ -334,7 +329,7 @@ func (ctx *H265Ctx) Unmarshal(b *util.MemoryReader) (err error) { if err != nil { return ErrHevc } - pps, err := b.ReadBytes(ppslen) + pps, err := b.ReadBytes(int(ppslen)) if err != nil { return ErrHevc } @@ -765,14 +760,6 @@ func (p *AV1Ctx) Unmarshal(data *util.MemoryReader) (err error) { return nil } -func (PCMACtx) GetInfo() string { - return "pcma" -} - -func (PCMUCtx) GetInfo() string { - return "pcmu" -} - func (ctx *AACCtx) GetInfo() string { return fmt.Sprintf("AudioObjectType: %d, SamplingFrequencyIndex: %d, ChannelConfiguration: %d, FrameLengthFlag: %d, DependsOnCoreCoder: %d, ExtensionFlag: %d", ctx.AudioObjectType, ctx.SamplingFrequencyIndex, ctx.ChannelConfiguration, ctx.FrameLengthFlag, ctx.DependsOnCoreCoder, ctx.ExtensionFlag) } diff --git a/plugin/rtmp/pkg/const.go b/plugin/rtmp/pkg/const.go index caa8570..0c19bdf 100644 --- a/plugin/rtmp/pkg/const.go +++ b/plugin/rtmp/pkg/const.go @@ -51,6 +51,10 @@ func (avcc *RTMPData) GetTimestamp() time.Duration { return time.Duration(avcc.Timestamp) * time.Millisecond } +func (avcc *RTMPData) GetCTS() time.Duration { + return 0 +} + func (avcc *RTMPData) WrapAudio() *RTMPAudio { return &RTMPAudio{RTMPData: *avcc} } diff --git a/plugin/rtmp/pkg/video.go b/plugin/rtmp/pkg/video.go index 73d96e6..ac6d21e 100644 --- a/plugin/rtmp/pkg/video.go +++ b/plugin/rtmp/pkg/video.go @@ -3,6 +3,7 @@ package rtmp import ( "context" "encoding/binary" + "io" "time" . "m7s.live/m7s/v5/pkg" @@ -14,9 +15,18 @@ var _ IAVFrame = (*RTMPVideo)(nil) type RTMPVideo struct { RTMPData + CTS uint32 } -func (avcc *RTMPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { +func (avcc *RTMPVideo) GetCTS() time.Duration { + return time.Duration(avcc.CTS) * time.Millisecond +} + +func (avcc *RTMPVideo) Parse(t *AVTrack) (err error) { + if avcc.Size <= 10 { + err = io.ErrShortBuffer + return + } reader := avcc.NewReader() var b0 byte b0, err = reader.ReadByte() @@ -24,12 +34,11 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) return } enhanced := b0&0b1000_0000 != 0 // https://veovera.github.io/enhanced-rtmp/docs/enhanced/enhanced-rtmp-v1.pdf - isIDR = b0&0b0111_0000>>4 == 1 + t.Value.IDR = b0&0b0111_0000>>4 == 1 packetType := b0 & 0b1111 var fourCC codec.FourCC parseSequence := func() (err error) { - isSeq = true - isIDR = false + t.Value.IDR = false var cloneFrame RTMPVideo cloneFrame.CopyFrom(&avcc.Memory) switch fourCC { @@ -101,10 +110,10 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) return } -func (avcc *RTMPVideo) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { +func (avcc *RTMPVideo) ConvertCtx(from codec.ICodecCtx, t *AVTrack) (err error) { switch fourCC := from.FourCC(); fourCC { case codec.FourCC_H264: - h264ctx := from.(codec.IH264Ctx).GetH264Ctx() + h264ctx := from.GetBase().(*codec.H264Ctx) var ctx H264Ctx ctx.H264Ctx = *h264ctx lenSPS := len(h264ctx.SPS[0]) @@ -125,7 +134,7 @@ func (avcc *RTMPVideo) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { b.Write(h264ctx.PPS[0]) t.ICodecCtx = &ctx var seqFrame RTMPData - seqFrame.Append(b) + seqFrame.AppendOne(b) t.SequenceFrame = seqFrame.WrapVideo() if t.Enabled(context.TODO(), TraceLevel) { c := t.FourCC().String() @@ -133,8 +142,9 @@ func (avcc *RTMPVideo) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { data := seqFrame.String() t.Trace("decConfig", "codec", c, "size", size, "data", data) } + case codec.FourCC_H265: + // TODO: H265 } - return } @@ -143,9 +153,8 @@ func (avcc *RTMPVideo) parseH264(ctx *H264Ctx, reader *util.MemoryReader) (any, if err != nil { return nil, err } + avcc.CTS = cts var nalus Nalus - nalus.PTS = time.Duration(avcc.Timestamp+uint32(cts)) * 90 - nalus.DTS = time.Duration(avcc.Timestamp) * 90 if err := nalus.ParseAVCC(reader, ctx.NalulenSize); err != nil { return nalus, err } @@ -157,9 +166,8 @@ func (avcc *RTMPVideo) parseH265(ctx *H265Ctx, reader *util.MemoryReader) (any, if err != nil { return nil, err } + avcc.CTS = cts var nalus Nalus - nalus.PTS = time.Duration(avcc.Timestamp+uint32(cts)) * 90 - nalus.DTS = time.Duration(avcc.Timestamp) * 90 if err := nalus.ParseAVCC(reader, ctx.NalulenSize); err != nil { return nalus, err } @@ -168,14 +176,13 @@ func (avcc *RTMPVideo) parseH265(ctx *H265Ctx, reader *util.MemoryReader) (any, func (avcc *RTMPVideo) parseAV1(reader *util.MemoryReader) (any, error) { var obus OBUs - obus.PTS = time.Duration(avcc.Timestamp) * 90 if err := obus.ParseAVCC(reader); err != nil { return obus, err } return obus, nil } -func (avcc *RTMPVideo) ToRaw(codecCtx ICodecCtx) (any, error) { +func (avcc *RTMPVideo) Demux(codecCtx codec.ICodecCtx) (any, error) { reader := avcc.NewReader() b0, err := reader.ReadByte() if err != nil { @@ -217,24 +224,13 @@ func (avcc *RTMPVideo) ToRaw(codecCtx ICodecCtx) (any, error) { var nalus Nalus if codecCtx.FourCC() == codec.FourCC_H265 { var ctx = codecCtx.(*H265Ctx) - var spsM util.Memory - spsM.Append(ctx.SPS[0]) - var ppsM util.Memory - ppsM.Append(ctx.PPS[0]) - var vpsM util.Memory - vpsM.Append(ctx.VPS[0]) - nalus.PTS = time.Duration(avcc.Timestamp) * 90 - nalus.DTS = time.Duration(avcc.Timestamp) * 90 - nalus.Nalus = append(nalus.Nalus, spsM, ppsM, vpsM) + nalus.Append(ctx.SPS[0]) + nalus.Append(ctx.PPS[0]) + nalus.Append(ctx.VPS[0]) } else { var ctx = codecCtx.(*H264Ctx) - var spsM util.Memory - spsM.Append(ctx.SPS[0]) - var ppsM util.Memory - ppsM.Append(ctx.PPS[0]) - nalus.PTS = time.Duration(avcc.Timestamp) * 90 - nalus.DTS = time.Duration(avcc.Timestamp) * 90 - nalus.Nalus = append(nalus.Nalus, spsM, ppsM) + nalus.Append(ctx.SPS[0]) + nalus.Append(ctx.PPS[0]) } return nalus, nil } else { @@ -247,33 +243,24 @@ func (avcc *RTMPVideo) ToRaw(codecCtx ICodecCtx) (any, error) { } return nil, nil } -func createH26xFrame(from *AVFrame, codecID VideoCodecID) (frame IAVFrame, err error) { - var rtmpVideo RTMPVideo - rtmpVideo.Timestamp = uint32(from.Timestamp / time.Millisecond) - rtmpVideo.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator() - nalus := from.Raw.(Nalus) - rtmpVideo.RecycleIndexes = make([]int, 0, len(nalus.Nalus)) // Recycle partial data - head := rtmpVideo.NextN(5) - head[0] = util.Conditoinal[byte](from.IDR, 0x10, 0x20) | byte(codecID) - head[1] = 1 - util.PutBE(head[2:5], (nalus.PTS-nalus.DTS)/90) // cts - for _, nalu := range nalus.Nalus { - naluLenM := rtmpVideo.NextN(4) - naluLen := uint32(nalu.Size) - binary.BigEndian.PutUint32(naluLenM, naluLen) - rtmpVideo.Append(nalu.Buffers...) + +func (avcc *RTMPVideo) Mux(codecCtx codec.ICodecCtx, from *AVFrame) { + avcc.Timestamp = uint32(from.Timestamp / time.Millisecond) + switch ctx := codecCtx.(type) { + case *AV1Ctx: + panic(ctx) + default: + nalus := from.Raw.(Nalus) + avcc.RecycleIndexes = make([]int, 0, len(nalus)) // Recycle partial data + head := avcc.NextN(5) + head[0] = util.Conditoinal[byte](from.IDR, 0x10, 0x20) | byte(ParseVideoCodec(codecCtx.FourCC())) + head[1] = 1 + util.PutBE(head[2:5], from.CTS/time.Millisecond) // cts + for _, nalu := range nalus { + naluLenM := avcc.NextN(4) + naluLen := uint32(nalu.Size) + binary.BigEndian.PutUint32(naluLenM, naluLen) + avcc.Append(nalu.Buffers...) + } } - frame = &rtmpVideo - return -} -func (h264 *H264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) { - return createH26xFrame(from, ParseVideoCodec(h264.FourCC())) -} - -func (h265 *H265Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) { - return createH26xFrame(from, ParseVideoCodec(h265.FourCC())) -} - -func (av1 *AV1Ctx) CreateFrame(*AVFrame) (frame IAVFrame, err error) { - return } diff --git a/plugin/rtp/pkg/audio.go b/plugin/rtp/pkg/audio.go index 490bf83..b273dc1 100644 --- a/plugin/rtp/pkg/audio.go +++ b/plugin/rtp/pkg/audio.go @@ -5,7 +5,10 @@ import ( "encoding/binary" "encoding/hex" "fmt" + "github.com/bluenviron/mediacommon/pkg/bits" "io" + "regexp" + "strings" "time" "unsafe" @@ -16,6 +19,8 @@ import ( "m7s.live/m7s/v5/pkg/util" ) +var configRegexp = regexp.MustCompile(`config=(.+),([^;]+)(;|$)`) + type RTPData struct { *webrtc.RTPCodecParameters Packets []*rtp.Packet @@ -48,6 +53,10 @@ func (r *RTPData) GetTimestamp() time.Duration { return time.Duration(r.Packets[0].Timestamp) * time.Second / time.Duration(r.ClockRate) } +func (r *RTPData) GetCTS() time.Duration { + return 0 +} + func (r *RTPData) GetSize() (s int) { for _, p := range r.Packets { s += p.MarshalSize() @@ -58,6 +67,7 @@ func (r *RTPData) GetSize() (s int) { type ( RTPCtx struct { webrtc.RTPCodecParameters + Fmtp map[string]string SequenceNumber uint16 SSRC uint32 } @@ -76,12 +86,29 @@ type ( RTPAACCtx struct { RTPCtx codec.AACCtx + SizeLength int // 通常为13 + IndexLength int + IndexDeltaLength int } IRTPCtx interface { GetRTPCodecParameter() webrtc.RTPCodecParameters } ) +func (r *RTPCtx) parseFmtpLine(cp *webrtc.RTPCodecParameters) { + r.RTPCodecParameters = *cp + r.Fmtp = make(map[string]string) + kvs := strings.Split(r.SDPFmtpLine, ";") + for _, kv := range kvs { + if kv = strings.TrimSpace(kv); kv == "" { + continue + } + if key, value, found := strings.Cut(kv, "="); found { + r.Fmtp[strings.TrimSpace(key)] = strings.TrimSpace(value) + } + } +} + func (r *RTPCtx) GetInfo() string { return r.GetRTPCodecParameter().SDPFmtpLine } @@ -94,11 +121,27 @@ func (r *RTPCtx) GetSequenceFrame() IAVFrame { return nil } -func (r *RTPData) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { - switch c := from.(type) { - case codec.IH264Ctx: +func (r *RTPData) Append(ctx *RTPCtx, ts uint32, payload []byte) (lastPacket *rtp.Packet) { + ctx.SequenceNumber++ + lastPacket = &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + SequenceNumber: ctx.SequenceNumber, + Timestamp: ts, + SSRC: ctx.SSRC, + PayloadType: uint8(ctx.PayloadType), + }, + Payload: payload, + } + r.Packets = append(r.Packets, lastPacket) + return +} + +func (r *RTPData) ConvertCtx(from codec.ICodecCtx, t *AVTrack) (err error) { + switch from.FourCC() { + case codec.FourCC_H264: var ctx RTPH264Ctx - ctx.H264Ctx = *c.GetH264Ctx() + ctx.H264Ctx = *from.GetBase().(*codec.H264Ctx) ctx.PayloadType = 96 ctx.MimeType = webrtc.MimeTypeH264 ctx.ClockRate = 90000 @@ -106,24 +149,52 @@ func (r *RTPData) DecodeConfig(t *AVTrack, from ICodecCtx) (err error) { ctx.SDPFmtpLine = fmt.Sprintf("sprop-parameter-sets=%s,%s;profile-level-id=%02x%02x%02x;level-asymmetry-allowed=1;packetization-mode=1", base64.StdEncoding.EncodeToString(ctx.SPS[0]), base64.StdEncoding.EncodeToString(ctx.PPS[0]), spsInfo.ProfileIdc, spsInfo.ConstraintSetFlag, spsInfo.LevelIdc) ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) t.ICodecCtx = &ctx - case codec.IH265Ctx: + case codec.FourCC_H265: var ctx RTPH265Ctx - ctx.H265Ctx = *c.GetH265Ctx() + ctx.H265Ctx = *from.GetBase().(*codec.H265Ctx) ctx.PayloadType = 98 ctx.MimeType = webrtc.MimeTypeH265 ctx.SDPFmtpLine = fmt.Sprintf("profile-id=1;sprop-sps=%s;sprop-pps=%s;sprop-vps=%s", base64.StdEncoding.EncodeToString(ctx.SPS[0]), base64.StdEncoding.EncodeToString(ctx.PPS[0]), base64.StdEncoding.EncodeToString(ctx.VPS[0])) ctx.ClockRate = 90000 ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) t.ICodecCtx = &ctx - case codec.IAACCtx: + case codec.FourCC_MP4A: var ctx RTPAACCtx ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) - ctx.AACCtx = *c.GetAACCtx() + ctx.AACCtx = *from.GetBase().(*codec.AACCtx) ctx.MimeType = "audio/MPEG4-GENERIC" - ctx.SDPFmtpLine = fmt.Sprintf("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=%s", hex.EncodeToString(ctx.AACCtx.Asc)) + ctx.SDPFmtpLine = fmt.Sprintf("profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=%s", hex.EncodeToString(ctx.AACCtx.Asc)) + ctx.IndexLength = 3 + ctx.IndexDeltaLength = 3 + ctx.SizeLength = 13 + ctx.RTPCtx.Channels = uint16(ctx.AACCtx.Channels) ctx.PayloadType = 97 ctx.ClockRate = uint32(ctx.SampleRate) t.ICodecCtx = &ctx + case codec.FourCC_ALAW: + var ctx RTPPCMACtx + ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) + ctx.PCMACtx = *from.GetBase().(*codec.PCMACtx) + ctx.MimeType = webrtc.MimeTypePCMA + ctx.PayloadType = 8 + ctx.ClockRate = uint32(ctx.SampleRate) + t.ICodecCtx = &ctx + case codec.FourCC_ULAW: + var ctx RTPPCMUCtx + ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) + ctx.PCMUCtx = *from.GetBase().(*codec.PCMUCtx) + ctx.MimeType = webrtc.MimeTypePCMU + ctx.PayloadType = 0 + ctx.ClockRate = uint32(ctx.SampleRate) + t.ICodecCtx = &ctx + case codec.FourCC_OPUS: + var ctx RTPOPUSCtx + ctx.SSRC = uint32(uintptr(unsafe.Pointer(&ctx))) + ctx.OPUSCtx = *from.GetBase().(*codec.OPUSCtx) + ctx.MimeType = webrtc.MimeTypeOpus + ctx.PayloadType = 111 + ctx.ClockRate = uint32(ctx.SampleRate) + t.ICodecCtx = &ctx } return } @@ -132,32 +203,207 @@ type RTPAudio struct { RTPData } -func (r *RTPAudio) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { +func (r *RTPAudio) Parse(t *AVTrack) (err error) { switch r.MimeType { case webrtc.MimeTypeOpus: var ctx RTPOPUSCtx - ctx.RTPCodecParameters = *r.RTPCodecParameters + ctx.parseFmtpLine(r.RTPCodecParameters) t.ICodecCtx = &ctx case webrtc.MimeTypePCMA: var ctx RTPPCMACtx - ctx.RTPCodecParameters = *r.RTPCodecParameters + ctx.parseFmtpLine(r.RTPCodecParameters) t.ICodecCtx = &ctx case webrtc.MimeTypePCMU: var ctx RTPPCMUCtx - ctx.RTPCodecParameters = *r.RTPCodecParameters + ctx.parseFmtpLine(r.RTPCodecParameters) t.ICodecCtx = &ctx case "audio/MPEG4-GENERIC": - var ctx RTPAACCtx - ctx.RTPCodecParameters = *r.RTPCodecParameters - t.ICodecCtx = &ctx + var ctx *RTPAACCtx + if t.ICodecCtx != nil { + ctx = t.ICodecCtx.(*RTPAACCtx) + } else { + ctx = &RTPAACCtx{} + ctx.parseFmtpLine(r.RTPCodecParameters) + ctx.IndexLength = 3 + ctx.IndexDeltaLength = 3 + ctx.SizeLength = 13 + if conf, ok := ctx.Fmtp["config"]; ok { + if ctx.AACCtx.Asc, err = hex.DecodeString(conf); err == nil { + ctx.SampleRate = int(r.ClockRate) + ctx.Channels = int(r.Channels) + ctx.SampleSize = 16 + } + } + t.ICodecCtx = ctx + } } return } -func (ctx *RTPCtx) CreateFrame(*AVFrame) (IAVFrame, error) { - panic("unimplemented") +func (r *RTPAudio) Demux(codexCtx codec.ICodecCtx) (any, error) { + var data AudioData + switch codexCtx.(type) { + case *RTPAACCtx: + var fragments util.Memory + for _, packet := range r.Packets { + if len(packet.Payload) < 2 { + continue + } + auHeaderLen := util.ReadBE[int](packet.Payload[:2]) + if auHeaderLen == 0 { + data.AppendOne(packet.Payload) + } else { + dataLens, err := r.readAUHeaders(codexCtx.(*RTPAACCtx), packet.Payload[2:], auHeaderLen) + if err != nil { + return nil, err + } + payload := packet.Payload[2:] + pos := auHeaderLen >> 3 + if (auHeaderLen % 8) != 0 { + pos++ + } + payload = payload[pos:] + if fragments.Size == 0 { + if packet.Marker { + for _, dataLen := range dataLens { + if len(payload) < int(dataLen) { + return nil, fmt.Errorf("invalid data len %d", dataLen) + } + data.AppendOne(payload[:dataLen]) + payload = payload[dataLen:] + } + } else { + if len(dataLens) != 1 { + return nil, fmt.Errorf("a fragmented packet can only contain one AU") + } + fragments.AppendOne(payload) + } + } else { + if len(dataLens) != 1 { + return nil, fmt.Errorf("a fragmented packet can only contain one AU") + } + fragments.AppendOne(payload) + if !packet.Header.Marker { + continue + } + if uint64(fragments.Size) != dataLens[0] { + return nil, fmt.Errorf("fragmented AU size is not correct %d != %d", dataLens[0], fragments.Size) + } + data.Append(fragments.Buffers...) + fragments = util.Memory{} + } + } + break + } + default: + for _, packet := range r.Packets { + data.AppendOne(packet.Payload) + } + } + return data, nil } -func (r *RTPAudio) ToRaw(ICodecCtx) (any, error) { - return nil, nil +func (r *RTPAudio) Mux(codexCtx codec.ICodecCtx, from *AVFrame) { + data := from.Raw.(AudioData) + var ctx *RTPCtx + var lastPacket *rtp.Packet + switch c := codexCtx.(type) { + case *RTPAACCtx: + ctx = &c.RTPCtx + pts := uint32(from.Timestamp * time.Duration(ctx.ClockRate) / time.Second) + //AU_HEADER_LENGTH,因为单位是bit, 除以8就是auHeader的字节长度;又因为单个auheader字节长度2字节,所以再除以2就是auheader的个数。 + auHeaderLen := []byte{0x00, 0x10, (byte)((r.Size & 0x1fe0) >> 5), (byte)((r.Size & 0x1f) << 3)} // 3 = 16-13, 5 = 8-3 + for reader := data.NewReader(); reader.Length > 0; { + payloadLen := MTUSize + if reader.Length+4 < MTUSize { + payloadLen = reader.Length + 4 + } + mem := r.NextN(payloadLen) + copy(mem, auHeaderLen) + reader.ReadBytesTo(mem[4:]) + lastPacket = r.Append(ctx, pts, mem) + } + lastPacket.Header.Marker = true + return + case *RTPPCMACtx: + ctx = &c.RTPCtx + case *RTPPCMUCtx: + ctx = &c.RTPCtx + } + pts := uint32(from.Timestamp * time.Duration(ctx.ClockRate) / time.Second) + if reader := data.NewReader(); reader.Length > MTUSize { + for reader.Length > 0 { + payloadLen := MTUSize + if reader.Length < MTUSize { + payloadLen = reader.Length + } + mem := r.NextN(payloadLen) + reader.ReadBytesTo(mem) + lastPacket = r.Append(ctx, pts, mem) + } + } else { + mem := r.NextN(reader.Length) + reader.ReadBytesTo(mem) + lastPacket = r.Append(ctx, pts, mem) + } + lastPacket.Header.Marker = true +} + +func (r *RTPAudio) readAUHeaders(ctx *RTPAACCtx, buf []byte, headersLen int) ([]uint64, error) { + firstRead := false + + count := 0 + for i := 0; i < headersLen; { + if i == 0 { + i += ctx.SizeLength + i += ctx.IndexLength + } else { + i += ctx.SizeLength + i += ctx.IndexDeltaLength + } + count++ + } + + dataLens := make([]uint64, count) + + pos := 0 + i := 0 + + for headersLen > 0 { + dataLen, err := bits.ReadBits(buf, &pos, ctx.SizeLength) + if err != nil { + return nil, err + } + headersLen -= ctx.SizeLength + + if !firstRead { + firstRead = true + if ctx.IndexLength > 0 { + auIndex, err := bits.ReadBits(buf, &pos, ctx.IndexLength) + if err != nil { + return nil, err + } + headersLen -= ctx.IndexLength + + if auIndex != 0 { + return nil, fmt.Errorf("AU-index different than zero is not supported") + } + } + } else if ctx.IndexDeltaLength > 0 { + auIndexDelta, err := bits.ReadBits(buf, &pos, ctx.IndexDeltaLength) + if err != nil { + return nil, err + } + headersLen -= ctx.IndexDeltaLength + + if auIndexDelta != 0 { + return nil, fmt.Errorf("AU-index-delta different than zero is not supported") + } + } + + dataLens[i] = dataLen + i++ + } + + return dataLens, nil } diff --git a/plugin/rtp/pkg/video.go b/plugin/rtp/pkg/video.go index 59df3b8..b6a4ff6 100644 --- a/plugin/rtp/pkg/video.go +++ b/plugin/rtp/pkg/video.go @@ -4,7 +4,7 @@ import ( "encoding/base64" "errors" "fmt" - "regexp" + "strings" "time" "github.com/pion/rtp" @@ -36,11 +36,10 @@ type ( ) var ( - _ IAVFrame = (*RTPVideo)(nil) - _ IVideoCodecCtx = (*RTPH264Ctx)(nil) - _ IVideoCodecCtx = (*RTPH265Ctx)(nil) - _ IVideoCodecCtx = (*RTPAV1Ctx)(nil) - spropReg = regexp.MustCompile(`sprop-parameter-sets=(.+),([^;]+)(;|$)`) + _ IAVFrame = (*RTPVideo)(nil) + _ IVideoCodecCtx = (*RTPH264Ctx)(nil) + _ IVideoCodecCtx = (*RTPH265Ctx)(nil) + _ IVideoCodecCtx = (*RTPAV1Ctx)(nil) ) const ( @@ -49,7 +48,7 @@ const ( MTUSize = 1460 ) -func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { +func (r *RTPVideo) Parse(t *AVTrack) (err error) { switch r.MimeType { case webrtc.MimeTypeH264: var ctx *RTPH264Ctx @@ -57,47 +56,42 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { ctx = t.ICodecCtx.(*RTPH264Ctx) } else { ctx = &RTPH264Ctx{} + ctx.parseFmtpLine(r.RTPCodecParameters) //packetization-mode=1; sprop-parameter-sets=J2QAKaxWgHgCJ+WagICAgQ==,KO48sA==; profile-level-id=640029 - ctx.RTPCodecParameters = *r.RTPCodecParameters - if match := spropReg.FindStringSubmatch(ctx.SDPFmtpLine); len(match) > 2 { - if sps, err := base64.StdEncoding.DecodeString(match[1]); err == nil { - ctx.SPS = [][]byte{sps} - } - if pps, err := base64.StdEncoding.DecodeString(match[2]); err == nil { - ctx.PPS = [][]byte{pps} + if sprop, ok := ctx.Fmtp["sprop-parameter-sets"]; ok { + if sprops := strings.Split(sprop, ","); len(sprops) == 2 { + if sps, err := base64.StdEncoding.DecodeString(sprops[0]); err == nil { + ctx.SPS = [][]byte{sps} + } + if pps, err := base64.StdEncoding.DecodeString(sprops[1]); err == nil { + ctx.PPS = [][]byte{pps} + } } } t.ICodecCtx = ctx } - raw, err = r.ToRaw(ctx) - if err != nil { + if t.Value.Raw, err = r.Demux(ctx); err != nil { return } - nalus := raw.(Nalus) - for _, nalu := range nalus.Nalus { + for _, nalu := range t.Value.Raw.(Nalus) { switch codec.ParseH264NALUType(nalu.Buffers[0][0]) { case codec.NALU_SPS: - ctx = &RTPH264Ctx{} ctx.SPS = [][]byte{nalu.ToBytes()} if err = ctx.SPSInfo.Unmarshal(ctx.SPS[0]); err != nil { return } - ctx.RTPCodecParameters = *r.RTPCodecParameters - t.ICodecCtx = ctx case codec.NALU_PPS: ctx.PPS = [][]byte{nalu.ToBytes()} case codec.NALU_IDR_Picture: - isIDR = true + t.Value.IDR = true } } case webrtc.MimeTypeVP9: // var ctx RTPVP9Ctx - // ctx.FourCC = codec.FourCC_VP9 // ctx.RTPCodecParameters = *r.RTPCodecParameters // codecCtx = &ctx case webrtc.MimeTypeAV1: // var ctx RTPAV1Ctx - // ctx.FourCC = codec.FourCC_AV1 // ctx.RTPCodecParameters = *r.RTPCodecParameters // codecCtx = &ctx case webrtc.MimeTypeH265: @@ -106,15 +100,13 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { ctx = t.ICodecCtx.(*RTPH265Ctx) } else { ctx = &RTPH265Ctx{} - ctx.RTPCodecParameters = *r.RTPCodecParameters + ctx.parseFmtpLine(r.RTPCodecParameters) t.ICodecCtx = ctx } - raw, err = r.ToRaw(ctx) - if err != nil { + if t.Value.Raw, err = r.Demux(ctx); err != nil { return } - nalus := raw.(Nalus) - for _, nalu := range nalus.Nalus { + for _, nalu := range t.Value.Raw.(Nalus) { switch codec.ParseH265NALUType(nalu.Buffers[0][0]) { case codec.NAL_UNIT_SPS: ctx = &RTPH265Ctx{} @@ -132,13 +124,9 @@ func (r *RTPVideo) Parse(t *AVTrack) (isIDR, isSeq bool, raw any, err error) { codec.NAL_UNIT_CODED_SLICE_IDR, codec.NAL_UNIT_CODED_SLICE_IDR_N_LP, codec.NAL_UNIT_CODED_SLICE_CRA: - isIDR = true + t.Value.IDR = true } } - case "audio/MPEG4-GENERIC", "audio/AAC": - var ctx RTPAACCtx - ctx.RTPCodecParameters = *r.RTPCodecParameters - t.ICodecCtx = &ctx default: err = ErrUnsupportCodec } @@ -153,65 +141,57 @@ func (h265 *RTPH265Ctx) GetInfo() string { return h265.SDPFmtpLine } -func (h264 *RTPH264Ctx) CreateFrame(from *AVFrame) (frame IAVFrame, err error) { - var r RTPVideo - r.RTPCodecParameters = &h264.RTPCodecParameters - if len(from.Wraps) > 0 { - r.ScalableMemoryAllocator = from.Wraps[0].GetScalableMemoryAllocator() - } - nalus := from.Raw.(Nalus) - var lastPacket *rtp.Packet - createPacket := func(payload []byte) *rtp.Packet { - h264.SequenceNumber++ - lastPacket = &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SequenceNumber: h264.SequenceNumber, - Timestamp: uint32(nalus.PTS), - SSRC: h264.SSRC, - PayloadType: uint8(h264.PayloadType), - }, - Payload: payload, - } - return lastPacket - } - if nalus.H264Type() == codec.NALU_IDR_Picture && len(h264.SPS) > 0 && len(h264.PPS) > 0 { - r.Packets = append(r.Packets, createPacket(h264.SPS[0]), createPacket(h264.PPS[0])) - } - for _, nalu := range nalus.Nalus { - if reader := nalu.NewReader(); reader.Length > MTUSize { - //fu-a - mem := r.Malloc(MTUSize) - n := reader.ReadBytesTo(mem[1:]) - fuaHead := codec.NALU_FUA.Or(mem[1] & 0x60) - mem[0] = fuaHead - naluType := mem[1] & 0x1f - mem[1] = naluType | startBit - r.FreeRest(&mem, n+1) - r.AddRecycleBytes(mem) - r.Packets = append(r.Packets, createPacket(mem)) - for reader.Length > 0 { - mem = r.Malloc(MTUSize) - n = reader.ReadBytesTo(mem[2:]) - mem[0] = fuaHead - mem[1] = naluType - r.FreeRest(&mem, n+2) - r.AddRecycleBytes(mem) - r.Packets = append(r.Packets, createPacket(mem)) - } - lastPacket.Payload[1] |= endBit - } else { - mem := r.NextN(reader.Length) - reader.ReadBytesTo(mem) - r.Packets = append(r.Packets, createPacket(mem)) - } - } - frame = &r - lastPacket.Header.Marker = true - return +func (av1 *RTPAV1Ctx) GetInfo() string { + return av1.SDPFmtpLine } -func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) { +func (r *RTPVideo) GetCTS() time.Duration { + return 0 +} + +func (r *RTPVideo) Mux(codecCtx codec.ICodecCtx, from *AVFrame) { + pts := uint32((from.Timestamp + from.CTS) * 90 / time.Millisecond) + switch c := codecCtx.(type) { + case *RTPH264Ctx: + ctx := &c.RTPCtx + r.RTPCodecParameters = &ctx.RTPCodecParameters + var lastPacket *rtp.Packet + if from.IDR && len(c.SPS) > 0 && len(c.PPS) > 0 { + r.Append(ctx, pts, c.SPS[0]) + r.Append(ctx, pts, c.PPS[0]) + } + for _, nalu := range from.Raw.(Nalus) { + if reader := nalu.NewReader(); reader.Length > MTUSize { + payloadLen := MTUSize + if reader.Length+1 < payloadLen { + payloadLen = reader.Length + 1 + } + //fu-a + mem := r.NextN(payloadLen) + reader.ReadBytesTo(mem[1:]) + fuaHead, naluType := codec.NALU_FUA.Or(mem[1]&0x60), mem[1]&0x1f + mem[0], mem[1] = fuaHead, naluType|startBit + lastPacket = r.Append(ctx, pts, mem) + for payloadLen = MTUSize; reader.Length > 0; lastPacket = r.Append(ctx, pts, mem) { + if reader.Length+2 < payloadLen { + payloadLen = reader.Length + 2 + } + mem = r.NextN(payloadLen) + reader.ReadBytesTo(mem[2:]) + mem[0], mem[1] = fuaHead, naluType + } + lastPacket.Payload[1] |= endBit + } else { + mem := r.NextN(reader.Length) + reader.ReadBytesTo(mem) + lastPacket = r.Append(ctx, pts, mem) + } + } + lastPacket.Header.Marker = true + } +} + +func (r *RTPVideo) Demux(ictx codec.ICodecCtx) (any, error) { switch ictx.(type) { case *RTPH264Ctx: var nalus Nalus @@ -219,17 +199,14 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) { var naluType codec.H264NALUType gotNalu := func() { if nalu.Size > 0 { - nalus.Nalus = append(nalus.Nalus, nalu) + nalus = append(nalus, nalu) nalu = util.Memory{} } } for _, packet := range r.Packets { - nalus.PTS = time.Duration(packet.Timestamp) - // TODO: B-frame - nalus.DTS = nalus.PTS b0 := packet.Payload[0] if t := codec.ParseH264NALUType(b0); t < 24 { - nalu.Append(packet.Payload) + nalu.AppendOne(packet.Payload) gotNalu() } else { offset := t.Offset() @@ -240,7 +217,7 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) { } for buffer := util.Buffer(packet.Payload[offset:]); buffer.CanRead(); { if nextSize := int(buffer.ReadUint16()); buffer.Len() >= nextSize { - nalu.Append(buffer.ReadN(nextSize)) + nalu.AppendOne(buffer.ReadN(nextSize)) gotNalu() } else { return nil, fmt.Errorf("invalid nalu size %d", nextSize) @@ -250,10 +227,10 @@ func (r *RTPVideo) ToRaw(ictx ICodecCtx) (any, error) { b1 := packet.Payload[1] if util.Bit1(b1, 0) { naluType.Parse(b1) - nalu.Append([]byte{naluType.Or(b0 & 0x60)}) + nalu.AppendOne([]byte{naluType.Or(b0 & 0x60)}) } if nalu.Size > 0 { - nalu.Append(packet.Payload[offset:]) + nalu.AppendOne(packet.Payload[offset:]) } else { return nil, errors.New("fu have no start") } diff --git a/plugin/rtp/pkg/video_test.go b/plugin/rtp/pkg/video_test.go index b97626f..283c31e 100644 --- a/plugin/rtp/pkg/video_test.go +++ b/plugin/rtp/pkg/video_test.go @@ -24,22 +24,16 @@ func TestRTPH264Ctx_CreateFrame(t *testing.T) { var avFrame = &pkg.AVFrame{} var mem util.Memory mem.Append([]byte(randStr)) - avFrame.Raw = pkg.Nalus{ - Nalus: []util.Memory{mem}, - } - f, err := ctx.CreateFrame(avFrame) - if err != nil { - t.Error(err) - return - } - frame := f.(*RTPVideo) + avFrame.Raw = []util.Memory{mem} + frame := new(RTPVideo) + frame.Mux(ctx, avFrame) var track = &pkg.AVTrack{} - _, _, raw, err := frame.Parse(track) + err := frame.Parse(track) if err != nil { t.Error(err) return } - if s := string(raw.(pkg.Nalus).Nalus[0].ToBytes()); s != randStr { + if s := string(track.Value.Raw.(pkg.Nalus)[0].ToBytes()); s != randStr { t.Error("not equal", len(s), len(randStr)) } } diff --git a/plugin/webrtc/index.go b/plugin/webrtc/index.go index ed11bcb..c6176e5 100644 --- a/plugin/webrtc/index.go +++ b/plugin/webrtc/index.go @@ -349,7 +349,7 @@ func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) { } else { var rtpCtx mrtp.RTPData var tmpAVTrack AVTrack - err = rtpCtx.DecodeConfig(&tmpAVTrack, vt.ICodecCtx) + err = rtpCtx.ConvertCtx(vt.ICodecCtx, &tmpAVTrack) if err == nil { rcc = tmpAVTrack.ICodecCtx.(mrtp.IRTPCtx).GetRTPCodecParameter() } else { diff --git a/publisher.go b/publisher.go index af31159..0b9816e 100644 --- a/publisher.go +++ b/publisher.go @@ -192,6 +192,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) { frame := &t.Value frame.Wraps = append(frame.Wraps, data) ts := data.GetTimestamp() + frame.CTS = data.GetCTS() if p.lastTs == 0 { p.baseTs -= ts } @@ -232,7 +233,7 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { p.Unlock() } oldCodecCtx := t.ICodecCtx - t.Value.IDR, _, t.Value.Raw, err = data.Parse(t) + err = data.Parse(t) codecCtxChanged := oldCodecCtx != t.ICodecCtx if err != nil { p.Error("parse", "err", err) @@ -261,16 +262,16 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { p.writeAV(t, data) if p.VideoTrack.Length > 1 && p.VideoTrack.IsReady() { if t.Value.Raw == nil { - t.Value.Raw, err = t.Value.Wraps[0].ToRaw(t.ICodecCtx) - if err != nil { + if err = t.Value.Demux(t.ICodecCtx); err != nil { t.Error("to raw", "err", err) return err } } - var toFrame IAVFrame for i, track := range p.VideoTrack.Items[1:] { + toType := track.FrameType.Elem() + toFrame := reflect.New(toType).Interface().(IAVFrame) if track.ICodecCtx == nil { - err = (reflect.New(track.FrameType.Elem()).Interface().(IAVFrame)).DecodeConfig(track, t.ICodecCtx) + err = toFrame.ConvertCtx(t.ICodecCtx, track) if err != nil { track.Error("DecodeConfig", "err", err) return @@ -278,26 +279,22 @@ func (p *Publisher) WriteVideo(data IAVFrame) (err error) { if t.IDRingList.Len() > 0 { for rf := t.IDRingList.Front().Value; rf != t.Ring; rf = rf.Next() { if i == 0 && rf.Value.Raw == nil { - rf.Value.Raw, err = rf.Value.Wraps[0].ToRaw(t.ICodecCtx) - if err != nil { + if err = rf.Value.Demux(t.ICodecCtx); err != nil { t.Error("to raw", "err", err) return err } } - if toFrame, err = track.CreateFrame(&rf.Value); err != nil { - track.Error("from raw", "err", err) - return - } + toFrame := reflect.New(toType).Interface().(IAVFrame) + toFrame.SetAllocator(data.GetAllocator()) + toFrame.Mux(track.ICodecCtx, &rf.Value) rf.Value.Wraps = append(rf.Value.Wraps, toFrame) } } } - if toFrame, err = track.CreateFrame(&t.Value); err != nil { - track.Error("from raw", "err", err) - return - } + toFrame.SetAllocator(data.GetAllocator()) + toFrame.Mux(track.ICodecCtx, &t.Value) if codecCtxChanged { - toFrame.DecodeConfig(track, t.ICodecCtx) + err = toFrame.ConvertCtx(t.ICodecCtx, track) } t.Value.Wraps = append(t.Value.Wraps, toFrame) if track.ICodecCtx != nil { @@ -336,7 +333,7 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { p.Unlock() } oldCodecCtx := t.ICodecCtx - _, _, t.Value.Raw, err = data.Parse(t) + err = data.Parse(t) codecCtxChanged := oldCodecCtx != t.ICodecCtx if t.ICodecCtx == nil { return ErrUnsupportCodec @@ -345,16 +342,16 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { p.writeAV(t, data) if p.AudioTrack.Length > 1 && p.AudioTrack.IsReady() { if t.Value.Raw == nil { - t.Value.Raw, err = t.Value.Wraps[0].ToRaw(t.ICodecCtx) - if err != nil { + if err = t.Value.Demux(t.ICodecCtx); err != nil { t.Error("to raw", "err", err) return err } } - var toFrame IAVFrame for i, track := range p.AudioTrack.Items[1:] { + toType := track.FrameType.Elem() + toFrame := reflect.New(toType).Interface().(IAVFrame) if track.ICodecCtx == nil { - err = (reflect.New(track.FrameType.Elem()).Interface().(IAVFrame)).DecodeConfig(track, t.ICodecCtx) + err = toFrame.ConvertCtx(t.ICodecCtx, track) if err != nil { track.Error("DecodeConfig", "err", err) return @@ -362,26 +359,22 @@ func (p *Publisher) WriteAudio(data IAVFrame) (err error) { if idr := p.AudioTrack.GetOldestIDR(); idr != nil { for rf := idr; rf != t.Ring; rf = rf.Next() { if i == 0 && rf.Value.Raw == nil { - rf.Value.Raw, err = rf.Value.Wraps[0].ToRaw(t.ICodecCtx) - if err != nil { + if err = rf.Value.Demux(t.ICodecCtx); err != nil { t.Error("to raw", "err", err) return err } } - if toFrame, err = track.CreateFrame(&rf.Value); err != nil { - track.Error("from raw", "err", err) - return - } + toFrame := reflect.New(toType).Interface().(IAVFrame) + toFrame.SetAllocator(data.GetAllocator()) + toFrame.Mux(track.ICodecCtx, &rf.Value) rf.Value.Wraps = append(rf.Value.Wraps, toFrame) } } } - if toFrame, err = track.CreateFrame(&t.Value); err != nil { - track.Error("from raw", "err", err) - return - } + toFrame.SetAllocator(data.GetAllocator()) + toFrame.Mux(track.ICodecCtx, &t.Value) if codecCtxChanged { - toFrame.DecodeConfig(track, t.ICodecCtx) + err = toFrame.ConvertCtx(t.ICodecCtx, track) } t.Value.Wraps = append(t.Value.Wraps, toFrame) if track.ICodecCtx != nil { diff --git a/server.go b/server.go index ffb5bfe..17c5d51 100644 --- a/server.go +++ b/server.go @@ -21,7 +21,6 @@ import ( "gopkg.in/yaml.v3" "m7s.live/m7s/v5/pb" . "m7s.live/m7s/v5/pkg" - "m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/util" ) @@ -40,10 +39,18 @@ var ( defaultLogHandler = console.NewHandler(os.Stdout, &console.HandlerOptions{TimeFormat: "15:04:05.000000"}) ) +type ServerConfig struct { + EnableSubEvent bool `default:"true" desc:"启用订阅事件,禁用可以提高性能"` //启用订阅事件,禁用可以提高性能 + SettingDir string `default:".m7s" desc:""` + EventBusSize int `default:"10" desc:"事件总线大小"` //事件总线大小 + PulseInterval time.Duration `default:"5s" desc:"心跳事件间隔"` //心跳事件间隔 + DisableAll bool `default:"false" desc:"禁用所有插件"` //禁用所有插件 +} + type Server struct { pb.UnimplementedGlobalServer Plugin - config.Engine + ServerConfig ID int eventChan chan any Plugins util.Collection[string, *Plugin] @@ -146,7 +153,7 @@ func (s *Server) run(ctx context.Context, conf any) (err error) { } } s.Config.Parse(&s.config, "GLOBAL") - s.Config.Parse(&s.Engine, "GLOBAL") + s.Config.Parse(&s.ServerConfig, "GLOBAL") if cg != nil { s.Config.ParseUserFile(cg["global"]) } diff --git a/subscriber.go b/subscriber.go index 0c5646b..267a67b 100644 --- a/subscriber.go +++ b/subscriber.go @@ -17,6 +17,8 @@ import ( "m7s.live/m7s/v5/pkg/util" ) +var AVFrameType = reflect.TypeOf((*AVFrame)(nil)) + type Owner struct { Conn net.Conn File *os.File @@ -89,11 +91,62 @@ type Subscriber struct { VideoReader *AVRingReader } +func (s *Subscriber) createAudioReader(dataType reflect.Type, startAudioTs time.Duration) (awi int) { + if s.Publisher == nil || dataType == nil { + return + } + var at *AVTrack + if dataType == AVFrameType { + at = s.Publisher.AudioTrack.AVTrack + awi = -1 + } else { + at = s.Publisher.GetAudioTrack(dataType) + if at != nil { + awi = at.WrapIndex + } + } + if at != nil { + if err := at.WaitReady(); err != nil { + return + } + ar := NewAVRingReader(at) + s.AudioReader = ar + ar.StartTs = startAudioTs + ar.Logger = s.Logger.With("reader", dataType.String()) + ar.Info("start read") + } + return +} + +func (s *Subscriber) createVideoReader(dataType reflect.Type, startVideoTs time.Duration) (vwi int) { + if s.Publisher == nil || dataType == nil { + return + } + var vt *AVTrack + if dataType == AVFrameType { + vt = s.Publisher.VideoTrack.AVTrack + vwi = -1 + } else { + vt = s.Publisher.GetVideoTrack(dataType) + if vt != nil { + vwi = vt.WrapIndex + } + } + if vt != nil { + if err := vt.WaitReady(); err != nil { + return + } + vr := NewAVRingReader(vt) + vr.StartTs = startVideoTs + s.VideoReader = vr + vr.Logger = s.Logger.With("reader", dataType.String()) + vr.Info("start read") + } + return +} + func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func(V) error) (err error) { - var ar, vr *AVRingReader var a1, v1 reflect.Type - var at, vt *AVTrack - var awi, vwi int var startAudioTs, startVideoTs time.Duration var initState = 0 prePublisher := s.Publisher @@ -104,62 +157,14 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( if s.SubVideo { v1 = reflect.TypeOf(onVideo).In(0) } - createAudioReader := func() { - if s.Publisher == nil || a1 == nil { - return - } - if a1 == reflect.TypeOf(audioFrame) { - at = s.Publisher.AudioTrack.AVTrack - awi = -1 - } else { - at = s.Publisher.GetAudioTrack(a1) - if at != nil { - awi = at.WrapIndex - } - } - if at != nil { - if err := at.WaitReady(); err != nil { - return - } - ar = NewAVRingReader(at) - s.AudioReader = ar - ar.StartTs = startAudioTs - ar.Logger = s.Logger.With("reader", a1.String()) - ar.Info("start read") - } - } - createVideoReader := func() { - if s.Publisher == nil || v1 == nil { - return - } - if v1 == reflect.TypeOf(videoFrame) { - vt = s.Publisher.VideoTrack.AVTrack - vwi = -1 - } else { - vt = s.Publisher.GetVideoTrack(v1) - if vt != nil { - vwi = vt.WrapIndex - } - } - if vt != nil { - if err := vt.WaitReady(); err != nil { - return - } - vr = NewAVRingReader(vt) - vr.StartTs = startVideoTs - s.VideoReader = vr - vr.Logger = s.Logger.With("reader", v1.String()) - vr.Info("start read") - } - } - createAudioReader() - createVideoReader() + awi := s.createAudioReader(a1, startAudioTs) + vwi := s.createVideoReader(v1, startVideoTs) defer func() { - if ar != nil { - ar.StopRead() + if s.AudioReader != nil { + s.AudioReader.StopRead() } - if vr != nil { - vr.StopRead() + if s.VideoReader != nil { + s.VideoReader.StopRead() } }() sendAudioFrame := func() (err error) { @@ -170,7 +175,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( } err = onAudio(audioFrame.Wraps[awi].(A)) } else { - ar.StopRead() + s.AudioReader.StopRead() } } else { err = onAudio(any(audioFrame).(A)) @@ -189,7 +194,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( } err = onVideo(videoFrame.Wraps[vwi].(V)) } else { - vr.StopRead() + s.VideoReader.StopRead() } } else { err = onVideo(any(videoFrame).(V)) @@ -203,23 +208,24 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( checkPublisherChange := func() { if prePublisher != s.Publisher { s.Info("publisher changed", "prePublisher", prePublisher.ID, "publisher", s.Publisher.ID) - if ar != nil { - startAudioTs = time.Duration(ar.AbsTime) * time.Millisecond - ar.StopRead() - ar = nil + if s.AudioReader != nil { + startAudioTs = time.Duration(s.AudioReader.AbsTime) * time.Millisecond + s.AudioReader.StopRead() + s.AudioReader = nil } - if vr != nil { - startVideoTs = time.Duration(vr.AbsTime) * time.Millisecond - vr.StopRead() - vr = nil + if s.VideoReader != nil { + startVideoTs = time.Duration(s.VideoReader.AbsTime) * time.Millisecond + s.VideoReader.StopRead() + s.VideoReader = nil } - createAudioReader() - createVideoReader() + awi = s.createAudioReader(a1, startAudioTs) + vwi = s.createVideoReader(v1, startVideoTs) prePublisher = s.Publisher } } for err == nil { err = s.Err() + ar, vr := s.AudioReader, s.VideoReader if vr != nil { for err == nil { err = vr.ReadFrame(&s.Subscribe) @@ -260,7 +266,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( } } } else { - createVideoReader() + vwi = s.createVideoReader(v1, startVideoTs) } // 正常模式下或者纯音频模式下,音频开始播放 if ar != nil { @@ -305,7 +311,7 @@ func PlayBlock[A any, V any](s *Subscriber, onAudio func(A) error, onVideo func( } } } else { - createAudioReader() + awi = s.createAudioReader(a1, startAudioTs) } checkPublisherChange() runtime.Gosched()