diff --git a/plugin/gb28181pro/api.go b/plugin/gb28181pro/api.go index e99a765..2ba6bca 100644 --- a/plugin/gb28181pro/api.go +++ b/plugin/gb28181pro/api.go @@ -44,7 +44,7 @@ func (gb *GB28181ProPlugin) List(context.Context, *emptypb.Empty) (ret *pb.Respo }) } ret.Data = append(ret.Data, &pb.Device{ - Id: d.ID, + Id: d.DeviceID, Name: d.Name, Manufacturer: d.Manufacturer, Model: d.Model, @@ -118,7 +118,7 @@ func (gb *GB28181ProPlugin) GetDevice(ctx context.Context, req *pb.GetDeviceRequ }) } resp.Data = &pb.Device{ - Id: d.ID, + Id: d.DeviceID, Name: d.Name, Manufacturer: d.Manufacturer, Model: d.Model, @@ -147,7 +147,7 @@ func (gb *GB28181ProPlugin) GetDevices(ctx context.Context, req *pb.GetDevicesRe total := 0 for d := range gb.devices.Range { // TODO: 实现查询条件过滤 - if req.Query != "" && !strings.Contains(d.ID, req.Query) && !strings.Contains(d.Name, req.Query) { + if req.Query != "" && !strings.Contains(d.DeviceID, req.Query) && !strings.Contains(d.Name, req.Query) { continue } if req.Status && string(d.Status) != "ON" { @@ -184,7 +184,7 @@ func (gb *GB28181ProPlugin) GetDevices(ctx context.Context, req *pb.GetDevicesRe }) } devices = append(devices, &pb.Device{ - Id: d.ID, + Id: d.DeviceID, Name: d.Name, Manufacturer: d.Manufacturer, Model: d.Model, @@ -293,8 +293,9 @@ func (gb *GB28181ProPlugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceRe // 初始化 Task var hash uint32 - for i := 0; i < len(d.ID); i++ { - hash = hash*31 + uint32(d.ID[i]) + for i := 0; i < len(d.DeviceID); i++ { + ch := d.DeviceID[i] + hash = hash*31 + uint32(ch) } d.Task.ID = hash d.Task.Logger = d.Logger @@ -321,7 +322,7 @@ func (gb *GB28181ProPlugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceRe d.Recipient = sip.Uri{ Host: d.IP, Port: d.Port, - User: d.ID, + User: d.DeviceID, } // 初始化 SIP 客户端 diff --git a/plugin/gb28181pro/device.go b/plugin/gb28181pro/device.go index 879d775..f8af8db 100644 --- a/plugin/gb28181pro/device.go +++ b/plugin/gb28181pro/device.go @@ -25,7 +25,8 @@ const ( type Device struct { task.Task `gorm:"-:all"` - ID string `gorm:"primaryKey"` // 设备国标编号 + ID int64 `gorm:"primaryKey;autoIncrement"` // 数据库自增长ID + DeviceID string // 设备国标编号 Name string // 设备名 Manufacturer string // 生产厂商 Model string // 型号 @@ -80,7 +81,7 @@ func (d *Device) TableName() string { } func (d *Device) GetKey() string { - return d.ID + return d.DeviceID } func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28181.Message) (err error) { @@ -98,6 +99,16 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28 if d.plugin.DB != nil { // 更新通道信息 for _, c := range msg.DeviceList { + // 设置关联的设备数据库ID + c.DeviceDBID = d.ID + // 先查询是否存在 + var existing gb28181.ChannelInfo + if err := d.plugin.DB.Where("device_id = ?", c.DeviceID).First(&existing).Error; err == nil { + c.ID = existing.ID // 保持原有的自增ID + d.Debug("update channel", "channelId", c.DeviceID) + } else { + d.Debug("create channel", "channelId", c.DeviceID) + } // 使用 Save 进行 upsert 操作 if err := d.plugin.DB.Save(&c).Error; err != nil { d.Error("save channel failed", "error", err) @@ -129,7 +140,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28 } case "Alarm": d.Status = DeviceAlarmedStatus - body = []byte(gb28181.BuildAlarmResponseXML(d.ID)) + body = []byte(gb28181.BuildAlarmResponseXML(d.DeviceID)) case "Broadcast": d.Info("broadcast message", "body", req.Body()) default: @@ -202,7 +213,7 @@ func (d *Device) Go() (err error) { parentId := path[len(path)-1] //如果父ID并非本身所属设备,一般情况下这是因为下级设备上传了目录信息,该信息通常不需要处理。 // 暂时不考虑级联目录的实现 - if d.ID != parentId { + if d.DeviceID != parentId { if parent, ok := d.plugin.devices.Get(parentId); ok { parent.addOrUpdateChannel(c) continue @@ -233,27 +244,27 @@ func (d *Device) catalog() (*sip.Response, error) { request := d.CreateRequest(sip.MESSAGE) //d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(expires)) request.AppendHeader(sip.NewHeader("Expires", "3600")) - request.SetBody(gb28181.BuildCatalogXML(d.SN, d.ID)) + request.SetBody(gb28181.BuildCatalogXML(d.SN, d.DeviceID)) return d.send(request) } func (d *Device) subscribeCatalog() (*sip.Response, error) { request := d.CreateRequest(sip.SUBSCRIBE) request.AppendHeader(sip.NewHeader("Expires", "3600")) - request.SetBody(gb28181.BuildCatalogXML(d.SN, d.ID)) + request.SetBody(gb28181.BuildCatalogXML(d.SN, d.DeviceID)) return d.send(request) } func (d *Device) queryDeviceInfo() (*sip.Response, error) { request := d.CreateRequest(sip.MESSAGE) - request.SetBody(gb28181.BuildDeviceInfoXML(d.SN, d.ID)) + request.SetBody(gb28181.BuildDeviceInfoXML(d.SN, d.DeviceID)) return d.send(request) } func (d *Device) subscribePosition(interval int) (*sip.Response, error) { request := d.CreateRequest(sip.SUBSCRIBE) request.AppendHeader(sip.NewHeader("Expires", "3600")) - request.SetBody(gb28181.BuildDevicePositionXML(d.SN, d.ID, interval)) + request.SetBody(gb28181.BuildDevicePositionXML(d.SN, d.DeviceID, interval)) return d.send(request) } @@ -271,7 +282,7 @@ func (d *Device) addOrUpdateChannel(c gb28181.ChannelInfo) { } func (d *Device) GetID() string { - return d.ID + return d.DeviceID } func (d *Device) GetSdpIP() string { @@ -297,8 +308,8 @@ func (d *Device) CreateDialogSession(req *sip.Request) (*sipgo.DialogClientSessi func (d *Device) CreateSSRC(serial string) uint16 { // 使用简单的 hash 函数将设备 ID 转换为 uint16 var hash uint16 - for i := 0; i < len(d.ID); i++ { - hash = hash*31 + uint16(d.ID[i]) + for i := 0; i < len(d.DeviceID); i++ { + hash = hash*31 + uint16(d.DeviceID[i]) } return hash } diff --git a/plugin/gb28181pro/dialog.go b/plugin/gb28181pro/dialog.go index 015dea7..31e3306 100644 --- a/plugin/gb28181pro/dialog.go +++ b/plugin/gb28181pro/dialog.go @@ -79,7 +79,9 @@ func (d *Dialog) Start() (err error) { } func (d *Dialog) Run() (err error) { + d.Channel.Info("before WaitAnswer") err = d.session.WaitAnswer(d.gb, sipgo.AnswerOptions{}) + d.Channel.Info("after WaitAnswer") if err != nil { return } diff --git a/plugin/gb28181pro/index.go b/plugin/gb28181pro/index.go index 84f3484..349f41e 100644 --- a/plugin/gb28181pro/index.go +++ b/plugin/gb28181pro/index.go @@ -143,7 +143,7 @@ type GB28181ProPlugin struct { m7s.Plugin AutoInvite bool `default:"true" desc:"自动邀请"` Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001 - Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 + Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 Username string Password string Sip SipConfig @@ -266,6 +266,10 @@ func (gb *GB28181ProPlugin) OnRegister(req *sip.Request, tx sip.ServerTransactio d.Expires = int(expSec) d.Online = true if gb.DB != nil { + var existing Device + if err := gb.DB.Where("device_id = ?", d.DeviceID).First(&existing).Error; err == nil { + d.ID = existing.ID // 保持原有的自增ID + } gb.DB.Save(d) } response := sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil) @@ -385,7 +389,7 @@ func (gb *GB28181ProPlugin) OnMessage(req *sip.Request, tx sip.ServerTransaction // 缓存中没有,尝试从数据库获取 if gb.DB != nil { var device Device - if err := gb.DB.Where("id = ?", id).First(&device).Error; err == nil { + if err := gb.DB.Where("device_id = ?", id).First(&device).Error; err == nil { // 从数据库找到设备,恢复设备状态 d = &device gb.RecoverDevice(d, req) @@ -467,7 +471,7 @@ func (gb *GB28181ProPlugin) StoreDevice(id string, req *sip.Request) (d *Device) now := time.Now() d = &Device{ - ID: id, + DeviceID: id, CreateTime: now, UpdateTime: now, RegisterTime: now, @@ -515,8 +519,9 @@ func (gb *GB28181ProPlugin) StoreDevice(id string, req *sip.Request) (d *Device) // 使用简单的 hash 函数将设备 ID 转换为 uint32 var hash uint32 - for i := 0; i < len(d.ID); i++ { - hash = hash*31 + uint32(d.ID[i]) + for i := 0; i < len(d.DeviceID); i++ { + ch := d.DeviceID[i] + hash = hash*31 + uint32(ch) } d.Task.ID = hash @@ -524,22 +529,22 @@ func (gb *GB28181ProPlugin) StoreDevice(id string, req *sip.Request) (d *Device) gb.devices.Add(d) d.channels.OnAdd(func(c *Channel) { if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice *m7s.PullProxy) bool { - return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", d.ID, c.DeviceID) + return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID) }); ok { c.AbstractDevice = absDevice absDevice.Handler = c absDevice.ChangeStatus(m7s.PullProxyStatusOnline) } if gb.AutoInvite { - gb.Pull(fmt.Sprintf("%s/%s", d.ID, c.DeviceID), config.Pull{ - URL: fmt.Sprintf("%s/%s", d.ID, c.DeviceID), + gb.Pull(fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID), config.Pull{ + URL: fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID), }, nil) } }) }) d.OnDispose(func() { d.Status = DeviceOfflineStatus - if gb.devices.RemoveByKey(d.ID) { + if gb.devices.RemoveByKey(d.DeviceID) { for c := range d.channels.Range { if c.AbstractDevice != nil { c.AbstractDevice.ChangeStatus(m7s.PullProxyStatusOffline) @@ -550,6 +555,13 @@ func (gb *GB28181ProPlugin) StoreDevice(id string, req *sip.Request) (d *Device) gb.AddTask(d) if gb.DB != nil { + var existing Device + if err := gb.DB.Where("device_id = ?", d.DeviceID).First(&existing).Error; err == nil { + d.ID = existing.ID // 保持原有的自增ID + gb.Info("StoreDevice", "type", "更新设备", "deviceId", d.DeviceID) + } else { + gb.Info("StoreDevice", "type", "新增设备", "deviceId", d.DeviceID) + } gb.DB.Save(d) } return @@ -571,7 +583,7 @@ func (gb *GB28181ProPlugin) GetPullableList() []string { return slices.Collect(func(yield func(string) bool) { for d := range gb.devices.Range { for c := range d.channels.Range { - yield(fmt.Sprintf("%s/%s", d.ID, c.DeviceID)) + yield(fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID)) } } }) diff --git a/plugin/gb28181pro/pkg/message.go b/plugin/gb28181pro/pkg/message.go index a3b4e8f..458353d 100644 --- a/plugin/gb28181pro/pkg/message.go +++ b/plugin/gb28181pro/pkg/message.go @@ -123,7 +123,9 @@ type ( Type string } ChannelInfo struct { - DeviceID string `gorm:"primaryKey"` // 设备国标编号 + ID int64 `gorm:"primaryKey;autoIncrement"` // 数据库自增长ID + DeviceDBID int64 // device表里的id + DeviceID string // 设备国标编号 ParentID string Name string Manufacturer string