basichost: improve autonatv2 reachability logic (#3356)

This improves the reachability detection logic by introducing the concept of primary and secondary addresses. If we have a webtransport address which shares the IP and Port with a QUIC address, the WebTransport address will be considered secondary and the QUIC address will be considered primary.

If the Primary is reachable or unreachable, we require only one confirmation for the Secondary address. This speeds up address verification considerably. We also only refresh secondary addrs reachability once every 3 hours. For primary addresses this duration is 1 hour.
This commit is contained in:
sukun
2025-10-06 15:45:31 +05:30
committed by GitHub
parent 31f527dc87
commit c19b3d6945
2 changed files with 312 additions and 36 deletions
+172 -36
View File
@@ -361,9 +361,15 @@ const (
// and then a success(...S S S S F S). The confidence in the targetConfidence window will be equal to
// targetConfidence, the last F and S cancel each other, and we won't probe again for maxProbeInterval.
maxRecentDialsWindow = targetConfidence + 2
// secondaryAddrsScalingFactor is the multiplier applied to secondary address dial outcomes. For secondary
// addr, if the primary addr is reachable, a single successful dial is enough to consider the secondary addr
// reachable.
secondaryAddrsScalingFactor = targetConfidence
// highConfidenceAddrProbeInterval is the maximum interval between probes for an address
highConfidenceAddrProbeInterval = 1 * time.Hour
// maxProbeResultTTL is the maximum time to keep probe results for an address
// highConfidenceSecondaryAddrProbeInterval is the maximum interval between probes for an address
highConfidenceSecondaryAddrProbeInterval = 3 * time.Hour
// maxProbeResultTTL is the maximum time to keep probe results for a primary address
maxProbeResultTTL = maxRecentDialsWindow * highConfidenceAddrProbeInterval
)
@@ -380,7 +386,8 @@ type probeManager struct {
inProgressProbes map[string]int // addr -> count
inProgressProbesTotal int
statuses map[string]*addrStatus
addrs []ma.Multiaddr
primaryAddrs []ma.Multiaddr
secondaryAddrs []ma.Multiaddr
}
// newProbeManager creates a new probe manager.
@@ -397,7 +404,20 @@ func (m *probeManager) AppendConfirmedAddrs(reachable, unreachable, unknown []ma
m.mx.Lock()
defer m.mx.Unlock()
for _, a := range m.addrs {
for _, a := range m.primaryAddrs {
s := m.statuses[string(a.Bytes())]
s.RemoveBefore(m.now().Add(-maxProbeResultTTL)) // cleanup stale results
switch s.Reachability() {
case network.ReachabilityPublic:
reachable = append(reachable, a)
case network.ReachabilityPrivate:
unreachable = append(unreachable, a)
case network.ReachabilityUnknown:
unknown = append(unknown, a)
}
}
for _, a := range m.secondaryAddrs {
s := m.statuses[string(a.Bytes())]
s.RemoveBefore(m.now().Add(-maxProbeResultTTL)) // cleanup stale results
switch s.Reachability() {
@@ -425,9 +445,20 @@ func (m *probeManager) UpdateAddrs(addrs []ma.Multiaddr) {
statuses[k] = &addrStatus{Addr: addr}
} else {
statuses[k] = m.statuses[k]
// our addresses have changed, we may have removed the primary address
statuses[k].primary = nil
}
}
assignPrimaryAddrs(statuses)
m.primaryAddrs = m.primaryAddrs[:0]
m.secondaryAddrs = m.secondaryAddrs[:0]
for _, a := range addrs {
if statuses[string(a.Bytes())].primary == nil {
m.primaryAddrs = append(m.primaryAddrs, a)
} else {
m.secondaryAddrs = append(m.secondaryAddrs, a)
}
}
m.addrs = addrs
m.statuses = statuses
}
@@ -438,33 +469,70 @@ func (m *probeManager) GetProbe() probe {
m.mx.Lock()
defer m.mx.Unlock()
/*
- First, select the first address for the probe. The assumption is that this is the
address which will be dialled.
- Then, we fill the rest of the addresses in the probe while trying to ensure diversity.
*/
now := m.now()
for i, a := range m.addrs {
ab := a.Bytes()
pc := m.statuses[string(ab)].RequiredProbeCount(now)
if m.inProgressProbes[string(ab)] >= pc {
// first check if the probe's first address is a primary address
idx, ok := m.getFirstProbeAddrIdx(m.primaryAddrs, now)
var reqs probe
if ok {
reqs = make(probe, 0, maxAddrsPerRequest)
reqs = append(reqs, autonatv2.Request{Addr: m.primaryAddrs[idx], SendDialData: true})
reqs = m.appendRequestsToProbe(reqs, m.primaryAddrs, idx, true, now)
reqs = m.appendRequestsToProbe(reqs, m.secondaryAddrs, 0, false, now)
} else {
// no primary addresses available, try secondary.
idx, ok := m.getFirstProbeAddrIdx(m.secondaryAddrs, now)
if !ok {
return nil
}
reqs = make(probe, 0, maxAddrsPerRequest)
reqs = append(reqs, autonatv2.Request{Addr: m.secondaryAddrs[idx], SendDialData: true})
reqs = m.appendRequestsToProbe(reqs, m.primaryAddrs, 0, false, now)
reqs = m.appendRequestsToProbe(reqs, m.secondaryAddrs, idx, true, now)
}
if len(reqs) >= maxAddrsPerRequest {
reqs = reqs[:maxAddrsPerRequest]
}
return reqs
}
// getFirstProbeAddrIdx returns the idx of the probe's first address
func (m *probeManager) getFirstProbeAddrIdx(addrs []ma.Multiaddr, now time.Time) (int, bool) {
for i, a := range addrs {
s := m.statuses[string(a.Bytes())]
pc := s.RequiredProbeCount(now)
if pc == 0 || m.inProgressProbes[string(addrs[i].Bytes())] >= pc {
continue
}
reqs := make(probe, 0, maxAddrsPerRequest)
reqs = append(reqs, autonatv2.Request{Addr: a, SendDialData: true})
// We have the first(primary) address. Append other addresses, ignoring inprogress probes
// on secondary addresses. The expectation is that the primary address will
// be dialed.
for j := 1; j < len(m.addrs); j++ {
k := (i + j) % len(m.addrs)
ab := m.addrs[k].Bytes()
pc := m.statuses[string(ab)].RequiredProbeCount(now)
if pc == 0 {
continue
}
reqs = append(reqs, autonatv2.Request{Addr: m.addrs[k], SendDialData: true})
if len(reqs) >= maxAddrsPerRequest {
break
}
}
return reqs
return i, true
}
return nil
return -1, false
}
// appendRequestsToProbe appends requests to `reqs` after the first address has been determined
func (m *probeManager) appendRequestsToProbe(reqs probe, addrs []ma.Multiaddr, st int, skipStart bool, now time.Time) probe {
n := len(addrs)
for j := range n {
k := (j + st) % n // We start from index: st
if skipStart && k == st {
continue
}
s := m.statuses[string(addrs[k].Bytes())]
pc := s.RequiredProbeCount(now)
if pc == 0 {
continue
}
reqs = append(reqs, autonatv2.Request{Addr: addrs[k], SendDialData: true})
if len(reqs) >= maxAddrsPerRequest {
break
}
}
return reqs
}
// MarkProbeInProgress should be called when a probe is started.
@@ -499,10 +567,10 @@ func (m *probeManager) CompleteProbe(reqs probe, res autonatv2.Result, err error
defer m.mx.Unlock()
// decrement in-progress count for the first address
primaryAddrKey := string(reqs[0].Addr.Bytes())
m.inProgressProbes[primaryAddrKey]--
if m.inProgressProbes[primaryAddrKey] <= 0 {
delete(m.inProgressProbes, primaryAddrKey)
firstAddrKey := string(reqs[0].Addr.Bytes())
m.inProgressProbes[firstAddrKey]--
if m.inProgressProbes[firstAddrKey] <= 0 {
delete(m.inProgressProbes, firstAddrKey)
}
m.inProgressProbesTotal--
@@ -511,17 +579,17 @@ func (m *probeManager) CompleteProbe(reqs probe, res autonatv2.Result, err error
return
}
// Consider only primary address as refused. This increases the number of
// Consider only first address as refused. This increases the number of
// refused probes, but refused probes are cheap for a server as no dials are made.
if res.AllAddrsRefused {
if s, ok := m.statuses[primaryAddrKey]; ok {
if s, ok := m.statuses[firstAddrKey]; ok {
s.AddRefusal(now)
}
return
}
dialAddrKey := string(res.Addr.Bytes())
if dialAddrKey != primaryAddrKey {
if s, ok := m.statuses[primaryAddrKey]; ok {
if dialAddrKey != firstAddrKey {
if s, ok := m.statuses[firstAddrKey]; ok {
s.AddRefusal(now)
}
}
@@ -539,6 +607,7 @@ type dialOutcome struct {
type addrStatus struct {
Addr ma.Multiaddr
primary *addrStatus
lastRefusalTime time.Time
consecutiveRefusals int
dialTimes []time.Time
@@ -587,7 +656,8 @@ func (s *addrStatus) requiredProbeCountForConfirmation(now time.Time) int {
}
lastOutcome := s.outcomes[len(s.outcomes)-1]
// If the last probe result is old, we need to retest
if now.Sub(lastOutcome.At) > highConfidenceAddrProbeInterval {
if d := now.Sub(lastOutcome.At); (s.primary == nil && d > highConfidenceAddrProbeInterval) ||
(d > highConfidenceSecondaryAddrProbeInterval) {
return 1
}
// if the last probe result was different from reachability, probe again.
@@ -670,6 +740,15 @@ func (s *addrStatus) reachabilityAndCounts() (rch network.Reachability, successe
failures++
}
}
if s.primary != nil {
prch, _, _ := s.primary.reachabilityAndCounts()
switch prch {
case network.ReachabilityPublic:
successes *= secondaryAddrsScalingFactor
case network.ReachabilityPrivate:
failures *= secondaryAddrsScalingFactor
}
}
if successes-failures >= minConfidence {
return network.ReachabilityPublic, successes, failures
}
@@ -678,3 +757,60 @@ func (s *addrStatus) reachabilityAndCounts() (rch network.Reachability, successe
}
return network.ReachabilityUnknown, successes, failures
}
var errNotTW = errors.New("not a thinwaist address")
func thinWaistPart(a ma.Multiaddr) (ma.Multiaddr, error) {
if len(a) < 2 {
return nil, errNotTW
}
if c0, c1 := a[0].Code(), a[1].Code(); (c0 != ma.P_IP4 && c0 != ma.P_IP6) || (c1 != ma.P_TCP && c1 != ma.P_UDP) {
return nil, errNotTW
}
return a[:2], nil
}
func assignPrimaryAddrs(statuses map[string]*addrStatus) {
twMap := make(map[string][]ma.Multiaddr, len(statuses))
for _, s := range statuses {
twp, err := thinWaistPart(s.Addr)
if err != nil {
continue
}
twMap[string(twp.Bytes())] = append(twMap[string(twp.Bytes())], s.Addr)
}
score := func(a ma.Multiaddr) int {
score := 0
for _, p := range a {
switch p.Code() {
case ma.P_QUIC_V1, ma.P_TCP:
score += 1
case ma.P_WEBTRANSPORT:
score += 1 << 1
case ma.P_WEBRTC:
score += 1 << 2
case ma.P_WS, ma.P_WSS:
score += 1 << 3
}
}
if score == 0 {
return 1 << 20
}
return score
}
for _, addrs := range twMap {
if len(addrs) <= 1 {
continue
}
slices.SortFunc(addrs, func(a, b ma.Multiaddr) int {
return score(a) - score(b)
})
primary := addrs[0]
ps := statuses[string(primary.Bytes())]
for _, a := range addrs[1:] {
s := statuses[string(a.Bytes())]
s.primary = ps
}
}
}
@@ -19,6 +19,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multiaddr/matest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -193,6 +194,87 @@ func TestProbeManager(t *testing.T) {
require.Empty(t, reachable)
require.Empty(t, unreachable)
})
t.Run("primary secondary", func(t *testing.T) {
quic := ma.StringCast("/ip4/1.1.1.1/udp/1/quic-v1")
webrtc := ma.StringCast("/ip4/1.1.1.1/udp/1/webrtc-direct")
tcp := ma.StringCast("/ip4/1.1.1.1/tcp/1")
websocket := ma.StringCast("/ip4/1.1.1.1/tcp/1/ws")
pm := makeNewProbeManager([]ma.Multiaddr{tcp, websocket, webrtc, quic})
extractAddrs := func(reqs probe) []ma.Multiaddr {
var res []ma.Multiaddr
for _, r := range reqs {
res = append(res, r.Addr)
}
return res
}
// tcp private
for range targetConfidence {
reqs := nextProbe(pm)
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{tcp, quic, websocket, webrtc}, extractAddrs(reqs))
pm.CompleteProbe(reqs, autonatv2.Result{Addr: tcp, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
}
// quic public
for range targetConfidence {
reqs := nextProbe(pm)
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{quic, websocket, webrtc}, extractAddrs(reqs))
pm.CompleteProbe(reqs, autonatv2.Result{Addr: quic, Idx: 0, Reachability: network.ReachabilityPublic}, nil)
}
// only 1 check now required for websocket
for range 1 {
reqs := nextProbe(pm)
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{websocket, webrtc}, extractAddrs(reqs))
pm.CompleteProbe(reqs, autonatv2.Result{Addr: websocket, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
}
// 3 checks required for webrtc to make its reachability different from quic.
for range targetConfidence {
reqs := nextProbe(pm)
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{webrtc}, extractAddrs(reqs))
pm.CompleteProbe(reqs, autonatv2.Result{Addr: webrtc, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
}
reachable, unreachable, _ := pm.AppendConfirmedAddrs(nil, nil, nil)
matest.AssertMultiaddrsMatch(t, []ma.Multiaddr{quic}, reachable)
matest.AssertMultiaddrsMatch(t, []ma.Multiaddr{tcp, websocket, webrtc}, unreachable)
// Every `highConfidenceAddrsProbeInterval` we refresh the primary addr binding
for range 2 {
cl.Add(highConfidenceAddrProbeInterval + 1*time.Millisecond)
reqs := nextProbe(pm)
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{tcp, quic}, extractAddrs(reqs))
pm.CompleteProbe(reqs, autonatv2.Result{Addr: tcp, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
reqs = nextProbe(pm)
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{quic}, extractAddrs(reqs))
pm.CompleteProbe(reqs, autonatv2.Result{Addr: quic, Idx: 0, Reachability: network.ReachabilityPublic}, nil)
reqs = nextProbe(pm)
require.Empty(t, reqs)
}
cl.Add(highConfidenceAddrProbeInterval + 1*time.Millisecond)
reqs := nextProbe(pm)
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{tcp, quic, websocket, webrtc}, extractAddrs(reqs))
pm.CompleteProbe(reqs, autonatv2.Result{Addr: tcp, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
reqs = nextProbe(pm)
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{quic, websocket, webrtc}, extractAddrs(reqs))
pm.CompleteProbe(reqs, autonatv2.Result{Addr: quic, Idx: 0, Reachability: network.ReachabilityPublic}, nil)
// secondary addrs refreshed at 3*highConfidenceProbeInterval
reqs = nextProbe(pm)
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{websocket, webrtc}, extractAddrs(reqs))
pm.CompleteProbe(reqs, autonatv2.Result{Addr: websocket, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
reqs = nextProbe(pm)
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{webrtc}, extractAddrs(reqs))
pm.CompleteProbe(reqs, autonatv2.Result{Addr: webrtc, Idx: 0, Reachability: network.ReachabilityPrivate}, nil)
reqs = nextProbe(pm)
require.Empty(t, reqs)
reachable, unreachable, _ = pm.AppendConfirmedAddrs(nil, nil, nil)
matest.AssertMultiaddrsMatch(t, reachable, []ma.Multiaddr{quic})
matest.AssertMultiaddrsMatch(t, unreachable, []ma.Multiaddr{tcp, websocket, webrtc})
})
}
type mockAutoNATClient struct {
@@ -720,6 +802,64 @@ func TestAddrStatusProbeCount(t *testing.T) {
}
}
func TestAssignPrimaryAddress(t *testing.T) {
webTransport1 := ma.StringCast("/ip4/127.0.0.1/udp/1/quic-v1/webtransport")
quic1 := ma.StringCast("/ip4/127.0.0.1/udp/1/quic-v1")
webRTC1 := ma.StringCast("/ip4/127.0.0.1/udp/1/webrtc-direct")
webTransport2 := ma.StringCast("/ip4/127.0.0.1/udp/2/quic-v1/webtransport")
quic2 := ma.StringCast("/ip4/127.0.0.1/udp/2/quic-v1")
webRTC2 := ma.StringCast("/ip4/127.0.0.1/udp/2/webrtc-direct")
tcp1 := ma.StringCast("/ip4/127.0.0.1/tcp/1")
ws1 := ma.StringCast("/ip4/127.0.0.1/tcp/1/ws")
tests := [][]struct{ secondary, primary ma.Multiaddr }{
{
{webTransport1, quic1},
{webRTC1, quic1},
},
{
{webTransport1, quic1},
{webRTC1, quic1},
{webTransport2, quic2},
{webRTC2, quic2},
},
{
{webTransport1, quic1},
{webRTC1, quic1},
{webTransport2, quic2},
{webRTC2, quic2},
{ws1, tcp1},
},
{
{webTransport1, nil},
{quic2, nil},
{ws1, nil},
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
statuses := make(map[string]*addrStatus)
for _, p := range tt {
if p.primary != nil {
statuses[string(p.primary.Bytes())] = &addrStatus{Addr: p.primary}
}
statuses[string(p.secondary.Bytes())] = &addrStatus{Addr: p.secondary}
}
assignPrimaryAddrs(statuses)
for _, p := range tt {
if p.primary != nil {
require.Nil(t, statuses[string(p.primary.Bytes())].primary)
require.Equal(t, statuses[string(p.secondary.Bytes())].primary, statuses[string(p.primary.Bytes())])
} else {
require.Nil(t, statuses[string(p.secondary.Bytes())].primary)
}
}
})
}
}
func BenchmarkAddrTracker(b *testing.B) {
cl := clock.NewMock()
t := newProbeManager(cl.Now)