disco: detect network changed

This commit is contained in:
rkonfj 2024-03-29 22:45:52 +08:00
parent cf34867e14
commit 682d18ed6b
5 changed files with 106 additions and 25 deletions

View File

@ -90,7 +90,6 @@ func ListLocalIPs() ([]net.IP, error) {
if ignoredLocalCIDRs.Contains(ipnet.IP) {
continue
}
slog.Debug("AddLocalIP " + ipnet.IP.String())
ips = append(ips, ipnet.IP)
}
}

View File

@ -32,7 +32,11 @@ type UDPConn struct {
datagrams chan *Datagram
stunResponse chan []byte
udpAddrSends chan *PeerUDPAddrEvent
networkChanged chan struct{}
peerKeepaliveInterval time.Duration
udpPort int
disableIPv4 bool
disableIPv6 bool
id peer.ID
peersIndex map[peer.ID]*PeerContext
stunSessions cmap.ConcurrentMap[string, STUNSession]
@ -47,6 +51,11 @@ func (c *UDPConn) Close() error {
if c.upnpDeleteMapping != nil {
c.upnpDeleteMapping()
}
close(c.closedSig)
close(c.datagrams)
close(c.peersOPs)
close(c.stunResponse)
close(c.udpAddrSends)
return c.UDPConn.Close()
}
@ -58,6 +67,10 @@ func (c *UDPConn) UDPAddrSends() <-chan *PeerUDPAddrEvent {
return c.udpAddrSends
}
func (c *UDPConn) NetworkChangedEvents() <-chan struct{} {
return c.networkChanged
}
func (c *UDPConn) GenerateLocalAddrsSends(peerID peer.ID, stunServers []string) {
c.peersOPs <- &PeerOP{
Op: OP_PEER_DELETE,
@ -162,6 +175,60 @@ func (c *UDPConn) discoPing(peerID peer.ID, peerAddr *net.UDPAddr) {
c.UDPConn.WriteToUDP([]byte("_ping"+c.id), peerAddr)
}
func (c *UDPConn) updateLocalNetworkAddrs() bool {
ips, err := ListLocalIPs()
if err != nil {
slog.Error("ListLocalIPsFailed", "details", err)
return false
}
var detectIPs []string
for _, ip := range ips {
addr := net.JoinHostPort(ip.String(), fmt.Sprintf("%d", c.udpPort))
if ip.To4() != nil {
if c.disableIPv4 {
continue
}
} else {
if c.disableIPv6 {
continue
}
}
if slices.Contains(detectIPs, addr) {
continue
}
detectIPs = append(detectIPs, addr)
slog.Debug("AddLocalIP " + addr)
}
slices.Sort(detectIPs)
if !slices.Equal(c.localAddrs, detectIPs) {
c.localAddrs = detectIPs
c.peersIndexMutex.Lock()
clear(c.peersIndex)
c.peersIndexMutex.Unlock()
if len(c.networkChanged) == cap(c.networkChanged) {
return true
}
c.networkChanged <- struct{}{}
return true
}
return false
}
func (c *UDPConn) runNetworkChangedDetector() {
detectTicker := time.NewTicker(5 * time.Second)
for {
select {
case <-c.closedSig:
detectTicker.Stop()
return
case <-detectTicker.C:
if c.updateLocalNetworkAddrs() {
slog.Info("NetworkChanged")
}
}
}
}
func (c *UDPConn) runPacketEventLoop() {
buf := make([]byte, 65535)
for {
@ -265,7 +332,10 @@ func (c *UDPConn) runSTUNEventLoop() {
return
default:
}
stunResp := <-c.stunResponse
stunResp, ok := <-c.stunResponse
if !ok {
return
}
txid, saddr, err := stun.ParseResponse(stunResp)
if err != nil {
slog.Error("Skipped invalid stun response", "err", err.Error())
@ -415,10 +485,6 @@ func ListenUDP(port int, disableIPv4, disableIPv6 bool, id peer.ID) (*UDPConn, e
if err != nil {
return nil, fmt.Errorf("listen udp error: %w", err)
}
ips, err := ListLocalIPs()
if err != nil {
return nil, fmt.Errorf("list local ips error: %w", err)
}
udpConn := UDPConn{
id: id,
@ -428,27 +494,19 @@ func ListenUDP(port int, disableIPv4, disableIPv6 bool, id peer.ID) (*UDPConn, e
datagrams: make(chan *Datagram),
udpAddrSends: make(chan *PeerUDPAddrEvent, 10),
stunResponse: make(chan []byte, 10),
networkChanged: make(chan struct{}, 1),
peerKeepaliveInterval: 10 * time.Second,
udpPort: port,
disableIPv4: disableIPv4,
disableIPv6: disableIPv6,
peersIndex: make(map[peer.ID]*PeerContext),
stunSessions: cmap.New[STUNSession](),
}
for _, ip := range ips {
addr := net.JoinHostPort(ip.String(), fmt.Sprintf("%d", port))
if ip.To4() != nil {
if disableIPv4 {
continue
}
} else {
if disableIPv6 {
continue
}
}
udpConn.localAddrs = append(udpConn.localAddrs, addr)
}
go udpConn.runPacketEventLoop()
udpConn.updateLocalNetworkAddrs()
go udpConn.runPeerOPLoop()
go udpConn.runSTUNEventLoop()
go udpConn.runPacketEventLoop()
go udpConn.runPeersHealthcheckLoop()
go udpConn.runNetworkChangedDetector()
return &udpConn, nil
}

View File

@ -1,6 +1,7 @@
package disco
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
@ -216,7 +217,7 @@ func (c *WSConn) WriteTo(p []byte, peerID peer.ID, op byte) error {
}
func (c *WSConn) LeadDisco(peerID peer.ID) error {
slog.Debug("LeadDisco", "peer", peerID)
slog.Log(context.Background(), -3, "LeadDisco", "peer", peerID)
return c.WriteTo(nil, peerID, peer.CONTROL_LEAD_DISCO)
}

View File

@ -62,3 +62,8 @@ func (c *Cache[K, V]) Put(key K, value V) {
elem := c.list.PushFront(&entry[K, V]{key: key, value: value})
c.cache[key] = elem
}
func (c *Cache[K, V]) Clear() {
clear(c.cache)
c.list.Init()
}

View File

@ -194,14 +194,23 @@ func (c *PeerPacketConn) UDPConn() net.PacketConn {
func (c *PeerPacketConn) runControlEventLoop(wsConn *disco.WSConn, udpConn *disco.UDPConn) {
for {
select {
case peer := <-wsConn.Peers():
case peer, ok := <-wsConn.Peers():
if !ok {
return
}
go udpConn.GenerateLocalAddrsSends(peer.PeerID, wsConn.STUNs())
if onPeer := c.cfg.OnPeer; onPeer != nil {
go onPeer(peer.PeerID, peer.Metadata)
}
case revcUDPAddr := <-wsConn.PeersUDPAddrs():
case revcUDPAddr, ok := <-wsConn.PeersUDPAddrs():
if !ok {
return
}
go udpConn.RunDiscoMessageSendLoop(revcUDPAddr.PeerID, revcUDPAddr.Addr)
case sendUDPAddr := <-udpConn.UDPAddrSends():
case sendUDPAddr, ok := <-udpConn.UDPAddrSends():
if !ok {
return
}
go func(e *disco.PeerUDPAddrEvent) {
for i := 0; i < 3; i++ {
err := wsConn.WriteTo([]byte(e.Addr.String()), e.PeerID, peer.CONTROL_NEW_PEER_UDP_ADDR)
@ -212,6 +221,15 @@ func (c *PeerPacketConn) runControlEventLoop(wsConn *disco.WSConn, udpConn *disc
time.Sleep(200 * time.Millisecond)
}
}(sendUDPAddr)
case _, ok := <-udpConn.NetworkChangedEvents():
if !ok {
return
}
go func() {
c.discoCoolingMutex.Lock()
defer c.discoCoolingMutex.Unlock()
c.discoCooling.Clear()
}()
}
}
}