mirror of
https://github.com/libp2p/go-libp2p.git
synced 2026-04-22 16:17:19 +08:00
feat(peerstore): replace stale addrs on newer signed peer record
Treat ConsumePeerRecord as replace-semantics rather than merge: when a peer publishes a newer signed peer record, addrs that were present in the previously stored signed record but omitted from the new one are evicted, so the peerstore reflects the peer's current self-advertised set instead of the accumulated union. Unsigned addrs (e.g. DHT gossip or identify exchanges without a signed record) are untouched, and addrs held by a live connection (TTL >= ConnectedAddrTTL) are kept so active sessions are not torn down. Both backends recover the prior addr set by decoding the stored envelope: pstoremem reuses the Envelope already kept on peerRecordState; pstoreds fetches it via GetPeerRecord and drops superseded addrs with deleteAddrs. Envelope.Record() caches on first access, so the diff is cheap. The shared CertifiedAddresses assertion is updated to match the new behavior.
This commit is contained in:
@@ -282,7 +282,21 @@ func (ab *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
|
||||
|
||||
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in
|
||||
// a record.Envelope), which will expire after the given TTL.
|
||||
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
|
||||
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook
|
||||
// for more details.
|
||||
//
|
||||
// The signed peer record's Seq is treated as monotonic per peer: a record
|
||||
// with a Seq lower than the last accepted one is rejected. Equal Seq is
|
||||
// accepted as a TTL refresh.
|
||||
//
|
||||
// When a newer signed record is accepted, addrs that were present in the
|
||||
// previously stored signed record but absent in the new one are evicted, so
|
||||
// the peerstore reflects the peer's current self-advertised set instead of
|
||||
// the union of every record we have ever seen. Unsigned addrs (added via
|
||||
// AddAddr / SetAddr from sources like DHT gossip, or from an identify
|
||||
// exchange where the peer did not send a signed record) are not touched, and
|
||||
// addrs held by a live connection (TTL >= ConnectedAddrTTL) are also kept so
|
||||
// active sessions are not dropped.
|
||||
func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
|
||||
r, err := recordEnvelope.Record()
|
||||
if err != nil {
|
||||
@@ -303,6 +317,15 @@ func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl tim
|
||||
}
|
||||
|
||||
addrs := cleanAddrs(rec.Addrs, rec.PeerID)
|
||||
|
||||
// Diff against the previously stored signed record so we can drop addrs
|
||||
// the peer no longer advertises before adding the new ones.
|
||||
if superseded := ab.supersededSignedAddrs(rec.PeerID, addrs); len(superseded) > 0 {
|
||||
if err := ab.deleteAddrs(rec.PeerID, superseded); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
err = ab.setAddrs(rec.PeerID, addrs, ttl, ttlExtend, true)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@@ -315,6 +338,62 @@ func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl tim
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// supersededSignedAddrs returns addrs that were present in the previously
|
||||
// stored signed peer record for p but are absent in newAddrs. Addrs held by
|
||||
// a live connection (TTL >= ConnectedAddrTTL) are excluded so an active
|
||||
// session is not torn down when the peer rotates its advertised set.
|
||||
func (ab *dsAddrBook) supersededSignedAddrs(p peer.ID, newAddrs []ma.Multiaddr) []ma.Multiaddr {
|
||||
prevEnv := ab.GetPeerRecord(p)
|
||||
if prevEnv == nil {
|
||||
return nil
|
||||
}
|
||||
prev, err := prevEnv.Record()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
prevRec, ok := prev.(*peer.PeerRecord)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
newSet := make(map[string]struct{}, len(newAddrs))
|
||||
for _, a := range newAddrs {
|
||||
newSet[string(a.Bytes())] = struct{}{}
|
||||
}
|
||||
|
||||
pr, err := ab.loadRecord(p, true, false)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
pr.RLock()
|
||||
connected := make(map[string]struct{})
|
||||
for _, a := range pr.Addrs {
|
||||
if ttlIsConnected(time.Duration(a.Ttl)) {
|
||||
connected[string(a.Addr)] = struct{}{}
|
||||
}
|
||||
}
|
||||
pr.RUnlock()
|
||||
|
||||
superseded := make([]ma.Multiaddr, 0, len(prevRec.Addrs))
|
||||
for _, a := range prevRec.Addrs {
|
||||
key := string(a.Bytes())
|
||||
if _, still := newSet[key]; still {
|
||||
continue
|
||||
}
|
||||
if _, isConn := connected[key]; isConn {
|
||||
continue
|
||||
}
|
||||
superseded = append(superseded, a)
|
||||
}
|
||||
return superseded
|
||||
}
|
||||
|
||||
// ttlIsConnected reports whether the given TTL marks the address as held by
|
||||
// a live connection.
|
||||
func ttlIsConnected(ttl time.Duration) bool {
|
||||
return ttl >= pstore.ConnectedAddrTTL
|
||||
}
|
||||
|
||||
func (ab *dsAddrBook) latestPeerRecordSeq(p peer.ID) uint64 {
|
||||
pr, err := ab.loadRecord(p, true, false)
|
||||
if err != nil {
|
||||
|
||||
@@ -5,12 +5,17 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
pstore "github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/libp2p/go-libp2p/core/record"
|
||||
"github.com/libp2p/go-libp2p/core/test"
|
||||
pt "github.com/libp2p/go-libp2p/p2p/host/peerstore/test"
|
||||
|
||||
mockclock "github.com/benbjohnson/clock"
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/sync"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -72,6 +77,60 @@ func TestDsAddrBook(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDsConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a
|
||||
// newer signed peer record: addrs dropped from the new record are evicted,
|
||||
// while unsigned addrs and addrs held by a live connection are kept.
|
||||
func TestDsConsumePeerRecordReplacesStaleAddrs(t *testing.T) {
|
||||
for name, dsFactory := range dstores {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
opts := DefaultOpts()
|
||||
store, closeDs := dsFactory(t)
|
||||
defer closeDs()
|
||||
ab, err := NewAddrBook(context.Background(), store, opts)
|
||||
require.NoError(t, err)
|
||||
defer ab.Close()
|
||||
|
||||
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
|
||||
require.NoError(t, err)
|
||||
id, err := peer.IDFromPrivateKey(priv)
|
||||
require.NoError(t, err)
|
||||
|
||||
keep := ma.StringCast("/ip4/1.2.3.4/tcp/1")
|
||||
drop := ma.StringCast("/ip4/1.2.3.4/tcp/2")
|
||||
unsigned := ma.StringCast("/ip4/1.2.3.4/tcp/3")
|
||||
connected := ma.StringCast("/ip4/1.2.3.4/tcp/4")
|
||||
|
||||
rec1 := peer.NewPeerRecord()
|
||||
rec1.PeerID = id
|
||||
rec1.Seq = 1
|
||||
rec1.Addrs = []ma.Multiaddr{keep, drop, connected}
|
||||
env1, err := record.Seal(rec1, priv)
|
||||
require.NoError(t, err)
|
||||
|
||||
accepted, err := ab.ConsumePeerRecord(env1, time.Hour)
|
||||
require.NoError(t, err)
|
||||
require.True(t, accepted)
|
||||
|
||||
ab.AddAddr(id, connected, pstore.ConnectedAddrTTL)
|
||||
ab.AddAddr(id, unsigned, time.Hour)
|
||||
require.ElementsMatch(t, []ma.Multiaddr{keep, drop, connected, unsigned}, ab.Addrs(id))
|
||||
|
||||
rec2 := peer.NewPeerRecord()
|
||||
rec2.PeerID = id
|
||||
rec2.Seq = 2
|
||||
rec2.Addrs = []ma.Multiaddr{keep}
|
||||
env2, err := record.Seal(rec2, priv)
|
||||
require.NoError(t, err)
|
||||
|
||||
accepted, err = ab.ConsumePeerRecord(env2, time.Hour)
|
||||
require.NoError(t, err)
|
||||
require.True(t, accepted)
|
||||
|
||||
require.ElementsMatch(t, []ma.Multiaddr{keep, connected, unsigned}, ab.Addrs(id))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDsKeyBook(t *testing.T) {
|
||||
for name, dsFactory := range dstores {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
|
||||
@@ -44,7 +44,9 @@ func ttlIsConnected(ttl time.Duration) bool {
|
||||
|
||||
type peerRecordState struct {
|
||||
Envelope *record.Envelope
|
||||
Seq uint64
|
||||
// Seq is the sequence number from the stored signed peer record. Newer
|
||||
// records (higher Seq) supersede older ones for the same peer.
|
||||
Seq uint64
|
||||
}
|
||||
|
||||
// Essentially Go stdlib's Priority Queue example
|
||||
@@ -289,8 +291,23 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
|
||||
mab.addAddrs(p, addrs, ttl)
|
||||
}
|
||||
|
||||
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire after the given TTL.
|
||||
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
|
||||
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will
|
||||
// expire after the given TTL. See
|
||||
// https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook
|
||||
// for more details.
|
||||
//
|
||||
// The signed peer record's Seq is treated as monotonic per peer: a record with
|
||||
// a Seq lower than the last accepted one is rejected. Equal Seq is accepted as
|
||||
// a TTL refresh.
|
||||
//
|
||||
// When a newer signed record is accepted, addrs that were present in the
|
||||
// previously stored signed record but absent in the new one are evicted, so
|
||||
// the peerstore reflects the peer's current self-advertised set instead of
|
||||
// the union of every record we have ever seen. Unsigned addrs (added via
|
||||
// AddAddr / SetAddr from sources like DHT gossip, or from an identify
|
||||
// exchange where the peer did not send a signed record) are not touched, and
|
||||
// addrs held by a live connection (TTL >= ConnectedAddrTTL) are also kept so
|
||||
// active sessions are not dropped.
|
||||
func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
|
||||
r, err := recordEnvelope.Record()
|
||||
if err != nil {
|
||||
@@ -316,6 +333,33 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
|
||||
if !found && len(mab.signedPeerRecords) >= mab.maxSignedPeerRecords {
|
||||
return false, errors.New("too many signed peer records")
|
||||
}
|
||||
|
||||
// Drop addrs from the previous signed record that are absent in the
|
||||
// new one; addrs held by a live connection are preserved so we don't
|
||||
// drop an active session if the peer rotates its advertised set. The
|
||||
// prior addr set is recovered by decoding the stored envelope; that
|
||||
// call caches on first access (core/record/envelope.go), so repeated
|
||||
// lookups are cheap.
|
||||
if found {
|
||||
if prevRec := prevSignedAddrs(lastState); len(prevRec) > 0 {
|
||||
newAddrSet := make(map[string]struct{}, len(rec.Addrs))
|
||||
for _, a := range rec.Addrs {
|
||||
newAddrSet[string(a.Bytes())] = struct{}{}
|
||||
}
|
||||
for _, a := range prevRec {
|
||||
key := string(a.Bytes())
|
||||
if _, still := newAddrSet[key]; still {
|
||||
continue
|
||||
}
|
||||
ea, ok := mab.addrs.Addrs[rec.PeerID][key]
|
||||
if !ok || ea.IsConnected() {
|
||||
continue
|
||||
}
|
||||
mab.addrs.Delete(ea)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mab.signedPeerRecords[rec.PeerID] = &peerRecordState{
|
||||
Envelope: recordEnvelope,
|
||||
Seq: rec.Seq,
|
||||
@@ -324,6 +368,24 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// prevSignedAddrs returns the addrs from the stored signed peer record, or
|
||||
// nil if the envelope is absent or can't be decoded. Envelope.Record() caches
|
||||
// its result, so repeated calls are cheap.
|
||||
func prevSignedAddrs(s *peerRecordState) []ma.Multiaddr {
|
||||
if s == nil || s.Envelope == nil {
|
||||
return nil
|
||||
}
|
||||
r, err := s.Envelope.Record()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
pr, ok := r.(*peer.PeerRecord)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return pr.Addrs
|
||||
}
|
||||
|
||||
func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) {
|
||||
if len(mab.addrs.Addrs[p]) == 0 {
|
||||
delete(mab.signedPeerRecords, p)
|
||||
|
||||
@@ -8,7 +8,11 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/libp2p/go-libp2p/core/record"
|
||||
"github.com/libp2p/go-libp2p/core/test"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -186,6 +190,56 @@ func TestPeerLimits(t *testing.T) {
|
||||
require.Equal(t, 1024, ab.addrs.NumUnconnectedAddrs())
|
||||
}
|
||||
|
||||
// TestConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a
|
||||
// newer signed peer record: addrs dropped from the new record are evicted,
|
||||
// while unsigned addrs and addrs held by a live connection are kept.
|
||||
func TestConsumePeerRecordReplacesStaleAddrs(t *testing.T) {
|
||||
ab := NewAddrBook()
|
||||
defer ab.Close()
|
||||
|
||||
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
|
||||
require.NoError(t, err)
|
||||
id, err := peer.IDFromPrivateKey(priv)
|
||||
require.NoError(t, err)
|
||||
|
||||
keep := ma.StringCast("/ip4/1.2.3.4/tcp/1")
|
||||
drop := ma.StringCast("/ip4/1.2.3.4/tcp/2")
|
||||
unsigned := ma.StringCast("/ip4/1.2.3.4/tcp/3")
|
||||
connected := ma.StringCast("/ip4/1.2.3.4/tcp/4")
|
||||
|
||||
rec1 := peer.NewPeerRecord()
|
||||
rec1.PeerID = id
|
||||
rec1.Seq = 1
|
||||
rec1.Addrs = []ma.Multiaddr{keep, drop, connected}
|
||||
env1, err := record.Seal(rec1, priv)
|
||||
require.NoError(t, err)
|
||||
|
||||
accepted, err := ab.ConsumePeerRecord(env1, time.Hour)
|
||||
require.NoError(t, err)
|
||||
require.True(t, accepted)
|
||||
|
||||
// Pin `connected` via ConnectedAddrTTL and add an unsigned addr.
|
||||
ab.AddAddr(id, connected, peerstore.ConnectedAddrTTL)
|
||||
ab.AddAddr(id, unsigned, time.Hour)
|
||||
require.ElementsMatch(t, []ma.Multiaddr{keep, drop, connected, unsigned}, ab.Addrs(id))
|
||||
|
||||
// Newer record drops `drop` and only mentions `keep`. `drop` must go;
|
||||
// `unsigned` (never in a signed record) and `connected` (held by a
|
||||
// live connection) must stay.
|
||||
rec2 := peer.NewPeerRecord()
|
||||
rec2.PeerID = id
|
||||
rec2.Seq = 2
|
||||
rec2.Addrs = []ma.Multiaddr{keep}
|
||||
env2, err := record.Seal(rec2, priv)
|
||||
require.NoError(t, err)
|
||||
|
||||
accepted, err = ab.ConsumePeerRecord(env2, time.Hour)
|
||||
require.NoError(t, err)
|
||||
require.True(t, accepted)
|
||||
|
||||
require.ElementsMatch(t, []ma.Multiaddr{keep, connected, unsigned}, ab.Addrs(id))
|
||||
}
|
||||
|
||||
func BenchmarkPeerAddrs(b *testing.B) {
|
||||
sizes := [...]int{1, 10, 100, 1000, 10_000, 100_000, 1000_000}
|
||||
for _, sz := range sizes {
|
||||
|
||||
@@ -475,8 +475,8 @@ func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testin
|
||||
t.Error("unable to retrieve signed routing record from addrbook")
|
||||
}
|
||||
|
||||
// Adding a new envelope should clear existing certified addresses.
|
||||
// Only the newly-added ones should remain
|
||||
// A newer signed record drops addrs the peer no longer advertises.
|
||||
// Unsigned addrs (added via plain AddAddrs) are retained.
|
||||
certifiedAddrs = certifiedAddrs[:3]
|
||||
rec4 := peer.NewPeerRecord()
|
||||
rec4.PeerID = id
|
||||
@@ -488,8 +488,9 @@ func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testin
|
||||
if !accepted {
|
||||
t.Error("expected peer record to be accepted")
|
||||
}
|
||||
// AssertAddressesEqual(t, certifiedAddrs, m.Addrs(id))
|
||||
AssertAddressesEqual(t, allAddrs, m.Addrs(id))
|
||||
expectedAfterRec4 := append([]multiaddr.Multiaddr{}, certifiedAddrs...)
|
||||
expectedAfterRec4 = append(expectedAfterRec4, uncertifiedAddrs...)
|
||||
AssertAddressesEqual(t, expectedAfterRec4, m.Addrs(id))
|
||||
|
||||
// update TTL on signed addrs to -1 to remove them.
|
||||
// the signed routing record should be deleted
|
||||
|
||||
Reference in New Issue
Block a user