diff --git a/transport/xhttp/upload_queue.go b/transport/xhttp/upload_queue.go index 3172a90b..b7bff432 100644 --- a/transport/xhttp/upload_queue.go +++ b/transport/xhttp/upload_queue.go @@ -6,6 +6,8 @@ import ( "sync" ) +var ErrQueueTooLarge = errors.New("packet queue is too large") + type Packet struct { Seq uint64 Payload []byte // UploadQueue will hold Payload, so never reuse it after UploadQueue.Push @@ -97,7 +99,7 @@ func (q *UploadQueue) Read(b []byte) (int, error) { q.mu.Unlock() // the "reassembly buffer" is too large, and we want to constrain memory usage somehow. // let's tear down the connection and hope the application retries. - return 0, errors.New("packet queue is too large") + return 0, ErrQueueTooLarge } q.condPushed.Wait() diff --git a/transport/xhttp/upload_queue_test.go b/transport/xhttp/upload_queue_test.go new file mode 100644 index 00000000..2f4882c7 --- /dev/null +++ b/transport/xhttp/upload_queue_test.go @@ -0,0 +1,53 @@ +package xhttp + +import ( + "io" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUploadQueueMaxPackets(t *testing.T) { + q := NewUploadQueue(2) + ch := make(chan struct{}) + go func() { + err := q.Push(Packet{Seq: 0, Payload: []byte{'0'}}) + assert.NoError(t, err) + err = q.Push(Packet{Seq: 1, Payload: []byte{'1'}}) + assert.NoError(t, err) + err = q.Push(Packet{Seq: 2, Payload: []byte{'2'}}) + assert.NoError(t, err) + err = q.Push(Packet{Seq: 4, Payload: []byte{'4'}}) + assert.NoError(t, err) + err = q.Push(Packet{Seq: 5, Payload: []byte{'5'}}) + assert.NoError(t, err) + err = q.Push(Packet{Seq: 6, Payload: []byte{'6'}}) + assert.NoError(t, err) + err = q.Push(Packet{Seq: 7, Payload: []byte{'7'}}) + assert.ErrorIs(t, err, io.ErrClosedPipe) + close(ch) + }() + + buf := make([]byte, 20) + n, err := q.Read(buf) + assert.Equal(t, 1, n) + assert.Equal(t, []byte{'0'}, buf[:n]) + assert.NoError(t, err) + + n, err = q.Read(buf) + assert.Equal(t, 1, n) + assert.Equal(t, []byte{'1'}, buf[:n]) + + n, err = q.Read(buf) + assert.Equal(t, 1, n) + assert.Equal(t, []byte{'2'}, buf[:n]) + + n, err = q.Read(buf) + assert.Equal(t, 0, n) + assert.ErrorIs(t, err, ErrQueueTooLarge) + + err = q.Close() + assert.NoError(t, err) + + <-ch +}