mirror of
https://github.com/Monibuca/engine.git
synced 2026-04-22 15:57:03 +08:00
支持多语言日志,增加心跳事件广播,兼容一开始缺少 vps 的流
This commit is contained in:
@@ -38,6 +38,7 @@
|
||||
# 引擎默认配置
|
||||
```yaml
|
||||
global:
|
||||
loglang: zh # 日志语言,可选值:zh,en
|
||||
loglevel: info # 日志级别,可选值:debug,info,warn,error,panic,fatal
|
||||
http:
|
||||
listenaddr: :8080 # 网关地址,用于访问API
|
||||
@@ -47,28 +48,29 @@ global:
|
||||
cors: true # 是否自动添加cors头
|
||||
username: "" # 用户名和密码,用于API访问时的基本身份认证
|
||||
password: ""
|
||||
readtimeout: 0 # 读取超时时间,单位秒,0为不限制
|
||||
writetimeout: 0 # 写入超时时间,单位秒,0为不限制
|
||||
idletimeout: 0 # 空闲超时时间,单位秒,0为不限制
|
||||
readtimeout: 0 # 读取超时时间,0为不限制
|
||||
writetimeout: 0 # 写入超时时间,0为不限制
|
||||
idletimeout: 0 # 空闲超时时间,0为不限制
|
||||
publish:
|
||||
pubaudio: true # 是否发布音频流
|
||||
pubvideo: true # 是否发布视频流
|
||||
kickexist: false # 剔出已经存在的发布者,用于顶替原有发布者
|
||||
publishtimeout: 10 # 发布流默认过期时间单位秒,超过该时间发布者没有恢复流将被删除
|
||||
delayclosetimeout: 0 # 自动关闭触发后延迟的秒数(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。
|
||||
publishtimeout: 10s # 发布流默认过期时间,超过该时间发布者没有恢复流将被删除
|
||||
delayclosetimeout: 0 # 自动关闭触发后延迟的时间(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。
|
||||
waitclosetimeout: 0 # 发布者断开后等待时间,超过该时间发布者没有恢复流将被删除,0为关闭该功能,由订阅者决定是否删除
|
||||
subscribe:
|
||||
subaudio: true # 是否订阅音频流
|
||||
subvideo: true # 是否订阅视频流
|
||||
iframeonly: false # 只订阅关键帧
|
||||
waittimeout: 10 # 等待发布者的秒数,用于订阅尚未发布的流
|
||||
waittimeout: 10s # 等待发布者超时时间,用于订阅尚未发布的流
|
||||
enableavcc : true # 启用AVCC格式缓存,用于rtmp协议
|
||||
enablertp : true # 启用rtp格式缓存,用于rtsp、websocket、gb28181协议
|
||||
enableauth: true # 启用鉴权,详细查看鉴权机制
|
||||
enablesubevent: true # 启用订阅事件,用于订阅者上下线事件,关闭可以提高性能
|
||||
rtpreoderbufferlen: 50 # rtp乱序重排缓存长度
|
||||
speedlimit: 0 # 限速超时时间(毫秒)0为不限速,对于读取文件这类流需要限速,否则读取过快
|
||||
rtpreroderbufferlen: 50 # rtp乱序重排缓存长度
|
||||
speedlimit: 500ms # 限速超时时间 0为不限速,对于读取文件这类流需要限速,否则读取过快
|
||||
eventbussize: 10 # 事件总线缓存大小,事件较多时容易堵阻塞线程,需要增大缓存
|
||||
pulseinterval: 5s # 心跳事件间隔时间
|
||||
console:
|
||||
server : console.monibuca.com:4242 # 连接远程控制台的地址
|
||||
secret: "" # 远程控制台的秘钥
|
||||
|
||||
+18
-6
@@ -3,6 +3,7 @@ package config
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -43,11 +44,11 @@ func (config Config) CreateElem(eleType reflect.Type) reflect.Value {
|
||||
}
|
||||
|
||||
func (config Config) Unmarshal(s any) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
log.Error("Unmarshal error:", err)
|
||||
}
|
||||
}()
|
||||
// defer func() {
|
||||
// if err := recover(); err != nil {
|
||||
// log.Error("Unmarshal error:", err)
|
||||
// }
|
||||
// }()
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
@@ -106,6 +107,13 @@ func (config Config) Unmarshal(s any) {
|
||||
fv.SetInt(0)
|
||||
} else if d, err := time.ParseDuration(value.String()); err == nil {
|
||||
fv.SetInt(int64(d))
|
||||
} else {
|
||||
if Global.LogLang == "zh" {
|
||||
log.Errorf("%s 无效的时间值: %v 请添加单位(s,m,h,d),例如:100ms, 10s, 4m, 1h", k, value)
|
||||
} else {
|
||||
log.Errorf("%s invalid duration value: %v please add unit (s,m,h,d),eg: 100ms, 10s, 4m, 1h", k, value)
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -176,7 +184,11 @@ func (config Config) Merge(source Config) {
|
||||
case Config:
|
||||
m.Merge(v.(Config))
|
||||
default:
|
||||
log.Debug("merge", k, v)
|
||||
if Global.LogLang == "zh" {
|
||||
log.Debug("合并配置", k, ":", v)
|
||||
} else {
|
||||
log.Debug("merge", k, ":", v)
|
||||
}
|
||||
config[k] = v
|
||||
}
|
||||
} else {
|
||||
|
||||
+10
-2
@@ -67,7 +67,11 @@ func (config *HTTP) Listen(ctx context.Context) error {
|
||||
var g errgroup.Group
|
||||
if config.ListenAddrTLS != "" && (config == &Global.HTTP || config.ListenAddrTLS != Global.ListenAddrTLS) {
|
||||
g.Go(func() error {
|
||||
log.Info("🌐 https listen at ", Blink(config.ListenAddrTLS))
|
||||
if Global.LogLang == "zh" {
|
||||
log.Info("🌐 https 监听在 ", Blink(config.ListenAddrTLS))
|
||||
} else {
|
||||
log.Info("🌐 https listen at ", Blink(config.ListenAddrTLS))
|
||||
}
|
||||
var server = http.Server{
|
||||
Addr: config.ListenAddrTLS,
|
||||
ReadTimeout: config.ReadTimeout,
|
||||
@@ -80,7 +84,11 @@ func (config *HTTP) Listen(ctx context.Context) error {
|
||||
}
|
||||
if config.ListenAddr != "" && (config == &Global.HTTP || config.ListenAddr != Global.ListenAddr) {
|
||||
g.Go(func() error {
|
||||
log.Info("🌐 http listen at ", Blink(config.ListenAddr))
|
||||
if Global.LogLang == "zh" {
|
||||
log.Info("🌐 http 监听在 ", Blink(config.ListenAddr))
|
||||
} else {
|
||||
log.Info("🌐 http listen at ", Blink(config.ListenAddr))
|
||||
}
|
||||
var server = http.Server{
|
||||
Addr: config.ListenAddr,
|
||||
ReadTimeout: config.ReadTimeout,
|
||||
|
||||
+15
-3
@@ -58,11 +58,19 @@ func (cfg *Engine) Remote(ctx context.Context) error {
|
||||
var rMessage map[string]any
|
||||
if err = json.Unmarshal(msg[:len(msg)-1], &rMessage); err == nil {
|
||||
if rMessage["code"].(float64) != 0 {
|
||||
log.Error("response from console server ", cfg.Server, " ", rMessage["msg"])
|
||||
if Global.LogLang == "zh" {
|
||||
log.Error("控制台服务器", cfg.Server, "返回错误", rMessage["msg"])
|
||||
} else {
|
||||
log.Error("response from console server ", cfg.Server, " ", rMessage["msg"])
|
||||
}
|
||||
return nil
|
||||
} else {
|
||||
cfg.reportStream = stream
|
||||
log.Info("response from console server ", cfg.Server, " success ", rMessage)
|
||||
if Global.LogLang == "zh" {
|
||||
log.Info("连接到控制台服务器", cfg.Server, "成功", rMessage)
|
||||
} else {
|
||||
log.Info("response from console server ", cfg.Server, " success ", rMessage)
|
||||
}
|
||||
if v, ok := rMessage["enableReport"]; ok {
|
||||
cfg.enableReport = v.(bool)
|
||||
}
|
||||
@@ -85,7 +93,11 @@ func (cfg *Engine) Remote(ctx context.Context) error {
|
||||
|
||||
if err != nil {
|
||||
if wasConnected {
|
||||
log.Error("connect to console server ", cfg.Server, " ", err)
|
||||
if Global.LogLang == "zh" {
|
||||
log.Error("连接到控制台服务器", cfg.Server, "失败", err)
|
||||
} else {
|
||||
log.Error("connect to console server ", cfg.Server, " ", err)
|
||||
}
|
||||
}
|
||||
if ctx.Err() == nil {
|
||||
go cfg.Remote(ctx)
|
||||
|
||||
+5
-1
@@ -42,7 +42,11 @@ func (tcp *TCP) listen(l net.Listener, handler func(*net.TCPConn)) {
|
||||
func (tcp *TCP) Listen(ctx context.Context, plugin TCPPlugin) error {
|
||||
l, err := net.Listen("tcp", tcp.ListenAddr)
|
||||
if err != nil {
|
||||
log.Fatalf("%s: Listen error: %v", tcp.ListenAddr, err)
|
||||
if Global.LogLang == "zh" {
|
||||
log.Fatalf("%s: 监听失败: %v", tcp.ListenAddr, err)
|
||||
} else {
|
||||
log.Fatalf("%s: Listen error: %v", tcp.ListenAddr, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
count := tcp.ListenNum
|
||||
|
||||
+3
-1
@@ -117,10 +117,12 @@ type Engine struct {
|
||||
EnableSubEvent bool `default:"true"` //启用订阅事件,禁用可以提高性能
|
||||
EnableAuth bool `default:"true"` //启用鉴权
|
||||
Console
|
||||
LogLevel string `default:"info"`
|
||||
LogLang string `default:"zh"` //日志语言
|
||||
LogLevel string `default:"info"` //日志级别
|
||||
RTPReorderBufferLen int `default:"50"` //RTP重排序缓冲长度
|
||||
SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间
|
||||
EventBusSize int `default:"10"` //事件总线大小
|
||||
PulseInterval time.Duration `default:"5s"` //心跳事件间隔
|
||||
enableReport bool `default:"false"` //启用报告,用于统计和监控
|
||||
reportStream quic.Stream // console server connection
|
||||
instanceId string // instance id 来自console
|
||||
|
||||
@@ -3,6 +3,7 @@ module m7s.live/engine/v4
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/aler9/gortsplib/v2 v2.2.2
|
||||
github.com/cnotch/ipchub v1.1.0
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/logrusorgru/aurora v2.0.3+incompatible
|
||||
@@ -13,7 +14,7 @@ require (
|
||||
github.com/quic-go/quic-go v0.32.0
|
||||
github.com/shirou/gopsutil/v3 v3.22.10
|
||||
go.uber.org/zap v1.23.0
|
||||
golang.org/x/net v0.4.0
|
||||
golang.org/x/net v0.8.0
|
||||
golang.org/x/sync v0.1.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
@@ -45,6 +46,6 @@ require (
|
||||
golang.org/x/crypto v0.4.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
|
||||
golang.org/x/mod v0.7.0 // indirect
|
||||
golang.org/x/sys v0.3.0 // indirect
|
||||
golang.org/x/sys v0.6.0 // indirect
|
||||
golang.org/x/tools v0.3.0 // indirect
|
||||
)
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/aler9/gortsplib/v2 v2.2.2 h1:tTw8pdKSOEjlZjjE1S4ftXPHJkYOqjNNv3hjQ0Nto9M=
|
||||
github.com/aler9/gortsplib/v2 v2.2.2/go.mod h1:k6uBVHGwsIc/0L5SLLqWwi6bSJUb4VR0HfvncyHlKQI=
|
||||
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||
@@ -138,8 +140,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
|
||||
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
|
||||
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
|
||||
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
|
||||
@@ -193,8 +195,8 @@ golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su
|
||||
golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
|
||||
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
|
||||
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
|
||||
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
|
||||
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
@@ -228,8 +230,8 @@ golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
|
||||
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
@@ -239,7 +241,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
||||
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
|
||||
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
|
||||
@@ -29,18 +29,29 @@ func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func fetchSummary() *Summary {
|
||||
return &summary
|
||||
}
|
||||
|
||||
func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) {
|
||||
format := r.URL.Query().Get("format")
|
||||
if r.Header.Get("Accept") == "text/event-stream" {
|
||||
summary.Add()
|
||||
defer summary.Done()
|
||||
util.ReturnJson(func() *Summary {
|
||||
return &summary
|
||||
}, time.Second, rw, r)
|
||||
if format == "yaml" {
|
||||
util.ReturnYaml(fetchSummary, time.Second, rw, r)
|
||||
} else {
|
||||
util.ReturnJson(fetchSummary, time.Second, rw, r)
|
||||
}
|
||||
} else {
|
||||
if !summary.Running() {
|
||||
summary.collect()
|
||||
}
|
||||
if err := json.NewEncoder(rw).Encode(&summary); err != nil {
|
||||
if format == "yaml" {
|
||||
if err := yaml.NewEncoder(rw).Encode(&summary); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
} else if err := json.NewEncoder(rw).Encode(&summary); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4/config"
|
||||
"m7s.live/engine/v4/log"
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
@@ -35,7 +36,7 @@ type IO struct {
|
||||
Type string
|
||||
context.Context `json:"-"` //不要直接设置,应当通过OnEvent传入父级Context
|
||||
context.CancelFunc `json:"-"` //流关闭是关闭发布者或者订阅者
|
||||
*zap.Logger `json:"-"`
|
||||
*log.Logger `json:"-"`
|
||||
StartTime time.Time //创建时间
|
||||
Stream *Stream `json:"-"`
|
||||
io.Reader `json:"-"`
|
||||
@@ -67,7 +68,7 @@ func (i *IO) SetParentCtx(parent context.Context) {
|
||||
i.Context, i.CancelFunc = context.WithCancel(parent)
|
||||
}
|
||||
|
||||
func (i *IO) SetLogger(logger *zap.Logger) {
|
||||
func (i *IO) SetLogger(logger *log.Logger) {
|
||||
i.Logger = logger
|
||||
}
|
||||
|
||||
@@ -97,7 +98,7 @@ type IIO interface {
|
||||
Stop()
|
||||
SetIO(any)
|
||||
SetParentCtx(context.Context)
|
||||
SetLogger(*zap.Logger)
|
||||
SetLogger(*log.Logger)
|
||||
IsShutdown() bool
|
||||
}
|
||||
|
||||
@@ -126,7 +127,11 @@ func (io *IO) receive(streamPath string, specific IIO) error {
|
||||
streamPath = strings.Trim(streamPath, "/")
|
||||
u, err := url.Parse(streamPath)
|
||||
if err != nil {
|
||||
io.Error("receive streamPath wrong format", zap.String("streamPath", streamPath), zap.Error(err))
|
||||
if EngineConfig.LogLang == "zh" {
|
||||
io.Error("接收流路径(流唯一标识)格式错误,必须形如 live/test ", zap.String("流路径", streamPath), zap.Error(err))
|
||||
} else {
|
||||
io.Error("receive streamPath wrong format", zap.String("streamPath", streamPath), zap.Error(err))
|
||||
}
|
||||
return err
|
||||
}
|
||||
io.Args = u.Query()
|
||||
@@ -156,6 +161,7 @@ func (io *IO) receive(streamPath string, specific IIO) error {
|
||||
if v, ok := specific.(IPublisher); ok {
|
||||
conf := v.GetPublisher().Config
|
||||
io.Type = strings.TrimSuffix(io.Type, "Publisher")
|
||||
io.Info("publish")
|
||||
oldPublisher := s.Publisher
|
||||
if oldPublisher != nil && !oldPublisher.IsClosed() {
|
||||
// 根据配置是否剔出原来的发布者
|
||||
@@ -200,6 +206,7 @@ func (io *IO) receive(streamPath string, specific IIO) error {
|
||||
}
|
||||
} else {
|
||||
io.Type = strings.TrimSuffix(io.Type, "Subscriber")
|
||||
io.Info("subscribe")
|
||||
if create {
|
||||
EventBus <- s // 通知发布者按需拉流
|
||||
}
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package lang
|
||||
|
||||
import (
|
||||
_ "embed"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
//go:embed zh.yaml
|
||||
var zhYaml []byte
|
||||
var zh map[string]string
|
||||
|
||||
func init() {
|
||||
yaml.Unmarshal(zhYaml, &zh)
|
||||
}
|
||||
|
||||
func Get(lang string) map[string]string {
|
||||
if lang == "zh" {
|
||||
return zh
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Update(lang string, key string, value string) {
|
||||
if lang == "zh" {
|
||||
zh[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
func Merge(lang string, data map[string]string) {
|
||||
if lang == "zh" {
|
||||
for k, v := range data {
|
||||
zh[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
engine: 引擎
|
||||
config: 配置
|
||||
stream: 流
|
||||
publish: 发布
|
||||
subscribe: 订阅
|
||||
pull: 拉取
|
||||
push: 推送
|
||||
action: 动作
|
||||
type: 类型
|
||||
enable: 启用
|
||||
listenaddr: 监听地址
|
||||
kick: 踢出
|
||||
stop: 停止
|
||||
start: 开始
|
||||
install: 安装
|
||||
version: 版本
|
||||
name: 名称
|
||||
state: 状态
|
||||
initialize: 初始化
|
||||
"start read": 开始读取
|
||||
"wait publisher": 等待发布者发布
|
||||
"wait timeout": 等待超时
|
||||
created: 已创建
|
||||
create: 创建
|
||||
timeout: 超时
|
||||
track: 轨道
|
||||
"track timeout": 轨道超时
|
||||
"last writetime": 最后写入时间
|
||||
track+1: 轨道+1
|
||||
playblock: 阻塞式播放
|
||||
"play neither video nor audio": 播放既没有视频也没有音频
|
||||
"play before subscribe": 播放之前需要先订阅
|
||||
"suber -1": 订阅者-1
|
||||
"suber +1": 订阅者+1
|
||||
"innersuber +1": 内部订阅者+1
|
||||
reamins: 剩余
|
||||
"need sequence frame": 需要序列帧
|
||||
"video codecID not support": 视频编码不支持
|
||||
"http handle added": http处理器已添加
|
||||
"http handle added to engine": http处理器已添加到引擎
|
||||
"plugin disabled": 插件已禁用
|
||||
"pull failed": 拉取失败
|
||||
"audio codec not support yet": 音频编码暂不支持
|
||||
"video track attached": 视频轨道已附加
|
||||
"audio track attached": 音频轨道已附加
|
||||
"first frame read": 第一帧已读取
|
||||
firstTs: 第一帧时间戳
|
||||
firstSeq: 第一帧序列号
|
||||
skipSeq: 跳过序列号
|
||||
skipTs: 跳过时间戳
|
||||
+78
-365
@@ -3,408 +3,121 @@ package log
|
||||
import (
|
||||
// . "github.com/logrusorgru/aurora"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
// "github.com/mattn/go-colorable"
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
// log "github.com/sirupsen/logrus"
|
||||
. "github.com/logrusorgru/aurora"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
var sugaredLogger *zap.SugaredLogger
|
||||
var logger *zap.Logger
|
||||
|
||||
// var levelColors = []func(any) Value{Red, Red, Red, Yellow, Blue, Green, White}
|
||||
|
||||
// type LogWriter func(*log.Entry) string
|
||||
|
||||
// var colorableStdout = colorable.NewColorableStdout()
|
||||
type MultipleWriter []io.Writer
|
||||
|
||||
func (m *MultipleWriter) Write(p []byte) (n int, err error) {
|
||||
for _, w := range *m {
|
||||
n, err = w.Write(p)
|
||||
if err != nil {
|
||||
m.Delete(w)
|
||||
}
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
func (m *MultipleWriter) Delete(writer io.Writer) {
|
||||
for i, w := range *m {
|
||||
if w == writer {
|
||||
*m = append((*m)[:i], (*m)[i+1:]...)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
func (m *MultipleWriter) Add(writer io.Writer) {
|
||||
*m = append(*m, writer)
|
||||
}
|
||||
|
||||
var multipleWriter = &MultipleWriter{os.Stdout}
|
||||
var Config = zap.NewDevelopmentConfig()
|
||||
func AddWriter(writer io.Writer) {
|
||||
multipleWriter.Add(writer)
|
||||
}
|
||||
func DeleteWriter(writer io.Writer) {
|
||||
multipleWriter.Delete(writer)
|
||||
}
|
||||
func init() {
|
||||
// std.SetOutput(colorableStdout)
|
||||
// std.SetFormatter(LogWriter(defaultFormatter))
|
||||
Config.EncoderConfig.NewReflectedEncoder = func(w io.Writer) zapcore.ReflectedEncoder {
|
||||
var engineConfig = zapcore.EncoderConfig{
|
||||
// Keys can be anything except the empty string.
|
||||
TimeKey: "T",
|
||||
LevelKey: "L",
|
||||
NameKey: "N",
|
||||
CallerKey: "C",
|
||||
FunctionKey: zapcore.OmitKey,
|
||||
MessageKey: "M",
|
||||
StacktraceKey: "S",
|
||||
LineEnding: zapcore.DefaultLineEnding,
|
||||
EncodeLevel: zapcore.CapitalColorLevelEncoder,
|
||||
EncodeTime: zapcore.TimeEncoderOfLayout("15:04:05"),
|
||||
EncodeDuration: zapcore.StringDurationEncoder,
|
||||
EncodeCaller: zapcore.ShortCallerEncoder,
|
||||
EncodeName: NameEncoder,
|
||||
NewReflectedEncoder: func(w io.Writer) zapcore.ReflectedEncoder {
|
||||
return yaml.NewEncoder(w)
|
||||
}
|
||||
Config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
|
||||
Config.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("15:04:05")
|
||||
logger = zap.New(
|
||||
zapcore.NewCore(zapcore.NewConsoleEncoder(Config.EncoderConfig), zapcore.AddSync(multipleWriter), Config.Level),
|
||||
)
|
||||
sugaredLogger = logger.Sugar()
|
||||
},
|
||||
}
|
||||
var LogLevel = zap.NewAtomicLevelAt(zap.DebugLevel)
|
||||
var logger *zap.Logger = zap.New(
|
||||
zapcore.NewCore(zapcore.NewConsoleEncoder(engineConfig), zapcore.AddSync(multipleWriter), LogLevel),
|
||||
)
|
||||
var sugaredLogger *zap.SugaredLogger = logger.Sugar()
|
||||
var LocaleLogger *Logger
|
||||
|
||||
func NameEncoder(loggerName string, enc zapcore.PrimitiveArrayEncoder) {
|
||||
enc.AppendString(Colorize(loggerName, WhiteFg|BlackBg).String())
|
||||
}
|
||||
|
||||
type Zap interface {
|
||||
With(fields ...zap.Field) *zap.Logger
|
||||
Lang(lang map[string]string) *Logger
|
||||
Named(name string) *Logger
|
||||
With(fields ...zap.Field) *Logger
|
||||
Debug(msg string, fields ...zap.Field)
|
||||
Info(msg string, fields ...zap.Field)
|
||||
Warn(msg string, fields ...zap.Field)
|
||||
Error(msg string, fields ...zap.Field)
|
||||
}
|
||||
|
||||
func With(fields ...zap.Field) *zap.Logger {
|
||||
return logger.With(fields...)
|
||||
type Logger struct {
|
||||
*zap.Logger
|
||||
lang map[string]string
|
||||
}
|
||||
|
||||
func Debug(args ...any) {
|
||||
sugaredLogger.Debug(args...)
|
||||
func (l Logger) Lang(lang map[string]string) *Logger {
|
||||
l.Logger = logger
|
||||
l.lang = lang
|
||||
return &l
|
||||
}
|
||||
|
||||
func Info(args ...any) {
|
||||
sugaredLogger.Info(args...)
|
||||
func (l Logger) Named(name string) *Logger {
|
||||
l.Logger = l.Logger.Named(name)
|
||||
return &l
|
||||
}
|
||||
|
||||
func Warn(args ...any) {
|
||||
sugaredLogger.Warn(args...)
|
||||
func (l Logger) With(fields ...zap.Field) *Logger {
|
||||
for i, field := range fields {
|
||||
if v, ok := l.lang[field.Key]; ok {
|
||||
fields[i].Key = v
|
||||
}
|
||||
}
|
||||
l.Logger = l.Logger.With(fields...)
|
||||
return &l
|
||||
}
|
||||
|
||||
func Error(args ...any) {
|
||||
sugaredLogger.Error(args...)
|
||||
func (l *Logger) formatLang(msg *string, fields []zapcore.Field) {
|
||||
if l.lang != nil {
|
||||
if v, ok := l.lang[*msg]; ok {
|
||||
*msg = v
|
||||
}
|
||||
for i, field := range fields {
|
||||
if v, ok := l.lang[field.Key]; ok {
|
||||
fields[i].Key = v
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Debugf(format string, args ...interface{}) {
|
||||
sugaredLogger.Debugf(format, args...)
|
||||
func (l *Logger) Debug(msg string, fields ...zap.Field) {
|
||||
l.formatLang(&msg, fields)
|
||||
l.Logger.Debug(msg, fields...)
|
||||
}
|
||||
|
||||
// Infof logs a message at level Info on the standard logger.
|
||||
func Infof(format string, args ...interface{}) {
|
||||
sugaredLogger.Infof(format, args...)
|
||||
func (l *Logger) Info(msg string, fields ...zap.Field) {
|
||||
l.formatLang(&msg, fields)
|
||||
l.Logger.Info(msg, fields...)
|
||||
}
|
||||
|
||||
// Warnf logs a message at level Warn on the standard logger.
|
||||
func Warnf(format string, args ...interface{}) {
|
||||
sugaredLogger.Warnf(format, args...)
|
||||
func (l *Logger) Warn(msg string, fields ...zap.Field) {
|
||||
l.formatLang(&msg, fields)
|
||||
l.Logger.Warn(msg, fields...)
|
||||
}
|
||||
|
||||
// Errorf logs a message at level Error on the standard logger.
|
||||
func Errorf(format string, args ...interface{}) {
|
||||
sugaredLogger.Errorf(format, args...)
|
||||
func (l *Logger) Error(msg string, fields ...zap.Field) {
|
||||
l.formatLang(&msg, fields)
|
||||
l.Logger.Error(msg, fields...)
|
||||
}
|
||||
|
||||
// Panicf logs a message at level Panic on the standard logger.
|
||||
func Panicf(format string, args ...interface{}) {
|
||||
sugaredLogger.Panicf(format, args...)
|
||||
func (l *Logger) Fatal(msg string, fields ...zap.Field) {
|
||||
l.formatLang(&msg, fields)
|
||||
l.Logger.Fatal(msg, fields...)
|
||||
}
|
||||
|
||||
// Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
|
||||
func Fatalf(format string, args ...interface{}) {
|
||||
sugaredLogger.Fatalf(format, args...)
|
||||
func (l *Logger) Panic(msg string, fields ...zap.Field) {
|
||||
l.formatLang(&msg, fields)
|
||||
l.Logger.Panic(msg, fields...)
|
||||
}
|
||||
|
||||
// func defaultFormatter(entry *log.Entry) string {
|
||||
// pl := entry.Data["plugin"]
|
||||
// if pl == nil {
|
||||
// pl = "Engine"
|
||||
// }
|
||||
// l := strings.ToUpper(entry.Level.String())[:1]
|
||||
// var props string
|
||||
// if stream := entry.Data["stream"]; stream != nil {
|
||||
// props = Sprintf("[s:%s] ", stream)
|
||||
// }
|
||||
// if puber := entry.Data["puber"]; puber != nil {
|
||||
// props += Sprintf("[pub:%s] ", puber)
|
||||
// }
|
||||
// if suber := entry.Data["suber"]; suber != nil {
|
||||
// props += Sprintf("[sub:%s] ", suber)
|
||||
// }
|
||||
// return Sprintf(levelColors[entry.Level]("%s [%s] [%s]\t %s%s\n"), l, entry.Time.Format("15:04:05"), pl, props, entry.Message)
|
||||
// }
|
||||
|
||||
// func (f LogWriter) Format(entry *log.Entry) (b []byte, err error) {
|
||||
// return []byte(f(entry)), nil
|
||||
// }
|
||||
|
||||
// var (
|
||||
// // std is the name of the standard logger in stdlib `log`
|
||||
// std = log.New()
|
||||
// )
|
||||
|
||||
// func StandardLogger() *log.Logger {
|
||||
// return std
|
||||
// }
|
||||
|
||||
// // SetOutput sets the standard logger output.
|
||||
// func SetOutput(out io.Writer) {
|
||||
// std.SetOutput(out)
|
||||
// }
|
||||
|
||||
// // SetFormatter sets the standard logger formatter.
|
||||
// func SetFormatter(formatter log.Formatter) {
|
||||
// std.SetFormatter(formatter)
|
||||
// }
|
||||
|
||||
// // SetReportCaller sets whether the standard logger will include the calling
|
||||
// // method as a field.
|
||||
// func SetReportCaller(include bool) {
|
||||
// std.SetReportCaller(include)
|
||||
// }
|
||||
|
||||
// // SetLevel sets the standard logger level.
|
||||
// func SetLevel(level log.Level) {
|
||||
// std.SetLevel(level)
|
||||
// }
|
||||
|
||||
// // GetLevel returns the standard logger level.
|
||||
// func GetLevel() log.Level {
|
||||
// return std.GetLevel()
|
||||
// }
|
||||
|
||||
// // IsLevelEnabled checks if the log level of the standard logger is greater than the level param
|
||||
// func IsLevelEnabled(level log.Level) bool {
|
||||
// return std.IsLevelEnabled(level)
|
||||
// }
|
||||
|
||||
// // AddHook adds a hook to the standard logger hooks.
|
||||
// func AddHook(hook log.Hook) {
|
||||
// std.AddHook(hook)
|
||||
// }
|
||||
|
||||
// // WithError creates an entry from the standard logger and adds an error to it, using the value defined in ErrorKey as key.
|
||||
// func WithError(err error) *log.Entry {
|
||||
// return std.WithField(log.ErrorKey, err)
|
||||
// }
|
||||
|
||||
// // WithContext creates an entry from the standard logger and adds a context to it.
|
||||
// func WithContext(ctx context.Context) *log.Entry {
|
||||
// return std.WithContext(ctx)
|
||||
// }
|
||||
|
||||
// // WithField creates an entry from the standard logger and adds a field to
|
||||
// // it. If you want multiple fields, use `WithFields`.
|
||||
// //
|
||||
// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
|
||||
// // or Panic on the Entry it returns.
|
||||
// func WithField(key string, value interface{}) *log.Entry {
|
||||
// return std.WithField(key, value)
|
||||
// }
|
||||
|
||||
// // WithFields creates an entry from the standard logger and adds multiple
|
||||
// // fields to it. This is simply a helper for `WithField`, invoking it
|
||||
// // once for each field.
|
||||
// //
|
||||
// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
|
||||
// // or Panic on the Entry it returns.
|
||||
// func WithFields(fields log.Fields) *log.Entry {
|
||||
// return std.WithFields(fields)
|
||||
// }
|
||||
|
||||
// // WithTime creates an entry from the standard logger and overrides the time of
|
||||
// // logs generated with it.
|
||||
// //
|
||||
// // Note that it doesn't log until you call Debug, Print, Info, Warn, Fatal
|
||||
// // or Panic on the Entry it returns.
|
||||
// func WithTime(t time.Time) *log.Entry {
|
||||
// return std.WithTime(t)
|
||||
// }
|
||||
|
||||
// // Trace logs a message at level Trace on the standard logger.
|
||||
// func Trace(args ...interface{}) {
|
||||
// std.Trace(args...)
|
||||
// }
|
||||
|
||||
// // Debug logs a message at level Debug on the standard logger.
|
||||
// func Debug(args ...interface{}) {
|
||||
// std.Debug(args...)
|
||||
// }
|
||||
|
||||
// // Print logs a message at level Info on the standard logger.
|
||||
// func Print(args ...interface{}) {
|
||||
// std.Print(args...)
|
||||
// }
|
||||
|
||||
// // Info logs a message at level Info on the standard logger.
|
||||
// func Info(args ...interface{}) {
|
||||
// std.Info(args...)
|
||||
// }
|
||||
|
||||
// // Warn logs a message at level Warn on the standard logger.
|
||||
// func Warn(args ...interface{}) {
|
||||
// std.Warn(args...)
|
||||
// }
|
||||
|
||||
// // Warning logs a message at level Warn on the standard logger.
|
||||
// func Warning(args ...interface{}) {
|
||||
// std.Warning(args...)
|
||||
// }
|
||||
|
||||
// // Error logs a message at level Error on the standard logger.
|
||||
// func Error(args ...interface{}) {
|
||||
// std.Error(args...)
|
||||
// }
|
||||
|
||||
// // Panic logs a message at level Panic on the standard logger.
|
||||
// func Panic(args ...interface{}) {
|
||||
// std.Panic(args...)
|
||||
// }
|
||||
|
||||
// // Fatal logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
|
||||
// func Fatal(args ...interface{}) {
|
||||
// std.Fatal(args...)
|
||||
// }
|
||||
|
||||
// // TraceFn logs a message from a func at level Trace on the standard logger.
|
||||
// func TraceFn(fn log.LogFunction) {
|
||||
// std.TraceFn(fn)
|
||||
// }
|
||||
|
||||
// // DebugFn logs a message from a func at level Debug on the standard logger.
|
||||
// func DebugFn(fn log.LogFunction) {
|
||||
// std.DebugFn(fn)
|
||||
// }
|
||||
|
||||
// // PrintFn logs a message from a func at level Info on the standard logger.
|
||||
// func PrintFn(fn log.LogFunction) {
|
||||
// std.PrintFn(fn)
|
||||
// }
|
||||
|
||||
// // InfoFn logs a message from a func at level Info on the standard logger.
|
||||
// func InfoFn(fn log.LogFunction) {
|
||||
// std.InfoFn(fn)
|
||||
// }
|
||||
|
||||
// // WarnFn logs a message from a func at level Warn on the standard logger.
|
||||
// func WarnFn(fn log.LogFunction) {
|
||||
// std.WarnFn(fn)
|
||||
// }
|
||||
|
||||
// // WarningFn logs a message from a func at level Warn on the standard logger.
|
||||
// func WarningFn(fn log.LogFunction) {
|
||||
// std.WarningFn(fn)
|
||||
// }
|
||||
|
||||
// // ErrorFn logs a message from a func at level Error on the standard logger.
|
||||
// func ErrorFn(fn log.LogFunction) {
|
||||
// std.ErrorFn(fn)
|
||||
// }
|
||||
|
||||
// // PanicFn logs a message from a func at level Panic on the standard logger.
|
||||
// func PanicFn(fn log.LogFunction) {
|
||||
// std.PanicFn(fn)
|
||||
// }
|
||||
|
||||
// // FatalFn logs a message from a func at level Fatal on the standard logger then the process will exit with status set to 1.
|
||||
// func FatalFn(fn log.LogFunction) {
|
||||
// std.FatalFn(fn)
|
||||
// }
|
||||
|
||||
// // Tracef logs a message at level Trace on the standard logger.
|
||||
// func Tracef(format string, args ...interface{}) {
|
||||
// std.Tracef(format, args...)
|
||||
// }
|
||||
|
||||
// // Debugf logs a message at level Debug on the standard logger.
|
||||
// func Debugf(format string, args ...interface{}) {
|
||||
// std.Debugf(format, args...)
|
||||
// }
|
||||
|
||||
// // Printf logs a message at level Info on the standard logger.
|
||||
// func Printf(format string, args ...interface{}) {
|
||||
// std.Printf(format, args...)
|
||||
// }
|
||||
|
||||
// // Infof logs a message at level Info on the standard logger.
|
||||
// func Infof(format string, args ...interface{}) {
|
||||
// std.Infof(format, args...)
|
||||
// }
|
||||
|
||||
// // Warnf logs a message at level Warn on the standard logger.
|
||||
// func Warnf(format string, args ...interface{}) {
|
||||
// std.Warnf(format, args...)
|
||||
// }
|
||||
|
||||
// // Warningf logs a message at level Warn on the standard logger.
|
||||
// func Warningf(format string, args ...interface{}) {
|
||||
// std.Warningf(format, args...)
|
||||
// }
|
||||
|
||||
// // Errorf logs a message at level Error on the standard logger.
|
||||
// func Errorf(format string, args ...interface{}) {
|
||||
// std.Errorf(format, args...)
|
||||
// }
|
||||
|
||||
// // Panicf logs a message at level Panic on the standard logger.
|
||||
// func Panicf(format string, args ...interface{}) {
|
||||
// std.Panicf(format, args...)
|
||||
// }
|
||||
|
||||
// // Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
|
||||
// func Fatalf(format string, args ...interface{}) {
|
||||
// std.Fatalf(format, args...)
|
||||
// }
|
||||
|
||||
// // Traceln logs a message at level Trace on the standard logger.
|
||||
// func Traceln(args ...interface{}) {
|
||||
// std.Traceln(args...)
|
||||
// }
|
||||
|
||||
// // Debugln logs a message at level Debug on the standard logger.
|
||||
// func Debugln(args ...interface{}) {
|
||||
// std.Debugln(args...)
|
||||
// }
|
||||
|
||||
// // Println logs a message at level Info on the standard logger.
|
||||
// func Println(args ...interface{}) {
|
||||
// std.Println(args...)
|
||||
// }
|
||||
|
||||
// // Infoln logs a message at level Info on the standard logger.
|
||||
// func Infoln(args ...interface{}) {
|
||||
// std.Infoln(args...)
|
||||
// }
|
||||
|
||||
// // Warnln logs a message at level Warn on the standard logger.
|
||||
// func Warnln(args ...interface{}) {
|
||||
// std.Warnln(args...)
|
||||
// }
|
||||
|
||||
// // Warningln logs a message at level Warn on the standard logger.
|
||||
// func Warningln(args ...interface{}) {
|
||||
// std.Warningln(args...)
|
||||
// }
|
||||
|
||||
// // Errorln logs a message at level Error on the standard logger.
|
||||
// func Errorln(args ...interface{}) {
|
||||
// std.Errorln(args...)
|
||||
// }
|
||||
|
||||
// // Panicln logs a message at level Panic on the standard logger.
|
||||
// func Panicln(args ...interface{}) {
|
||||
// std.Panicln(args...)
|
||||
// }
|
||||
|
||||
// // Fatalln logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
|
||||
// func Fatalln(args ...interface{}) {
|
||||
// std.Fatalln(args...)
|
||||
// }
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package log
|
||||
|
||||
func Debug(args ...any) {
|
||||
sugaredLogger.Debug(args...)
|
||||
}
|
||||
|
||||
func Info(args ...any) {
|
||||
sugaredLogger.Info(args...)
|
||||
}
|
||||
|
||||
func Warn(args ...any) {
|
||||
sugaredLogger.Warn(args...)
|
||||
}
|
||||
|
||||
func Error(args ...any) {
|
||||
sugaredLogger.Error(args...)
|
||||
}
|
||||
|
||||
func Debugf(format string, args ...interface{}) {
|
||||
sugaredLogger.Debugf(format, args...)
|
||||
}
|
||||
|
||||
// Infof logs a message at level Info on the standard logger.
|
||||
func Infof(format string, args ...interface{}) {
|
||||
sugaredLogger.Infof(format, args...)
|
||||
}
|
||||
|
||||
// Warnf logs a message at level Warn on the standard logger.
|
||||
func Warnf(format string, args ...interface{}) {
|
||||
sugaredLogger.Warnf(format, args...)
|
||||
}
|
||||
|
||||
// Errorf logs a message at level Error on the standard logger.
|
||||
func Errorf(format string, args ...interface{}) {
|
||||
sugaredLogger.Errorf(format, args...)
|
||||
}
|
||||
|
||||
// Panicf logs a message at level Panic on the standard logger.
|
||||
func Panicf(format string, args ...interface{}) {
|
||||
sugaredLogger.Panicf(format, args...)
|
||||
}
|
||||
|
||||
// Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
|
||||
func Fatalf(format string, args ...interface{}) {
|
||||
sugaredLogger.Fatalf(format, args...)
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
type MultipleWriter []io.Writer
|
||||
|
||||
func (m *MultipleWriter) Write(p []byte) (n int, err error) {
|
||||
for _, w := range *m {
|
||||
n, err = w.Write(p)
|
||||
if err != nil {
|
||||
m.Delete(w)
|
||||
}
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
func (m *MultipleWriter) Delete(writer io.Writer) {
|
||||
for i, w := range *m {
|
||||
if w == writer {
|
||||
*m = append((*m)[:i], (*m)[i+1:]...)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
func (m *MultipleWriter) Add(writer io.Writer) {
|
||||
*m = append(*m, writer)
|
||||
}
|
||||
|
||||
var multipleWriter = &MultipleWriter{os.Stdout}
|
||||
|
||||
func AddWriter(writer io.Writer) {
|
||||
multipleWriter.Add(writer)
|
||||
}
|
||||
func DeleteWriter(writer io.Writer) {
|
||||
multipleWriter.Delete(writer)
|
||||
}
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"m7s.live/engine/v4/config"
|
||||
"m7s.live/engine/v4/log"
|
||||
"m7s.live/engine/v4/util"
|
||||
"m7s.live/engine/v4/lang"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -86,19 +87,23 @@ func Run(ctx context.Context, configFile string) (err error) {
|
||||
}
|
||||
}
|
||||
loglevel, err := zapcore.ParseLevel(EngineConfig.LogLevel)
|
||||
var logger log.Logger
|
||||
log.LocaleLogger = logger.Lang(lang.Get(EngineConfig.LogLang))
|
||||
if err != nil {
|
||||
log.Error("parse log level error:", err)
|
||||
logger.Error("parse log level error:", zap.Error(err))
|
||||
loglevel = zapcore.InfoLevel
|
||||
}
|
||||
log.Config.Level.SetLevel(loglevel)
|
||||
Engine.Logger = log.With(zap.Bool("engine", true))
|
||||
log.LogLevel.SetLevel(loglevel)
|
||||
Engine.Logger = log.LocaleLogger.Named("engine")
|
||||
// 使得RawConfig具备全量配置信息,用于合并到插件配置中
|
||||
Engine.RawConfig = config.Struct2Config(EngineConfig.Engine)
|
||||
Engine.assign()
|
||||
log.With(zap.String("config", "global")).Debug("", zap.Any("config", EngineConfig))
|
||||
Engine.Logger.Debug("", zap.Any("config", EngineConfig))
|
||||
EventBus = make(chan any, EngineConfig.EventBusSize)
|
||||
go EngineConfig.Listen(Engine)
|
||||
for _, plugin := range plugins {
|
||||
plugin.Logger = log.LocaleLogger.Named(plugin.Name)
|
||||
plugin.Info("initialize", zap.String("version", plugin.Version))
|
||||
userConfig := cg.GetChild(plugin.Name)
|
||||
if userConfig != nil {
|
||||
if b, err := yaml.Marshal(userConfig); err == nil {
|
||||
@@ -124,22 +129,34 @@ func Run(ctx context.Context, configFile string) (err error) {
|
||||
if ver, ok := ctx.Value("version").(string); ok && ver != "" && ver != "dev" {
|
||||
version = ver
|
||||
}
|
||||
log.Info("monibuca", version, Green(" start success"))
|
||||
if EngineConfig.LogLang == "zh" {
|
||||
log.Info("monibuca 引擎版本:", version, Green(" 启动成功"))
|
||||
} else {
|
||||
log.Info("monibuca", version, Green(" start success"))
|
||||
}
|
||||
var enabledPlugins, disabledPlugins []string
|
||||
for _, plugin := range plugins {
|
||||
if plugin.RawConfig["enable"] == false {
|
||||
if plugin.RawConfig["enable"] == false || plugin.Disabled {
|
||||
plugin.Disabled = true
|
||||
disabledPlugins = append(disabledPlugins, plugin.Name)
|
||||
} else {
|
||||
enabledPlugins = append(enabledPlugins, plugin.Name)
|
||||
}
|
||||
}
|
||||
fmt.Print("已运行的插件:")
|
||||
if EngineConfig.LogLang == "zh" {
|
||||
fmt.Print("已运行的插件:")
|
||||
} else {
|
||||
fmt.Print("enabled plugins:")
|
||||
}
|
||||
for _, plugin := range enabledPlugins {
|
||||
fmt.Print(Colorize(" "+plugin+" ", BlackFg|GreenBg|BoldFm), " ")
|
||||
}
|
||||
fmt.Println()
|
||||
fmt.Print("已禁用的插件:")
|
||||
if EngineConfig.LogLang == "zh" {
|
||||
fmt.Print("已禁用的插件:")
|
||||
} else {
|
||||
fmt.Print("disabled plugins:")
|
||||
}
|
||||
for _, plugin := range disabledPlugins {
|
||||
fmt.Print(Colorize(" "+plugin+" ", BlackFg|RedBg|CrossedOutFm), " ")
|
||||
}
|
||||
|
||||
@@ -45,10 +45,8 @@ func InstallPlugin(config config.Plugin) *Plugin {
|
||||
case *GlobalConfig:
|
||||
v.InitDefaultHttp()
|
||||
default:
|
||||
plugin.Logger = log.With(zap.String("plugin", name))
|
||||
Plugins[name] = plugin
|
||||
plugins = append(plugins, plugin)
|
||||
plugin.Info("install", zap.String("version", plugin.Version))
|
||||
}
|
||||
return plugin
|
||||
}
|
||||
@@ -67,7 +65,7 @@ type Plugin struct {
|
||||
modifiedYaml string //修改过的配置的yaml文件内容
|
||||
RawConfig config.Config //最终合并后的配置的map形式方便查询
|
||||
Modified config.Config //修改过的配置项
|
||||
*zap.Logger `json:"-"`
|
||||
*log.Logger `json:"-"`
|
||||
saveTimer *time.Timer //用于保存的时候的延迟,防抖
|
||||
Disabled bool
|
||||
}
|
||||
@@ -75,6 +73,8 @@ type Plugin struct {
|
||||
func (opt *Plugin) logHandler(pattern string, handler http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||
opt.Debug("visit", zap.String("path", r.URL.String()), zap.String("remote", r.RemoteAddr))
|
||||
name := strings.ToLower(opt.Name)
|
||||
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/"+name)
|
||||
handler.ServeHTTP(rw, r)
|
||||
})
|
||||
}
|
||||
@@ -87,12 +87,12 @@ func (opt *Plugin) handle(pattern string, handler http.Handler) {
|
||||
pattern = "/" + pattern
|
||||
}
|
||||
if ok {
|
||||
opt.Debug("http handle added:" + pattern)
|
||||
opt.Debug("http handle added", zap.String("pattern", pattern))
|
||||
conf.Handle(pattern, opt.logHandler(pattern, handler))
|
||||
}
|
||||
if opt != Engine {
|
||||
pattern = "/" + strings.ToLower(opt.Name) + pattern
|
||||
opt.Debug("http handle added to engine:" + pattern)
|
||||
opt.Debug("http handle added to engine", zap.String("pattern", pattern))
|
||||
EngineConfig.Handle(pattern, opt.logHandler(pattern, handler))
|
||||
}
|
||||
apiList = append(apiList, pattern)
|
||||
@@ -122,7 +122,7 @@ func (opt *Plugin) assign() {
|
||||
if opt.RawConfig == nil {
|
||||
opt.RawConfig = config.Config{}
|
||||
} else if opt.RawConfig["enable"] == false {
|
||||
opt.Warn("disabled")
|
||||
opt.Warn("plugin disabled")
|
||||
return
|
||||
} else if opt.RawConfig["enable"] == true {
|
||||
//移除这个属性防止反序列化报错
|
||||
@@ -214,7 +214,6 @@ func (opt *Plugin) Save() error {
|
||||
}
|
||||
|
||||
func (opt *Plugin) Publish(streamPath string, pub IPublisher) error {
|
||||
opt.Info("publish", zap.String("path", streamPath))
|
||||
conf, ok := opt.Config.(config.PublishConfig)
|
||||
if !ok {
|
||||
conf = EngineConfig
|
||||
@@ -244,12 +243,19 @@ func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error {
|
||||
|
||||
// Subscribe 订阅一个流,如果流不存在则创建一个等待流
|
||||
func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error {
|
||||
opt.Info("subscribe", zap.String("path", streamPath))
|
||||
suber := sub.GetSubscriber()
|
||||
if suber == nil {
|
||||
if EngineConfig.LogLang == "zh" {
|
||||
return errors.New("不是订阅者")
|
||||
} else {
|
||||
return errors.New("not subscriber")
|
||||
}
|
||||
}
|
||||
conf, ok := opt.Config.(config.SubscribeConfig)
|
||||
if !ok {
|
||||
conf = EngineConfig
|
||||
}
|
||||
sub.GetSubscriber().Config = conf.GetSubscribeConfig()
|
||||
suber.Config = conf.GetSubscribeConfig()
|
||||
return sub.receive(streamPath, sub)
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aler9/gortsplib/pkg/mpeg4audio"
|
||||
"github.com/aler9/gortsplib/v2/pkg/codecs/mpeg4audio"
|
||||
"github.com/pion/webrtc/v3/pkg/media/rtpdump"
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4/codec"
|
||||
|
||||
+1
-1
@@ -69,7 +69,7 @@ func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPo
|
||||
case codec.CodecID_H265:
|
||||
p.VideoTrack = track.NewH265(p.Stream, pool)
|
||||
default:
|
||||
p.Stream.Error("video codecID not support: ", zap.Uint8("codeId", uint8(codecID)))
|
||||
p.Stream.Error("video codecID not support", zap.Uint8("codeId", uint8(codecID)))
|
||||
return
|
||||
}
|
||||
p.VideoTrack.WriteAVCC(ts, frame)
|
||||
|
||||
@@ -1,118 +0,0 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
"m7s.live/engine/v4/common"
|
||||
)
|
||||
|
||||
type ReportCreateStream struct {
|
||||
StreamPath string
|
||||
Time int64
|
||||
}
|
||||
|
||||
type ReportCloseStream struct {
|
||||
StreamPath string
|
||||
Time int64
|
||||
}
|
||||
|
||||
type ReportAddTrack struct {
|
||||
Name string
|
||||
StreamPath string
|
||||
Time int64
|
||||
}
|
||||
type ReportTrackInfo struct {
|
||||
BPS int
|
||||
FPS int
|
||||
Drops int
|
||||
RBSize int
|
||||
}
|
||||
type ReportPulse struct {
|
||||
StreamPath string
|
||||
Tracks map[string]ReportTrackInfo
|
||||
Subscribers map[string]struct {
|
||||
Type string
|
||||
Readers map[string]struct {
|
||||
Delay uint32
|
||||
}
|
||||
}
|
||||
Time int64
|
||||
}
|
||||
|
||||
type Reportor struct {
|
||||
Subscriber
|
||||
pulse ReportPulse
|
||||
}
|
||||
|
||||
func (r *Reportor) OnEvent(event any) {
|
||||
switch v := event.(type) {
|
||||
case PulseEvent:
|
||||
r.pulse.Tracks = make(map[string]ReportTrackInfo)
|
||||
r.pulse.Subscribers = make(map[string]struct {
|
||||
Type string
|
||||
Readers map[string]struct {
|
||||
Delay uint32
|
||||
}
|
||||
})
|
||||
r.Stream.Tracks.Range(func(k string, t common.Track) {
|
||||
track := t.GetBase()
|
||||
r.pulse.Tracks[k] = ReportTrackInfo{
|
||||
BPS: track.BPS,
|
||||
FPS: track.FPS,
|
||||
Drops: track.Drops,
|
||||
RBSize: t.GetRBSize(),
|
||||
}
|
||||
})
|
||||
r.Stream.Subscribers.RangeAll(func(sub ISubscriber, wait *waitTracks) {
|
||||
suber := sub.GetSubscriber()
|
||||
r.pulse.Subscribers[suber.ID] = struct {
|
||||
Type string
|
||||
Readers map[string]struct {
|
||||
Delay uint32
|
||||
}
|
||||
}{Type: suber.Type, Readers: map[string]struct {
|
||||
Delay uint32
|
||||
}{suber.Audio.Name: {Delay: suber.AudioReader.Delay}, suber.Video.Name: {Delay: suber.VideoReader.Delay}}}
|
||||
})
|
||||
r.pulse.Time = time.Now().Unix()
|
||||
EngineConfig.Report("pulse", r.pulse)
|
||||
case common.Track:
|
||||
EngineConfig.Report("addtrack", &ReportAddTrack{v.GetBase().Name, r.Stream.Path, time.Now().Unix()})
|
||||
}
|
||||
}
|
||||
|
||||
func (conf *GlobalConfig) OnEvent(event any) {
|
||||
if !conf.GetEnableReport() {
|
||||
conf.Engine.OnEvent(event)
|
||||
return
|
||||
}
|
||||
switch v := event.(type) {
|
||||
case SEcreate:
|
||||
conf.Report("create", &ReportCreateStream{v.Target.Path, time.Now().Unix()})
|
||||
var reportor Reportor
|
||||
reportor.IsInternal = true
|
||||
reportor.pulse.StreamPath = v.Target.Path
|
||||
if Engine.Subscribe(v.Target.Path, &reportor) == nil {
|
||||
reportor.SubPulse()
|
||||
}
|
||||
case SEpublish:
|
||||
case SErepublish:
|
||||
case SEKick:
|
||||
case SEclose:
|
||||
conf.Report("close", &ReportCloseStream{v.Target.Path, time.Now().Unix()})
|
||||
case SEwaitClose:
|
||||
case SEwaitPublish:
|
||||
case ISubscriber:
|
||||
case UnsubscribeEvent:
|
||||
default:
|
||||
conf.Engine.OnEvent(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (conf *GlobalConfig) Report(t string, v any) {
|
||||
out, err := yaml.Marshal(v)
|
||||
if err == nil {
|
||||
conf.Engine.OnEvent(append([]byte("type: "+t+"\n"), out...))
|
||||
}
|
||||
}
|
||||
@@ -160,7 +160,7 @@ func (tracks *Tracks) MarshalJSON() ([]byte, error) {
|
||||
type Stream struct {
|
||||
timeout *time.Timer //当前状态的超时定时器
|
||||
actionChan util.SafeChan[any]
|
||||
*zap.Logger
|
||||
*log.Logger
|
||||
StartTime time.Time //创建时间
|
||||
StreamTimeoutConfig
|
||||
Path string
|
||||
@@ -239,7 +239,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream
|
||||
timeout: time.NewTimer(waitTimeout),
|
||||
}
|
||||
s.Subscribers.Init()
|
||||
s.Logger = log.With(zap.String("stream", streamPath))
|
||||
s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath))
|
||||
s.Info("created")
|
||||
Streams.Map[streamPath] = s
|
||||
s.actionChan.Init(1)
|
||||
@@ -282,7 +282,7 @@ func (r *Stream) action(action StreamAction) (ok bool) {
|
||||
waitTime = time.Millisecond * 10 //没有订阅者也没有配置发布者等待重连时间,默认10ms后关闭流
|
||||
}
|
||||
r.timeout.Reset(waitTime)
|
||||
r.Debug("wait publish", zap.Duration("wait", waitTime))
|
||||
r.Debug("wait publisher", zap.Duration("wait timeout", waitTime))
|
||||
case STATE_PUBLISHING:
|
||||
if len(r.SEHistory) > 1 {
|
||||
stateEvent = SErepublish{event}
|
||||
@@ -362,7 +362,7 @@ func (s *Stream) onSuberClose(sub ISubscriber) {
|
||||
// 流状态处理中枢,包括接收订阅发布指令等
|
||||
func (s *Stream) run() {
|
||||
EventBus <- SEcreate{StreamEvent{Event[*Stream]{Target: s, Time: time.Now()}}}
|
||||
pulseTicker := time.NewTicker(time.Second * 5)
|
||||
pulseTicker := time.NewTicker(EngineConfig.PulseInterval)
|
||||
defer pulseTicker.Stop()
|
||||
pulseSuber := make(map[ISubscriber]struct{})
|
||||
for {
|
||||
@@ -387,7 +387,7 @@ func (s *Stream) run() {
|
||||
s.Tracks.ModifyRange(func(name string, t Track) {
|
||||
// track 超过一定时间没有更新数据了
|
||||
if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > s.PublishTimeout {
|
||||
s.Warn("track timeout", zap.String("name", name), zap.Time("lastWriteTime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout))
|
||||
s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", s.PublishTimeout))
|
||||
delete(s.Tracks.Map.Map, name)
|
||||
var event TrackTimeoutEvent
|
||||
event.Target = t
|
||||
|
||||
+3
-2
@@ -165,7 +165,7 @@ func (s *Subscriber) IsPlaying() bool {
|
||||
}
|
||||
|
||||
func (s *Subscriber) SubPulse() {
|
||||
s.Stream.Receive(SubPulse{s})
|
||||
s.Stream.Receive(SubPulse{s.Spesific.(ISubscriber)})
|
||||
}
|
||||
|
||||
func (s *Subscriber) PlayRaw() {
|
||||
@@ -210,10 +210,11 @@ func (s *Subscriber) PlayBlock(subType byte) {
|
||||
switch subType {
|
||||
case SUBTYPE_RAW:
|
||||
sendVideoFrame = func(frame *AVFrame) {
|
||||
// fmt.Println("v", frame.Sequence, s.VideoReader.AbsTime, frame.IFrame)
|
||||
// fmt.Println("v", s.VideoReader.Delay)
|
||||
spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, s.VideoReader.GetPTS32(), s.VideoReader.GetDTS32()})
|
||||
}
|
||||
sendAudioFrame = func(frame *AVFrame) {
|
||||
// fmt.Println("a", s.AudioReader.Delay)
|
||||
// fmt.Println("a", frame.Sequence, s.AudioReader.AbsTime)
|
||||
spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, s.AudioReader.GetPTS32(), s.AudioReader.GetDTS32()})
|
||||
}
|
||||
|
||||
+9
-3
@@ -49,7 +49,13 @@ func (s *Subscribers) Len() int {
|
||||
return len(s.public)
|
||||
}
|
||||
|
||||
func (s *Subscribers) RangeAll(f func(sub ISubscriber, wait *waitTracks)) {
|
||||
func (s *Subscribers) RangeAll(f func(sub ISubscriber)) {
|
||||
s.rangeAll(func(sub ISubscriber, wait *waitTracks) {
|
||||
f(sub)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Subscribers) rangeAll(f func(sub ISubscriber, wait *waitTracks)) {
|
||||
for sub, wait := range s.internal {
|
||||
f(sub, wait)
|
||||
}
|
||||
@@ -59,7 +65,7 @@ func (s *Subscribers) RangeAll(f func(sub ISubscriber, wait *waitTracks)) {
|
||||
}
|
||||
|
||||
func (s *Subscribers) OnTrack(track common.Track) {
|
||||
s.RangeAll(func(sub ISubscriber, wait *waitTracks) {
|
||||
s.rangeAll(func(sub ISubscriber, wait *waitTracks) {
|
||||
if _, ok := s.waits[wait]; ok {
|
||||
if wait.Accept(track) {
|
||||
delete(s.waits, wait)
|
||||
@@ -71,7 +77,7 @@ func (s *Subscribers) OnTrack(track common.Track) {
|
||||
}
|
||||
|
||||
func (s *Subscribers) OnPublisherLost(event StateEvent) {
|
||||
s.RangeAll(func(sub ISubscriber, wait *waitTracks) {
|
||||
s.rangeAll(func(sub ISubscriber, wait *waitTracks) {
|
||||
if _, ok := s.waits[wait]; ok {
|
||||
wait.Reject(ErrPublisherLost)
|
||||
delete(s.waits, wait)
|
||||
|
||||
+1
-1
@@ -267,7 +267,7 @@ func (av *Media) Flush() {
|
||||
}
|
||||
curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond)
|
||||
}
|
||||
// fmt.Println(av.Name, curValue.Timestamp, curValue.DeltaTime)
|
||||
// fmt.Println(av.Name, curValue.DTS, curValue.Timestamp, curValue.DeltaTime)
|
||||
if curValue.AUList.Length > 0 {
|
||||
// 补完RTP
|
||||
if config.Global.EnableRTP && curValue.RTP.Length == 0 {
|
||||
|
||||
+8
-6
@@ -46,12 +46,14 @@ func (vt *H265) WriteSliceBytes(slice []byte) {
|
||||
case codec.NAL_UNIT_PPS:
|
||||
vt.PPS = slice
|
||||
vt.ParamaterSets[2] = slice
|
||||
extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.VPS, vt.SPS, vt.PPS)
|
||||
if err == nil {
|
||||
vt.WriteSequenceHead(extraData)
|
||||
} else {
|
||||
vt.Error("H265 BuildH265SeqHeaderFromVpsSpsPps", zap.Error(err))
|
||||
vt.Stream.Close()
|
||||
if vt.VPS != nil && vt.SPS != nil && vt.PPS != nil {
|
||||
extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.VPS, vt.SPS, vt.PPS)
|
||||
if err == nil {
|
||||
vt.WriteSequenceHead(extraData)
|
||||
} else {
|
||||
vt.Error("H265 BuildH265SeqHeaderFromVpsSpsPps", zap.Error(err))
|
||||
vt.Stream.Close()
|
||||
}
|
||||
}
|
||||
case
|
||||
codec.NAL_UNIT_CODED_SLICE_BLA,
|
||||
|
||||
+2
-1
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
"m7s.live/engine/v4/common"
|
||||
"m7s.live/engine/v4/log"
|
||||
"m7s.live/engine/v4/util"
|
||||
)
|
||||
|
||||
@@ -31,7 +32,7 @@ type AVRingReader struct {
|
||||
Frame *common.AVFrame
|
||||
AbsTime uint32
|
||||
Delay uint32
|
||||
*zap.Logger
|
||||
*log.Logger
|
||||
}
|
||||
|
||||
func (r *AVRingReader) DecConfChanged() bool {
|
||||
|
||||
+17
-18
@@ -7,6 +7,8 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
func ReturnJson[T any](fetch func() T, tickDur time.Duration, rw http.ResponseWriter, r *http.Request) {
|
||||
@@ -24,24 +26,21 @@ func ReturnJson[T any](fetch func() T, tickDur time.Duration, rw http.ResponseWr
|
||||
}
|
||||
}
|
||||
|
||||
// func GetJsonHandler[T any](fetch func() T, tickDur time.Duration) http.HandlerFunc {
|
||||
// return func(rw http.ResponseWriter, r *http.Request) {
|
||||
// if r.URL.Query().Get("json") != "" {
|
||||
// if err := json.NewEncoder(rw).Encode(fetch()); err != nil {
|
||||
// rw.WriteHeader(500)
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
// sse := NewSSE(rw, r.Context())
|
||||
// tick := time.NewTicker(tickDur)
|
||||
// for range tick.C {
|
||||
// if sse.WriteJSON(fetch()) != nil {
|
||||
// tick.Stop()
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
func ReturnYaml[T any](fetch func() T, tickDur time.Duration, rw http.ResponseWriter, r *http.Request) {
|
||||
if r.Header.Get("Accept") == "text/event-stream" {
|
||||
sse := NewSSE(rw, r.Context())
|
||||
tick := time.NewTicker(tickDur)
|
||||
defer tick.Stop()
|
||||
for range tick.C {
|
||||
if sse.WriteYAML(fetch()) != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
} else if err := yaml.NewEncoder(rw).Encode(fetch()); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func ListenUDP(address string, networkBuffer int) (*net.UDPConn, error) {
|
||||
addr, err := net.ResolveUDPAddr("udp", address)
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"os/exec"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -59,6 +61,9 @@ func NewSSE(w http.ResponseWriter, ctx context.Context) *SSE {
|
||||
func (sse *SSE) WriteJSON(data any) error {
|
||||
return json.NewEncoder(sse).Encode(data)
|
||||
}
|
||||
func (sse *SSE) WriteYAML(data any) error {
|
||||
return yaml.NewEncoder(sse).Encode(data)
|
||||
}
|
||||
func (sse *SSE) WriteExec(cmd *exec.Cmd) error {
|
||||
cmd.Stderr = sse
|
||||
cmd.Stdout = sse
|
||||
|
||||
Reference in New Issue
Block a user