feat: add auto recovery to mp4 record

This commit is contained in:
langhuihui
2025-04-23 17:17:03 +08:00
parent 42b29bccec
commit 16dcafba9d
8 changed files with 369 additions and 13 deletions
+20 -8
View File
@@ -56,6 +56,7 @@ type MP4Plugin struct {
RecordFileExpireDays int `desc:"录像自动删除的天数,0或未设置表示不自动删除"`
DiskMaxPercent float64 `default:"90" desc:"硬盘使用百分之上限值,超上限后触发报警,并停止当前所有磁盘写入动作。"`
AutoOverWriteDiskPercent float64 `default:"0" desc:"自动覆盖功能磁盘占用上限值,超过上限时连续录像自动删除日有录像,事件录像自动删除非重要事件录像,删除规则为删除距离当日最久日期的连续录像或非重要事件录像。"`
AutoRecovery bool `default:"true" desc:"是否自动恢复"`
ExceptionPostUrl string `desc:"第三方异常上报地址"`
EventRecordFilePath string `desc:"事件录像存放地址"`
}
@@ -80,15 +81,26 @@ func (p *MP4Plugin) RegisterHandler() map[string]http.HandlerFunc {
}
func (p *MP4Plugin) OnInit() (err error) {
if p.DB != nil && p.AutoOverWriteDiskPercent > 0 {
if p.DB != nil {
err = p.DB.AutoMigrate(&Exception{})
var deleteRecordTask DeleteRecordTask
deleteRecordTask.DB = p.DB
deleteRecordTask.DiskMaxPercent = p.DiskMaxPercent
deleteRecordTask.AutoOverWriteDiskPercent = p.AutoOverWriteDiskPercent
deleteRecordTask.RecordFileExpireDays = p.RecordFileExpireDays
deleteRecordTask.plugin = p
p.AddTask(&deleteRecordTask)
if err != nil {
return
}
if p.AutoOverWriteDiskPercent > 0 {
var deleteRecordTask DeleteRecordTask
deleteRecordTask.DB = p.DB
deleteRecordTask.DiskMaxPercent = p.DiskMaxPercent
deleteRecordTask.AutoOverWriteDiskPercent = p.AutoOverWriteDiskPercent
deleteRecordTask.RecordFileExpireDays = p.RecordFileExpireDays
deleteRecordTask.plugin = p
p.AddTask(&deleteRecordTask)
}
if p.AutoRecovery {
var recoveryTask RecordRecoveryTask
recoveryTask.DB = p.DB
recoveryTask.plugin = p
p.AddTask(&recoveryTask)
}
}
// go func() { //处理所有异常,录像中断异常、录像读取异常、录像导出文件中断、磁盘容量低于阈值异常、磁盘异常
// for exception := range exceptionChannel {
+2
View File
@@ -342,6 +342,8 @@ var (
TypeMETA = f("meta")
TypeAUXV = f("auxv")
TypeHINT = f("hint")
TypeUDTA = f("udta")
TypeM7SP = f("m7sp") // Custom box type for M7S StreamPath
)
// aligned(8) class Box (unsigned int(32) boxtype, optional unsigned int(8)[16] extended_type) {
+11 -3
View File
@@ -9,9 +9,9 @@ type (
MoovBox struct {
BaseBox
Tracks []*TrakBox
// UDTA *UdtaBody
MVHD *MovieHeaderBox
MVEX *MovieExtendsBox
UDTA *UserDataBox
MVHD *MovieHeaderBox
MVEX *MovieExtendsBox
}
EdtsBox struct {
@@ -26,6 +26,12 @@ func (m *MoovBox) WriteTo(w io.Writer) (n int64, err error) {
for _, track := range m.Tracks {
boxes = append(boxes, track)
}
if m.MVEX != nil {
boxes = append(boxes, m.MVEX)
}
if m.UDTA != nil {
boxes = append(boxes, m.UDTA)
}
return WriteTo(w, boxes...)
}
@@ -43,6 +49,8 @@ func (m *MoovBox) Unmarshal(buf []byte) (IBox, error) {
m.MVHD = box
case *MovieExtendsBox:
m.MVEX = box
case *UserDataBox:
m.UDTA = box
}
}
}
+95
View File
@@ -0,0 +1,95 @@
package box
import (
"bytes"
"io"
)
// User Data Box (udta)
// This box contains objects that declare user information about the containing box and its data.
type UserDataBox struct {
BaseBox
Entries []IBox
}
// Custom metadata box for storing stream path
type StreamPathBox struct {
FullBox
StreamPath string
}
// Create a new User Data Box
func CreateUserDataBox(entries ...IBox) *UserDataBox {
size := uint32(BasicBoxLen)
for _, entry := range entries {
size += uint32(entry.Size())
}
return &UserDataBox{
BaseBox: BaseBox{
typ: TypeUDTA,
size: size,
},
Entries: entries,
}
}
// Create a new StreamPath Box
func CreateStreamPathBox(streamPath string) *StreamPathBox {
return &StreamPathBox{
FullBox: FullBox{
BaseBox: BaseBox{
typ: TypeM7SP, // Custom box type for M7S StreamPath
size: uint32(FullBoxLen + len(streamPath)),
},
Version: 0,
Flags: [3]byte{0, 0, 0},
},
StreamPath: streamPath,
}
}
// WriteTo writes the UserDataBox to the given writer
func (box *UserDataBox) WriteTo(w io.Writer) (n int64, err error) {
for _, entry := range box.Entries {
var nn int64
nn, err = entry.WriteTo(w)
n += nn
if err != nil {
return
}
}
return
}
// Unmarshal parses the given buffer into a UserDataBox
func (box *UserDataBox) Unmarshal(buf []byte) (IBox, error) {
r := bytes.NewReader(buf)
for {
b, err := ReadFrom(r)
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
box.Entries = append(box.Entries, b)
}
return box, nil
}
// WriteTo writes the StreamPathBox to the given writer
func (box *StreamPathBox) WriteTo(w io.Writer) (n int64, err error) {
nn, err := w.Write([]byte(box.StreamPath))
return int64(nn), err
}
// Unmarshal parses the given buffer into a StreamPathBox
func (box *StreamPathBox) Unmarshal(buf []byte) (IBox, error) {
box.StreamPath = string(buf)
return box, nil
}
func init() {
RegisterBox[*UserDataBox](TypeUDTA)
RegisterBox[*StreamPathBox](TypeM7SP)
}
+5
View File
@@ -446,3 +446,8 @@ func (d *Demuxer) RangeSample(yield func(*Track, *Sample) bool) {
}
}
}
// GetMoovBox returns the Movie Box from the demuxer
func (d *Demuxer) GetMoovBox() *MoovBox {
return d.moov
}
+16
View File
@@ -29,6 +29,7 @@ type (
moov IBox
mdatOffset uint64
mdatSize uint64
StreamPath string // Added to store the stream path
}
)
@@ -54,6 +55,13 @@ func NewMuxer(flag Flag) *Muxer {
}
}
// NewMuxerWithStreamPath creates a new muxer with the specified stream path
func NewMuxerWithStreamPath(flag Flag, streamPath string) *Muxer {
muxer := NewMuxer(flag)
muxer.StreamPath = streamPath
return muxer
}
func (m *Muxer) CreateFTYPBox() *FileTypeBox {
if m.isFragment() {
return CreateFTYPBox(TypeISOM, 1, TypeISOM, TypeAVC1)
@@ -223,6 +231,14 @@ func (m *Muxer) MakeMoov() IBox {
if m.isDash() || m.isFragment() {
children = append(children, m.makeMvex())
}
// Add user data box with stream path if available
if m.StreamPath != "" {
streamPathBox := CreateStreamPathBox(m.StreamPath)
udta := CreateUserDataBox(streamPathBox)
children = append(children, udta)
}
m.moov = CreateContainerBox(TypeMOOV, children...)
return m.moov
}
+2 -2
View File
@@ -203,9 +203,9 @@ func (r *Recorder) createStream(start time.Time) (err error) {
}
if recordJob.RecConf.Type == "fmp4" {
r.stream.Type = "fmp4"
r.muxer = NewMuxer(FLAG_FRAGMENT)
r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.stream.StreamPath)
} else {
r.muxer = NewMuxer(0)
r.muxer = NewMuxerWithStreamPath(0, r.stream.StreamPath)
}
r.muxer.WriteInitSegment(r.file)
if sub.Publisher.HasAudioTrack() {
+218
View File
@@ -0,0 +1,218 @@
package plugin_mp4
import (
"os"
"path/filepath"
"strings"
"time"
"gorm.io/gorm"
"m7s.live/v5"
"m7s.live/v5/pkg/task"
mp4 "m7s.live/v5/plugin/mp4/pkg"
"m7s.live/v5/plugin/mp4/pkg/box"
)
// RecordRecoveryTask 从录像文件中恢复数据库记录的任务
type RecordRecoveryTask struct {
task.TickTask
DB *gorm.DB
plugin *MP4Plugin
}
// GetTickInterval 设置任务执行间隔
func (t *RecordRecoveryTask) GetTickInterval() time.Duration {
return 24 * time.Hour // 默认每天执行一次
}
// Tick 执行任务
func (t *RecordRecoveryTask) Tick(any) {
t.Info("Starting record recovery task")
t.recoverRecordsFromFiles()
}
// recoverRecordsFromFiles 从文件系统中恢复录像记录
func (t *RecordRecoveryTask) recoverRecordsFromFiles() {
// 获取所有录像目录
var recordDirs []string
if len(t.plugin.GetCommonConf().OnPub.Record) > 0 {
for _, conf := range t.plugin.GetCommonConf().OnPub.Record {
dirPath := filepath.Dir(conf.FilePath)
recordDirs = append(recordDirs, dirPath)
}
}
if t.plugin.EventRecordFilePath != "" {
dirPath := filepath.Dir(t.plugin.EventRecordFilePath)
recordDirs = append(recordDirs, dirPath)
}
// 遍历所有录像目录
for _, dir := range recordDirs {
t.scanDirectory(dir)
}
}
// scanDirectory 扫描目录中的MP4文件
func (t *RecordRecoveryTask) scanDirectory(dir string) {
t.Info("Scanning directory for MP4 files", "directory", dir)
// 递归遍历目录
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
t.Error("Error accessing path", "path", path, "error", err)
return nil // 继续遍历
}
// 跳过目录
if info.IsDir() {
return nil
}
// 只处理MP4文件
if !strings.HasSuffix(strings.ToLower(path), ".mp4") {
return nil
}
// 检查文件是否已经有记录
var count int64
t.DB.Model(&m7s.RecordStream{}).Where("file_path = ?", path).Count(&count)
if count > 0 {
// 已有记录,跳过
return nil
}
// 解析MP4文件并创建记录
t.recoverRecordFromFile(path)
return nil
})
if err != nil {
t.Error("Error walking directory", "directory", dir, "error", err)
}
}
// recoverRecordFromFile 从MP4文件中恢复记录
func (t *RecordRecoveryTask) recoverRecordFromFile(filePath string) {
t.Info("Recovering record from file", "file", filePath)
// 打开文件
file, err := os.Open(filePath)
if err != nil {
t.Error("Failed to open MP4 file", "file", filePath, "error", err)
return
}
defer file.Close()
// 创建解析器
demuxer := mp4.NewDemuxer(file)
err = demuxer.Demux()
if err != nil {
t.Error("Failed to demux MP4 file", "file", filePath, "error", err)
return
}
// 提取文件信息
fileInfo, err := file.Stat()
if err != nil {
t.Error("Failed to get file info", "file", filePath, "error", err)
return
}
// 尝试从MP4文件中提取流路径,如果没有则从文件名和路径推断
streamPath := extractStreamPathFromMP4(demuxer)
if streamPath == "" {
streamPath = inferStreamPathFromFilePath(filePath)
}
// 创建记录
record := m7s.RecordStream{
FilePath: filePath,
StreamPath: streamPath,
Type: "mp4",
Mode: m7s.RecordModeAuto, // 默认为自动录制模式
EventLevel: m7s.EventLevelLow, // 默认为低级别事件
}
// 设置开始和结束时间
record.StartTime = fileInfo.ModTime().Add(-estimateDurationFromFile(demuxer))
record.EndTime = fileInfo.ModTime()
// 提取编解码器信息
for _, track := range demuxer.Tracks {
forcc := box.GetCodecNameWithCodecId(track.Cid)
if track.Cid.IsAudio() {
record.AudioCodec = string(forcc[:])
} else if track.Cid.IsVideo() {
record.VideoCodec = string(forcc[:])
}
}
// 保存记录到数据库
err = t.DB.Create(&record).Error
if err != nil {
t.Error("Failed to save record to database", "file", filePath, "error", err)
return
}
t.Info("Successfully recovered record", "file", filePath, "streamPath", streamPath)
}
// extractStreamPathFromMP4 从MP4文件中提取流路径
func extractStreamPathFromMP4(demuxer *mp4.Demuxer) string {
// 尝试从MP4文件的用户数据中提取流路径
moov := demuxer.GetMoovBox()
if moov != nil && moov.UDTA != nil {
for _, entry := range moov.UDTA.Entries {
if streamPathBox, ok := entry.(*box.StreamPathBox); ok {
return streamPathBox.StreamPath
}
}
}
return ""
}
// inferStreamPathFromFilePath 从文件路径推断流路径
func inferStreamPathFromFilePath(filePath string) string {
// 从文件路径中提取可能的流路径
// 这里使用简单的启发式方法,实际应用中可能需要更复杂的逻辑
base := filepath.Base(filePath)
ext := filepath.Ext(base)
name := strings.TrimSuffix(base, ext)
// 如果文件名是时间戳,尝试从父目录获取流名称
if _, err := time.Parse("20060102150405", name); err == nil || isNumeric(name) {
dir := filepath.Base(filepath.Dir(filePath))
if dir != "" && dir != "." {
return dir
}
}
return name
}
// isNumeric 检查字符串是否为数字
func isNumeric(s string) bool {
for _, c := range s {
if c < '0' || c > '9' {
return false
}
}
return len(s) > 0
}
// estimateDurationFromFile 估计文件的持续时间
func estimateDurationFromFile(demuxer *mp4.Demuxer) time.Duration {
var maxDuration uint32
for _, track := range demuxer.Tracks {
if len(track.Samplelist) > 0 {
lastSample := track.Samplelist[len(track.Samplelist)-1]
durationMs := lastSample.Timestamp * 1000 / uint32(track.Timescale)
if durationMs > maxDuration {
maxDuration = durationMs
}
}
}
return time.Duration(maxDuration) * time.Millisecond
}