mirror of
https://github.com/Monibuca/plugin-gb28181.git
synced 2026-04-23 00:17:13 +08:00
合并三处定时任务到一个协程
This commit is contained in:
@@ -39,7 +39,6 @@ func NewRecordQueryLink(resultTimeout time.Duration) *recordQueryLink {
|
||||
pendingResult: make(map[string]recordQueryResult),
|
||||
pendingResp: make(map[string]recordQueryResp),
|
||||
}
|
||||
go c.cleanTimeout()
|
||||
return c
|
||||
}
|
||||
|
||||
@@ -50,22 +49,18 @@ func recordQueryKey(deviceId, channelId string, sn int) string {
|
||||
|
||||
// 定期清理过期的查询结果和请求
|
||||
func (c *recordQueryLink) cleanTimeout() {
|
||||
tick := time.NewTicker(time.Millisecond * 100)
|
||||
for {
|
||||
<-tick.C
|
||||
for k, s := range c.pendingResp {
|
||||
if time.Since(s.startTime) > s.timeout {
|
||||
if r, ok := c.pendingResult[k]; ok {
|
||||
c.notify(k, r)
|
||||
} else {
|
||||
c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")})
|
||||
}
|
||||
for k, s := range c.pendingResp {
|
||||
if time.Since(s.startTime) > s.timeout {
|
||||
if r, ok := c.pendingResult[k]; ok {
|
||||
c.notify(k, r)
|
||||
} else {
|
||||
c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")})
|
||||
}
|
||||
}
|
||||
for k, r := range c.pendingResult {
|
||||
if time.Since(r.time) > c.timeout {
|
||||
delete(c.pendingResult, k)
|
||||
}
|
||||
}
|
||||
for k, r := range c.pendingResult {
|
||||
if time.Since(r.time) > c.timeout {
|
||||
delete(c.pendingResult, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,12 +142,7 @@ func (c *GB28181Config) startServer() {
|
||||
} else {
|
||||
c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
|
||||
}
|
||||
|
||||
if c.Username != "" || c.Password != "" {
|
||||
go c.removeBanDevice()
|
||||
}
|
||||
|
||||
go c.statusCheckJob()
|
||||
go c.startJob()
|
||||
}
|
||||
|
||||
// func queryCatalog(config *transaction.Config) {
|
||||
@@ -165,45 +160,58 @@ func (c *GB28181Config) startServer() {
|
||||
// }
|
||||
// }
|
||||
|
||||
func (c *GB28181Config) removeBanDevice() {
|
||||
t := time.NewTicker(c.RemoveBanInterval)
|
||||
for range t.C {
|
||||
DeviceRegisterCount.Range(func(key, value interface{}) bool {
|
||||
if value.(int) > MaxRegisterCount {
|
||||
DeviceRegisterCount.Delete(key)
|
||||
// 定时任务
|
||||
func (c *GB28181Config) startJob() {
|
||||
statusTick := time.NewTicker(c.HeartbeatInterval / 2)
|
||||
banTick := time.NewTicker(c.RemoveBanInterval)
|
||||
linkTick := time.NewTicker(time.Millisecond * 100)
|
||||
GB28181Plugin.Debug("start job")
|
||||
for {
|
||||
select {
|
||||
case <-banTick.C:
|
||||
if c.Username != "" || c.Password != "" {
|
||||
c.removeBanDevice()
|
||||
}
|
||||
return true
|
||||
})
|
||||
case <-statusTick.C:
|
||||
c.statusCheck()
|
||||
case <-linkTick.C:
|
||||
RecordQueryLink.cleanTimeout()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// statusCheckJob
|
||||
func (c *GB28181Config) removeBanDevice() {
|
||||
DeviceRegisterCount.Range(func(key, value interface{}) bool {
|
||||
if value.(int) > MaxRegisterCount {
|
||||
DeviceRegisterCount.Delete(key)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// statusCheck
|
||||
// - 当设备超过 3 倍心跳时间未发送过心跳(通过 UpdateTime 判断), 视为离线
|
||||
// - 当设备超过注册有效期内为发送过消息,则从设备列表中删除
|
||||
// UpdateTime 在设备发送心跳之外的消息也会被更新,相对于 LastKeepaliveAt 更能体现出设备最会一次活跃的时间
|
||||
func (c *GB28181Config) statusCheckJob() {
|
||||
GB28181Plugin.Info("Device status check job started")
|
||||
t := time.NewTicker(c.HeartbeatInterval / 2)
|
||||
for range t.C {
|
||||
Devices.Range(func(key, value any) bool {
|
||||
d := value.(*Device)
|
||||
if time.Since(d.UpdateTime) > c.RegisterValidity {
|
||||
Devices.Delete(key)
|
||||
GB28181Plugin.Info("Device register timeout",
|
||||
zap.String("id", d.ID),
|
||||
zap.Time("registerTime", d.RegisterTime),
|
||||
zap.Time("updateTime", d.UpdateTime),
|
||||
)
|
||||
} else if time.Since(d.UpdateTime) > c.HeartbeatInterval*3 {
|
||||
d.Status = DeviceOfflineStatus
|
||||
d.channelMap.Range(func(key, value any) bool {
|
||||
ch := value.(*Channel)
|
||||
ch.Status = ChannelOffStatus
|
||||
return true
|
||||
})
|
||||
GB28181Plugin.Info("Device offline", zap.String("id", d.ID), zap.Time("updateTime", d.UpdateTime))
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
func (c *GB28181Config) statusCheck() {
|
||||
Devices.Range(func(key, value any) bool {
|
||||
d := value.(*Device)
|
||||
if time.Since(d.UpdateTime) > c.RegisterValidity {
|
||||
Devices.Delete(key)
|
||||
GB28181Plugin.Info("Device register timeout",
|
||||
zap.String("id", d.ID),
|
||||
zap.Time("registerTime", d.RegisterTime),
|
||||
zap.Time("updateTime", d.UpdateTime),
|
||||
)
|
||||
} else if time.Since(d.UpdateTime) > c.HeartbeatInterval*3 {
|
||||
d.Status = DeviceOfflineStatus
|
||||
d.channelMap.Range(func(key, value any) bool {
|
||||
ch := value.(*Channel)
|
||||
ch.Status = ChannelOffStatus
|
||||
return true
|
||||
})
|
||||
GB28181Plugin.Info("Device offline", zap.String("id", d.ID), zap.Time("updateTime", d.UpdateTime))
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user