chore: rebuild upload queue logic

This commit is contained in:
wwqgtxx
2026-04-15 01:50:08 +08:00
parent a450080ed1
commit 6c407f0f93
+22 -11
View File
@@ -14,7 +14,8 @@ type Packet struct {
type UploadQueue struct {
mu sync.Mutex
cond sync.Cond
condPushed sync.Cond
condPopped sync.Cond
packets map[uint64][]byte
nextSeq uint64
buf []byte
@@ -28,7 +29,8 @@ func NewUploadQueue(maxPackets int) *UploadQueue {
packets: make(map[uint64][]byte, maxPackets),
maxPackets: maxPackets,
}
q.cond = sync.Cond{L: &q.mu}
q.condPushed = sync.Cond{L: &q.mu}
q.condPopped = sync.Cond{L: &q.mu}
return q
}
@@ -46,19 +48,19 @@ func (q *UploadQueue) Push(p Packet) error {
if p.Reader != nil {
q.reader = p.Reader
q.cond.Broadcast()
q.condPushed.Broadcast()
return nil
}
if len(q.packets) > q.maxPackets {
// the "reassembly buffer" is too large, and we want to
// constrain memory usage somehow. let's tear down the
// connection, and hope the application retries.
return errors.New("packet queue is too large")
for len(q.packets) > q.maxPackets {
q.condPopped.Wait() // wait for the reader to read the packets
if q.closed {
return io.ErrClosedPipe
}
}
q.packets[p.Seq] = p.Payload
q.cond.Broadcast()
q.condPushed.Broadcast()
return nil
}
@@ -77,6 +79,7 @@ func (q *UploadQueue) Read(b []byte) (int, error) {
delete(q.packets, q.nextSeq)
q.nextSeq++
q.buf = payload
q.condPopped.Broadcast()
continue
}
@@ -90,7 +93,14 @@ func (q *UploadQueue) Read(b []byte) (int, error) {
return 0, io.EOF
}
q.cond.Wait()
if len(q.packets) > q.maxPackets {
q.mu.Unlock()
// the "reassembly buffer" is too large, and we want to constrain memory usage somehow.
// let's tear down the connection and hope the application retries.
return 0, errors.New("packet queue is too large")
}
q.condPushed.Wait()
}
}
@@ -103,6 +113,7 @@ func (q *UploadQueue) Close() error {
err = q.reader.Close()
}
q.closed = true
q.cond.Broadcast()
q.condPushed.Broadcast()
q.condPopped.Broadcast()
return err
}