From 6c407f0f9390b19733ae243a2e6e84705576e5b1 Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Wed, 15 Apr 2026 01:50:08 +0800 Subject: [PATCH] chore: rebuild upload queue logic --- transport/xhttp/upload_queue.go | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/transport/xhttp/upload_queue.go b/transport/xhttp/upload_queue.go index 1fac71bb..3172a90b 100644 --- a/transport/xhttp/upload_queue.go +++ b/transport/xhttp/upload_queue.go @@ -14,7 +14,8 @@ type Packet struct { type UploadQueue struct { mu sync.Mutex - cond sync.Cond + condPushed sync.Cond + condPopped sync.Cond packets map[uint64][]byte nextSeq uint64 buf []byte @@ -28,7 +29,8 @@ func NewUploadQueue(maxPackets int) *UploadQueue { packets: make(map[uint64][]byte, maxPackets), maxPackets: maxPackets, } - q.cond = sync.Cond{L: &q.mu} + q.condPushed = sync.Cond{L: &q.mu} + q.condPopped = sync.Cond{L: &q.mu} return q } @@ -46,19 +48,19 @@ func (q *UploadQueue) Push(p Packet) error { if p.Reader != nil { q.reader = p.Reader - q.cond.Broadcast() + q.condPushed.Broadcast() return nil } - if len(q.packets) > q.maxPackets { - // the "reassembly buffer" is too large, and we want to - // constrain memory usage somehow. let's tear down the - // connection, and hope the application retries. - return errors.New("packet queue is too large") + for len(q.packets) > q.maxPackets { + q.condPopped.Wait() // wait for the reader to read the packets + if q.closed { + return io.ErrClosedPipe + } } q.packets[p.Seq] = p.Payload - q.cond.Broadcast() + q.condPushed.Broadcast() return nil } @@ -77,6 +79,7 @@ func (q *UploadQueue) Read(b []byte) (int, error) { delete(q.packets, q.nextSeq) q.nextSeq++ q.buf = payload + q.condPopped.Broadcast() continue } @@ -90,7 +93,14 @@ func (q *UploadQueue) Read(b []byte) (int, error) { return 0, io.EOF } - q.cond.Wait() + if len(q.packets) > q.maxPackets { + q.mu.Unlock() + // the "reassembly buffer" is too large, and we want to constrain memory usage somehow. + // let's tear down the connection and hope the application retries. + return 0, errors.New("packet queue is too large") + } + + q.condPushed.Wait() } } @@ -103,6 +113,7 @@ func (q *UploadQueue) Close() error { err = q.reader.Close() } q.closed = true - q.cond.Broadcast() + q.condPushed.Broadcast() + q.condPopped.Broadcast() return err }