初步调通

This commit is contained in:
langhuihui
2022-02-19 21:15:28 +08:00
parent c69646aec9
commit 6140f652a6
5 changed files with 59 additions and 47 deletions
+4 -6
View File
@@ -71,7 +71,7 @@ type RTMPPusher struct {
engine.Pusher
}
func (pusher *RTMPPusher) OnEvent(event any) any {
func (pusher *RTMPPusher) OnEvent(event any) {
pusher.RTMPSender.OnEvent(event)
switch event.(type) {
case *engine.Stream:
@@ -87,11 +87,10 @@ func (pusher *RTMPPusher) OnEvent(event any) any {
}
}
}
return event
}
func (pusher *RTMPPusher) push() {
defer pusher.Unsubscribe()
defer pusher.Bye()
for {
msg, err := pusher.RecvMessage()
if err != nil {
@@ -136,7 +135,7 @@ type RTMPPuller struct {
engine.Puller
}
func (puller *RTMPPuller) OnEvent(event any) any {
func (puller *RTMPPuller) OnEvent(event any) {
puller.RTMPReceiver.OnEvent(event)
switch event.(type) {
case *engine.Stream:
@@ -155,11 +154,10 @@ func (puller *RTMPPuller) OnEvent(event any) any {
}
}
}
return event
}
func (puller *RTMPPuller) pull() {
defer puller.Unpublish()
defer puller.Bye()
for {
msg, err := puller.RecvMessage()
if err != nil {
+7 -1
View File
@@ -2,4 +2,10 @@ module github.com/Monibuca/plugin-rtmp/v4
go 1.18
require github.com/logrusorgru/aurora v2.0.3+incompatible
require go.uber.org/zap v1.21.0
require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+11 -2
View File
@@ -1,2 +1,11 @@
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
+13 -28
View File
@@ -4,8 +4,6 @@ import (
"net"
. "github.com/Monibuca/engine/v4"
"github.com/Monibuca/engine/v4/codec"
"github.com/Monibuca/engine/v4/track"
"github.com/Monibuca/engine/v4/util"
)
@@ -14,36 +12,23 @@ type RTMPSender struct {
NetStream
}
func (rtmp *RTMPSender) OnEvent(event any) any {
rtmp.Subscriber.OnEvent(event)
func (rtmp *RTMPSender) OnEvent(event any) {
switch v := event.(type) {
case TrackRemoved:
case AudioDeConf:
if rtmp.AudioTrack.IsAAC() {
rtmp.sendAVMessage(0, net.Buffers{v.AVCC}, true, true)
}
case VideoDeConf:
rtmp.sendAVMessage(0, net.Buffers(v.AVCC), false, true)
// case TrackRemoved:
//TODO
case *track.Audio:
isPlaying := rtmp.IsPlaying()
if rtmp.AddTrack(v) {
if v.CodecID == codec.CodecID_AAC {
rtmp.sendAVMessage(0, net.Buffers{rtmp.Subscriber.AudioTrack.DecoderConfiguration.AVCC}, false, true)
}
// 如果不订阅视频则遇到音频也播放,否则需要等视频先播放
if !isPlaying && !rtmp.Config.SubVideo {
go rtmp.Play()
}
}
case *track.Video:
isPlaying := rtmp.IsPlaying()
if rtmp.AddTrack(v) {
rtmp.sendAVMessage(0, net.Buffers(rtmp.Subscriber.VideoTrack.DecoderConfiguration.AVCC), true, true)
if !isPlaying {
go rtmp.Play()
}
}
case *AudioFrame:
case AudioFrame:
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, true, false)
case *VideoFrame:
case VideoFrame:
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, false, false)
default:
rtmp.Subscriber.OnEvent(event)
}
return event
}
// 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间
@@ -104,7 +89,7 @@ func (r *RTMPSender) Response(code, level string) error {
type RTMPReceiver struct {
Publisher
NetStream
absTs map[uint32]uint32
absTs map[uint32]uint32
}
func (r *RTMPReceiver) Response(code, level string) error {
+24 -10
View File
@@ -23,8 +23,19 @@ func (ns *NetStream) Begin() {
var gstreamid = uint32(64)
type RTMPSubscriber struct {
RTMPSender
}
func (s *RTMPSubscriber) OnEvent(event any) {
switch event.(type) {
case engine.SEclose:
s.Response(NetStream_Play_Stop, Level_Status)
}
s.RTMPSender.OnEvent(event)
}
func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
senders := make(map[uint32]*RTMPSender)
senders := make(map[uint32]*RTMPSubscriber)
receivers := make(map[uint32]*RTMPReceiver)
nc := NetConnection{
TCPConn: conn,
@@ -37,8 +48,10 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
tmpBuf: make([]byte, 4),
}
ctx, cancel := context.WithCancel(engine.Engine)
defer nc.Close()
defer cancel()
defer func() {
nc.Close()
cancel()
}()
/* Handshake */
if err := nc.Handshake(); err != nil {
plugin.Error("handshake", zap.Error(err))
@@ -87,6 +100,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
StreamID: pm.StreamId,
},
}
receiver.Closer = &nc
receiver.OnEvent(ctx)
if plugin.Publish(nc.appName+"/"+pm.PublishingName, receiver) {
receivers[receiver.StreamID] = receiver
@@ -99,11 +113,10 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
case "play":
pm := msg.MsgData.(*PlayMessage)
streamPath := nc.appName + "/" + pm.StreamName
sender := &RTMPSender{
NetStream: NetStream{
NetConnection: &nc,
StreamID: msg.MessageStreamID,
},
sender := &RTMPSubscriber{}
sender.NetStream = NetStream{
&nc,
msg.MessageStreamID,
}
sender.OnEvent(ctx)
sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
@@ -113,13 +126,14 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
sender.Begin()
sender.Response(NetStream_Play_Reset, Level_Status)
sender.Response(NetStream_Play_Start, Level_Status)
go sender.Play(sender)
} else {
sender.Response(NetStream_Play_Failed, Level_Error)
}
case "closeStream":
cm := msg.MsgData.(*CURDStreamMessage)
if stream, ok := senders[cm.StreamId]; ok {
stream.Unsubscribe()
stream.Bye()
delete(senders, cm.StreamId)
}
case "releaseStream":
@@ -128,7 +142,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
p, ok := receivers[msg.MessageStreamID]
if ok {
amfobj["level"] = "_result"
p.Unpublish()
p.Bye()
} else {
amfobj["level"] = "_error"
}