feat: gb28181 ssrc check,choose downloadspeed 1/2/4

This commit is contained in:
pggiroro
2025-11-21 21:56:43 +08:00
parent 84592afb14
commit 9d5c01d3a0
9 changed files with 347 additions and 150 deletions
+8 -3
View File
@@ -222,14 +222,19 @@ func (t *TsTamer) Tame(ts time.Duration, fps int, scale float64) (result time.Du
t.BaseTs -= ts
}
result = max(1*time.Millisecond, t.BaseTs+ts)
if fps > 0 {
// 突变检测:仅在 fps 合理的情况下启用
// 如果 fps > 100,说明是快速下载场景,接收速度远大于真实帧率,不应该做突变检测
if fps > 0 && fps <= 100 {
frameDur := float64(time.Second) / float64(fps)
if math.Abs(float64(result-t.LastTs)) > 10*frameDur*scale { //时间戳突变
// t.Warn("timestamp mutation", "fps", t.FPS, "lastTs", uint32(t.LastTs/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "frameDur", time.Duration(frameDur))
diff := math.Abs(float64(result - t.LastTs))
threshold := 10 * frameDur * scale
if diff > threshold { //时间戳突变
result = t.LastTs + time.Duration(frameDur)
t.BaseTs = result - ts
}
}
t.LastTs = result
if t.LastScale != scale {
t.BeforeScaleChangedTs = result
+11 -2
View File
@@ -34,7 +34,7 @@ func (gb *GB28181Plugin) List(ctx context.Context, req *pb.GetDevicesRequest) (*
// 应用筛选条件
if req.Query != "" {
// 检查设备ID或名称是否包含查询字符串
if !strings.Contains(device.DeviceId, req.Query) && !strings.Contains(device.Name, req.Query) {
if !strings.Contains(device.DeviceId, req.Query) && !strings.Contains(device.Name, req.Query) && !strings.Contains(device.CustomName, req.Query) {
return true // 继续遍历
}
}
@@ -138,6 +138,7 @@ func (gb *GB28181Plugin) List(ctx context.Context, req *pb.GetDevicesRequest) (*
SubscribeCatalog: util.Conditional(d.SubscribeCatalog == 0, false, true),
SubscribePosition: util.Conditional(d.SubscribePosition == 0, false, true),
SubscribeAlarm: util.Conditional(d.SubscribeAlarm == 0, false, true),
SsrcCheck: d.SSRCCheck,
Charset: d.Charset,
})
}
@@ -518,6 +519,9 @@ func (gb *GB28181Plugin) UpdateDevice(ctx context.Context, req *pb.Device) (*pb.
d.BroadcastPushAfterAck = false
}
// 更新 SSRC 校验开关
d.SSRCCheck = req.SsrcCheck
d.UpdateTime = time.Now()
// 先停止设备任务
@@ -536,6 +540,7 @@ func (gb *GB28181Plugin) UpdateDevice(ctx context.Context, req *pb.Device) (*pb.
"subscribe_catalog": d.SubscribeCatalog,
"subscribe_position": d.SubscribePosition,
"subscribe_alarm": d.SubscribeAlarm,
"ssrc_check": d.SSRCCheck,
"update_time": d.UpdateTime,
}
@@ -669,6 +674,9 @@ func (gb *GB28181Plugin) UpdateDevice(ctx context.Context, req *pb.Device) (*pb.
updates["subscribe_position"] = 0 // 不订阅
}
// 更新 SSRC 校验开关
updates["ssrc_check"] = req.SsrcCheck
updates["update_time"] = time.Now()
// 保存到数据库
@@ -3488,6 +3496,7 @@ func (gb *GB28181Plugin) StartDownload(ctx context.Context, req *pb.StartDownloa
Status: "pending",
Progress: 0,
}
dialog.Logger = gb.Logger.With("streamPath", downloadId, "channelId", req.DeviceId+"_"+req.ChannelId)
dialog.Task.Context = ctx
// 10. 添加到下载对话集合(会自动调用 Start 方法)
@@ -3553,7 +3562,7 @@ func (gb *GB28181Plugin) GetDownloadProgress(ctx context.Context, req *pb.GetDow
Progress: int32(dialog.Progress),
FilePath: dialog.FilePath,
DownloadUrl: dialog.DownloadUrl,
Error: dialog.Error,
Error: dialog.ErrorString,
StartedAt: timestamppb.New(dialog.StartedAt),
}
if !dialog.CompletedAt.IsZero() {
+5 -5
View File
@@ -84,11 +84,11 @@ type Device struct {
CreateTime time.Time `gorm:"primaryKey"` // 创建时间
UpdateTime time.Time // 更新时间
Charset string // 字符集, 支持 UTF-8 与 GB2312
SubscribeCatalog int `gorm:"default:0"` // 目录订阅周期,0为不订阅
SubscribePosition int `gorm:"default:0"` // 移动设备位置订阅周期,0为不订阅
PositionInterval int `gorm:"default:6"` // 移动设备位置信息上报时间间隔,单位:秒,默认值6
SubscribeAlarm int `gorm:"default:0"` // 报警订阅周期,0为不订阅
SSRCCheck bool // 是否开启ssrc校验,默认关闭,开启可以防止串流
SubscribeCatalog int `gorm:"default:0"` // 目录订阅周期,0为不订阅
SubscribePosition int `gorm:"default:0"` // 移动设备位置订阅周期,0为不订阅
PositionInterval int `gorm:"default:6"` // 移动设备位置信息上报时间间隔,单位:秒,默认值6
SubscribeAlarm int `gorm:"default:0"` // 报警订阅周期,0为不订阅
SSRCCheck bool `gorm:"default:false" default:"false"` // 是否开启ssrc校验,默认关闭,开启可以防止串流
GeoCoordSys string // 地理坐标系, 目前支持 WGS84,GCJ02
Password string // 密码
SipIp string // SIP交互IP(设备访问平台的IP)
+19 -19
View File
@@ -107,7 +107,7 @@ func (d *Dialog) Start() (err error) {
sss := strings.Split(d.pullCtx.RemoteURL, "/")
if len(sss) < 2 {
d.Channel.Device.Info("remote url is invalid", d.pullCtx.RemoteURL)
d.Info("remote url is invalid", d.pullCtx.RemoteURL)
d.pullCtx.Fail("remote url is invalid")
return
}
@@ -243,23 +243,18 @@ func (d *Dialog) Start() (err error) {
// 设置必需的头部
contentTypeHeader := sip.ContentTypeHeader("APPLICATION/SDP")
subjectHeader := sip.NewHeader("Subject", fmt.Sprintf("%s:%s,%s:0", channelId, ssrc, d.gb.Serial))
//allowHeader := sip.NewHeader("Allow", "INVITE, ACK, CANCEL, REGISTER, MESSAGE, NOTIFY, BYE")
//Toheader里需要放入目录通道的id
toHeader := sip.ToHeader{
Address: sip.Uri{User: channelId, Host: channelId[0:10]},
}
userAgentHeader := sip.NewHeader("User-Agent", "M7S/"+m7s.Version)
//customCallID := fmt.Sprintf("%s-%s-%d@%s", device.DeviceId, channelId, time.Now().Unix(), device.SipIp)
customCallID := fmt.Sprintf("%s@%s", GenerateCallID(32), device.MediaIp)
callID := sip.CallIDHeader(customCallID)
maxforward := sip.MaxForwardsHeader(70)
//contentLengthHeader := sip.ContentLengthHeader(len(strings.Join(sdpInfo, "\r\n") + "\r\n"))
csqHeader := sip.CSeqHeader{
SeqNo: uint32(device.SN),
MethodName: "INVITE",
}
//request.AppendHeader(&contentLengthHeader)
contactHDR := sip.ContactHeader{
Address: sip.Uri{
User: d.gb.Serial,
@@ -355,6 +350,14 @@ func (d *Dialog) setupReceiver(pub *mrtp.PSReceiver) {
reader.Close()
d.gb.singlePorts.Remove(reader)
})
} else {
// 多端口模式:根据SSRCCheck配置决定是否启用SSRC过滤
if d.Channel.Device.SSRCCheck {
pub.ExpectedSSRC = d.SSRC
d.Info("multi-port mode, SSRC filtering enabled", "expectedSSRC", d.SSRC)
} else {
d.Info("multi-port mode, SSRC filtering disabled")
}
}
pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
case mrtp.StreamModeUDP:
@@ -384,10 +387,11 @@ func (d *Dialog) setupReceiver(pub *mrtp.PSReceiver) {
func (d *Dialog) Run() (err error) {
var pub mrtp.PSReceiver
pub.Publisher = d.pullCtx.Publisher
pub.Logger = d.gb.Logger.With("streamPath", d.StreamPath)
// 如果不是 BroadcastPushAfterAck 模式,提前创建监听器(多端口模式需要)
if !d.Channel.Device.BroadcastPushAfterAck {
d.Channel.Device.Info("creating listener before WaitAnswer", "broadcastPushAfterAck", false, "addr", d.MediaPort)
d.Info("creating listener before WaitAnswer", "broadcastPushAfterAck", false, "addr", d.MediaPort)
d.setupReceiver(&pub)
// 提前启动监听器
@@ -398,15 +402,15 @@ func (d *Dialog) Run() (err error) {
}
}
d.Channel.Device.Info("before WaitAnswer")
d.Info("before WaitAnswer")
err = d.session.WaitAnswer(d, sipgo.AnswerOptions{})
d.Channel.Device.Info("after WaitAnswer")
d.Info("after WaitAnswer")
if err != nil {
d.pullCtx.Fail("等待响应错误: " + err.Error())
return errors.Join(errors.New("wait answer error"), err)
}
inviteResponseBody := string(d.session.InviteResponse.Body())
d.Channel.Device.Info("inviteResponse", "body", inviteResponseBody)
d.Info("inviteResponse", "body", inviteResponseBody)
// 添加响应信息到 Description
d.SetDescriptions(task.Description{
@@ -450,10 +454,10 @@ func (d *Dialog) Run() (err error) {
}
}
}
// 修复 Contact 地址:某些设备响应的 Contact 包含错误的域名,导致 ACK 发送失败
// 强制使用原始的 Recipient 地址确保 ACK 能正确发送到设备
if d.session.InviteResponse.Contact() != nil {
if &d.session.InviteRequest.Recipient != &d.session.InviteResponse.Contact().Address {
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
// 添加解析后的响应参数到 Description
@@ -466,19 +470,15 @@ func (d *Dialog) Run() (err error) {
// 移动到流数据接收步骤
d.pullCtx.GoToStepConst(pkg.StepStreaming)
// 设置允许连接的IP地址(从INVITE响应中解析)
pub.Receiver.AllowedIP = d.targetIP
d.Channel.Device.Info("set allowed IP for receiver", "allowedIP", d.targetIP)
// TCP-ACTIVE 模式需要在解析 targetIP 后设置连接地址
if d.StreamMode == mrtp.StreamModeTCPActive {
pub.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
d.Channel.Device.Info("set TCP-ACTIVE connect address", "addr", pub.ListenAddr)
d.Info("set TCP-ACTIVE connect address", "addr", pub.ListenAddr)
}
// 如果是 BroadcastPushAfterAck 模式,在 Ack 后创建监听器配置
if d.Channel.Device.BroadcastPushAfterAck {
d.Channel.Device.Info("setup receiver after Ack", "broadcastPushAfterAck", true)
d.Info("setup receiver after Ack", "broadcastPushAfterAck", true)
d.setupReceiver(&pub)
}
+212 -95
View File
@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
@@ -42,7 +43,7 @@ type DownloadDialog struct {
Progress int // 0-100
FilePath string
DownloadUrl string // 下载链接
Error string
ErrorString string
StartedAt time.Time
CompletedAt time.Time
}
@@ -87,6 +88,14 @@ func (d *DownloadDialog) setupReceiver(ps *mrtp.PSReceiver) {
reader.Close()
d.gb.singlePorts.Remove(reader)
})
} else {
// 多端口模式:根据SSRCCheck配置决定是否启用SSRC过滤
if d.device.SSRCCheck {
ps.ExpectedSSRC = d.SSRC
d.Info("multi-port mode, SSRC filtering enabled", "expectedSSRC", d.SSRC)
} else {
d.Info("multi-port mode, SSRC filtering disabled")
}
}
ps.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
case mrtp.StreamModeUDP:
@@ -107,6 +116,14 @@ func (d *DownloadDialog) setupReceiver(ps *mrtp.PSReceiver) {
reader.Close()
d.gb.singlePorts.Remove(reader)
})
} else {
// 多端口模式:根据SSRCCheck配置决定是否启用SSRC过滤
if d.device.SSRCCheck {
ps.ExpectedSSRC = d.SSRC
d.Info("multi-port mode, SSRC filtering enabled", "expectedSSRC", d.SSRC)
} else {
d.Info("multi-port mode, SSRC filtering disabled")
}
}
ps.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
}
@@ -128,8 +145,8 @@ func (d *DownloadDialog) Start() (err error) {
device, ok := d.gb.devices.Get(d.DeviceId)
if !ok {
d.Status = "failed"
d.Error = fmt.Sprintf("设备不存在: %s", d.DeviceId)
return errors.Join(fmt.Errorf("device not found"), errors.New(d.Error))
d.ErrorString = fmt.Sprintf("设备不存在: %s", d.DeviceId)
return errors.Join(fmt.Errorf("device not found"), errors.New(d.ErrorString))
}
d.device = device
@@ -137,8 +154,8 @@ func (d *DownloadDialog) Start() (err error) {
channel, ok := device.channels.Get(channelKey)
if !ok {
d.Status = "failed"
d.Error = fmt.Sprintf("通道不存在: %s", d.ChannelId)
return errors.Join(fmt.Errorf("channel not found"), errors.New(d.Error))
d.ErrorString = fmt.Sprintf("通道不存在: %s", d.ChannelId)
return errors.Join(fmt.Errorf("channel not found"), errors.New(d.ErrorString))
}
d.channel = channel
@@ -220,7 +237,7 @@ func (d *DownloadDialog) Start() (err error) {
// 添加下载速度属性(默认1倍速,避免丢帧)
downloadSpeed := d.DownloadSpeed
if downloadSpeed <= 0 || downloadSpeed > 4 {
downloadSpeed = 4 // 默认1倍速
downloadSpeed = 1 // 默认1倍速
}
sdpInfo = append(sdpInfo, fmt.Sprintf("a=downloadspeed:%d", downloadSpeed))
@@ -322,10 +339,10 @@ func (d *DownloadDialog) Start() (err error) {
// Go 运行下载会话(异步执行,支持并发)
func (d *DownloadDialog) Go() error {
var psReceiver mrtp.PSReceiver
psReceiver.Logger = d.gb.Logger.With("streamPath", d.DownloadId)
// 如果不是 BroadcastPushAfterAck 模式,提前创建监听器(多端口模式需要)
if !d.device.BroadcastPushAfterAck {
d.device.Info("creating listener before WaitAnswer", "broadcastPushAfterAck", false, "addr", d.MediaPort)
d.Info("creating listener before WaitAnswer", "broadcastPushAfterAck", false, "addr", d.MediaPort)
d.setupReceiver(&psReceiver)
// 提前启动监听器
@@ -335,18 +352,18 @@ func (d *DownloadDialog) Go() error {
}
}
d.device.Info("before WaitAnswer")
d.Info("before WaitAnswer")
err := d.session.WaitAnswer(d, sipgo.AnswerOptions{})
d.device.Info("after WaitAnswer")
d.Info("after WaitAnswer")
if err != nil {
d.Status = "failed"
d.Error = fmt.Sprintf("等待响应失败: %v", err)
d.ErrorString = fmt.Sprintf("等待响应失败: %v", err)
return errors.Join(errors.New("wait answer error"), err)
}
// 解析响应
inviteResponseBody := string(d.session.InviteResponse.Body())
d.device.Info("收到 INVITE 响应", "body", inviteResponseBody)
d.Info("收到 INVITE 响应", "body", inviteResponseBody)
// 添加响应信息到 Description
d.SetDescriptions(task.Description{
"responseStatus": d.session.InviteResponse.StatusCode,
@@ -388,16 +405,15 @@ func (d *DownloadDialog) Go() error {
}
}
}
// invite响应里的contact是域名的话,sip尝试去解析,可能失败,这时候用invite请求里的recipient
// 修复 Contact 地址:某些设备响应的 Contact 包含错误的域名,导致 ACK 发送失败
// 强制使用原始的 Recipient 地址确保 ACK 能正确发送到设备
if d.session.InviteResponse.Contact() != nil {
if &d.session.InviteRequest.Recipient != &d.session.InviteResponse.Contact().Address {
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
// 如果是 BroadcastPushAfterAck 模式,在 Ack 后创建监听器配置
if d.device.BroadcastPushAfterAck {
d.device.Info("setup receiver after Ack", "broadcastPushAfterAck", true)
d.Info("setup receiver after Ack", "broadcastPushAfterAck", true)
d.setupReceiver(&psReceiver)
}
@@ -405,10 +421,10 @@ func (d *DownloadDialog) Go() error {
err = d.session.Ack(d)
if err != nil {
// 与 dialog.Run 保持一致,仅记录错误,不直接 panic
d.device.Error("ack session", "err", err)
d.Error("ack session", "err", err)
}
d.gb.Info("下载会话已建立",
d.Info("下载会话已建立",
"ssrc", d.SSRC,
"targetIP", d.targetIP,
"targetPort", d.targetPort)
@@ -429,7 +445,7 @@ func (d *DownloadDialog) Go() error {
publisher, err := d.gb.PublishWithConfig(d, streamPath, pubConf)
if err != nil {
d.Status = "failed"
d.Error = fmt.Sprintf("创建 Publisher 失败: %v", err)
d.ErrorString = fmt.Sprintf("创建 Publisher 失败: %v", err)
return fmt.Errorf("创建 Publisher 失败: %w", err)
}
@@ -439,7 +455,7 @@ func (d *DownloadDialog) Go() error {
// 监听 Publisher 停止事件,主动停止 PSReceiver
// 避免 Publisher timeout 后 PSReceiver 仍在阻塞等待数据
publisher.OnStop(func() {
d.gb.Info("Publisher 已停止,主动停止 PSReceiver",
d.Info("Publisher 已停止,主动停止 PSReceiver",
"downloadId", d.DownloadId,
"progress", d.Progress)
psReceiver.Stop(io.EOF)
@@ -465,7 +481,7 @@ func (d *DownloadDialog) Go() error {
// 生成下载 URL
d.DownloadUrl = fmt.Sprintf("/gb28181/download?downloadId=%s", d.DownloadId)
d.gb.Info("MP4 录制器已创建",
d.Info("MP4 录制器已创建",
"streamPath", streamPath,
"storagePathPrefix", filePath,
"downloadUrl", d.DownloadUrl)
@@ -473,7 +489,7 @@ func (d *DownloadDialog) Go() error {
d.gb.Warn("MP4 插件未加载,无法录制")
}
d.gb.Info("开始接收 RTP 数据并录制", "streamPath", streamPath)
d.Info("开始接收 RTP 数据并录制", "streamPath", streamPath)
// 8. 设置进度更新回调(在 RTP 读取循环中触发,无需单独协程)
totalDuration := d.EndTime.Sub(d.StartTime).Seconds()
@@ -481,32 +497,93 @@ func (d *DownloadDialog) Go() error {
d.updateProgress(&psReceiver, totalDuration)
}
// 9. 使用 RunTask 运行 PSReceiver(会阻塞直到完成)
err = d.RunTask(&psReceiver)
// 9. 使用 RunTask 运行 PSReceiver,并添加数据接收超时监控
// 如果超过 30 秒没有收到新数据(PTS 不更新),则认为下载超时
dataTimeout := 30 * time.Second
errChan := make(chan error, 1)
go func() {
errChan <- d.RunTask(&psReceiver)
}()
// 监控数据接收超时
ticker := time.NewTicker(5 * time.Second) // 每 5 秒检查一次
defer ticker.Stop()
for {
select {
case err = <-errChan:
// PSReceiver 正常结束或出错
goto DOWNLOAD_COMPLETE
case <-ticker.C:
// 获取当前进度
elapsedSeconds := psReceiver.GetElapsedSeconds()
currentProgress := int((elapsedSeconds / totalDuration) * 100)
// 检查是否接近完成且 PTS 稳定
// 如果进度 >= 95% 且 PTS 已稳定(超过 2 秒无更新),认为下载完成
if currentProgress >= 95 && psReceiver.IsPtsStable() {
d.Info("下载接近完成且 PTS 已稳定,主动结束任务",
"downloadId", d.DownloadId,
"progress", currentProgress,
"elapsedSeconds", elapsedSeconds,
"totalDuration", totalDuration)
// 主动停止 PSReceiver,标记为正常完成
psReceiver.Stop(io.EOF)
err = <-errChan // 等待 RunTask 返回
goto DOWNLOAD_COMPLETE
}
// 检查是否超时:如果已经有数据但长时间无更新,则超时
if psReceiver.IsPtsStable() && time.Since(psReceiver.GetLastPtsUpdateTime()) > dataTimeout {
d.Warn("下载超时:超过 30 秒未收到新数据",
"downloadId", d.DownloadId,
"lastPtsUpdate", psReceiver.GetLastPtsUpdateTime(),
"timeout", dataTimeout)
// 主动停止 PSReceiver
psReceiver.Stop(fmt.Errorf("data receive timeout: no data for %v", dataTimeout))
err = <-errChan // 等待 RunTask 返回
goto DOWNLOAD_COMPLETE
}
}
}
DOWNLOAD_COMPLETE:
// 10. 任务完成,更新状态
if err != nil {
// 判断是否为正常结束:EOF/timeout 且视频PTS已稳定(说明流真的结束了)
errStr := err.Error()
isNormalEnd := err == io.EOF ||
strings.Contains(errStr, "EOF") ||
strings.Contains(errStr, "timeout")
// 判断是否为数据接收超时(我们主动停止的)
isDataTimeout := strings.Contains(errStr, "data receive timeout")
// 判断是否为正常结束:EOF 且视频PTS已稳定(说明流真的结束了)
// 注意:不包括 "timeout",因为那可能是我们主动停止的超时
isNormalEnd := err == io.EOF || strings.Contains(errStr, "EOF")
// PTS稳定说明设备已经停止发送数据,流真正结束了
ptsStable := psReceiver.IsPtsStable()
if isNormalEnd && ptsStable {
d.gb.Info("下载完成:视频 PTS 已稳定,视为成功",
if isDataTimeout {
// 数据接收超时,标记为失败
d.Status = "failed"
d.ErrorString = "下载超时:超过30秒未收到新数据"
d.Warn("下载超时失败",
"downloadId", d.DownloadId,
"progress", d.Progress,
"error", d.Error)
} else if isNormalEnd && ptsStable {
// EOF 且 PTS 稳定,视为正常完成
d.Info("下载完成:视频 PTS 已稳定,视为成功",
"downloadId", d.DownloadId,
"progress", d.Progress,
"error", errStr)
d.Status = "completed"
d.Progress = 100
d.Error = "" // 清除错误信息
d.ErrorString = "" // 清除错误信息
} else {
// 其他错误,标记为失败
d.Status = "failed"
d.Error = err.Error()
d.gb.Warn("下载失败",
d.ErrorString = err.Error()
d.Warn("下载失败",
"downloadId", d.DownloadId,
"progress", d.Progress,
"ptsStable", ptsStable,
@@ -518,84 +595,124 @@ func (d *DownloadDialog) Go() error {
}
d.CompletedAt = time.Now()
// 11. 延迟 5 秒后再返回,确保前端能轮询到 100% 状态
// 11. 查询完整的文件路径(成功时需要,失败时可选)
var actualFilePath string
if d.gb.DB != nil && d.FilePath != "" {
var record m7s.RecordStream
// 使用 LIKE 查询匹配存储路径前缀的记录
if err := d.gb.DB.Where("file_path LIKE ?", d.FilePath+"%").
Order("start_time DESC").First(&record).Error; err == nil {
actualFilePath = record.FilePath
d.FilePath = actualFilePath // 更新为完整路径
d.Info("找到完整文件路径",
"downloadId", d.DownloadId,
"filePath", actualFilePath)
} else {
d.Warn("未找到匹配的录制文件",
"downloadId", d.DownloadId,
"searchPath", d.FilePath,
"error", err)
}
}
// 12. 创建 CompletedDownloadDialog(无论成功还是失败都需要)
completed := &CompletedDownloadDialog{
DownloadId: d.DownloadId,
DeviceId: d.DeviceId,
ChannelId: d.ChannelId,
Status: d.Status,
Progress: d.Progress,
FilePath: d.FilePath,
DownloadUrl: d.DownloadUrl,
Error: d.ErrorString,
StartedAt: d.StartedAt,
CompletedAt: d.CompletedAt,
}
d.gb.completedDownloads.Set(completed)
// 13. 清理 RecordStream 记录(成功和失败都需要)
if d.gb.DB != nil {
if actualFilePath != "" {
// 有完整路径,使用精确匹配删除
if err := d.gb.DB.Where("file_path = ?", actualFilePath).Delete(&m7s.RecordStream{}).Error; err != nil {
d.Error("删除RecordStream记录失败",
"filePath", actualFilePath,
"error", err)
} else {
d.Info("已清理RecordStream记录",
"filePath", actualFilePath)
}
} else if d.FilePath != "" {
// 没有完整路径,使用模糊匹配删除
if err := d.gb.DB.Where("file_path LIKE ?", d.FilePath+"%").Delete(&m7s.RecordStream{}).Error; err != nil {
d.Error("删除RecordStream记录失败",
"searchPath", d.FilePath,
"error", err)
} else {
d.Info("已清理RecordStream记录",
"searchPath", d.FilePath)
}
}
}
// 14. 根据状态执行不同的后续操作
if d.Status == "completed" {
d.gb.Info("下载任务已完成,延迟 5 秒后释放资源(确保前端获取到 100% 状态)",
d.Info("下载任务已完成",
"downloadId", d.DownloadId,
"progress", d.Progress)
// 12. 从 RecordStream 表查询完整的文件路径(通过 LIKE 模糊匹配)
var actualFilePath string
if d.gb.DB != nil && d.FilePath != "" {
var record m7s.RecordStream
// 使用 LIKE 查询匹配存储路径前缀的记录
if err := d.gb.DB.Where("file_path LIKE ?", d.FilePath+"%").
Order("start_time DESC").First(&record).Error; err == nil {
actualFilePath = record.FilePath
d.FilePath = actualFilePath // 更新为完整路径
d.gb.Info("找到完整文件路径",
"downloadId", d.DownloadId,
"filePath", actualFilePath)
} else {
d.gb.Warn("未找到匹配的录制文件",
"downloadId", d.DownloadId,
"searchPath", d.FilePath,
"error", err)
}
}
completed := &CompletedDownloadDialog{
DownloadId: d.DownloadId,
DeviceId: d.DeviceId,
ChannelId: d.ChannelId,
Status: d.Status,
Progress: d.Progress,
FilePath: d.FilePath,
DownloadUrl: d.DownloadUrl,
Error: d.Error,
StartedAt: d.StartedAt,
CompletedAt: d.CompletedAt,
}
d.gb.completedDownloads.Set(completed)
// 13. 保存到 GB28181Record 缓存表并清理RecordStream记录
// 保存到 GB28181Record 缓存表
if d.gb.DB != nil && actualFilePath != "" {
record := &gb28181.GB28181Record{
DownloadId: d.DownloadId,
FilePath: actualFilePath,
Status: "completed",
}
// 使用 Save 方法,如果存在则更新,不存在则插入
if err := d.gb.DB.Save(record).Error; err != nil {
d.gb.Error("保存下载记录到缓存表失败",
d.Error("保存下载记录到缓存表失败",
"downloadId", d.DownloadId,
"error", err)
} else {
d.gb.Info("下载记录已保存到缓存表",
d.Info("下载记录已保存到缓存表",
"downloadId", d.DownloadId,
"filePath", actualFilePath)
// 清理MP4插件的RecordStream记录(通过完整路径)
if err := d.gb.DB.Where("file_path = ?", actualFilePath).Delete(&m7s.RecordStream{}).Error; err != nil {
d.gb.Error("删除RecordStream记录失败",
"filePath", actualFilePath,
"error", err)
} else {
d.gb.Info("已清理RecordStream记录",
"filePath", actualFilePath)
}
}
}
} else if d.Status == "failed" {
// 14. 下载失败时也需要清理RecordStream记录(通过 LIKE 模糊匹配)
if d.gb.DB != nil && d.FilePath != "" {
if err := d.gb.DB.Where("file_path LIKE ?", d.FilePath+"%").Delete(&m7s.RecordStream{}).Error; err != nil {
d.gb.Error("删除失败任务的RecordStream记录失败",
"searchPath", d.FilePath,
d.Warn("下载任务失败",
"downloadId", d.DownloadId,
"progress", d.Progress,
"error", d.Error)
// 删除失败任务的文件(避免垃圾文件累积)
if actualFilePath != "" {
// 优先使用从 RecordStream 查询到的完整路径
if err := os.Remove(actualFilePath); err == nil {
d.Info("已删除失败任务的文件",
"downloadId", d.DownloadId,
"filePath", actualFilePath)
} else if !os.IsNotExist(err) {
d.Warn("删除失败任务的文件失败",
"downloadId", d.DownloadId,
"filePath", actualFilePath,
"error", err)
}
} else if d.FilePath != "" {
// 如果没有查询到 actualFilePath,使用 d.FilePath 并添加扩展名
filePath := d.FilePath
if !strings.HasSuffix(strings.ToLower(filePath), ".mp4") {
filePath += ".mp4"
}
if err := os.Remove(filePath); err == nil {
d.Info("已删除失败任务的文件",
"downloadId", d.DownloadId,
"filePath", filePath)
} else if !os.IsNotExist(err) {
d.Warn("删除失败任务的文件失败",
"downloadId", d.DownloadId,
"filePath", filePath,
"error", err)
} else {
d.gb.Info("已清理失败任务的RecordStream记录",
"searchPath", d.FilePath)
}
}
}
@@ -617,7 +734,7 @@ func (d *DownloadDialog) updateProgress(psReceiver *mrtp.PSReceiver, totalDurati
}
d.Progress = progress
d.gb.Info("下载进度更新",
d.Info("下载进度更新",
"downloadId", d.DownloadId,
"elapsedSeconds", elapsedSeconds,
"totalDuration", totalDuration,
@@ -648,7 +765,7 @@ func (d *DownloadDialog) Dispose() {
}()
// 2. 记录日志
d.gb.Info("download dialog dispose",
d.Info("download dialog dispose",
"downloadId", d.DownloadId,
"ssrc", d.SSRC,
"mediaPort", d.MediaPort,
@@ -660,7 +777,7 @@ func (d *DownloadDialog) Dispose() {
if d.session != nil && d.session.InviteResponse != nil {
err := d.session.Bye(d)
if err != nil {
d.gb.Error("发送 BYE 失败", "error", err)
d.Error("发送 BYE 失败", "error", err)
}
}
}
+2 -4
View File
@@ -8,7 +8,7 @@ import (
sipgo "github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/langhuihui/gotask"
task "github.com/langhuihui/gotask"
m7s "m7s.live/v5"
"m7s.live/v5/pkg/util"
gb28181 "m7s.live/v5/plugin/gb28181/pkg"
@@ -269,9 +269,7 @@ func (d *ForwardDialog) Run() (err error) {
}
}
if d.session.InviteResponse.Contact() != nil {
if &d.session.InviteRequest.Recipient != &d.session.InviteResponse.Contact().Address {
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
err = d.session.Ack(d)
if err != nil {
+12 -3
View File
@@ -943,6 +943,7 @@ type Device struct {
Port int32 `protobuf:"varint,23,opt,name=port,proto3" json:"port,omitempty"`
BroadcastPushAfterAck bool `protobuf:"varint,24,opt,name=broadcastPushAfterAck,proto3" json:"broadcastPushAfterAck,omitempty"`
Charset string `protobuf:"bytes,25,opt,name=charset,proto3" json:"charset,omitempty"`
SsrcCheck bool `protobuf:"varint,26,opt,name=ssrcCheck,proto3" json:"ssrcCheck,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -1152,6 +1153,13 @@ func (x *Device) GetCharset() string {
return ""
}
func (x *Device) GetSsrcCheck() bool {
if x != nil {
return x.SsrcCheck
}
return false
}
type ResponseList struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
@@ -7003,7 +7011,7 @@ func (x *StartDownloadRequest) GetDownloadSpeed() int32 {
// StartDownloadData 下载任务数据
type StartDownloadData struct {
state protoimpl.MessageState `protogen:"open.v1"`
DownloadId string `protobuf:"bytes,1,opt,name=downloadId,proto3" json:"downloadId,omitempty"` // 下载任务ID(格式:startTime_endTime_deviceId_channelId
DownloadId string `protobuf:"bytes,1,opt,name=downloadId,proto3" json:"downloadId,omitempty"` // 下载任务ID(格式:deviceId_channelId_startTime_endTime
Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` // 初始状态(pending
DownloadUrl string `protobuf:"bytes,3,opt,name=downloadUrl,proto3" json:"downloadUrl,omitempty"` // 下载链接(完成后可直接访问)
unknownFields protoimpl.UnknownFields
@@ -7489,7 +7497,7 @@ const file_gb28181_proto_rawDesc = "" +
"\x06status\x18\x10 \x01(\tR\x06status\x124\n" +
"\agpsTime\x18\x11 \x01(\v2\x1a.google.protobuf.TimestampR\agpsTime\x12\x1c\n" +
"\tlongitude\x18\x12 \x01(\tR\tlongitude\x12\x1a\n" +
"\blatitude\x18\x13 \x01(\tR\blatitude\"\xef\x06\n" +
"\blatitude\x18\x13 \x01(\tR\blatitude\"\x8d\a\n" +
"\x06Device\x12\x1a\n" +
"\bdeviceId\x18\x01 \x01(\tR\bdeviceId\x12\x12\n" +
"\x04name\x18\x02 \x01(\tR\x04name\x12\"\n" +
@@ -7520,7 +7528,8 @@ const file_gb28181_proto_rawDesc = "" +
"\x02ip\x18\x16 \x01(\tR\x02ip\x12\x12\n" +
"\x04port\x18\x17 \x01(\x05R\x04port\x124\n" +
"\x15broadcastPushAfterAck\x18\x18 \x01(\bR\x15broadcastPushAfterAck\x12\x18\n" +
"\acharset\x18\x19 \x01(\tR\acharset\"d\n" +
"\acharset\x18\x19 \x01(\tR\acharset\x12\x1c\n" +
"\tssrcCheck\x18\x1a \x01(\bR\tssrcCheck\"d\n" +
"\fResponseList\x12\x12\n" +
"\x04code\x18\x01 \x01(\x05R\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12&\n" +
+1
View File
@@ -649,6 +649,7 @@ message Device {
int32 port = 23;
bool broadcastPushAfterAck =24;
string charset =25;
bool ssrcCheck =26;
}
message ResponseList {
+77 -19
View File
@@ -1,6 +1,7 @@
package rtp
import (
"encoding/binary"
"errors"
"fmt"
"io"
@@ -58,12 +59,12 @@ type Receiver struct {
*util.BufReader
ListenAddr string
net.Listener
StreamMode StreamMode
RTPMouth chan []byte
SinglePort io.ReadCloser
rtpReader *RTPPayloadReader // 保存 RTP 读取器引用
AllowedIP string // 允许连接的IP地址(为则不限制
started bool // 标记是否已经启动过
StreamMode StreamMode
RTPMouth chan []byte
SinglePort io.ReadCloser
rtpReader *RTPPayloadReader // 保存 RTP 读取器引用
ExpectedSSRC uint32 // 预期的SSRC(为0则不过滤
started bool // 标记是否已经启动过
}
type PSReceiver struct {
@@ -111,8 +112,6 @@ func (p *PSReceiver) Run() error {
p.MpegPsDemuxer.Allocator = gomem.NewScalableMemoryAllocator(1 << gomem.MinPowerOf2)
p.Using(p.MpegPsDemuxer.Allocator)
// 确保回调已设置
p.MpegPsDemuxer.OnVideoPtsUpdate = p.UpdateVideoPts
return p.MpegPsDemuxer.Feed(p.BufReader)
}
@@ -167,6 +166,11 @@ func (p *PSReceiver) IsPtsStable() bool {
return time.Since(p.lastPtsUpdate) > 2*time.Second
}
// GetLastPtsUpdateTime 获取最后一次 PTS 更新的时间
func (p *PSReceiver) GetLastPtsUpdateTime() time.Time {
return p.lastPtsUpdate
}
func (p *Receiver) Start() (err error) {
// 如果已经启动过,直接返回
if p.started {
@@ -212,10 +216,6 @@ func (p *Receiver) Start() (err error) {
} else {
conn = p.SinglePort
}
if err != nil {
p.Error("accept", "err", err)
return err
}
p.OnStop(conn.Close)
rtpReader = NewRTPPayloadReader(NewRTPTCPReader(conn))
p.rtpReader = rtpReader
@@ -255,14 +255,72 @@ func (p *Receiver) Start() (err error) {
func (p *Receiver) Run() error {
if p.Listener != nil {
conn, err := p.Accept()
if err != nil {
return err
// 循环Accept,支持SSRC过滤时拒绝不匹配的连接
for {
conn, err := p.Accept()
if err != nil {
return err
}
// 如果设置了ExpectedSSRC,验证第一个RTP包的SSRC
if p.ExpectedSSRC != 0 {
// TCP模式:RFC 4571格式,前2字节是长度,然后是RTP包
var buffer [14]byte // 2字节长度 + 12字节RTP header
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
_, err := io.ReadFull(conn, buffer[:])
conn.SetReadDeadline(time.Time{})
if err != nil {
p.Warn("failed to read RTP header for SSRC validation", "err", err, "remote", conn.RemoteAddr())
conn.Close()
continue
}
// 解析SSRC(跳过2字节长度前缀,RTP header的字节8-11
ssrc := binary.BigEndian.Uint32(buffer[10:14])
if ssrc != p.ExpectedSSRC {
p.Warn("reject connection with wrong SSRC",
"expected", p.ExpectedSSRC,
"actual", ssrc,
"remote", conn.RemoteAddr())
conn.Close()
continue
}
p.Info("accept connection with correct SSRC", "ssrc", ssrc, "remote", conn.RemoteAddr())
// 创建带缓冲的连接,包含已读取的数据(2字节长度+12字节RTP header
conn = &BufferedConn{
Conn: conn,
buffer: buffer[:],
}
}
p.OnStop(conn.Close)
rtpReader := NewRTPPayloadReader(NewRTPTCPReader(conn))
p.rtpReader = rtpReader
p.BufReader = util.NewBufReader(rtpReader)
return nil
}
p.OnStop(conn.Close)
rtpReader := NewRTPPayloadReader(NewRTPTCPReader(conn))
p.rtpReader = rtpReader
p.BufReader = util.NewBufReader(rtpReader)
}
return nil
}
// BufferedConn 包装已读取的数据,用于SSRC验证后重新读取RTP header
type BufferedConn struct {
net.Conn
buffer []byte
offset int
}
func (bc *BufferedConn) Read(p []byte) (n int, err error) {
// 先读取缓冲区中的数据
if bc.offset < len(bc.buffer) {
n = copy(p, bc.buffer[bc.offset:])
bc.offset += n
return n, nil
}
// 缓冲区读完后,从底层连接读取
return bc.Conn.Read(p)
}