webrtc: allow tuning WHEP timeouts (#5027) (#5479)

add whepHandshakeTimeout, whepTrackGatherTimeout, whepSTUNGatherTimeout
This commit is contained in:
Alessandro Ros
2026-02-18 16:29:25 +01:00
committed by GitHub
parent ffec1e48d4
commit f991821a5f
20 changed files with 201 additions and 206 deletions
+10 -2
View File
@@ -339,12 +339,12 @@ components:
type: array
items:
$ref: '#/components/schemas/WebRTCICEServer'
webrtcSTUNGatherTimeout:
type: string
webrtcHandshakeTimeout:
type: string
webrtcTrackGatherTimeout:
type: string
webrtcSTUNGatherTimeout:
type: string
# SRT server
srt:
@@ -430,6 +430,14 @@ components:
rtpSDP:
type: string
# WHEP source
whepSTUNGatherTimeout:
type: string
whepHandshakeTimeout:
type: string
whepTrackGatherTimeout:
type: string
# Redirect source
sourceRedirect:
type: string
+2 -2
View File
@@ -380,9 +380,9 @@ type Conf struct {
WebRTCIPsFromInterfacesList []string `json:"webrtcIPsFromInterfacesList"`
WebRTCAdditionalHosts []string `json:"webrtcAdditionalHosts"`
WebRTCICEServers2 []WebRTCICEServer `json:"webrtcICEServers2"`
WebRTCSTUNGatherTimeout Duration `json:"webrtcSTUNGatherTimeout"`
WebRTCHandshakeTimeout Duration `json:"webrtcHandshakeTimeout"`
WebRTCTrackGatherTimeout Duration `json:"webrtcTrackGatherTimeout"`
WebRTCSTUNGatherTimeout Duration `json:"webrtcSTUNGatherTimeout"`
WebRTCICEUDPMuxAddress *string `json:"webrtcICEUDPMuxAddress,omitempty"` // deprecated
WebRTCICETCPMuxAddress *string `json:"webrtcICETCPMuxAddress,omitempty"` // deprecated
WebRTCICEHostNAT1To1IPs *[]string `json:"webrtcICEHostNAT1To1IPs,omitempty"` // deprecated
@@ -513,9 +513,9 @@ func (conf *Conf) setDefaults() {
conf.WebRTCAllowOrigins = []string{"*"}
conf.WebRTCLocalUDPAddress = ":8189"
conf.WebRTCIPsFromInterfaces = true
conf.WebRTCSTUNGatherTimeout = 5 * Duration(time.Second)
conf.WebRTCHandshakeTimeout = 10 * Duration(time.Second)
conf.WebRTCTrackGatherTimeout = 2 * Duration(time.Second)
conf.WebRTCSTUNGatherTimeout = 5 * Duration(time.Second)
// SRT server
conf.SRT = true
+3
View File
@@ -63,6 +63,9 @@ func TestConfFromFile(t *testing.T) {
RecordSegmentDuration: 3600000000000,
RecordDeleteAfter: 86400000000000,
RTSPUDPSourcePortRange: []uint{10000, 65535},
WHEPSTUNGatherTimeout: 5 * Duration(time.Second),
WHEPHandshakeTimeout: 10 * Duration(time.Second),
WHEPTrackGatherTimeout: 2 * Duration(time.Second),
RPICameraWidth: 1920,
RPICameraHeight: 1080,
RPICameraContrast: 1,
+10
View File
@@ -196,6 +196,11 @@ type Path struct {
RTPSDP string `json:"rtpSDP"`
RTPUDPReadBufferSize *uint `json:"rtpUDPReadBufferSize,omitempty"` // deprecated
// WHEP source
WHEPSTUNGatherTimeout Duration `json:"whepSTUNGatherTimeout"`
WHEPHandshakeTimeout Duration `json:"whepHandshakeTimeout"`
WHEPTrackGatherTimeout Duration `json:"whepTrackGatherTimeout"`
// Redirect source
SourceRedirect string `json:"sourceRedirect"`
@@ -291,6 +296,11 @@ func (pconf *Path) setDefaults() {
// RTSP source
pconf.RTSPUDPSourcePortRange = []uint{10000, 65535}
// WHEP source
pconf.WHEPSTUNGatherTimeout = Duration(5 * time.Second)
pconf.WHEPHandshakeTimeout = Duration(10 * time.Second)
pconf.WHEPTrackGatherTimeout = Duration(2 * time.Second)
// Raspberry Pi Camera source
pconf.RPICameraWidth = 1920
pconf.RPICameraHeight = 1080
+18 -18
View File
@@ -364,24 +364,24 @@ func TestAPIProtocolListGet(t *testing.T) {
"srt",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
cnf := "api: yes\n"
switch ca {
case "rtsps conns", "rtsps sessions":
conf += "rtspEncryption: strict\n" +
cnf += "rtspEncryption: strict\n" +
"rtspServerCert: " + serverCertFpath + "\n" +
"rtspServerKey: " + serverKeyFpath + "\n"
case "rtmps":
conf += "rtmpEncryption: strict\n" +
cnf += "rtmpEncryption: strict\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
cnf += "paths:\n" +
" all_others:\n"
p, ok := newInstance(conf)
p, ok := newInstance(cnf)
require.Equal(t, true, ok)
defer p.Close()
@@ -893,25 +893,25 @@ func TestAPIProtocolGetNotFound(t *testing.T) {
"srt",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
cnf := "api: yes\n"
switch ca {
case "rtsps conns", "rtsps sessions":
conf += "rtspTransports: [tcp]\n" +
cnf += "rtspTransports: [tcp]\n" +
"rtspEncryption: strict\n" +
"rtspServerCert: " + serverCertFpath + "\n" +
"rtspServerKey: " + serverKeyFpath + "\n"
case "rtmps":
conf += "rtmpEncryption: strict\n" +
cnf += "rtmpEncryption: strict\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
cnf += "paths:\n" +
" all_others:\n"
p, ok := newInstance(conf)
p, ok := newInstance(cnf)
require.Equal(t, true, ok)
defer p.Close()
@@ -993,19 +993,19 @@ func TestAPIProtocolKick(t *testing.T) {
"srt",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
cnf := "api: yes\n"
if ca == "rtsps" {
conf += "rtspTransports: [tcp]\n" +
cnf += "rtspTransports: [tcp]\n" +
"rtspEncryption: strict\n" +
"rtspServerCert: " + serverCertFpath + "\n" +
"rtspServerKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
cnf += "paths:\n" +
" all_others:\n"
p, ok := newInstance(conf)
p, ok := newInstance(cnf)
require.Equal(t, true, ok)
defer p.Close()
@@ -1171,19 +1171,19 @@ func TestAPIProtocolKickNotFound(t *testing.T) {
"srt",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
cnf := "api: yes\n"
if ca == "rtsps" {
conf += "rtspTransports: [tcp]\n" +
cnf += "rtspTransports: [tcp]\n" +
"rtspEncryption: strict\n" +
"rtspServerCert: " + serverCertFpath + "\n" +
"rtspServerKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
cnf += "paths:\n" +
" all_others:\n"
p, ok := newInstance(conf)
p, ok := newInstance(cnf)
require.Equal(t, true, ok)
defer p.Close()
+2 -2
View File
@@ -627,8 +627,8 @@ func (p *Core) createResources(initial bool) error {
IPsFromInterfacesList: p.conf.WebRTCIPsFromInterfacesList,
AdditionalHosts: p.conf.WebRTCAdditionalHosts,
ICEServers: p.conf.WebRTCICEServers2,
HandshakeTimeout: p.conf.WebRTCHandshakeTimeout,
STUNGatherTimeout: p.conf.WebRTCSTUNGatherTimeout,
HandshakeTimeout: p.conf.WebRTCHandshakeTimeout,
TrackGatherTimeout: p.conf.WebRTCTrackGatherTimeout,
ExternalCmdPool: p.externalCmdPool,
Metrics: p.metrics,
@@ -911,8 +911,8 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
!reflect.DeepEqual(newConf.WebRTCIPsFromInterfacesList, p.conf.WebRTCIPsFromInterfacesList) ||
!reflect.DeepEqual(newConf.WebRTCAdditionalHosts, p.conf.WebRTCAdditionalHosts) ||
!reflect.DeepEqual(newConf.WebRTCICEServers2, p.conf.WebRTCICEServers2) ||
newConf.WebRTCHandshakeTimeout != p.conf.WebRTCHandshakeTimeout ||
newConf.WebRTCSTUNGatherTimeout != p.conf.WebRTCSTUNGatherTimeout ||
newConf.WebRTCHandshakeTimeout != p.conf.WebRTCHandshakeTimeout ||
newConf.WebRTCTrackGatherTimeout != p.conf.WebRTCTrackGatherTimeout ||
closeMetrics ||
closePathManager ||
+8 -8
View File
@@ -789,30 +789,30 @@ func TestPathFallback(t *testing.T) {
"source",
} {
t.Run(ca, func(t *testing.T) {
var conf string
var cnf string
switch ca {
case "absolute":
conf = "paths:\n" +
cnf = "paths:\n" +
" path1:\n" +
" fallback: rtsp://localhost:8554/path2\n" +
" path2:\n"
case "relative":
conf = "paths:\n" +
cnf = "paths:\n" +
" path1:\n" +
" fallback: /path2\n" +
" path2:\n"
case "source":
conf = "paths:\n" +
cnf = "paths:\n" +
" path1:\n" +
" fallback: /path2\n" +
" source: rtsp://localhost:3333/nonexistent\n" +
" path2:\n"
}
p1, ok := newInstance(conf)
p1, ok := newInstance(cnf)
require.Equal(t, true, ok)
defer p1.Close()
@@ -911,15 +911,15 @@ func TestPathOverridePublisher(t *testing.T) {
"disabled",
} {
t.Run(ca, func(t *testing.T) {
conf := "rtmp: no\n" +
cnf := "rtmp: no\n" +
"paths:\n" +
" all_others:\n"
if ca == "disabled" {
conf += " overridePublisher: no\n"
cnf += " overridePublisher: no\n"
}
p, ok := newInstance(conf)
p, ok := newInstance(cnf)
require.Equal(t, true, ok)
defer p.Close()
+11 -16
View File
@@ -7,7 +7,6 @@ import (
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/gortsplib/v5/pkg/format"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/test"
@@ -112,24 +111,20 @@ func TestFromStreamResampleOpus(t *testing.T) {
require.NoError(t, err)
pc1 := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: false,
Log: test.NilLogger,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: false,
Log: test.NilLogger,
}
err = pc1.Start()
require.NoError(t, err)
defer pc1.Close()
pc2 := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: true,
Log: test.NilLogger,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
Log: test.NilLogger,
}
r := &stream.Reader{Parent: nil}
@@ -150,10 +145,10 @@ func TestFromStreamResampleOpus(t *testing.T) {
err = pc1.SetAnswer(answer)
require.NoError(t, err)
err = pc1.WaitUntilConnected()
err = pc1.WaitUntilConnected(10 * time.Second)
require.NoError(t, err)
err = pc2.WaitUntilConnected()
err = pc2.WaitUntilConnected(10 * time.Second)
require.NoError(t, err)
strm.AddReader(r)
@@ -191,7 +186,7 @@ func TestFromStreamResampleOpus(t *testing.T) {
}},
})
err = pc1.GatherIncomingTracks()
err = pc1.GatherIncomingTracks(2 * time.Second)
require.NoError(t, err)
tracks := pc1.IncomingTracks()
+10 -9
View File
@@ -18,7 +18,6 @@ import (
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v4"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
)
@@ -140,9 +139,7 @@ type PeerConnection struct {
IPsFromInterfaces bool
IPsFromInterfacesList []string
AdditionalHosts []string
HandshakeTimeout conf.Duration
TrackGatherTimeout conf.Duration
STUNGatherTimeout conf.Duration
STUNGatherTimeout time.Duration
Publish bool
OutgoingTracks []*OutgoingTrack
OutgoingDataChannels []*OutgoingDataChannel
@@ -167,6 +164,10 @@ type PeerConnection struct {
// Start starts the peer connection.
func (co *PeerConnection) Start() error {
if co.STUNGatherTimeout == 0 {
co.STUNGatherTimeout = 5 * time.Second
}
settingsEngine := webrtc.SettingEngine{}
settingsEngine.SetIncludeLoopbackCandidate(true)
@@ -191,7 +192,7 @@ func (co *PeerConnection) Start() error {
settingsEngine.SetICETCPMux(co.ICETCPMux.Mux)
}
settingsEngine.SetSTUNGatherTimeout(time.Duration(co.STUNGatherTimeout))
settingsEngine.SetSTUNGatherTimeout(co.STUNGatherTimeout)
webrtcNet := &webrtcNet{
udpReadBufferSize: int(co.UDPReadBufferSize),
@@ -669,8 +670,8 @@ func (co *PeerConnection) waitGatheringDone() error {
}
// WaitUntilConnected waits until connection is established.
func (co *PeerConnection) WaitUntilConnected() error {
t := time.NewTimer(time.Duration(co.HandshakeTimeout))
func (co *PeerConnection) WaitUntilConnected(timeout time.Duration) error {
t := time.NewTimer(timeout)
defer t.Stop()
outer:
@@ -691,13 +692,13 @@ outer:
}
// GatherIncomingTracks gathers incoming tracks.
func (co *PeerConnection) GatherIncomingTracks() error {
func (co *PeerConnection) GatherIncomingTracks(timeout time.Duration) error {
var sdp sdp.SessionDescription
sdp.Unmarshal([]byte(co.wr.RemoteDescription().SDP)) //nolint:errcheck
maxTrackCount := len(sdp.MediaDescriptions)
t := time.NewTimer(time.Duration(co.TrackGatherTimeout))
t := time.NewTimer(timeout)
defer t.Stop()
for {
@@ -8,7 +8,6 @@ import (
"testing"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/pion/ice/v4"
"github.com/pion/logging"
@@ -37,13 +36,10 @@ func gatherCodecs(tracks []*IncomingTrack) []webrtc.RTPCodecParameters {
func TestPeerConnectionCloseImmediately(t *testing.T) {
pc := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
Publish: false,
Log: test.NilLogger,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: false,
Log: test.NilLogger,
}
err := pc.Start()
require.NoError(t, err)
@@ -115,8 +111,6 @@ func TestPeerConnectionCandidates(t *testing.T) {
ICETCPMux: tcpMux,
IPsFromInterfaces: true,
IPsFromInterfacesList: []string{"lo"},
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Log: test.NilLogger,
}
@@ -188,8 +182,6 @@ func TestPeerConnectionConnectivity(t *testing.T) {
IPsFromInterfaces: true,
IPsFromInterfacesList: []string{"lo"},
ICEServers: iceServers,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Log: test.NilLogger,
}
err := clientPC.Start()
@@ -219,13 +211,11 @@ func TestPeerConnectionConnectivity(t *testing.T) {
}
serverPC := &PeerConnection{
LocalRandomUDP: (mode == "active udp"),
ICEUDPMux: udpMux,
ICETCPMux: tcpMux,
ICEServers: iceServers,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: true,
LocalRandomUDP: (mode == "active udp"),
ICEUDPMux: udpMux,
ICETCPMux: tcpMux,
ICEServers: iceServers,
Publish: true,
OutgoingTracks: []*OutgoingTrack{{
Caps: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeAV1,
@@ -270,7 +260,7 @@ func TestPeerConnectionConnectivity(t *testing.T) {
}
}()
err = serverPC.WaitUntilConnected()
err = serverPC.WaitUntilConnected(10 * time.Second)
require.NoError(t, err)
})
}
@@ -307,12 +297,10 @@ func TestPeerConnectionRead(t *testing.T) {
require.NoError(t, err)
reader := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: false,
Log: test.NilLogger,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: false,
Log: test.NilLogger,
}
err = reader.Start()
require.NoError(t, err)
@@ -330,7 +318,7 @@ func TestPeerConnectionRead(t *testing.T) {
err = pub.SetRemoteDescription(*answer)
require.NoError(t, err)
err = reader.WaitUntilConnected()
err = reader.WaitUntilConnected(10 * time.Second)
require.NoError(t, err)
go func() {
@@ -363,7 +351,7 @@ func TestPeerConnectionRead(t *testing.T) {
require.NoError(t, err2)
}()
err = reader.GatherIncomingTracks()
err = reader.GatherIncomingTracks(2 * time.Second)
require.NoError(t, err)
codecs := gatherCodecs(reader.IncomingTracks())
@@ -428,23 +416,19 @@ func TestPeerConnectionRead(t *testing.T) {
func TestPeerConnectionPublishRead(t *testing.T) {
pc1 := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: false,
Log: test.NilLogger,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: false,
Log: test.NilLogger,
}
err := pc1.Start()
require.NoError(t, err)
defer pc1.Close()
pc2 := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: true,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
OutgoingTracks: []*OutgoingTrack{
{
Caps: webrtc.RTPCodecCapability{
@@ -475,10 +459,10 @@ func TestPeerConnectionPublishRead(t *testing.T) {
err = pc1.SetAnswer(answer)
require.NoError(t, err)
err = pc1.WaitUntilConnected()
err = pc1.WaitUntilConnected(10 * time.Second)
require.NoError(t, err)
err = pc2.WaitUntilConnected()
err = pc2.WaitUntilConnected(10 * time.Second)
require.NoError(t, err)
for _, track := range pc2.OutgoingTracks {
@@ -496,7 +480,7 @@ func TestPeerConnectionPublishRead(t *testing.T) {
require.NoError(t, err)
}
err = pc1.GatherIncomingTracks()
err = pc1.GatherIncomingTracks(2 * time.Second)
require.NoError(t, err)
codecs := gatherCodecs(pc1.IncomingTracks())
@@ -531,23 +515,19 @@ func TestPeerConnectionPublishRead(t *testing.T) {
// test that an audio codec is present regardless of the fact that an audio track is.
func TestPeerConnectionFallbackCodecs(t *testing.T) {
pc1 := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: false,
Log: test.NilLogger,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: false,
Log: test.NilLogger,
}
err := pc1.Start()
require.NoError(t, err)
defer pc1.Close()
pc2 := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: true,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
OutgoingTracks: []*OutgoingTrack{{
Caps: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeAV1,
@@ -621,12 +601,9 @@ func TestPeerConnectionPublishDataChannel(t *testing.T) {
require.NoError(t, err)
pc2 := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
Publish: true,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
OutgoingDataChannels: []*OutgoingDataChannel{
{
Label: "test-channel",
@@ -644,7 +621,7 @@ func TestPeerConnectionPublishDataChannel(t *testing.T) {
err = pc1.SetRemoteDescription(*answer)
require.NoError(t, err)
err = pc2.WaitUntilConnected()
err = pc2.WaitUntilConnected(10 * time.Second)
require.NoError(t, err)
<-dataChanCreated
+10 -14
View File
@@ -335,11 +335,9 @@ func TestToStream(t *testing.T) {
for _, ca := range toFromStreamCases {
t.Run(ca.name, func(t *testing.T) {
pc1 := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: true,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
OutgoingTracks: []*OutgoingTrack{{
Caps: ca.webrtcCaps,
}},
@@ -350,12 +348,10 @@ func TestToStream(t *testing.T) {
defer pc1.Close()
pc2 := &PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: false,
Log: test.NilLogger,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: false,
Log: test.NilLogger,
}
err = pc2.Start()
require.NoError(t, err)
@@ -383,10 +379,10 @@ func TestToStream(t *testing.T) {
}
}()
err = pc1.WaitUntilConnected()
err = pc1.WaitUntilConnected(10 * time.Second)
require.NoError(t, err)
err = pc2.WaitUntilConnected()
err = pc2.WaitUntilConnected(10 * time.Second)
require.NoError(t, err)
err = pc1.OutgoingTracks[0].WriteRTP(&rtp.Packet{
@@ -402,7 +398,7 @@ func TestToStream(t *testing.T) {
})
require.NoError(t, err)
err = pc2.GatherIncomingTracks()
err = pc2.GatherIncomingTracks(2 * time.Second)
require.NoError(t, err)
var subStream *stream.SubStream
+29 -23
View File
@@ -13,25 +13,22 @@ import (
"github.com/pion/sdp/v3"
pwebrtc "github.com/pion/webrtc/v4"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/httpp"
"github.com/bluenviron/mediamtx/internal/protocols/webrtc"
)
const (
handshakeTimeout = 10 * time.Second
trackGatherTimeout = 2 * time.Second
)
// Client is a WHIP client.
type Client struct {
URL *url.URL
Publish bool
OutgoingTracks []*webrtc.OutgoingTrack
HTTPClient *http.Client
UDPReadBufferSize uint
Log logger.Writer
URL *url.URL
Publish bool
OutgoingTracks []*webrtc.OutgoingTrack
HTTPClient *http.Client
UDPReadBufferSize uint
STUNGatherTimeout time.Duration
HandshakeTimeout time.Duration
TrackGatherTimeout time.Duration
Log logger.Writer
pc *webrtc.PeerConnection
patchIsSupported bool
@@ -39,21 +36,30 @@ type Client struct {
// Initialize initializes the Client.
func (c *Client) Initialize(ctx context.Context) error {
if c.STUNGatherTimeout == 0 {
c.STUNGatherTimeout = 5 * time.Second
}
if c.HandshakeTimeout == 0 {
c.HandshakeTimeout = 10 * time.Second
}
if c.TrackGatherTimeout == 0 {
c.TrackGatherTimeout = 2 * time.Second
}
iceServers, err := c.optionsICEServers(ctx)
if err != nil {
return err
}
c.pc = &webrtc.PeerConnection{
UDPReadBufferSize: c.UDPReadBufferSize,
LocalRandomUDP: true,
ICEServers: iceServers,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
Publish: c.Publish,
OutgoingTracks: c.OutgoingTracks,
Log: c.Log,
UDPReadBufferSize: c.UDPReadBufferSize,
LocalRandomUDP: true,
ICEServers: iceServers,
IPsFromInterfaces: true,
Publish: c.Publish,
STUNGatherTimeout: c.STUNGatherTimeout,
OutgoingTracks: c.OutgoingTracks,
Log: c.Log,
}
err = c.pc.Start()
if err != nil {
@@ -120,7 +126,7 @@ func (c *Client) initializeInner(ctx context.Context) error {
return err
}
t := time.NewTimer(handshakeTimeout)
t := time.NewTimer(c.HandshakeTimeout)
defer t.Stop()
outer:
@@ -145,7 +151,7 @@ outer:
}
if !c.Publish {
err = c.pc.GatherIncomingTracks()
err = c.pc.GatherIncomingTracks(c.TrackGatherTimeout)
if err != nil {
c.deleteSession(context.Background()) //nolint:errcheck
return err
+11 -18
View File
@@ -10,7 +10,6 @@ import (
"testing"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/protocols/webrtc"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/pion/rtp"
@@ -70,14 +69,11 @@ func TestClientRead(t *testing.T) {
}
pc := &webrtc.PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
OutgoingTracks: outgoingTracks,
Log: test.NilLogger,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
OutgoingTracks: outgoingTracks,
Log: test.NilLogger,
}
err := pc.Start()
require.NoError(t, err)
@@ -116,7 +112,7 @@ func TestClientRead(t *testing.T) {
w.Write([]byte(answer.SDP))
go func() {
err3 := pc.WaitUntilConnected()
err3 := pc.WaitUntilConnected(10 * time.Second)
require.NoError(t, err3)
for _, track := range outgoingTracks {
@@ -243,12 +239,9 @@ func TestClientPublish(t *testing.T) {
for _, ca := range []string{"audio", "video+audio"} {
t.Run(ca, func(t *testing.T) {
pc := &webrtc.PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
Log: test.NilLogger,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Log: test.NilLogger,
}
err := pc.Start()
require.NoError(t, err)
@@ -288,10 +281,10 @@ func TestClientPublish(t *testing.T) {
w.Write([]byte(answer.SDP))
go func() {
err3 := pc.WaitUntilConnected()
err3 := pc.WaitUntilConnected(10 * time.Second)
require.NoError(t, err3)
err3 = pc.GatherIncomingTracks()
err3 = pc.GatherIncomingTracks(2 * time.Second)
require.NoError(t, err3)
codecs := gatherCodecs(pc.IncomingTracks())
+2 -2
View File
@@ -201,9 +201,9 @@ type Server struct {
IPsFromInterfacesList []string
AdditionalHosts []string
ICEServers []conf.WebRTCICEServer
STUNGatherTimeout conf.Duration
HandshakeTimeout conf.Duration
TrackGatherTimeout conf.Duration
STUNGatherTimeout conf.Duration
ExternalCmdPool *externalcmd.Pool
Metrics serverMetrics
PathManager serverPathManager
@@ -357,9 +357,9 @@ outer:
additionalHosts: s.AdditionalHosts,
iceUDPMux: s.iceUDPMux,
iceTCPMux: s.iceTCPMux,
stunGatherTimeout: s.STUNGatherTimeout,
handshakeTimeout: s.HandshakeTimeout,
trackGatherTimeout: s.TrackGatherTimeout,
stunGatherTimeout: s.STUNGatherTimeout,
req: req,
wg: &wg,
externalCmdPool: s.ExternalCmdPool,
+4 -4
View File
@@ -161,9 +161,9 @@ func TestServerOptionsICEServer(t *testing.T) {
Username: "myuser",
Password: "mypass",
}},
STUNGatherTimeout: conf.Duration(5 * time.Second),
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
PathManager: pathManager,
Parent: test.NilLogger,
}
@@ -267,9 +267,9 @@ func TestServerPublish(t *testing.T) {
IPsFromInterfacesList: []string{},
AdditionalHosts: []string{},
ICEServers: []conf.WebRTCICEServer{},
STUNGatherTimeout: conf.Duration(5 * time.Second),
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
PathManager: pathManager,
Parent: test.NilLogger,
}
@@ -514,9 +514,9 @@ func TestServerRead(t *testing.T) {
IPsFromInterfacesList: []string{},
AdditionalHosts: []string{},
ICEServers: []conf.WebRTCICEServer{},
STUNGatherTimeout: conf.Duration(5 * time.Second),
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
PathManager: pathManager,
Parent: test.NilLogger,
}
@@ -603,9 +603,9 @@ func TestServerReadNotFound(t *testing.T) {
IPsFromInterfacesList: []string{},
AdditionalHosts: []string{},
ICEServers: []conf.WebRTCICEServer{},
STUNGatherTimeout: conf.Duration(5 * time.Second),
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
PathManager: pm,
Parent: test.NilLogger,
}
+6 -10
View File
@@ -48,9 +48,9 @@ type session struct {
additionalHosts []string
iceUDPMux ice.UDPMux
iceTCPMux *webrtc.TCPMuxWrapper
stunGatherTimeout conf.Duration
handshakeTimeout conf.Duration
trackGatherTimeout conf.Duration
stunGatherTimeout conf.Duration
req webRTCNewSessionReq
wg *sync.WaitGroup
externalCmdPool *externalcmd.Pool
@@ -166,9 +166,7 @@ func (s *session) runPublish() (int, error) {
IPsFromInterfaces: s.ipsFromInterfaces,
IPsFromInterfacesList: s.ipsFromInterfacesList,
AdditionalHosts: s.additionalHosts,
HandshakeTimeout: s.handshakeTimeout,
TrackGatherTimeout: s.trackGatherTimeout,
STUNGatherTimeout: s.stunGatherTimeout,
STUNGatherTimeout: time.Duration(s.stunGatherTimeout),
Publish: false,
Log: s,
}
@@ -219,7 +217,7 @@ func (s *session) runPublish() (int, error) {
go s.readRemoteCandidates(pc)
err = pc.WaitUntilConnected()
err = pc.WaitUntilConnected(time.Duration(s.handshakeTimeout))
if err != nil {
return 0, err
}
@@ -228,7 +226,7 @@ func (s *session) runPublish() (int, error) {
s.pc = pc
s.mutex.Unlock()
err = pc.GatherIncomingTracks()
err = pc.GatherIncomingTracks(time.Duration(s.trackGatherTimeout))
if err != nil {
return 0, err
}
@@ -311,9 +309,7 @@ func (s *session) runRead() (int, error) {
IPsFromInterfaces: s.ipsFromInterfaces,
IPsFromInterfacesList: s.ipsFromInterfacesList,
AdditionalHosts: s.additionalHosts,
HandshakeTimeout: s.handshakeTimeout,
TrackGatherTimeout: s.trackGatherTimeout,
STUNGatherTimeout: s.stunGatherTimeout,
STUNGatherTimeout: time.Duration(s.stunGatherTimeout),
Publish: true,
Log: s,
}
@@ -356,7 +352,7 @@ func (s *session) runRead() (int, error) {
go s.readRemoteCandidates(pc)
err = pc.WaitUntilConnected()
err = pc.WaitUntilConnected(time.Duration(s.handshakeTimeout))
if err != nil {
return 0, err
}
+5 -2
View File
@@ -59,8 +59,11 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
Timeout: time.Duration(s.ReadTimeout),
Transport: tr,
},
UDPReadBufferSize: s.UDPReadBufferSize,
Log: s,
UDPReadBufferSize: s.UDPReadBufferSize,
STUNGatherTimeout: time.Duration(params.Conf.WHEPSTUNGatherTimeout),
HandshakeTimeout: time.Duration(params.Conf.WHEPHandshakeTimeout),
TrackGatherTimeout: time.Duration(params.Conf.WHEPTrackGatherTimeout),
Log: s,
}
err = client.Initialize(params.Context)
if err != nil {
+6 -9
View File
@@ -36,14 +36,11 @@ func TestSource(t *testing.T) {
}}
pc := &webrtc.PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
OutgoingTracks: outgoingTracks,
Log: test.NilLogger,
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
OutgoingTracks: outgoingTracks,
Log: test.NilLogger,
}
err := pc.Start()
require.NoError(t, err)
@@ -82,7 +79,7 @@ func TestSource(t *testing.T) {
w.Write([]byte(answer.SDP))
go func() {
err3 := pc.WaitUntilConnected()
err3 := pc.WaitUntilConnected(10 * time.Second)
require.NoError(t, err3)
err3 = outgoingTracks[0].WriteRTP(&rtp.Packet{
+2 -2
View File
@@ -46,13 +46,13 @@ func TestHLSServerAuth(t *testing.T) {
"fail",
} {
t.Run(result, func(t *testing.T) {
conf := "paths:\n" +
cnf := "paths:\n" +
" all_others:\n" +
" readUser: testreader\n" +
" readPass: testpass\n" +
" readIPs: [127.0.0.0/16]\n"
p, ok := newInstance(conf)
p, ok := newInstance(cnf)
require.Equal(t, true, ok)
defer p.Close()
+15 -5
View File
@@ -417,12 +417,12 @@ webrtcICEServers2: []
# username: ''
# password: ''
# clientOnly: false
# Maximum time to gather STUN candidates.
webrtcSTUNGatherTimeout: 5s
# Time to wait for the WebRTC handshake to complete.
webrtcHandshakeTimeout: 10s
# Maximum time to gather video tracks.
# Maximum time to gather tracks.
webrtcTrackGatherTimeout: 2s
# The maximum time to gather STUN candidates.
webrtcSTUNGatherTimeout: 5s
###############################################
# Global settings -> SRT server
@@ -458,8 +458,8 @@ pathDefaults:
# * unix+mpegts://socket -> the stream is pulled from MPEG-TS over Unix socket, by using the socket
# * udp+rtp://ip:port -> the stream is pulled from RTP over UDP, by listening on the specified address
# * srt://existing-url -> the stream is pulled from another SRT server / camera
# * whep://existing-url -> the stream is pulled from another WebRTC server / camera
# * wheps://existing-url -> the stream is pulled from another WebRTC server / camera with HTTPS
# * whep://existing-url -> the stream is pulled from another WebRTC server / camera with HTTP+WHEP
# * wheps://existing-url -> the stream is pulled from another WebRTC server / camera with HTTPS+WHEP
# * redirect -> the stream is provided by another path or server
# * rpiCamera -> the stream is provided by a Raspberry Pi Camera
# The following variables can be used in the source string:
@@ -568,6 +568,16 @@ pathDefaults:
# session description protocol (SDP) of the RTP stream.
rtpSDP:
###############################################
# Default path settings -> WebRTC / WHEP source (when source is WHEP)
# Maximum time to gather STUN candidates.
whepSTUNGatherTimeout: 5s
# Time to wait for the WebRTC handshake to complete.
whepHandshakeTimeout: 10s
# Maximum time to gather tracks.
whepTrackGatherTimeout: 2s
###############################################
# Default path settings -> Redirect source (when source is "redirect")