fix: share tcp or udp not delete when complete

desc: 单端口的使用,当一个端口的tcp或udp使用完毕后,不会被释放,导致后续的使用失败
This commit is contained in:
langhuihui
2023-06-11 13:21:43 +08:00
parent cb127e253c
commit cc7f939a51
5 changed files with 36 additions and 13 deletions
+8 -3
View File
@@ -52,7 +52,6 @@ func (c *PSConfig) ServeTCP(conn net.Conn) {
var err error
ps := make(util.Buffer, 1024)
tcpAddr := zap.String("tcp", conn.LocalAddr().String())
rtpLen := make([]byte, 2)
var puber *PSPublisher
if _, err = io.ReadFull(conn, rtpLen); err != nil {
@@ -182,7 +181,10 @@ func Receive(streamPath, dump, port string, ssrc uint32, reuse bool) (err error)
if _, ok := conf.shareTCP.LoadOrStore(listenaddr, &tcpConf); ok {
} else {
conf.streams.Store(ssrc, &pubber)
go tcpConf.ListenTCP(PSPlugin, conf)
go func() {
tcpConf.ListenTCP(PSPlugin, conf)
conf.shareTCP.Delete(listenaddr)
}()
}
} else {
tcpConf.ListenNum = 1
@@ -202,7 +204,10 @@ func Receive(streamPath, dump, port string, ssrc uint32, reuse bool) (err error)
}
udpConf.UDPConn = udpConn
conf.streams.Store(ssrc, &pubber)
go conf.ServeUDP(udpConn)
go func() {
conf.ServeUDP(udpConn)
conf.shareUDP.Delete(listenaddr)
}()
}
} else {
udpConn, err := util.ListenUDP(listenaddr, 1024*1024)
+1 -2
View File
@@ -96,8 +96,7 @@ loop:
case StartCodeVideo:
payload, err = dec.ReadPayload()
if err == nil {
frame, err = dec.video.parsePESPacket(payload)
if frame != nil {
if frame, err = dec.video.parsePESPacket(payload); frame != nil {
dec.ReceiveVideo(*frame)
}
}
+14 -7
View File
@@ -97,7 +97,11 @@ func (p *PSPublisher) PushPS(ps util.Buffer) {
}
p.pushPS()
}
func (p *PSPublisher) pushRelay(){
item := p.pool.Get(len(p.Packet.Payload))
copy(item.Value, p.Packet.Payload)
p.relayTrack.Push(item)
}
// 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
func (p *PSPublisher) pushPS() {
if p.Stream == nil {
@@ -111,17 +115,16 @@ func (p *PSPublisher) pushPS() {
p.lastSeq = p.SequenceNumber - 1
p.pool = make(util.BytesPool, 17)
}
if conf.RelayMode != 0 {
item := p.pool.Get(len(p.Packet.Payload))
copy(item.Value, p.Packet.Payload)
p.relayTrack.Push(item)
}
if conf.RelayMode == 1 && p.relayTrack.PSM != nil {
p.pushRelay()
return
}
if p.DisableReorder {
p.Feed(p.Packet.Payload)
p.lastSeq = p.SequenceNumber
if conf.RelayMode != 0 {
p.pushRelay()
}
} else {
item := p.pool.Get(len(p.Packet.Payload))
copy(item.Value, p.Packet.Payload)
@@ -135,7 +138,11 @@ func (p *PSPublisher) pushPS() {
}
p.Feed(rtpPacket.Value)
p.lastSeq = rtpPacket.Seq
rtpPacket.Recycle()
if conf.RelayMode != 0 {
p.relayTrack.Push(rtpPacket.ListItem)
} else {
rtpPacket.Recycle()
}
}
}
}
+12
View File
@@ -1,6 +1,8 @@
package ps
import (
"bytes"
"github.com/gobwas/ws/wsutil"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/common"
@@ -15,7 +17,17 @@ func (ps *PSSubscriber) OnEvent(event any) {
switch v := event.(type) {
case *PSTrack:
wsutil.WriteServerBinary(ps, util.ConcatBuffers(v.GetPSM()))
enter := false
go v.Play(ps.IO, func(data *common.DataFrame[*util.ListItem[util.Buffer]]) error {
if !enter {
if bytes.Compare(data.Value.Value[:3], []byte{0, 0, 1}) == 0 {
enter = true
} else {
return nil
}
}
// fmt.Printf("% 02X", data.Value.Value[:10])
// fmt.Println()
return wsutil.WriteServerBinary(ps, data.Value.Value)
})
default:
+1 -1
View File
@@ -20,7 +20,7 @@ func (ps *PSTrack) GetPSM() (result net.Buffers) {
func NewPSTrack(s common.IStream) *PSTrack {
result := &PSTrack{}
result.Init(20)
result.Init(1000)
result.SetStuff("ps", s)
result.Reset = func(f *common.DataFrame[*util.ListItem[util.Buffer]]) {
f.Value.Recycle()