diff --git a/pkg/track.go b/pkg/track.go index 708908f..26225e5 100644 --- a/pkg/track.go +++ b/pkg/track.go @@ -222,14 +222,19 @@ func (t *TsTamer) Tame(ts time.Duration, fps int, scale float64) (result time.Du t.BaseTs -= ts } result = max(1*time.Millisecond, t.BaseTs+ts) - if fps > 0 { + + // 突变检测:仅在 fps 合理的情况下启用 + // 如果 fps > 100,说明是快速下载场景,接收速度远大于真实帧率,不应该做突变检测 + if fps > 0 && fps <= 100 { frameDur := float64(time.Second) / float64(fps) - if math.Abs(float64(result-t.LastTs)) > 10*frameDur*scale { //时间戳突变 - // t.Warn("timestamp mutation", "fps", t.FPS, "lastTs", uint32(t.LastTs/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "frameDur", time.Duration(frameDur)) + diff := math.Abs(float64(result - t.LastTs)) + threshold := 10 * frameDur * scale + if diff > threshold { //时间戳突变 result = t.LastTs + time.Duration(frameDur) t.BaseTs = result - ts } } + t.LastTs = result if t.LastScale != scale { t.BeforeScaleChangedTs = result diff --git a/plugin/gb28181/api.go b/plugin/gb28181/api.go index d8f25fb..e31d874 100644 --- a/plugin/gb28181/api.go +++ b/plugin/gb28181/api.go @@ -34,7 +34,7 @@ func (gb *GB28181Plugin) List(ctx context.Context, req *pb.GetDevicesRequest) (* // 应用筛选条件 if req.Query != "" { // 检查设备ID或名称是否包含查询字符串 - if !strings.Contains(device.DeviceId, req.Query) && !strings.Contains(device.Name, req.Query) { + if !strings.Contains(device.DeviceId, req.Query) && !strings.Contains(device.Name, req.Query) && !strings.Contains(device.CustomName, req.Query) { return true // 继续遍历 } } @@ -138,6 +138,7 @@ func (gb *GB28181Plugin) List(ctx context.Context, req *pb.GetDevicesRequest) (* SubscribeCatalog: util.Conditional(d.SubscribeCatalog == 0, false, true), SubscribePosition: util.Conditional(d.SubscribePosition == 0, false, true), SubscribeAlarm: util.Conditional(d.SubscribeAlarm == 0, false, true), + SsrcCheck: d.SSRCCheck, Charset: d.Charset, }) } @@ -518,6 +519,9 @@ func (gb *GB28181Plugin) UpdateDevice(ctx context.Context, req *pb.Device) (*pb. d.BroadcastPushAfterAck = false } + // 更新 SSRC 校验开关 + d.SSRCCheck = req.SsrcCheck + d.UpdateTime = time.Now() // 先停止设备任务 @@ -536,6 +540,7 @@ func (gb *GB28181Plugin) UpdateDevice(ctx context.Context, req *pb.Device) (*pb. "subscribe_catalog": d.SubscribeCatalog, "subscribe_position": d.SubscribePosition, "subscribe_alarm": d.SubscribeAlarm, + "ssrc_check": d.SSRCCheck, "update_time": d.UpdateTime, } @@ -669,6 +674,9 @@ func (gb *GB28181Plugin) UpdateDevice(ctx context.Context, req *pb.Device) (*pb. updates["subscribe_position"] = 0 // 不订阅 } + // 更新 SSRC 校验开关 + updates["ssrc_check"] = req.SsrcCheck + updates["update_time"] = time.Now() // 保存到数据库 @@ -3488,6 +3496,7 @@ func (gb *GB28181Plugin) StartDownload(ctx context.Context, req *pb.StartDownloa Status: "pending", Progress: 0, } + dialog.Logger = gb.Logger.With("streamPath", downloadId, "channelId", req.DeviceId+"_"+req.ChannelId) dialog.Task.Context = ctx // 10. 添加到下载对话集合(会自动调用 Start 方法) @@ -3553,7 +3562,7 @@ func (gb *GB28181Plugin) GetDownloadProgress(ctx context.Context, req *pb.GetDow Progress: int32(dialog.Progress), FilePath: dialog.FilePath, DownloadUrl: dialog.DownloadUrl, - Error: dialog.Error, + Error: dialog.ErrorString, StartedAt: timestamppb.New(dialog.StartedAt), } if !dialog.CompletedAt.IsZero() { diff --git a/plugin/gb28181/device.go b/plugin/gb28181/device.go index 94ecc83..6e16bf6 100644 --- a/plugin/gb28181/device.go +++ b/plugin/gb28181/device.go @@ -84,11 +84,11 @@ type Device struct { CreateTime time.Time `gorm:"primaryKey"` // 创建时间 UpdateTime time.Time // 更新时间 Charset string // 字符集, 支持 UTF-8 与 GB2312 - SubscribeCatalog int `gorm:"default:0"` // 目录订阅周期,0为不订阅 - SubscribePosition int `gorm:"default:0"` // 移动设备位置订阅周期,0为不订阅 - PositionInterval int `gorm:"default:6"` // 移动设备位置信息上报时间间隔,单位:秒,默认值6 - SubscribeAlarm int `gorm:"default:0"` // 报警订阅周期,0为不订阅 - SSRCCheck bool // 是否开启ssrc校验,默认关闭,开启可以防止串流 + SubscribeCatalog int `gorm:"default:0"` // 目录订阅周期,0为不订阅 + SubscribePosition int `gorm:"default:0"` // 移动设备位置订阅周期,0为不订阅 + PositionInterval int `gorm:"default:6"` // 移动设备位置信息上报时间间隔,单位:秒,默认值6 + SubscribeAlarm int `gorm:"default:0"` // 报警订阅周期,0为不订阅 + SSRCCheck bool `gorm:"default:false" default:"false"` // 是否开启ssrc校验,默认关闭,开启可以防止串流 GeoCoordSys string // 地理坐标系, 目前支持 WGS84,GCJ02 Password string // 密码 SipIp string // SIP交互IP(设备访问平台的IP) diff --git a/plugin/gb28181/dialog.go b/plugin/gb28181/dialog.go index b14be6e..d6761ee 100644 --- a/plugin/gb28181/dialog.go +++ b/plugin/gb28181/dialog.go @@ -107,7 +107,7 @@ func (d *Dialog) Start() (err error) { sss := strings.Split(d.pullCtx.RemoteURL, "/") if len(sss) < 2 { - d.Channel.Device.Info("remote url is invalid", d.pullCtx.RemoteURL) + d.Info("remote url is invalid", d.pullCtx.RemoteURL) d.pullCtx.Fail("remote url is invalid") return } @@ -243,23 +243,18 @@ func (d *Dialog) Start() (err error) { // 设置必需的头部 contentTypeHeader := sip.ContentTypeHeader("APPLICATION/SDP") subjectHeader := sip.NewHeader("Subject", fmt.Sprintf("%s:%s,%s:0", channelId, ssrc, d.gb.Serial)) - //allowHeader := sip.NewHeader("Allow", "INVITE, ACK, CANCEL, REGISTER, MESSAGE, NOTIFY, BYE") - //Toheader里需要放入目录通道的id toHeader := sip.ToHeader{ Address: sip.Uri{User: channelId, Host: channelId[0:10]}, } userAgentHeader := sip.NewHeader("User-Agent", "M7S/"+m7s.Version) - //customCallID := fmt.Sprintf("%s-%s-%d@%s", device.DeviceId, channelId, time.Now().Unix(), device.SipIp) customCallID := fmt.Sprintf("%s@%s", GenerateCallID(32), device.MediaIp) callID := sip.CallIDHeader(customCallID) maxforward := sip.MaxForwardsHeader(70) - //contentLengthHeader := sip.ContentLengthHeader(len(strings.Join(sdpInfo, "\r\n") + "\r\n")) csqHeader := sip.CSeqHeader{ SeqNo: uint32(device.SN), MethodName: "INVITE", } - //request.AppendHeader(&contentLengthHeader) contactHDR := sip.ContactHeader{ Address: sip.Uri{ User: d.gb.Serial, @@ -355,6 +350,14 @@ func (d *Dialog) setupReceiver(pub *mrtp.PSReceiver) { reader.Close() d.gb.singlePorts.Remove(reader) }) + } else { + // 多端口模式:根据SSRCCheck配置决定是否启用SSRC过滤 + if d.Channel.Device.SSRCCheck { + pub.ExpectedSSRC = d.SSRC + d.Info("multi-port mode, SSRC filtering enabled", "expectedSSRC", d.SSRC) + } else { + d.Info("multi-port mode, SSRC filtering disabled") + } } pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) case mrtp.StreamModeUDP: @@ -384,10 +387,11 @@ func (d *Dialog) setupReceiver(pub *mrtp.PSReceiver) { func (d *Dialog) Run() (err error) { var pub mrtp.PSReceiver pub.Publisher = d.pullCtx.Publisher + pub.Logger = d.gb.Logger.With("streamPath", d.StreamPath) // 如果不是 BroadcastPushAfterAck 模式,提前创建监听器(多端口模式需要) if !d.Channel.Device.BroadcastPushAfterAck { - d.Channel.Device.Info("creating listener before WaitAnswer", "broadcastPushAfterAck", false, "addr", d.MediaPort) + d.Info("creating listener before WaitAnswer", "broadcastPushAfterAck", false, "addr", d.MediaPort) d.setupReceiver(&pub) // 提前启动监听器 @@ -398,15 +402,15 @@ func (d *Dialog) Run() (err error) { } } - d.Channel.Device.Info("before WaitAnswer") + d.Info("before WaitAnswer") err = d.session.WaitAnswer(d, sipgo.AnswerOptions{}) - d.Channel.Device.Info("after WaitAnswer") + d.Info("after WaitAnswer") if err != nil { d.pullCtx.Fail("等待响应错误: " + err.Error()) return errors.Join(errors.New("wait answer error"), err) } inviteResponseBody := string(d.session.InviteResponse.Body()) - d.Channel.Device.Info("inviteResponse", "body", inviteResponseBody) + d.Info("inviteResponse", "body", inviteResponseBody) // 添加响应信息到 Description d.SetDescriptions(task.Description{ @@ -450,10 +454,10 @@ func (d *Dialog) Run() (err error) { } } } + // 修复 Contact 地址:某些设备响应的 Contact 包含错误的域名,导致 ACK 发送失败 + // 强制使用原始的 Recipient 地址确保 ACK 能正确发送到设备 if d.session.InviteResponse.Contact() != nil { - if &d.session.InviteRequest.Recipient != &d.session.InviteResponse.Contact().Address { - d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient - } + d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient } // 添加解析后的响应参数到 Description @@ -466,19 +470,15 @@ func (d *Dialog) Run() (err error) { // 移动到流数据接收步骤 d.pullCtx.GoToStepConst(pkg.StepStreaming) - // 设置允许连接的IP地址(从INVITE响应中解析) - pub.Receiver.AllowedIP = d.targetIP - d.Channel.Device.Info("set allowed IP for receiver", "allowedIP", d.targetIP) - // TCP-ACTIVE 模式需要在解析 targetIP 后设置连接地址 if d.StreamMode == mrtp.StreamModeTCPActive { pub.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort) - d.Channel.Device.Info("set TCP-ACTIVE connect address", "addr", pub.ListenAddr) + d.Info("set TCP-ACTIVE connect address", "addr", pub.ListenAddr) } // 如果是 BroadcastPushAfterAck 模式,在 Ack 后创建监听器配置 if d.Channel.Device.BroadcastPushAfterAck { - d.Channel.Device.Info("setup receiver after Ack", "broadcastPushAfterAck", true) + d.Info("setup receiver after Ack", "broadcastPushAfterAck", true) d.setupReceiver(&pub) } diff --git a/plugin/gb28181/downloaddialog.go b/plugin/gb28181/downloaddialog.go index 3dd4926..b725d73 100644 --- a/plugin/gb28181/downloaddialog.go +++ b/plugin/gb28181/downloaddialog.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "os" "path/filepath" "strconv" "strings" @@ -42,7 +43,7 @@ type DownloadDialog struct { Progress int // 0-100 FilePath string DownloadUrl string // 下载链接 - Error string + ErrorString string StartedAt time.Time CompletedAt time.Time } @@ -87,6 +88,14 @@ func (d *DownloadDialog) setupReceiver(ps *mrtp.PSReceiver) { reader.Close() d.gb.singlePorts.Remove(reader) }) + } else { + // 多端口模式:根据SSRCCheck配置决定是否启用SSRC过滤 + if d.device.SSRCCheck { + ps.ExpectedSSRC = d.SSRC + d.Info("multi-port mode, SSRC filtering enabled", "expectedSSRC", d.SSRC) + } else { + d.Info("multi-port mode, SSRC filtering disabled") + } } ps.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) case mrtp.StreamModeUDP: @@ -107,6 +116,14 @@ func (d *DownloadDialog) setupReceiver(ps *mrtp.PSReceiver) { reader.Close() d.gb.singlePorts.Remove(reader) }) + } else { + // 多端口模式:根据SSRCCheck配置决定是否启用SSRC过滤 + if d.device.SSRCCheck { + ps.ExpectedSSRC = d.SSRC + d.Info("multi-port mode, SSRC filtering enabled", "expectedSSRC", d.SSRC) + } else { + d.Info("multi-port mode, SSRC filtering disabled") + } } ps.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) } @@ -128,8 +145,8 @@ func (d *DownloadDialog) Start() (err error) { device, ok := d.gb.devices.Get(d.DeviceId) if !ok { d.Status = "failed" - d.Error = fmt.Sprintf("设备不存在: %s", d.DeviceId) - return errors.Join(fmt.Errorf("device not found"), errors.New(d.Error)) + d.ErrorString = fmt.Sprintf("设备不存在: %s", d.DeviceId) + return errors.Join(fmt.Errorf("device not found"), errors.New(d.ErrorString)) } d.device = device @@ -137,8 +154,8 @@ func (d *DownloadDialog) Start() (err error) { channel, ok := device.channels.Get(channelKey) if !ok { d.Status = "failed" - d.Error = fmt.Sprintf("通道不存在: %s", d.ChannelId) - return errors.Join(fmt.Errorf("channel not found"), errors.New(d.Error)) + d.ErrorString = fmt.Sprintf("通道不存在: %s", d.ChannelId) + return errors.Join(fmt.Errorf("channel not found"), errors.New(d.ErrorString)) } d.channel = channel @@ -220,7 +237,7 @@ func (d *DownloadDialog) Start() (err error) { // 添加下载速度属性(默认1倍速,避免丢帧) downloadSpeed := d.DownloadSpeed if downloadSpeed <= 0 || downloadSpeed > 4 { - downloadSpeed = 4 // 默认1倍速 + downloadSpeed = 1 // 默认1倍速 } sdpInfo = append(sdpInfo, fmt.Sprintf("a=downloadspeed:%d", downloadSpeed)) @@ -322,10 +339,10 @@ func (d *DownloadDialog) Start() (err error) { // Go 运行下载会话(异步执行,支持并发) func (d *DownloadDialog) Go() error { var psReceiver mrtp.PSReceiver - + psReceiver.Logger = d.gb.Logger.With("streamPath", d.DownloadId) // 如果不是 BroadcastPushAfterAck 模式,提前创建监听器(多端口模式需要) if !d.device.BroadcastPushAfterAck { - d.device.Info("creating listener before WaitAnswer", "broadcastPushAfterAck", false, "addr", d.MediaPort) + d.Info("creating listener before WaitAnswer", "broadcastPushAfterAck", false, "addr", d.MediaPort) d.setupReceiver(&psReceiver) // 提前启动监听器 @@ -335,18 +352,18 @@ func (d *DownloadDialog) Go() error { } } - d.device.Info("before WaitAnswer") + d.Info("before WaitAnswer") err := d.session.WaitAnswer(d, sipgo.AnswerOptions{}) - d.device.Info("after WaitAnswer") + d.Info("after WaitAnswer") if err != nil { d.Status = "failed" - d.Error = fmt.Sprintf("等待响应失败: %v", err) + d.ErrorString = fmt.Sprintf("等待响应失败: %v", err) return errors.Join(errors.New("wait answer error"), err) } // 解析响应 inviteResponseBody := string(d.session.InviteResponse.Body()) - d.device.Info("收到 INVITE 响应", "body", inviteResponseBody) + d.Info("收到 INVITE 响应", "body", inviteResponseBody) // 添加响应信息到 Description d.SetDescriptions(task.Description{ "responseStatus": d.session.InviteResponse.StatusCode, @@ -388,16 +405,15 @@ func (d *DownloadDialog) Go() error { } } } - // invite响应里的contact是域名的话,sip尝试去解析,可能失败,这时候用invite请求里的recipient + // 修复 Contact 地址:某些设备响应的 Contact 包含错误的域名,导致 ACK 发送失败 + // 强制使用原始的 Recipient 地址确保 ACK 能正确发送到设备 if d.session.InviteResponse.Contact() != nil { - if &d.session.InviteRequest.Recipient != &d.session.InviteResponse.Contact().Address { - d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient - } + d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient } // 如果是 BroadcastPushAfterAck 模式,在 Ack 后创建监听器配置 if d.device.BroadcastPushAfterAck { - d.device.Info("setup receiver after Ack", "broadcastPushAfterAck", true) + d.Info("setup receiver after Ack", "broadcastPushAfterAck", true) d.setupReceiver(&psReceiver) } @@ -405,10 +421,10 @@ func (d *DownloadDialog) Go() error { err = d.session.Ack(d) if err != nil { // 与 dialog.Run 保持一致,仅记录错误,不直接 panic - d.device.Error("ack session", "err", err) + d.Error("ack session", "err", err) } - d.gb.Info("下载会话已建立", + d.Info("下载会话已建立", "ssrc", d.SSRC, "targetIP", d.targetIP, "targetPort", d.targetPort) @@ -429,7 +445,7 @@ func (d *DownloadDialog) Go() error { publisher, err := d.gb.PublishWithConfig(d, streamPath, pubConf) if err != nil { d.Status = "failed" - d.Error = fmt.Sprintf("创建 Publisher 失败: %v", err) + d.ErrorString = fmt.Sprintf("创建 Publisher 失败: %v", err) return fmt.Errorf("创建 Publisher 失败: %w", err) } @@ -439,7 +455,7 @@ func (d *DownloadDialog) Go() error { // 监听 Publisher 停止事件,主动停止 PSReceiver // 避免 Publisher timeout 后 PSReceiver 仍在阻塞等待数据 publisher.OnStop(func() { - d.gb.Info("Publisher 已停止,主动停止 PSReceiver", + d.Info("Publisher 已停止,主动停止 PSReceiver", "downloadId", d.DownloadId, "progress", d.Progress) psReceiver.Stop(io.EOF) @@ -465,7 +481,7 @@ func (d *DownloadDialog) Go() error { // 生成下载 URL d.DownloadUrl = fmt.Sprintf("/gb28181/download?downloadId=%s", d.DownloadId) - d.gb.Info("MP4 录制器已创建", + d.Info("MP4 录制器已创建", "streamPath", streamPath, "storagePathPrefix", filePath, "downloadUrl", d.DownloadUrl) @@ -473,7 +489,7 @@ func (d *DownloadDialog) Go() error { d.gb.Warn("MP4 插件未加载,无法录制") } - d.gb.Info("开始接收 RTP 数据并录制", "streamPath", streamPath) + d.Info("开始接收 RTP 数据并录制", "streamPath", streamPath) // 8. 设置进度更新回调(在 RTP 读取循环中触发,无需单独协程) totalDuration := d.EndTime.Sub(d.StartTime).Seconds() @@ -481,32 +497,93 @@ func (d *DownloadDialog) Go() error { d.updateProgress(&psReceiver, totalDuration) } - // 9. 使用 RunTask 运行 PSReceiver(会阻塞直到完成) - err = d.RunTask(&psReceiver) + // 9. 使用 RunTask 运行 PSReceiver,并添加数据接收超时监控 + // 如果超过 30 秒没有收到新数据(PTS 不更新),则认为下载超时 + dataTimeout := 30 * time.Second + errChan := make(chan error, 1) + go func() { + errChan <- d.RunTask(&psReceiver) + }() + + // 监控数据接收超时 + ticker := time.NewTicker(5 * time.Second) // 每 5 秒检查一次 + defer ticker.Stop() + + for { + select { + case err = <-errChan: + // PSReceiver 正常结束或出错 + goto DOWNLOAD_COMPLETE + case <-ticker.C: + // 获取当前进度 + elapsedSeconds := psReceiver.GetElapsedSeconds() + currentProgress := int((elapsedSeconds / totalDuration) * 100) + + // 检查是否接近完成且 PTS 稳定 + // 如果进度 >= 95% 且 PTS 已稳定(超过 2 秒无更新),认为下载完成 + if currentProgress >= 95 && psReceiver.IsPtsStable() { + d.Info("下载接近完成且 PTS 已稳定,主动结束任务", + "downloadId", d.DownloadId, + "progress", currentProgress, + "elapsedSeconds", elapsedSeconds, + "totalDuration", totalDuration) + // 主动停止 PSReceiver,标记为正常完成 + psReceiver.Stop(io.EOF) + err = <-errChan // 等待 RunTask 返回 + goto DOWNLOAD_COMPLETE + } + // 检查是否超时:如果已经有数据但长时间无更新,则超时 + if psReceiver.IsPtsStable() && time.Since(psReceiver.GetLastPtsUpdateTime()) > dataTimeout { + d.Warn("下载超时:超过 30 秒未收到新数据", + "downloadId", d.DownloadId, + "lastPtsUpdate", psReceiver.GetLastPtsUpdateTime(), + "timeout", dataTimeout) + // 主动停止 PSReceiver + psReceiver.Stop(fmt.Errorf("data receive timeout: no data for %v", dataTimeout)) + err = <-errChan // 等待 RunTask 返回 + goto DOWNLOAD_COMPLETE + } + } + } + +DOWNLOAD_COMPLETE: // 10. 任务完成,更新状态 if err != nil { - // 判断是否为正常结束:EOF/timeout 且视频PTS已稳定(说明流真的结束了) errStr := err.Error() - isNormalEnd := err == io.EOF || - strings.Contains(errStr, "EOF") || - strings.Contains(errStr, "timeout") + + // 判断是否为数据接收超时(我们主动停止的) + isDataTimeout := strings.Contains(errStr, "data receive timeout") + + // 判断是否为正常结束:EOF 且视频PTS已稳定(说明流真的结束了) + // 注意:不包括 "timeout",因为那可能是我们主动停止的超时 + isNormalEnd := err == io.EOF || strings.Contains(errStr, "EOF") // PTS稳定说明设备已经停止发送数据,流真正结束了 ptsStable := psReceiver.IsPtsStable() - if isNormalEnd && ptsStable { - d.gb.Info("下载完成:视频 PTS 已稳定,视为成功", + if isDataTimeout { + // 数据接收超时,标记为失败 + d.Status = "failed" + d.ErrorString = "下载超时:超过30秒未收到新数据" + d.Warn("下载超时失败", + "downloadId", d.DownloadId, + "progress", d.Progress, + "error", d.Error) + } else if isNormalEnd && ptsStable { + // EOF 且 PTS 稳定,视为正常完成 + d.Info("下载完成:视频 PTS 已稳定,视为成功", "downloadId", d.DownloadId, "progress", d.Progress, "error", errStr) d.Status = "completed" d.Progress = 100 - d.Error = "" // 清除错误信息 + d.ErrorString = "" // 清除错误信息 } else { + // 其他错误,标记为失败 d.Status = "failed" - d.Error = err.Error() - d.gb.Warn("下载失败", + d.ErrorString = err.Error() + d.Warn("下载失败", "downloadId", d.DownloadId, "progress", d.Progress, "ptsStable", ptsStable, @@ -518,84 +595,124 @@ func (d *DownloadDialog) Go() error { } d.CompletedAt = time.Now() - // 11. 延迟 5 秒后再返回,确保前端能轮询到 100% 状态 + // 11. 查询完整的文件路径(成功时需要,失败时可选) + var actualFilePath string + if d.gb.DB != nil && d.FilePath != "" { + var record m7s.RecordStream + // 使用 LIKE 查询匹配存储路径前缀的记录 + if err := d.gb.DB.Where("file_path LIKE ?", d.FilePath+"%"). + Order("start_time DESC").First(&record).Error; err == nil { + actualFilePath = record.FilePath + d.FilePath = actualFilePath // 更新为完整路径 + d.Info("找到完整文件路径", + "downloadId", d.DownloadId, + "filePath", actualFilePath) + } else { + d.Warn("未找到匹配的录制文件", + "downloadId", d.DownloadId, + "searchPath", d.FilePath, + "error", err) + } + } + + // 12. 创建 CompletedDownloadDialog(无论成功还是失败都需要) + completed := &CompletedDownloadDialog{ + DownloadId: d.DownloadId, + DeviceId: d.DeviceId, + ChannelId: d.ChannelId, + Status: d.Status, + Progress: d.Progress, + FilePath: d.FilePath, + DownloadUrl: d.DownloadUrl, + Error: d.ErrorString, + StartedAt: d.StartedAt, + CompletedAt: d.CompletedAt, + } + d.gb.completedDownloads.Set(completed) + + // 13. 清理 RecordStream 记录(成功和失败都需要) + if d.gb.DB != nil { + if actualFilePath != "" { + // 有完整路径,使用精确匹配删除 + if err := d.gb.DB.Where("file_path = ?", actualFilePath).Delete(&m7s.RecordStream{}).Error; err != nil { + d.Error("删除RecordStream记录失败", + "filePath", actualFilePath, + "error", err) + } else { + d.Info("已清理RecordStream记录", + "filePath", actualFilePath) + } + } else if d.FilePath != "" { + // 没有完整路径,使用模糊匹配删除 + if err := d.gb.DB.Where("file_path LIKE ?", d.FilePath+"%").Delete(&m7s.RecordStream{}).Error; err != nil { + d.Error("删除RecordStream记录失败", + "searchPath", d.FilePath, + "error", err) + } else { + d.Info("已清理RecordStream记录", + "searchPath", d.FilePath) + } + } + } + + // 14. 根据状态执行不同的后续操作 if d.Status == "completed" { - d.gb.Info("下载任务已完成,延迟 5 秒后释放资源(确保前端获取到 100% 状态)", + d.Info("下载任务已完成", "downloadId", d.DownloadId, "progress", d.Progress) - // 12. 从 RecordStream 表查询完整的文件路径(通过 LIKE 模糊匹配) - var actualFilePath string - if d.gb.DB != nil && d.FilePath != "" { - var record m7s.RecordStream - // 使用 LIKE 查询匹配存储路径前缀的记录 - if err := d.gb.DB.Where("file_path LIKE ?", d.FilePath+"%"). - Order("start_time DESC").First(&record).Error; err == nil { - actualFilePath = record.FilePath - d.FilePath = actualFilePath // 更新为完整路径 - d.gb.Info("找到完整文件路径", - "downloadId", d.DownloadId, - "filePath", actualFilePath) - } else { - d.gb.Warn("未找到匹配的录制文件", - "downloadId", d.DownloadId, - "searchPath", d.FilePath, - "error", err) - } - } - - completed := &CompletedDownloadDialog{ - DownloadId: d.DownloadId, - DeviceId: d.DeviceId, - ChannelId: d.ChannelId, - Status: d.Status, - Progress: d.Progress, - FilePath: d.FilePath, - DownloadUrl: d.DownloadUrl, - Error: d.Error, - StartedAt: d.StartedAt, - CompletedAt: d.CompletedAt, - } - d.gb.completedDownloads.Set(completed) - - // 13. 保存到 GB28181Record 缓存表并清理RecordStream记录 + // 保存到 GB28181Record 缓存表 if d.gb.DB != nil && actualFilePath != "" { record := &gb28181.GB28181Record{ DownloadId: d.DownloadId, FilePath: actualFilePath, Status: "completed", } - // 使用 Save 方法,如果存在则更新,不存在则插入 if err := d.gb.DB.Save(record).Error; err != nil { - d.gb.Error("保存下载记录到缓存表失败", + d.Error("保存下载记录到缓存表失败", "downloadId", d.DownloadId, "error", err) } else { - d.gb.Info("下载记录已保存到缓存表", + d.Info("下载记录已保存到缓存表", "downloadId", d.DownloadId, "filePath", actualFilePath) - - // 清理MP4插件的RecordStream记录(通过完整路径) - if err := d.gb.DB.Where("file_path = ?", actualFilePath).Delete(&m7s.RecordStream{}).Error; err != nil { - d.gb.Error("删除RecordStream记录失败", - "filePath", actualFilePath, - "error", err) - } else { - d.gb.Info("已清理RecordStream记录", - "filePath", actualFilePath) - } } } } else if d.Status == "failed" { - // 14. 下载失败时也需要清理RecordStream记录(通过 LIKE 模糊匹配) - if d.gb.DB != nil && d.FilePath != "" { - if err := d.gb.DB.Where("file_path LIKE ?", d.FilePath+"%").Delete(&m7s.RecordStream{}).Error; err != nil { - d.gb.Error("删除失败任务的RecordStream记录失败", - "searchPath", d.FilePath, + d.Warn("下载任务失败", + "downloadId", d.DownloadId, + "progress", d.Progress, + "error", d.Error) + + // 删除失败任务的文件(避免垃圾文件累积) + if actualFilePath != "" { + // 优先使用从 RecordStream 查询到的完整路径 + if err := os.Remove(actualFilePath); err == nil { + d.Info("已删除失败任务的文件", + "downloadId", d.DownloadId, + "filePath", actualFilePath) + } else if !os.IsNotExist(err) { + d.Warn("删除失败任务的文件失败", + "downloadId", d.DownloadId, + "filePath", actualFilePath, + "error", err) + } + } else if d.FilePath != "" { + // 如果没有查询到 actualFilePath,使用 d.FilePath 并添加扩展名 + filePath := d.FilePath + if !strings.HasSuffix(strings.ToLower(filePath), ".mp4") { + filePath += ".mp4" + } + + if err := os.Remove(filePath); err == nil { + d.Info("已删除失败任务的文件", + "downloadId", d.DownloadId, + "filePath", filePath) + } else if !os.IsNotExist(err) { + d.Warn("删除失败任务的文件失败", + "downloadId", d.DownloadId, + "filePath", filePath, "error", err) - } else { - d.gb.Info("已清理失败任务的RecordStream记录", - "searchPath", d.FilePath) } } } @@ -617,7 +734,7 @@ func (d *DownloadDialog) updateProgress(psReceiver *mrtp.PSReceiver, totalDurati } d.Progress = progress - d.gb.Info("下载进度更新", + d.Info("下载进度更新", "downloadId", d.DownloadId, "elapsedSeconds", elapsedSeconds, "totalDuration", totalDuration, @@ -648,7 +765,7 @@ func (d *DownloadDialog) Dispose() { }() // 2. 记录日志 - d.gb.Info("download dialog dispose", + d.Info("download dialog dispose", "downloadId", d.DownloadId, "ssrc", d.SSRC, "mediaPort", d.MediaPort, @@ -660,7 +777,7 @@ func (d *DownloadDialog) Dispose() { if d.session != nil && d.session.InviteResponse != nil { err := d.session.Bye(d) if err != nil { - d.gb.Error("发送 BYE 失败", "error", err) + d.Error("发送 BYE 失败", "error", err) } } } diff --git a/plugin/gb28181/forwarddialog.go b/plugin/gb28181/forwarddialog.go index 48fc91f..d8645c7 100644 --- a/plugin/gb28181/forwarddialog.go +++ b/plugin/gb28181/forwarddialog.go @@ -8,7 +8,7 @@ import ( sipgo "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" - "github.com/langhuihui/gotask" + task "github.com/langhuihui/gotask" m7s "m7s.live/v5" "m7s.live/v5/pkg/util" gb28181 "m7s.live/v5/plugin/gb28181/pkg" @@ -269,9 +269,7 @@ func (d *ForwardDialog) Run() (err error) { } } if d.session.InviteResponse.Contact() != nil { - if &d.session.InviteRequest.Recipient != &d.session.InviteResponse.Contact().Address { - d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient - } + d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient } err = d.session.Ack(d) if err != nil { diff --git a/plugin/gb28181/pb/gb28181.pb.go b/plugin/gb28181/pb/gb28181.pb.go index ad64241..21afb6d 100644 --- a/plugin/gb28181/pb/gb28181.pb.go +++ b/plugin/gb28181/pb/gb28181.pb.go @@ -943,6 +943,7 @@ type Device struct { Port int32 `protobuf:"varint,23,opt,name=port,proto3" json:"port,omitempty"` BroadcastPushAfterAck bool `protobuf:"varint,24,opt,name=broadcastPushAfterAck,proto3" json:"broadcastPushAfterAck,omitempty"` Charset string `protobuf:"bytes,25,opt,name=charset,proto3" json:"charset,omitempty"` + SsrcCheck bool `protobuf:"varint,26,opt,name=ssrcCheck,proto3" json:"ssrcCheck,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1152,6 +1153,13 @@ func (x *Device) GetCharset() string { return "" } +func (x *Device) GetSsrcCheck() bool { + if x != nil { + return x.SsrcCheck + } + return false +} + type ResponseList struct { state protoimpl.MessageState `protogen:"open.v1"` Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` @@ -7003,7 +7011,7 @@ func (x *StartDownloadRequest) GetDownloadSpeed() int32 { // StartDownloadData 下载任务数据 type StartDownloadData struct { state protoimpl.MessageState `protogen:"open.v1"` - DownloadId string `protobuf:"bytes,1,opt,name=downloadId,proto3" json:"downloadId,omitempty"` // 下载任务ID(格式:startTime_endTime_deviceId_channelId) + DownloadId string `protobuf:"bytes,1,opt,name=downloadId,proto3" json:"downloadId,omitempty"` // 下载任务ID(格式:deviceId_channelId_startTime_endTime) Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` // 初始状态(pending) DownloadUrl string `protobuf:"bytes,3,opt,name=downloadUrl,proto3" json:"downloadUrl,omitempty"` // 下载链接(完成后可直接访问) unknownFields protoimpl.UnknownFields @@ -7489,7 +7497,7 @@ const file_gb28181_proto_rawDesc = "" + "\x06status\x18\x10 \x01(\tR\x06status\x124\n" + "\agpsTime\x18\x11 \x01(\v2\x1a.google.protobuf.TimestampR\agpsTime\x12\x1c\n" + "\tlongitude\x18\x12 \x01(\tR\tlongitude\x12\x1a\n" + - "\blatitude\x18\x13 \x01(\tR\blatitude\"\xef\x06\n" + + "\blatitude\x18\x13 \x01(\tR\blatitude\"\x8d\a\n" + "\x06Device\x12\x1a\n" + "\bdeviceId\x18\x01 \x01(\tR\bdeviceId\x12\x12\n" + "\x04name\x18\x02 \x01(\tR\x04name\x12\"\n" + @@ -7520,7 +7528,8 @@ const file_gb28181_proto_rawDesc = "" + "\x02ip\x18\x16 \x01(\tR\x02ip\x12\x12\n" + "\x04port\x18\x17 \x01(\x05R\x04port\x124\n" + "\x15broadcastPushAfterAck\x18\x18 \x01(\bR\x15broadcastPushAfterAck\x12\x18\n" + - "\acharset\x18\x19 \x01(\tR\acharset\"d\n" + + "\acharset\x18\x19 \x01(\tR\acharset\x12\x1c\n" + + "\tssrcCheck\x18\x1a \x01(\bR\tssrcCheck\"d\n" + "\fResponseList\x12\x12\n" + "\x04code\x18\x01 \x01(\x05R\x04code\x12\x18\n" + "\amessage\x18\x02 \x01(\tR\amessage\x12&\n" + diff --git a/plugin/gb28181/pb/gb28181.proto b/plugin/gb28181/pb/gb28181.proto index 30ed6f4..37abe84 100644 --- a/plugin/gb28181/pb/gb28181.proto +++ b/plugin/gb28181/pb/gb28181.proto @@ -649,6 +649,7 @@ message Device { int32 port = 23; bool broadcastPushAfterAck =24; string charset =25; + bool ssrcCheck =26; } message ResponseList { diff --git a/plugin/rtp/pkg/transceiver.go b/plugin/rtp/pkg/transceiver.go index 1f44d07..c676120 100644 --- a/plugin/rtp/pkg/transceiver.go +++ b/plugin/rtp/pkg/transceiver.go @@ -1,6 +1,7 @@ package rtp import ( + "encoding/binary" "errors" "fmt" "io" @@ -58,12 +59,12 @@ type Receiver struct { *util.BufReader ListenAddr string net.Listener - StreamMode StreamMode - RTPMouth chan []byte - SinglePort io.ReadCloser - rtpReader *RTPPayloadReader // 保存 RTP 读取器引用 - AllowedIP string // 允许连接的IP地址(为空则不限制) - started bool // 标记是否已经启动过 + StreamMode StreamMode + RTPMouth chan []byte + SinglePort io.ReadCloser + rtpReader *RTPPayloadReader // 保存 RTP 读取器引用 + ExpectedSSRC uint32 // 预期的SSRC(为0则不过滤) + started bool // 标记是否已经启动过 } type PSReceiver struct { @@ -111,8 +112,6 @@ func (p *PSReceiver) Run() error { p.MpegPsDemuxer.Allocator = gomem.NewScalableMemoryAllocator(1 << gomem.MinPowerOf2) p.Using(p.MpegPsDemuxer.Allocator) - // 确保回调已设置 - p.MpegPsDemuxer.OnVideoPtsUpdate = p.UpdateVideoPts return p.MpegPsDemuxer.Feed(p.BufReader) } @@ -167,6 +166,11 @@ func (p *PSReceiver) IsPtsStable() bool { return time.Since(p.lastPtsUpdate) > 2*time.Second } +// GetLastPtsUpdateTime 获取最后一次 PTS 更新的时间 +func (p *PSReceiver) GetLastPtsUpdateTime() time.Time { + return p.lastPtsUpdate +} + func (p *Receiver) Start() (err error) { // 如果已经启动过,直接返回 if p.started { @@ -212,10 +216,6 @@ func (p *Receiver) Start() (err error) { } else { conn = p.SinglePort } - if err != nil { - p.Error("accept", "err", err) - return err - } p.OnStop(conn.Close) rtpReader = NewRTPPayloadReader(NewRTPTCPReader(conn)) p.rtpReader = rtpReader @@ -255,14 +255,72 @@ func (p *Receiver) Start() (err error) { func (p *Receiver) Run() error { if p.Listener != nil { - conn, err := p.Accept() - if err != nil { - return err + // 循环Accept,支持SSRC过滤时拒绝不匹配的连接 + for { + conn, err := p.Accept() + if err != nil { + return err + } + + // 如果设置了ExpectedSSRC,验证第一个RTP包的SSRC + if p.ExpectedSSRC != 0 { + // TCP模式:RFC 4571格式,前2字节是长度,然后是RTP包 + var buffer [14]byte // 2字节长度 + 12字节RTP header + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + _, err := io.ReadFull(conn, buffer[:]) + conn.SetReadDeadline(time.Time{}) + + if err != nil { + p.Warn("failed to read RTP header for SSRC validation", "err", err, "remote", conn.RemoteAddr()) + conn.Close() + continue + } + + // 解析SSRC(跳过2字节长度前缀,RTP header的字节8-11) + ssrc := binary.BigEndian.Uint32(buffer[10:14]) + + if ssrc != p.ExpectedSSRC { + p.Warn("reject connection with wrong SSRC", + "expected", p.ExpectedSSRC, + "actual", ssrc, + "remote", conn.RemoteAddr()) + conn.Close() + continue + } + + p.Info("accept connection with correct SSRC", "ssrc", ssrc, "remote", conn.RemoteAddr()) + + // 创建带缓冲的连接,包含已读取的数据(2字节长度+12字节RTP header) + conn = &BufferedConn{ + Conn: conn, + buffer: buffer[:], + } + } + + p.OnStop(conn.Close) + rtpReader := NewRTPPayloadReader(NewRTPTCPReader(conn)) + p.rtpReader = rtpReader + p.BufReader = util.NewBufReader(rtpReader) + return nil } - p.OnStop(conn.Close) - rtpReader := NewRTPPayloadReader(NewRTPTCPReader(conn)) - p.rtpReader = rtpReader - p.BufReader = util.NewBufReader(rtpReader) } return nil } + +// BufferedConn 包装已读取的数据,用于SSRC验证后重新读取RTP header +type BufferedConn struct { + net.Conn + buffer []byte + offset int +} + +func (bc *BufferedConn) Read(p []byte) (n int, err error) { + // 先读取缓冲区中的数据 + if bc.offset < len(bc.buffer) { + n = copy(p, bc.buffer[bc.offset:]) + bc.offset += n + return n, nil + } + // 缓冲区读完后,从底层连接读取 + return bc.Conn.Read(p) +}