diff --git a/README.md b/README.md index b0e6bff..74e0c9d 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ # 引擎默认配置 ```yaml global: + loglang: zh # 日志语言,可选值:zh,en loglevel: info # 日志级别,可选值:debug,info,warn,error,panic,fatal http: listenaddr: :8080 # 网关地址,用于访问API @@ -47,28 +48,29 @@ global: cors: true # 是否自动添加cors头 username: "" # 用户名和密码,用于API访问时的基本身份认证 password: "" - readtimeout: 0 # 读取超时时间,单位秒,0为不限制 - writetimeout: 0 # 写入超时时间,单位秒,0为不限制 - idletimeout: 0 # 空闲超时时间,单位秒,0为不限制 + readtimeout: 0 # 读取超时时间,0为不限制 + writetimeout: 0 # 写入超时时间,0为不限制 + idletimeout: 0 # 空闲超时时间,0为不限制 publish: pubaudio: true # 是否发布音频流 pubvideo: true # 是否发布视频流 kickexist: false # 剔出已经存在的发布者,用于顶替原有发布者 - publishtimeout: 10 # 发布流默认过期时间单位秒,超过该时间发布者没有恢复流将被删除 - delayclosetimeout: 0 # 自动关闭触发后延迟的秒数(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。 + publishtimeout: 10s # 发布流默认过期时间,超过该时间发布者没有恢复流将被删除 + delayclosetimeout: 0 # 自动关闭触发后延迟的时间(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。 waitclosetimeout: 0 # 发布者断开后等待时间,超过该时间发布者没有恢复流将被删除,0为关闭该功能,由订阅者决定是否删除 subscribe: subaudio: true # 是否订阅音频流 subvideo: true # 是否订阅视频流 iframeonly: false # 只订阅关键帧 - waittimeout: 10 # 等待发布者的秒数,用于订阅尚未发布的流 + waittimeout: 10s # 等待发布者超时时间,用于订阅尚未发布的流 enableavcc : true # 启用AVCC格式缓存,用于rtmp协议 enablertp : true # 启用rtp格式缓存,用于rtsp、websocket、gb28181协议 enableauth: true # 启用鉴权,详细查看鉴权机制 enablesubevent: true # 启用订阅事件,用于订阅者上下线事件,关闭可以提高性能 - rtpreoderbufferlen: 50 # rtp乱序重排缓存长度 - speedlimit: 0 # 限速超时时间(毫秒)0为不限速,对于读取文件这类流需要限速,否则读取过快 + rtpreroderbufferlen: 50 # rtp乱序重排缓存长度 + speedlimit: 500ms # 限速超时时间 0为不限速,对于读取文件这类流需要限速,否则读取过快 eventbussize: 10 # 事件总线缓存大小,事件较多时容易堵阻塞线程,需要增大缓存 + pulseinterval: 5s # 心跳事件间隔时间 console: server : console.monibuca.com:4242 # 连接远程控制台的地址 secret: "" # 远程控制台的秘钥 diff --git a/config/config.go b/config/config.go index c413afd..9f3d97f 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "net" "net/http" + "os" "reflect" "strings" "time" @@ -43,11 +44,11 @@ func (config Config) CreateElem(eleType reflect.Type) reflect.Value { } func (config Config) Unmarshal(s any) { - defer func() { - if err := recover(); err != nil { - log.Error("Unmarshal error:", err) - } - }() + // defer func() { + // if err := recover(); err != nil { + // log.Error("Unmarshal error:", err) + // } + // }() if s == nil { return } @@ -106,6 +107,13 @@ func (config Config) Unmarshal(s any) { fv.SetInt(0) } else if d, err := time.ParseDuration(value.String()); err == nil { fv.SetInt(int64(d)) + } else { + if Global.LogLang == "zh" { + log.Errorf("%s 无效的时间值: %v 请添加单位(s,m,h,d),例如:100ms, 10s, 4m, 1h", k, value) + } else { + log.Errorf("%s invalid duration value: %v please add unit (s,m,h,d),eg: 100ms, 10s, 4m, 1h", k, value) + } + os.Exit(1) } continue } @@ -176,7 +184,11 @@ func (config Config) Merge(source Config) { case Config: m.Merge(v.(Config)) default: - log.Debug("merge", k, v) + if Global.LogLang == "zh" { + log.Debug("合并配置", k, ":", v) + } else { + log.Debug("merge", k, ":", v) + } config[k] = v } } else { diff --git a/config/http.go b/config/http.go index 90abab6..3f9da8b 100644 --- a/config/http.go +++ b/config/http.go @@ -67,7 +67,11 @@ func (config *HTTP) Listen(ctx context.Context) error { var g errgroup.Group if config.ListenAddrTLS != "" && (config == &Global.HTTP || config.ListenAddrTLS != Global.ListenAddrTLS) { g.Go(func() error { - log.Info("🌐 https listen at ", Blink(config.ListenAddrTLS)) + if Global.LogLang == "zh" { + log.Info("🌐 https 监听在 ", Blink(config.ListenAddrTLS)) + } else { + log.Info("🌐 https listen at ", Blink(config.ListenAddrTLS)) + } var server = http.Server{ Addr: config.ListenAddrTLS, ReadTimeout: config.ReadTimeout, @@ -80,7 +84,11 @@ func (config *HTTP) Listen(ctx context.Context) error { } if config.ListenAddr != "" && (config == &Global.HTTP || config.ListenAddr != Global.ListenAddr) { g.Go(func() error { - log.Info("🌐 http listen at ", Blink(config.ListenAddr)) + if Global.LogLang == "zh" { + log.Info("🌐 http 监听在 ", Blink(config.ListenAddr)) + } else { + log.Info("🌐 http listen at ", Blink(config.ListenAddr)) + } var server = http.Server{ Addr: config.ListenAddr, ReadTimeout: config.ReadTimeout, diff --git a/config/remote.go b/config/remote.go index 6e98098..7126ae6 100644 --- a/config/remote.go +++ b/config/remote.go @@ -58,11 +58,19 @@ func (cfg *Engine) Remote(ctx context.Context) error { var rMessage map[string]any if err = json.Unmarshal(msg[:len(msg)-1], &rMessage); err == nil { if rMessage["code"].(float64) != 0 { - log.Error("response from console server ", cfg.Server, " ", rMessage["msg"]) + if Global.LogLang == "zh" { + log.Error("控制台服务器", cfg.Server, "返回错误", rMessage["msg"]) + } else { + log.Error("response from console server ", cfg.Server, " ", rMessage["msg"]) + } return nil } else { cfg.reportStream = stream - log.Info("response from console server ", cfg.Server, " success ", rMessage) + if Global.LogLang == "zh" { + log.Info("连接到控制台服务器", cfg.Server, "成功", rMessage) + } else { + log.Info("response from console server ", cfg.Server, " success ", rMessage) + } if v, ok := rMessage["enableReport"]; ok { cfg.enableReport = v.(bool) } @@ -85,7 +93,11 @@ func (cfg *Engine) Remote(ctx context.Context) error { if err != nil { if wasConnected { - log.Error("connect to console server ", cfg.Server, " ", err) + if Global.LogLang == "zh" { + log.Error("连接到控制台服务器", cfg.Server, "失败", err) + } else { + log.Error("connect to console server ", cfg.Server, " ", err) + } } if ctx.Err() == nil { go cfg.Remote(ctx) diff --git a/config/tcp.go b/config/tcp.go index 931686d..c50a6ba 100644 --- a/config/tcp.go +++ b/config/tcp.go @@ -42,7 +42,11 @@ func (tcp *TCP) listen(l net.Listener, handler func(*net.TCPConn)) { func (tcp *TCP) Listen(ctx context.Context, plugin TCPPlugin) error { l, err := net.Listen("tcp", tcp.ListenAddr) if err != nil { - log.Fatalf("%s: Listen error: %v", tcp.ListenAddr, err) + if Global.LogLang == "zh" { + log.Fatalf("%s: 监听失败: %v", tcp.ListenAddr, err) + } else { + log.Fatalf("%s: Listen error: %v", tcp.ListenAddr, err) + } return err } count := tcp.ListenNum diff --git a/config/types.go b/config/types.go index 70383c6..a4654d3 100755 --- a/config/types.go +++ b/config/types.go @@ -117,10 +117,12 @@ type Engine struct { EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能 EnableAuth bool `default:"true"` //启用鉴权 Console - LogLevel string `default:"info"` + LogLang string `default:"zh"` //日志语言 + LogLevel string `default:"info"` //日志级别 RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲长度 SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间 EventBusSize int `default:"10"` //事件总线大小 + PulseInterval time.Duration `default:"5s"` //心跳事件间隔 enableReport bool `default:"false"` //启用报告,用于统计和监控 reportStream quic.Stream // console server connection instanceId string // instance id 来自console diff --git a/go.mod b/go.mod index e7d57f5..9bdd858 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module m7s.live/engine/v4 go 1.19 require ( + github.com/aler9/gortsplib/v2 v2.2.2 github.com/cnotch/ipchub v1.1.0 github.com/google/uuid v1.3.0 github.com/logrusorgru/aurora v2.0.3+incompatible @@ -13,7 +14,7 @@ require ( github.com/quic-go/quic-go v0.32.0 github.com/shirou/gopsutil/v3 v3.22.10 go.uber.org/zap v1.23.0 - golang.org/x/net v0.4.0 + golang.org/x/net v0.8.0 golang.org/x/sync v0.1.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -45,6 +46,6 @@ require ( golang.org/x/crypto v0.4.0 // indirect golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect golang.org/x/mod v0.7.0 // indirect - golang.org/x/sys v0.3.0 // indirect + golang.org/x/sys v0.6.0 // indirect golang.org/x/tools v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index 13992e4..0ca92ae 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aler9/gortsplib/v2 v2.2.2 h1:tTw8pdKSOEjlZjjE1S4ftXPHJkYOqjNNv3hjQ0Nto9M= +github.com/aler9/gortsplib/v2 v2.2.2/go.mod h1:k6uBVHGwsIc/0L5SLLqWwi6bSJUb4VR0HfvncyHlKQI= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -138,8 +140,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= @@ -193,8 +195,8 @@ golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= -golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= -golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -228,8 +230,8 @@ golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -239,7 +241,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= diff --git a/http.go b/http.go index 7c1a51a..d588916 100644 --- a/http.go +++ b/http.go @@ -29,18 +29,29 @@ func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } } +func fetchSummary() *Summary { + return &summary +} + func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) { + format := r.URL.Query().Get("format") if r.Header.Get("Accept") == "text/event-stream" { summary.Add() defer summary.Done() - util.ReturnJson(func() *Summary { - return &summary - }, time.Second, rw, r) + if format == "yaml" { + util.ReturnYaml(fetchSummary, time.Second, rw, r) + } else { + util.ReturnJson(fetchSummary, time.Second, rw, r) + } } else { if !summary.Running() { summary.collect() } - if err := json.NewEncoder(rw).Encode(&summary); err != nil { + if format == "yaml" { + if err := yaml.NewEncoder(rw).Encode(&summary); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + } + } else if err := json.NewEncoder(rw).Encode(&summary); err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) } } diff --git a/io.go b/io.go index 6f2fd42..9c112ef 100644 --- a/io.go +++ b/io.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "m7s.live/engine/v4/config" + "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" ) @@ -35,7 +36,7 @@ type IO struct { Type string context.Context `json:"-"` //不要直接设置,应当通过OnEvent传入父级Context context.CancelFunc `json:"-"` //流关闭是关闭发布者或者订阅者 - *zap.Logger `json:"-"` + *log.Logger `json:"-"` StartTime time.Time //创建时间 Stream *Stream `json:"-"` io.Reader `json:"-"` @@ -67,7 +68,7 @@ func (i *IO) SetParentCtx(parent context.Context) { i.Context, i.CancelFunc = context.WithCancel(parent) } -func (i *IO) SetLogger(logger *zap.Logger) { +func (i *IO) SetLogger(logger *log.Logger) { i.Logger = logger } @@ -97,7 +98,7 @@ type IIO interface { Stop() SetIO(any) SetParentCtx(context.Context) - SetLogger(*zap.Logger) + SetLogger(*log.Logger) IsShutdown() bool } @@ -126,7 +127,11 @@ func (io *IO) receive(streamPath string, specific IIO) error { streamPath = strings.Trim(streamPath, "/") u, err := url.Parse(streamPath) if err != nil { - io.Error("receive streamPath wrong format", zap.String("streamPath", streamPath), zap.Error(err)) + if EngineConfig.LogLang == "zh" { + io.Error("接收流路径(流唯一标识)格式错误,必须形如 live/test ", zap.String("流路径", streamPath), zap.Error(err)) + } else { + io.Error("receive streamPath wrong format", zap.String("streamPath", streamPath), zap.Error(err)) + } return err } io.Args = u.Query() @@ -156,6 +161,7 @@ func (io *IO) receive(streamPath string, specific IIO) error { if v, ok := specific.(IPublisher); ok { conf := v.GetPublisher().Config io.Type = strings.TrimSuffix(io.Type, "Publisher") + io.Info("publish") oldPublisher := s.Publisher if oldPublisher != nil && !oldPublisher.IsClosed() { // 根据配置是否剔出原来的发布者 @@ -200,6 +206,7 @@ func (io *IO) receive(streamPath string, specific IIO) error { } } else { io.Type = strings.TrimSuffix(io.Type, "Subscriber") + io.Info("subscribe") if create { EventBus <- s // 通知发布者按需拉流 } diff --git a/lang/lang.go b/lang/lang.go new file mode 100644 index 0000000..70afe69 --- /dev/null +++ b/lang/lang.go @@ -0,0 +1,36 @@ +package lang + +import ( + _ "embed" + + "gopkg.in/yaml.v3" +) + +//go:embed zh.yaml +var zhYaml []byte +var zh map[string]string + +func init() { + yaml.Unmarshal(zhYaml, &zh) +} + +func Get(lang string) map[string]string { + if lang == "zh" { + return zh + } + return nil +} + +func Update(lang string, key string, value string) { + if lang == "zh" { + zh[key] = value + } +} + +func Merge(lang string, data map[string]string) { + if lang == "zh" { + for k, v := range data { + zh[k] = v + } + } +} \ No newline at end of file diff --git a/lang/zh.yaml b/lang/zh.yaml new file mode 100644 index 0000000..23ca296 --- /dev/null +++ b/lang/zh.yaml @@ -0,0 +1,50 @@ +engine: 引擎 +config: 配置 +stream: 流 +publish: 发布 +subscribe: 订阅 +pull: 拉取 +push: 推送 +action: 动作 +type: 类型 +enable: 启用 +listenaddr: 监听地址 +kick: 踢出 +stop: 停止 +start: 开始 +install: 安装 +version: 版本 +name: 名称 +state: 状态 +initialize: 初始化 +"start read": 开始读取 +"wait publisher": 等待发布者发布 +"wait timeout": 等待超时 +created: 已创建 +create: 创建 +timeout: 超时 +track: 轨道 +"track timeout": 轨道超时 +"last writetime": 最后写入时间 +track+1: 轨道+1 +playblock: 阻塞式播放 +"play neither video nor audio": 播放既没有视频也没有音频 +"play before subscribe": 播放之前需要先订阅 +"suber -1": 订阅者-1 +"suber +1": 订阅者+1 +"innersuber +1": 内部订阅者+1 +reamins: 剩余 +"need sequence frame": 需要序列帧 +"video codecID not support": 视频编码不支持 +"http handle added": http处理器已添加 +"http handle added to engine": http处理器已添加到引擎 +"plugin disabled": 插件已禁用 +"pull failed": 拉取失败 +"audio codec not support yet": 音频编码暂不支持 +"video track attached": 视频轨道已附加 +"audio track attached": 音频轨道已附加 +"first frame read": 第一帧已读取 +firstTs: 第一帧时间戳 +firstSeq: 第一帧序列号 +skipSeq: 跳过序列号 +skipTs: 跳过时间戳 \ No newline at end of file diff --git a/log/log.go b/log/log.go index 05bfdb9..fd06d8c 100644 --- a/log/log.go +++ b/log/log.go @@ -3,408 +3,121 @@ package log import ( // . "github.com/logrusorgru/aurora" "io" - "os" // "github.com/mattn/go-colorable" "gopkg.in/yaml.v3" // log "github.com/sirupsen/logrus" + . "github.com/logrusorgru/aurora" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) -var sugaredLogger *zap.SugaredLogger -var logger *zap.Logger - -// var levelColors = []func(any) Value{Red, Red, Red, Yellow, Blue, Green, White} - -// type LogWriter func(*log.Entry) string - -// var colorableStdout = colorable.NewColorableStdout() -type MultipleWriter []io.Writer - -func (m *MultipleWriter) Write(p []byte) (n int, err error) { - for _, w := range *m { - n, err = w.Write(p) - if err != nil { - m.Delete(w) - } - } - return len(p), nil -} -func (m *MultipleWriter) Delete(writer io.Writer) { - for i, w := range *m { - if w == writer { - *m = append((*m)[:i], (*m)[i+1:]...) - return - } - } -} -func (m *MultipleWriter) Add(writer io.Writer) { - *m = append(*m, writer) -} - -var multipleWriter = &MultipleWriter{os.Stdout} -var Config = zap.NewDevelopmentConfig() -func AddWriter(writer io.Writer) { - multipleWriter.Add(writer) -} -func DeleteWriter(writer io.Writer) { - multipleWriter.Delete(writer) -} -func init() { - // std.SetOutput(colorableStdout) - // std.SetFormatter(LogWriter(defaultFormatter)) - Config.EncoderConfig.NewReflectedEncoder = func(w io.Writer) zapcore.ReflectedEncoder { +var engineConfig = zapcore.EncoderConfig{ + // Keys can be anything except the empty string. + TimeKey: "T", + LevelKey: "L", + NameKey: "N", + CallerKey: "C", + FunctionKey: zapcore.OmitKey, + MessageKey: "M", + StacktraceKey: "S", + LineEnding: zapcore.DefaultLineEnding, + EncodeLevel: zapcore.CapitalColorLevelEncoder, + EncodeTime: zapcore.TimeEncoderOfLayout("15:04:05"), + EncodeDuration: zapcore.StringDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + EncodeName: NameEncoder, + NewReflectedEncoder: func(w io.Writer) zapcore.ReflectedEncoder { return yaml.NewEncoder(w) - } - Config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder - Config.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("15:04:05") - logger = zap.New( - zapcore.NewCore(zapcore.NewConsoleEncoder(Config.EncoderConfig), zapcore.AddSync(multipleWriter), Config.Level), - ) - sugaredLogger = logger.Sugar() + }, +} +var LogLevel = zap.NewAtomicLevelAt(zap.DebugLevel) +var logger *zap.Logger = zap.New( + zapcore.NewCore(zapcore.NewConsoleEncoder(engineConfig), zapcore.AddSync(multipleWriter), LogLevel), +) +var sugaredLogger *zap.SugaredLogger = logger.Sugar() +var LocaleLogger *Logger + +func NameEncoder(loggerName string, enc zapcore.PrimitiveArrayEncoder) { + enc.AppendString(Colorize(loggerName, WhiteFg|BlackBg).String()) } type Zap interface { - With(fields ...zap.Field) *zap.Logger + Lang(lang map[string]string) *Logger + Named(name string) *Logger + With(fields ...zap.Field) *Logger Debug(msg string, fields ...zap.Field) Info(msg string, fields ...zap.Field) Warn(msg string, fields ...zap.Field) Error(msg string, fields ...zap.Field) } -func With(fields ...zap.Field) *zap.Logger { - return logger.With(fields...) +type Logger struct { + *zap.Logger + lang map[string]string } -func Debug(args ...any) { - sugaredLogger.Debug(args...) +func (l Logger) Lang(lang map[string]string) *Logger { + l.Logger = logger + l.lang = lang + return &l } -func Info(args ...any) { - sugaredLogger.Info(args...) +func (l Logger) Named(name string) *Logger { + l.Logger = l.Logger.Named(name) + return &l } -func Warn(args ...any) { - sugaredLogger.Warn(args...) +func (l Logger) With(fields ...zap.Field) *Logger { + for i, field := range fields { + if v, ok := l.lang[field.Key]; ok { + fields[i].Key = v + } + } + l.Logger = l.Logger.With(fields...) + return &l } -func Error(args ...any) { - sugaredLogger.Error(args...) +func (l *Logger) formatLang(msg *string, fields []zapcore.Field) { + if l.lang != nil { + if v, ok := l.lang[*msg]; ok { + *msg = v + } + for i, field := range fields { + if v, ok := l.lang[field.Key]; ok { + fields[i].Key = v + } + } + } } -func Debugf(format string, args ...interface{}) { - sugaredLogger.Debugf(format, args...) +func (l *Logger) Debug(msg string, fields ...zap.Field) { + l.formatLang(&msg, fields) + l.Logger.Debug(msg, fields...) } -// Infof logs a message at level Info on the standard logger. -func Infof(format string, args ...interface{}) { - sugaredLogger.Infof(format, args...) +func (l *Logger) Info(msg string, fields ...zap.Field) { + l.formatLang(&msg, fields) + l.Logger.Info(msg, fields...) } -// Warnf logs a message at level Warn on the standard logger. -func Warnf(format string, args ...interface{}) { - sugaredLogger.Warnf(format, args...) +func (l *Logger) Warn(msg string, fields ...zap.Field) { + l.formatLang(&msg, fields) + l.Logger.Warn(msg, fields...) } -// Errorf logs a message at level Error on the standard logger. -func Errorf(format string, args ...interface{}) { - sugaredLogger.Errorf(format, args...) +func (l *Logger) Error(msg string, fields ...zap.Field) { + l.formatLang(&msg, fields) + l.Logger.Error(msg, fields...) } -// Panicf logs a message at level Panic on the standard logger. -func Panicf(format string, args ...interface{}) { - sugaredLogger.Panicf(format, args...) +func (l *Logger) Fatal(msg string, fields ...zap.Field) { + l.formatLang(&msg, fields) + l.Logger.Fatal(msg, fields...) } -// Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1. -func Fatalf(format string, args ...interface{}) { - sugaredLogger.Fatalf(format, args...) +func (l *Logger) Panic(msg string, fields ...zap.Field) { + l.formatLang(&msg, fields) + l.Logger.Panic(msg, fields...) } - -// func defaultFormatter(entry *log.Entry) string { -// pl := entry.Data["plugin"] -// if pl == nil { -// pl = "Engine" -// } -// l := strings.ToUpper(entry.Level.String())[:1] -// var props string -// if stream := entry.Data["stream"]; stream != nil { -// props = Sprintf("[s:%s] ", stream) -// } -// if puber := entry.Data["puber"]; puber != nil { -// props += Sprintf("[pub:%s] ", puber) -// } -// if suber := entry.Data["suber"]; suber != nil { -// props += Sprintf("[sub:%s] ", suber) -// } -// return Sprintf(levelColors[entry.Level]("%s [%s] [%s]\t %s%s\n"), l, entry.Time.Format("15:04:05"), pl, props, entry.Message) -// } - -// func (f LogWriter) Format(entry *log.Entry) (b []byte, err error) { -// return []byte(f(entry)), nil -// } - -// var ( -// // std is the name of the standard logger in stdlib `log` -// std = log.New() -// ) - -// func StandardLogger() *log.Logger { -// return std -// } - -// // SetOutput sets the standard logger output. -// func SetOutput(out io.Writer) { -// std.SetOutput(out) -// } - -// // SetFormatter sets the standard logger formatter. -// func SetFormatter(formatter log.Formatter) { -// std.SetFormatter(formatter) -// } - -// // SetReportCaller sets whether the standard logger will include the calling -// // method as a field. -// func SetReportCaller(include bool) { -// std.SetReportCaller(include) -// } - -// // SetLevel sets the standard logger level. -// func SetLevel(level log.Level) { -// std.SetLevel(level) -// } - -// // GetLevel returns the standard logger level. -// func GetLevel() log.Level { -// return std.GetLevel() -// } - -// // IsLevelEnabled checks if the log level of the standard logger is greater than the level param -// func IsLevelEnabled(level log.Level) bool { -// return std.IsLevelEnabled(level) -// } - -// // AddHook adds a hook to the standard logger hooks. -// func AddHook(hook log.Hook) { -// std.AddHook(hook) -// } - -// // WithError creates an entry from the standard logger and adds an error to it, using the value defined in ErrorKey as key. -// func WithError(err error) *log.Entry { -// return std.WithField(log.ErrorKey, err) -// } - -// // WithContext creates an entry from the standard logger and adds a context to it. -// func WithContext(ctx context.Context) *log.Entry { -// return std.WithContext(ctx) -// } - -// // WithField creates an entry from the standard logger and adds a field to -// // it. If you want multiple fields, use `WithFields`. -// // -// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal -// // or Panic on the Entry it returns. -// func WithField(key string, value interface{}) *log.Entry { -// return std.WithField(key, value) -// } - -// // WithFields creates an entry from the standard logger and adds multiple -// // fields to it. This is simply a helper for `WithField`, invoking it -// // once for each field. -// // -// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal -// // or Panic on the Entry it returns. -// func WithFields(fields log.Fields) *log.Entry { -// return std.WithFields(fields) -// } - -// // WithTime creates an entry from the standard logger and overrides the time of -// // logs generated with it. -// // -// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal -// // or Panic on the Entry it returns. -// func WithTime(t time.Time) *log.Entry { -// return std.WithTime(t) -// } - -// // Trace logs a message at level Trace on the standard logger. -// func Trace(args ...interface{}) { -// std.Trace(args...) -// } - -// // Debug logs a message at level Debug on the standard logger. -// func Debug(args ...interface{}) { -// std.Debug(args...) -// } - -// // Print logs a message at level Info on the standard logger. -// func Print(args ...interface{}) { -// std.Print(args...) -// } - -// // Info logs a message at level Info on the standard logger. -// func Info(args ...interface{}) { -// std.Info(args...) -// } - -// // Warn logs a message at level Warn on the standard logger. -// func Warn(args ...interface{}) { -// std.Warn(args...) -// } - -// // Warning logs a message at level Warn on the standard logger. -// func Warning(args ...interface{}) { -// std.Warning(args...) -// } - -// // Error logs a message at level Error on the standard logger. -// func Error(args ...interface{}) { -// std.Error(args...) -// } - -// // Panic logs a message at level Panic on the standard logger. -// func Panic(args ...interface{}) { -// std.Panic(args...) -// } - -// // Fatal logs a message at level Fatal on the standard logger then the process will exit with status set to 1. -// func Fatal(args ...interface{}) { -// std.Fatal(args...) -// } - -// // TraceFn logs a message from a func at level Trace on the standard logger. -// func TraceFn(fn log.LogFunction) { -// std.TraceFn(fn) -// } - -// // DebugFn logs a message from a func at level Debug on the standard logger. -// func DebugFn(fn log.LogFunction) { -// std.DebugFn(fn) -// } - -// // PrintFn logs a message from a func at level Info on the standard logger. -// func PrintFn(fn log.LogFunction) { -// std.PrintFn(fn) -// } - -// // InfoFn logs a message from a func at level Info on the standard logger. -// func InfoFn(fn log.LogFunction) { -// std.InfoFn(fn) -// } - -// // WarnFn logs a message from a func at level Warn on the standard logger. -// func WarnFn(fn log.LogFunction) { -// std.WarnFn(fn) -// } - -// // WarningFn logs a message from a func at level Warn on the standard logger. -// func WarningFn(fn log.LogFunction) { -// std.WarningFn(fn) -// } - -// // ErrorFn logs a message from a func at level Error on the standard logger. -// func ErrorFn(fn log.LogFunction) { -// std.ErrorFn(fn) -// } - -// // PanicFn logs a message from a func at level Panic on the standard logger. -// func PanicFn(fn log.LogFunction) { -// std.PanicFn(fn) -// } - -// // FatalFn logs a message from a func at level Fatal on the standard logger then the process will exit with status set to 1. -// func FatalFn(fn log.LogFunction) { -// std.FatalFn(fn) -// } - -// // Tracef logs a message at level Trace on the standard logger. -// func Tracef(format string, args ...interface{}) { -// std.Tracef(format, args...) -// } - -// // Debugf logs a message at level Debug on the standard logger. -// func Debugf(format string, args ...interface{}) { -// std.Debugf(format, args...) -// } - -// // Printf logs a message at level Info on the standard logger. -// func Printf(format string, args ...interface{}) { -// std.Printf(format, args...) -// } - -// // Infof logs a message at level Info on the standard logger. -// func Infof(format string, args ...interface{}) { -// std.Infof(format, args...) -// } - -// // Warnf logs a message at level Warn on the standard logger. -// func Warnf(format string, args ...interface{}) { -// std.Warnf(format, args...) -// } - -// // Warningf logs a message at level Warn on the standard logger. -// func Warningf(format string, args ...interface{}) { -// std.Warningf(format, args...) -// } - -// // Errorf logs a message at level Error on the standard logger. -// func Errorf(format string, args ...interface{}) { -// std.Errorf(format, args...) -// } - -// // Panicf logs a message at level Panic on the standard logger. -// func Panicf(format string, args ...interface{}) { -// std.Panicf(format, args...) -// } - -// // Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1. -// func Fatalf(format string, args ...interface{}) { -// std.Fatalf(format, args...) -// } - -// // Traceln logs a message at level Trace on the standard logger. -// func Traceln(args ...interface{}) { -// std.Traceln(args...) -// } - -// // Debugln logs a message at level Debug on the standard logger. -// func Debugln(args ...interface{}) { -// std.Debugln(args...) -// } - -// // Println logs a message at level Info on the standard logger. -// func Println(args ...interface{}) { -// std.Println(args...) -// } - -// // Infoln logs a message at level Info on the standard logger. -// func Infoln(args ...interface{}) { -// std.Infoln(args...) -// } - -// // Warnln logs a message at level Warn on the standard logger. -// func Warnln(args ...interface{}) { -// std.Warnln(args...) -// } - -// // Warningln logs a message at level Warn on the standard logger. -// func Warningln(args ...interface{}) { -// std.Warningln(args...) -// } - -// // Errorln logs a message at level Error on the standard logger. -// func Errorln(args ...interface{}) { -// std.Errorln(args...) -// } - -// // Panicln logs a message at level Panic on the standard logger. -// func Panicln(args ...interface{}) { -// std.Panicln(args...) -// } - -// // Fatalln logs a message at level Fatal on the standard logger then the process will exit with status set to 1. -// func Fatalln(args ...interface{}) { -// std.Fatalln(args...) -// } diff --git a/log/sugar.go b/log/sugar.go new file mode 100644 index 0000000..caaf102 --- /dev/null +++ b/log/sugar.go @@ -0,0 +1,46 @@ +package log + +func Debug(args ...any) { + sugaredLogger.Debug(args...) +} + +func Info(args ...any) { + sugaredLogger.Info(args...) +} + +func Warn(args ...any) { + sugaredLogger.Warn(args...) +} + +func Error(args ...any) { + sugaredLogger.Error(args...) +} + +func Debugf(format string, args ...interface{}) { + sugaredLogger.Debugf(format, args...) +} + +// Infof logs a message at level Info on the standard logger. +func Infof(format string, args ...interface{}) { + sugaredLogger.Infof(format, args...) +} + +// Warnf logs a message at level Warn on the standard logger. +func Warnf(format string, args ...interface{}) { + sugaredLogger.Warnf(format, args...) +} + +// Errorf logs a message at level Error on the standard logger. +func Errorf(format string, args ...interface{}) { + sugaredLogger.Errorf(format, args...) +} + +// Panicf logs a message at level Panic on the standard logger. +func Panicf(format string, args ...interface{}) { + sugaredLogger.Panicf(format, args...) +} + +// Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1. +func Fatalf(format string, args ...interface{}) { + sugaredLogger.Fatalf(format, args...) +} diff --git a/log/writer.go b/log/writer.go new file mode 100644 index 0000000..6abf776 --- /dev/null +++ b/log/writer.go @@ -0,0 +1,38 @@ +package log + +import ( + "io" + "os" +) + +type MultipleWriter []io.Writer + +func (m *MultipleWriter) Write(p []byte) (n int, err error) { + for _, w := range *m { + n, err = w.Write(p) + if err != nil { + m.Delete(w) + } + } + return len(p), nil +} +func (m *MultipleWriter) Delete(writer io.Writer) { + for i, w := range *m { + if w == writer { + *m = append((*m)[:i], (*m)[i+1:]...) + return + } + } +} +func (m *MultipleWriter) Add(writer io.Writer) { + *m = append(*m, writer) +} + +var multipleWriter = &MultipleWriter{os.Stdout} + +func AddWriter(writer io.Writer) { + multipleWriter.Add(writer) +} +func DeleteWriter(writer io.Writer) { + multipleWriter.Delete(writer) +} diff --git a/main.go b/main.go index c0f98c7..a531356 100755 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ import ( "m7s.live/engine/v4/config" "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" + "m7s.live/engine/v4/lang" ) var ( @@ -86,19 +87,23 @@ func Run(ctx context.Context, configFile string) (err error) { } } loglevel, err := zapcore.ParseLevel(EngineConfig.LogLevel) + var logger log.Logger + log.LocaleLogger = logger.Lang(lang.Get(EngineConfig.LogLang)) if err != nil { - log.Error("parse log level error:", err) + logger.Error("parse log level error:", zap.Error(err)) loglevel = zapcore.InfoLevel } - log.Config.Level.SetLevel(loglevel) - Engine.Logger = log.With(zap.Bool("engine", true)) + log.LogLevel.SetLevel(loglevel) + Engine.Logger = log.LocaleLogger.Named("engine") // 使得RawConfig具备全量配置信息,用于合并到插件配置中 Engine.RawConfig = config.Struct2Config(EngineConfig.Engine) Engine.assign() - log.With(zap.String("config", "global")).Debug("", zap.Any("config", EngineConfig)) + Engine.Logger.Debug("", zap.Any("config", EngineConfig)) EventBus = make(chan any, EngineConfig.EventBusSize) go EngineConfig.Listen(Engine) for _, plugin := range plugins { + plugin.Logger = log.LocaleLogger.Named(plugin.Name) + plugin.Info("initialize", zap.String("version", plugin.Version)) userConfig := cg.GetChild(plugin.Name) if userConfig != nil { if b, err := yaml.Marshal(userConfig); err == nil { @@ -124,22 +129,34 @@ func Run(ctx context.Context, configFile string) (err error) { if ver, ok := ctx.Value("version").(string); ok && ver != "" && ver != "dev" { version = ver } - log.Info("monibuca", version, Green(" start success")) + if EngineConfig.LogLang == "zh" { + log.Info("monibuca 引擎版本:", version, Green(" 启动成功")) + } else { + log.Info("monibuca", version, Green(" start success")) + } var enabledPlugins, disabledPlugins []string for _, plugin := range plugins { - if plugin.RawConfig["enable"] == false { + if plugin.RawConfig["enable"] == false || plugin.Disabled { plugin.Disabled = true disabledPlugins = append(disabledPlugins, plugin.Name) } else { enabledPlugins = append(enabledPlugins, plugin.Name) } } - fmt.Print("已运行的插件:") + if EngineConfig.LogLang == "zh" { + fmt.Print("已运行的插件:") + } else { + fmt.Print("enabled plugins:") + } for _, plugin := range enabledPlugins { fmt.Print(Colorize(" "+plugin+" ", BlackFg|GreenBg|BoldFm), " ") } fmt.Println() - fmt.Print("已禁用的插件:") + if EngineConfig.LogLang == "zh" { + fmt.Print("已禁用的插件:") + } else { + fmt.Print("disabled plugins:") + } for _, plugin := range disabledPlugins { fmt.Print(Colorize(" "+plugin+" ", BlackFg|RedBg|CrossedOutFm), " ") } diff --git a/plugin.go b/plugin.go index 72a9cc4..941c941 100644 --- a/plugin.go +++ b/plugin.go @@ -45,10 +45,8 @@ func InstallPlugin(config config.Plugin) *Plugin { case *GlobalConfig: v.InitDefaultHttp() default: - plugin.Logger = log.With(zap.String("plugin", name)) Plugins[name] = plugin plugins = append(plugins, plugin) - plugin.Info("install", zap.String("version", plugin.Version)) } return plugin } @@ -67,7 +65,7 @@ type Plugin struct { modifiedYaml string //修改过的配置的yaml文件内容 RawConfig config.Config //最终合并后的配置的map形式方便查询 Modified config.Config //修改过的配置项 - *zap.Logger `json:"-"` + *log.Logger `json:"-"` saveTimer *time.Timer //用于保存的时候的延迟,防抖 Disabled bool } @@ -75,6 +73,8 @@ type Plugin struct { func (opt *Plugin) logHandler(pattern string, handler http.Handler) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { opt.Debug("visit", zap.String("path", r.URL.String()), zap.String("remote", r.RemoteAddr)) + name := strings.ToLower(opt.Name) + r.URL.Path = strings.TrimPrefix(r.URL.Path, "/"+name) handler.ServeHTTP(rw, r) }) } @@ -87,12 +87,12 @@ func (opt *Plugin) handle(pattern string, handler http.Handler) { pattern = "/" + pattern } if ok { - opt.Debug("http handle added:" + pattern) + opt.Debug("http handle added", zap.String("pattern", pattern)) conf.Handle(pattern, opt.logHandler(pattern, handler)) } if opt != Engine { pattern = "/" + strings.ToLower(opt.Name) + pattern - opt.Debug("http handle added to engine:" + pattern) + opt.Debug("http handle added to engine", zap.String("pattern", pattern)) EngineConfig.Handle(pattern, opt.logHandler(pattern, handler)) } apiList = append(apiList, pattern) @@ -122,7 +122,7 @@ func (opt *Plugin) assign() { if opt.RawConfig == nil { opt.RawConfig = config.Config{} } else if opt.RawConfig["enable"] == false { - opt.Warn("disabled") + opt.Warn("plugin disabled") return } else if opt.RawConfig["enable"] == true { //移除这个属性防止反序列化报错 @@ -214,7 +214,6 @@ func (opt *Plugin) Save() error { } func (opt *Plugin) Publish(streamPath string, pub IPublisher) error { - opt.Info("publish", zap.String("path", streamPath)) conf, ok := opt.Config.(config.PublishConfig) if !ok { conf = EngineConfig @@ -244,12 +243,19 @@ func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error { // Subscribe 订阅一个流,如果流不存在则创建一个等待流 func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error { - opt.Info("subscribe", zap.String("path", streamPath)) + suber := sub.GetSubscriber() + if suber == nil { + if EngineConfig.LogLang == "zh" { + return errors.New("不是订阅者") + } else { + return errors.New("not subscriber") + } + } conf, ok := opt.Config.(config.SubscribeConfig) if !ok { conf = EngineConfig } - sub.GetSubscriber().Config = conf.GetSubscribeConfig() + suber.Config = conf.GetSubscribeConfig() return sub.receive(streamPath, sub) } diff --git a/publisher-rtpdump.go b/publisher-rtpdump.go index 94e88d0..dc02fbe 100644 --- a/publisher-rtpdump.go +++ b/publisher-rtpdump.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/aler9/gortsplib/pkg/mpeg4audio" + "github.com/aler9/gortsplib/v2/pkg/codecs/mpeg4audio" "github.com/pion/webrtc/v3/pkg/media/rtpdump" "go.uber.org/zap" "m7s.live/engine/v4/codec" diff --git a/publisher.go b/publisher.go index 88b3141..ac7bbe4 100644 --- a/publisher.go +++ b/publisher.go @@ -69,7 +69,7 @@ func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPo case codec.CodecID_H265: p.VideoTrack = track.NewH265(p.Stream, pool) default: - p.Stream.Error("video codecID not support: ", zap.Uint8("codeId", uint8(codecID))) + p.Stream.Error("video codecID not support", zap.Uint8("codeId", uint8(codecID))) return } p.VideoTrack.WriteAVCC(ts, frame) diff --git a/report.go b/report.go deleted file mode 100644 index abe5459..0000000 --- a/report.go +++ /dev/null @@ -1,118 +0,0 @@ -package engine - -import ( - "time" - - "gopkg.in/yaml.v3" - "m7s.live/engine/v4/common" -) - -type ReportCreateStream struct { - StreamPath string - Time int64 -} - -type ReportCloseStream struct { - StreamPath string - Time int64 -} - -type ReportAddTrack struct { - Name string - StreamPath string - Time int64 -} -type ReportTrackInfo struct { - BPS int - FPS int - Drops int - RBSize int -} -type ReportPulse struct { - StreamPath string - Tracks map[string]ReportTrackInfo - Subscribers map[string]struct { - Type string - Readers map[string]struct { - Delay uint32 - } - } - Time int64 -} - -type Reportor struct { - Subscriber - pulse ReportPulse -} - -func (r *Reportor) OnEvent(event any) { - switch v := event.(type) { - case PulseEvent: - r.pulse.Tracks = make(map[string]ReportTrackInfo) - r.pulse.Subscribers = make(map[string]struct { - Type string - Readers map[string]struct { - Delay uint32 - } - }) - r.Stream.Tracks.Range(func(k string, t common.Track) { - track := t.GetBase() - r.pulse.Tracks[k] = ReportTrackInfo{ - BPS: track.BPS, - FPS: track.FPS, - Drops: track.Drops, - RBSize: t.GetRBSize(), - } - }) - r.Stream.Subscribers.RangeAll(func(sub ISubscriber, wait *waitTracks) { - suber := sub.GetSubscriber() - r.pulse.Subscribers[suber.ID] = struct { - Type string - Readers map[string]struct { - Delay uint32 - } - }{Type: suber.Type, Readers: map[string]struct { - Delay uint32 - }{suber.Audio.Name: {Delay: suber.AudioReader.Delay}, suber.Video.Name: {Delay: suber.VideoReader.Delay}}} - }) - r.pulse.Time = time.Now().Unix() - EngineConfig.Report("pulse", r.pulse) - case common.Track: - EngineConfig.Report("addtrack", &ReportAddTrack{v.GetBase().Name, r.Stream.Path, time.Now().Unix()}) - } -} - -func (conf *GlobalConfig) OnEvent(event any) { - if !conf.GetEnableReport() { - conf.Engine.OnEvent(event) - return - } - switch v := event.(type) { - case SEcreate: - conf.Report("create", &ReportCreateStream{v.Target.Path, time.Now().Unix()}) - var reportor Reportor - reportor.IsInternal = true - reportor.pulse.StreamPath = v.Target.Path - if Engine.Subscribe(v.Target.Path, &reportor) == nil { - reportor.SubPulse() - } - case SEpublish: - case SErepublish: - case SEKick: - case SEclose: - conf.Report("close", &ReportCloseStream{v.Target.Path, time.Now().Unix()}) - case SEwaitClose: - case SEwaitPublish: - case ISubscriber: - case UnsubscribeEvent: - default: - conf.Engine.OnEvent(event) - } -} - -func (conf *GlobalConfig) Report(t string, v any) { - out, err := yaml.Marshal(v) - if err == nil { - conf.Engine.OnEvent(append([]byte("type: "+t+"\n"), out...)) - } -} diff --git a/stream.go b/stream.go index c9381bd..95d0ae7 100644 --- a/stream.go +++ b/stream.go @@ -160,7 +160,7 @@ func (tracks *Tracks) MarshalJSON() ([]byte, error) { type Stream struct { timeout *time.Timer //当前状态的超时定时器 actionChan util.SafeChan[any] - *zap.Logger + *log.Logger StartTime time.Time //创建时间 StreamTimeoutConfig Path string @@ -239,7 +239,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream timeout: time.NewTimer(waitTimeout), } s.Subscribers.Init() - s.Logger = log.With(zap.String("stream", streamPath)) + s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath)) s.Info("created") Streams.Map[streamPath] = s s.actionChan.Init(1) @@ -282,7 +282,7 @@ func (r *Stream) action(action StreamAction) (ok bool) { waitTime = time.Millisecond * 10 //没有订阅者也没有配置发布者等待重连时间,默认10ms后关闭流 } r.timeout.Reset(waitTime) - r.Debug("wait publish", zap.Duration("wait", waitTime)) + r.Debug("wait publisher", zap.Duration("wait timeout", waitTime)) case STATE_PUBLISHING: if len(r.SEHistory) > 1 { stateEvent = SErepublish{event} @@ -362,7 +362,7 @@ func (s *Stream) onSuberClose(sub ISubscriber) { // 流状态处理中枢,包括接收订阅发布指令等 func (s *Stream) run() { EventBus <- SEcreate{StreamEvent{Event[*Stream]{Target: s, Time: time.Now()}}} - pulseTicker := time.NewTicker(time.Second * 5) + pulseTicker := time.NewTicker(EngineConfig.PulseInterval) defer pulseTicker.Stop() pulseSuber := make(map[ISubscriber]struct{}) for { @@ -387,7 +387,7 @@ func (s *Stream) run() { s.Tracks.ModifyRange(func(name string, t Track) { // track 超过一定时间没有更新数据了 if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout { - s.Warn("track timeout", zap.String("name", name), zap.Time("lastWriteTime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout)) + s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout)) delete(s.Tracks.Map.Map, name) var event TrackTimeoutEvent event.Target = t diff --git a/subscriber.go b/subscriber.go index 48e714e..d6b9cf4 100644 --- a/subscriber.go +++ b/subscriber.go @@ -165,7 +165,7 @@ func (s *Subscriber) IsPlaying() bool { } func (s *Subscriber) SubPulse() { - s.Stream.Receive(SubPulse{s}) + s.Stream.Receive(SubPulse{s.Spesific.(ISubscriber)}) } func (s *Subscriber) PlayRaw() { @@ -210,10 +210,11 @@ func (s *Subscriber) PlayBlock(subType byte) { switch subType { case SUBTYPE_RAW: sendVideoFrame = func(frame *AVFrame) { - // fmt.Println("v", frame.Sequence, s.VideoReader.AbsTime, frame.IFrame) + // fmt.Println("v", s.VideoReader.Delay) spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, s.VideoReader.GetPTS32(), s.VideoReader.GetDTS32()}) } sendAudioFrame = func(frame *AVFrame) { + // fmt.Println("a", s.AudioReader.Delay) // fmt.Println("a", frame.Sequence, s.AudioReader.AbsTime) spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, s.AudioReader.GetPTS32(), s.AudioReader.GetDTS32()}) } diff --git a/subscribers.go b/subscribers.go index 4086efc..8e4d5f2 100644 --- a/subscribers.go +++ b/subscribers.go @@ -49,7 +49,13 @@ func (s *Subscribers) Len() int { return len(s.public) } -func (s *Subscribers) RangeAll(f func(sub ISubscriber, wait *waitTracks)) { +func (s *Subscribers) RangeAll(f func(sub ISubscriber)) { + s.rangeAll(func(sub ISubscriber, wait *waitTracks) { + f(sub) + }) +} + +func (s *Subscribers) rangeAll(f func(sub ISubscriber, wait *waitTracks)) { for sub, wait := range s.internal { f(sub, wait) } @@ -59,7 +65,7 @@ func (s *Subscribers) RangeAll(f func(sub ISubscriber, wait *waitTracks)) { } func (s *Subscribers) OnTrack(track common.Track) { - s.RangeAll(func(sub ISubscriber, wait *waitTracks) { + s.rangeAll(func(sub ISubscriber, wait *waitTracks) { if _, ok := s.waits[wait]; ok { if wait.Accept(track) { delete(s.waits, wait) @@ -71,7 +77,7 @@ func (s *Subscribers) OnTrack(track common.Track) { } func (s *Subscribers) OnPublisherLost(event StateEvent) { - s.RangeAll(func(sub ISubscriber, wait *waitTracks) { + s.rangeAll(func(sub ISubscriber, wait *waitTracks) { if _, ok := s.waits[wait]; ok { wait.Reject(ErrPublisherLost) delete(s.waits, wait) diff --git a/track/base.go b/track/base.go index 3210c30..b31e2de 100644 --- a/track/base.go +++ b/track/base.go @@ -267,7 +267,7 @@ func (av *Media) Flush() { } curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond) } - // fmt.Println(av.Name, curValue.Timestamp, curValue.DeltaTime) + // fmt.Println(av.Name, curValue.DTS, curValue.Timestamp, curValue.DeltaTime) if curValue.AUList.Length > 0 { // 补完RTP if config.Global.EnableRTP && curValue.RTP.Length == 0 { diff --git a/track/h265.go b/track/h265.go index fa6c4e2..ded05f2 100644 --- a/track/h265.go +++ b/track/h265.go @@ -46,12 +46,14 @@ func (vt *H265) WriteSliceBytes(slice []byte) { case codec.NAL_UNIT_PPS: vt.PPS = slice vt.ParamaterSets[2] = slice - extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.VPS, vt.SPS, vt.PPS) - if err == nil { - vt.WriteSequenceHead(extraData) - } else { - vt.Error("H265 BuildH265SeqHeaderFromVpsSpsPps", zap.Error(err)) - vt.Stream.Close() + if vt.VPS != nil && vt.SPS != nil && vt.PPS != nil { + extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.VPS, vt.SPS, vt.PPS) + if err == nil { + vt.WriteSequenceHead(extraData) + } else { + vt.Error("H265 BuildH265SeqHeaderFromVpsSpsPps", zap.Error(err)) + vt.Stream.Close() + } } case codec.NAL_UNIT_CODED_SLICE_BLA, diff --git a/track/reader-av.go b/track/reader-av.go index 29a6440..2208383 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" "m7s.live/engine/v4/common" + "m7s.live/engine/v4/log" "m7s.live/engine/v4/util" ) @@ -31,7 +32,7 @@ type AVRingReader struct { Frame *common.AVFrame AbsTime uint32 Delay uint32 - *zap.Logger + *log.Logger } func (r *AVRingReader) DecConfChanged() bool { diff --git a/util/socket.go b/util/socket.go index 3b57fc7..fd6512e 100644 --- a/util/socket.go +++ b/util/socket.go @@ -7,6 +7,8 @@ import ( "net" "net/http" "time" + + "gopkg.in/yaml.v3" ) func ReturnJson[T any](fetch func() T, tickDur time.Duration, rw http.ResponseWriter, r *http.Request) { @@ -24,24 +26,21 @@ func ReturnJson[T any](fetch func() T, tickDur time.Duration, rw http.ResponseWr } } -// func GetJsonHandler[T any](fetch func() T, tickDur time.Duration) http.HandlerFunc { -// return func(rw http.ResponseWriter, r *http.Request) { -// if r.URL.Query().Get("json") != "" { -// if err := json.NewEncoder(rw).Encode(fetch()); err != nil { -// rw.WriteHeader(500) -// } -// return -// } -// sse := NewSSE(rw, r.Context()) -// tick := time.NewTicker(tickDur) -// for range tick.C { -// if sse.WriteJSON(fetch()) != nil { -// tick.Stop() -// break -// } -// } -// } -// } +func ReturnYaml[T any](fetch func() T, tickDur time.Duration, rw http.ResponseWriter, r *http.Request) { + if r.Header.Get("Accept") == "text/event-stream" { + sse := NewSSE(rw, r.Context()) + tick := time.NewTicker(tickDur) + defer tick.Stop() + for range tick.C { + if sse.WriteYAML(fetch()) != nil { + return + } + } + } else if err := yaml.NewEncoder(rw).Encode(fetch()); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + } +} + func ListenUDP(address string, networkBuffer int) (*net.UDPConn, error) { addr, err := net.ResolveUDPAddr("udp", address) diff --git a/util/sse.go b/util/sse.go index 5a29fd7..fb97be7 100644 --- a/util/sse.go +++ b/util/sse.go @@ -6,6 +6,8 @@ import ( "net" "net/http" "os/exec" + + "gopkg.in/yaml.v3" ) var ( @@ -59,6 +61,9 @@ func NewSSE(w http.ResponseWriter, ctx context.Context) *SSE { func (sse *SSE) WriteJSON(data any) error { return json.NewEncoder(sse).Encode(data) } +func (sse *SSE) WriteYAML(data any) error { + return yaml.NewEncoder(sse).Encode(data) +} func (sse *SSE) WriteExec(cmd *exec.Cmd) error { cmd.Stderr = sse cmd.Stdout = sse