diff --git a/link.go b/link.go index c2cecef..9a4b7ef 100644 --- a/link.go +++ b/link.go @@ -39,7 +39,6 @@ func NewRecordQueryLink(resultTimeout time.Duration) *recordQueryLink { pendingResult: make(map[string]recordQueryResult), pendingResp: make(map[string]recordQueryResp), } - go c.cleanTimeout() return c } @@ -50,22 +49,18 @@ func recordQueryKey(deviceId, channelId string, sn int) string { // 定期清理过期的查询结果和请求 func (c *recordQueryLink) cleanTimeout() { - tick := time.NewTicker(time.Millisecond * 100) - for { - <-tick.C - for k, s := range c.pendingResp { - if time.Since(s.startTime) > s.timeout { - if r, ok := c.pendingResult[k]; ok { - c.notify(k, r) - } else { - c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")}) - } + for k, s := range c.pendingResp { + if time.Since(s.startTime) > s.timeout { + if r, ok := c.pendingResult[k]; ok { + c.notify(k, r) + } else { + c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")}) } } - for k, r := range c.pendingResult { - if time.Since(r.time) > c.timeout { - delete(c.pendingResult, k) - } + } + for k, r := range c.pendingResult { + if time.Since(r.time) > c.timeout { + delete(c.pendingResult, k) } } } diff --git a/server.go b/server.go index 62655f8..0d40b37 100755 --- a/server.go +++ b/server.go @@ -142,12 +142,7 @@ func (c *GB28181Config) startServer() { } else { c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax) } - - if c.Username != "" || c.Password != "" { - go c.removeBanDevice() - } - - go c.statusCheckJob() + go c.startJob() } // func queryCatalog(config *transaction.Config) { @@ -165,45 +160,58 @@ func (c *GB28181Config) startServer() { // } // } -func (c *GB28181Config) removeBanDevice() { - t := time.NewTicker(c.RemoveBanInterval) - for range t.C { - DeviceRegisterCount.Range(func(key, value interface{}) bool { - if value.(int) > MaxRegisterCount { - DeviceRegisterCount.Delete(key) +// 定时任务 +func (c *GB28181Config) startJob() { + statusTick := time.NewTicker(c.HeartbeatInterval / 2) + banTick := time.NewTicker(c.RemoveBanInterval) + linkTick := time.NewTicker(time.Millisecond * 100) + GB28181Plugin.Debug("start job") + for { + select { + case <-banTick.C: + if c.Username != "" || c.Password != "" { + c.removeBanDevice() } - return true - }) + case <-statusTick.C: + c.statusCheck() + case <-linkTick.C: + RecordQueryLink.cleanTimeout() + } } } -// statusCheckJob +func (c *GB28181Config) removeBanDevice() { + DeviceRegisterCount.Range(func(key, value interface{}) bool { + if value.(int) > MaxRegisterCount { + DeviceRegisterCount.Delete(key) + } + return true + }) +} + +// statusCheck // - 当设备超过 3 倍心跳时间未发送过心跳(通过 UpdateTime 判断), 视为离线 // - 当设备超过注册有效期内为发送过消息,则从设备列表中删除 // UpdateTime 在设备发送心跳之外的消息也会被更新,相对于 LastKeepaliveAt 更能体现出设备最会一次活跃的时间 -func (c *GB28181Config) statusCheckJob() { - GB28181Plugin.Info("Device status check job started") - t := time.NewTicker(c.HeartbeatInterval / 2) - for range t.C { - Devices.Range(func(key, value any) bool { - d := value.(*Device) - if time.Since(d.UpdateTime) > c.RegisterValidity { - Devices.Delete(key) - GB28181Plugin.Info("Device register timeout", - zap.String("id", d.ID), - zap.Time("registerTime", d.RegisterTime), - zap.Time("updateTime", d.UpdateTime), - ) - } else if time.Since(d.UpdateTime) > c.HeartbeatInterval*3 { - d.Status = DeviceOfflineStatus - d.channelMap.Range(func(key, value any) bool { - ch := value.(*Channel) - ch.Status = ChannelOffStatus - return true - }) - GB28181Plugin.Info("Device offline", zap.String("id", d.ID), zap.Time("updateTime", d.UpdateTime)) - } - return true - }) - } +func (c *GB28181Config) statusCheck() { + Devices.Range(func(key, value any) bool { + d := value.(*Device) + if time.Since(d.UpdateTime) > c.RegisterValidity { + Devices.Delete(key) + GB28181Plugin.Info("Device register timeout", + zap.String("id", d.ID), + zap.Time("registerTime", d.RegisterTime), + zap.Time("updateTime", d.UpdateTime), + ) + } else if time.Since(d.UpdateTime) > c.HeartbeatInterval*3 { + d.Status = DeviceOfflineStatus + d.channelMap.Range(func(key, value any) bool { + ch := value.(*Channel) + ch.Status = ChannelOffStatus + return true + }) + GB28181Plugin.Info("Device offline", zap.String("id", d.ID), zap.Time("updateTime", d.UpdateTime)) + } + return true + }) }