basichost: move EvtLocalAddrsChanged to addrs_manager (#3355)

This commit is contained in:
sukun
2025-09-30 16:20:38 +05:30
committed by GitHub
parent e1218c52d6
commit 3e69227598
4 changed files with 338 additions and 261 deletions
+253 -28
View File
@@ -11,8 +11,12 @@ import (
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-netroute"
@@ -26,6 +30,13 @@ const maxObservedAddrsPerListenAddr = 3
// addrChangeTickrInterval is the interval to recompute host addrs.
var addrChangeTickrInterval = 5 * time.Second
const maxPeerRecordSize = 8 * 1024 // 8k to be compatible with identify's limit
// addrStore is a minimal interface for storing peer addresses
type addrStore interface {
SetAddrs(peer.ID, []ma.Multiaddr, time.Duration)
}
// ObservedAddrsManager maps our local listen addrs to externally observed addrs.
type ObservedAddrsManager interface {
Addrs(minObservers int) []ma.Multiaddr
@@ -51,9 +62,6 @@ type addrsManager struct {
interfaceAddrs *interfaceAddrsCache
addrsReachabilityTracker *addrsReachabilityTracker
// addrsUpdatedChan is notified when addrs change. This is provided by the caller.
addrsUpdatedChan chan struct{}
// triggerAddrsUpdateChan is used to trigger an addresses update.
triggerAddrsUpdateChan chan chan struct{}
// started is used to check whether the addrsManager has started.
@@ -66,6 +74,11 @@ type addrsManager struct {
addrsMx sync.RWMutex
currentAddrs hostAddrs
signKey crypto.PrivKey
addrStore addrStore
signedRecordStore peerstore.CertifiedAddrBook
hostID peer.ID
wg sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc
@@ -78,10 +91,13 @@ func newAddrsManager(
listenAddrs func() []ma.Multiaddr,
addCertHashes func([]ma.Multiaddr) []ma.Multiaddr,
observedAddrsManager ObservedAddrsManager,
addrsUpdatedChan chan struct{},
client autonatv2Client,
enableMetrics bool,
registerer prometheus.Registerer,
disableSignedPeerRecord bool,
signKey crypto.PrivKey,
addrStore addrStore,
hostID peer.ID,
) (*addrsManager, error) {
ctx, cancel := context.WithCancel(context.Background())
as := &addrsManager{
@@ -93,14 +109,24 @@ func newAddrsManager(
addrsFactory: addrsFactory,
triggerAddrsUpdateChan: make(chan chan struct{}, 1),
triggerReachabilityUpdate: make(chan struct{}, 1),
addrsUpdatedChan: addrsUpdatedChan,
interfaceAddrs: &interfaceAddrsCache{},
signKey: signKey,
addrStore: addrStore,
hostID: hostID,
ctx: ctx,
ctxCancel: cancel,
}
unknownReachability := network.ReachabilityUnknown
as.hostReachability.Store(&unknownReachability)
if !disableSignedPeerRecord {
var ok bool
as.signedRecordStore, ok = as.addrStore.(peerstore.CertifiedAddrBook)
if !ok {
return nil, errors.New("peerstore doesn't implement CertifiedAddrBook interface")
}
}
if client != nil {
var metricsTracker MetricsTracker
if enableMetrics {
@@ -118,7 +144,14 @@ func (a *addrsManager) Start() error {
return fmt.Errorf("error starting addrs reachability tracker: %s", err)
}
}
return a.startBackgroundWorker()
if err := a.startBackgroundWorker(); err != nil {
return fmt.Errorf("error starting background worker: %s", err)
}
// this ensures that listens concurrent with Start are reflected correctly after Start exits.
a.started.Store(true)
a.updateAddrsSync()
return nil
}
func (a *addrsManager) Close() {
@@ -183,6 +216,15 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
mc.Close(),
)
}
mc = append(mc, emitter)
localAddrsEmitter, err := a.bus.Emitter(new(event.EvtLocalAddressesUpdated), eventbus.Stateful)
if err != nil {
return errors.Join(
fmt.Errorf("error creating local addrs emitter: %s", err),
mc.Close(),
)
}
var relayAddrs []ma.Multiaddr
// update relay addrs in case we're private
@@ -201,19 +243,18 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
}
default:
}
// this ensures that listens concurrent with Start are reflected correctly after Start exits.
a.started.Store(true)
// update addresses before starting the worker loop. This ensures that any address updates
// before calling addrsManager.Start are correctly reported after Start returns.
a.updateAddrs(relayAddrs)
a.wg.Add(1)
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, relayAddrs)
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, localAddrsEmitter, relayAddrs)
return nil
}
func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub event.Subscription,
emitter event.Emitter, relayAddrs []ma.Multiaddr,
func (a *addrsManager) background(
autoRelayAddrsSub,
autonatReachabilitySub event.Subscription,
emitter event.Emitter,
localAddrsEmitter event.Emitter,
relayAddrs []ma.Multiaddr,
) {
defer a.wg.Done()
defer func() {
@@ -229,20 +270,17 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even
if err != nil {
log.Warn("error closing host reachability emitter", "err", err)
}
err = localAddrsEmitter.Close()
if err != nil {
log.Warn("error closing local addrs emitter", "err", err)
}
}()
ticker := time.NewTicker(addrChangeTickrInterval)
defer ticker.Stop()
var previousAddrs hostAddrs
var notifCh chan struct{}
notifCh := make(chan struct{})
for {
currAddrs := a.updateAddrs(relayAddrs)
if notifCh != nil {
close(notifCh)
notifCh = nil
}
a.notifyAddrsChanged(emitter, previousAddrs, currAddrs)
previousAddrs = currAddrs
select {
case <-ticker.C:
case notifCh = <-a.triggerAddrsUpdateChan:
@@ -258,13 +296,21 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even
case <-a.ctx.Done():
return
}
currAddrs := a.updateAddrs(previousAddrs, relayAddrs)
if notifCh != nil {
close(notifCh)
notifCh = nil
}
a.notifyAddrsUpdated(emitter, localAddrsEmitter, previousAddrs, currAddrs)
previousAddrs = currAddrs
}
}
// updateAddrs updates the addresses of the host and returns the new updated
// addrs. This must only be called from the background goroutine or from the Start method otherwise
// we may end up with stale addrs.
func (a *addrsManager) updateAddrs(relayAddrs []ma.Multiaddr) hostAddrs {
func (a *addrsManager) updateAddrs(prevHostAddrs hostAddrs, relayAddrs []ma.Multiaddr) hostAddrs {
localAddrs := a.getLocalAddrs()
var currReachableAddrs, currUnreachableAddrs, currUnknownAddrs []ma.Multiaddr
if a.addrsReachabilityTracker != nil {
@@ -273,6 +319,11 @@ func (a *addrsManager) updateAddrs(relayAddrs []ma.Multiaddr) hostAddrs {
relayAddrs = slices.Clone(relayAddrs)
currAddrs := a.getAddrs(slices.Clone(localAddrs), relayAddrs)
if areAddrsDifferent(prevHostAddrs.addrs, currAddrs) {
_, _, removed := diffAddrs(prevHostAddrs.addrs, currAddrs)
a.updatePeerStore(currAddrs, removed)
}
a.addrsMx.Lock()
a.currentAddrs = hostAddrs{
addrs: append(a.currentAddrs.addrs[:0], currAddrs...),
@@ -294,7 +345,32 @@ func (a *addrsManager) updateAddrs(relayAddrs []ma.Multiaddr) hostAddrs {
}
}
func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, previous, current hostAddrs) {
// updatePeerStore records the host's own addresses in the peerstore and, when
// signed peer records are enabled, refreshes the stored signed peer record.
// removedAddrs are expired immediately (TTL 0); currentAddrs are stored with
// a permanent TTL.
func (a *addrsManager) updatePeerStore(currentAddrs []ma.Multiaddr, removedAddrs []ma.Multiaddr) {
	// Persist the current address set and drop the addresses that went away.
	a.addrStore.SetAddrs(a.hostID, currentAddrs, peerstore.PermanentAddrTTL)
	a.addrStore.SetAddrs(a.hostID, removedAddrs, 0)

	if a.signedRecordStore == nil {
		// Signed peer records are disabled; nothing more to do.
		return
	}
	// Our addresses have changed: build a fresh signed record and persist it.
	// On any failure we keep the previously stored record rather than
	// publishing a partial update.
	sr, err := a.makeSignedPeerRecord(currentAddrs)
	if err != nil {
		log.Error("error creating a signed peer record from the set of current addresses", "err", err)
		return
	}
	if _, err := a.signedRecordStore.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
		log.Error("failed to persist signed peer record in peer store", "err", err)
		return
	}
}
func (a *addrsManager) notifyAddrsUpdated(emitter event.Emitter, localAddrsEmitter event.Emitter, previous, current hostAddrs) {
if areAddrsDifferent(previous.localAddrs, current.localAddrs) {
log.Debug("host local addresses updated", "addrs", current.localAddrs)
if a.addrsReachabilityTracker != nil {
@@ -303,10 +379,7 @@ func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, previous, curre
}
if areAddrsDifferent(previous.addrs, current.addrs) {
log.Debug("host addresses updated", "addrs", current.localAddrs)
select {
case a.addrsUpdatedChan <- struct{}{}:
default:
}
a.emitLocalAddrsUpdated(localAddrsEmitter, current.addrs, previous.addrs)
}
// We *must* send both reachability changed and addrs changed events from the
@@ -489,6 +562,76 @@ func (a *addrsManager) appendObservedAddrs(dst []ma.Multiaddr, listenAddrs, ifac
return dst
}
// makeSignedPeerRecord creates a signed peer record for the given addresses.
//
// The address list is trimmed so that the sealed envelope stays under
// maxPeerRecordSize (8 KiB, matching identify's limit). The size budget
// reserves room for the host ID, the signature, and the public key before
// handing the remainder to trimHostAddrList.
//
// Returns an error if the manager has no signing key or sealing fails.
func (a *addrsManager) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) {
	if a.signKey == nil {
		return nil, errors.New("signKey is nil")
	}
	// Limit the length of currentAddrs to ensure that our signed peer records aren't rejected
	peerRecordSize := 64 // HostID
	k, err := a.signKey.Raw()
	var nk int
	if err == nil {
		nk = len(k)
	} else {
		nk = 1024 // In case of error, use a large enough value.
	}
	peerRecordSize += 2 * nk // 1 for signature, 1 for public key
	// we want the final address list to be small for keeping the signed peer record in size
	addrs = trimHostAddrList(addrs, maxPeerRecordSize-peerRecordSize-256) // 256 B of buffer
	rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{
		ID:    a.hostID,
		Addrs: addrs,
	})
	return record.Seal(rec, a.signKey)
}
// emitLocalAddrsUpdated emits an EvtLocalAddressesUpdated event describing the
// diff between lastAddrs and currentAddrs. If nothing was added or removed,
// no event is emitted. When signed peer records are enabled, the event also
// carries the signed record currently stored in the certified address book.
// NOTE(review): this assumes the stored record was refreshed (updatePeerStore)
// before this call so the envelope matches currentAddrs — confirm with callers.
func (a *addrsManager) emitLocalAddrsUpdated(emitter event.Emitter, currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
	added, maintained, removed := diffAddrs(lastAddrs, currentAddrs)
	if len(added) == 0 && len(removed) == 0 {
		// No change in the address set; nothing to report.
		return
	}
	var sr *record.Envelope
	if a.signedRecordStore != nil {
		sr = a.signedRecordStore.GetPeerRecord(a.hostID)
	}
	// Current = maintained + added; Removed = removed.
	evt := &event.EvtLocalAddressesUpdated{
		Diffs:            true,
		Current:          make([]event.UpdatedAddress, 0, len(currentAddrs)),
		Removed:          make([]event.UpdatedAddress, 0, len(removed)),
		SignedPeerRecord: sr,
	}
	for _, addr := range maintained {
		evt.Current = append(evt.Current, event.UpdatedAddress{
			Address: addr,
			Action:  event.Maintained,
		})
	}
	for _, addr := range added {
		evt.Current = append(evt.Current, event.UpdatedAddress{
			Address: addr,
			Action:  event.Added,
		})
	}
	for _, addr := range removed {
		evt.Removed = append(evt.Removed, event.UpdatedAddress{
			Address: addr,
			Action:  event.Removed,
		})
	}
	// emit addr change event
	if err := emitter.Emit(*evt); err != nil {
		log.Warn("error emitting event for updated addrs", "err", err)
	}
}
func areAddrsDifferent(prev, current []ma.Multiaddr) bool {
// TODO: make the sorted nature of ma.Unique a guarantee in multiaddrs
prev = ma.Unique(prev)
@@ -506,6 +649,88 @@ func areAddrsDifferent(prev, current []ma.Multiaddr) bool {
return false
}
// diffAddrs diffs prev and current addrs and returns added, maintained, and
// removed addrs. Both prev and current are expected to be sorted using
// ma.Compare(); the diff is computed with a single linear merge pass.
func diffAddrs(prev, current []ma.Multiaddr) (added, maintained, removed []ma.Multiaddr) {
	var pi, ci int
	for pi < len(prev) && ci < len(current) {
		switch c := prev[pi].Compare(current[ci]); {
		case c < 0:
			// prev[pi] is absent from current: it was removed.
			removed = append(removed, prev[pi])
			pi++
		case c > 0:
			// current[ci] is absent from prev: it was added.
			added = append(added, current[ci])
			ci++
		default:
			// Present in both lists.
			maintained = append(maintained, current[ci])
			pi++
			ci++
		}
	}
	// Whatever remains in current was added; whatever remains in prev was removed.
	added = append(added, current[ci:]...)
	removed = append(removed, prev[pi:]...)
	return added, maintained, removed
}
// trimHostAddrList trims the address list to fit within maxSize bytes of
// encoded multiaddrs. If the list already fits it is returned unchanged.
// Otherwise addresses are ranked — public over private over loopback, and by
// transport (quic-v1 > tcp > wss > webtransport > webrtc-direct) — and the
// highest-ranked prefix that fits is kept.
func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
	size := 0
	for _, addr := range addrs {
		size += len(addr.Bytes())
	}
	if size <= maxSize {
		// Everything fits; nothing to trim.
		return addrs
	}

	rank := func(addr ma.Multiaddr) int {
		var r int
		switch {
		case manet.IsPublicAddr(addr):
			r |= 1 << 12
		case !manet.IsIPLoopback(addr):
			r |= 1 << 11
		}
		weight := 0
		ma.ForEach(addr, func(c ma.Component) bool {
			switch c.Protocol().Code {
			case ma.P_QUIC_V1:
				weight = 5
			case ma.P_TCP:
				weight = 4
			case ma.P_WSS:
				weight = 3
			case ma.P_WEBTRANSPORT:
				weight = 2
			case ma.P_WEBRTC_DIRECT:
				weight = 1
			case ma.P_P2P:
				// Stop at the /p2p component.
				return false
			}
			return true
		})
		return r | 1<<weight
	}

	// Sort highest-ranked first (stable, so equal ranks keep their order).
	slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int {
		return rank(b) - rank(a)
	})

	// Keep the longest prefix of the sorted list that fits within maxSize.
	size = 0
	for i, addr := range addrs {
		size += len(addr.Bytes())
		if size > maxSize {
			addrs = addrs[:i]
			break
		}
	}
	return addrs
}
const interfaceAddrsCacheTTL = time.Minute
type interfaceAddrsCache struct {
+71 -2
View File
@@ -2,6 +2,7 @@ package basichost
import (
"context"
"crypto/rand"
"errors"
"fmt"
"slices"
@@ -9,9 +10,13 @@ import (
"testing"
"time"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multiaddr/matest"
@@ -52,6 +57,13 @@ func (m *mockObservedAddrs) AddrsFor(local ma.Multiaddr) []ma.Multiaddr { return
var _ ObservedAddrsManager = &mockObservedAddrs{}
type addrStoreArgs struct {
AddrStore addrStore
SignKey crypto.PrivKey
HostID peer.ID
DisableSignedPeerRecord bool
}
type addrsManagerArgs struct {
NATManager NATManager
AddrsFactory AddrsFactory
@@ -60,6 +72,7 @@ type addrsManagerArgs struct {
AddCertHashes func([]ma.Multiaddr) []ma.Multiaddr
AutoNATClient autonatv2Client
Bus event.Bus
AddrStoreArgs addrStoreArgs
}
type addrsManagerTestCase struct {
@@ -76,7 +89,6 @@ func newAddrsManagerTestCase(tb testing.TB, args addrsManagerArgs) addrsManagerT
if args.AddrsFactory == nil {
args.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { return addrs }
}
addrsUpdatedChan := make(chan struct{}, 1)
addCertHashes := func(addrs []ma.Multiaddr) []ma.Multiaddr {
return addrs
@@ -84,6 +96,18 @@ func newAddrsManagerTestCase(tb testing.TB, args addrsManagerArgs) addrsManagerT
if args.AddCertHashes != nil {
addCertHashes = args.AddCertHashes
}
signKey := args.AddrStoreArgs.SignKey
addrStore := args.AddrStoreArgs.AddrStore
pid := args.AddrStoreArgs.HostID
if args.AddrStoreArgs == (addrStoreArgs{}) {
var err error
signKey, _, err = crypto.GenerateEd25519Key(rand.Reader)
require.NoError(tb, err)
addrStore, err = pstoremem.NewPeerstore()
require.NoError(tb, err)
pid, err = peer.IDFromPrivateKey(signKey)
require.NoError(tb, err)
}
am, err := newAddrsManager(
eb,
args.NATManager,
@@ -91,10 +115,13 @@ func newAddrsManagerTestCase(tb testing.TB, args addrsManagerArgs) addrsManagerT
args.ListenAddrs,
addCertHashes,
args.ObservedAddrsManager,
addrsUpdatedChan,
args.AutoNATClient,
true,
prometheus.DefaultRegisterer,
false,
signKey,
addrStore,
pid,
)
require.NoError(tb, err)
@@ -176,6 +203,7 @@ func TestAddrsManager(t *testing.T) {
assert.ElementsMatch(collect, am.Addrs(), expected, "%s\n%s", am.Addrs(), expected)
}, 5*time.Second, 50*time.Millisecond)
})
t.Run("nat returns unspecified addr", func(t *testing.T) {
quicPort1 := ma.StringCast("/ip4/3.3.3.3/udp/1/quic-v1")
// port from nat, IP from observed addr
@@ -417,6 +445,47 @@ func TestAddrsManagerReachabilityEvent(t *testing.T) {
}
}
// TestAddrsManagerPeerstoreUpdated verifies that the addrsManager keeps the
// peerstore in sync with the host's addresses: both the plain address book
// and the certified (signed peer record) address book must reflect the
// current set, before and after an address change.
func TestAddrsManagerPeerstoreUpdated(t *testing.T) {
	quic1 := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1")
	quic2 := ma.StringCast("/ip4/1.2.3.5/udp/1/quic-v1")

	pstore, err := pstoremem.NewPeerstore()
	require.NoError(t, err)
	cab, _ := peerstore.GetCertifiedAddrBook(pstore)
	signKey, _, err := crypto.GenerateEd25519Key(rand.Reader)
	require.NoError(t, err)
	pid, err := peer.IDFromPrivateKey(signKey)
	require.NoError(t, err)

	// The AddrsFactory flips from quic1 to quic2 once update is set,
	// simulating an address change at runtime.
	var update atomic.Bool
	am := newAddrsManagerTestCase(t, addrsManagerArgs{
		ListenAddrs: func() []ma.Multiaddr { return nil },
		AddrsFactory: func([]ma.Multiaddr) []ma.Multiaddr {
			if !update.Load() {
				return []ma.Multiaddr{quic1}
			}
			return []ma.Multiaddr{quic2}
		},
		AddrStoreArgs: addrStoreArgs{
			AddrStore: pstore,
			HostID:    pid,
			SignKey:   signKey,
		},
	})
	defer am.Close()

	// Initial state: peerstore and signed record both carry quic1.
	matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{quic1}, pstore.Addrs(pid))
	ev := cab.GetPeerRecord(pid)
	pr := peerRecordFromEnvelope(t, ev)
	require.Equal(t, pr.Addrs, []ma.Multiaddr{quic1})

	// Trigger a synchronous address update and check both books moved to quic2.
	update.Store(true)
	am.updateAddrsSync()
	matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{quic2}, pstore.Addrs(pid))
	ev = cab.GetPeerRecord(pid)
	pr = peerRecordFromEnvelope(t, ev)
	require.Equal(t, pr.Addrs, []ma.Multiaddr{quic2})
}
func TestRemoveIfNotInSource(t *testing.T) {
var addrs []ma.Multiaddr
for i := 0; i < 10; i++ {
+14 -222
View File
@@ -6,19 +6,16 @@ import (
"fmt"
"io"
"log/slog"
"slices"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/p2p/host/autonat"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/pstoremanager"
@@ -32,7 +29,6 @@ import (
logging "github.com/libp2p/go-libp2p/gologshim"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
msmux "github.com/multiformats/go-multistream"
)
@@ -46,8 +42,6 @@ var (
DefaultAddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { return addrs }
)
const maxPeerRecordSize = 8 * 1024 // 8k to be compatible with identify's limit
// AddrsFactory functions can be passed to New in order to override
// addresses returned by Addrs.
type AddrsFactory func([]ma.Multiaddr) []ma.Multiaddr
@@ -79,19 +73,13 @@ type BasicHost struct {
emitters struct {
evtLocalProtocolsUpdated event.Emitter
evtLocalAddrsUpdated event.Emitter
}
disableSignedPeerRecord bool
signKey crypto.PrivKey
caBook peerstore.CertifiedAddrBook
autoNATMx sync.RWMutex
autoNat autonat.AutoNAT
autonatv2 *autonatv2.AutoNAT
addressManager *addrsManager
addrsUpdatedChan chan struct{}
autonatv2 *autonatv2.AutoNAT
addressManager *addrsManager
}
var _ host.Host = (*BasicHost)(nil)
@@ -173,23 +161,18 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
hostCtx, cancel := context.WithCancel(context.Background())
h := &BasicHost{
network: n,
psManager: psManager,
mux: msmux.NewMultistreamMuxer[protocol.ID](),
negtimeout: DefaultNegotiationTimeout,
eventbus: opts.EventBus,
ctx: hostCtx,
ctxCancel: cancel,
disableSignedPeerRecord: opts.DisableSignedPeerRecord,
addrsUpdatedChan: make(chan struct{}, 1),
network: n,
psManager: psManager,
mux: msmux.NewMultistreamMuxer[protocol.ID](),
negtimeout: DefaultNegotiationTimeout,
eventbus: opts.EventBus,
ctx: hostCtx,
ctxCancel: cancel,
}
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}
if opts.MultistreamMuxer != nil {
h.mux = opts.MultistreamMuxer
@@ -201,7 +184,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
}
// we can't set this as a default above because it depends on the *BasicHost.
if h.disableSignedPeerRecord {
if opts.DisableSignedPeerRecord {
idOpts = append(idOpts, identify.DisableSignedPeerRecord())
}
if opts.EnableMetrics {
@@ -251,10 +234,13 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
h.Network().ListenAddresses,
addCertHashesFunc,
opts.ObservedAddrsManager,
h.addrsUpdatedChan,
autonatv2Client,
opts.EnableMetrics,
opts.PrometheusRegisterer,
opts.DisableSignedPeerRecord,
h.Peerstore().PrivKey(h.ID()),
h.Peerstore(),
h.ID(),
)
if err != nil {
return nil, fmt.Errorf("failed to create address service: %w", err)
@@ -299,22 +285,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
h.pings = ping.NewPingService(h)
}
if !h.disableSignedPeerRecord {
h.signKey = h.Peerstore().PrivKey(h.ID())
cab, ok := peerstore.GetCertifiedAddrBook(h.Peerstore())
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
h.caBook = cab
rec, err := h.makeSignedPeerRecord(h.addressManager.Addrs())
if err != nil {
return nil, fmt.Errorf("failed to create signed record for self: %w", err)
}
if _, err := h.caBook.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil {
return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err)
}
}
n.SetStreamHandler(h.newStreamHandler)
return h, nil
@@ -337,21 +307,7 @@ func (h *BasicHost) Start() {
log.Error("address service failed to start", "err", err)
}
if !h.disableSignedPeerRecord {
// Ensure we have the correct peer record after Start returns
rec, err := h.makeSignedPeerRecord(h.addressManager.Addrs())
if err != nil {
log.Error("failed to create signed record", "err", err)
}
if _, err := h.caBook.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil {
log.Error("failed to persist signed record to peerstore", "err", err)
}
}
h.ids.Start()
h.refCount.Add(1)
go h.background()
}
// newStreamHandler is the remote-opened stream handler for network.Network
@@ -402,117 +358,6 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
handle(protoID, s)
}
// makeUpdatedAddrEvent builds an EvtLocalAddressesUpdated describing the diff
// between prev and current. It returns nil when there is nothing to emit:
// both inputs nil, no additions and no removals, or (when signed peer records
// are enabled) the signed record could not be created.
func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated {
	if prev == nil && current == nil {
		return nil
	}
	// Index both address sets by their byte encoding for O(1) membership checks.
	prevmap := make(map[string]ma.Multiaddr, len(prev))
	currmap := make(map[string]ma.Multiaddr, len(current))
	evt := &event.EvtLocalAddressesUpdated{Diffs: true}
	addrsAdded := false

	for _, addr := range prev {
		prevmap[string(addr.Bytes())] = addr
	}
	for _, addr := range current {
		currmap[string(addr.Bytes())] = addr
	}
	// Every current address is either Maintained (also in prev) or Added.
	// Matched entries are deleted from prevmap so that only removals remain.
	for _, addr := range currmap {
		_, ok := prevmap[string(addr.Bytes())]
		updated := event.UpdatedAddress{Address: addr}
		if ok {
			updated.Action = event.Maintained
		} else {
			updated.Action = event.Added
			addrsAdded = true
		}
		evt.Current = append(evt.Current, updated)
		delete(prevmap, string(addr.Bytes()))
	}
	// Whatever is left in prevmap was not seen in current: it was removed.
	for _, addr := range prevmap {
		updated := event.UpdatedAddress{Action: event.Removed, Address: addr}
		evt.Removed = append(evt.Removed, updated)
	}
	// No additions and no removals means the address set is unchanged.
	if !addrsAdded && len(evt.Removed) == 0 {
		return nil
	}
	// Our addresses have changed. Make a new signed peer record.
	if !h.disableSignedPeerRecord {
		// add signed peer record to the event
		sr, err := h.makeSignedPeerRecord(current)
		if err != nil {
			log.Error("error creating a signed peer record from the set of current addresses", "err", err)
			// drop this change
			return nil
		}
		evt.SignedPeerRecord = sr
	}
	return evt
}
// makeSignedPeerRecord creates a signed peer record (routing envelope) for the
// given addresses, sealed with the host's private key.
//
// The address list is trimmed so the sealed envelope stays under
// maxPeerRecordSize (8 KiB, matching identify's limit). The size budget
// reserves room for the host ID, the signature, and the public key.
func (h *BasicHost) makeSignedPeerRecord(addrs []ma.Multiaddr) (*record.Envelope, error) {
	// Limit the length of currentAddrs to ensure that our signed peer records aren't rejected
	peerRecordSize := 64 // HostID
	k, err := h.signKey.Raw()
	if err == nil {
		// BUG FIX: the key size must be counted when Raw() SUCCEEDS — the
		// original condition was inverted (err != nil), which added 2*len(nil)
		// on failure and nothing on success, understating the record size.
		peerRecordSize += 2 * len(k) // 1 for signature, 1 for public key
	} else {
		peerRecordSize += 2 * 1024 // key size unknown; use a conservative bound
	}
	// we want the final address list to be small for keeping the signed peer record in size
	addrs = trimHostAddrList(addrs, maxPeerRecordSize-peerRecordSize-256) // 256 B of buffer
	rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{
		ID:    h.ID(),
		Addrs: addrs,
	})
	return record.Seal(rec, h.signKey)
}
// background watches for address changes and, on each change, persists the
// new address set (and signed peer record, when enabled) to the peerstore and
// emits an EvtLocalAddressesUpdated event. It runs until the host context is
// cancelled and signals completion via h.refCount.
func (h *BasicHost) background() {
	defer h.refCount.Done()
	var lastAddrs []ma.Multiaddr

	emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
		// A nil event means nothing changed (or the signed record failed);
		// in either case there is nothing to persist or emit.
		changeEvt := h.makeUpdatedAddrEvent(lastAddrs, currentAddrs)
		if changeEvt == nil {
			return
		}
		// Our addresses have changed.
		// store the signed peer record in the peer store.
		if !h.disableSignedPeerRecord {
			if _, err := h.caBook.ConsumePeerRecord(changeEvt.SignedPeerRecord, peerstore.PermanentAddrTTL); err != nil {
				log.Error("failed to persist signed peer record in peer store", "err", err)
				return
			}
		}
		// update host addresses in the peer store
		removedAddrs := make([]ma.Multiaddr, 0, len(changeEvt.Removed))
		for _, ua := range changeEvt.Removed {
			removedAddrs = append(removedAddrs, ua.Address)
		}
		// Store the current set permanently; expire removed addrs with TTL 0.
		h.Peerstore().SetAddrs(h.ID(), currentAddrs, peerstore.PermanentAddrTTL)
		h.Peerstore().SetAddrs(h.ID(), removedAddrs, 0)

		// emit addr change event
		if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil {
			log.Warn("error emitting event for updated addrs", "err", err)
		}
	}

	// Diff-and-emit loop: recompute on every wakeup, then block until the
	// next notification or shutdown.
	for {
		curr := h.Addrs()
		emitAddrChange(curr, lastAddrs)
		lastAddrs = curr
		select {
		case <-h.addrsUpdatedChan:
		case <-h.ctx.Done():
			return
		}
	}
}
// ID returns the (local) peer.ID associated with this Host
func (h *BasicHost) ID() peer.ID {
return h.Network().LocalPeer()
@@ -755,58 +600,6 @@ func (h *BasicHost) ConfirmedAddrs() (reachable []ma.Multiaddr, unreachable []ma
return h.addressManager.ConfirmedAddrs()
}
// trimHostAddrList trims the address list to fit within the maximum size
// (maxSize bytes of encoded multiaddrs). If the list already fits it is
// returned unchanged. Otherwise addresses are scored — public over private
// over loopback, and by transport (quic-v1 > tcp > wss > webtransport >
// webrtc-direct) — and the highest-scoring prefix that fits is kept.
func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
	totalSize := 0
	for _, a := range addrs {
		totalSize += len(a.Bytes())
	}
	if totalSize <= maxSize {
		// Everything fits; nothing to trim.
		return addrs
	}

	// score ranks an address: bit 12 for public, bit 11 for non-loopback
	// private, plus a transport-weight bit (1<<weight).
	score := func(addr ma.Multiaddr) int {
		var res int
		if manet.IsPublicAddr(addr) {
			res |= 1 << 12
		} else if !manet.IsIPLoopback(addr) {
			res |= 1 << 11
		}
		var protocolWeight int
		ma.ForEach(addr, func(c ma.Component) bool {
			switch c.Protocol().Code {
			case ma.P_QUIC_V1:
				protocolWeight = 5
			case ma.P_TCP:
				protocolWeight = 4
			case ma.P_WSS:
				protocolWeight = 3
			case ma.P_WEBTRANSPORT:
				protocolWeight = 2
			case ma.P_WEBRTC_DIRECT:
				protocolWeight = 1
			case ma.P_P2P:
				// Stop at the /p2p component.
				return false
			}
			return true
		})
		res |= 1 << protocolWeight
		return res
	}

	slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int {
		return score(b) - score(a) // b-a for reverse order
	})

	// Keep the longest prefix of the sorted list that fits within maxSize.
	totalSize = 0
	for i, a := range addrs {
		totalSize += len(a.Bytes())
		if totalSize > maxSize {
			addrs = addrs[:i]
			break
		}
	}
	return addrs
}
// SetAutoNat sets the autonat service for the host.
func (h *BasicHost) SetAutoNat(a autonat.AutoNAT) {
h.autoNATMx.Lock()
@@ -855,7 +648,6 @@ func (h *BasicHost) Close() error {
}
_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()
if err := h.network.Close(); err != nil {
log.Error("swarm close failed", "err", err)
-9
View File
@@ -8,7 +8,6 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -597,14 +596,7 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) {
ctx := context.Background()
taddrs := []ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/1234")}
starting := make(chan struct{}, 1)
var count atomic.Int32
h, err := NewHost(swarmt.GenSwarm(t), &HostOpts{AddrsFactory: func(addrs []ma.Multiaddr) []ma.Multiaddr {
// The first call here is made from the constructor. Don't block.
if count.Add(1) == 1 {
return addrs
}
<-starting
return taddrs
}})
require.NoError(t, err)
@@ -615,7 +607,6 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) {
t.Error(err)
}
defer sub.Close()
close(starting)
h.Start()
expected := event.EvtLocalAddressesUpdated{