diff --git a/api.go b/api.go index 87d1149..490c1cd 100644 --- a/api.go +++ b/api.go @@ -96,22 +96,14 @@ func (s *Server) api_Stream_AnnexB_(rw http.ResponseWriter, r *http.Request) { return } defer reader.StopRead() - if reader.Value.Raw == nil { - if err = reader.Value.Demux(publisher.VideoTrack.ICodecCtx); err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - } - var annexb pkg.AnnexB - var t pkg.AVTrack - - t.ICodecCtx, t.SequenceFrame, err = annexb.ConvertCtx(publisher.VideoTrack.ICodecCtx) - if t.ICodecCtx == nil { - http.Error(rw, "unsupported codec", http.StatusInternalServerError) + var annexb *pkg.AnnexB + var converter = pkg.NewAVFrameConvert[*pkg.AnnexB](publisher.VideoTrack.AVTrack, nil) + annexb, err = converter.ConvertFromAVFrame(&reader.Value) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) return } - annexb.Mux(t.ICodecCtx, &reader.Value) - _, err = annexb.WriteTo(rw) + annexb.WriteTo(rw) } func (s *Server) getStreamInfo(pub *Publisher) (res *pb.StreamInfoResponse, err error) { diff --git a/pkg/avframe_convert.go b/pkg/avframe_convert.go index f340f84..27b9d20 100644 --- a/pkg/avframe_convert.go +++ b/pkg/avframe_convert.go @@ -42,10 +42,8 @@ func (c *AVFrameConvert[T]) ConvertFromAVFrame(avFrame *AVFrame) (to T, err erro return } } - if avFrame.Raw == nil { - if err = avFrame.Demux(c.FromTrack.ICodecCtx); err != nil { - return - } + if err = avFrame.Demux(c.FromTrack.ICodecCtx); err != nil { + return } to.SetAllocator(avFrame.Wraps[0].GetAllocator()) to.Mux(c.ToTrack.ICodecCtx, avFrame) @@ -67,10 +65,8 @@ func (c *AVFrameConvert[T]) Convert(frame IAVFrame) (to T, err error) { } } c.lastFromCodecCtx = c.FromTrack.ICodecCtx - if c.FromTrack.Value.Raw == nil { - if c.FromTrack.Value.Raw, err = frame.Demux(c.FromTrack.ICodecCtx); err != nil { - return - } + if c.FromTrack.Value.Raw, err = frame.Demux(c.FromTrack.ICodecCtx); err != nil { + return } to.SetAllocator(frame.GetAllocator()) to.Mux(c.ToTrack.ICodecCtx, &c.FromTrack.Value) diff --git a/pkg/track.go b/pkg/track.go index c631e7e..c86d99b 100644 --- a/pkg/track.go +++ b/pkg/track.go @@ -86,8 +86,6 @@ func NewAVTrack(args ...any) (t *AVTrack) { t.RingWriter.SLogger = t.Logger case *util.Promise: t.ready = v - case codec.ICodecCtx: - t.ICodecCtx = v } } //t.ready = util.NewPromise(struct{}{}) diff --git a/plugin/flv/download.go b/plugin/flv/download.go index 8ad1442..644ea52 100644 --- a/plugin/flv/download.go +++ b/plugin/flv/download.go @@ -202,6 +202,7 @@ func (plugin *FLVPlugin) processMp4ToFlv(w http.ResponseWriter, r *http.Request, StartTime: params.startTime, EndTime: params.endTime, Streams: mp4Streams, + Logger: plugin.Logger.With("demuxer", "mp4_flv"), }, } diff --git a/plugin/hls/download.go b/plugin/hls/download.go index 02b228f..770a16d 100644 --- a/plugin/hls/download.go +++ b/plugin/hls/download.go @@ -12,6 +12,7 @@ import ( m7s "m7s.live/v5" "m7s.live/v5/pkg" + "m7s.live/v5/pkg/codec" "m7s.live/v5/pkg/util" hls "m7s.live/v5/plugin/hls/pkg" mpegts "m7s.live/v5/plugin/hls/pkg/ts" @@ -188,6 +189,7 @@ func (plugin *HLSPlugin) processMp4ToTs(w http.ResponseWriter, r *http.Request, // 创建MP4流列表 var mp4Streams []m7s.RecordStream for _, info := range fileInfoList { + plugin.Debug("Processing MP4 file", "path", info.filePath, "startTime", info.startTime, "endTime", info.endTime) mp4Streams = append(mp4Streams, m7s.RecordStream{ FilePath: info.filePath, StartTime: info.startTime, @@ -202,6 +204,7 @@ func (plugin *HLSPlugin) processMp4ToTs(w http.ResponseWriter, r *http.Request, StartTime: params.startTime, EndTime: params.endTime, Streams: mp4Streams, + Logger: plugin.Logger.With("demuxer", "mp4_Ts"), }, } @@ -211,7 +214,14 @@ func (plugin *HLSPlugin) processMp4ToTs(w http.ResponseWriter, r *http.Request, // 写入PMT头的辅助函数 writePMTHeader := func() { if !hasWritten { - tsWriter.WritePMTPacket(demuxer.AudioTrack.ICodecCtx.FourCC(), demuxer.VideoTrack.ICodecCtx.FourCC()) + var audio, video codec.FourCC + if demuxer.AudioTrack != nil && demuxer.AudioTrack.ICodecCtx != nil { + audio = demuxer.AudioTrack.ICodecCtx.FourCC() + } + if demuxer.VideoTrack != nil && demuxer.VideoTrack.ICodecCtx != nil { + video = demuxer.VideoTrack.ICodecCtx.FourCC() + } + tsWriter.WritePMTPacket(audio, video) hasWritten = true } } diff --git a/plugin/hls/index.go b/plugin/hls/index.go index cb03858..c463dab 100644 --- a/plugin/hls/index.go +++ b/plugin/hls/index.go @@ -74,6 +74,9 @@ func (config *HLSPlugin) vod(w http.ResponseWriter, r *http.Request) { } query := r.URL.Query() fileName := query.Get("streamPath") + if fileName == "" { + fileName = r.PathValue("streamPath") + } waitTimeout, err := time.ParseDuration(query.Get("timeout")) if err == nil { config.Debug("request", "fileName", fileName, "timeout", waitTimeout) @@ -116,18 +119,17 @@ func (config *HLSPlugin) vod(w http.ResponseWriter, r *http.Request) { return } else if recordType == "ts" { playlist := hls.Playlist{ - Version: 7, + Version: 3, Sequence: 0, Targetduration: 10, } var plBuffer util.Buffer playlist.Writer = &plBuffer playlist.Init() - segDur := endTime.Sub(startTime) / 10 * time.Second - for i := startTime; i.Before(endTime); i = i.Add(segDur) { + for i := startTime; i.Before(endTime); i = i.Add(10 * time.Second) { playlist.WriteInf(hls.PlaylistInf{ Duration: 10, - URL: fmt.Sprintf("/hls/download/%s.ts?start=%d&end=%d", streamPath, i.Unix(), i.Add(segDur).Unix()), + URL: fmt.Sprintf("/hls/download/%s.ts?start=%d&end=%d", streamPath, i.Unix(), i.Add(10*time.Second).Unix()), Title: i.Format(time.RFC3339), }) } diff --git a/plugin/mp4/pkg/audio.go b/plugin/mp4/pkg/audio.go index f7e5c99..b588ed3 100644 --- a/plugin/mp4/pkg/audio.go +++ b/plugin/mp4/pkg/audio.go @@ -34,10 +34,6 @@ func (a *Audio) Parse(t *pkg.AVTrack) error { t.Value.IDR = false // 音频帧通常不是 IDR t.Value.Timestamp = time.Duration(a.Timestamp) * time.Millisecond t.Value.CTS = time.Duration(a.CTS) * time.Millisecond - - // 对于 MP4 音频帧,我们通常从 Sample 中获取数据 - // 这里可以添加更多的解析逻辑,比如解析编解码器信息 - return nil } diff --git a/plugin/mp4/pkg/demux-range.go b/plugin/mp4/pkg/demux-range.go index aceba07..b223874 100644 --- a/plugin/mp4/pkg/demux-range.go +++ b/plugin/mp4/pkg/demux-range.go @@ -2,6 +2,7 @@ package mp4 import ( "context" + "log/slog" "os" "time" @@ -16,6 +17,7 @@ import ( ) type DemuxerRange struct { + *slog.Logger StartTime, EndTime time.Time Streams []m7s.RecordStream AudioTrack, VideoTrack *pkg.AVTrack @@ -51,9 +53,12 @@ func (d *DemuxerRange) Demux(ctx context.Context, onAudio func(*Audio) error, on h264Ctx.CodecData, err = h264parser.NewCodecDataFromAVCDecoderConfRecord(track.ExtraData) if err == nil { if d.VideoTrack == nil { - d.VideoTrack = pkg.NewAVTrack(&Video{ - allocator: allocator, - }, &h264Ctx) + d.VideoTrack = &pkg.AVTrack{ + ICodecCtx: &h264Ctx, + RingWriter: &pkg.RingWriter{ + Ring: util.NewRing[pkg.AVFrame](1), + }} + d.VideoTrack.Logger = d.With("track", "video") } else { // 如果已经有视频轨道,使用现有的轨道 d.VideoTrack.ICodecCtx = &h264Ctx @@ -64,9 +69,12 @@ func (d *DemuxerRange) Demux(ctx context.Context, onAudio func(*Audio) error, on h265Ctx.CodecData, err = h265parser.NewCodecDataFromAVCDecoderConfRecord(track.ExtraData) if err == nil { if d.VideoTrack == nil { - d.VideoTrack = pkg.NewAVTrack(&Video{ - allocator: allocator, - }, &h265Ctx) + d.VideoTrack = &pkg.AVTrack{ + ICodecCtx: &h265Ctx, + RingWriter: &pkg.RingWriter{ + Ring: util.NewRing[pkg.AVFrame](1), + }} + d.VideoTrack.Logger = d.With("track", "video") } else { // 如果已经有视频轨道,使用现有的轨道 d.VideoTrack.ICodecCtx = &h265Ctx @@ -77,19 +85,46 @@ func (d *DemuxerRange) Demux(ctx context.Context, onAudio func(*Audio) error, on aacCtx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(track.ExtraData) if err == nil { if d.AudioTrack == nil { - d.AudioTrack = pkg.NewAVTrack(&Audio{ - allocator: allocator, - }, &aacCtx) + d.AudioTrack = &pkg.AVTrack{ + ICodecCtx: &aacCtx, + RingWriter: &pkg.RingWriter{ + Ring: util.NewRing[pkg.AVFrame](1), + }} + d.AudioTrack.Logger = d.With("track", "audio") } else { // 如果已经有音频轨道,使用现有的轨道 d.AudioTrack.ICodecCtx = &aacCtx } } - case box.MP4_CODEC_G711A, box.MP4_CODEC_G711U: + case box.MP4_CODEC_G711A: if d.AudioTrack == nil { - d.AudioTrack = pkg.NewAVTrack(&Audio{ - allocator: allocator, - }) + d.AudioTrack = &pkg.AVTrack{ + ICodecCtx: &codec.PCMACtx{ + AudioCtx: codec.AudioCtx{ + SampleRate: 8000, + Channels: 1, + SampleSize: 16, + }, + }, + RingWriter: &pkg.RingWriter{ + Ring: util.NewRing[pkg.AVFrame](1), + }} + d.AudioTrack.Logger = d.With("track", "audio") + } + case box.MP4_CODEC_G711U: + if d.AudioTrack == nil { + d.AudioTrack = &pkg.AVTrack{ + ICodecCtx: &codec.PCMUCtx{ + AudioCtx: codec.AudioCtx{ + SampleRate: 8000, + Channels: 1, + SampleSize: 16, + }, + }, + RingWriter: &pkg.RingWriter{ + Ring: util.NewRing[pkg.AVFrame](1), + }} + d.AudioTrack.Logger = d.With("track", "audio") } } } diff --git a/plugin/mp4/pkg/pull-recorder.go b/plugin/mp4/pkg/pull-recorder.go index c5a9cee..d5ffbd6 100644 --- a/plugin/mp4/pkg/pull-recorder.go +++ b/plugin/mp4/pkg/pull-recorder.go @@ -49,7 +49,9 @@ func (p *RecordReader) Run() (err error) { var tsOffset int64 // 时间戳偏移量 // 创建可复用的 DemuxerRange 实例 - demuxerRange := &DemuxerRange{} + demuxerRange := &DemuxerRange{ + Logger: p.Logger.With("demuxer", "mp4"), + } for loop := 0; loop < p.Loop; loop++ { // 每次循环时更新时间戳偏移量以保持连续性