From e47e039d299b83bc8d2cd6cc30739d1de53722d4 Mon Sep 17 00:00:00 2001 From: pg Date: Mon, 9 Dec 2024 21:58:28 +0800 Subject: [PATCH] fix:optimize mp4 event record --- plugin/mp4/api.go | 119 +++++++++++++++++++++++---------------- plugin/mp4/exception.go | 2 +- plugin/mp4/index.go | 1 + plugin/mp4/pkg/record.go | 61 +++++++++++++++----- recoder.go | 45 ++++++++------- 5 files changed, 148 insertions(+), 80 deletions(-) diff --git a/plugin/mp4/api.go b/plugin/mp4/api.go index f99a4ba..75fd885 100644 --- a/plugin/mp4/api.go +++ b/plugin/mp4/api.go @@ -7,16 +7,19 @@ import ( "net/http" "net/url" "os" + "path/filepath" "strings" "time" + "github.com/mcuadros/go-defaults" + "m7s.live/v5/pkg/config" + "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" - "m7s.live/v5/pkg" - "m7s.live/v5/plugin/mp4/pb" - m7s "m7s.live/v5" + "m7s.live/v5/pkg" "m7s.live/v5/pkg/util" + "m7s.live/v5/plugin/mp4/pb" mp4 "m7s.live/v5/plugin/mp4/pkg" "m7s.live/v5/plugin/mp4/pkg/box" ) @@ -284,53 +287,75 @@ func (p *MP4Plugin) download(w http.ResponseWriter, r *http.Request) { } func (p *MP4Plugin) EventStart(ctx context.Context, req *pb.ReqEventRecord) (res *pb.ResponseEventRecord, err error) { - recordStream := &m7s.RecordStream{ - StreamPath: req.StreamPath, - EventId: req.EventId, - EventLevel: req.EventLevel, - EventDesc: req.EventDesc, - EventName: req.EventName, - RecordMode: "1", - BeforeDuration: req.BeforeDuration, - AfterDuration: req.AfterDuration, + beforeDuration := p.BeforeDuration + afterDuration := p.AfterDuration + + if req.BeforeDuration != "" { + beforeDuration, err = time.ParseDuration(req.BeforeDuration) + if err != nil { + p.Info("error", err) + } } - if req.BeforeDuration == "" { - recordStream.BeforeDuration = p.BeforeDuration.String() + if req.AfterDuration != "" { + afterDuration, err = time.ParseDuration(req.AfterDuration) + if err != nil { + p.Info("error", err) + } } - if req.AfterDuration == "" { - recordStream.AfterDuration = p.AfterDuration.String() + recorder := p.Meta.Recorder() + recorderJobs := p.Server.Records + var tmpJob *m7s.RecordJob + if recorderJobs.Length > 0 { + for job := range recorderJobs.Range { + if job.StreamPath == req.StreamPath { + tmpJob = job + } + } } - now := time.Now() - beforeDuration, err := time.ParseDuration(recordStream.BeforeDuration) - if err != nil { - p.Info("error", err) - } - afterDuration, err := time.ParseDuration(recordStream.AfterDuration) - if err != nil { - p.Info("error", err) - } - startTime := now.Add(-beforeDuration) - endTime := now.Add(afterDuration) - recordStream.StartTime = startTime - recordStream.EndTime = endTime - //tmpFragment, _ := time.ParseDuration(recordStream.Fragment) - //if stream, ok := p.Server.Streams.Get(req.StreamPath); ok { - // recordConf := &config.Record{ - // Append: false, - // Fragment: tmpFragment, - // FilePath: filepath.Join(p.EventRecordFilePath, stream.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05")), - // } - // p.Record(stream, *recordConf) - // - // //for r, recConf := range p.GetCommonConf().OnPub.Record { - // // if recConf.FilePath = r.Replace(stream.StreamPath, recConf.FilePath); recConf.FilePath != "" { - // // recConf.Fragment = tmpFragment - // // } - // //} - //} - err = p.DB.Save(recordStream).Error - res = &pb.ResponseEventRecord{ - Id: uint32(recordStream.ID), + if tmpJob == nil { //为空表示没有正在进行的录制,也就是没有自动录像,则进行正常的事件录像 + if stream, ok := p.Server.Streams.Get(req.StreamPath); ok { + recordConf := config.Record{ + Append: false, + Fragment: 0, + FilePath: filepath.Join(p.EventRecordFilePath, stream.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05")), + } + recordJob := recorder.GetRecordJob() + recordJob.EventId = req.EventId + recordJob.EventLevel = req.EventLevel + recordJob.EventName = req.EventName + recordJob.EventDesc = req.EventDesc + recordJob.AfterDuration = afterDuration + recordJob.BeforeDuration = beforeDuration + recordJob.RecordMode = "1" + var subconfig config.Subscribe + defaults.SetDefaults(&subconfig) + subconfig.BufferTime = beforeDuration + job := recorder.GetRecordJob().Init(recorder, &p.Plugin, stream.StreamPath, recordConf, &subconfig) + job.Depend(stream) + } + } else { + if tmpJob.AfterDuration != 0 { //当前有事件录像正在录制,则更新该录像的结束时间 + tmpJob.AfterDuration = time.Duration(tmpJob.Subscriber.VideoReader.AbsTime)*time.Millisecond + afterDuration + } else { //当前有自动录像正在录制,则生成事件录像的记录,而不去生成事件录像的文件 + recordStream := &m7s.RecordStream{ + StreamPath: req.StreamPath, + EventId: req.EventId, + EventLevel: req.EventLevel, + EventDesc: req.EventDesc, + EventName: req.EventName, + RecordMode: "1", + BeforeDuration: beforeDuration, + AfterDuration: afterDuration, + } + now := time.Now() + startTime := now.Add(-beforeDuration) + endTime := now.Add(afterDuration) + recordStream.StartTime = startTime + recordStream.EndTime = endTime + if p.DB != nil { + p.DB.Save(&recordStream) + } + } } return res, err } diff --git a/plugin/mp4/exception.go b/plugin/mp4/exception.go index 77bd164..e05db79 100644 --- a/plugin/mp4/exception.go +++ b/plugin/mp4/exception.go @@ -135,7 +135,7 @@ func (t *DeleteRecordTask) Tick(any) { var eventRecords []m7s.RecordStream expireTime := time.Now().AddDate(0, 0, -t.RecordFileExpireDays) t.Debug("RecordFileExpireDays is set to auto delete oldestfile", "expireTime", expireTime.Format("2006-01-02 15:04:05")) - err := t.DB.Find(&eventRecords, "end_time < ? AND event_level=1", expireTime).Error + err := t.DB.Find(&eventRecords, "end_time < ? AND event_level=1 AND end_time != '1970-01-01 00:00:00'", expireTime).Error if err == nil { for _, record := range eventRecords { t.Info("RecordFileExpireDays is set to auto delete oldestfile", "ID", record.ID, "create time", record.EndTime, "filepath", record.FilePath) diff --git a/plugin/mp4/index.go b/plugin/mp4/index.go index 09ac9ab..8783b44 100644 --- a/plugin/mp4/index.go +++ b/plugin/mp4/index.go @@ -76,6 +76,7 @@ type MP4Plugin struct { DiskMaxPercent float64 `default:"90" desc:"硬盘使用百分之上限值,超上限后触发报警,并停止当前所有磁盘写入动作。"` AutoOverWriteDiskPercent float64 `default:"80" desc:"自动覆盖功能磁盘占用上限值,超过上限时连续录像自动删除日有录像,事件录像自动删除非重要事件录像,删除规则为删除距离当日最久日期的连续录像或非重要事件录像。"` ExceptionPostUrl string `desc:"第三方异常上报地址"` + EventRecordFilePath string `desc:"事件录像存放地址"` } const defaultConfig m7s.DefaultYaml = `publish: diff --git a/plugin/mp4/pkg/record.go b/plugin/mp4/pkg/record.go index 38a8c83..d248a5b 100644 --- a/plugin/mp4/pkg/record.go +++ b/plugin/mp4/pkg/record.go @@ -85,7 +85,6 @@ func (t *eventRecordCheck) Run() (err error) { t.DB.Find(&eventRecordStreams, "record_mode=1 AND event_level=0 AND stream_path=?", t.streamPath) //搜索事件录像,且为重要事件(无法自动删除) if len(eventRecordStreams) > 0 { for _, recordStream := range eventRecordStreams { - t.Info("abc", recordStream.StartTime) var unimportantEventRecordStreams []m7s.RecordStream query := `(start_time BETWEEN ? AND ?) OR (end_time BETWEEN ? AND ?) @@ -143,9 +142,16 @@ func (r *Recorder) createStream(start time.Time) (err error) { sub := recordJob.Subscriber var file *os.File r.stream = m7s.RecordStream{ - StartTime: start, - StreamPath: sub.StreamPath, - FilePath: CustomFileName(&r.RecordJob), + StartTime: start, + StreamPath: sub.StreamPath, + FilePath: CustomFileName(&r.RecordJob), + EventId: recordJob.EventId, + EventDesc: recordJob.EventDesc, + EventName: recordJob.EventName, + EventLevel: recordJob.EventLevel, + BeforeDuration: recordJob.BeforeDuration, + AfterDuration: recordJob.AfterDuration, + RecordMode: recordJob.RecordMode, } dir := filepath.Dir(r.stream.FilePath) if err = os.MkdirAll(dir, 0755); err != nil { @@ -187,12 +193,25 @@ func (r *Recorder) Run() (err error) { recordJob := &r.RecordJob sub := recordJob.Subscriber var audioTrack, videoTrack *Track - err = r.createStream(time.Now()) + startTime := time.Now() + if recordJob.BeforeDuration > 0 { + startTime = startTime.Add(-recordJob.BeforeDuration) + } + err = r.createStream(startTime) if err != nil { return } var at, vt *pkg.AVTrack + checkEventRecordStop := func(absTime uint32) (err error) { + if duration := int64(absTime); time.Duration(duration)*time.Millisecond >= recordJob.AfterDuration+recordJob.BeforeDuration { + now := time.Now() + r.writeTailer(now) + r.RecordJob.Stop(task.ErrStopByUser) + } + return + } + checkFragment := func(absTime uint32) (err error) { if duration := int64(absTime); time.Duration(duration)*time.Millisecond >= recordJob.Fragment { now := time.Now() @@ -215,10 +234,18 @@ func (r *Recorder) Run() (err error) { } return m7s.PlayBlock(sub, func(audio *pkg.RawAudio) error { - if sub.VideoReader == nil && recordJob.Fragment != 0 { - err := checkFragment(sub.AudioReader.AbsTime) - if err != nil { - return err + if sub.VideoReader == nil { + if recordJob.AfterDuration != 0 { + err := checkEventRecordStop(sub.VideoReader.AbsTime) + if err != nil { + return err + } + } + if recordJob.Fragment != 0 { + err := checkFragment(sub.AudioReader.AbsTime) + if err != nil { + return err + } } } if at == nil { @@ -249,10 +276,18 @@ func (r *Recorder) Run() (err error) { DTS: uint64(dts), }) }, func(video *rtmp.RTMPVideo) error { - if sub.VideoReader.Value.IDR && recordJob.Fragment != 0 { - err := checkFragment(sub.VideoReader.AbsTime) - if err != nil { - return err + if sub.VideoReader.Value.IDR { + if recordJob.AfterDuration != 0 { + err := checkEventRecordStop(sub.VideoReader.AbsTime) + if err != nil { + return err + } + } + if recordJob.Fragment != 0 { + err := checkFragment(sub.VideoReader.AbsTime) + if err != nil { + return err + } } } offset := 5 diff --git a/recoder.go b/recoder.go index ee32f6b..d44dc39 100644 --- a/recoder.go +++ b/recoder.go @@ -20,31 +20,38 @@ type ( Recorder = func() IRecorder RecordJob struct { task.Job - StreamPath string // 对应本地流 - Plugin *Plugin - Subscriber *Subscriber - SubConf *config.Subscribe - Fragment time.Duration - Append bool - FilePath string - recorder IRecorder + StreamPath string // 对应本地流 + Plugin *Plugin + Subscriber *Subscriber + SubConf *config.Subscribe + Fragment time.Duration + Append bool + FilePath string + recorder IRecorder + EventId string `json:"eventId" desc:"事件编号"` + RecordMode string `json:"recordMode" desc:"事件类型,0=连续录像模式,1=事件录像模式"` + BeforeDuration time.Duration `json:"beforeDuration" desc:"事件前缓存时长"` + AfterDuration time.Duration `json:"afterDuration" desc:"事件后缓存时长"` + EventDesc string `json:"eventDesc" desc:"事件描述"` + EventLevel string `json:"eventLevel" desc:"事件级别"` + EventName string `json:"eventName" desc:"事件名称"` } DefaultRecorder struct { task.Task RecordJob RecordJob } RecordStream struct { - ID uint `gorm:"primarykey"` - StartTime, EndTime time.Time - EventId string `json:"eventId" desc:"事件编号" gorm:"type:varchar(255);comment:事件编号"` - RecordMode string `json:"recordMode" desc:"事件类型,0=连续录像模式,1=事件录像模式" gorm:"type:varchar(255);comment:事件类型,0=连续录像模式,1=事件录像模式;default:'0'"` - EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"` - BeforeDuration string `json:"beforeDuration" desc:"事件前缓存时长" gorm:"type:varchar(255);comment:事件前缓存时长;default:'30s'"` - AfterDuration string `json:"afterDuration" desc:"事件后缓存时长" gorm:"type:varchar(255);comment:事件后缓存时长;default:'30s'"` - Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"` - EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255);comment:事件描述"` - Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"` - EventLevel string `json:"eventLevel" desc:"事件级别" gorm:"type:varchar(255);comment:事件级别,0表示重要事件,无法删除且表示无需自动删除,1表示非重要事件,达到自动删除时间后,自动删除;default:'1'"` + ID uint `gorm:"primarykey"` + StartTime, EndTime time.Time `gorm:"default:'1970-01-01 00:00:00'"` + EventId string `json:"eventId" desc:"事件编号" gorm:"type:varchar(255);comment:事件编号"` + RecordMode string `json:"recordMode" desc:"事件类型,0=连续录像模式,1=事件录像模式" gorm:"type:varchar(255);comment:事件类型,0=连续录像模式,1=事件录像模式;default:'0'"` + EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"` + BeforeDuration time.Duration `json:"beforeDuration" desc:"事件前缓存时长" gorm:"type:BIGINT;comment:事件前缓存时长;default:30000000000"` + AfterDuration time.Duration `json:"afterDuration" desc:"事件后缓存时长" gorm:"type:BIGINT;comment:事件后缓存时长;default:30000000000"` + Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"` + EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255);comment:事件描述"` + Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"` + EventLevel string `json:"eventLevel" desc:"事件级别" gorm:"type:varchar(255);comment:事件级别,0表示重要事件,无法删除且表示无需自动删除,1表示非重要事件,达到自动删除时间后,自动删除;default:'1'"` FilePath string StreamPath string AudioCodec, VideoCodec string