From b7c5b799465fb6c868f5a3fcb1a177dea714eacd Mon Sep 17 00:00:00 2001 From: rkonfj Date: Sun, 28 Apr 2024 21:11:13 +0800 Subject: [PATCH] pgcli: encoding sharing filename and some other improvements --- cmd/pgcli/download/download.go | 29 +++++++++++------------------ cmd/pgcli/share/filemanager.go | 16 +++++++++++++--- cmd/pgcli/share/share.go | 5 +++-- cmd/pgcli/vpn/vpn.go | 2 +- disco/disco.go | 2 +- disco/ws.go | 14 ++++++++++++++ peermap/peermap.go | 7 ++++--- 7 files changed, 47 insertions(+), 28 deletions(-) diff --git a/cmd/pgcli/download/download.go b/cmd/pgcli/download/download.go index 83cccb1..a3c643b 100644 --- a/cmd/pgcli/download/download.go +++ b/cmd/pgcli/download/download.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "log/slog" + "net" "net/url" "os" "os/signal" @@ -69,7 +70,11 @@ func init() { } ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() - return requestFile(ctx, pubnet, resourceURL.Host, uint16(index), filename) + fn, err := url.QueryUnescape(filename) + if err != nil { + fn = filename + } + return requestFile(ctx, pubnet, resourceURL.Host, uint16(index), fn) }, } Cmd.Flags().StringP("server", "s", "", "peermap server") @@ -91,7 +96,7 @@ func requestFile(ctx context.Context, pubnet pubnet.PublicNetwork, peerID string } defer conn.Close() conn.SetStreamMode(true) - conn.SetNoDelay(1, 10, 2, 1) + conn.SetNoDelay(0, 10, 0, 1) conn.SetWindowSize(1024, 1024) f, err := os.Create(filename) @@ -123,24 +128,16 @@ func requestFile(ctx context.Context, pubnet pubnet.PublicNetwork, peerID string fileSize := binary.BigEndian.Uint32(header[1:]) bar := progressbar.NewOptions64( int64(fileSize), - progressbar.OptionSetDescription("downloading"), + progressbar.OptionSetDescription(filename), progressbar.OptionSetWriter(os.Stderr), progressbar.OptionShowBytes(true), progressbar.OptionSetWidth(10), - progressbar.OptionThrottle(10*time.Millisecond), + progressbar.OptionThrottle(500*time.Millisecond), progressbar.OptionShowCount(), progressbar.OptionOnCompletion(func() { fmt.Fprint(os.Stderr, "\n") }), - progressbar.OptionSetTheme(progressbar.Theme{ - Saucer: "=", - SaucerHead: ">", - SaucerPadding: " ", - BarStart: "[", - BarEnd: "]", - }), progressbar.OptionSpinnerType(14), - progressbar.OptionFullWidth(), progressbar.OptionSetRenderBlankState(true), ) go func() { // watch exit program event @@ -156,7 +153,6 @@ func requestFile(ctx context.Context, pubnet pubnet.PublicNetwork, peerID string if err != nil && !errors.Is(err, io.EOF) { return fmt.Errorf("download file falied: %w", err) } - conn.Write(buildChecksum()) checksum := make([]byte, 32) if _, err = io.ReadFull(conn, checksum); err != nil { return fmt.Errorf("read checksum failed: %w", err) @@ -169,7 +165,7 @@ func requestFile(ctx context.Context, pubnet pubnet.PublicNetwork, peerID string } type downloader struct { - r io.Reader + r net.Conn finished func() bool } @@ -177,6 +173,7 @@ func (d *downloader) Read(p []byte) (n int, err error) { if d.finished() { return 0, io.EOF } + d.r.SetReadDeadline(time.Now().Add(5 * time.Second)) return d.r.Read(p) } @@ -190,7 +187,3 @@ func buildGet(index uint16) []byte { func buildClose() []byte { return []byte{1} } - -func buildChecksum() []byte { - return []byte{2} -} diff --git a/cmd/pgcli/share/filemanager.go b/cmd/pgcli/share/filemanager.go index 3ddf799..bc7d043 100644 --- a/cmd/pgcli/share/filemanager.go +++ b/cmd/pgcli/share/filemanager.go @@ -8,6 +8,7 @@ import ( "io" "log/slog" "net" + "net/url" "os" "os/user" "path/filepath" @@ -88,11 +89,11 @@ func (fm *FileManager) HandleRequest(peerID string, conn net.Conn) { bar := progressbar.NewOptions64( stat.Size(), - progressbar.OptionSetDescription(fmt.Sprintf("%s:%s", peerID, stat.Name())), + progressbar.OptionSetDescription(fmt.Sprintf("%s:%s", peerID, url.QueryEscape(stat.Name()))), progressbar.OptionSetWriter(os.Stderr), progressbar.OptionShowBytes(true), progressbar.OptionSetWidth(10), - progressbar.OptionThrottle(10*time.Millisecond), + progressbar.OptionThrottle(200*time.Millisecond), progressbar.OptionShowCount(), progressbar.OptionOnCompletion(func() { fmt.Fprint(os.Stderr, "\n") @@ -110,13 +111,22 @@ func (fm *FileManager) HandleRequest(peerID string, conn net.Conn) { ) sha256Checksum := sha256.New() - _, err = io.Copy(io.MultiWriter(conn, bar, sha256Checksum), f) + _, err = io.Copy(io.MultiWriter(&sender{conn}, bar, sha256Checksum), f) if err != nil { slog.Info("Copy file failed", "err", err) } conn.Write(sha256Checksum.Sum(nil)) } +type sender struct { + net.Conn +} + +func (s *sender) Write(b []byte) (n int, err error) { + s.Conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + return s.Conn.Write(b) +} + func buildOK(fileSize int64) []byte { pkt := make([]byte, 5) copy(pkt[1:], binary.BigEndian.AppendUint32(nil, uint32(fileSize))) diff --git a/cmd/pgcli/share/share.go b/cmd/pgcli/share/share.go index 4b2e568..08a1c29 100644 --- a/cmd/pgcli/share/share.go +++ b/cmd/pgcli/share/share.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "net/url" "os" "os/signal" "path/filepath" @@ -72,7 +73,7 @@ func serve(ctx context.Context, pubnet pubnet.PublicNetwork, files []string) err if index, err := fm.Add(file); err != nil { slog.Warn("AddFile", "path", file, "err", err) } else { - fmt.Printf("ShareURL: pg://%s/%d/%s\n", packetConn.LocalAddr(), index, filepath.Base(file)) + fmt.Printf("ShareURL: pg://%s/%d/%s\n", packetConn.LocalAddr(), index, url.QueryEscape(filepath.Base(file))) } } @@ -97,7 +98,7 @@ func serve(ctx context.Context, pubnet pubnet.PublicNetwork, files []string) err continue } conn.SetStreamMode(true) - conn.SetNoDelay(1, 10, 2, 1) + conn.SetNoDelay(0, 10, 0, 1) conn.SetWindowSize(1024, 1024) go func() { <-ctx.Done() diff --git a/cmd/pgcli/vpn/vpn.go b/cmd/pgcli/vpn/vpn.go index 94405c1..38ec1c3 100644 --- a/cmd/pgcli/vpn/vpn.go +++ b/cmd/pgcli/vpn/vpn.go @@ -46,7 +46,7 @@ func init() { Cmd.Flags().StringSlice("peer", []string{}, "specify peers instead of auto-discovery (pg://?alias1=&alias2=)") Cmd.Flags().Int("disco-port-scan-count", 2000, "scan ports count when disco") - Cmd.Flags().Int("disco-challenges-retry", 2, "ping challenges retry count when disco") + Cmd.Flags().Int("disco-challenges-retry", 3, "ping challenges retry count when disco") Cmd.Flags().Duration("disco-challenges-initial-interval", 200*time.Millisecond, "ping challenges initial interval when disco") Cmd.Flags().Float64("disco-challenges-backoff-rate", 1.75, "ping challenges backoff rate when disco") diff --git a/disco/disco.go b/disco/disco.go index 59d2e21..192ec05 100644 --- a/disco/disco.go +++ b/disco/disco.go @@ -18,7 +18,7 @@ var ( var defaultDiscoConfig = DiscoConfig{ PortScanCount: 2000, - ChallengesRetry: 2, + ChallengesRetry: 3, ChallengesInitialInterval: 200 * time.Millisecond, ChallengesBackoffRate: 1.75, } diff --git a/disco/ws.go b/disco/ws.go index be2b111..e3e89c2 100644 --- a/disco/ws.go +++ b/disco/ws.go @@ -10,6 +10,7 @@ import ( "net" "net/http" "net/url" + "strconv" "strings" "sync" "time" @@ -17,6 +18,7 @@ import ( "github.com/gorilla/websocket" "github.com/rkonfj/peerguard/peer" "github.com/rkonfj/peerguard/peer/peermap" + "golang.org/x/time/rate" ) var ( @@ -36,6 +38,7 @@ type WSConn struct { stuns []string activeTime time.Time writeMutex sync.Mutex + rateLimiter *rate.Limiter connData chan []byte connBuf []byte @@ -95,6 +98,9 @@ func (c *WSConn) CloseConn() error { } func (c *WSConn) WriteTo(p []byte, peerID peer.ID, op byte) error { + if op == peer.CONTROL_RELAY && c.rateLimiter != nil { + c.rateLimiter.WaitN(context.Background(), len(p)) + } b := make([]byte, 0, 2+len(peerID)+len(p)) b = append(b, op) // relay b = append(b, peerID.Len()) // addr length @@ -171,6 +177,14 @@ func (c *WSConn) dial(server string) error { if err != nil { return err } + limit, err := strconv.ParseInt(httpResp.Header.Get("X-Limiter-Limit"), 10, 64) + if err != nil { + return err + } + slog.Log(context.Background(), -2, "RealyRateLimiter", "limit", limit, "burst", limit) + if limit > 0 { + c.rateLimiter = rate.NewLimiter(rate.Limit(limit), int(limit)) + } c.Conn = conn c.stuns = stuns c.nonce = peer.MustParseNonce(httpResp.Header.Get("X-Nonce")) diff --git a/peermap/peermap.go b/peermap/peermap.go index 889777a..3adbf48 100644 --- a/peermap/peermap.go +++ b/peermap/peermap.go @@ -192,9 +192,6 @@ func (p *Peer) readMessageLoop() { return } p.activeTime = time.Now() - if p.networkContext.ratelimiter != nil { - p.networkContext.ratelimiter.WaitN(context.Background(), len(b)) - } switch mt { case websocket.BinaryMessage: default: @@ -451,6 +448,10 @@ func (pm *PeerMap) HandlePeerPacketConnect(w http.ResponseWriter, r *http.Reques upgradeHeader.Set("X-Nonce", r.Header.Get("X-Nonce")) stuns, _ := json.Marshal(pm.cfg.STUNs) upgradeHeader.Set("X-STUNs", base64.StdEncoding.EncodeToString(stuns)) + if pm.cfg.RateLimiter != nil { + upgradeHeader.Set("X-Limiter-Burst", fmt.Sprintf("%d", pm.cfg.RateLimiter.Burst)) + upgradeHeader.Set("X-Limiter-Limit", fmt.Sprintf("%d", pm.cfg.RateLimiter.Limit)) + } wsConn, err := pm.wsUpgrader.Upgrade(w, r, upgradeHeader) if err != nil { slog.Error(err.Error())