From 180c7f940f4961fa302eaf846b3c6c0196a3140b Mon Sep 17 00:00:00 2001 From: rkonfj Date: Sat, 31 May 2025 11:25:51 +0800 Subject: [PATCH] disco/udp: make relayprotocol works better --- disco/udp/relay.go | 14 ++++++++++++++ disco/udp/udp.go | 30 +++++++++++++++++++----------- p2p/conn.go | 7 +++++++ 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/disco/udp/relay.go b/disco/udp/relay.go index d42ac21..fbbdfb6 100644 --- a/disco/udp/relay.go +++ b/disco/udp/relay.go @@ -2,6 +2,7 @@ package udp import ( "bytes" + "fmt" "github.com/sigcn/pg/disco" ) @@ -11,6 +12,19 @@ var ( MAGIC_FROM_RELAY = []byte{'_', 'p', 'g', 3} ) +type RelayToPeerError struct { + PeerID disco.PeerID + err error +} + +func (e RelayToPeerError) Error() string { + return fmt.Errorf("relay to %s : %w", e.PeerID, e.err).Error() +} + +func (e RelayToPeerError) Unwrap() error { + return e.err +} + type relayProtocol struct { } diff --git a/disco/udp/udp.go b/disco/udp/udp.go index c050ffd..6422a96 100644 --- a/disco/udp/udp.go +++ b/disco/udp/udp.go @@ -32,7 +32,7 @@ type UDPConn struct { udpConnsMutex sync.RWMutex udpConns []*net.UDPConn - closedSig chan int + closeChan chan int closedWG sync.WaitGroup cfg UDPConfig @@ -40,6 +40,7 @@ type UDPConn struct { datagrams chan *disco.Datagram natEvents chan *disco.NATInfo endpoints chan *disco.Endpoint + errChan chan error relayProtocol relayProtocol upnpPortMapping upnpPortMapping stunRoundTripper stunRoundTripper @@ -54,7 +55,7 @@ type UDPConn struct { func (c *UDPConn) Close() error { c.upnpPortMapping.close() - close(c.closedSig) + close(c.closeChan) c.udpConnsMutex.RLock() for _, conn := range c.udpConns { conn.Close() @@ -64,6 +65,7 @@ func (c *UDPConn) Close() error { close(c.natEvents) close(c.datagrams) close(c.endpoints) + close(c.errChan) return nil } @@ -101,6 +103,10 @@ func (c *UDPConn) Endpoints() <-chan *disco.Endpoint { return c.endpoints } +func (c *UDPConn) Errors() <-chan error { + return c.errChan +} + func (c *UDPConn) GenerateLocalAddrsSends(peerID disco.PeerID, stunServers []string) { // UPnP go func() { @@ -234,7 +240,7 @@ func (c *UDPConn) RunDiscoMessageSendLoop(udpAddr disco.Endpoint) { for i := 0; i < defaultDiscoConfig.ChallengesRetry; i++ { time.Sleep(interval) select { - case <-c.closedSig: + case <-c.closeChan: return default: } @@ -251,7 +257,7 @@ func (c *UDPConn) RunDiscoMessageSendLoop(udpAddr disco.Endpoint) { rl := rate.NewLimiter(rate.Limit(256), 256) for range 2000 { select { - case <-c.closedSig: + case <-c.closeChan: return default: } @@ -315,7 +321,7 @@ func (c *UDPConn) RunDiscoMessageSendLoop(udpAddr disco.Endpoint) { rl := rate.NewLimiter(rate.Limit(limit), limit) for port := udpAddr.Addr.Port + defaultDiscoConfig.PortScanOffset; port <= udpAddr.Addr.Port+defaultDiscoConfig.PortScanCount; port++ { select { - case <-c.closedSig: + case <-c.closeChan: return default: } @@ -454,7 +460,7 @@ func (c *UDPConn) udpRead(udpConn *net.UDPConn) { buf := make([]byte, 65535) for { select { - case <-c.closedSig: + case <-c.closeChan: return default: } @@ -492,15 +498,16 @@ func (c *UDPConn) udpRead(udpConn *net.UDPConn) { c.tryGetPeerkeeper(udpConn, peerID).heartbeat(peerAddr) slog.Log(context.Background(), -3, "[UDP] ReadFrom", "peer", peerID, "addr", peerAddr) if pkt, dst := c.relayProtocol.tryToDst(buf[:n], peerID); pkt != nil { - c.WriteTo(pkt, dst) // relay to dest + if _, err := c.WriteTo(pkt, dst); err != nil { + c.errChan <- RelayToPeerError{PeerID: dst, err: err} + } // relay to dest continue } if pkt, src := c.relayProtocol.tryRecv(buf[:n]); pkt != nil { c.datagrams <- &disco.Datagram{PeerID: src, Data: pkt} // recv from relay continue } - b := append([]byte(nil), buf[:n]...) - c.datagrams <- &disco.Datagram{PeerID: peerID, Data: b} + c.datagrams <- &disco.Datagram{PeerID: peerID, Data: slices.Clone(buf[:n])} } } @@ -508,7 +515,7 @@ func (c *UDPConn) runPeersHealthcheckLoop() { ticker := time.NewTicker(c.cfg.PeerKeepaliveInterval/2 + time.Second) for { select { - case <-c.closedSig: + case <-c.closeChan: ticker.Stop() return case <-ticker.C: @@ -581,7 +588,8 @@ func ListenUDP(cfg UDPConfig) (*UDPConn, error) { udpConn := UDPConn{ cfg: cfg, disco: &disco.Disco{Magic: cfg.DiscoMagic}, - closedSig: make(chan int), + closeChan: make(chan int), + errChan: make(chan error), natEvents: make(chan *disco.NATInfo, 3), datagrams: make(chan *disco.Datagram), endpoints: make(chan *disco.Endpoint, 10), diff --git a/p2p/conn.go b/p2p/conn.go index 92b2334..229b7dc 100644 --- a/p2p/conn.go +++ b/p2p/conn.go @@ -436,6 +436,13 @@ func (c *PacketConn) eventsHandle() { return } go sendEndpoint(endpoint) + case err, ok := <-c.udpConn.Errors(): + if !ok { + return + } + if relayToErr, ok := err.(udp.RelayToPeerError); ok && !errors.Is(relayToErr.Unwrap(), udp.ErrUDPConnInactive) { + go c.TryLeadDisco(relayToErr.PeerID) // peer not found, and trying to discover it. + } } } }