mirror of
https://github.com/sigcn/pg.git
synced 2026-04-23 00:37:30 +08:00
disco: add a result cache for peer ready check
This commit is contained in:
+35
-8
@@ -12,6 +12,11 @@ import (
|
||||
"github.com/sigcn/pg/disco"
|
||||
)
|
||||
|
||||
type cache[T any] struct {
|
||||
result T
|
||||
time time.Time
|
||||
}
|
||||
|
||||
type peerkeeper struct {
|
||||
udpConn atomic.Pointer[net.UDPConn]
|
||||
peerID disco.PeerID
|
||||
@@ -23,6 +28,9 @@ type peerkeeper struct {
|
||||
keepaliveInterval time.Duration
|
||||
|
||||
statesMutex sync.RWMutex
|
||||
|
||||
// caches
|
||||
readyCache atomic.Value
|
||||
}
|
||||
|
||||
func (peer *peerkeeper) heartbeat(addr *net.UDPAddr) {
|
||||
@@ -55,16 +63,35 @@ func (peer *peerkeeper) healthcheck() {
|
||||
}
|
||||
}
|
||||
|
||||
// ready when peer context has at least one active udp address
|
||||
// ready when has at least one active udp address
|
||||
func (peer *peerkeeper) ready() bool {
|
||||
peer.statesMutex.RLock()
|
||||
defer peer.statesMutex.RUnlock()
|
||||
for _, state := range peer.states {
|
||||
if time.Since(state.LastActiveTime) <= peer.keepaliveInterval+2*time.Second {
|
||||
return true
|
||||
now := time.Now()
|
||||
doRealCheck := func() bool {
|
||||
peer.statesMutex.RLock()
|
||||
defer peer.statesMutex.RUnlock()
|
||||
for _, state := range peer.states {
|
||||
if now.Sub(state.LastActiveTime) <= peer.keepaliveInterval+2*time.Second {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// perform real-time checks within the first 20 seconds after creation,
|
||||
if now.Sub(peer.createTime) < 20*time.Second {
|
||||
return doRealCheck()
|
||||
}
|
||||
|
||||
// then cache for one heartbeat interval thereafter
|
||||
if value := peer.readyCache.Load(); value != nil {
|
||||
cache := value.(cache[bool])
|
||||
if now.Sub(cache.time) < peer.keepaliveInterval {
|
||||
return cache.result
|
||||
}
|
||||
}
|
||||
return false
|
||||
result := doRealCheck()
|
||||
peer.readyCache.Store(cache[bool]{result: result, time: now})
|
||||
return result
|
||||
}
|
||||
|
||||
// selectPeerUDP select one of the multiple UDP addresses discovered by the peer
|
||||
@@ -92,7 +119,7 @@ func (peer *peerkeeper) selectPeerUDP() *PeerState {
|
||||
func (peer *peerkeeper) writeUDP(p []byte) (int, error) {
|
||||
if peerState := peer.selectPeerUDP(); p != nil {
|
||||
slog.Log(context.Background(), -3, "[UDP] WriteTo", "peer", peer.peerID, "addr", peerState.Addr)
|
||||
if time.Since(peerState.LastActiveTime) > peer.keepaliveInterval+200*time.Millisecond {
|
||||
if time.Since(peerState.LastActiveTime) > peer.keepaliveInterval+time.Second {
|
||||
peer.udpConn.Load().WriteTo(p, peerState.Addr)
|
||||
return 0, ErrUDPConnInactive
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user