Merge branch 'v3' of github.com:WXC9102/engine into v3

This commit is contained in:
wxc
2022-07-07 06:53:26 +08:00
7 changed files with 95 additions and 34 deletions
+2
View File
@@ -13,6 +13,8 @@ PublishTimeout = 60
AutoCloseDelay = 10
# 启用RTP包乱序重排
RTPReorder = false
# 按需发布等流时间 单位秒
OnDemandPublishTimeout = 20
```
# 引擎的基本功能
+1
View File
@@ -14,6 +14,7 @@ const (
HOOK_UNSUBSCRIBE = "UnSubscibe"
HOOK_STREAMCLOSE = "StreamClose"
HOOK_PUBLISH = "Publish"
HOOK_ONDEMAND_PUBLISH = "hookOndemandPublish"
HOOK_REQUEST_TRANSAUDIO = "RequestTransAudio"
)
+6 -5
View File
@@ -11,13 +11,14 @@ func TestAddHook(t *testing.T) {
t.Run(t.Name(), func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go AddHook("test", func(a, b int) {
fmt.Printf("on test,%d,%d", a, b)
AddHook("test", func(a, b int) {
fmt.Printf("on test1,%d,%d\n", a, b)
})
go AddHook("done", wg.Done)
AddHook("done", wg.Done)
<-time.After(time.Millisecond * 100)
TriggerHook("test", 2, 10)
go AddHook("test", func(a, b int) {
fmt.Printf("on test,%d,%d", a, b)
AddHook("test", func(a, b int) {
fmt.Printf("on test2,%d,%d\n", a, b)
})
<-time.After(time.Millisecond * 100)
TriggerHook("test", 1, 12)
+19 -12
View File
@@ -1,6 +1,7 @@
package engine
import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -26,13 +27,14 @@ var Version = "3.2.2"
var (
config = &struct {
EnableAudio bool
EnableVideo bool
PublishTimeout time.Duration
MaxRingSize int
AutoCloseAfter int
RTPReorder bool
}{true, true, 60, 256, -1, false}
EnableAudio bool
EnableVideo bool
PublishTimeout time.Duration
MaxRingSize int
AutoCloseDelay int
RTPReorder bool
OnDemandPublishTimeout time.Duration
}{true, true, 60, 256, -1, false, 10}
// ConfigRaw 配置信息的原始数据
ConfigRaw []byte
StartTime time.Time //启动时间
@@ -95,6 +97,7 @@ func Run(ctx context.Context, configFile string) (err error) {
log.Println(err)
}
config.PublishTimeout *= time.Second
config.OnDemandPublishTimeout *= time.Second
}
for name, config := range Plugins {
if cfg, ok := cg[name]; ok {
@@ -116,13 +119,17 @@ func Run(ctx context.Context, configFile string) (err error) {
}
UUID := uuid.NewString()
reportTimer := time.NewTimer(time.Minute)
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://monibuca.com:2022/report/engine", nil)
req.Header.Set("os", runtime.GOOS)
req.Header.Set("version", Version)
req.Header.Set("uuid", UUID)
contentBuf := bytes.NewBuffer(nil)
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "https://logs-01.loggly.com/inputs/758a662d-f630-40cb-95ed-2502a5e9c872/tag/monibuca/", nil)
req.Header.Set("Content-Type", "application/json")
content := fmt.Sprintf(`{"uuid":"%s","version":"%s","os":"%s","arch":"%s"`, UUID, Version, runtime.GOOS, runtime.GOARCH)
var c http.Client
for {
req.Header.Set("streams", fmt.Sprintf("%d", len(Streams.m)))
contentBuf.Reset()
postJson := fmt.Sprintf(`%s,"streams":%d}`, content, len(Streams.m))
contentBuf.WriteString(postJson)
req.Body = ioutil.NopCloser(contentBuf)
c.Do(req)
select {
case <-ctx.Done():
+15 -4
View File
@@ -54,19 +54,30 @@ func (r *RTPDemuxer) push(ts uint32, payload []byte) {
r.OnDemux(r.PTS, r.Payload)
r.lastTs = ts
}
func IsNewer(value uint16, prev_value uint16) bool {
if prev_value == 0 {
prev_value = value
}
seq := int32(value)
prevSeq := int32(prev_value)
if (seq - prevSeq == 32768) || (seq!=prevSeq && seq - prevSeq < 32768) {
return true
}
return false
}
func (r *RTPDemuxer) Push(rtpRaw []byte) {
if err := r.Unmarshal(rtpRaw); err != nil {
utils.Println("RTP Unmarshal error", err)
return
}
if config.RTPReorder {
if r.SequenceNumber < r.lastSeq {
return
} else if r.lastSeq == 0 {
if !IsNewer(r.SequenceNumber, r.lastSeq) {
r.timestamp = time.Now()
r.tryPop(r.Timestamp, r.Payload)
r.lastSeq = r.SequenceNumber
} else if r.lastSeq+1 == r.SequenceNumber {
}else if r.lastSeq+1 == r.SequenceNumber {
r.tryPop(r.Timestamp, r.Payload)
} else if _, ok := r.orderMap[r.SequenceNumber]; !ok {
r.orderMap[r.SequenceNumber] = RTPNalu{
+38 -10
View File
@@ -60,6 +60,34 @@ func FindStream(streamPath string) *Stream {
return Streams.GetStream(streamPath)
}
func WaitStream(streamPath string, waitTimeout time.Duration) (s *Stream) {
s = nil
p := strings.Split(streamPath, "/")
if len(p) < 2 {
utils.Print(Red("Stream Path Format Error:"), streamPath)
return nil
}
TriggerHook(HOOK_ONDEMAND_PUBLISH, streamPath)
c2 := make(chan string, 1)
defer close(c2)
go func() {
for {
if s = FindStream(streamPath); s != nil {
c2 <- "done"
break
}
time.Sleep(10 * time.Millisecond)
}
}()
select {
case res := <-c2:
utils.Println(streamPath + " is published OnDemand '" + res + "'")
case <-time.After(waitTimeout):
//fmt.Println("timeout 2")
}
return s
}
// Publish 直接发布
func Publish(streamPath, t string) *Stream {
var stream = &Stream{
@@ -110,8 +138,8 @@ type Stream struct {
AudioTracks Tracks
DataTracks Tracks
//AutoCloseAfter 当无人订阅时延迟N秒后自动停止发布
AutoCloseAfter *int
//AutoCloseDelay 当无人订阅时延迟N秒后自动停止发布
AutoCloseDelay *int
//Transcoding 转码配置,key:目标编码,value:发布者提供的编码
Transcoding map[string]string
@@ -158,12 +186,12 @@ func (r *Stream) Publish() bool {
if _, ok := Streams.m[r.StreamPath]; ok {
return false
}
if r.AutoCloseAfter == nil {
r.AutoCloseAfter = &config.AutoCloseAfter
if r.AutoCloseDelay == nil {
r.AutoCloseDelay = &config.AutoCloseDelay
}
var closeChann <-chan time.Time
if *r.AutoCloseAfter > 0 {
r.closeDelay = time.NewTimer(time.Duration(*r.AutoCloseAfter) * time.Second)
if *r.AutoCloseDelay > 0 {
r.closeDelay = time.NewTimer(time.Duration(*r.AutoCloseDelay) * time.Second)
r.closeDelay.Stop()
closeChann = r.closeDelay.C
}
@@ -243,7 +271,7 @@ func (r *Stream) Subscribe(s *Subscriber) {
utils.Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath)))
s.Context, s.cancel = context.WithCancel(r)
r.subscribeMutex.Lock()
if *r.AutoCloseAfter > 0 {
if *r.AutoCloseDelay > 0 {
r.closeDelay.Stop()
}
r.Subscribers = append(r.Subscribers, s)
@@ -264,11 +292,11 @@ func (r *Stream) UnSubscribe(s *Subscriber) {
utils.Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers))))
l := len(r.Subscribers)
TriggerHook(HOOK_UNSUBSCRIBE, s, l)
if l == 0 && *r.AutoCloseAfter >= 0 {
if *r.AutoCloseAfter == 0 {
if l == 0 && *r.AutoCloseDelay >= 0 {
if *r.AutoCloseDelay == 0 {
r.Close()
} else {
r.closeDelay.Reset(time.Duration(*r.AutoCloseAfter) * time.Second)
r.closeDelay.Reset(time.Duration(*r.AutoCloseDelay) * time.Second)
}
}
}
+14 -3
View File
@@ -51,12 +51,23 @@ func (s *Subscriber) Subscribe(streamPath string) error {
} else {
streamPath = u.Path
}
if stream := FindStream(streamPath); stream == nil {
return errors.Errorf("subscribe %s faild :stream not found", streamPath)
} else {
var stream *Stream = nil
if stream = FindStream(streamPath); stream == nil {
if config.OnDemandPublishTimeout > 0 {
//TriggerHook(HOOK_ONDEMAND_PUBLISH, streamPath)
stream = WaitStream(streamPath, config.OnDemandPublishTimeout)
} else {
return errors.Errorf("subscribe %s faild :stream not found", streamPath)
}
}
if stream != nil {
if stream.Subscribe(s); s.Context == nil {
return errors.Errorf("subscribe %s faild :stream closed", streamPath)
}
} else {
return errors.Errorf("subscribe %s faild :stream not found", streamPath)
}
return nil
}