From 28030327cc9570c8f54df64952505096d22c6f53 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Tue, 8 Mar 2022 21:30:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dplay=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 20 +++++++++------ msg.go | 30 ++++++++++++++-------- netConnection.go | 67 ++++++++++-------------------------------------- server.go | 15 ++++++----- 4 files changed, 54 insertions(+), 78 deletions(-) diff --git a/README.md b/README.md index 0fc71db..6c763ed 100644 --- a/README.md +++ b/README.md @@ -7,20 +7,22 @@ github.com/Monibuca/plugin-rtmp ## 插件引入 ```go import ( - _ "github.com/Monibuca/plugin-rtmp" + _ "m7s.live/plugin/rtmp/v4" ) ``` ## 默认插件配置 -```toml -[RTMP] -ListenAddr = ":1935" -ChunkSize = 512 +```yaml +rtmp + tcp: + listenaddr: :1935 + listennum: 0 + chunksize: 512 ``` -- ListenAddr是监听的地址 -- ChunkSize是分块大小 +- listenaddr是监听的地址 +- chunksize是输出分块大小 ## 插件功能 @@ -38,4 +40,6 @@ ffmpeg -i **** -f flv rtmp://localhost/live/test 如果m7s中已经存在live/test流的话就可以用rtmp协议进行播放 ```bash ffplay -i rtmp://localhost/live/test -``` \ No newline at end of file +``` + +### 从远端拉流到m7s diff --git a/msg.go b/msg.go index 9c859f2..88d650c 100644 --- a/msg.go +++ b/msg.go @@ -9,6 +9,7 @@ import ( "m7s.live/engine/v4/util" ) +// https://zhuanlan.zhihu.com/p/196743129 const ( /* RTMP Message ID*/ @@ -225,14 +226,22 @@ func decodeCommandAMF0(chunk *Chunk) { float64(-1), true, } - if amf.Len() > 0 { - m.Start = amf.readNumber() - } - if amf.Len() > 0 { - m.Duration = amf.readNumber() - } - if amf.Len() > 0 { - m.Reset = amf.readBool() + for i := 0; i < 3; i++ { + if v := amf.decodeObject(); v != nil { + switch vv := v.(type) { + case float64: + if i == 0 { + m.Start = vv + } else { + m.Duration = vv + } + case bool: + m.Reset = vv + i = 2 + } + } else { + break + } } chunk.MsgData = m case "play2": @@ -498,10 +507,11 @@ func (msg *PlayMessage) Encode() []byte { amf.writeString(msg.StreamName) if msg.Start > 0 { - amf.writeNumber(float64(msg.Start)) + amf.writeNumber(msg.Start) } + if msg.Duration > 0 { - amf.writeNumber(float64(msg.Duration)) + amf.writeNumber(msg.Duration) } amf.writeBool(msg.Reset) diff --git a/netConnection.go b/netConnection.go index b91ed7b..08d1178 100644 --- a/netConnection.go +++ b/netConnection.go @@ -109,59 +109,20 @@ func (conn *NetConnection) ResponseCreateStream(tid uint64, streamID uint32) err return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) } -func (conn *NetConnection) SendCommand(message string, args any) error { - switch message { - // case SEND_SET_BUFFER_LENGTH_MESSAGE: - // if args != nil { - // return errors.New(SEND_SET_BUFFER_LENGTH_MESSAGE + ", The parameter is nil") - // } - // m := new(SetBufferMessage) - // m.EventType = RTMP_USER_SET_BUFFLEN - // m.Millisecond = 100 - // m.StreamID = conn.streamID - // return conn.writeMessage(RTMP_MSG_USER_CONTROL, m) - - case SEND_CONNECT_RESPONSE_MESSAGE: - //if !ok { - // errors.New(SEND_CONNECT_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})") - //} - - pro := make(AMFObject) - info := make(AMFObject) - - //for i, v := range data { - // switch i { - // case "fmsVer", "capabilities", "mode", "Author", "level", "code", "objectEncoding": - // pro[i] = v - // } - //} - - pro["fmsVer"] = "monibuca/" + Engine.Version - pro["capabilities"] = 31 - pro["mode"] = 1 - pro["Author"] = "dexter" - - info["level"] = Level_Status - info["code"] = NetConnection_Connect_Success - info["objectEncoding"] = conn.objectEncoding - m := new(ResponseConnectMessage) - m.CommandName = Response_Result - m.TransactionId = 1 - m.Properties = pro - m.Infomation = info - return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) - case SEND_UNPUBLISH_RESPONSE_MESSAGE: - data, ok := args.(AMFObject) - if !ok { - return errors.New(SEND_UNPUBLISH_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})") - } - m := new(CommandMessage) - m.TransactionId = data["tid"].(uint64) - m.CommandName = "releaseStream" + data["level"].(string) - return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) - } - return errors.New("send message no exist") -} +// func (conn *NetConnection) SendCommand(message string, args any) error { +// switch message { +// // case SEND_SET_BUFFER_LENGTH_MESSAGE: +// // if args != nil { +// // return errors.New(SEND_SET_BUFFER_LENGTH_MESSAGE + ", The parameter is nil") +// // } +// // m := new(SetBufferMessage) +// // m.EventType = RTMP_USER_SET_BUFFLEN +// // m.Millisecond = 100 +// // m.StreamID = conn.streamID +// // return conn.writeMessage(RTMP_MSG_USER_CONTROL, m) +// } +// return errors.New("send message no exist") +// } func (conn *NetConnection) readChunk() (msg *Chunk, err error) { head, err := conn.ReadByte() diff --git a/server.go b/server.go index 00248c6..7109e5e 100644 --- a/server.go +++ b/server.go @@ -151,16 +151,17 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { } case "releaseStream": cm := msg.MsgData.(*ReleaseStreamMessage) - amfobj := make(AMFObject) - p, ok := receivers[msg.MessageStreamID] - if ok { - amfobj["level"] = "_result" + m := &CommandMessage{ + CommandName: "releaseStream", + TransactionId: cm.TransactionId, + } + if p, ok := receivers[msg.MessageStreamID]; ok { + m.CommandName += "_result" p.Stop() } else { - amfobj["level"] = "_error" + m.CommandName += "_error" } - amfobj["tid"] = cm.TransactionId - err = nc.SendCommand(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj) + err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m) } case RTMP_MSG_AUDIO: if r, ok := receivers[msg.MessageStreamID]; ok {