From 5e80554d20f492c2b14ad0276e3923c4e423abca Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sat, 28 Feb 2026 18:39:52 +0100 Subject: [PATCH] improve playback precision of alwaysAvailable offline segment (#5530) --- docs/4-other/05-always-available.md | 2 +- internal/stream/offline_sub_stream_track.go | 39 ++++++++++++--------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/docs/4-other/05-always-available.md b/docs/4-other/05-always-available.md index 8231ff9e..3bd4dfe3 100644 --- a/docs/4-other/05-always-available.md +++ b/docs/4-other/05-always-available.md @@ -1,6 +1,6 @@ # Always-available streams -When the publisher or source of a stream is offline, the server can be configured to fill gaps in the stream with an offline segment that is played on repeat until a publisher comes back online. This allows readers to stay connected regardless of the state of the stream. The offline segment and any future online stream are concatenated without decoding or re-encoding packets, using the original codec. +When the publisher or source of a stream is offline, the server can be configured to fill gaps in the stream with an offline segment that is played on repeat until a publisher comes back online. This allows readers to stay connected regardless of the state of the stream. The offline segment and online stream are concatenated without re-encoding any frame, using the original codec. This feature can be enabled by toggling the `alwaysAvailable` flag and filling `alwaysAvailableTracks`: diff --git a/internal/stream/offline_sub_stream_track.go b/internal/stream/offline_sub_stream_track.go index dbf6dfe7..b184f67f 100644 --- a/internal/stream/offline_sub_stream_track.go +++ b/internal/stream/offline_sub_stream_track.go @@ -50,9 +50,6 @@ func (t *offlineSubStreamTrack) initialize() { func (t *offlineSubStreamTrack) run() { defer t.wg.Done() - var pts int64 - systemTime := time.Now() - if t.file != "" { f, err := os.Open(t.file) if err != nil { @@ -60,7 +57,7 @@ func (t *offlineSubStreamTrack) run() { } defer f.Close() - err = t.runFile(pts, systemTime, f, t.pos) + err = t.runFile(f, t.pos) if err != nil { panic(err) } @@ -68,12 +65,13 @@ func (t *offlineSubStreamTrack) run() { } const audioWritesPerSecond = 10 + var pts int64 + startSystemTime := time.Now() switch forma := t.format.(type) { case *format.Opus: unitsPerWrite := (forma.ClockRate() / 960) / audioWritesPerSecond writeDuration := 960 * int64(unitsPerWrite) - writeDurationGo := multiplyAndDivide2(time.Duration(writeDuration), time.Second, 48000) for { payload := make(unit.PayloadOpus, unitsPerWrite) @@ -88,7 +86,9 @@ func (t *offlineSubStreamTrack) run() { }) pts += writeDuration - systemTime = systemTime.Add(writeDurationGo) + + ptsGo := multiplyAndDivide2(time.Duration(pts), time.Second, 48000) + systemTime := startSystemTime.Add(ptsGo) if !t.sleep(systemTime) { return @@ -98,7 +98,6 @@ func (t *offlineSubStreamTrack) run() { case *format.MPEG4Audio: unitsPerWrite := (forma.ClockRate() / mpeg4audio.SamplesPerAccessUnit) / audioWritesPerSecond writeDuration := mpeg4audio.SamplesPerAccessUnit * int64(unitsPerWrite) - writeDurationGo := multiplyAndDivide2(time.Duration(writeDuration), time.Second, time.Duration(forma.ClockRate())) for { var frame []byte @@ -122,7 +121,9 @@ func (t *offlineSubStreamTrack) run() { }) pts += writeDuration - systemTime = systemTime.Add(writeDurationGo) + + ptsGo := multiplyAndDivide2(time.Duration(pts), time.Second, time.Duration(forma.ClockRate())) + systemTime := startSystemTime.Add(ptsGo) if !t.sleep(systemTime) { return @@ -132,7 +133,6 @@ func (t *offlineSubStreamTrack) run() { case *format.G711: samplesPerWrite := forma.ClockRate() / audioWritesPerSecond writeDuration := samplesPerWrite - writeDurationGo := multiplyAndDivide2(time.Duration(writeDuration), time.Second, time.Duration(forma.ClockRate())) for { var sample byte @@ -154,7 +154,9 @@ func (t *offlineSubStreamTrack) run() { }) pts += int64(writeDuration) - systemTime = systemTime.Add(writeDurationGo) + + ptsGo := multiplyAndDivide2(time.Duration(pts), time.Second, time.Duration(forma.ClockRate())) + systemTime := startSystemTime.Add(ptsGo) if !t.sleep(systemTime) { return @@ -164,7 +166,6 @@ func (t *offlineSubStreamTrack) run() { case *format.LPCM: samplesPerWrite := forma.ClockRate() / audioWritesPerSecond writeDuration := samplesPerWrite - writeDurationGo := multiplyAndDivide2(time.Duration(writeDuration), time.Second, time.Duration(forma.ClockRate())) for { payload := make(unit.PayloadLPCM, samplesPerWrite*forma.ChannelCount*(forma.BitDepth/8)) @@ -176,7 +177,9 @@ func (t *offlineSubStreamTrack) run() { }) pts += int64(writeDuration) - systemTime = systemTime.Add(writeDurationGo) + + ptsGo := multiplyAndDivide2(time.Duration(pts), time.Second, time.Duration(forma.ClockRate())) + systemTime := startSystemTime.Add(ptsGo) if !t.sleep(systemTime) { return @@ -205,14 +208,14 @@ func (t *offlineSubStreamTrack) run() { r := bytes.NewReader(buf) - err := t.runFile(pts, systemTime, r, 0) + err := t.runFile(r, 0) if err != nil { panic(err) } } } -func (t *offlineSubStreamTrack) runFile(pts int64, systemTime time.Time, r io.ReadSeeker, pos int) error { +func (t *offlineSubStreamTrack) runFile(r io.ReadSeeker, pos int) error { var presentation pmp4.Presentation err := presentation.Unmarshal(r) if err != nil { @@ -220,6 +223,8 @@ func (t *offlineSubStreamTrack) runFile(pts int64, systemTime time.Time, r io.Re } track := presentation.Tracks[pos] + var pts int64 + startSystemTime := time.Now() for { // in case of the embedded video, codec parameters are not in the description @@ -303,9 +308,9 @@ func (t *offlineSubStreamTrack) runFile(pts int64, systemTime time.Time, r io.Re pts += multiplyAndDivide(int64(sample.Duration)+int64(sample.PTSOffset), int64(t.format.ClockRate()), int64(track.TimeScale)) - durationGo := multiplyAndDivide2(time.Duration(int64(sample.Duration)+int64(sample.PTSOffset)), - time.Second, time.Duration(track.TimeScale)) - systemTime = systemTime.Add(durationGo) + + ptsGo := multiplyAndDivide2(time.Duration(pts), time.Second, time.Duration(t.format.ClockRate())) + systemTime := startSystemTime.Add(ptsGo) if !t.sleep(systemTime) { return nil