mirror of
https://github.com/langhuihui/monibuca.git
synced 2026-04-23 01:07:03 +08:00
fix: gb28181 update channels when device modify channelid or add,delete channels
This commit is contained in:
+74
-10
@@ -7,6 +7,7 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
@@ -68,10 +69,11 @@ type Device struct {
|
||||
// 保留原有字段
|
||||
Status DeviceStatus
|
||||
SN int
|
||||
Recipient sip.Uri `gorm:"-:all"`
|
||||
channels util.Collection[string, *Channel] `gorm:"-:all"`
|
||||
MediaIP string `desc:"收流IP"`
|
||||
Longitude, Latitude string // 经度,纬度
|
||||
Recipient sip.Uri `gorm:"-:all"`
|
||||
channels util.Collection[string, *Channel] `gorm:"-:all"`
|
||||
catalogReqs util.Collection[int, *CatalogRequest] `gorm:"-:all"`
|
||||
MediaIP string `desc:"收流IP"`
|
||||
Longitude, Latitude string // 经度,纬度
|
||||
eventChan chan any
|
||||
client *sipgo.Client
|
||||
contactHDR sip.ContactHeader
|
||||
@@ -105,6 +107,35 @@ func (d *Device) GetKey() string {
|
||||
return d.DeviceID
|
||||
}
|
||||
|
||||
// CatalogRequest 目录请求结构体
|
||||
type CatalogRequest struct {
|
||||
SN, SumNum int
|
||||
FirstResponse bool // 是否为第一个响应
|
||||
*util.Promise
|
||||
sync.Mutex // 保护并发访问
|
||||
}
|
||||
|
||||
func (r *CatalogRequest) GetKey() int {
|
||||
return r.SN
|
||||
}
|
||||
|
||||
// AddResponse 处理响应并返回是否是第一个响应
|
||||
func (r *CatalogRequest) AddResponse() bool {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
wasFirst := r.FirstResponse
|
||||
r.FirstResponse = false
|
||||
return wasFirst
|
||||
}
|
||||
|
||||
// IsComplete 检查是否完成接收
|
||||
func (r *CatalogRequest) IsComplete(channelsLength int) bool {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
return channelsLength >= r.SumNum
|
||||
}
|
||||
|
||||
func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28181.Message) (err error) {
|
||||
source := req.Source()
|
||||
hostname, portStr, _ := net.SplitHostPort(source)
|
||||
@@ -135,12 +166,34 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
|
||||
}
|
||||
}
|
||||
case "Catalog":
|
||||
d.eventChan <- msg.DeviceChannelList
|
||||
// 处理目录信息
|
||||
catalogReq, exists := d.catalogReqs.Get(msg.SN)
|
||||
if !exists {
|
||||
// 创建新的目录请求
|
||||
catalogReq = &CatalogRequest{
|
||||
SN: msg.SN,
|
||||
SumNum: msg.SumNum,
|
||||
FirstResponse: true,
|
||||
Promise: util.NewPromise(context.Background()),
|
||||
}
|
||||
d.catalogReqs.Set(catalogReq)
|
||||
}
|
||||
|
||||
// 添加响应并获取是否是第一个响应
|
||||
isFirst := catalogReq.AddResponse()
|
||||
|
||||
// 更新设备信息到数据库
|
||||
if d.plugin.DB != nil {
|
||||
//if err := d.plugin.DB.Where(&gb28181.DeviceChannel{DeviceID: d.DeviceID}).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
|
||||
// d.Error("删除通道失败", "error", err, "deviceId", d.DeviceID)
|
||||
//}
|
||||
// 如果是第一个响应,先清空现有通道
|
||||
if isFirst {
|
||||
d.Debug("清空现有通道", "deviceId", d.DeviceID)
|
||||
if err := d.plugin.DB.Where("device_id = ?", d.DeviceID).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
|
||||
d.Error("删除通道失败", "error", err, "deviceId", d.DeviceID)
|
||||
}
|
||||
// 清空内存中的通道缓存
|
||||
d.channels.Clear()
|
||||
}
|
||||
|
||||
// 更新通道信息
|
||||
for _, c := range msg.DeviceChannelList {
|
||||
// 设置关联的设备数据库ID
|
||||
@@ -153,10 +206,11 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
|
||||
}
|
||||
d.addOrUpdateChannel(c)
|
||||
}
|
||||
|
||||
// 更新当前设备的通道数
|
||||
d.ChannelCount = msg.SumNum
|
||||
d.UpdateTime = time.Now()
|
||||
d.Debug("save channel", "deviceid", d.DeviceID)
|
||||
d.Debug("save channel", "deviceid", d.DeviceID, "channels count", d.channels.Length)
|
||||
if err := d.plugin.DB.Model(d).Updates(map[string]interface{}{
|
||||
"channel_count": d.ChannelCount,
|
||||
"update_time": d.UpdateTime,
|
||||
@@ -164,6 +218,12 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
|
||||
d.Error("save device failed", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 在所有通道都添加完成后,检查是否完成接收
|
||||
if catalogReq.IsComplete(d.channels.Length) {
|
||||
catalogReq.Resolve()
|
||||
d.catalogReqs.RemoveByKey(msg.SN)
|
||||
}
|
||||
case "RecordInfo":
|
||||
if channel, ok := d.channels.Get(msg.DeviceID); ok {
|
||||
if req, ok := channel.RecordReqs.Get(msg.SN); ok {
|
||||
@@ -329,6 +389,10 @@ func (d *Device) send(req *sip.Request) (*sip.Response, error) {
|
||||
|
||||
func (d *Device) Go() (err error) {
|
||||
var response *sip.Response
|
||||
|
||||
// 初始化catalogReqs
|
||||
d.catalogReqs.L = new(sync.RWMutex)
|
||||
|
||||
response, err = d.queryDeviceInfo()
|
||||
if err != nil {
|
||||
d.Error("queryDeviceInfo", "err", err)
|
||||
@@ -345,7 +409,7 @@ func (d *Device) Go() (err error) {
|
||||
}
|
||||
subTick := time.NewTicker(time.Second * 3600)
|
||||
defer subTick.Stop()
|
||||
catalogTick := time.NewTicker(time.Second * 60)
|
||||
catalogTick := time.NewTicker(time.Minute * 10)
|
||||
keepaliveSeconds := 60
|
||||
if d.KeepaliveInterval >= 5 {
|
||||
keepaliveSeconds = d.KeepaliveInterval
|
||||
|
||||
Reference in New Issue
Block a user