feat: add pool for ScalableMemoryAllocator

This commit is contained in:
langhuihui
2024-06-04 19:43:11 +08:00
parent 6275b9767f
commit 4f2f5d2c6a
4 changed files with 36 additions and 2 deletions
+1
View File
@@ -42,6 +42,7 @@ func NewAllocator(size int) (result *Allocator) {
root.allocator = result
return
}
func compareBySize(a, b *Block) bool {
if sizea, sizeb := a.End-a.Start, b.End-b.Start; sizea != sizeb {
return sizea < sizeb
+7
View File
@@ -2,6 +2,7 @@ package util
import (
"io"
)
const defaultBufSize = 65536
@@ -29,6 +30,12 @@ func NewBufReader(reader io.Reader) (r *BufReader) {
return
}
func (r *BufReader) Recycle() {
r.reader = nil
r.buf = MemoryReader{}
r.allocator.Recycle()
}
func (r *BufReader) eat() error {
buf := r.allocator.Malloc(r.BufLen)
if n, err := r.reader.Read(buf); err != nil {
+26 -1
View File
@@ -2,6 +2,7 @@ package util
import (
"fmt"
"sync"
"unsafe"
)
@@ -24,6 +25,10 @@ func NewMemoryAllocator(size int) (ret *MemoryAllocator) {
return
}
func (ma *MemoryAllocator) Reset() {
ma.allocator = NewAllocator(ma.Size)
}
func (ma *MemoryAllocator) Malloc(size int) (memory []byte) {
if offset := ma.allocator.Allocate(size); offset != -1 {
memory = ma.memory[offset : offset+size]
@@ -50,6 +55,8 @@ func (ma *MemoryAllocator) GetBlocks() (blocks []*Block) {
var EnableCheckSize bool = false
var pools sync.Map
type ScalableMemoryAllocator struct {
children []*MemoryAllocator
totalMalloc int64
@@ -58,7 +65,12 @@ type ScalableMemoryAllocator struct {
}
func NewScalableMemoryAllocator(size int) (ret *ScalableMemoryAllocator) {
return &ScalableMemoryAllocator{children: []*MemoryAllocator{NewMemoryAllocator(size)}, size: size}
if value, ok := pools.Load(size); ok {
ret = value.(*sync.Pool).Get().(*ScalableMemoryAllocator)
} else {
ret = &ScalableMemoryAllocator{children: []*MemoryAllocator{NewMemoryAllocator(size)}, size: size}
}
return
}
func (sma *ScalableMemoryAllocator) checkSize() {
@@ -91,6 +103,19 @@ func (sma *ScalableMemoryAllocator) GetChildren() []*MemoryAllocator {
return sma.children
}
func (sma *ScalableMemoryAllocator) Recycle() {
for _, child := range sma.children {
child.Reset()
}
size := sma.children[0].Size
pool, _ := pools.LoadOrStore(size, &sync.Pool{
New: func() interface{} {
return &ScalableMemoryAllocator{children: []*MemoryAllocator{NewMemoryAllocator(size)}, size: size}
},
})
pool.(*sync.Pool).Put(sma)
}
func (sma *ScalableMemoryAllocator) Malloc(size int) (memory []byte) {
if sma == nil {
return make([]byte, size)
+2 -1
View File
@@ -39,12 +39,13 @@ func (p *RTMPPlugin) OnPublish(puber *m7s.Publisher) {
}
func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
defer conn.Close()
logger := p.Logger.With("remote", conn.RemoteAddr().String())
receivers := make(map[uint32]*RTMPReceiver)
var err error
logger.Info("conn")
nc := NewNetConnection(conn, logger)
defer nc.BufReader.Recycle()
defer conn.Close()
ctx, cancel := context.WithCancelCause(p)
defer func() {
logger.Info("conn close")