From 16dcafba9dbb039c6ea8cd7b8646fe1e147e1355 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Wed, 23 Apr 2025 17:17:03 +0800 Subject: [PATCH] feat: add auto recovery to mp4 record --- plugin/mp4/index.go | 28 +++-- plugin/mp4/pkg/box/box.go | 2 + plugin/mp4/pkg/box/moov.go | 14 ++- plugin/mp4/pkg/box/udta.go | 95 ++++++++++++++++ plugin/mp4/pkg/demuxer.go | 5 + plugin/mp4/pkg/muxer.go | 16 +++ plugin/mp4/pkg/record.go | 4 +- plugin/mp4/recovery.go | 218 +++++++++++++++++++++++++++++++++++++ 8 files changed, 369 insertions(+), 13 deletions(-) create mode 100644 plugin/mp4/pkg/box/udta.go create mode 100644 plugin/mp4/recovery.go diff --git a/plugin/mp4/index.go b/plugin/mp4/index.go index ae66819..646a004 100644 --- a/plugin/mp4/index.go +++ b/plugin/mp4/index.go @@ -56,6 +56,7 @@ type MP4Plugin struct { RecordFileExpireDays int `desc:"录像自动删除的天数,0或未设置表示不自动删除"` DiskMaxPercent float64 `default:"90" desc:"硬盘使用百分之上限值,超上限后触发报警,并停止当前所有磁盘写入动作。"` AutoOverWriteDiskPercent float64 `default:"0" desc:"自动覆盖功能磁盘占用上限值,超过上限时连续录像自动删除日有录像,事件录像自动删除非重要事件录像,删除规则为删除距离当日最久日期的连续录像或非重要事件录像。"` + AutoRecovery bool `default:"true" desc:"是否自动恢复"` ExceptionPostUrl string `desc:"第三方异常上报地址"` EventRecordFilePath string `desc:"事件录像存放地址"` } @@ -80,15 +81,26 @@ func (p *MP4Plugin) RegisterHandler() map[string]http.HandlerFunc { } func (p *MP4Plugin) OnInit() (err error) { - if p.DB != nil && p.AutoOverWriteDiskPercent > 0 { + if p.DB != nil { err = p.DB.AutoMigrate(&Exception{}) - var deleteRecordTask DeleteRecordTask - deleteRecordTask.DB = p.DB - deleteRecordTask.DiskMaxPercent = p.DiskMaxPercent - deleteRecordTask.AutoOverWriteDiskPercent = p.AutoOverWriteDiskPercent - deleteRecordTask.RecordFileExpireDays = p.RecordFileExpireDays - deleteRecordTask.plugin = p - p.AddTask(&deleteRecordTask) + if err != nil { + return + } + if p.AutoOverWriteDiskPercent > 0 { + var deleteRecordTask DeleteRecordTask + deleteRecordTask.DB = p.DB + deleteRecordTask.DiskMaxPercent = p.DiskMaxPercent + deleteRecordTask.AutoOverWriteDiskPercent = p.AutoOverWriteDiskPercent + deleteRecordTask.RecordFileExpireDays = p.RecordFileExpireDays + deleteRecordTask.plugin = p + p.AddTask(&deleteRecordTask) + } + if p.AutoRecovery { + var recoveryTask RecordRecoveryTask + recoveryTask.DB = p.DB + recoveryTask.plugin = p + p.AddTask(&recoveryTask) + } } // go func() { //处理所有异常,录像中断异常、录像读取异常、录像导出文件中断、磁盘容量低于阈值异常、磁盘异常 // for exception := range exceptionChannel { diff --git a/plugin/mp4/pkg/box/box.go b/plugin/mp4/pkg/box/box.go index e2009da..6d23fd6 100644 --- a/plugin/mp4/pkg/box/box.go +++ b/plugin/mp4/pkg/box/box.go @@ -342,6 +342,8 @@ var ( TypeMETA = f("meta") TypeAUXV = f("auxv") TypeHINT = f("hint") + TypeUDTA = f("udta") + TypeM7SP = f("m7sp") // Custom box type for M7S StreamPath ) // aligned(8) class Box (unsigned int(32) boxtype, optional unsigned int(8)[16] extended_type) { diff --git a/plugin/mp4/pkg/box/moov.go b/plugin/mp4/pkg/box/moov.go index a25653c..628087b 100644 --- a/plugin/mp4/pkg/box/moov.go +++ b/plugin/mp4/pkg/box/moov.go @@ -9,9 +9,9 @@ type ( MoovBox struct { BaseBox Tracks []*TrakBox - // UDTA *UdtaBody - MVHD *MovieHeaderBox - MVEX *MovieExtendsBox + UDTA *UserDataBox + MVHD *MovieHeaderBox + MVEX *MovieExtendsBox } EdtsBox struct { @@ -26,6 +26,12 @@ func (m *MoovBox) WriteTo(w io.Writer) (n int64, err error) { for _, track := range m.Tracks { boxes = append(boxes, track) } + if m.MVEX != nil { + boxes = append(boxes, m.MVEX) + } + if m.UDTA != nil { + boxes = append(boxes, m.UDTA) + } return WriteTo(w, boxes...) } @@ -43,6 +49,8 @@ func (m *MoovBox) Unmarshal(buf []byte) (IBox, error) { m.MVHD = box case *MovieExtendsBox: m.MVEX = box + case *UserDataBox: + m.UDTA = box } } } diff --git a/plugin/mp4/pkg/box/udta.go b/plugin/mp4/pkg/box/udta.go new file mode 100644 index 0000000..12bff52 --- /dev/null +++ b/plugin/mp4/pkg/box/udta.go @@ -0,0 +1,95 @@ +package box + +import ( + "bytes" + "io" +) + +// User Data Box (udta) +// This box contains objects that declare user information about the containing box and its data. +type UserDataBox struct { + BaseBox + Entries []IBox +} + +// Custom metadata box for storing stream path +type StreamPathBox struct { + FullBox + StreamPath string +} + +// Create a new User Data Box +func CreateUserDataBox(entries ...IBox) *UserDataBox { + size := uint32(BasicBoxLen) + for _, entry := range entries { + size += uint32(entry.Size()) + } + return &UserDataBox{ + BaseBox: BaseBox{ + typ: TypeUDTA, + size: size, + }, + Entries: entries, + } +} + +// Create a new StreamPath Box +func CreateStreamPathBox(streamPath string) *StreamPathBox { + return &StreamPathBox{ + FullBox: FullBox{ + BaseBox: BaseBox{ + typ: TypeM7SP, // Custom box type for M7S StreamPath + size: uint32(FullBoxLen + len(streamPath)), + }, + Version: 0, + Flags: [3]byte{0, 0, 0}, + }, + StreamPath: streamPath, + } +} + +// WriteTo writes the UserDataBox to the given writer +func (box *UserDataBox) WriteTo(w io.Writer) (n int64, err error) { + for _, entry := range box.Entries { + var nn int64 + nn, err = entry.WriteTo(w) + n += nn + if err != nil { + return + } + } + return +} + +// Unmarshal parses the given buffer into a UserDataBox +func (box *UserDataBox) Unmarshal(buf []byte) (IBox, error) { + r := bytes.NewReader(buf) + for { + b, err := ReadFrom(r) + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + box.Entries = append(box.Entries, b) + } + return box, nil +} + +// WriteTo writes the StreamPathBox to the given writer +func (box *StreamPathBox) WriteTo(w io.Writer) (n int64, err error) { + nn, err := w.Write([]byte(box.StreamPath)) + return int64(nn), err +} + +// Unmarshal parses the given buffer into a StreamPathBox +func (box *StreamPathBox) Unmarshal(buf []byte) (IBox, error) { + box.StreamPath = string(buf) + return box, nil +} + +func init() { + RegisterBox[*UserDataBox](TypeUDTA) + RegisterBox[*StreamPathBox](TypeM7SP) +} diff --git a/plugin/mp4/pkg/demuxer.go b/plugin/mp4/pkg/demuxer.go index b812f06..5d22e47 100644 --- a/plugin/mp4/pkg/demuxer.go +++ b/plugin/mp4/pkg/demuxer.go @@ -446,3 +446,8 @@ func (d *Demuxer) RangeSample(yield func(*Track, *Sample) bool) { } } } + +// GetMoovBox returns the Movie Box from the demuxer +func (d *Demuxer) GetMoovBox() *MoovBox { + return d.moov +} diff --git a/plugin/mp4/pkg/muxer.go b/plugin/mp4/pkg/muxer.go index 9fb06b9..b984c2a 100644 --- a/plugin/mp4/pkg/muxer.go +++ b/plugin/mp4/pkg/muxer.go @@ -29,6 +29,7 @@ type ( moov IBox mdatOffset uint64 mdatSize uint64 + StreamPath string // Added to store the stream path } ) @@ -54,6 +55,13 @@ func NewMuxer(flag Flag) *Muxer { } } +// NewMuxerWithStreamPath creates a new muxer with the specified stream path +func NewMuxerWithStreamPath(flag Flag, streamPath string) *Muxer { + muxer := NewMuxer(flag) + muxer.StreamPath = streamPath + return muxer +} + func (m *Muxer) CreateFTYPBox() *FileTypeBox { if m.isFragment() { return CreateFTYPBox(TypeISOM, 1, TypeISOM, TypeAVC1) @@ -223,6 +231,14 @@ func (m *Muxer) MakeMoov() IBox { if m.isDash() || m.isFragment() { children = append(children, m.makeMvex()) } + + // Add user data box with stream path if available + if m.StreamPath != "" { + streamPathBox := CreateStreamPathBox(m.StreamPath) + udta := CreateUserDataBox(streamPathBox) + children = append(children, udta) + } + m.moov = CreateContainerBox(TypeMOOV, children...) return m.moov } diff --git a/plugin/mp4/pkg/record.go b/plugin/mp4/pkg/record.go index f4171a0..0356b9a 100644 --- a/plugin/mp4/pkg/record.go +++ b/plugin/mp4/pkg/record.go @@ -203,9 +203,9 @@ func (r *Recorder) createStream(start time.Time) (err error) { } if recordJob.RecConf.Type == "fmp4" { r.stream.Type = "fmp4" - r.muxer = NewMuxer(FLAG_FRAGMENT) + r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.stream.StreamPath) } else { - r.muxer = NewMuxer(0) + r.muxer = NewMuxerWithStreamPath(0, r.stream.StreamPath) } r.muxer.WriteInitSegment(r.file) if sub.Publisher.HasAudioTrack() { diff --git a/plugin/mp4/recovery.go b/plugin/mp4/recovery.go new file mode 100644 index 0000000..26b33e6 --- /dev/null +++ b/plugin/mp4/recovery.go @@ -0,0 +1,218 @@ +package plugin_mp4 + +import ( + "os" + "path/filepath" + "strings" + "time" + + "gorm.io/gorm" + "m7s.live/v5" + "m7s.live/v5/pkg/task" + mp4 "m7s.live/v5/plugin/mp4/pkg" + "m7s.live/v5/plugin/mp4/pkg/box" +) + +// RecordRecoveryTask 从录像文件中恢复数据库记录的任务 +type RecordRecoveryTask struct { + task.TickTask + DB *gorm.DB + plugin *MP4Plugin +} + +// GetTickInterval 设置任务执行间隔 +func (t *RecordRecoveryTask) GetTickInterval() time.Duration { + return 24 * time.Hour // 默认每天执行一次 +} + +// Tick 执行任务 +func (t *RecordRecoveryTask) Tick(any) { + t.Info("Starting record recovery task") + t.recoverRecordsFromFiles() +} + +// recoverRecordsFromFiles 从文件系统中恢复录像记录 +func (t *RecordRecoveryTask) recoverRecordsFromFiles() { + // 获取所有录像目录 + var recordDirs []string + if len(t.plugin.GetCommonConf().OnPub.Record) > 0 { + for _, conf := range t.plugin.GetCommonConf().OnPub.Record { + dirPath := filepath.Dir(conf.FilePath) + recordDirs = append(recordDirs, dirPath) + } + } + if t.plugin.EventRecordFilePath != "" { + dirPath := filepath.Dir(t.plugin.EventRecordFilePath) + recordDirs = append(recordDirs, dirPath) + } + + // 遍历所有录像目录 + for _, dir := range recordDirs { + t.scanDirectory(dir) + } +} + +// scanDirectory 扫描目录中的MP4文件 +func (t *RecordRecoveryTask) scanDirectory(dir string) { + t.Info("Scanning directory for MP4 files", "directory", dir) + + // 递归遍历目录 + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + t.Error("Error accessing path", "path", path, "error", err) + return nil // 继续遍历 + } + + // 跳过目录 + if info.IsDir() { + return nil + } + + // 只处理MP4文件 + if !strings.HasSuffix(strings.ToLower(path), ".mp4") { + return nil + } + + // 检查文件是否已经有记录 + var count int64 + t.DB.Model(&m7s.RecordStream{}).Where("file_path = ?", path).Count(&count) + if count > 0 { + // 已有记录,跳过 + return nil + } + + // 解析MP4文件并创建记录 + t.recoverRecordFromFile(path) + return nil + }) + + if err != nil { + t.Error("Error walking directory", "directory", dir, "error", err) + } +} + +// recoverRecordFromFile 从MP4文件中恢复记录 +func (t *RecordRecoveryTask) recoverRecordFromFile(filePath string) { + t.Info("Recovering record from file", "file", filePath) + + // 打开文件 + file, err := os.Open(filePath) + if err != nil { + t.Error("Failed to open MP4 file", "file", filePath, "error", err) + return + } + defer file.Close() + + // 创建解析器 + demuxer := mp4.NewDemuxer(file) + err = demuxer.Demux() + if err != nil { + t.Error("Failed to demux MP4 file", "file", filePath, "error", err) + return + } + + // 提取文件信息 + fileInfo, err := file.Stat() + if err != nil { + t.Error("Failed to get file info", "file", filePath, "error", err) + return + } + + // 尝试从MP4文件中提取流路径,如果没有则从文件名和路径推断 + streamPath := extractStreamPathFromMP4(demuxer) + if streamPath == "" { + streamPath = inferStreamPathFromFilePath(filePath) + } + + // 创建记录 + record := m7s.RecordStream{ + FilePath: filePath, + StreamPath: streamPath, + Type: "mp4", + Mode: m7s.RecordModeAuto, // 默认为自动录制模式 + EventLevel: m7s.EventLevelLow, // 默认为低级别事件 + } + + // 设置开始和结束时间 + record.StartTime = fileInfo.ModTime().Add(-estimateDurationFromFile(demuxer)) + record.EndTime = fileInfo.ModTime() + + // 提取编解码器信息 + for _, track := range demuxer.Tracks { + forcc := box.GetCodecNameWithCodecId(track.Cid) + if track.Cid.IsAudio() { + record.AudioCodec = string(forcc[:]) + } else if track.Cid.IsVideo() { + record.VideoCodec = string(forcc[:]) + } + } + + // 保存记录到数据库 + err = t.DB.Create(&record).Error + if err != nil { + t.Error("Failed to save record to database", "file", filePath, "error", err) + return + } + + t.Info("Successfully recovered record", "file", filePath, "streamPath", streamPath) +} + +// extractStreamPathFromMP4 从MP4文件中提取流路径 +func extractStreamPathFromMP4(demuxer *mp4.Demuxer) string { + // 尝试从MP4文件的用户数据中提取流路径 + moov := demuxer.GetMoovBox() + if moov != nil && moov.UDTA != nil { + for _, entry := range moov.UDTA.Entries { + if streamPathBox, ok := entry.(*box.StreamPathBox); ok { + return streamPathBox.StreamPath + } + } + } + return "" +} + +// inferStreamPathFromFilePath 从文件路径推断流路径 +func inferStreamPathFromFilePath(filePath string) string { + // 从文件路径中提取可能的流路径 + // 这里使用简单的启发式方法,实际应用中可能需要更复杂的逻辑 + base := filepath.Base(filePath) + ext := filepath.Ext(base) + name := strings.TrimSuffix(base, ext) + + // 如果文件名是时间戳,尝试从父目录获取流名称 + if _, err := time.Parse("20060102150405", name); err == nil || isNumeric(name) { + dir := filepath.Base(filepath.Dir(filePath)) + if dir != "" && dir != "." { + return dir + } + } + + return name +} + +// isNumeric 检查字符串是否为数字 +func isNumeric(s string) bool { + for _, c := range s { + if c < '0' || c > '9' { + return false + } + } + return len(s) > 0 +} + +// estimateDurationFromFile 估计文件的持续时间 +func estimateDurationFromFile(demuxer *mp4.Demuxer) time.Duration { + var maxDuration uint32 + + for _, track := range demuxer.Tracks { + if len(track.Samplelist) > 0 { + lastSample := track.Samplelist[len(track.Samplelist)-1] + durationMs := lastSample.Timestamp * 1000 / uint32(track.Timescale) + if durationMs > maxDuration { + maxDuration = durationMs + } + } + } + + return time.Duration(maxDuration) * time.Millisecond +}