addrsmanager: extract out addressing logic from basichost (#3075)

Benchmark for AllAddrs:

```
goos: linux
goarch: amd64
pkg: github.com/libp2p/go-libp2p
cpu: AMD Ryzen 7 7840U w/ Radeon  780M Graphics
BenchmarkAllAddrs-16               16737            122245 ns/op           21240 B/op        218 allocs/op
```
after:
```
goos: linux
goarch: amd64
pkg: github.com/libp2p/go-libp2p
cpu: AMD Ryzen 7 7840U w/ Radeon  780M Graphics
BenchmarkAllAddrs-16            11103236               105.7 ns/op           192 B/op          1 allocs/op
```
This commit is contained in:
sukun
2025-02-27 22:24:57 +05:30
committed by GitHub
parent 578af0c651
commit 5e6f217d84
6 changed files with 1095 additions and 387 deletions
+81 -361
View File
@@ -5,10 +5,8 @@ import (
"errors"
"fmt"
"io"
"net"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p/core/connmgr"
@@ -22,7 +20,6 @@ import (
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/autonat"
"github.com/libp2p/go-libp2p/p2p/host/basic/internal/backoff"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/pstoremanager"
"github.com/libp2p/go-libp2p/p2p/host/relaysvc"
@@ -35,8 +32,6 @@ import (
libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
"github.com/prometheus/client_golang/prometheus"
"github.com/libp2p/go-netroute"
logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
@@ -81,13 +76,10 @@ type BasicHost struct {
ids identify.IDService
hps *holepunch.Service
pings *ping.PingService
natmgr NATManager
cmgr connmgr.ConnManager
eventbus event.Bus
relayManager *relaysvc.RelayManager
AddrsFactory AddrsFactory
negtimeout time.Duration
emitters struct {
@@ -95,23 +87,16 @@ type BasicHost struct {
evtLocalAddrsUpdated event.Emitter
}
addrChangeChan chan struct{}
addrMu sync.RWMutex // protects the following fields
updateLocalIPv4Backoff backoff.ExpBackoff
updateLocalIPv6Backoff backoff.ExpBackoff
filteredInterfaceAddrs []ma.Multiaddr
allInterfaceAddrs []ma.Multiaddr
relayAddrs atomic.Pointer[[]ma.Multiaddr]
disableSignedPeerRecord bool
signKey crypto.PrivKey
caBook peerstore.CertifiedAddrBook
autoNat autonat.AutoNAT
autoNATMx sync.RWMutex
autoNat autonat.AutoNAT
autonatv2 *autonatv2.AutoNAT
autonatv2 *autonatv2.AutoNAT
addressManager *addrsManager
addrsUpdatedChan chan struct{}
}
var _ host.Host = (*BasicHost)(nil)
@@ -195,18 +180,13 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
psManager: psManager,
mux: msmux.NewMultistreamMuxer[protocol.ID](),
negtimeout: DefaultNegotiationTimeout,
AddrsFactory: DefaultAddrsFactory,
eventbus: opts.EventBus,
addrChangeChan: make(chan struct{}, 1),
ctx: hostCtx,
ctxCancel: cancel,
disableSignedPeerRecord: opts.DisableSignedPeerRecord,
addrsUpdatedChan: make(chan struct{}, 1),
}
var relayAddrs []ma.Multiaddr
h.relayAddrs.Store(&relayAddrs)
h.updateLocalIpAddr()
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}
@@ -214,32 +194,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
return nil, err
}
if !h.disableSignedPeerRecord {
cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore())
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
h.caBook = cab
h.signKey = h.Peerstore().PrivKey(h.ID())
if h.signKey == nil {
return nil, errors.New("unable to access host key")
}
// persist a signed peer record for self to the peerstore.
rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{
ID: h.ID(),
Addrs: h.Addrs(),
})
ev, err := record.Seal(rec, h.signKey)
if err != nil {
return nil, fmt.Errorf("failed to create signed record for self: %w", err)
}
if _, err := cab.ConsumePeerRecord(ev, peerstore.PermanentAddrTTL); err != nil {
return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err)
}
}
if opts.MultistreamMuxer != nil {
h.mux = opts.MultistreamMuxer
}
@@ -267,6 +221,29 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
return nil, fmt.Errorf("failed to create Identify service: %s", err)
}
addrFactory := DefaultAddrsFactory
if opts.AddrsFactory != nil {
addrFactory = opts.AddrsFactory
}
var natmgr NATManager
if opts.NATManager != nil {
natmgr = opts.NATManager(h.Network())
}
var tfl func(ma.Multiaddr) transport.Transport
if s, ok := h.Network().(interface {
TransportForListening(ma.Multiaddr) transport.Transport
}); ok {
tfl = s.TransportForListening
}
h.addressManager, err = newAddrsManager(h.eventbus, natmgr, addrFactory, h.Network().ListenAddresses, tfl, h.ids, h.addrsUpdatedChan)
if err != nil {
return nil, fmt.Errorf("failed to create address service: %w", err)
}
// register to be notified when the network's listen addrs change,
// so we can update our address set and push events if needed
h.Network().Notify(h.addressManager.NetNotifee())
if opts.EnableHolePunching {
if opts.EnableMetrics {
hpOpts := []holepunch.Option{
@@ -274,16 +251,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
opts.HolePunchingOptions = append(hpOpts, opts.HolePunchingOptions...)
}
h.hps, err = holepunch.NewService(h, h.ids, func() []ma.Multiaddr {
addrs := h.AllAddrs()
if opts.AddrsFactory != nil {
addrs = slices.Clone(opts.AddrsFactory(addrs))
}
// AllAddrs may ignore observed addresses in favour of NAT mappings. Use both for hole punching.
addrs = append(addrs, h.ids.OwnObservedAddrs()...)
addrs = ma.Unique(addrs)
return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) })
}, opts.HolePunchingOptions...)
h.hps, err = holepunch.NewService(h, h.ids, h.addressManager.HolePunchAddrs, opts.HolePunchingOptions...)
if err != nil {
return nil, fmt.Errorf("failed to create hole punch service: %w", err)
}
@@ -293,14 +261,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
h.negtimeout = opts.NegotiationTimeout
}
if opts.AddrsFactory != nil {
h.AddrsFactory = opts.AddrsFactory
}
if opts.NATManager != nil {
h.natmgr = opts.NATManager(n)
}
if opts.ConnManager == nil {
h.cmgr = &connmgr.NullConnMgr{}
} else {
@@ -334,114 +294,55 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
}
}
n.SetStreamHandler(h.newStreamHandler)
if !h.disableSignedPeerRecord {
h.signKey = h.Peerstore().PrivKey(h.ID())
cab, ok := peerstore.GetCertifiedAddrBook(h.Peerstore())
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
h.caBook = cab
// register to be notified when the network's listen addrs change,
// so we can update our address set and push events if needed
listenHandler := func(network.Network, ma.Multiaddr) {
h.SignalAddressChange()
rec, err := h.makeSignedPeerRecord(h.addressManager.Addrs())
if err != nil {
return nil, fmt.Errorf("failed to create signed record for self: %w", err)
}
if _, err := h.caBook.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil {
return nil, fmt.Errorf("failed to persist signed record to peerstore: %w", err)
}
}
n.Notify(&network.NotifyBundle{
ListenF: listenHandler,
ListenCloseF: listenHandler,
})
n.SetStreamHandler(h.newStreamHandler)
return h, nil
}
func (h *BasicHost) updateLocalIpAddr() {
h.addrMu.Lock()
defer h.addrMu.Unlock()
h.filteredInterfaceAddrs = nil
h.allInterfaceAddrs = nil
// Try to use the default ipv4/6 addresses.
if r, err := netroute.New(); err != nil {
log.Debugw("failed to build Router for kernel's routing table", "error", err)
} else {
var localIPv4 net.IP
var ran bool
err, ran = h.updateLocalIPv4Backoff.Run(func() error {
_, _, localIPv4, err = r.Route(net.IPv4zero)
return err
})
if ran && err != nil {
log.Debugw("failed to fetch local IPv4 address", "error", err)
} else if ran && localIPv4.IsGlobalUnicast() {
maddr, err := manet.FromIP(localIPv4)
if err == nil {
h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, maddr)
}
}
var localIPv6 net.IP
err, ran = h.updateLocalIPv6Backoff.Run(func() error {
_, _, localIPv6, err = r.Route(net.IPv6unspecified)
return err
})
if ran && err != nil {
log.Debugw("failed to fetch local IPv6 address", "error", err)
} else if ran && localIPv6.IsGlobalUnicast() {
maddr, err := manet.FromIP(localIPv6)
if err == nil {
h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, maddr)
}
}
}
// Resolve the interface addresses
ifaceAddrs, err := manet.InterfaceMultiaddrs()
if err != nil {
// This usually shouldn't happen, but we could be in some kind
// of funky restricted environment.
log.Errorw("failed to resolve local interface addresses", "error", err)
// Add the loopback addresses to the filtered addrs and use them as the non-filtered addrs.
// Then bail. There's nothing else we can do here.
h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, manet.IP4Loopback, manet.IP6Loopback)
h.allInterfaceAddrs = h.filteredInterfaceAddrs
return
}
for _, addr := range ifaceAddrs {
// Skip link-local addrs, they're mostly useless.
if !manet.IsIP6LinkLocal(addr) {
h.allInterfaceAddrs = append(h.allInterfaceAddrs, addr)
}
}
// If netroute failed to get us any interface addresses, use all of
// them.
if len(h.filteredInterfaceAddrs) == 0 {
// Add all addresses.
h.filteredInterfaceAddrs = h.allInterfaceAddrs
} else {
// Only add loopback addresses. Filter these because we might
// not _have_ an IPv6 loopback address.
for _, addr := range h.allInterfaceAddrs {
if manet.IsIPLoopback(addr) {
h.filteredInterfaceAddrs = append(h.filteredInterfaceAddrs, addr)
}
}
}
}
// Start starts background tasks in the host
// TODO: Return error and handle it in the caller?
func (h *BasicHost) Start() {
h.psManager.Start()
h.refCount.Add(1)
h.ids.Start()
if h.autonatv2 != nil {
err := h.autonatv2.Start()
if err != nil {
log.Errorf("autonat v2 failed to start: %s", err)
}
}
if err := h.addressManager.Start(); err != nil {
log.Errorf("address service failed to start: %s", err)
}
if !h.disableSignedPeerRecord {
// Ensure we have the correct peer record after Start returns
rec, err := h.makeSignedPeerRecord(h.addressManager.Addrs())
if err != nil {
log.Errorf("failed to create signed record: %w", err)
}
if _, err := h.caBook.ConsumePeerRecord(rec, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed record to peerstore: %w", err)
}
}
h.ids.Start()
h.refCount.Add(1)
go h.background()
}
@@ -493,16 +394,6 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
handle(protoID, s)
}
// SignalAddressChange signals to the host that it needs to determine whether our listen addresses have recently
// changed.
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) SignalAddressChange() {
select {
case h.addrChangeChan <- struct{}{}:
default:
}
}
func (h *BasicHost) makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated {
if prev == nil && current == nil {
return nil
@@ -601,39 +492,13 @@ func (h *BasicHost) background() {
}
}
addrSub, err := h.EventBus().Subscribe(new(event.EvtAutoRelayAddrsUpdated), eventbus.Name("basic host address loop"))
if err != nil {
log.Errorf("failed to subscribe to the EvtAutoRelayAddrs: %s", err)
return
}
defer addrSub.Close()
// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(addrChangeTickrInterval)
defer ticker.Stop()
for {
// Update our local IP addresses before checking our current addresses.
if len(h.network.ListenAddresses()) > 0 {
h.updateLocalIpAddr()
}
curr := h.Addrs()
emitAddrChange(curr, lastAddrs)
lastAddrs = curr
select {
case <-ticker.C:
case <-h.addrChangeChan:
case e := <-addrSub.Out():
if evt, ok := e.(event.EvtAutoRelayAddrsUpdated); ok {
// Copy to a new slice. Copying to the slice pointed to by relayAddrs
// will introduce a race.
addrs := slices.Clone(evt.RelayAddrs)
h.relayAddrs.Store(&addrs)
} else {
log.Errorf("received unexpected event: %T %v", e, e)
}
case <-h.addrsUpdatedChan:
case <-h.ctx.Done():
return
}
@@ -864,19 +729,7 @@ func (h *BasicHost) ConnManager() connmgr.ConnManager {
// When used with AutoRelay, and if the host is not publicly reachable,
// this will only have host's private, relay, and no public addresses.
func (h *BasicHost) Addrs() []ma.Multiaddr {
addrs := h.AllAddrs()
// Make a copy. Consumers can modify the slice elements
if h.GetAutoNat() != nil && h.GetAutoNat().Status() == network.ReachabilityPrivate {
relayAddrsPtr := h.relayAddrs.Load()
// Only remove public addresses if we have relay addresses.
if relayAddrsPtr != nil && len(*relayAddrsPtr) > 0 {
addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return manet.IsPublicAddr(a) })
addrs = append(addrs, *relayAddrsPtr...)
}
}
addrs = slices.Clone(h.AddrsFactory(addrs))
// Add certhashes for the addresses provided by the user via address factory.
return h.addCertHashes(ma.Unique(addrs))
return h.addressManager.Addrs()
}
// NormalizeMultiaddr returns a multiaddr suitable for equality checks.
@@ -896,149 +749,9 @@ func (h *BasicHost) NormalizeMultiaddr(addr ma.Multiaddr) ma.Multiaddr {
return addr
}
var p2pCircuitAddr = ma.StringCast("/p2p-circuit")
// AllAddrs returns all the addresses the host is listening on except circuit addresses.
func (h *BasicHost) AllAddrs() []ma.Multiaddr {
listenAddrs := h.Network().ListenAddresses()
if len(listenAddrs) == 0 {
return nil
}
h.addrMu.RLock()
filteredIfaceAddrs := h.filteredInterfaceAddrs
allIfaceAddrs := h.allInterfaceAddrs
h.addrMu.RUnlock()
// Iterate over all _unresolved_ listen addresses, resolving our primary
// interface only to avoid advertising too many addresses.
finalAddrs := make([]ma.Multiaddr, 0, 8)
if resolved, err := manet.ResolveUnspecifiedAddresses(listenAddrs, filteredIfaceAddrs); err != nil {
// This can happen if we're listening on no addrs, or listening
// on IPv6 addrs, but only have IPv4 interface addrs.
log.Debugw("failed to resolve listen addrs", "error", err)
} else {
finalAddrs = append(finalAddrs, resolved...)
}
finalAddrs = ma.Unique(finalAddrs)
// use nat mappings if we have them
if h.natmgr != nil && h.natmgr.HasDiscoveredNAT() {
// We have successfully mapped ports on our NAT. Use those
// instead of observed addresses (mostly).
// Next, apply this mapping to our addresses.
for _, listen := range listenAddrs {
extMaddr := h.natmgr.GetMapping(listen)
if extMaddr == nil {
// not mapped
continue
}
// if the router reported a sane address
if !manet.IsIPUnspecified(extMaddr) {
// Add in the mapped addr.
finalAddrs = append(finalAddrs, extMaddr)
} else {
log.Warn("NAT device reported an unspecified IP as it's external address")
}
// Did the router give us a routable public addr?
if manet.IsPublicAddr(extMaddr) {
// well done
continue
}
// No.
// in case the router gives us a wrong address or we're behind a double-NAT.
// also add observed addresses
resolved, err := manet.ResolveUnspecifiedAddress(listen, allIfaceAddrs)
if err != nil {
// This can happen if we try to resolve /ip6/::/...
// without any IPv6 interface addresses.
continue
}
for _, addr := range resolved {
// Now, check if we have any observed addresses that
// differ from the one reported by the router. Routers
// don't always give the most accurate information.
observed := h.ids.ObservedAddrsFor(addr)
if len(observed) == 0 {
continue
}
// Drop the IP from the external maddr
_, extMaddrNoIP := ma.SplitFirst(extMaddr)
for _, obsMaddr := range observed {
// Extract a public observed addr.
ip, _ := ma.SplitFirst(obsMaddr)
if ip == nil || !manet.IsPublicAddr(ip.Multiaddr()) {
continue
}
finalAddrs = append(finalAddrs, ip.Encapsulate(extMaddrNoIP))
}
}
}
} else {
var observedAddrs []ma.Multiaddr
if h.ids != nil {
observedAddrs = h.ids.OwnObservedAddrs()
}
finalAddrs = append(finalAddrs, observedAddrs...)
}
finalAddrs = ma.Unique(finalAddrs)
// Remove /p2p-circuit addresses from the list.
// The p2p-circuit tranport listener reports its address as just /p2p-circuit
// This is useless for dialing. Users need to manage their circuit addresses themselves,
// or use AutoRelay.
finalAddrs = slices.DeleteFunc(finalAddrs, func(a ma.Multiaddr) bool {
return a.Equal(p2pCircuitAddr)
})
// Add certhashes for /webrtc-direct, /webtransport, etc addresses discovered
// using identify.
finalAddrs = h.addCertHashes(finalAddrs)
return finalAddrs
}
// addCertHashes adds certhashes to the relevant addresses. It modifies `addrs` in place.
func (h *BasicHost) addCertHashes(addrs []ma.Multiaddr) []ma.Multiaddr {
// This is a temporary workaround/hack that fixes #2233. Once we have a
// proper address pipeline, rework this. See the issue for more context.
type transportForListeninger interface {
TransportForListening(a ma.Multiaddr) transport.Transport
}
type addCertHasher interface {
AddCertHashes(m ma.Multiaddr) (ma.Multiaddr, bool)
}
s, ok := h.Network().(transportForListeninger)
if !ok {
return addrs
}
for i, addr := range addrs {
wtOK, wtN := libp2pwebtransport.IsWebtransportMultiaddr(addr)
webrtcOK, webrtcN := libp2pwebrtc.IsWebRTCDirectMultiaddr(addr)
if (wtOK && wtN == 0) || (webrtcOK && webrtcN == 0) {
t := s.TransportForListening(addr)
tpt, ok := t.(addCertHasher)
if !ok {
continue
}
addrWithCerthash, added := tpt.AddCertHashes(addr)
if !added {
log.Debugf("Couldn't add certhashes to multiaddr: %s", addr)
continue
}
addrs[i] = addrWithCerthash
}
}
return addrs
return h.addressManager.DirectAddrs()
}
func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
@@ -1095,30 +808,37 @@ func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
// SetAutoNat sets the autonat service for the host.
func (h *BasicHost) SetAutoNat(a autonat.AutoNAT) {
h.addrMu.Lock()
defer h.addrMu.Unlock()
h.autoNATMx.Lock()
defer h.autoNATMx.Unlock()
if h.autoNat == nil {
h.autoNat = a
}
}
// GetAutoNat returns the host's AutoNAT service, if AutoNAT is enabled.
//
// Deprecated: Use `BasicHost.Reachability` to get the host's reachability.
func (h *BasicHost) GetAutoNat() autonat.AutoNAT {
h.addrMu.Lock()
defer h.addrMu.Unlock()
h.autoNATMx.Lock()
defer h.autoNATMx.Unlock()
return h.autoNat
}
// Reachability returns the host's reachability status.
func (h *BasicHost) Reachability() network.Reachability {
return *h.addressManager.hostReachability.Load()
}
// Close shuts down the Host's services (network, etc).
func (h *BasicHost) Close() error {
h.closeSync.Do(func() {
h.ctxCancel()
if h.natmgr != nil {
h.natmgr.Close()
}
if h.cmgr != nil {
h.cmgr.Close()
}
h.addressManager.Close()
if h.ids != nil {
h.ids.Close()
}