fix: update device_db_id in channelinfo

This commit is contained in:
pg
2025-02-13 20:14:35 +08:00
committed by pggiroro
parent 45408c78be
commit 827a0f3fc1
5 changed files with 57 additions and 29 deletions
+8 -7
View File
@@ -44,7 +44,7 @@ func (gb *GB28181ProPlugin) List(context.Context, *emptypb.Empty) (ret *pb.Respo
})
}
ret.Data = append(ret.Data, &pb.Device{
Id: d.ID,
Id: d.DeviceID,
Name: d.Name,
Manufacturer: d.Manufacturer,
Model: d.Model,
@@ -118,7 +118,7 @@ func (gb *GB28181ProPlugin) GetDevice(ctx context.Context, req *pb.GetDeviceRequ
})
}
resp.Data = &pb.Device{
Id: d.ID,
Id: d.DeviceID,
Name: d.Name,
Manufacturer: d.Manufacturer,
Model: d.Model,
@@ -147,7 +147,7 @@ func (gb *GB28181ProPlugin) GetDevices(ctx context.Context, req *pb.GetDevicesRe
total := 0
for d := range gb.devices.Range {
// TODO: 实现查询条件过滤
if req.Query != "" && !strings.Contains(d.ID, req.Query) && !strings.Contains(d.Name, req.Query) {
if req.Query != "" && !strings.Contains(d.DeviceID, req.Query) && !strings.Contains(d.Name, req.Query) {
continue
}
if req.Status && string(d.Status) != "ON" {
@@ -184,7 +184,7 @@ func (gb *GB28181ProPlugin) GetDevices(ctx context.Context, req *pb.GetDevicesRe
})
}
devices = append(devices, &pb.Device{
Id: d.ID,
Id: d.DeviceID,
Name: d.Name,
Manufacturer: d.Manufacturer,
Model: d.Model,
@@ -293,8 +293,9 @@ func (gb *GB28181ProPlugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceRe
// 初始化 Task
var hash uint32
for i := 0; i < len(d.ID); i++ {
hash = hash*31 + uint32(d.ID[i])
for i := 0; i < len(d.DeviceID); i++ {
ch := d.DeviceID[i]
hash = hash*31 + uint32(ch)
}
d.Task.ID = hash
d.Task.Logger = d.Logger
@@ -321,7 +322,7 @@ func (gb *GB28181ProPlugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceRe
d.Recipient = sip.Uri{
Host: d.IP,
Port: d.Port,
User: d.ID,
User: d.DeviceID,
}
// 初始化 SIP 客户端
+22 -11
View File
@@ -25,7 +25,8 @@ const (
type Device struct {
task.Task `gorm:"-:all"`
ID string `gorm:"primaryKey"` // 设备国标编号
ID int64 `gorm:"primaryKey;autoIncrement"` // 数据库自增长ID
DeviceID string // 设备国标编号
Name string // 设备名
Manufacturer string // 生产厂商
Model string // 型号
@@ -80,7 +81,7 @@ func (d *Device) TableName() string {
}
func (d *Device) GetKey() string {
return d.ID
return d.DeviceID
}
func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28181.Message) (err error) {
@@ -98,6 +99,16 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
if d.plugin.DB != nil {
// 更新通道信息
for _, c := range msg.DeviceList {
// 设置关联的设备数据库ID
c.DeviceDBID = d.ID
// 先查询是否存在
var existing gb28181.ChannelInfo
if err := d.plugin.DB.Where("device_id = ?", c.DeviceID).First(&existing).Error; err == nil {
c.ID = existing.ID // 保持原有的自增ID
d.Debug("update channel", "channelId", c.DeviceID)
} else {
d.Debug("create channel", "channelId", c.DeviceID)
}
// 使用 Save 进行 upsert 操作
if err := d.plugin.DB.Save(&c).Error; err != nil {
d.Error("save channel failed", "error", err)
@@ -129,7 +140,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
}
case "Alarm":
d.Status = DeviceAlarmedStatus
body = []byte(gb28181.BuildAlarmResponseXML(d.ID))
body = []byte(gb28181.BuildAlarmResponseXML(d.DeviceID))
case "Broadcast":
d.Info("broadcast message", "body", req.Body())
default:
@@ -202,7 +213,7 @@ func (d *Device) Go() (err error) {
parentId := path[len(path)-1]
//如果父ID并非本身所属设备,一般情况下这是因为下级设备上传了目录信息,该信息通常不需要处理。
// 暂时不考虑级联目录的实现
if d.ID != parentId {
if d.DeviceID != parentId {
if parent, ok := d.plugin.devices.Get(parentId); ok {
parent.addOrUpdateChannel(c)
continue
@@ -233,27 +244,27 @@ func (d *Device) catalog() (*sip.Response, error) {
request := d.CreateRequest(sip.MESSAGE)
//d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(expires))
request.AppendHeader(sip.NewHeader("Expires", "3600"))
request.SetBody(gb28181.BuildCatalogXML(d.SN, d.ID))
request.SetBody(gb28181.BuildCatalogXML(d.SN, d.DeviceID))
return d.send(request)
}
func (d *Device) subscribeCatalog() (*sip.Response, error) {
request := d.CreateRequest(sip.SUBSCRIBE)
request.AppendHeader(sip.NewHeader("Expires", "3600"))
request.SetBody(gb28181.BuildCatalogXML(d.SN, d.ID))
request.SetBody(gb28181.BuildCatalogXML(d.SN, d.DeviceID))
return d.send(request)
}
func (d *Device) queryDeviceInfo() (*sip.Response, error) {
request := d.CreateRequest(sip.MESSAGE)
request.SetBody(gb28181.BuildDeviceInfoXML(d.SN, d.ID))
request.SetBody(gb28181.BuildDeviceInfoXML(d.SN, d.DeviceID))
return d.send(request)
}
func (d *Device) subscribePosition(interval int) (*sip.Response, error) {
request := d.CreateRequest(sip.SUBSCRIBE)
request.AppendHeader(sip.NewHeader("Expires", "3600"))
request.SetBody(gb28181.BuildDevicePositionXML(d.SN, d.ID, interval))
request.SetBody(gb28181.BuildDevicePositionXML(d.SN, d.DeviceID, interval))
return d.send(request)
}
@@ -271,7 +282,7 @@ func (d *Device) addOrUpdateChannel(c gb28181.ChannelInfo) {
}
func (d *Device) GetID() string {
return d.ID
return d.DeviceID
}
func (d *Device) GetSdpIP() string {
@@ -297,8 +308,8 @@ func (d *Device) CreateDialogSession(req *sip.Request) (*sipgo.DialogClientSessi
func (d *Device) CreateSSRC(serial string) uint16 {
// 使用简单的 hash 函数将设备 ID 转换为 uint16
var hash uint16
for i := 0; i < len(d.ID); i++ {
hash = hash*31 + uint16(d.ID[i])
for i := 0; i < len(d.DeviceID); i++ {
hash = hash*31 + uint16(d.DeviceID[i])
}
return hash
}
+2
View File
@@ -79,7 +79,9 @@ func (d *Dialog) Start() (err error) {
}
func (d *Dialog) Run() (err error) {
d.Channel.Info("before WaitAnswer")
err = d.session.WaitAnswer(d.gb, sipgo.AnswerOptions{})
d.Channel.Info("after WaitAnswer")
if err != nil {
return
}
+22 -10
View File
@@ -143,7 +143,7 @@ type GB28181ProPlugin struct {
m7s.Plugin
AutoInvite bool `default:"true" desc:"自动邀请"`
Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Username string
Password string
Sip SipConfig
@@ -266,6 +266,10 @@ func (gb *GB28181ProPlugin) OnRegister(req *sip.Request, tx sip.ServerTransactio
d.Expires = int(expSec)
d.Online = true
if gb.DB != nil {
var existing Device
if err := gb.DB.Where("device_id = ?", d.DeviceID).First(&existing).Error; err == nil {
d.ID = existing.ID // 保持原有的自增ID
}
gb.DB.Save(d)
}
response := sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil)
@@ -385,7 +389,7 @@ func (gb *GB28181ProPlugin) OnMessage(req *sip.Request, tx sip.ServerTransaction
// 缓存中没有,尝试从数据库获取
if gb.DB != nil {
var device Device
if err := gb.DB.Where("id = ?", id).First(&device).Error; err == nil {
if err := gb.DB.Where("device_id = ?", id).First(&device).Error; err == nil {
// 从数据库找到设备,恢复设备状态
d = &device
gb.RecoverDevice(d, req)
@@ -467,7 +471,7 @@ func (gb *GB28181ProPlugin) StoreDevice(id string, req *sip.Request) (d *Device)
now := time.Now()
d = &Device{
ID: id,
DeviceID: id,
CreateTime: now,
UpdateTime: now,
RegisterTime: now,
@@ -515,8 +519,9 @@ func (gb *GB28181ProPlugin) StoreDevice(id string, req *sip.Request) (d *Device)
// 使用简单的 hash 函数将设备 ID 转换为 uint32
var hash uint32
for i := 0; i < len(d.ID); i++ {
hash = hash*31 + uint32(d.ID[i])
for i := 0; i < len(d.DeviceID); i++ {
ch := d.DeviceID[i]
hash = hash*31 + uint32(ch)
}
d.Task.ID = hash
@@ -524,22 +529,22 @@ func (gb *GB28181ProPlugin) StoreDevice(id string, req *sip.Request) (d *Device)
gb.devices.Add(d)
d.channels.OnAdd(func(c *Channel) {
if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice *m7s.PullProxy) bool {
return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", d.ID, c.DeviceID)
return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID)
}); ok {
c.AbstractDevice = absDevice
absDevice.Handler = c
absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
}
if gb.AutoInvite {
gb.Pull(fmt.Sprintf("%s/%s", d.ID, c.DeviceID), config.Pull{
URL: fmt.Sprintf("%s/%s", d.ID, c.DeviceID),
gb.Pull(fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID), config.Pull{
URL: fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID),
}, nil)
}
})
})
d.OnDispose(func() {
d.Status = DeviceOfflineStatus
if gb.devices.RemoveByKey(d.ID) {
if gb.devices.RemoveByKey(d.DeviceID) {
for c := range d.channels.Range {
if c.AbstractDevice != nil {
c.AbstractDevice.ChangeStatus(m7s.PullProxyStatusOffline)
@@ -550,6 +555,13 @@ func (gb *GB28181ProPlugin) StoreDevice(id string, req *sip.Request) (d *Device)
gb.AddTask(d)
if gb.DB != nil {
var existing Device
if err := gb.DB.Where("device_id = ?", d.DeviceID).First(&existing).Error; err == nil {
d.ID = existing.ID // 保持原有的自增ID
gb.Info("StoreDevice", "type", "更新设备", "deviceId", d.DeviceID)
} else {
gb.Info("StoreDevice", "type", "新增设备", "deviceId", d.DeviceID)
}
gb.DB.Save(d)
}
return
@@ -571,7 +583,7 @@ func (gb *GB28181ProPlugin) GetPullableList() []string {
return slices.Collect(func(yield func(string) bool) {
for d := range gb.devices.Range {
for c := range d.channels.Range {
yield(fmt.Sprintf("%s/%s", d.ID, c.DeviceID))
yield(fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID))
}
}
})
+3 -1
View File
@@ -123,7 +123,9 @@ type (
Type string
}
ChannelInfo struct {
DeviceID string `gorm:"primaryKey"` // 设备国标编号
ID int64 `gorm:"primaryKey;autoIncrement"` // 数据库自增长ID
DeviceDBID int64 // device表里的id
DeviceID string // 设备国标编号
ParentID string
Name string
Manufacturer string