diff --git a/client.go b/client.go index c0c91c6..e5dcf93 100644 --- a/client.go +++ b/client.go @@ -83,14 +83,13 @@ func (pusher *RTMPPusher) OnEvent(event any) { case engine.PushEvent: pusher.ReConnectCount++ if pusher.Stream == nil { - if plugin.Subscribe(pusher.StreamPath, pusher) { - } + plugin.Subscribe(pusher.StreamPath, pusher) } } } func (pusher *RTMPPusher) push() { - defer pusher.Bye() + defer pusher.Stop() for { msg, err := pusher.RecvMessage() if err != nil { @@ -149,15 +148,13 @@ func (puller *RTMPPuller) OnEvent(event any) { case engine.PullEvent: puller.ReConnectCount++ if puller.Stream == nil { - if plugin.Publish(puller.StreamPath, puller) { - break - } + plugin.Publish(puller.StreamPath, puller) } } } func (puller *RTMPPuller) pull() { - defer puller.Bye() + defer puller.Stop() for { msg, err := puller.RecvMessage() if err != nil { diff --git a/main.go b/main.go index 65fd4a4..f7d59b5 100644 --- a/main.go +++ b/main.go @@ -17,33 +17,34 @@ type RTMPConfig struct { ChunkSize int } -var _ PullPlugin = (*RTMPConfig)(nil) - -func (config *RTMPConfig) Update(override config.Config) { - plugin.Info("server rtmp start at", zap.String("listen addr", config.ListenAddr)) - err := config.Listen(plugin, config) - if err == context.Canceled { - plugin.Info("rtmp listen shutdown") - } else { - plugin.Fatal("rtmp server", zap.Error(err)) +func (c *RTMPConfig) OnEvent(event any) { + switch v := event.(type) { + case FirstConfig: + if c.ListenAddr != "" { + plugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr)) + go c.Listen(plugin, c) + } + case config.Config: + plugin.CancelFunc() + if c.ListenAddr != "" { + plugin.Context, plugin.CancelFunc = context.WithCancel(Engine) + plugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr)) + go c.Listen(plugin, c) + } + case Puller: + client := RTMPPuller{ + Puller: v, + } + client.OnEvent(PullEvent(0)) + case Pusher: + client := RTMPPusher{ + Pusher: v, + } + client.OnEvent(PushEvent(0)) } } var plugin = InstallPlugin(&RTMPConfig{ ChunkSize: 4096, TCP: config.TCP{ListenAddr: ":1935"}, -}) - -func (config *RTMPConfig) PullStream(puller Puller) { - client := RTMPPuller{ - Puller: puller, - } - client.OnEvent(PullEvent(0)) -} - -func (config *RTMPConfig) PushStream(pusher Pusher) { - client := RTMPPusher{ - Pusher: pusher, - } - client.OnEvent(PushEvent(0)) -} +}) \ No newline at end of file diff --git a/media.go b/media.go index e777c8e..7ab5bfc 100644 --- a/media.go +++ b/media.go @@ -15,11 +15,9 @@ type RTMPSender struct { func (rtmp *RTMPSender) OnEvent(event any) { switch v := event.(type) { case AudioDeConf: - if rtmp.AudioTrack.IsAAC() { - rtmp.sendAVMessage(0, net.Buffers{v.AVCC}, true, true) - } + rtmp.sendAVMessage(0, v.AVCC, true, true) case VideoDeConf: - rtmp.sendAVMessage(0, net.Buffers(v.AVCC), false, true) + rtmp.sendAVMessage(0, v.AVCC, false, true) // case TrackRemoved: //TODO case AudioFrame: diff --git a/server.go b/server.go index 3a1a2b2..24c86fc 100644 --- a/server.go +++ b/server.go @@ -102,7 +102,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { } receiver.Closer = &nc receiver.OnEvent(ctx) - if plugin.Publish(nc.appName+"/"+pm.PublishingName, receiver) { + if plugin.Publish(nc.appName+"/"+pm.PublishingName, receiver) == nil { receivers[receiver.StreamID] = receiver receiver.absTs = make(map[uint32]uint32) receiver.Begin() @@ -120,7 +120,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { } sender.OnEvent(ctx) sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID) - if plugin.Subscribe(streamPath, sender) { + if plugin.Subscribe(streamPath, sender) == nil { senders[sender.StreamID] = sender err = nc.SendStreamID(RTMP_USER_STREAM_IS_RECORDED, msg.MessageStreamID) sender.Begin() @@ -133,7 +133,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { case "closeStream": cm := msg.MsgData.(*CURDStreamMessage) if stream, ok := senders[cm.StreamId]; ok { - stream.Bye() + stream.Stop() delete(senders, cm.StreamId) } case "releaseStream": @@ -142,7 +142,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { p, ok := receivers[msg.MessageStreamID] if ok { amfobj["level"] = "_result" - p.Bye() + p.Stop() } else { amfobj["level"] = "_error" }