Add support for ICECandidatePoolSize

support ICECandidatePoolSize for bundled gathering.
This commit is contained in:
Ji Young in
2026-01-28 19:41:11 +09:00
committed by GitHub
parent 1aaeffe2d9
commit 9228f0641a
6 changed files with 213 additions and 22 deletions
+2
View File
@@ -278,6 +278,8 @@ var (
"cannot convert to StatsICECandidatePairStateSucceeded invalid ice candidate state",
)
errICECandidatePoolSizeTooLarge = errors.New("ice candidate pool size greater than 1 is not supported")
errInvalidICECredentialTypeString = errors.New("invalid ICECredentialType")
errInvalidICEServer = errors.New("invalid ICEServer")
+75 -9
View File
@@ -44,6 +44,10 @@ type ICEGatherer struct {
// for ICE candidates generated by this gatherer.
sdpMid atomic.Value // string
sdpMLineIndex atomic.Uint32 // uint16
// Used for ICE candidate pooling
candidatePool []ice.Candidate
iceCandidatePoolSize uint8
}
// ICEAddressRewriteMode controls whether a rule replaces or appends candidates.
@@ -104,13 +108,15 @@ func (api *API) NewICEGatherer(opts ICEGatherOptions) (*ICEGatherer, error) {
}
return &ICEGatherer{
state: ICEGathererStateNew,
gatherPolicy: opts.ICEGatherPolicy,
validatedServers: validatedServers,
api: api,
log: api.settingEngine.LoggerFactory.NewLogger("ice"),
sdpMid: atomic.Value{},
sdpMLineIndex: atomic.Uint32{},
state: ICEGathererStateNew,
gatherPolicy: opts.ICEGatherPolicy,
validatedServers: validatedServers,
api: api,
log: api.settingEngine.LoggerFactory.NewLogger("ice"),
sdpMid: atomic.Value{},
sdpMLineIndex: atomic.Uint32{},
candidatePool: make([]ice.Candidate, 0, opts.ICECandidatePoolSize),
iceCandidatePoolSize: opts.ICECandidatePoolSize,
}, nil
}
@@ -133,7 +139,8 @@ func (g *ICEGatherer) updateServers(servers []ICEServer, policy ICETransportPoli
g.validatedServers = validatedServers
g.gatherPolicy = policy
if g.agent != nil {
if g.agent != nil && (g.State() != ICEGathererStateGathering ||
g.iceCandidatePoolSize == 0) {
return g.agent.UpdateOptions(ice.WithUrls(validatedServers))
}
@@ -430,6 +437,15 @@ func (g *ICEGatherer) Gather() error { //nolint:cyclop
sdpMLineIndex := uint16(g.sdpMLineIndex.Load()) //nolint:gosec // G115
if candidate != nil {
g.lock.Lock()
if g.iceCandidatePoolSize > 0 && g.candidatePool != nil {
g.candidatePool = append(g.candidatePool, candidate)
g.lock.Unlock()
return
}
g.lock.Unlock()
c, err := newICECandidateFromICE(candidate, sdpMid, sdpMLineIndex)
if err != nil {
g.log.Warnf("Failed to convert ice.Candidate: %s", err)
@@ -439,8 +455,18 @@ func (g *ICEGatherer) Gather() error { //nolint:cyclop
onLocalCandidateHandler(&c)
} else {
g.setState(ICEGathererStateComplete)
onGatheringCompleteHandler()
// If gathering completes before flushing (i.e., before SetLocalDescription), avoid triggering nil.
// Users expect valid candidates to be emitted before the nil completion signal.
g.lock.Lock()
if g.iceCandidatePoolSize > 0 && g.candidatePool != nil {
g.lock.Unlock()
return
}
g.lock.Unlock()
onLocalCandidateHandler(nil)
}
}); err != nil {
@@ -456,6 +482,46 @@ func (g *ICEGatherer) setMediaStreamIdentification(mid string, mLineIndex uint16
g.sdpMLineIndex.Store(uint32(mLineIndex))
}
func (g *ICEGatherer) flushCandidates() {
g.lock.Lock()
candidates := g.candidatePool
g.candidatePool = nil
g.iceCandidatePoolSize = 0
onLocalCandidateHandler := func(*ICECandidate) {}
if handler, ok := g.onLocalCandidateHandler.Load().(func(candidate *ICECandidate)); ok && handler != nil {
onLocalCandidateHandler = handler
}
sdpMid := ""
if mid, ok := g.sdpMid.Load().(string); ok {
sdpMid = mid
}
g.lock.Unlock()
sdpMLineIndex := uint16(g.sdpMLineIndex.Load()) //nolint:gosec // G115
currentState := g.State()
for _, candidate := range candidates {
c, err := newICECandidateFromICE(candidate, sdpMid, sdpMLineIndex)
if err != nil {
g.log.Warnf("Failed to convert pooled ice.Candidate: %s", err)
continue
}
onLocalCandidateHandler(&c)
}
// If this is true, gathering completed before flushing,
// so trigger nil to notify the user that all candidates have been gathered.
if currentState == ICEGathererStateComplete {
onLocalCandidateHandler(nil)
}
}
// Close prunes all local candidates, and closes the ports.
func (g *ICEGatherer) Close() error {
return g.close(false /* shouldGracefullyClose */)
+3 -2
View File
@@ -5,6 +5,7 @@ package webrtc
// ICEGatherOptions provides options relating to the gathering of ICE candidates.
type ICEGatherOptions struct {
ICEServers []ICEServer
ICEGatherPolicy ICETransportPolicy
ICEServers []ICEServer
ICEGatherPolicy ICETransportPolicy
ICECandidatePoolSize uint8
}
+31 -5
View File
@@ -200,6 +200,12 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
}
})
if pc.configuration.ICECandidatePoolSize > 0 {
if err := pc.iceGatherer.Gather(); err != nil {
return nil, err
}
}
pc.interceptorRTCPWriter = pc.api.interceptor.BindRTCPWriter(interceptor.RTCPWriterFunc(pc.writeRTCP))
return pc, nil
@@ -245,6 +251,11 @@ func (pc *PeerConnection) initConfiguration(configuration Configuration) error {
}
if configuration.ICECandidatePoolSize != 0 {
// Issue #2892, ice candidate pool size greater than 1 is not supported
if configuration.ICECandidatePoolSize > 1 {
return &rtcerr.NotSupportedError{Err: errICECandidatePoolSizeTooLarge}
}
pc.configuration.ICECandidatePoolSize = configuration.ICECandidatePoolSize
}
@@ -571,7 +582,13 @@ func (pc *PeerConnection) SetConfiguration(configuration Configuration) error {
pc.LocalDescription() != nil {
return &rtcerr.InvalidModificationError{Err: ErrModifyingICECandidatePoolSize}
}
pc.configuration.ICECandidatePoolSize = configuration.ICECandidatePoolSize
// Currently, there is no logic implemented to handle runtime changes to this value.
// Commenting out to prevent unexpected behavior.
// nolint:godox
// TODO: Re-enable this in a future update when proper handling is implemented.
// pc.configuration.ICECandidatePoolSize = configuration.ICECandidatePoolSize
pc.log.Warn("Changing ICECandidatePoolSize is not yet supported. The new value will be ignored.")
}
// https://www.w3.org/TR/webrtc/#set-the-configuration (step #4-6)
@@ -590,8 +607,14 @@ func (pc *PeerConnection) SetConfiguration(configuration Configuration) error {
pc.configuration.AlwaysNegotiateDataChannels = configuration.AlwaysNegotiateDataChannels
}
// Step #8: ICE candidate pool size is not implemented in pion/webrtc.
// The value is stored in configuration but candidate pooling is not supported.
// https://www.w3.org/TR/webrtc/#set-the-configuration (step #8)
// nolint:godox
// TODO: If the new ICE candidate pool size changes the existing setting,
// this may result in immediate gathering of new pooled candidates,
// or discarding of existing pooled candidates
if pc.configuration.ICECandidatePoolSize != configuration.ICECandidatePoolSize {
pc.log.Warn("Dynamic ICE candidate pool adjustment is not yet supported")
}
// https://www.w3.org/TR/webrtc/#set-the-configuration (step #9)
// Update the ICE gatherer so new servers take effect at the next gathering phase.
@@ -775,8 +798,9 @@ func (pc *PeerConnection) CreateOffer(options *OfferOptions) (SessionDescription
func (pc *PeerConnection) createICEGatherer() (*ICEGatherer, error) {
g, err := pc.api.NewICEGatherer(ICEGatherOptions{
ICEServers: pc.configuration.getICEServers(),
ICEGatherPolicy: pc.configuration.ICETransportPolicy,
ICEServers: pc.configuration.getICEServers(),
ICEGatherPolicy: pc.configuration.ICETransportPolicy,
ICECandidatePoolSize: pc.configuration.ICECandidatePoolSize,
})
if err != nil {
return nil, err
@@ -1112,6 +1136,8 @@ func (pc *PeerConnection) SetLocalDescription(desc SessionDescription) error {
pc.iceGatherer.setMediaStreamIdentification(mediaSection.SDPMid, mediaSection.SDPMLineIndex)
}
pc.iceGatherer.flushCandidates()
if pc.iceGatherer.State() == ICEGathererStateNew {
return pc.iceGatherer.Gather()
}
+3 -3
View File
@@ -85,7 +85,7 @@ func TestNew_Go(t *testing.T) {
RTCPMuxPolicy: RTCPMuxPolicyNegotiate,
PeerIdentity: "unittest",
Certificates: []Certificate{*certificate},
ICECandidatePoolSize: 5,
ICECandidatePoolSize: 1,
})
assert.Nil(t, err)
assert.NotNil(t, pc)
@@ -200,7 +200,7 @@ func TestPeerConnection_SetConfiguration_Go(t *testing.T) {
pc, err := api.NewPeerConnection(Configuration{
PeerIdentity: "unittest",
Certificates: []Certificate{*certificate1},
ICECandidatePoolSize: 5,
ICECandidatePoolSize: 1,
})
if err != nil {
return pc, err
@@ -230,7 +230,7 @@ func TestPeerConnection_SetConfiguration_Go(t *testing.T) {
RTCPMuxPolicy: RTCPMuxPolicyRequire,
PeerIdentity: "unittest",
Certificates: []Certificate{*certificate1},
ICECandidatePoolSize: 5,
ICECandidatePoolSize: 1,
})
if err != nil {
return pc, err
+99 -3
View File
@@ -4,6 +4,7 @@
package webrtc
import (
"runtime"
"sync"
"sync/atomic"
"testing"
@@ -172,7 +173,7 @@ func TestNew(t *testing.T) {
BundlePolicy: BundlePolicyMaxCompat,
RTCPMuxPolicy: RTCPMuxPolicyNegotiate,
PeerIdentity: "unittest",
ICECandidatePoolSize: 5,
ICECandidatePoolSize: 1,
})
assert.NoError(t, err)
assert.NotNil(t, pc)
@@ -194,7 +195,7 @@ func TestPeerConnection_SetConfiguration(t *testing.T) {
name: "valid",
init: func() (*PeerConnection, error) {
pc, err := NewPeerConnection(Configuration{
ICECandidatePoolSize: 5,
ICECandidatePoolSize: 1,
})
if err != nil {
return pc, err
@@ -212,7 +213,7 @@ func TestPeerConnection_SetConfiguration(t *testing.T) {
ICETransportPolicy: ICETransportPolicyAll,
BundlePolicy: BundlePolicyBalanced,
RTCPMuxPolicy: RTCPMuxPolicyRequire,
ICECandidatePoolSize: 5,
ICECandidatePoolSize: 1,
AlwaysNegotiateDataChannels: true,
})
if err != nil {
@@ -701,6 +702,101 @@ func TestGatherOnSetLocalDescription(t *testing.T) { //nolint:cyclop
closePairNow(t, pcOffer, pcAnswer)
}
// Assert that candidates are flushed by calling SetLocalDescription if ICECandidatePoolSize > 0.
func TestFlushOnSetLocalDescription(t *testing.T) {
if runtime.GOARCH == "wasm" {
t.Skip("Skipping ICECandidatePool test on WASM")
}
lim := test.TimeOut(time.Second * 30)
defer lim.Stop()
report := test.CheckRoutines(t)
defer report()
pcOfferFlushStarted := make(chan SessionDescription)
pcAnswerFlushStarted := make(chan SessionDescription)
var offerOnce sync.Once
var answerOnce sync.Once
pcOffer, err := NewPeerConnection(Configuration{
ICECandidatePoolSize: 1,
})
assert.NoError(t, err)
// We need to create a data channel in order to set mid
_, err = pcOffer.CreateDataChannel("initial_data_channel", nil)
assert.NoError(t, err)
pcOffer.OnICECandidate(func(i *ICECandidate) {
offerOnce.Do(func() {
close(pcOfferFlushStarted)
})
})
// Assert that ICEGatheringState changes immediately
assert.Eventually(t, func() bool {
return pcOffer.ICEGatheringState() != ICEGatheringStateNew
}, time.Second, 10*time.Millisecond, "ICEGatheringState should switch to Gathering or Complete immediately")
// Assert that no events are fired before SetLocalDescription
select {
case <-pcOfferFlushStarted:
assert.Fail(t, "Flush started before SetLocalDescription")
case <-time.After(time.Second):
}
// Verify that candidates are flushed immediately after SetLocalDescription
offer, err := pcOffer.CreateOffer(nil)
assert.NoError(t, err)
assert.NoError(t, pcOffer.SetLocalDescription(offer))
<-pcOfferFlushStarted
// Create Answer PeerConnection
pcAnswer, err := NewPeerConnection(Configuration{
ICECandidatePoolSize: 1,
})
assert.NoError(t, err)
pcAnswer.OnICECandidate(func(i *ICECandidate) {
answerOnce.Do(func() {
close(pcAnswerFlushStarted)
})
})
// Assert that ICEGatheringState changes immediately
assert.Eventually(t, func() bool {
return pcAnswer.ICEGatheringState() != ICEGatheringStateNew
}, time.Second, 10*time.Millisecond, "ICEGatheringState should switch to Gathering or Complete immediately")
assert.NoError(t, pcAnswer.SetRemoteDescription(offer))
select {
case <-pcAnswerFlushStarted:
assert.Fail(t, "Flush started before SetLocalDescription")
case <-time.After(time.Second):
}
// Verify that candidates are flushed immediately after SetLocalDescription
answer, err := pcAnswer.CreateAnswer(nil)
assert.NoError(t, err)
assert.NoError(t, pcAnswer.SetLocalDescription(answer))
<-pcAnswerFlushStarted
closePairNow(t, pcOffer, pcAnswer)
}
func TestSetICECandidatePoolSizeLarge(t *testing.T) {
if runtime.GOARCH == "wasm" {
t.Skip("Skipping ICECandidatePool test on WASM")
}
pc, err := NewPeerConnection(Configuration{
ICECandidatePoolSize: 2,
})
assert.Nil(t, pc)
assert.Equal(t, &rtcerr.NotSupportedError{Err: errICECandidatePoolSizeTooLarge}, err)
}
// Assert that SetRemoteDescription handles invalid states.
func TestSetRemoteDescriptionInvalid(t *testing.T) {
t.Run("local-offer+SetRemoteDescription(Offer)", func(t *testing.T) {