feature: support register to platform and keepalive

This commit is contained in:
pg
2025-02-15 13:05:20 +08:00
committed by pggiroro
parent bb92152c15
commit 321bba6a0c
3 changed files with 203 additions and 96 deletions
+6 -23
View File
@@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
@@ -571,23 +570,15 @@ func (gb *GB28181ProPlugin) AddPlatform(ctx context.Context, req *pb.Platform) (
// 如果平台启用,则启动注册任务
if platform.Enable {
// 获取本地SIP端口
localPort := 5060 // 默认端口
if len(gb.Sip.ListenAddr) > 0 {
if port, err := strconv.Atoi(strings.Split(gb.Sip.ListenAddr[0], ":")[1]); err == nil {
localPort = port
}
}
// 创建平台命令器
commander := gb28181.NewSIPPlatformCommander(platform, gb.GetPublicIP(platform.ServerIP), localPort)
if err := commander.InitializeSIPClient(gb.ua); err != nil {
// 使用 platform 自身初始化 SIP 客户端
if err := platform.InitializeSIPClient(gb.ua); err != nil {
gb.Error("初始化SIP客户端失败", "error", err)
resp.Code = 500
resp.Message = fmt.Sprintf("初始化SIP客户端失败: %v", err)
return resp, nil
}
// 启动注册任务
commander.StartRegisterTask(gb)
platform.StartRegisterTask()
}
resp.Code = 0
@@ -726,22 +717,14 @@ func (gb *GB28181ProPlugin) UpdatePlatform(ctx context.Context, req *pb.Platform
// 处理平台启用状态变化
if !wasEnabled && platform.Enable {
// 获取本地SIP端口
localPort := 5060 // 默认端口
if len(gb.Sip.ListenAddr) > 0 {
if port, err := strconv.Atoi(strings.Split(gb.Sip.ListenAddr[0], ":")[1]); err == nil {
localPort = port
}
}
// 平台从禁用变为启用,启动注册任务
commander := gb28181.NewSIPPlatformCommander(&platform, gb.GetPublicIP(platform.ServerIP), localPort)
if err := commander.InitializeSIPClient(gb.ua); err != nil {
// 使用 platform 自身初始化 SIP 客户端
if err := platform.InitializeSIPClient(gb.ua); err != nil {
gb.Error("初始化SIP客户端失败", "error", err)
resp.Code = 500
resp.Message = fmt.Sprintf("初始化SIP客户端失败: %v", err)
return resp, nil
}
commander.StartRegisterTask(gb)
platform.StartRegisterTask()
} else if wasEnabled && !platform.Enable {
// TODO: 平台从启用变为禁用,需要处理注销逻辑
// 这里可以添加注销相关的代码
+77 -38
View File
@@ -2,52 +2,58 @@
package gb28181
import (
"context"
"fmt"
"time"
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"m7s.live/v5/pkg/task"
)
// Platform 表示GB28181平台的配置信息。
// 包含了平台的基本信息、SIP服务配置、设备信息、认证信息等。
// 用于存储和管理GB28181平台的所有相关参数。
type Platform struct {
ID int `gorm:"primaryKey;autoIncrement" json:"id"` // ID表示数据库中的唯一标识符
Enable bool `gorm:"column:enable" json:"enable"` // Enable表示该平台配置是否启用
Name string `gorm:"column:name" json:"name"` // Name表示平台的名称
ServerGBID string `gorm:"column:server_gb_id" json:"serverGBId"` // ServerGBID表示SIP服务器的国标编码
ServerGBDomain string `gorm:"column:server_gb_domain" json:"serverGBDomain"` // ServerGBDomain表示SIP服务器的国标
ServerIP string `gorm:"column:server_ip" json:"serverIp"` // ServerIP表示SIP服务器的IP地址
ServerPort int `gorm:"column:server_port" json:"serverPort"` // ServerPort表示SIP服务器的端口号
DeviceGBID string `gorm:"column:device_gb_id" json:"deviceGBId"` // DeviceGBID表示设备的国标编
DeviceIP string `gorm:"column:device_ip" json:"deviceIp"` // DeviceIP表示设备的IP地址
DevicePort int `gorm:"column:device_port" json:"devicePort"` // DevicePort表示设备的端口号
Username string `gorm:"column:username" json:"username"` // Username表示SIP认证的用户名,默认使用设备国标编
Password string `gorm:"column:password" json:"password"` // Password表示SIP认证的密码
Expires int `gorm:"column:expires" json:"expires"` // Expires表示注册的过期时间,单位为秒
KeepTimeout int `gorm:"column:keep_timeout" json:"keepTimeout"` // KeepTimeout表示心跳超时时间,单位为秒
Transport string `gorm:"column:transport" json:"transport"` // Transport表示传输协议类型
CharacterSet string `gorm:"column:character_set" json:"characterSet"` // CharacterSet表示字符集编码
PTZ bool `gorm:"column:ptz" json:"ptz"` // PTZ表示是否允许云台控制
RTCP bool `gorm:"column:rtcp" json:"rtcp"` // RTCP表示是否启用RTCP流保活
Status bool `gorm:"column:status" json:"status"` // Status表示平台当前的在线状态
ChannelCount int `gorm:"column:channel_count" json:"channelCount"` // ChannelCount表示通道数量
CatalogSubscribe bool `gorm:"column:catalog_subscribe" json:"catalogSubscribe"` // CatalogSubscribe表示是否已订阅目录信息
AlarmSubscribe bool `gorm:"column:alarm_subscribe" json:"alarmSubscribe"` // AlarmSubscribe表示是否已订阅报警信息
MobilePositionSubscribe bool `gorm:"column:mobile_position_subscribe" json:"mobilePositionSubscribe"` // MobilePositionSubscribe表示是否已订阅移动位置信息
CatalogGroup int `gorm:"column:catalog_group" json:"catalogGroup"` // CatalogGroup表示目录分组大小,每次向上级发送通道数量
UpdateTime string `gorm:"column:update_time" json:"updateTime"` // UpdateTime表示最后更新时间
CreateTime string `gorm:"column:create_time" json:"createTime"` // CreateTime表示创建时间
AsMessageChannel bool `gorm:"column:as_message_channel" json:"asMessageChannel"` // AsMessageChannel表示是否作为消息通道使用
SendStreamIP string `gorm:"column:send_stream_ip" json:"sendStreamIp"` // SendStreamIP表示点播回复200OK时使用的IP地址
AutoPushChannel *bool `gorm:"column:auto_push_channel" json:"autoPushChannel"` // AutoPushChannel表示是否自动推送通道变化
CatalogWithPlatform int `gorm:"column:catalog_with_platform" json:"catalogWithPlatform"` // CatalogWithPlatform表示目录信息是否包含平台信息(0:关闭,1:打开)
CatalogWithGroup int `gorm:"column:catalog_with_group" json:"catalogWithGroup"` // CatalogWithGroup表示目录信息是否包含分组信息(0:关闭,1:打开)
CatalogWithRegion int `gorm:"column:catalog_with_region" json:"catalogWithRegion"` // CatalogWithRegion表示目录信息是否包含行政区划(0:关闭,1:打开)
CivilCode string `gorm:"column:civil_code" json:"civilCode"` // CivilCode表示行政区划代码
Manufacturer string `gorm:"column:manufacturer" json:"manufacturer"` // Manufacturer表示平台厂商
Model string `gorm:"column:model" json:"model"` // Model表示平台型号
Address string `gorm:"column:address" json:"address"` // Address表示平台安装地址
RegisterWay int `gorm:"column:register_way" json:"registerWay"` // RegisterWay表示注册方式(1:标准认证注册,2:口令认证,3:数字证书双向认证,4:数字证书单向认证)
Secrecy int `gorm:"column:secrecy" json:"secrecy"` // Secrecy表示保密属性(0:不涉密,1:涉密)
task.Job `gorm:"-:all"` // 使用 Task 而不是 Job,并且排除 gorm 序列化
ID int `gorm:"primaryKey;autoIncrement" json:"id"` // ID表示数据库中的唯一标识符
Enable bool `gorm:"column:enable" json:"enable"` // Enable表示平台配置是否启用
Name string `gorm:"column:name" json:"name"` // Name表示平台的名称
ServerGBID string `gorm:"column:server_gb_id" json:"serverGBId"` // ServerGBID表示SIP服务器的国标编码
ServerGBDomain string `gorm:"column:server_gb_domain" json:"serverGBDomain"` // ServerGBDomain表示SIP服务器的国标域
ServerIP string `gorm:"column:server_ip" json:"serverIp"` // ServerIP表示SIP服务器的IP地址
ServerPort int `gorm:"column:server_port" json:"serverPort"` // ServerPort表示SIP服务器的端口
DeviceGBID string `gorm:"column:device_gb_id" json:"deviceGBId"` // DeviceGBID表示设备的国标编号
DeviceIP string `gorm:"column:device_ip" json:"deviceIp"` // DeviceIP表示设备的IP地址
DevicePort int `gorm:"column:device_port" json:"devicePort"` // DevicePort表示设备的端口
Username string `gorm:"column:username" json:"username"` // Username表示SIP认证的用户名,默认使用设备国标编号
Password string `gorm:"column:password" json:"password"` // Password表示SIP认证的密码
Expires int `gorm:"column:expires" json:"expires"` // Expires表示注册的过期时间,单位为秒
KeepTimeout int `gorm:"column:keep_timeout" json:"keepTimeout"` // KeepTimeout表示心跳超时时间,单位为秒
Transport string `gorm:"column:transport" json:"transport"` // Transport表示传输协议类型
CharacterSet string `gorm:"column:character_set" json:"characterSet"` // CharacterSet表示字符集编码
PTZ bool `gorm:"column:ptz" json:"ptz"` // PTZ表示是否允许云台控制
RTCP bool `gorm:"column:rtcp" json:"rtcp"` // RTCP表示是否启用RTCP流保活
Status bool `gorm:"column:status" json:"status"` // Status表示平台当前的在线状态
ChannelCount int `gorm:"column:channel_count" json:"channelCount"` // ChannelCount表示通道数量
CatalogSubscribe bool `gorm:"column:catalog_subscribe" json:"catalogSubscribe"` // CatalogSubscribe表示是否已订阅目录信息
AlarmSubscribe bool `gorm:"column:alarm_subscribe" json:"alarmSubscribe"` // AlarmSubscribe表示是否已订阅报警信息
MobilePositionSubscribe bool `gorm:"column:mobile_position_subscribe" json:"mobilePositionSubscribe"` // MobilePositionSubscribe表示是否已订阅移动位置信息
CatalogGroup int `gorm:"column:catalog_group" json:"catalogGroup"` // CatalogGroup表示目录分组大小,每次向上级发送通道数量
UpdateTime string `gorm:"column:update_time" json:"updateTime"` // UpdateTime表示最后更新时间
CreateTime string `gorm:"column:create_time" json:"createTime"` // CreateTime表示创建时间
AsMessageChannel bool `gorm:"column:as_message_channel" json:"asMessageChannel"` // AsMessageChannel表示是否作为消息通道使用
SendStreamIP string `gorm:"column:send_stream_ip" json:"sendStreamIp"` // SendStreamIP表示点播回复200OK时使用的IP地址
AutoPushChannel *bool `gorm:"column:auto_push_channel" json:"autoPushChannel"` // AutoPushChannel表示是否自动推送通道变化
CatalogWithPlatform int `gorm:"column:catalog_with_platform" json:"catalogWithPlatform"` // CatalogWithPlatform表示目录信息是否包含平台信息(0:关闭,1:打开)
CatalogWithGroup int `gorm:"column:catalog_with_group" json:"catalogWithGroup"` // CatalogWithGroup表示目录信息是否包含分组信息(0:关闭,1:打开)
CatalogWithRegion int `gorm:"column:catalog_with_region" json:"catalogWithRegion"` // CatalogWithRegion表示目录信息是否包含行政区划(0:关闭,1:打开)
CivilCode string `gorm:"column:civil_code" json:"civilCode"` // CivilCode表示行政区划代码
Manufacturer string `gorm:"column:manufacturer" json:"manufacturer"` // Manufacturer表示平台厂商
Model string `gorm:"column:model" json:"model"` // Model表示平台型号
Address string `gorm:"column:address" json:"address"` // Address表示平台安装地址
RegisterWay int `gorm:"column:register_way" json:"registerWay"` // RegisterWay表示注册方式(1:标准认证注册,2:口令认证,3:数字证书双向认证,4:数字证书单向认证)
Secrecy int `gorm:"column:secrecy" json:"secrecy"` // Secrecy表示保密属性(0:不涉密,1:涉密)
// 运行时字段,不存储到数据库
KeepAliveReply int `gorm:"-" json:"keepAliveReply"` // KeepAliveReply表示心跳未回复次数
@@ -110,3 +116,36 @@ func (p *Platform) SetCallID(callID string) {
func (p *Platform) GetCallID() string {
return p.CallID
}
// KeepAlive 任务
type KeepAlive struct {
task.TickTask
platform *Platform
}
func (k *KeepAlive) GetTickInterval() time.Duration {
return time.Second * time.Duration(k.platform.KeepTimeout)
}
func (k *KeepAlive) Tick(any) {
if !k.platform.Enable {
return
}
ctx := context.Background()
_, err := k.platform.Keepalive(ctx)
if err != nil {
k.platform.IncrementKeepAliveReply()
k.Error("keepalive", "error", err.Error())
if k.platform.KeepAliveReply >= 3 {
k.platform.Status = false
k.Stop(fmt.Errorf("max keepalive retries reached"))
// 重新启动注册任务
var rt RegisterTask
rt.platform = k.platform
k.platform.AddTask(&rt)
}
} else {
k.platform.ResetKeepAliveReply()
}
}
+120 -35
View File
@@ -8,11 +8,14 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
myip "github.com/husanpao/ip"
"github.com/icholy/digest"
"m7s.live/v5/pkg/task"
)
// InitializeSIPClient 初始化SIP客户端
func (p *Platform) InitializeSIPClient(ua *sipgo.UserAgent, localIP string) error {
func (p *Platform) InitializeSIPClient(ua *sipgo.UserAgent) error {
localIP := myip.InternalIPv4()
var err error
p.Client, err = sipgo.NewClient(ua, sipgo.WithClientHostname(localIP))
if err != nil {
@@ -45,7 +48,7 @@ func (p *Platform) InitializeSIPClient(ua *sipgo.UserAgent, localIP string) erro
}
// Register 发送注册请求到上级平台
func (p *Platform) Register(ctx context.Context, plugin interface{}) (*sipgo.DialogClientSession, error) {
func (p *Platform) Register(ctx context.Context) (*sipgo.DialogClientSession, error) {
// 创建注册请求的目标URI,使用上级平台的信息
recipient := sip.Uri{
User: p.ServerGBID,
@@ -89,6 +92,7 @@ func (p *Platform) Register(ctx context.Context, plugin interface{}) (*sipgo.Dia
// 发送请求并获取响应
tx, err := p.Client.TransactionRequest(ctx, req, sipgo.ClientRequestAddVia)
if err != nil {
p.Error("register", "error", err.Error())
return nil, fmt.Errorf("创建事务失败: %v", err)
}
defer tx.Terminate()
@@ -96,6 +100,7 @@ func (p *Platform) Register(ctx context.Context, plugin interface{}) (*sipgo.Dia
// 获取响应
res, err := p.getResponse(tx)
if err != nil {
p.Error("register", "error", err.Error())
return nil, fmt.Errorf("获取响应失败: %v", err)
}
@@ -104,12 +109,14 @@ func (p *Platform) Register(ctx context.Context, plugin interface{}) (*sipgo.Dia
// 获取WWW-Authenticate头部
wwwAuth := res.GetHeader("WWW-Authenticate")
if wwwAuth == nil {
p.Error("register", "error", "no auth challenge")
return nil, fmt.Errorf("未收到认证质询")
}
// 解析认证质询
chal, err := digest.ParseChallenge(wwwAuth.Value())
if err != nil {
p.Error("register", "error", err.Error())
return nil, fmt.Errorf("解析认证质询失败: %v", err)
}
@@ -142,10 +149,11 @@ func (p *Platform) Register(ctx context.Context, plugin interface{}) (*sipgo.Dia
// 检查最终响应状态
if res.StatusCode != 200 {
p.Error("register", "status", res.StatusCode)
return nil, fmt.Errorf("注册失败,状态码: %d", res.StatusCode)
}
// 注册成功,不需要维护会话状态
p.Info("register", "response", res.String())
return nil, nil
}
@@ -160,13 +168,64 @@ func (p *Platform) getResponse(tx sip.ClientTransaction) (*sip.Response, error)
}
// Keepalive 发送心跳请求到上级平台
func (p *Platform) Keepalive(ctx context.Context, plugin interface{}) (*sipgo.DialogClientSession, error) {
// TODO: 实现心跳逻辑
func (p *Platform) Keepalive(ctx context.Context) (*sipgo.DialogClientSession, error) {
recipient := sip.Uri{
User: p.ServerGBID,
Host: p.ServerIP,
Port: p.ServerPort,
}
req := sip.NewRequest("MESSAGE", recipient)
req.SetTransport(strings.ToUpper(p.Transport))
// 添加From头部
fromHeader := sip.FromHeader{
Address: sip.Uri{
User: p.DeviceGBID,
Host: p.ServerGBDomain,
},
Params: sip.NewParams(),
}
fromHeader.Params.Add("tag", sip.GenerateTagN(16))
req.AppendHeader(&fromHeader)
// 添加To头部
toHeader := sip.ToHeader{
Address: sip.Uri{
User: p.ServerGBID,
Host: p.ServerGBDomain,
},
}
req.AppendHeader(&toHeader)
// 添加Contact头部
contactStr := fmt.Sprintf("<sip:%s@%s:%d>", p.DeviceGBID, p.DeviceIP, p.DevicePort)
req.AppendHeader(sip.NewHeader("Contact", contactStr))
tx, err := p.Client.TransactionRequest(ctx, req, sipgo.ClientRequestAddVia)
if err != nil {
p.Error("keepalive", "error", err.Error())
return nil, fmt.Errorf("创建事务失败: %v", err)
}
defer tx.Terminate()
res, err := p.getResponse(tx)
if err != nil {
p.Error("keepalive", "error", err.Error())
return nil, err
}
if res.StatusCode != 200 {
p.Error("keepalive", "status", res.StatusCode)
return nil, fmt.Errorf("心跳失败,状态码: %d", res.StatusCode)
}
p.Info("keepalive", "response", res.String())
return nil, nil
}
// Unregister 发送注销请求到上级平台
func (p *Platform) Unregister(ctx context.Context, plugin interface{}) (*sipgo.DialogClientSession, error) {
func (p *Platform) Unregister(ctx context.Context) (*sipgo.DialogClientSession, error) {
// 创建注销请求的目标URI
recipient := sip.Uri{
User: p.ServerGBID,
@@ -228,38 +287,64 @@ func (p *Platform) Unregister(ctx context.Context, plugin interface{}) (*sipgo.D
return nil, nil
}
// StartRegisterTask 启动注册任务
// 这个方法会在平台启用时被调用,负责处理注册和保活
func (p *Platform) StartRegisterTask(plugin interface{}) {
ctx := context.Background()
// RegisterTask 处理定时注册
type RegisterTask struct {
task.TickTask
platform *Platform
}
// 首次注册
session, err := p.Register(ctx, plugin)
if err != nil {
// TODO: 处理注册失败
func (r *RegisterTask) GetTickInterval() time.Duration {
return time.Second * time.Duration(r.platform.Expires)
}
func (r *RegisterTask) Tick(any) {
if !r.platform.Enable {
r.platform.Status = false
r.platform.CurrentSession = nil
ctx := context.Background()
_, _ = r.platform.Unregister(ctx)
r.Error("register", "error", "platform disabled")
r.Stop(fmt.Errorf("platform disabled"))
return
}
// 保存当前会话
p.CurrentSession = session
// 启动保活协程
go func() {
ticker := time.NewTicker(time.Duration(p.KeepTimeout) * time.Second)
defer ticker.Stop()
for range ticker.C {
if !p.Enable {
// 如果平台被禁用,发送注销请求并退出
_, _ = p.Unregister(ctx, plugin)
return
}
// 发送心跳
_, err := p.Keepalive(ctx, plugin)
if err != nil {
// TODO: 处理心跳失败
}
ctx := context.Background()
session, err := r.platform.Register(ctx)
if err != nil {
r.platform.IncrementRegisterAliveReply()
r.Error("register", "error", err.Error(), "retries", r.platform.RegisterAliveReply)
if r.platform.RegisterAliveReply >= 3 {
r.platform.Status = false
r.platform.CurrentSession = nil
r.Stop(fmt.Errorf("max retries reached: %d", r.platform.RegisterAliveReply))
}
}()
return
}
r.Info("register", "status", "success")
r.platform.Status = true
r.platform.CurrentSession = session
r.platform.ResetRegisterAliveReply()
}
// StartRegisterTask 启动注册任务
func (p *Platform) StartRegisterTask() {
ctx := context.Background()
// 首次注册
session, err := p.Register(ctx)
if err != nil {
p.Status = false
p.IncrementRegisterAliveReply()
// 注册失败,启动定时注册任务
var rt RegisterTask
rt.platform = p
p.AddTask(&rt)
return
}
// 注册成功,更新状态
p.Status = true
p.CurrentSession = session
p.ResetRegisterAliveReply()
}