mirror of
https://github.com/Monibuca/engine.git
synced 2026-04-23 00:07:06 +08:00
增加日志级别配置,增加发布者断开后自动删除流的延时配置
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
- 引擎包了zap日志框架
|
||||
- 引擎提供事件总线机制,可以对所有插件广播事件
|
||||
## 引擎自带HTTP接口
|
||||
- 获取某一个流的详情 `/api/stream?streamPath=xxx`
|
||||
- 终止某一个流 `/api/closeStream?streamPath=xxx`
|
||||
- 获取engine信息 `/api/sysInfo` 返回值{Version:xxx,StartTime:xxx,IP:[xxx.xxx.xxx.xxx]}
|
||||
- 获取系统基本情况 `/api/summary` 返回值Summary数据
|
||||
@@ -24,6 +25,7 @@
|
||||
# 引擎默认配置
|
||||
```yaml
|
||||
global:
|
||||
loglevel: info # 日志级别,可选值:debug,info,warn,error,panic,fatal
|
||||
http:
|
||||
# 网关地址,用于访问API
|
||||
listenaddr: :8080
|
||||
@@ -45,7 +47,9 @@ global:
|
||||
kickexist: false
|
||||
# 发布流默认过期时间单位秒,超过该时间发布者没有恢复流将被删除
|
||||
publishtimeout: 10
|
||||
# 自动关闭触发后延迟的秒数(期间内如果有新的订阅则取消触发关闭)
|
||||
# 自动关闭触发后延迟的秒数(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。
|
||||
delayclosetimeout: 0
|
||||
# 发布者断开后等待时间,超过该时间发布者没有恢复流将被删除,0为关闭该功能,由订阅者决定是否删除
|
||||
waitclosetimeout: 0
|
||||
subscribe:
|
||||
# 是否订阅音频流
|
||||
@@ -62,10 +66,15 @@ global:
|
||||
enableavcc : true
|
||||
# 启用rtp格式缓存,用于rtsp、websocket、gb28181协议
|
||||
enablertp : true
|
||||
# 连接远程控制台的地址
|
||||
consoleurl : wss://console.monibuca.com/ws/v1
|
||||
# 远程控制台的秘钥
|
||||
secret: ""
|
||||
console:
|
||||
# 连接远程控制台的地址
|
||||
server : wss://console.monibuca.com/ws/v1
|
||||
# 远程控制台的秘钥
|
||||
secret: ""
|
||||
# 实例公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址
|
||||
publicaddr: ""
|
||||
# 实例公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址(https)
|
||||
publicaddrtls: ""
|
||||
```
|
||||
|
||||
# 配置覆盖机制
|
||||
|
||||
+9
-7
@@ -26,11 +26,12 @@ type PushConfig interface {
|
||||
}
|
||||
|
||||
type Publish struct {
|
||||
PubAudio bool
|
||||
PubVideo bool
|
||||
KickExist bool // 是否踢掉已经存在的发布者
|
||||
PublishTimeout int // 发布无数据超时
|
||||
WaitCloseTimeout int // 延迟自动关闭(无订阅时)
|
||||
PubAudio bool
|
||||
PubVideo bool
|
||||
KickExist bool // 是否踢掉已经存在的发布者
|
||||
PublishTimeout int // 发布无数据超时
|
||||
WaitCloseTimeout int // 延迟自动关闭(等待重连)
|
||||
DelayCloseTimeout int // 延迟自动关闭(无订阅时)
|
||||
}
|
||||
|
||||
func (c *Publish) GetPublishConfig() *Publish {
|
||||
@@ -98,6 +99,7 @@ type Engine struct {
|
||||
EnableAVCC bool //启用AVCC格式,rtmp协议使用
|
||||
EnableRTP bool //启用RTP格式,rtsp、gb18181等协议使用
|
||||
Console
|
||||
LogLevel string
|
||||
}
|
||||
type myResponseWriter struct {
|
||||
*websocket.Conn
|
||||
@@ -174,10 +176,10 @@ func (cfg *Engine) OnEvent(event any) {
|
||||
}
|
||||
|
||||
var Global = &Engine{
|
||||
Publish{true, true, false, 10, 0},
|
||||
Publish{true, true, false, 10, 0, 0},
|
||||
Subscribe{true, true, true, false, 10},
|
||||
HTTP{ListenAddr: ":8080", CORS: true, mux: http.DefaultServeMux},
|
||||
false, true, true, Console{
|
||||
"wss://console.monibuca.com:9999/ws/v1", "", "", "",
|
||||
},
|
||||
}, "info",
|
||||
}
|
||||
|
||||
@@ -128,6 +128,9 @@ func (io *IO[C]) receive(streamPath string, specific IIO, conf *C) error {
|
||||
if v, ok := c.(*config.Publish); ok {
|
||||
io.Type = strings.TrimSuffix(io.Type, "Publisher")
|
||||
oldPublisher := s.Publisher
|
||||
if !create && v.WaitCloseTimeout != 0 {
|
||||
s.WaitTimeout = util.Second2Duration(v.WaitCloseTimeout)
|
||||
}
|
||||
if s.Publisher != nil && !s.Publisher.IsClosed() {
|
||||
// 根据配置是否剔出原来的发布者
|
||||
if v.KickExist {
|
||||
@@ -138,7 +141,7 @@ func (io *IO[C]) receive(streamPath string, specific IIO, conf *C) error {
|
||||
}
|
||||
}
|
||||
s.PublishTimeout = util.Second2Duration(v.PublishTimeout)
|
||||
s.WaitCloseTimeout = util.Second2Duration(v.WaitCloseTimeout)
|
||||
s.DelayCloseTimeout = util.Second2Duration(v.DelayCloseTimeout)
|
||||
defer func() {
|
||||
if err == nil {
|
||||
if oldPublisher == nil {
|
||||
|
||||
+5
-3
@@ -53,19 +53,21 @@ func DeleteWriter(writer io.Writer) {
|
||||
multipleWriter.Delete(writer)
|
||||
}
|
||||
func init() {
|
||||
// std.SetOutput(colorableStdout)
|
||||
// std.SetFormatter(LogWriter(defaultFormatter))
|
||||
}
|
||||
func Init(level zapcore.Level) {
|
||||
config := zap.NewDevelopmentConfig()
|
||||
config.EncoderConfig.NewReflectedEncoder = func(w io.Writer) zapcore.ReflectedEncoder {
|
||||
return yaml.NewEncoder(w)
|
||||
}
|
||||
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
|
||||
config.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("15:04:05")
|
||||
|
||||
config.Level.SetLevel(level)
|
||||
logger = zap.New(
|
||||
zapcore.NewCore(zapcore.NewConsoleEncoder(config.EncoderConfig), zapcore.AddSync(multipleWriter), config.Level),
|
||||
)
|
||||
sugaredLogger = logger.Sugar()
|
||||
// std.SetOutput(colorableStdout)
|
||||
// std.SetFormatter(LogWriter(defaultFormatter))
|
||||
}
|
||||
|
||||
type Zap interface {
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/google/uuid"
|
||||
. "github.com/logrusorgru/aurora"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"gopkg.in/yaml.v3"
|
||||
"m7s.live/engine/v4/config"
|
||||
"m7s.live/engine/v4/log"
|
||||
@@ -78,6 +79,12 @@ func Run(ctx context.Context, configFile string) (err error) {
|
||||
log.Error("parsing yml error:", err)
|
||||
}
|
||||
}
|
||||
loglevel, err := zapcore.ParseLevel(EngineConfig.LogLevel)
|
||||
if err != nil {
|
||||
log.Error("parse log level error:", err)
|
||||
loglevel = zapcore.InfoLevel
|
||||
}
|
||||
log.Init(loglevel)
|
||||
Engine.Logger = log.With(zap.Bool("engine", true))
|
||||
Engine.registerHandler()
|
||||
// 使得RawConfig具备全量配置信息,用于合并到插件配置中
|
||||
|
||||
@@ -119,9 +119,9 @@ func FilterStreams[T IPublisher]() (ss []*Stream) {
|
||||
}
|
||||
|
||||
type StreamTimeoutConfig struct {
|
||||
WaitTimeout time.Duration
|
||||
PublishTimeout time.Duration
|
||||
WaitCloseTimeout time.Duration
|
||||
WaitTimeout time.Duration //等待发布者上线
|
||||
PublishTimeout time.Duration //发布者无数据后超时
|
||||
DelayCloseTimeout time.Duration //发布者丢失后等待
|
||||
}
|
||||
type Tracks struct {
|
||||
util.Map[string, Track]
|
||||
@@ -231,7 +231,7 @@ func (r *Stream) action(action StreamAction) (ok bool) {
|
||||
r.timeout.Reset(time.Second * 5) // 5秒心跳,检测track的存活度
|
||||
case STATE_WAITCLOSE:
|
||||
stateEvent = SEwaitClose{event}
|
||||
r.timeout.Reset(r.WaitCloseTimeout)
|
||||
r.timeout.Reset(r.DelayCloseTimeout)
|
||||
case STATE_CLOSED:
|
||||
for !r.actionChan.Close() {
|
||||
// 等待channel发送完毕
|
||||
@@ -289,7 +289,7 @@ func (s *Stream) run() {
|
||||
if s.Publisher != nil {
|
||||
s.Publisher.OnEvent(sub) // 通知Publisher有订阅者离开,在回调中可以去获取订阅者数量
|
||||
}
|
||||
if l == 0 && s.WaitCloseTimeout > 0 {
|
||||
if l == 0 && s.DelayCloseTimeout > 0 {
|
||||
s.action(ACTION_LASTLEAVE)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user