diff --git a/client.go b/client.go index 0527f9a..b7c09d8 100644 --- a/client.go +++ b/client.go @@ -109,7 +109,9 @@ func (pusher *RTMPPusher) Connect() (err error) { } return } - +func (pusher *RTMPPusher) Disconnect() { + pusher.NetConnection.Close() +} func (pusher *RTMPPusher) Push() error { pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2}) for { @@ -168,6 +170,11 @@ func (puller *RTMPPuller) Connect() (err error) { } return } +func (puller *RTMPPuller) Disconnect() { + if puller.NetConnection != nil { + puller.NetConnection.Close() + } +} func (puller *RTMPPuller) Pull() (err error) { defer puller.Stop() diff --git a/main.go b/main.go index 58573a0..dab803d 100644 --- a/main.go +++ b/main.go @@ -4,7 +4,6 @@ import ( "context" "net/http" "strconv" - "time" "go.uber.org/zap" . "m7s.live/engine/v4" @@ -47,9 +46,9 @@ func (c *RTMPConfig) OnEvent(event any) { RTMPPlugin.Error("push", zap.String("streamPath", v.Target.Path), zap.String("url", url), zap.Error(err)) } } - case *Stream: //按需拉流 - if url, ok := c.PullOnSub[v.Path]; ok { - pull(v.Path, url) + case InvitePublish: //按需拉流 + if url, ok := c.PullOnSub[v.Target]; ok { + pull(v.Target, url) } } } @@ -72,24 +71,24 @@ func filterStreams() (ss []*Stream) { } func (*RTMPConfig) API_list(w http.ResponseWriter, r *http.Request) { - util.ReturnJson(filterStreams, time.Second, w, r) + util.ReturnFetchValue(filterStreams, w, r) } func (*RTMPConfig) API_Pull(rw http.ResponseWriter, r *http.Request) { save, _ := strconv.Atoi(r.URL.Query().Get("save")) err := RTMPPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTMPPuller), save) if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) + util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r) } else { - rw.Write([]byte("ok")) + util.ReturnOK(rw, r) } } func (*RTMPConfig) API_Push(rw http.ResponseWriter, r *http.Request) { err := RTMPPlugin.Push(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTMPPusher), r.URL.Query().Has("save")) if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) + util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r) } else { - rw.Write([]byte("ok")) + util.ReturnOK(rw, r) } } diff --git a/server.go b/server.go index c9a9eba..aa9b1ea 100644 --- a/server.go +++ b/server.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "net" - "sync/atomic" "go.uber.org/zap" "m7s.live/engine/v4" @@ -20,8 +19,6 @@ func (ns *NetStream) Begin() { ns.SendStreamID(RTMP_USER_STREAM_BEGIN, ns.StreamID) } -var gstreamid uint32 - type RTMPSubscriber struct { RTMPSender } @@ -35,12 +32,14 @@ func (s *RTMPSubscriber) OnEvent(event any) { } func (config *RTMPConfig) ServeTCP(conn net.Conn) { defer conn.Close() - zapRemote := zap.String("remote", conn.RemoteAddr().String()) + logger := RTMPPlugin.Logger.With(zap.String("remote", conn.RemoteAddr().String())) senders := make(map[uint32]*RTMPSubscriber) receivers := make(map[uint32]*RTMPReceiver) var err error + logger.Info("conn") defer func() { ze := zap.Error(err) + logger.Info("conn close", ze) for _, sender := range senders { sender.Stop(ze) } @@ -53,10 +52,11 @@ func (config *RTMPConfig) ServeTCP(conn net.Conn) { defer cancel() /* Handshake */ if err = nc.Handshake(); err != nil { - RTMPPlugin.Error("handshake", zap.Error(err), zapRemote) + logger.Error("handshake", zap.Error(err)) return } var msg *Chunk + var gstreamid uint32 for { if msg, err = nc.RecvMessage(); err == nil { if msg.MessageLength <= 0 { @@ -68,7 +68,7 @@ func (config *RTMPConfig) ServeTCP(conn net.Conn) { break } cmd := msg.MsgData.(Commander).GetCommand() - RTMPPlugin.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID), zapRemote) + logger.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID)) switch cmd := msg.MsgData.(type) { case *CallMessage: //connect app := cmd.Object["app"] // 客户端要连接到的服务应用名 @@ -80,7 +80,7 @@ func (config *RTMPConfig) ServeTCP(conn net.Conn) { nc.objectEncoding = 0 } nc.appName = app.(string) - RTMPPlugin.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding), zapRemote) + logger.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding)) err = nc.SendMessage(RTMP_MSG_ACK_SIZE, Uint32Message(512<<10)) nc.writeChunkSize = config.ChunkSize err = nc.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(config.ChunkSize)) @@ -105,9 +105,9 @@ func (config *RTMPConfig) ServeTCP(conn net.Conn) { } err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m) case *CommandMessage: // "createStream" - streamId := atomic.AddUint32(&gstreamid, 1) - RTMPPlugin.Info("createStream:", zap.Uint32("streamId", streamId), zapRemote) - nc.ResponseCreateStream(cmd.TransactionId, streamId) + gstreamid++ + logger.Info("createStream:", zap.Uint32("streamId", gstreamid)) + nc.ResponseCreateStream(cmd.TransactionId, gstreamid) case *CURDStreamMessage: if stream, ok := receivers[cmd.StreamId]; ok { stream.Stop() @@ -171,20 +171,20 @@ func (config *RTMPConfig) ServeTCP(conn net.Conn) { if r, ok := receivers[msg.MessageStreamID]; ok { r.ReceiveAudio(msg) } else { - RTMPPlugin.Warn("ReceiveAudio", zap.Uint32("MessageStreamID", msg.MessageStreamID), zapRemote) + logger.Warn("ReceiveAudio", zap.Uint32("MessageStreamID", msg.MessageStreamID)) } case RTMP_MSG_VIDEO: if r, ok := receivers[msg.MessageStreamID]; ok { r.ReceiveVideo(msg) } else { - RTMPPlugin.Warn("ReceiveVideo", zap.Uint32("MessageStreamID", msg.MessageStreamID), zapRemote) + logger.Warn("ReceiveVideo", zap.Uint32("MessageStreamID", msg.MessageStreamID)) } } } else if err == io.EOF || err == io.ErrUnexpectedEOF { - RTMPPlugin.Info("rtmp client closed", zapRemote) + logger.Info("rtmp client closed") return } else { - RTMPPlugin.Warn("ReadMessage", zap.Error(err), zapRemote) + logger.Warn("ReadMessage", zap.Error(err)) return } }