1、增加内置鉴权机制(同腾讯云)

2、增加trace级别日志功能
3、消除rtp包的padding
4、自动创建BytesPool
5、StreamName修改为除去app后的部分
This commit is contained in:
langhuihui
2023-04-24 13:43:50 +08:00
parent e9177794cb
commit 2eea5c3706
13 changed files with 113 additions and 22 deletions
+39 -2
View File
@@ -30,7 +30,7 @@
- 获取系统基本情况 `/api/summary` 返回值Summary数据
- 获取所有插件信息 `/api/plugins` 返回值Plugin数据
- 获取指定的配置信息 `/api/getconfig?name=xxx` 返回xxx插件的配置信息,如果不带参数或参数为空则返回全局配置
- 修改并保存配置信息 `/api/modifyconfig?name=xxx` 修改xxx插件的配置信息,在请求的body中传入修改后的配置json字符串
- 修改并保存配置信息 `/api/modifyconfig?name=xxx&yaml=1` 修改xxx插件的配置信息,在请求的body中传入修改后的配置yaml字符串
- 热更新配置信息 `/api/updateconfig?name=xxx` 热更新xxx插件的配置信息,如果不带参数或参数为空则热更新全局配置
- 获取所有远端拉流信息 `/api/list/pull` 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""}
- 获取所有向远端推流信息 `/api/list/push` 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""}
@@ -58,11 +58,23 @@ global:
publishtimeout: 10s # 发布流默认过期时间,超过该时间发布者没有恢复流将被删除
delayclosetimeout: 0 # 自动关闭触发后延迟的时间(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。
waitclosetimeout: 0 # 发布者断开后等待时间,超过该时间发布者没有恢复流将被删除,0为关闭该功能,由订阅者决定是否删除
key: # 发布鉴权key
secretargname: secret # 发布鉴权参数名
expireargname: expire # 发布鉴权失效时间参数名
subscribe:
subaudio: true # 是否订阅音频流
subvideo: true # 是否订阅视频流
subaudioargname: ats # 订阅音频轨道参数名
subvideoargname: vts # 订阅视频轨道参数名
subdataargname: dts # 订阅数据轨道参数名
subaudiotracks: [] # 订阅音频轨道名称列表
subvideotracks: [] # 订阅视频轨道名称列表
submode: 0 # 订阅模式,0为跳帧追赶模式,1为不追赶(多用于录制),2为时光回溯模式
iframeonly: false # 只订阅关键帧
waittimeout: 10s # 等待发布者超时时间,用于订阅尚未发布的流
waittimeout: 10s # 等待发布者超时时间,用于订阅尚未发布的流
key: # 订阅鉴权key
secretargname: secret # 订阅鉴权参数名
expireargname: expire # 订阅鉴权失效时间参数名
enableavcc : true # 启用AVCC格式缓存,用于rtmp协议
enablertp : true # 启用rtp格式缓存,用于rtsp、websocket、gb28181协议
enableauth: true # 启用鉴权,详细查看鉴权机制
@@ -104,8 +116,31 @@ stateDiagram-v2
```
# 鉴权机制
## 默认鉴权
在publish 和 subscribe 中配置 key 引擎会自动进行鉴权,
推流或者拉流时需要在url中添加参数 secret=xxx&expire=xxx。
- secret为鉴权前面,MD5(key+StreamPath+expire)
- expire为鉴权失效时间,格式是十六进制 UNIX 时间戳
### 时间戳计算
```
设置时间:2018.12.01 08:30:00
十进制 UNIX 时间戳:1543624200
十六进制 UNIX 时间戳:5C01D608(云直播鉴权配置使用十六进制 UNIX 时间戳,十六进制不区分字母大小写)
```
### 鉴权签名计算
```
secret = MD5(key+StreamPath+expire)
secret = MD5(ngoeiq03+test/01+5C01D608)
secret = MD5(ngoeiq03test/015C01D608)
secret = ce797dc6238156d548ef945e6ad1ea20
```
## 单独鉴权
如果需要自定义鉴权,可以在插件中实现鉴权接口,
引擎中定义如下两个接口,插件中的发布者或者订阅者可以实现这两个接口,引擎会在发布或者订阅时调用这两个接口进行鉴权
```go
type AuthSub interface {
@@ -120,6 +155,8 @@ type AuthPub interface {
- Promise方便异步鉴权,可以后续调用其Resolve或Reject方法进行鉴权结果的返回
## 全局鉴权
自定义鉴权也可以全局生效,
引擎中定义如下两个全局函数的变量,插件中可以对这两个变量进行赋值,引擎会在发布或者订阅时调用这两个接口进行鉴权
```go
var OnAuthSub func(p *util.Promise[ISubscriber]) error
+6 -1
View File
@@ -35,6 +35,9 @@ type Publish struct {
WaitCloseTimeout time.Duration `default:"0s"` // 延迟自动关闭(等待重连)
DelayCloseTimeout time.Duration `default:"0s"` // 延迟自动关闭(无订阅时)
BufferTime time.Duration `default:"0s"` // 缓冲长度(单位:秒),0代表取最近关键帧
Key string // 发布鉴权key
SecretArgName string `default:"secret"` // 发布鉴权参数名
ExpireArgName string `default:"expire"` // 发布鉴权失效时间参数名
}
func (c Publish) GetPublishConfig() Publish {
@@ -56,6 +59,9 @@ type Subscribe struct {
WaitTimeout time.Duration `default:"10s"` // 等待流超时
WriteBufferSize int `default:"0"` // 写缓冲大小
Poll time.Duration `default:"20ms"` // 读取Ring时的轮询间隔,单位毫秒
Key string // 订阅鉴权key
SecretArgName string `default:"secret"` // 订阅鉴权参数名
ExpireArgName string `default:"expire"` // 订阅鉴权失效时间参数名
}
func (c *Subscribe) GetSubscribeConfig() *Subscribe {
@@ -126,7 +132,6 @@ type Engine struct {
SpeedLimit time.Duration `default:"500ms"` //速度限制最大等待时间
EventBusSize int `default:"10"` //事件总线大小
PulseInterval time.Duration `default:"5s"` //心跳事件间隔
PrintTs bool `default:"false"` //打印时间戳
enableReport bool `default:"false"` //启用报告,用于统计和监控
reportStream quic.Stream // console server connection
instanceId string // instance id 来自console
+25 -1
View File
@@ -2,10 +2,12 @@ package engine
import (
"context"
"crypto/md5"
"errors"
"io"
"net/url"
"reflect"
"strconv"
"strings"
"time"
@@ -35,7 +37,7 @@ type IO struct {
ID string
Type string
context.Context `json:"-" yaml:"-"` //不要直接设置,应当通过OnEvent传入父级Context
context.CancelFunc `json:"-" yaml:"-"` //流关闭是关闭发布者或者订阅者
context.CancelFunc `json:"-" yaml:"-"` //流关闭是关闭发布者或者订阅者
*log.Logger `json:"-" yaml:"-"`
StartTime time.Time //创建时间
Stream *Stream `json:"-" yaml:"-"`
@@ -122,6 +124,7 @@ var (
ErrBadTrackName = errors.New("Track Already Exist")
ErrStreamIsClosed = errors.New("Stream Is Closed")
ErrPublisherLost = errors.New("Publisher Lost")
ErrAuth = errors.New("Auth Failed")
OnAuthSub func(p *util.Promise[ISubscriber]) error
OnAuthPub func(p *util.Promise[IPublisher]) error
)
@@ -202,6 +205,16 @@ func (io *IO) receive(streamPath string, specific IIO) error {
if err != nil {
return err
}
} else if conf.Key != "" {
secret := io.Args.Get(conf.SecretArgName)
t := io.Args.Get(conf.ExpireArgName)
if unixTime, err := strconv.ParseInt(t, 16, 64); err != nil || time.Now().Unix() > unixTime {
return ErrAuth
}
trueSecret := md5.Sum([]byte(conf.Key + s.StreamName + t))
if string(trueSecret[:]) != secret {
return ErrAuth
}
}
}
if promise := util.NewPromise(specific.(IPublisher)); s.Receive(promise) {
@@ -209,6 +222,7 @@ func (io *IO) receive(streamPath string, specific IIO) error {
return err
}
} else {
io.Type = strings.TrimSuffix(io.Type, "Subscriber")
io.Info("subscribe")
if create {
@@ -232,6 +246,16 @@ func (io *IO) receive(streamPath string, specific IIO) error {
if err != nil {
return err
}
} else if conf := specific.(ISubscriber).GetSubscriber().Config; conf.Key != "" {
secret := io.Args.Get(conf.SecretArgName)
t := io.Args.Get(conf.ExpireArgName)
if unixTime, err := strconv.ParseInt(t, 16, 64); err != nil || time.Now().Unix() > unixTime {
return ErrAuth
}
trueSecret := md5.Sum([]byte(conf.Key + s.StreamName + t))
if string(trueSecret[:]) != secret {
return ErrAuth
}
}
}
if promise := util.NewPromise(specific.(ISubscriber)); s.Receive(promise) {
+9
View File
@@ -33,6 +33,7 @@ var engineConfig = zapcore.EncoderConfig{
},
}
var LogLevel = zap.NewAtomicLevelAt(zap.DebugLevel)
var Trace bool
var logger *zap.Logger = zap.New(
zapcore.NewCore(zapcore.NewConsoleEncoder(engineConfig), zapcore.AddSync(multipleWriter), LogLevel),
)
@@ -47,6 +48,7 @@ type Zap interface {
Lang(lang map[string]string) *Logger
Named(name string) *Logger
With(fields ...zap.Field) *Logger
Trace(msg string, fields ...zap.Field)
Debug(msg string, fields ...zap.Field)
Info(msg string, fields ...zap.Field)
Warn(msg string, fields ...zap.Field)
@@ -92,6 +94,13 @@ func (l *Logger) formatLang(msg *string, fields []zapcore.Field) {
}
}
func (l *Logger) Trace(msg string, fields ...zap.Field) {
if Trace {
l.formatLang(&msg, fields)
l.Logger.Debug(msg, fields...)
}
}
func (l *Logger) Debug(msg string, fields ...zap.Field) {
l.formatLang(&msg, fields)
l.Logger.Debug(msg, fields...)
+11 -5
View File
@@ -86,14 +86,20 @@ func Run(ctx context.Context, configFile string) (err error) {
log.Error("parsing yml error:", err)
}
}
loglevel, err := zapcore.ParseLevel(EngineConfig.LogLevel)
var logger log.Logger
log.LocaleLogger = logger.Lang(lang.Get(EngineConfig.LogLang))
if err != nil {
logger.Error("parse log level error:", zap.Error(err))
loglevel = zapcore.InfoLevel
if EngineConfig.LogLevel == "trace" {
log.Trace = true
log.LogLevel.SetLevel(zap.DebugLevel)
} else {
loglevel, err := zapcore.ParseLevel(EngineConfig.LogLevel)
if err != nil {
logger.Error("parse log level error:", zap.Error(err))
loglevel = zapcore.InfoLevel
}
log.LogLevel.SetLevel(loglevel)
}
log.LogLevel.SetLevel(loglevel)
Engine.Logger = log.LocaleLogger.Named("engine")
// 使得RawConfig具备全量配置信息,用于合并到插件配置中
Engine.RawConfig = config.Struct2Config(&EngineConfig.Engine, "GLOBAL")
+1 -1
View File
@@ -241,7 +241,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream
s = &Stream{
Path: streamPath,
AppName: p[0],
StreamName: util.LastElement(p),
StreamName: strings.Join(p[1:], "/"),
StartTime: time.Now(),
timeout: time.NewTimer(waitTimeout),
}
+1 -2
View File
@@ -3,7 +3,6 @@ package engine
import (
"bufio"
"context"
"fmt"
"io"
"net"
"strconv"
@@ -292,7 +291,7 @@ func (s *Subscriber) PlayBlock(subType byte) {
sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, frame.AVCC.ToBuffers()...)
}
sendAudioFrame = func(frame *AVFrame) {
fmt.Println(frame.Sequence, s.AudioReader.AbsTime, s.AudioReader.Delay)
// fmt.Println(frame.Sequence, s.AudioReader.AbsTime, s.AudioReader.Delay)
sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, frame.AVCC.ToBuffers()...)
}
}
+6 -3
View File
@@ -17,7 +17,7 @@ var _ SpesificTrack = (*AAC)(nil)
func NewAAC(stream IStream, stuff ...any) (aac *AAC) {
aac = &AAC{
Mode: 2,
Mode: 2,
}
aac.SizeLength = 13
aac.IndexLength = 3
@@ -27,6 +27,9 @@ func NewAAC(stream IStream, stuff ...any) (aac *AAC) {
aac.SampleSize = 16
aac.SetStuff("aac", stream, int(256+128), byte(97), aac, time.Millisecond*10)
aac.SetStuff(stuff...)
if aac.BytesPool == nil {
aac.BytesPool = make(util.BytesPool, 17)
}
aac.AVCCHead = []byte{0xAF, 1}
return
}
@@ -34,8 +37,8 @@ func NewAAC(stream IStream, stuff ...any) (aac *AAC) {
type AAC struct {
Audio
Mode int // 1为lbr2为hbr
fragments *util.BLL // 用于处理不完整的AU,缺少的字节数
Mode int // 1为lbr2为hbr
fragments *util.BLL // 用于处理不完整的AU,缺少的字节数
}
func (aac *AAC) WriteADTS(ts uint32, adts []byte) {
+1 -6
View File
@@ -1,7 +1,6 @@
package track
import (
"fmt"
"time"
"unsafe"
@@ -264,11 +263,7 @@ func (av *Media) Flush() {
}
curValue.DeltaTime = uint32((curValue.Timestamp - preValue.Timestamp) / time.Millisecond)
}
if config.Global.PrintTs {
fmt.Println(av.Name, curValue.DTS, av.deltaDTSRange, curValue.DTS-preValue.DTS, curValue.Timestamp, curValue.DeltaTime)
}
av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Duration("dts", curValue.DTS), zap.Duration("dts delta", curValue.DTS-preValue.DTS), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp))
bufferTime := av.Stream.GetPublisherConfig().BufferTime
if bufferTime > 0 && av.IDRingList.Length > 1 && curValue.Timestamp-av.IDRingList.Next.Next.Value.Value.Timestamp > bufferTime {
av.ShiftIDR()
+3
View File
@@ -31,6 +31,9 @@ func NewG711(stream IStream, alaw bool, stuff ...any) (g711 *G711) {
g711.AVCCHead = []byte{(byte(g711.CodecID) << 4) | (1 << 1)}
g711.SetStuff(stream, int(32), uint32(8000), g711, time.Millisecond*10)
g711.SetStuff(stuff...)
if g711.BytesPool == nil {
g711.BytesPool = make(util.BytesPool, 17)
}
g711.Attach()
return
}
+4 -1
View File
@@ -21,6 +21,9 @@ func NewH264(stream IStream, stuff ...any) (vt *H264) {
vt.Video.CodecID = codec.CodecID_H264
vt.SetStuff("h264", int(256), byte(96), uint32(90000), stream, vt, time.Millisecond*10)
vt.SetStuff(stuff...)
if vt.BytesPool == nil {
vt.BytesPool = make(util.BytesPool, 17)
}
vt.ParamaterSets = make(ParamaterSets, 2)
vt.nalulenSize = 4
vt.dtsEst = NewDTSEstimator()
@@ -29,7 +32,7 @@ func NewH264(stream IStream, stuff ...any) (vt *H264) {
func (vt *H264) WriteSliceBytes(slice []byte) {
naluType := codec.ParseH264NALUType(slice[0])
// vt.Info("naluType", zap.Uint8("naluType", naluType.Byte()))
vt.Trace("naluType", zap.Uint8("naluType", naluType.Byte()))
switch naluType {
case codec.NALU_SPS:
spsInfo, _ := codec.ParseSPS(slice)
+3
View File
@@ -22,6 +22,9 @@ func NewH265(stream IStream, stuff ...any) (vt *H265) {
vt.Video.CodecID = codec.CodecID_H265
vt.SetStuff("h265", int(256), byte(96), uint32(90000), stream, vt, time.Millisecond*10)
vt.SetStuff(stuff...)
if vt.BytesPool == nil {
vt.BytesPool = make(util.BytesPool, 17)
}
vt.ParamaterSets = make(ParamaterSets, 3)
vt.nalulenSize = 4
vt.dtsEst = NewDTSEstimator()
+4
View File
@@ -14,6 +14,9 @@ const RTPMTU = 1400
// WriteRTPPack 写入已反序列化的RTP包,已经排序过了的
func (av *Media) WriteRTPPack(p *rtp.Packet) {
var frame RTPFrame
p.SSRC = av.SSRC
p.Padding = false
p.PaddingSize = 0
frame.Packet = p
av.Value.BytesIn += len(frame.Payload) + 12
av.Value.RTP.PushValue(frame)
@@ -28,6 +31,7 @@ func (av *Media) WriteRTPPack(p *rtp.Packet) {
// WriteRTPFrame 写入未反序列化的RTP包, 未排序的
func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) {
for frame := av.recorderRTP(raw); frame != nil; frame = av.nextRTPFrame() {
frame.Value.SSRC = av.SSRC
av.Value.BytesIn += len(frame.Value.Payload) + 12
av.DropCount += int(av.lastSeq - av.lastSeq2 - 1)
if len(frame.Value.Payload) > 0 {