diff --git a/internal/servers/hls/muxer.go b/internal/servers/hls/muxer.go index 8d73994b..5c5b2e90 100644 --- a/internal/servers/hls/muxer.go +++ b/internal/servers/hls/muxer.go @@ -44,10 +44,6 @@ type muxerGetInstanceRes struct { cumulatedOutboundFramesDiscarded uint64 } -type muxerGetInstanceReq struct { - res chan muxerGetInstanceRes -} - type muxer struct { parentCtx context.Context remoteAddr string @@ -71,8 +67,9 @@ type muxer struct { lastRequestTime atomic.Int64 bytesSent atomic.Uint64 - // in - chGetInstance chan muxerGetInstanceReq + instanceMutex sync.RWMutex + instance *muxerInstance + cumulatedOutboundFramesDiscarded uint64 } func (m *muxer) initialize() { @@ -83,7 +80,6 @@ func (m *muxer) initialize() { m.created = time.Now() m.lastRequestTime.Store(time.Now().UnixNano()) m.bytesSent.Store(0) - m.chGetInstance = make(chan muxerGetInstanceReq) m.Log(logger.Info, "created %s", func() string { if m.remoteAddr == "" { @@ -92,6 +88,9 @@ func (m *muxer) initialize() { return "(requested by " + m.remoteAddr + ")" }()) + // block first request to getInstance() until the first instance is available + m.instanceMutex.Lock() + m.wg.Add(1) go m.run() } @@ -132,6 +131,7 @@ func (m *muxer) runInner() error { }, }) if err != nil { + m.instanceMutex.Unlock() return err } @@ -139,27 +139,30 @@ func (m *muxer) runInner() error { defer m.path.RemoveReader(defs.PathRemoveReaderReq{Author: m}) - mi, err := m.createInstance(res.Stream) + tmp, err := m.createInstance(res.Stream) if err != nil { if m.remoteAddr != "" || errors.Is(err, hls.ErrNoSupportedCodecs) { + m.instanceMutex.Unlock() return err } m.Log(logger.Error, err.Error()) - mi = nil } + m.instance = tmp + m.instanceMutex.Unlock() + defer func() { - if mi != nil { - mi.close() + if m.instance != nil { + m.closeInstance() } }() var instanceError chan error var recreateTimer *time.Timer - if mi != nil { - instanceError = mi.errorChan() + if m.instance != nil { + instanceError = m.instance.errorChan() recreateTimer = emptyTimer() } else { instanceError = make(chan error) @@ -173,36 +176,29 @@ func (m *muxer) runInner() error { activityCheckTimer = emptyTimer() } - cumulatedOutboundFramesDiscarded := uint64(0) - for { select { - case req := <-m.chGetInstance: - req.res <- muxerGetInstanceRes{ - instance: mi, - cumulatedOutboundFramesDiscarded: cumulatedOutboundFramesDiscarded, - } - case err = <-instanceError: if m.remoteAddr != "" { return err } m.Log(logger.Error, err.Error()) - mi.close() - cumulatedOutboundFramesDiscarded += mi.reader.OutboundFramesDiscarded() - mi = nil + m.closeInstance() instanceError = make(chan error) recreateTimer = time.NewTimer(recreatePause) case <-recreateTimer.C: - mi, err = m.createInstance(res.Stream) + tmp, err = m.createInstance(res.Stream) if err != nil { m.Log(logger.Error, err.Error()) - mi = nil recreateTimer = time.NewTimer(recreatePause) } else { - instanceError = mi.errorChan() + m.instanceMutex.Lock() + m.instance = tmp + m.instanceMutex.Unlock() + + instanceError = m.instance.errorChan() } case <-activityCheckTimer.C: @@ -218,6 +214,16 @@ func (m *muxer) runInner() error { } } +func (m *muxer) closeInstance() { + m.instanceMutex.Lock() + m.cumulatedOutboundFramesDiscarded += m.instance.reader.OutboundFramesDiscarded() + var tmp *muxerInstance + tmp, m.instance = m.instance, nil + m.instanceMutex.Unlock() + + tmp.close() +} + func (m *muxer) createInstance(strm *stream.Stream) (*muxerInstance, error) { mi := &muxerInstance{ variant: m.variant, @@ -235,14 +241,12 @@ func (m *muxer) createInstance(strm *stream.Stream) (*muxerInstance, error) { } func (m *muxer) getInstance() muxerGetInstanceRes { - req := muxerGetInstanceReq{res: make(chan muxerGetInstanceRes)} + m.instanceMutex.RLock() + defer m.instanceMutex.RUnlock() - select { - case m.chGetInstance <- req: - return <-req.res - - case <-m.ctx.Done(): - return muxerGetInstanceRes{} + return muxerGetInstanceRes{ + instance: m.instance, + cumulatedOutboundFramesDiscarded: m.cumulatedOutboundFramesDiscarded, } }