diff --git a/api.go b/api.go index 46de85b..153b9f8 100644 --- a/api.go +++ b/api.go @@ -132,7 +132,7 @@ func startApiServer(addr string) { if stream.AppConfig.GB28181.Enable { apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接 - apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接 + apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnLiveGBSTalk) // livegbs一对一对讲 apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{})) // 创建国标源 apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // 设置应答sdp, 如果是active模式拉流, 设置对方的地址. 下载文件设置文件大小 apiServer.router.HandleFunc("/api/v1/gb28181/speed/set", withJsonParams(apiServer.OnGBSpeedSet, &SourceSDP{})) diff --git a/api_gb.go b/api_gb.go index cab197f..28285bf 100644 --- a/api_gb.go +++ b/api_gb.go @@ -310,9 +310,7 @@ func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) { return } - // 获取id id := device + "/" + channel + ".broadcast" - talkSource := gb28181.NewTalkSource(id, conn) talkSource.Init() talkSource.SetUrlValues(r.Form) @@ -349,9 +347,9 @@ func (api *ApiServer) OnLiveGBSTalk(w http.ResponseWriter, r *http.Request) { // base64解密 var pcmN int - pcmN, err = base64.StdEncoding.Decode(bytes, pcm) - if err == nil { - log.Sugar.Errorf(err.Error()) + pcmN, err = base64.StdEncoding.Decode(pcm, bytes) + if err != nil { + log.Sugar.Errorf("base64解密失败, source: %s err: %s", id, err.Error()) continue } diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index 48b921d..e7bc1f5 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -85,6 +85,7 @@ type transStreamPublisher struct { source string streamEvents *NonBlockingChannel[*StreamEvent] mainContextEvents chan func() + earlyEvents collections.LinkedList[func()] // 早于启动前的事件, 等待启动后执行 sinkCount int // 拉流计数 gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop @@ -108,6 +109,8 @@ type transStreamPublisher struct { streamEndInfo *StreamEndInfo // 上次结束推流的信息 lastStreamEndTime time.Time // 最近结束拉流的时间 bitstreamFilterBuffer *collections.RBBlockBuffer // annexb和avcc转换的缓冲区 + mute sync.Mutex + started bool } func (t *transStreamPublisher) Post(event *StreamEvent) { @@ -157,6 +160,9 @@ func (t *transStreamPublisher) run() { } func (t *transStreamPublisher) start() { + t.mute.Lock() + defer t.mute.Unlock() + t.streamEvents = NewNonBlockingChannel[*StreamEvent](256) t.mainContextEvents = make(chan func(), 256) @@ -166,10 +172,26 @@ func (t *transStreamPublisher) start() { t.transcodeTracks = make(map[utils.AVCodecID]*TranscodeTrack, 4) go t.run() + t.started = true + + // 放置先于启动的事件到主管道 + for t.earlyEvents.Size() > 0 { + t.mainContextEvents <- t.earlyEvents.Remove(0) + } } func (t *transStreamPublisher) PostEvent(cb func()) { - t.mainContextEvents <- cb + if t.started { + t.mainContextEvents <- cb + return + } + + // 早于启动前的事件, 添加到等待队列 + t.mute.Lock() + defer t.mute.Unlock() + if !t.started { + t.earlyEvents.Add(cb) + } } func (t *transStreamPublisher) ExecuteSyncEvent(cb func()) {