diff --git a/plugin/gb28181pro/api.go b/plugin/gb28181pro/api.go index 548f665..6f0ef8d 100644 --- a/plugin/gb28181pro/api.go +++ b/plugin/gb28181pro/api.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "os" - "strconv" "strings" "sync" "time" @@ -571,23 +570,15 @@ func (gb *GB28181ProPlugin) AddPlatform(ctx context.Context, req *pb.Platform) ( // 如果平台启用,则启动注册任务 if platform.Enable { - // 获取本地SIP端口 - localPort := 5060 // 默认端口 - if len(gb.Sip.ListenAddr) > 0 { - if port, err := strconv.Atoi(strings.Split(gb.Sip.ListenAddr[0], ":")[1]); err == nil { - localPort = port - } - } - // 创建平台命令器 - commander := gb28181.NewSIPPlatformCommander(platform, gb.GetPublicIP(platform.ServerIP), localPort) - if err := commander.InitializeSIPClient(gb.ua); err != nil { + // 使用 platform 自身初始化 SIP 客户端 + if err := platform.InitializeSIPClient(gb.ua); err != nil { gb.Error("初始化SIP客户端失败", "error", err) resp.Code = 500 resp.Message = fmt.Sprintf("初始化SIP客户端失败: %v", err) return resp, nil } // 启动注册任务 - commander.StartRegisterTask(gb) + platform.StartRegisterTask() } resp.Code = 0 @@ -726,22 +717,14 @@ func (gb *GB28181ProPlugin) UpdatePlatform(ctx context.Context, req *pb.Platform // 处理平台启用状态变化 if !wasEnabled && platform.Enable { - // 获取本地SIP端口 - localPort := 5060 // 默认端口 - if len(gb.Sip.ListenAddr) > 0 { - if port, err := strconv.Atoi(strings.Split(gb.Sip.ListenAddr[0], ":")[1]); err == nil { - localPort = port - } - } - // 平台从禁用变为启用,启动注册任务 - commander := gb28181.NewSIPPlatformCommander(&platform, gb.GetPublicIP(platform.ServerIP), localPort) - if err := commander.InitializeSIPClient(gb.ua); err != nil { + // 使用 platform 自身初始化 SIP 客户端 + if err := platform.InitializeSIPClient(gb.ua); err != nil { gb.Error("初始化SIP客户端失败", "error", err) resp.Code = 500 resp.Message = fmt.Sprintf("初始化SIP客户端失败: %v", err) return resp, nil } - commander.StartRegisterTask(gb) + platform.StartRegisterTask() } else if wasEnabled && !platform.Enable { // TODO: 平台从启用变为禁用,需要处理注销逻辑 // 这里可以添加注销相关的代码 diff --git a/plugin/gb28181pro/pkg/platform.go b/plugin/gb28181pro/pkg/platform.go index 915a698..2ee178d 100644 --- a/plugin/gb28181pro/pkg/platform.go +++ b/plugin/gb28181pro/pkg/platform.go @@ -2,52 +2,58 @@ package gb28181 import ( + "context" + "fmt" + "time" + "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" + "m7s.live/v5/pkg/task" ) // Platform 表示GB28181平台的配置信息。 // 包含了平台的基本信息、SIP服务配置、设备信息、认证信息等。 // 用于存储和管理GB28181平台的所有相关参数。 type Platform struct { - ID int `gorm:"primaryKey;autoIncrement" json:"id"` // ID表示数据库中的唯一标识符 - Enable bool `gorm:"column:enable" json:"enable"` // Enable表示该平台配置是否启用 - Name string `gorm:"column:name" json:"name"` // Name表示平台的名称 - ServerGBID string `gorm:"column:server_gb_id" json:"serverGBId"` // ServerGBID表示SIP服务器的国标编码 - ServerGBDomain string `gorm:"column:server_gb_domain" json:"serverGBDomain"` // ServerGBDomain表示SIP服务器的国标域 - ServerIP string `gorm:"column:server_ip" json:"serverIp"` // ServerIP表示SIP服务器的IP地址 - ServerPort int `gorm:"column:server_port" json:"serverPort"` // ServerPort表示SIP服务器的端口号 - DeviceGBID string `gorm:"column:device_gb_id" json:"deviceGBId"` // DeviceGBID表示设备的国标编号 - DeviceIP string `gorm:"column:device_ip" json:"deviceIp"` // DeviceIP表示设备的IP地址 - DevicePort int `gorm:"column:device_port" json:"devicePort"` // DevicePort表示设备的端口号 - Username string `gorm:"column:username" json:"username"` // Username表示SIP认证的用户名,默认使用设备国标编号 - Password string `gorm:"column:password" json:"password"` // Password表示SIP认证的密码 - Expires int `gorm:"column:expires" json:"expires"` // Expires表示注册的过期时间,单位为秒 - KeepTimeout int `gorm:"column:keep_timeout" json:"keepTimeout"` // KeepTimeout表示心跳超时时间,单位为秒 - Transport string `gorm:"column:transport" json:"transport"` // Transport表示传输协议类型 - CharacterSet string `gorm:"column:character_set" json:"characterSet"` // CharacterSet表示字符集编码 - PTZ bool `gorm:"column:ptz" json:"ptz"` // PTZ表示是否允许云台控制 - RTCP bool `gorm:"column:rtcp" json:"rtcp"` // RTCP表示是否启用RTCP流保活 - Status bool `gorm:"column:status" json:"status"` // Status表示平台当前的在线状态 - ChannelCount int `gorm:"column:channel_count" json:"channelCount"` // ChannelCount表示通道数量 - CatalogSubscribe bool `gorm:"column:catalog_subscribe" json:"catalogSubscribe"` // CatalogSubscribe表示是否已订阅目录信息 - AlarmSubscribe bool `gorm:"column:alarm_subscribe" json:"alarmSubscribe"` // AlarmSubscribe表示是否已订阅报警信息 - MobilePositionSubscribe bool `gorm:"column:mobile_position_subscribe" json:"mobilePositionSubscribe"` // MobilePositionSubscribe表示是否已订阅移动位置信息 - CatalogGroup int `gorm:"column:catalog_group" json:"catalogGroup"` // CatalogGroup表示目录分组大小,每次向上级发送通道数量 - UpdateTime string `gorm:"column:update_time" json:"updateTime"` // UpdateTime表示最后更新时间 - CreateTime string `gorm:"column:create_time" json:"createTime"` // CreateTime表示创建时间 - AsMessageChannel bool `gorm:"column:as_message_channel" json:"asMessageChannel"` // AsMessageChannel表示是否作为消息通道使用 - SendStreamIP string `gorm:"column:send_stream_ip" json:"sendStreamIp"` // SendStreamIP表示点播回复200OK时使用的IP地址 - AutoPushChannel *bool `gorm:"column:auto_push_channel" json:"autoPushChannel"` // AutoPushChannel表示是否自动推送通道变化 - CatalogWithPlatform int `gorm:"column:catalog_with_platform" json:"catalogWithPlatform"` // CatalogWithPlatform表示目录信息是否包含平台信息(0:关闭,1:打开) - CatalogWithGroup int `gorm:"column:catalog_with_group" json:"catalogWithGroup"` // CatalogWithGroup表示目录信息是否包含分组信息(0:关闭,1:打开) - CatalogWithRegion int `gorm:"column:catalog_with_region" json:"catalogWithRegion"` // CatalogWithRegion表示目录信息是否包含行政区划(0:关闭,1:打开) - CivilCode string `gorm:"column:civil_code" json:"civilCode"` // CivilCode表示行政区划代码 - Manufacturer string `gorm:"column:manufacturer" json:"manufacturer"` // Manufacturer表示平台厂商 - Model string `gorm:"column:model" json:"model"` // Model表示平台型号 - Address string `gorm:"column:address" json:"address"` // Address表示平台安装地址 - RegisterWay int `gorm:"column:register_way" json:"registerWay"` // RegisterWay表示注册方式(1:标准认证注册,2:口令认证,3:数字证书双向认证,4:数字证书单向认证) - Secrecy int `gorm:"column:secrecy" json:"secrecy"` // Secrecy表示保密属性(0:不涉密,1:涉密) + task.Job `gorm:"-:all"` // 使用 Task 而不是 Job,并且排除 gorm 序列化 + ID int `gorm:"primaryKey;autoIncrement" json:"id"` // ID表示数据库中的唯一标识符 + Enable bool `gorm:"column:enable" json:"enable"` // Enable表示该平台配置是否启用 + Name string `gorm:"column:name" json:"name"` // Name表示平台的名称 + ServerGBID string `gorm:"column:server_gb_id" json:"serverGBId"` // ServerGBID表示SIP服务器的国标编码 + ServerGBDomain string `gorm:"column:server_gb_domain" json:"serverGBDomain"` // ServerGBDomain表示SIP服务器的国标域 + ServerIP string `gorm:"column:server_ip" json:"serverIp"` // ServerIP表示SIP服务器的IP地址 + ServerPort int `gorm:"column:server_port" json:"serverPort"` // ServerPort表示SIP服务器的端口号 + DeviceGBID string `gorm:"column:device_gb_id" json:"deviceGBId"` // DeviceGBID表示设备的国标编号 + DeviceIP string `gorm:"column:device_ip" json:"deviceIp"` // DeviceIP表示设备的IP地址 + DevicePort int `gorm:"column:device_port" json:"devicePort"` // DevicePort表示设备的端口号 + Username string `gorm:"column:username" json:"username"` // Username表示SIP认证的用户名,默认使用设备国标编号 + Password string `gorm:"column:password" json:"password"` // Password表示SIP认证的密码 + Expires int `gorm:"column:expires" json:"expires"` // Expires表示注册的过期时间,单位为秒 + KeepTimeout int `gorm:"column:keep_timeout" json:"keepTimeout"` // KeepTimeout表示心跳超时时间,单位为秒 + Transport string `gorm:"column:transport" json:"transport"` // Transport表示传输协议类型 + CharacterSet string `gorm:"column:character_set" json:"characterSet"` // CharacterSet表示字符集编码 + PTZ bool `gorm:"column:ptz" json:"ptz"` // PTZ表示是否允许云台控制 + RTCP bool `gorm:"column:rtcp" json:"rtcp"` // RTCP表示是否启用RTCP流保活 + Status bool `gorm:"column:status" json:"status"` // Status表示平台当前的在线状态 + ChannelCount int `gorm:"column:channel_count" json:"channelCount"` // ChannelCount表示通道数量 + CatalogSubscribe bool `gorm:"column:catalog_subscribe" json:"catalogSubscribe"` // CatalogSubscribe表示是否已订阅目录信息 + AlarmSubscribe bool `gorm:"column:alarm_subscribe" json:"alarmSubscribe"` // AlarmSubscribe表示是否已订阅报警信息 + MobilePositionSubscribe bool `gorm:"column:mobile_position_subscribe" json:"mobilePositionSubscribe"` // MobilePositionSubscribe表示是否已订阅移动位置信息 + CatalogGroup int `gorm:"column:catalog_group" json:"catalogGroup"` // CatalogGroup表示目录分组大小,每次向上级发送通道数量 + UpdateTime string `gorm:"column:update_time" json:"updateTime"` // UpdateTime表示最后更新时间 + CreateTime string `gorm:"column:create_time" json:"createTime"` // CreateTime表示创建时间 + AsMessageChannel bool `gorm:"column:as_message_channel" json:"asMessageChannel"` // AsMessageChannel表示是否作为消息通道使用 + SendStreamIP string `gorm:"column:send_stream_ip" json:"sendStreamIp"` // SendStreamIP表示点播回复200OK时使用的IP地址 + AutoPushChannel *bool `gorm:"column:auto_push_channel" json:"autoPushChannel"` // AutoPushChannel表示是否自动推送通道变化 + CatalogWithPlatform int `gorm:"column:catalog_with_platform" json:"catalogWithPlatform"` // CatalogWithPlatform表示目录信息是否包含平台信息(0:关闭,1:打开) + CatalogWithGroup int `gorm:"column:catalog_with_group" json:"catalogWithGroup"` // CatalogWithGroup表示目录信息是否包含分组信息(0:关闭,1:打开) + CatalogWithRegion int `gorm:"column:catalog_with_region" json:"catalogWithRegion"` // CatalogWithRegion表示目录信息是否包含行政区划(0:关闭,1:打开) + CivilCode string `gorm:"column:civil_code" json:"civilCode"` // CivilCode表示行政区划代码 + Manufacturer string `gorm:"column:manufacturer" json:"manufacturer"` // Manufacturer表示平台厂商 + Model string `gorm:"column:model" json:"model"` // Model表示平台型号 + Address string `gorm:"column:address" json:"address"` // Address表示平台安装地址 + RegisterWay int `gorm:"column:register_way" json:"registerWay"` // RegisterWay表示注册方式(1:标准认证注册,2:口令认证,3:数字证书双向认证,4:数字证书单向认证) + Secrecy int `gorm:"column:secrecy" json:"secrecy"` // Secrecy表示保密属性(0:不涉密,1:涉密) // 运行时字段,不存储到数据库 KeepAliveReply int `gorm:"-" json:"keepAliveReply"` // KeepAliveReply表示心跳未回复次数 @@ -110,3 +116,36 @@ func (p *Platform) SetCallID(callID string) { func (p *Platform) GetCallID() string { return p.CallID } + +// KeepAlive 任务 +type KeepAlive struct { + task.TickTask + platform *Platform +} + +func (k *KeepAlive) GetTickInterval() time.Duration { + return time.Second * time.Duration(k.platform.KeepTimeout) +} + +func (k *KeepAlive) Tick(any) { + if !k.platform.Enable { + return + } + + ctx := context.Background() + _, err := k.platform.Keepalive(ctx) + if err != nil { + k.platform.IncrementKeepAliveReply() + k.Error("keepalive", "error", err.Error()) + if k.platform.KeepAliveReply >= 3 { + k.platform.Status = false + k.Stop(fmt.Errorf("max keepalive retries reached")) + // 重新启动注册任务 + var rt RegisterTask + rt.platform = k.platform + k.platform.AddTask(&rt) + } + } else { + k.platform.ResetKeepAliveReply() + } +} diff --git a/plugin/gb28181pro/pkg/sipplatformcommand.go b/plugin/gb28181pro/pkg/sipplatformcommand.go index b5e9e77..c75ef7e 100644 --- a/plugin/gb28181pro/pkg/sipplatformcommand.go +++ b/plugin/gb28181pro/pkg/sipplatformcommand.go @@ -8,11 +8,14 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" + myip "github.com/husanpao/ip" "github.com/icholy/digest" + "m7s.live/v5/pkg/task" ) // InitializeSIPClient 初始化SIP客户端 -func (p *Platform) InitializeSIPClient(ua *sipgo.UserAgent, localIP string) error { +func (p *Platform) InitializeSIPClient(ua *sipgo.UserAgent) error { + localIP := myip.InternalIPv4() var err error p.Client, err = sipgo.NewClient(ua, sipgo.WithClientHostname(localIP)) if err != nil { @@ -45,7 +48,7 @@ func (p *Platform) InitializeSIPClient(ua *sipgo.UserAgent, localIP string) erro } // Register 发送注册请求到上级平台 -func (p *Platform) Register(ctx context.Context, plugin interface{}) (*sipgo.DialogClientSession, error) { +func (p *Platform) Register(ctx context.Context) (*sipgo.DialogClientSession, error) { // 创建注册请求的目标URI,使用上级平台的信息 recipient := sip.Uri{ User: p.ServerGBID, @@ -89,6 +92,7 @@ func (p *Platform) Register(ctx context.Context, plugin interface{}) (*sipgo.Dia // 发送请求并获取响应 tx, err := p.Client.TransactionRequest(ctx, req, sipgo.ClientRequestAddVia) if err != nil { + p.Error("register", "error", err.Error()) return nil, fmt.Errorf("创建事务失败: %v", err) } defer tx.Terminate() @@ -96,6 +100,7 @@ func (p *Platform) Register(ctx context.Context, plugin interface{}) (*sipgo.Dia // 获取响应 res, err := p.getResponse(tx) if err != nil { + p.Error("register", "error", err.Error()) return nil, fmt.Errorf("获取响应失败: %v", err) } @@ -104,12 +109,14 @@ func (p *Platform) Register(ctx context.Context, plugin interface{}) (*sipgo.Dia // 获取WWW-Authenticate头部 wwwAuth := res.GetHeader("WWW-Authenticate") if wwwAuth == nil { + p.Error("register", "error", "no auth challenge") return nil, fmt.Errorf("未收到认证质询") } // 解析认证质询 chal, err := digest.ParseChallenge(wwwAuth.Value()) if err != nil { + p.Error("register", "error", err.Error()) return nil, fmt.Errorf("解析认证质询失败: %v", err) } @@ -142,10 +149,11 @@ func (p *Platform) Register(ctx context.Context, plugin interface{}) (*sipgo.Dia // 检查最终响应状态 if res.StatusCode != 200 { + p.Error("register", "status", res.StatusCode) return nil, fmt.Errorf("注册失败,状态码: %d", res.StatusCode) } - // 注册成功,不需要维护会话状态 + p.Info("register", "response", res.String()) return nil, nil } @@ -160,13 +168,64 @@ func (p *Platform) getResponse(tx sip.ClientTransaction) (*sip.Response, error) } // Keepalive 发送心跳请求到上级平台 -func (p *Platform) Keepalive(ctx context.Context, plugin interface{}) (*sipgo.DialogClientSession, error) { - // TODO: 实现心跳逻辑 +func (p *Platform) Keepalive(ctx context.Context) (*sipgo.DialogClientSession, error) { + recipient := sip.Uri{ + User: p.ServerGBID, + Host: p.ServerIP, + Port: p.ServerPort, + } + + req := sip.NewRequest("MESSAGE", recipient) + req.SetTransport(strings.ToUpper(p.Transport)) + + // 添加From头部 + fromHeader := sip.FromHeader{ + Address: sip.Uri{ + User: p.DeviceGBID, + Host: p.ServerGBDomain, + }, + Params: sip.NewParams(), + } + fromHeader.Params.Add("tag", sip.GenerateTagN(16)) + req.AppendHeader(&fromHeader) + + // 添加To头部 + toHeader := sip.ToHeader{ + Address: sip.Uri{ + User: p.ServerGBID, + Host: p.ServerGBDomain, + }, + } + req.AppendHeader(&toHeader) + + // 添加Contact头部 + contactStr := fmt.Sprintf("", p.DeviceGBID, p.DeviceIP, p.DevicePort) + req.AppendHeader(sip.NewHeader("Contact", contactStr)) + + tx, err := p.Client.TransactionRequest(ctx, req, sipgo.ClientRequestAddVia) + if err != nil { + p.Error("keepalive", "error", err.Error()) + return nil, fmt.Errorf("创建事务失败: %v", err) + } + defer tx.Terminate() + + res, err := p.getResponse(tx) + if err != nil { + p.Error("keepalive", "error", err.Error()) + return nil, err + } + + if res.StatusCode != 200 { + p.Error("keepalive", "status", res.StatusCode) + return nil, fmt.Errorf("心跳失败,状态码: %d", res.StatusCode) + } + + p.Info("keepalive", "response", res.String()) return nil, nil } // Unregister 发送注销请求到上级平台 -func (p *Platform) Unregister(ctx context.Context, plugin interface{}) (*sipgo.DialogClientSession, error) { +func (p *Platform) Unregister(ctx context.Context) (*sipgo.DialogClientSession, error) { // 创建注销请求的目标URI recipient := sip.Uri{ User: p.ServerGBID, @@ -228,38 +287,64 @@ func (p *Platform) Unregister(ctx context.Context, plugin interface{}) (*sipgo.D return nil, nil } -// StartRegisterTask 启动注册任务 -// 这个方法会在平台启用时被调用,负责处理注册和保活 -func (p *Platform) StartRegisterTask(plugin interface{}) { - ctx := context.Background() +// RegisterTask 处理定时注册 +type RegisterTask struct { + task.TickTask + platform *Platform +} - // 首次注册 - session, err := p.Register(ctx, plugin) - if err != nil { - // TODO: 处理注册失败 +func (r *RegisterTask) GetTickInterval() time.Duration { + return time.Second * time.Duration(r.platform.Expires) +} + +func (r *RegisterTask) Tick(any) { + if !r.platform.Enable { + r.platform.Status = false + r.platform.CurrentSession = nil + ctx := context.Background() + _, _ = r.platform.Unregister(ctx) + r.Error("register", "error", "platform disabled") + r.Stop(fmt.Errorf("platform disabled")) return } - // 保存当前会话 - p.CurrentSession = session - - // 启动保活协程 - go func() { - ticker := time.NewTicker(time.Duration(p.KeepTimeout) * time.Second) - defer ticker.Stop() - - for range ticker.C { - if !p.Enable { - // 如果平台被禁用,发送注销请求并退出 - _, _ = p.Unregister(ctx, plugin) - return - } - - // 发送心跳 - _, err := p.Keepalive(ctx, plugin) - if err != nil { - // TODO: 处理心跳失败 - } + ctx := context.Background() + session, err := r.platform.Register(ctx) + if err != nil { + r.platform.IncrementRegisterAliveReply() + r.Error("register", "error", err.Error(), "retries", r.platform.RegisterAliveReply) + if r.platform.RegisterAliveReply >= 3 { + r.platform.Status = false + r.platform.CurrentSession = nil + r.Stop(fmt.Errorf("max retries reached: %d", r.platform.RegisterAliveReply)) } - }() + return + } + + r.Info("register", "status", "success") + r.platform.Status = true + r.platform.CurrentSession = session + r.platform.ResetRegisterAliveReply() +} + +// StartRegisterTask 启动注册任务 +func (p *Platform) StartRegisterTask() { + ctx := context.Background() + + // 首次注册 + session, err := p.Register(ctx) + if err != nil { + p.Status = false + p.IncrementRegisterAliveReply() + // 注册失败,启动定时注册任务 + var rt RegisterTask + rt.platform = p + p.AddTask(&rt) + return + } + + // 注册成功,更新状态 + p.Status = true + p.CurrentSession = session + p.ResetRegisterAliveReply() }