fix: reduce ring

This commit is contained in:
langhuihui
2024-06-04 08:49:30 +08:00
parent 7ab831dad5
commit 0c7bd27176
3 changed files with 90 additions and 47 deletions
+9 -17
View File
@@ -21,6 +21,7 @@ func SplitAnnexB[T ~[]byte](frame T, process func(T), delimiter []byte) {
}
}
}
type LIRTP = util.ListItem[RTPFrame]
type RTPFrame struct {
*rtp.Packet
@@ -50,7 +51,6 @@ type DataFrame[T any] struct {
WriteTime time.Time // 写入时间,可用于比较两个帧的先后
Sequence uint32 // 在一个Track中的序号
BytesIn int // 输入字节数用于计算BPS
CanRead bool `json:"-" yaml:"-"` // 是否可读取
readerCount atomic.Int32 `json:"-" yaml:"-"` // 读取者数量
Data T `json:"-" yaml:"-"`
sync.Cond `json:"-" yaml:"-"`
@@ -59,19 +59,15 @@ type DataFrame[T any] struct {
func NewDataFrame[T any]() *DataFrame[T] {
return &DataFrame[T]{}
}
func (df *DataFrame[T]) IsWriting() bool {
return !df.CanRead
return df.readerCount.Load() == -1
}
func (df *DataFrame[T]) IsDiscarded() bool {
return df.L == nil
}
func (df *DataFrame[T]) Discard() int32 {
df.L = nil //标记为废弃
return df.readerCount.Load()
}
func (df *DataFrame[T]) SetSequence(sequence uint32) {
df.Sequence = sequence
}
@@ -84,27 +80,23 @@ func (df *DataFrame[T]) ReaderEnter() int32 {
return df.readerCount.Add(1)
}
func (df *DataFrame[T]) ReaderCount() int32 {
return df.readerCount.Load()
}
func (df *DataFrame[T]) ReaderLeave() int32 {
return df.readerCount.Add(-1)
}
func (df *DataFrame[T]) StartWrite() bool {
if df.readerCount.Load() > 0 {
df.Discard() //标记为废弃
return false
} else {
df.CanRead = false //标记为正在写入
if df.readerCount.CompareAndSwap(0, -1) {
return true
}
df.L = nil //标记为废弃
return false
}
func (df *DataFrame[T]) Ready() {
df.WriteTime = time.Now()
df.CanRead = true //标记为可读取
if !df.readerCount.CompareAndSwap(-1, 0) {
panic("Ready")
}
df.Broadcast()
}
+65
View File
@@ -0,0 +1,65 @@
package track
import (
"context"
"testing"
"time"
"m7s.live/engine/v4/util"
. "m7s.live/engine/v4/common"
)
func TestRing(t *testing.T) {
w := &util.RingWriter[any, *AVFrame]{}
w.Init(10,func() *AVFrame {
return &AVFrame{}
})
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
go t.Run("writer", func(t *testing.T) {
for i := 0; ctx.Err() == nil; i++ {
w.Value.Data = i
normal := w.Step()
t.Log("write", i, normal)
time.Sleep(time.Millisecond * 50)
}
})
go t.Run("reader1", func(t *testing.T) {
var reader RingReader[any, *AVFrame]
err := reader.StartRead(w.Ring)
if err != nil {
t.Error(err)
return
}
for ctx.Err() == nil {
err = reader.ReadNext()
t.Log("read1", reader.Value.Data)
if err != nil {
t.Error(err)
break
}
time.Sleep(time.Millisecond * 10)
}
reader.Value.ReaderLeave()
<-ctx.Done()
})
// slow reader
t.Run("reader2", func(t *testing.T) {
var reader RingReader[any, *AVFrame]
err := reader.StartRead(w.Ring)
if err != nil {
t.Error(err)
return
}
for ctx.Err() == nil {
err = reader.ReadNext()
if err != nil {
// t.Error(err)
return
}
t.Log("read2", reader.Value.Data)
time.Sleep(time.Millisecond * 100)
}
reader.Value.ReaderLeave()
<-ctx.Done()
})
}
+16 -30
View File
@@ -2,7 +2,6 @@ package util
import (
"sync/atomic"
)
type emptyLocker struct{}
@@ -21,8 +20,6 @@ type IDataFrame[T any] interface {
StartWrite() bool // 开始写入
SetSequence(uint32) // 设置序号
GetSequence() uint32 // 获取序号
ReaderCount() int32 // 读取者数量
Discard() int32 // 如果写入时还有读取者没有离开则废弃该帧,剥离RingBuffer,防止并发读写
IsDiscarded() bool // 是否已废弃
IsWriting() bool // 是否正在写入
Wait() // 阻塞等待可读取
@@ -30,13 +27,13 @@ type IDataFrame[T any] interface {
}
type RingWriter[T any, F IDataFrame[T]] struct {
*Ring[F] `json:"-" yaml:"-"`
ReaderCount atomic.Int32 `json:"-" yaml:"-"`
pool *Ring[F]
poolSize int
Size int
LastValue F
constructor func() F
*Ring[F] `json:"-" yaml:"-"`
ReaderCount atomic.Int32 `json:"-" yaml:"-"`
pool *Ring[F]
poolSize int
Size int
LastValue F
constructor func() F
}
func (rb *RingWriter[T, F]) create(n int) (ring *Ring[F]) {
@@ -53,15 +50,10 @@ func (rb *RingWriter[T, F]) Init(n int, constructor func() F) *RingWriter[T, F]
rb.Ring = rb.create(n)
rb.Size = n
rb.LastValue = rb.Value
rb.Value.StartWrite()
return rb
}
// func (rb *RingBuffer[T, F]) MoveNext() F {
// rb.LastValue = rb.Value
// rb.Ring = rb.Next()
// return rb.Value
// }
func (rb *RingWriter[T, F]) Glow(size int) (newItem *Ring[F]) {
if size < rb.poolSize {
newItem = rb.pool.Unlink(size)
@@ -93,22 +85,16 @@ func (rb *RingWriter[T, F]) Recycle(r *Ring[F]) {
func (rb *RingWriter[T, F]) Reduce(size int) {
r := rb.Unlink(size)
if size > 1 {
for p := r.Next(); p != r; {
next := p.Next() //先保存下一个节点
if p.Value.Discard() == 0 {
rb.Recycle(p.Prev().Unlink(1))
} else {
// fmt.Println("Reduce", p.Value.ReaderCount())
}
p = next
for p := r.Next(); p != r; {
next := p.Next() //先保存下一个节点
if !rb.Value.IsDiscarded() {
rb.Recycle(p.Prev().Unlink(1))
} else {
// fmt.Println("Reduce", p.Value.ReaderCount())
}
}
if r.Value.Discard() == 0 {
rb.Recycle(r)
p = next
}
rb.Size -= size
return
}
func (rb *RingWriter[T, F]) Step() (normal bool) {
@@ -131,4 +117,4 @@ func (rb *RingWriter[T, F]) Step() (normal bool) {
func (rb *RingWriter[T, F]) GetReaderCount() int32 {
return rb.ReaderCount.Load()
}
}