From c18d8e116673d6c6bf8b33b92572de2c3d5ef09a Mon Sep 17 00:00:00 2001 From: lynx Date: Wed, 2 Aug 2023 14:42:09 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20buffer=20=E6=8B=B7?= =?UTF-8?q?=E8=B4=9D=E7=9B=B8=E5=85=B3=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/device/tun_linux.go | 1 + core/engine/dev.go | 6 ++++-- core/engine/engine.go | 32 +++++++++++++++++++------------- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/core/device/tun_linux.go b/core/device/tun_linux.go index 49a5aca..0382dbf 100644 --- a/core/device/tun_linux.go +++ b/core/device/tun_linux.go @@ -182,6 +182,7 @@ func CreateTUN(name string, mtu int) (Device, error) { // set the current file descriptor to non-blocking status to improve concurrency err = unix.SetNonblock(tfd, true) if err != nil { + syscall.Close(tfd) return nil, err } diff --git a/core/engine/dev.go b/core/engine/dev.go index 6c80e95..961f536 100644 --- a/core/engine/dev.go +++ b/core/engine/dev.go @@ -1,6 +1,8 @@ package engine -import "io" +import ( + "io" +) var _ io.ReadWriteCloser = (*devWrapper)(nil) @@ -12,7 +14,7 @@ type devWrapper struct { func (c *devWrapper) Read(p []byte) (n int, err error) { packet := <-c.r copy(p, packet.Data) - return len(p), nil + return len(packet.Data), nil } func (c *devWrapper) Write(p []byte) (n int, err error) { diff --git a/core/engine/engine.go b/core/engine/engine.go index 8351f30..7cdeb3c 100644 --- a/core/engine/engine.go +++ b/core/engine/engine.go @@ -158,17 +158,23 @@ func (e *Engine) Start() error { util.Advertise(e.ctx, e.discovery, e.host.ID().String()) go func() { + ticker := time.NewTimer(5 * time.Minute) for { - peers, err := e.dht.GetClosestPeers(e.ctx, string(e.host.ID())) - if err != nil { - e.log.Warningf(e.ctx, "Failed to get nearest node: %s", err) - continue - } + select { + case <-ticker.C: + peers, err := e.dht.GetClosestPeers(e.ctx, string(e.host.ID())) + if err != nil { + e.log.Warningf(e.ctx, "Failed to get nearest node: %s", err) + continue + } - for _, id := range peers { - e.relayChan <- e.host.Peerstore().PeerInfo(id) + for _, id := range peers { + e.relayChan <- e.host.Peerstore().PeerInfo(id) + } + case <-e.ctx.Done(): + ticker.Stop() + return } - time.Sleep(10 * time.Minute) } }() @@ -204,8 +210,9 @@ func (e *Engine) RoutineTUNReader() { continue } payload := Payload{ - Src: ip.Src(), - Dst: ip.Dst(), + Src: ip.Src(), + Dst: ip.Dst(), + Data: make([]byte, n), } copy(payload.Data, buff[:n]) select { @@ -225,7 +232,7 @@ func (e *Engine) RoutineTUNWriter() { for payload = range e.devWriter { _, err = e.device.Write(payload.Data) if err != nil { - e.errChan <- fmt.Errorf("[RoutineTUNWriter]: %s", err) + e.log.Error(e.ctx, fmt.Errorf("[RoutineTUNWriter]: %s", err)) return } } @@ -237,7 +244,6 @@ func (e *Engine) RoutineRouteTableWriter() { ) for payload = range e.devReader { - fmt.Println(payload.Dst) var conn PacketChan c, ok := e.routeTable.addr.Load(payload.Dst) if ok { @@ -310,6 +316,7 @@ func (e *Engine) addConn(dst netip.Addr) (PacketChan, error) { e.log.Warningf(e.ctx, "Connection establishment with node %s failed due to %s", info, err) } e.log.Infof(e.ctx, "Peer [%s] connect success", string(id)) + defer stream.Close() go func() { defer stream.Close() @@ -319,7 +326,6 @@ func (e *Engine) addConn(dst netip.Addr) (PacketChan, error) { } }() - defer stream.Close() _, err = io.Copy(dev, stream) if err != nil && err != io.EOF { e.log.Errorf(e.ctx, "Peer [%s] stream read error: %s", string(id), err)