国标设备状态检测

This commit is contained in:
xugo
2025-02-15 21:55:41 +08:00
parent 16fc5221a2
commit a42e066c16
14 changed files with 246 additions and 20 deletions
+6
View File
@@ -5,6 +5,7 @@ import (
"net"
"strings"
"sync"
"time"
"github.com/gowvp/gb28181/pkg/gbs/m"
"github.com/gowvp/gb28181/pkg/gbs/sip"
@@ -32,6 +33,9 @@ type Device struct {
source net.Addr
to *sip.Address
lastKeepaliveAt time.Time
lastRegisterAt time.Time
}
// Conn implements Targeter.
@@ -114,6 +118,8 @@ func (c *Client) Store(deviceID string, in *Device) {
v.conn = in.conn
v.source = in.source
v.to = in.to
v.lastKeepaliveAt = in.lastKeepaliveAt
v.lastRegisterAt = in.lastRegisterAt
}
}
+5 -3
View File
@@ -148,9 +148,11 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) {
fmt.Printf(">>> %p\n", conn)
g.svr.devices.Store(dev.DeviceID, &Device{
conn: conn,
source: ctx.Source,
to: ctx.To,
conn: conn,
source: ctx.Source,
to: ctx.To,
lastKeepaliveAt: time.Now(),
lastRegisterAt: time.Now(),
})
ctx.Log.Info("设备注册成功")
+25 -2
View File
@@ -1,18 +1,22 @@
package gbs
import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"sync"
"time"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs/m"
"github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goweb/pkg/conc"
"github.com/ixugo/goweb/pkg/system"
)
type Server struct {
@@ -28,7 +32,8 @@ type Server struct {
func NewServer(cfg *conf.Bootstrap, store gb28181.GB28181, sc sms.Core) (*Server, func()) {
api := NewGB28181API(cfg, store, sc.NodeManager)
uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s:%d", cfg.Sip.ID, cfg.Sip.Host, cfg.Sip.Port))
ip := system.LocalIP()
uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s:%d", cfg.Sip.ID, ip, cfg.Sip.Port))
from := sip.Address{
DisplayName: sip.String{Str: "gowvp"},
URI: &uri,
@@ -63,10 +68,28 @@ func NewServer(cfg *conf.Bootstrap, store gb28181.GB28181, sc sms.Core) (*Server
go svr.ListenUDPServer(fmt.Sprintf(":%d", cfg.Sip.Port))
go svr.ListenTCPServer(fmt.Sprintf(":%d", cfg.Sip.Port))
go c.startTickerCheck()
return &c, c.Close
}
// startTickerCheck 定时检查离线
func (s *Server) startTickerCheck() {
conc.Timer(context.Background(), 60*time.Second, time.Second, func() {
now := time.Now()
s.devices.devices.Range(func(key string, value *Device) bool {
if now.Sub(value.lastKeepaliveAt) >= 3*60*time.Second || value.conn == nil {
s.gb.logout(key, func(d *gb28181.Device) {
d.IsOnline = false
})
// 心跳超时
s.devices.devices.Delete(key)
return true
}
return true
})
})
}
func Start() {
// 数据库表初始化 启动时自动同步数据结构到数据库
// db.DBClient.AutoMigrate(new(Devices))
+1 -1
View File
@@ -161,7 +161,7 @@ func (s *Server) ListenTCPServer(addr string) {
slog.Info("ListenTCPServer Has Been Exits")
return
default:
conn, err := tcp.Accept()
conn, err := tcp.AcceptTCP()
if err != nil {
slog.Error("net.ListenTCP", "err", err, "addr", addr)
return
+57
View File
@@ -0,0 +1,57 @@
package zlm
const (
addStreamProxy = "/index/api/addStreamProxy"
)
type AddStreamProxyRequest struct {
Vhost string `json:"vhost"` // 添加的流的虚拟主机,例如__defaultVhost__
App string `json:"app"` // 添加的流的应用名,例如 live
Stream string `json:"stream"` // 添加的流的 id 名,例如 test
URL string `json:"url"` // 拉流地址,例如 rtmp://live.hkstv.hk.lxdns.com/live/hks2
RetryCount int `json:"retry_count"` // 拉流重试次数,默认为-1 无限重试
RTPType int `json:"rtp_type"` // rtsp 拉流时,拉流方式,0:tcp,1:udp,2:组播
TimeoutSec float32 `json:"timeout_sec"` // 拉流超时时间,单位秒,float 类型
EnableHLS *bool `json:"enable_hls,omitempty"` // 是否转换成 hls-mpegts 协议
EnableHLSFMP4 *bool `json:"enable_hls_fmp4,omitempty"` // 是否转换成 hls-fmp4 协议
EnableMP4 *bool `json:"enable_mp4,omitempty"` // 是否允许 mp4 录制
EnableRTSP *bool `json:"enable_rtsp,omitempty"` // 是否转 rtsp 协议
EnableRTMP *bool `json:"enable_rtmp,omitempty"` // 是否转 rtmp/flv 协议
EnableTS *bool `json:"enable_ts,omitempty"` // 是否转 http-ts/ws-ts 协议
EnableFMP4 *bool `json:"enable_fmp4,omitempty"` // 是否转 http-fmp4/ws-fmp4 协议
HLSDemand *bool `json:"hls_demand,omitempty"` // 该协议是否有人观看才生成
RTSPDemand *bool `json:"rtsp_demand,omitempty"` // 该协议是否有人观看才生成
RTMPDemand *bool `json:"rtmp_demand,omitempty"` // 该协议是否有人观看才生成
TSDemand *bool `json:"ts_demand,omitempty"` // 该协议是否有人观看才生成
FMP4Demand *bool `json:"fmp4_demand,omitempty"` // 该协议是否有人观看才生成
EnableAudio *bool `json:"enable_audio,omitempty"` // 转协议时是否开启音频
AddMuteAudio *bool `json:"add_mute_audio,omitempty"` // 转协议时,无音频是否添加静音 aac 音频
MP4SavePath *string `json:"mp4_save_path,omitempty"` // mp4 录制文件保存根目录,置空使用默认
MP4MaxSecond *int `json:"mp4_max_second,omitempty"` // mp4 录制切片大小,单位秒
MP4AsPlayer *bool `json:"mp4_as_player,omitempty"` // MP4 录制是否当作观看者参与播放人数计数
HLSSavePath *string `json:"hls_save_path,omitempty"` // hls 文件保存保存根目录,置空使用默认
ModifyStamp *int `json:"modify_stamp,omitempty"` // 该流是否开启时间戳覆盖(0:绝对时间戳/1:系统时间戳/2:相对时间戳)
AutoClose *bool `json:"auto_close,omitempty"` // 无人观看是否自动关闭流(不触发无人观看 hook)
}
type AddStreamProxyResponse struct {
FixedHeader
Data struct {
Key string `json:"key"`
} `json:"data"`
}
func (e *Engine) AddStreamProxy(in AddStreamProxyRequest) (*AddStreamProxyResponse, error) {
body, err := struct2map(in)
if err != nil {
return nil, err
}
var resp AddStreamProxyResponse
if err := e.post(addStreamProxy, body, &resp); err != nil {
return nil, err
}
if err := e.ErrHandle(resp.Code, resp.Msg); err != nil {
return nil, err
}
return &resp, nil
}