适配引擎升级

This commit is contained in:
dexter
2022-02-21 21:41:02 +08:00
parent 5afbc8378a
commit 7aa8b3f8b5
4 changed files with 35 additions and 39 deletions
+4 -7
View File
@@ -83,14 +83,13 @@ func (pusher *RTMPPusher) OnEvent(event any) {
case engine.PushEvent:
pusher.ReConnectCount++
if pusher.Stream == nil {
if plugin.Subscribe(pusher.StreamPath, pusher) {
}
plugin.Subscribe(pusher.StreamPath, pusher)
}
}
}
func (pusher *RTMPPusher) push() {
defer pusher.Bye()
defer pusher.Stop()
for {
msg, err := pusher.RecvMessage()
if err != nil {
@@ -149,15 +148,13 @@ func (puller *RTMPPuller) OnEvent(event any) {
case engine.PullEvent:
puller.ReConnectCount++
if puller.Stream == nil {
if plugin.Publish(puller.StreamPath, puller) {
break
}
plugin.Publish(puller.StreamPath, puller)
}
}
}
func (puller *RTMPPuller) pull() {
defer puller.Bye()
defer puller.Stop()
for {
msg, err := puller.RecvMessage()
if err != nil {
+25 -24
View File
@@ -17,33 +17,34 @@ type RTMPConfig struct {
ChunkSize int
}
var _ PullPlugin = (*RTMPConfig)(nil)
func (config *RTMPConfig) Update(override config.Config) {
plugin.Info("server rtmp start at", zap.String("listen addr", config.ListenAddr))
err := config.Listen(plugin, config)
if err == context.Canceled {
plugin.Info("rtmp listen shutdown")
} else {
plugin.Fatal("rtmp server", zap.Error(err))
func (c *RTMPConfig) OnEvent(event any) {
switch v := event.(type) {
case FirstConfig:
if c.ListenAddr != "" {
plugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr))
go c.Listen(plugin, c)
}
case config.Config:
plugin.CancelFunc()
if c.ListenAddr != "" {
plugin.Context, plugin.CancelFunc = context.WithCancel(Engine)
plugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr))
go c.Listen(plugin, c)
}
case Puller:
client := RTMPPuller{
Puller: v,
}
client.OnEvent(PullEvent(0))
case Pusher:
client := RTMPPusher{
Pusher: v,
}
client.OnEvent(PushEvent(0))
}
}
var plugin = InstallPlugin(&RTMPConfig{
ChunkSize: 4096,
TCP: config.TCP{ListenAddr: ":1935"},
})
func (config *RTMPConfig) PullStream(puller Puller) {
client := RTMPPuller{
Puller: puller,
}
client.OnEvent(PullEvent(0))
}
func (config *RTMPConfig) PushStream(pusher Pusher) {
client := RTMPPusher{
Pusher: pusher,
}
client.OnEvent(PushEvent(0))
}
})
+2 -4
View File
@@ -15,11 +15,9 @@ type RTMPSender struct {
func (rtmp *RTMPSender) OnEvent(event any) {
switch v := event.(type) {
case AudioDeConf:
if rtmp.AudioTrack.IsAAC() {
rtmp.sendAVMessage(0, net.Buffers{v.AVCC}, true, true)
}
rtmp.sendAVMessage(0, v.AVCC, true, true)
case VideoDeConf:
rtmp.sendAVMessage(0, net.Buffers(v.AVCC), false, true)
rtmp.sendAVMessage(0, v.AVCC, false, true)
// case TrackRemoved:
//TODO
case AudioFrame:
+4 -4
View File
@@ -102,7 +102,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
}
receiver.Closer = &nc
receiver.OnEvent(ctx)
if plugin.Publish(nc.appName+"/"+pm.PublishingName, receiver) {
if plugin.Publish(nc.appName+"/"+pm.PublishingName, receiver) == nil {
receivers[receiver.StreamID] = receiver
receiver.absTs = make(map[uint32]uint32)
receiver.Begin()
@@ -120,7 +120,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
}
sender.OnEvent(ctx)
sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
if plugin.Subscribe(streamPath, sender) {
if plugin.Subscribe(streamPath, sender) == nil {
senders[sender.StreamID] = sender
err = nc.SendStreamID(RTMP_USER_STREAM_IS_RECORDED, msg.MessageStreamID)
sender.Begin()
@@ -133,7 +133,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
case "closeStream":
cm := msg.MsgData.(*CURDStreamMessage)
if stream, ok := senders[cm.StreamId]; ok {
stream.Bye()
stream.Stop()
delete(senders, cm.StreamId)
}
case "releaseStream":
@@ -142,7 +142,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
p, ok := receivers[msg.MessageStreamID]
if ok {
amfobj["level"] = "_result"
p.Bye()
p.Stop()
} else {
amfobj["level"] = "_error"
}