fix: modify ptz api;modify updateplatform api

This commit is contained in:
pg
2025-03-04 16:16:54 +08:00
committed by pggiroro
parent 1fa85d39d9
commit 0dcfe382fd
5 changed files with 142 additions and 118 deletions
+49 -52
View File
@@ -738,7 +738,7 @@ func (gb *GB28181ProPlugin) AddPlatform(ctx context.Context, req *pb.Platform) (
CreateTime: req.CreateTime,
AsMessageChannel: req.AsMessageChannel,
SendStreamIP: req.SendStreamIp,
AutoPushChannel: &req.AutoPushChannel,
AutoPushChannel: req.AutoPushChannel,
CatalogWithPlatform: int(req.CatalogWithPlatform),
CatalogWithGroup: int(req.CatalogWithGroup),
CatalogWithRegion: int(req.CatalogWithRegion),
@@ -760,11 +760,7 @@ func (gb *GB28181ProPlugin) AddPlatform(ctx context.Context, req *pb.Platform) (
// 如果平台启用,则创建Platform实例并启动任务
if platformModel.Enable {
// 创建Platform实例
platform := &Platform{
PlatformModel: platformModel,
plugin: gb,
}
platform := NewPlatform(platformModel, gb)
// 添加到任务系统
gb.AddTask(platform)
gb.platforms.Set(platform)
@@ -822,7 +818,7 @@ func (gb *GB28181ProPlugin) GetPlatform(ctx context.Context, req *pb.GetPlatform
CreateTime: platform.CreateTime,
AsMessageChannel: platform.AsMessageChannel,
SendStreamIp: platform.SendStreamIP,
AutoPushChannel: platform.AutoPushChannel != nil && *platform.AutoPushChannel,
AutoPushChannel: platform.AutoPushChannel,
CatalogWithPlatform: int32(platform.CatalogWithPlatform),
CatalogWithGroup: int32(platform.CatalogWithGroup),
CatalogWithRegion: int32(platform.CatalogWithRegion),
@@ -860,45 +856,49 @@ func (gb *GB28181ProPlugin) UpdatePlatform(ctx context.Context, req *pb.Platform
runningPlatform.Stop(errors.New("stop running platform,platform.ServerGBID is " + platform.ServerGBID))
}
// 更新平台信息
platform.Enable = req.Enable
platform.Name = req.Name
platform.ServerGBID = req.ServerGBId
platform.ServerGBDomain = req.ServerGBDomain
platform.ServerIP = req.ServerIp
platform.ServerPort = int(req.ServerPort)
platform.DeviceGBID = req.DeviceGBId
platform.DeviceIP = req.DeviceIp
platform.DevicePort = int(req.DevicePort)
platform.Username = req.Username
platform.Password = req.Password
platform.Expires = int(req.Expires)
platform.KeepTimeout = int(req.KeepTimeout)
platform.Transport = req.Transport
platform.CharacterSet = req.CharacterSet
platform.PTZ = req.Ptz
platform.RTCP = req.Rtcp
platform.Status = req.Status
platform.ChannelCount = int(req.ChannelCount)
platform.CatalogSubscribe = req.CatalogSubscribe
platform.AlarmSubscribe = req.AlarmSubscribe
platform.MobilePositionSubscribe = req.MobilePositionSubscribe
platform.CatalogGroup = int(req.CatalogGroup)
platform.UpdateTime = req.UpdateTime
platform.AsMessageChannel = req.AsMessageChannel
platform.SendStreamIP = req.SendStreamIp
platform.AutoPushChannel = &req.AutoPushChannel
platform.CatalogWithPlatform = int(req.CatalogWithPlatform)
platform.CatalogWithGroup = int(req.CatalogWithGroup)
platform.CatalogWithRegion = int(req.CatalogWithRegion)
platform.CivilCode = req.CivilCode
platform.Manufacturer = req.Manufacturer
platform.Model = req.Model
platform.Address = req.Address
platform.RegisterWay = int(req.RegisterWay)
platform.Secrecy = int(req.Secrecy)
// 从请求中创建一个新的平台模型
updatedPlatform := gb28181.PlatformModel{
ID: req.ID,
Enable: req.Enable,
Name: req.Name,
ServerGBID: req.ServerGBId,
ServerGBDomain: req.ServerGBDomain,
ServerIP: req.ServerIp,
ServerPort: int(req.ServerPort),
DeviceGBID: req.DeviceGBId,
DeviceIP: req.DeviceIp,
DevicePort: int(req.DevicePort),
Username: req.Username,
Password: req.Password,
Expires: int(req.Expires),
KeepTimeout: int(req.KeepTimeout),
Transport: req.Transport,
CharacterSet: req.CharacterSet,
PTZ: req.Ptz,
RTCP: req.Rtcp,
Status: req.Status,
ChannelCount: int(req.ChannelCount),
CatalogSubscribe: req.CatalogSubscribe,
AlarmSubscribe: req.AlarmSubscribe,
MobilePositionSubscribe: req.MobilePositionSubscribe,
CatalogGroup: int(req.CatalogGroup),
UpdateTime: req.UpdateTime,
AsMessageChannel: req.AsMessageChannel,
SendStreamIP: req.SendStreamIp,
AutoPushChannel: req.AutoPushChannel,
CatalogWithPlatform: int(req.CatalogWithPlatform),
CatalogWithGroup: int(req.CatalogWithGroup),
CatalogWithRegion: int(req.CatalogWithRegion),
CivilCode: req.CivilCode,
Manufacturer: req.Manufacturer,
Model: req.Model,
Address: req.Address,
RegisterWay: int(req.RegisterWay),
Secrecy: int(req.Secrecy),
}
if err := gb.DB.Save(&platform).Error; err != nil {
// 使用 GORM 的 Updates 方法更新非零值字段
if err := gb.DB.Model(&platform).Updates(updatedPlatform).Error; err != nil {
resp.Code = 500
resp.Message = fmt.Sprintf("failed to update platform: %v", err)
return resp, nil
@@ -913,19 +913,16 @@ func (gb *GB28181ProPlugin) UpdatePlatform(ctx context.Context, req *pb.Platform
}
// 创建新的Platform实例
platformInstance := &Platform{
PlatformModel: &platform,
plugin: gb,
}
platformInstance := NewPlatform(&platform, gb)
platformInstance.Unregister()
// 添加到任务系统
gb.AddTask(platformInstance)
// 添加到platforms集合中
gb.platforms.Add(platformInstance)
} else {
// 如果平台被禁用,停止并移除旧的platform实例
if oldPlatform, ok := gb.platforms.Get(platform.ID); ok {
oldPlatform.Unregister()
oldPlatform.Stop(fmt.Errorf("platform disabled"))
gb.platforms.Remove(oldPlatform)
}
@@ -1040,7 +1037,7 @@ func (gb *GB28181ProPlugin) ListPlatforms(ctx context.Context, req *pb.ListPlatf
CreateTime: p.CreateTime,
AsMessageChannel: p.AsMessageChannel,
SendStreamIp: p.SendStreamIP,
AutoPushChannel: p.AutoPushChannel != nil && *p.AutoPushChannel,
AutoPushChannel: p.AutoPushChannel,
CatalogWithPlatform: int32(p.CatalogWithPlatform),
CatalogWithGroup: int32(p.CatalogWithGroup),
CatalogWithRegion: int32(p.CatalogWithRegion),
+39 -9
View File
@@ -119,6 +119,9 @@ func (gb *GB28181ProPlugin) OnInit() (err error) {
if err := gb.checkDeviceExpire(); err != nil {
gb.Error("检查设备过期状态失败", "error", err)
}
// 检查并初始化平台
gb.checkPlatform()
}
}
if gb.Parent != "" {
@@ -130,6 +133,41 @@ func (gb *GB28181ProPlugin) OnInit() (err error) {
return
}
// checkPlatform 从数据库中查找启用状态的平台,初始化它们,并进行注册和定时任务设置
func (gb *GB28181ProPlugin) checkPlatform() {
// 检查数据库是否初始化
if gb.DB == nil {
gb.Error("数据库未初始化,无法检查平台")
return
}
// 查询所有启用状态的平台
var platformModels []*gb28181.PlatformModel
platformModel := gb28181.PlatformModel{Enable: true}
if err := gb.DB.Where(&platformModel).Find(&platformModels).Error; err != nil {
gb.Error("查询平台失败", "error", err.Error())
return
}
gb.Info("找到启用状态的平台", "count", len(platformModels))
// 遍历所有平台进行初始化和注册
for _, platformModel := range platformModels {
// 创建Platform实例
platform := NewPlatform(platformModel, gb)
_, err := platform.Unregister()
if err != nil {
gb.Error("unregister err ", err)
} else {
// 添加到任务系统
gb.AddTask(platform)
gb.platforms.Set(platform)
gb.Info("平台初始化完成", "ID", platformModel.ID, "Name", platformModel.Name)
}
}
}
func (gb *GB28181ProPlugin) checkDeviceExpire() (err error) {
// 从数据库中查询所有设备
var devices []*Device
@@ -390,18 +428,10 @@ func (gb *GB28181ProPlugin) OnMessage(req *sip.Request, tx sip.ServerTransaction
var platform *Platform
if platformtmp, ok := gb.platforms.Get(p.ID); !ok {
// 创建 Platform 实例
platform = &Platform{
PlatformModel: p,
plugin: gb,
}
platform.init()
gb.Info("222222222222222222")
platform = NewPlatform(p, gb)
} else {
gb.Info("1111111111111111")
platform = platformtmp
}
if err = platform.OnMessage(req, tx, temp); err != nil {
gb.Error("onMessage", "error", err.Error(), "type", "platform")
}
+38 -38
View File
@@ -9,44 +9,44 @@ import (
// 包含了平台的基本信息、SIP服务配置、设备信息、认证信息等。
// 用于存储和管理GB28181平台的所有相关参数。
type PlatformModel struct {
ID uint32 `gorm:"primaryKey;autoIncrement"` // ID表示数据库中的唯一标识符
Enable bool `gorm:"column:enable" json:"enable"` // Enable表示该平台配置是否启用
Name string `gorm:"column:name" json:"name"` // Name表示平台的名称
ServerGBID string `gorm:"column:server_gb_id" json:"serverGBId"` // ServerGBID表示SIP服务器的国标编码
ServerGBDomain string `gorm:"column:server_gb_domain" json:"serverGBDomain"` // ServerGBDomain表示SIP服务器的国标域
ServerIP string `gorm:"column:server_ip" json:"serverIp"` // ServerIP表示SIP服务器的IP地址
ServerPort int `gorm:"column:server_port" json:"serverPort"` // ServerPort表示SIP服务器的端口号
DeviceGBID string `gorm:"column:device_gb_id" json:"deviceGBId"` // DeviceGBID表示设备的国标编号
DeviceIP string `gorm:"column:device_ip" json:"deviceIp"` // DeviceIP表示设备的IP地址
DevicePort int `gorm:"column:device_port" json:"devicePort"` // DevicePort表示设备的端口号
Username string `gorm:"column:username" json:"username"` // Username表示SIP认证的用户名,默认使用设备国标编号
Password string `gorm:"column:password" json:"password"` // Password表示SIP认证的密码
Expires int `gorm:"column:expires" json:"expires"` // Expires表示注册的过期时间,单位为秒
KeepTimeout int `gorm:"column:keep_timeout" json:"keepTimeout"` // KeepTimeout表示心跳超时时间,单位为秒
Transport string `gorm:"column:transport" json:"transport"` // Transport表示传输协议类型
CharacterSet string `gorm:"column:character_set" json:"characterSet"` // CharacterSet表示字符集编码
PTZ bool `gorm:"column:ptz" json:"ptz"` // PTZ表示是否允许云台控制
RTCP bool `gorm:"column:rtcp" json:"rtcp"` // RTCP表示是否启用RTCP流保活
Status bool `gorm:"column:status" json:"status"` // Status表示平台当前的在线状态
ChannelCount int `gorm:"column:channel_count" json:"channelCount"` // ChannelCount表示通道数量
CatalogSubscribe bool `gorm:"column:catalog_subscribe" json:"catalogSubscribe"` // CatalogSubscribe表示是否已订阅目录信息
AlarmSubscribe bool `gorm:"column:alarm_subscribe" json:"alarmSubscribe"` // AlarmSubscribe表示是否已订阅报警信息
MobilePositionSubscribe bool `gorm:"column:mobile_position_subscribe" json:"mobilePositionSubscribe"` // MobilePositionSubscribe表示是否已订阅移动位置信息
CatalogGroup int `gorm:"column:catalog_group" json:"catalogGroup"` // CatalogGroup表示目录分组大小,每次向上级发送通道数量
UpdateTime string `gorm:"column:update_time" json:"updateTime"` // UpdateTime表示最后更新时间
CreateTime string `gorm:"column:create_time" json:"createTime"` // CreateTime表示创建时间
AsMessageChannel bool `gorm:"column:as_message_channel" json:"asMessageChannel"` // AsMessageChannel表示是否作为消息通道使用
SendStreamIP string `gorm:"column:send_stream_ip" json:"sendStreamIp"` // SendStreamIP表示点播回复200OK时使用的IP地址
AutoPushChannel *bool `gorm:"column:auto_push_channel" json:"autoPushChannel"` // AutoPushChannel表示是否自动推送通道变化
CatalogWithPlatform int `gorm:"column:catalog_with_platform" json:"catalogWithPlatform"` // CatalogWithPlatform表示目录信息是否包含平台信息(0:关闭,1:打开)
CatalogWithGroup int `gorm:"column:catalog_with_group" json:"catalogWithGroup"` // CatalogWithGroup表示目录信息是否包含分组信息(0:关闭,1:打开)
CatalogWithRegion int `gorm:"column:catalog_with_region" json:"catalogWithRegion"` // CatalogWithRegion表示目录信息是否包含行政区划(0:关闭,1:打开)
CivilCode string `gorm:"column:civil_code" json:"civilCode"` // CivilCode表示行政区划代码
Manufacturer string `gorm:"column:manufacturer" json:"manufacturer"` // Manufacturer表示平台厂商
Model string `gorm:"column:model" json:"model"` // Model表示平台型号
Address string `gorm:"column:address" json:"address"` // Address表示平台安装地址
RegisterWay int `gorm:"column:register_way" json:"registerWay"` // RegisterWay表示注册方式(1:标准认证注册,2:口令认证,3:数字证书双向认证,4:数字证书单向认证)
Secrecy int `gorm:"column:secrecy" json:"secrecy"` // Secrecy表示保密属性(0:不涉密,1:涉密)
ID uint32 `gorm:"primaryKey;autoIncrement"` // ID表示数据库中的唯一标识符
Enable bool `gorm:"column:enable" json:"enable"` // Enable表示该平台配置是否启用
Name string `gorm:"column:name;omitempty" json:"name"` // Name表示平台的名称
ServerGBID string `gorm:"column:server_gb_id;omitempty" json:"serverGBId"` // ServerGBID表示SIP服务器的国标编码
ServerGBDomain string `gorm:"column:server_gb_domain;omitempty" json:"serverGBDomain"` // ServerGBDomain表示SIP服务器的国标域
ServerIP string `gorm:"column:server_ip;omitempty" json:"serverIp"` // ServerIP表示SIP服务器的IP地址
ServerPort int `gorm:"column:server_port;omitempty" json:"serverPort"` // ServerPort表示SIP服务器的端口号
DeviceGBID string `gorm:"column:device_gb_id;omitempty" json:"deviceGBId"` // DeviceGBID表示设备的国标编号
DeviceIP string `gorm:"column:device_ip;omitempty" json:"deviceIp"` // DeviceIP表示设备的IP地址
DevicePort int `gorm:"column:device_port;omitempty" json:"devicePort"` // DevicePort表示设备的端口号
Username string `gorm:"column:username;omitempty" json:"username"` // Username表示SIP认证的用户名,默认使用设备国标编号
Password string `gorm:"column:password;omitempty" json:"password"` // Password表示SIP认证的密码
Expires int `gorm:"column:expires;omitempty" json:"expires"` // Expires表示注册的过期时间,单位为秒
KeepTimeout int `gorm:"column:keep_timeout;omitempty" json:"keepTimeout"` // KeepTimeout表示心跳超时时间,单位为秒
Transport string `gorm:"column:transport;omitempty" json:"transport"` // Transport表示传输协议类型
CharacterSet string `gorm:"column:character_set;omitempty" json:"characterSet"` // CharacterSet表示字符集编码
PTZ bool `gorm:"column:ptz" json:"ptz"` // PTZ表示是否允许云台控制
RTCP bool `gorm:"column:rtcp" json:"rtcp"` // RTCP表示是否启用RTCP流保活
Status bool `gorm:"column:status" json:"status"` // Status表示平台当前的在线状态
ChannelCount int `gorm:"column:channel_count;omitempty" json:"channelCount"` // ChannelCount表示通道数量
CatalogSubscribe bool `gorm:"column:catalog_subscribe" json:"catalogSubscribe"` // CatalogSubscribe表示是否已订阅目录信息
AlarmSubscribe bool `gorm:"column:alarm_subscribe" json:"alarmSubscribe"` // AlarmSubscribe表示是否已订阅报警信息
MobilePositionSubscribe bool `gorm:"column:mobile_position_subscribe" json:"mobilePositionSubscribe"` // MobilePositionSubscribe表示是否已订阅移动位置信息
CatalogGroup int `gorm:"column:catalog_group;omitempty" json:"catalogGroup"` // CatalogGroup表示目录分组大小,每次向上级发送通道数量
UpdateTime string `gorm:"column:update_time;omitempty" json:"updateTime"` // UpdateTime表示最后更新时间
CreateTime string `gorm:"column:create_time;omitempty" json:"createTime"` // CreateTime表示创建时间
AsMessageChannel bool `gorm:"column:as_message_channel" json:"asMessageChannel"` // AsMessageChannel表示是否作为消息通道使用
SendStreamIP string `gorm:"column:send_stream_ip;omitempty" json:"sendStreamIp"` // SendStreamIP表示点播回复200OK时使用的IP地址
AutoPushChannel bool `gorm:"column:auto_push_channel" json:"autoPushChannel"` // AutoPushChannel表示是否自动推送通道变化
CatalogWithPlatform int `gorm:"column:catalog_with_platform;omitempty" json:"catalogWithPlatform"` // CatalogWithPlatform表示目录信息是否包含平台信息(0:关闭,1:打开)
CatalogWithGroup int `gorm:"column:catalog_with_group;omitempty" json:"catalogWithGroup"` // CatalogWithGroup表示目录信息是否包含分组信息(0:关闭,1:打开)
CatalogWithRegion int `gorm:"column:catalog_with_region;omitempty" json:"catalogWithRegion"` // CatalogWithRegion表示目录信息是否包含行政区划(0:关闭,1:打开)
CivilCode string `gorm:"column:civil_code;omitempty" json:"civilCode"` // CivilCode表示行政区划代码
Manufacturer string `gorm:"column:manufacturer;omitempty" json:"manufacturer"` // Manufacturer表示平台厂商
Model string `gorm:"column:model;omitempty" json:"model"` // Model表示平台型号
Address string `gorm:"column:address;omitempty" json:"address"` // Address表示平台安装地址
RegisterWay int `gorm:"column:register_way;omitempty" json:"registerWay"` // RegisterWay表示注册方式(1:标准认证注册,2:口令认证,3:数字证书双向认证,4:数字证书单向认证)
Secrecy int `gorm:"column:secrecy;omitempty" json:"secrecy"` // Secrecy表示保密属性(0:不涉密,1:涉密)
}
// TableName 指定数据库表名
+15 -16
View File
@@ -41,7 +41,11 @@ type Platform struct {
ctx context.Context
}
func (p *Platform) init() {
func NewPlatform(pm *gb28181.PlatformModel, plugin *GB28181ProPlugin) *Platform {
p := &Platform{
PlatformModel: pm,
plugin: plugin,
}
p.ctx = context.Background()
client, err := sipgo.NewClient(p.plugin.ua, sipgo.WithClientHostname(p.PlatformModel.DeviceIP), sipgo.WithClientPort(p.PlatformModel.DevicePort))
if err != nil {
@@ -74,14 +78,11 @@ func (p *Platform) init() {
p.DialogClient = sipgo.NewDialogClient(p.Client, *p.ContactHDR)
p.MaxForwardsHDR = sip.MaxForwardsHeader(70)
p.plugin.platforms.Add(p)
p.plugin.platforms.Set(p)
return p
}
func (p *Platform) Start() error {
if _, ok := p.plugin.platforms.Get(p.ID); !ok {
p.init()
}
register := NewRegister(p, "firstRegister")
register.OnStart(func() {
register.Tick(nil)
@@ -101,7 +102,7 @@ func (p *Platform) getResponse(tx sip.ClientTransaction) (*sip.Response, error)
}
// Keepalive 发送心跳请求到上级平台
func (p *Platform) Keepalive(ctx context.Context) (*sipgo.DialogClientSession, error) {
func (p *Platform) Keepalive() (*sipgo.DialogClientSession, error) {
req := sip.NewRequest("MESSAGE", p.Recipient)
req.SetTransport(strings.ToUpper(p.PlatformModel.Transport))
@@ -149,7 +150,7 @@ func (p *Platform) Keepalive(ctx context.Context) (*sipgo.DialogClientSession, e
req.SetBody(gb28181.BuildKeepAliveXML(p.SN, p.PlatformModel.DeviceGBID))
p.SN++
tx, err := p.Client.TransactionRequest(ctx, req)
tx, err := p.Client.TransactionRequest(p.ctx, req)
if err != nil {
p.Error("keepalive", "error", err.Error())
return nil, fmt.Errorf("创建事务失败: %v", err)
@@ -172,7 +173,7 @@ func (p *Platform) Keepalive(ctx context.Context) (*sipgo.DialogClientSession, e
}
// Unregister 发送注销请求到上级平台
func (p *Platform) Unregister(ctx context.Context) (*sipgo.DialogClientSession, error) {
func (p *Platform) Unregister() (*sipgo.DialogClientSession, error) {
// 创建注销请求的目标URI
recipient := sip.Uri{
User: p.PlatformModel.ServerGBID,
@@ -214,7 +215,7 @@ func (p *Platform) Unregister(ctx context.Context) (*sipgo.DialogClientSession,
req.SetTransport(strings.ToUpper(p.PlatformModel.Transport))
// 发送请求并获取响应
tx, err := p.Client.TransactionRequest(ctx, req)
tx, err := p.Client.TransactionRequest(p.ctx, req)
if err != nil {
return nil, fmt.Errorf("创建事务失败: %v", err)
}
@@ -248,9 +249,7 @@ func (k *PlatformKeepAliveTask) Tick(any) {
if !k.platform.PlatformModel.Enable {
return
}
ctx := context.Background()
_, err := k.platform.Keepalive(ctx)
_, err := k.platform.Keepalive()
if err != nil {
k.platform.KeepAliveReply++
k.Error("keepalive", "error", err.Error())
@@ -708,7 +707,7 @@ func (p *Platform) GetKey() uint32 {
}
// Register 执行注册流程
func (p *Platform) DoRegister(ctx context.Context) error {
func (p *Platform) DoRegister() error {
// 创建基本的REGISTER请求
req := sip.NewRequest(sip.REGISTER, p.Recipient)
@@ -770,7 +769,7 @@ func (p *Platform) DoRegister(ctx context.Context) error {
// 设置传输协议
req.SetTransport(strings.ToUpper(p.PlatformModel.Transport))
tx, err := p.Client.TransactionRequest(ctx, req)
tx, err := p.Client.TransactionRequest(p.ctx, req)
if err != nil {
p.Error("register", "error", err.Error())
return fmt.Errorf("创建事务失败: %v", err)
@@ -814,7 +813,7 @@ func (p *Platform) DoRegister(ctx context.Context) error {
newReq.AppendHeader(sip.NewHeader("Authorization", cred.String()))
// 发送认证请求
tx, err = p.Client.TransactionRequest(ctx, newReq, sipgo.ClientRequestAddVia)
tx, err = p.Client.TransactionRequest(p.ctx, newReq, sipgo.ClientRequestAddVia)
if err != nil {
p.Error("register", "error", err.Error())
return err
+1 -3
View File
@@ -1,7 +1,6 @@
package plugin_gb28181pro
import (
"context"
"errors"
"time"
@@ -38,8 +37,7 @@ func (r *Register) Tick(any) {
}
func (r *Register) Register() {
ctx := context.Background()
if err := r.platform.DoRegister(ctx); err != nil {
if err := r.platform.DoRegister(); err != nil {
if r.registerType == "keepaliveRegister" { //保活注册失败,需要回到首次注册类型
r.Error("keepaliveRegister err", err, "register type is ", r.registerType, "DeviceGBId is", r.platform.PlatformModel.DeviceGBID)
//r.platform.eventChan <- r