diff --git a/pkg/util/allocator.go b/pkg/util/allocator.go
index 027f6ed..f3e25e0 100644
--- a/pkg/util/allocator.go
+++ b/pkg/util/allocator.go
@@ -42,6 +42,7 @@ func NewAllocator(size int) (result *Allocator) {
 	root.allocator = result
 	return
 }
+
 func compareBySize(a, b *Block) bool {
 	if sizea, sizeb := a.End-a.Start, b.End-b.Start; sizea != sizeb {
 		return sizea < sizeb
diff --git a/pkg/util/buf-reader.go b/pkg/util/buf-reader.go
index 80f0e35..d89a2d6 100644
--- a/pkg/util/buf-reader.go
+++ b/pkg/util/buf-reader.go
@@ -2,6 +2,7 @@ package util
 
 import (
 	"io"
+
 )
 
 const defaultBufSize = 65536
@@ -29,6 +30,12 @@ func NewBufReader(reader io.Reader) (r *BufReader) {
 	return
 }
 
+func (r *BufReader) Recycle() {
+	r.reader = nil
+	r.buf = MemoryReader{}
+	r.allocator.Recycle()
+}
+
 func (r *BufReader) eat() error {
 	buf := r.allocator.Malloc(r.BufLen)
 	if n, err := r.reader.Read(buf); err != nil {
diff --git a/pkg/util/mem.go b/pkg/util/mem.go
index 29ac995..b07ec17 100644
--- a/pkg/util/mem.go
+++ b/pkg/util/mem.go
@@ -2,6 +2,7 @@ package util
 
 import (
 	"fmt"
+	"sync"
 	"unsafe"
 )
 
@@ -24,6 +25,10 @@ func NewMemoryAllocator(size int) (ret *MemoryAllocator) {
 	return
 }
 
+func (ma *MemoryAllocator) Reset() {
+	ma.allocator = NewAllocator(ma.Size)
+}
+
 func (ma *MemoryAllocator) Malloc(size int) (memory []byte) {
 	if offset := ma.allocator.Allocate(size); offset != -1 {
 		memory = ma.memory[offset : offset+size]
@@ -50,6 +55,8 @@ func (ma *MemoryAllocator) GetBlocks() (blocks []*Block) {
 
 var EnableCheckSize bool = false
 
+var pools sync.Map
+
 type ScalableMemoryAllocator struct {
 	children    []*MemoryAllocator
 	totalMalloc int64
@@ -58,7 +65,12 @@ type ScalableMemoryAllocator struct {
 }
 
 func NewScalableMemoryAllocator(size int) (ret *ScalableMemoryAllocator) {
-	return &ScalableMemoryAllocator{children: []*MemoryAllocator{NewMemoryAllocator(size)}, size: size}
+	if value, ok := pools.Load(size); ok {
+		ret = value.(*sync.Pool).Get().(*ScalableMemoryAllocator)
+	} else {
+		ret = &ScalableMemoryAllocator{children: []*MemoryAllocator{NewMemoryAllocator(size)}, size: size}
+	}
+	return
 }
 
 func (sma *ScalableMemoryAllocator) checkSize() {
@@ -91,6 +103,19 @@ func (sma *ScalableMemoryAllocator) GetChildren() []*MemoryAllocator {
 	return sma.children
 }
 
+func (sma *ScalableMemoryAllocator) Recycle() {
+	for _, child := range sma.children {
+		child.Reset()
+	}
+	size := sma.children[0].Size
+	pool, _ := pools.LoadOrStore(size, &sync.Pool{
+		New: func() interface{} {
+			return &ScalableMemoryAllocator{children: []*MemoryAllocator{NewMemoryAllocator(size)}, size: size}
+		},
+	})
+	pool.(*sync.Pool).Put(sma)
+}
+
 func (sma *ScalableMemoryAllocator) Malloc(size int) (memory []byte) {
 	if sma == nil {
 		return make([]byte, size)
diff --git a/plugin/rtmp/index.go b/plugin/rtmp/index.go
index fcfb243..616bb43 100644
--- a/plugin/rtmp/index.go
+++ b/plugin/rtmp/index.go
@@ -39,12 +39,13 @@ func (p *RTMPPlugin) OnPublish(puber *m7s.Publisher) {
 }
 
 func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
-	defer conn.Close()
 	logger := p.Logger.With("remote", conn.RemoteAddr().String())
 	receivers := make(map[uint32]*RTMPReceiver)
 	var err error
 	logger.Info("conn")
 	nc := NewNetConnection(conn, logger)
+	defer nc.BufReader.Recycle()
+	defer conn.Close()
 	ctx, cancel := context.WithCancelCause(p)
 	defer func() {
 		logger.Info("conn close")