Files
gb28181/internal/core/sms/node_manager.go
T
2026-01-08 13:14:12 +08:00

298 lines
7.4 KiB
Go

package sms
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"regexp"
"time"
"github.com/gowvp/owl/internal/conf"
"github.com/gowvp/owl/pkg/zlm"
"github.com/ixugo/goddd/pkg/conc"
"github.com/ixugo/goddd/pkg/orm"
"github.com/ixugo/goddd/pkg/reason"
"github.com/ixugo/goddd/pkg/web"
)
const KeepaliveInterval = 2 * 15 * time.Second
type WarpMediaServer struct {
IsOnline bool
LastUpdatedAt time.Time
Config *MediaServer
}
type NodeManager struct {
storer Storer
drivers map[string]Driver
cacheServers conc.Map[string, *WarpMediaServer]
quit chan struct{}
}
func NewNodeManager(storer Storer) *NodeManager {
n := NodeManager{
storer: storer,
drivers: make(map[string]Driver),
quit: make(chan struct{}, 1),
}
n.RegisterDriver(ProtocolZLMediaKit, NewZLMDriver())
n.RegisterDriver(ProtocolLalmax, NewLalmaxDriver())
go n.tickCheck()
return &n
}
func (n *NodeManager) RegisterDriver(name string, driver Driver) {
n.drivers[name] = driver
}
func (n *NodeManager) getDriver(name string) (Driver, error) {
if name == "" {
name = "zlm"
}
d, ok := n.drivers[name]
if !ok {
return nil, fmt.Errorf("driver [%s] not found", name)
}
return d, nil
}
func (n *NodeManager) Close() {
close(n.quit)
}
// tickCheck 定时检查服务是否离线
func (n *NodeManager) tickCheck() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-n.quit:
return
case <-ticker.C:
n.cacheServers.Range(func(_ string, ms *WarpMediaServer) bool {
if time.Since(ms.LastUpdatedAt) < KeepaliveInterval {
ms.IsOnline = true
return true
}
// 尝试主动探测
if ms.Config != nil {
driver, err := n.getDriver(ms.Config.Type)
if err == nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
if err := driver.Ping(ctx, ms.Config); err == nil {
ms.LastUpdatedAt = time.Now()
ms.IsOnline = true
cancel()
return true
}
cancel()
}
}
ms.IsOnline = false
return true
})
}
}
}
// 读取 config.ini 文件,通过正则表达式,获取 secret 的值
func getSecret(configDir string) (string, error) {
for _, file := range []string{"zlm.ini", "config.ini"} {
content, err := os.ReadFile(filepath.Join(configDir, file))
if err != nil {
continue
}
re := regexp.MustCompile(`secret=(\w+)`)
matches := re.FindStringSubmatch(string(content))
if len(matches) < 2 {
continue
}
return matches[1], nil
}
return "", fmt.Errorf("unknow")
}
// TODO: 发现配置会导致程序延迟 1~2s 才能启动
func setupSecret(bc *conf.Bootstrap) {
// 六六大顺
for range 6 {
secret, err := getSecret(bc.ConfigDir)
if err == nil {
slog.Info("发现 zlm 配置,已赋值,未回写配置文件", "secret", secret)
bc.Media.Secret = secret
return
}
time.Sleep(200 * time.Millisecond)
continue
}
slog.Warn("未发现 zlm 配置,请手动配置 zlm secret")
}
func (n *NodeManager) Run(bc *conf.Bootstrap, serverPort int) error {
ctx := context.Background()
setupSecret(bc)
cfg := bc.Media
setValueFn := func(ms *MediaServer) {
ms.ID = DefaultMediaServerID
ms.IP = cfg.IP
ms.Ports.HTTP = cfg.HTTPPort
ms.Secret = cfg.Secret
ms.Type = cfg.Type
// TODO: 应该读取环境变量
if ms.Type == "" {
ms.Type = ProtocolZLMediaKit
}
ms.Status = false
ms.RTPPortRange = cfg.RTPPortRange
ms.HookIP = cfg.WebHookIP
ms.SDPIP = cfg.SDPIP
}
var ms MediaServer
if err := n.storer.MediaServer().Edit(ctx, &ms, func(b *MediaServer) {
setValueFn(b)
}, orm.Where("id=?", DefaultMediaServerID)); err != nil {
if !orm.IsErrRecordNotFound(err) {
return err
}
setValueFn(&ms)
if err := n.storer.MediaServer().Add(ctx, &ms); err != nil {
return err
}
}
mediaServers, _, err := n.findMediaServer(ctx, &FindMediaServerInput{
PagerFilter: web.NewPagerFilterMaxSize(),
})
if err != nil {
return err
}
for _, ms := range mediaServers {
go func(ms *MediaServer) {
if err := n.connection(ms, serverPort); err != nil {
slog.Error("Connect media server failed", "id", ms.ID, "err", err)
}
}(ms)
}
return nil
}
func (n *NodeManager) connection(server *MediaServer, serverPort int) error {
n.cacheServers.Store(server.ID, &WarpMediaServer{
LastUpdatedAt: time.Now(),
Config: server,
})
driver, err := n.getDriver(server.Type)
if err != nil {
slog.Error("获取驱动失败", "type", server.Type, "err", err)
return err
}
log := slog.With("id", server.ID, "type", server.Type)
log.Info("MediaServer 连接中...")
ctx := context.Background()
if err := driver.Connect(ctx, server); err != nil {
log.Error("MediaServer 连接失败", "err", err)
return err
}
log.Info("MediaServer 连接成功")
// 更新数据库中的端口信息等
if err := n.storer.MediaServer().Edit(ctx, &MediaServer{}, func(b *MediaServer) {
// 更新字段
b.Ports = server.Ports
b.HookAliveInterval = server.HookAliveInterval
b.Status = server.Status
}, orm.Where("id=?", server.ID)); err != nil {
panic(fmt.Errorf("保存 MediaServer 失败 %w", err))
}
log.Info("MediaServer 配置设置...")
hookPrefix := fmt.Sprintf("http://%s:%d/webhook", server.HookIP, serverPort)
if err := driver.Setup(ctx, server, hookPrefix); err != nil {
log.Error("MediaServer 配置设置失败", "err", err)
return err
}
return nil
}
func (n *NodeManager) Keepalive(serverID string) {
value, ok := n.cacheServers.Load(serverID)
if !ok {
return
}
value.LastUpdatedAt = time.Now()
}
func (n *NodeManager) IsOnline(serverID string) bool {
value, ok := n.cacheServers.Load(serverID)
if !ok {
return false
}
return value.IsOnline
}
// findMediaServer Paginated search
func (n *NodeManager) findMediaServer(ctx context.Context, in *FindMediaServerInput) ([]*MediaServer, int64, error) {
items := make([]*MediaServer, 0)
total, err := n.storer.MediaServer().Find(ctx, &items, in)
if err != nil {
return nil, 0, reason.ErrDB.Withf(`Find err[%s]`, err.Error())
}
return items, total, nil
}
// OpenRTPServer 开启RTP服务器
func (n *NodeManager) OpenRTPServer(server *MediaServer, in zlm.OpenRTPServerRequest) (*zlm.OpenRTPServerResponse, error) {
driver, err := n.getDriver(server.Type)
if err != nil {
return nil, err
}
return driver.OpenRTPServer(context.Background(), server, &in)
}
// CloseRTPServer 关闭RTP服务器
func (n *NodeManager) CloseRTPServer(server *MediaServer, in zlm.CloseRTPServerRequest) (*zlm.CloseRTPServerResponse, error) {
driver, err := n.getDriver(server.Type)
if err != nil {
return nil, err
}
return driver.CloseRTPServer(context.Background(), server, &in)
}
// AddStreamProxy 添加流代理
func (n *NodeManager) AddStreamProxy(server *MediaServer, in AddStreamProxyRequest) (*zlm.AddStreamProxyResponse, error) {
driver, err := n.getDriver(server.Type)
if err != nil {
return nil, err
}
return driver.AddStreamProxy(context.Background(), server, &in)
}
func (n *NodeManager) GetSnapshot(server *MediaServer, in GetSnapRequest) ([]byte, error) {
driver, err := n.getDriver(server.Type)
if err != nil {
return nil, err
}
return driver.GetSnapshot(context.Background(), server, &in)
}
func (n *NodeManager) GetStreamLiveAddr(server *MediaServer, httpPrefix, host, app, stream string) StreamLiveAddr {
driver, err := n.getDriver(server.Type)
if err != nil {
return StreamLiveAddr{Label: err.Error()}
}
return driver.GetStreamLiveAddr(context.Background(), server, httpPrefix, host, app, stream)
}