修复play命令解析逻辑

This commit is contained in:
dexter
2022-03-08 21:30:23 +08:00
parent ca1b08e499
commit 28030327cc
4 changed files with 54 additions and 78 deletions
+12 -8
View File
@@ -7,20 +7,22 @@ github.com/Monibuca/plugin-rtmp
## 插件引入
```go
import (
_ "github.com/Monibuca/plugin-rtmp"
_ "m7s.live/plugin/rtmp/v4"
)
```
## 默认插件配置
```toml
[RTMP]
ListenAddr = ":1935"
ChunkSize = 512
```yaml
rtmp
tcp:
listenaddr: :1935
listennum: 0
chunksize: 512
```
- ListenAddr是监听的地址
- ChunkSize是分块大小
- listenaddr是监听的地址
- chunksize是输出分块大小
## 插件功能
@@ -38,4 +40,6 @@ ffmpeg -i **** -f flv rtmp://localhost/live/test
如果m7s中已经存在live/test流的话就可以用rtmp协议进行播放
```bash
ffplay -i rtmp://localhost/live/test
```
```
### 从远端拉流到m7s
+20 -10
View File
@@ -9,6 +9,7 @@ import (
"m7s.live/engine/v4/util"
)
// https://zhuanlan.zhihu.com/p/196743129
const (
/* RTMP Message ID*/
@@ -225,14 +226,22 @@ func decodeCommandAMF0(chunk *Chunk) {
float64(-1),
true,
}
if amf.Len() > 0 {
m.Start = amf.readNumber()
}
if amf.Len() > 0 {
m.Duration = amf.readNumber()
}
if amf.Len() > 0 {
m.Reset = amf.readBool()
for i := 0; i < 3; i++ {
if v := amf.decodeObject(); v != nil {
switch vv := v.(type) {
case float64:
if i == 0 {
m.Start = vv
} else {
m.Duration = vv
}
case bool:
m.Reset = vv
i = 2
}
} else {
break
}
}
chunk.MsgData = m
case "play2":
@@ -498,10 +507,11 @@ func (msg *PlayMessage) Encode() []byte {
amf.writeString(msg.StreamName)
if msg.Start > 0 {
amf.writeNumber(float64(msg.Start))
amf.writeNumber(msg.Start)
}
if msg.Duration > 0 {
amf.writeNumber(float64(msg.Duration))
amf.writeNumber(msg.Duration)
}
amf.writeBool(msg.Reset)
+14 -53
View File
@@ -109,59 +109,20 @@ func (conn *NetConnection) ResponseCreateStream(tid uint64, streamID uint32) err
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
func (conn *NetConnection) SendCommand(message string, args any) error {
switch message {
// case SEND_SET_BUFFER_LENGTH_MESSAGE:
// if args != nil {
// return errors.New(SEND_SET_BUFFER_LENGTH_MESSAGE + ", The parameter is nil")
// }
// m := new(SetBufferMessage)
// m.EventType = RTMP_USER_SET_BUFFLEN
// m.Millisecond = 100
// m.StreamID = conn.streamID
// return conn.writeMessage(RTMP_MSG_USER_CONTROL, m)
case SEND_CONNECT_RESPONSE_MESSAGE:
//if !ok {
// errors.New(SEND_CONNECT_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})")
//}
pro := make(AMFObject)
info := make(AMFObject)
//for i, v := range data {
// switch i {
// case "fmsVer", "capabilities", "mode", "Author", "level", "code", "objectEncoding":
// pro[i] = v
// }
//}
pro["fmsVer"] = "monibuca/" + Engine.Version
pro["capabilities"] = 31
pro["mode"] = 1
pro["Author"] = "dexter"
info["level"] = Level_Status
info["code"] = NetConnection_Connect_Success
info["objectEncoding"] = conn.objectEncoding
m := new(ResponseConnectMessage)
m.CommandName = Response_Result
m.TransactionId = 1
m.Properties = pro
m.Infomation = info
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_UNPUBLISH_RESPONSE_MESSAGE:
data, ok := args.(AMFObject)
if !ok {
return errors.New(SEND_UNPUBLISH_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})")
}
m := new(CommandMessage)
m.TransactionId = data["tid"].(uint64)
m.CommandName = "releaseStream" + data["level"].(string)
return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
return errors.New("send message no exist")
}
// func (conn *NetConnection) SendCommand(message string, args any) error {
// switch message {
// // case SEND_SET_BUFFER_LENGTH_MESSAGE:
// // if args != nil {
// // return errors.New(SEND_SET_BUFFER_LENGTH_MESSAGE + ", The parameter is nil")
// // }
// // m := new(SetBufferMessage)
// // m.EventType = RTMP_USER_SET_BUFFLEN
// // m.Millisecond = 100
// // m.StreamID = conn.streamID
// // return conn.writeMessage(RTMP_MSG_USER_CONTROL, m)
// }
// return errors.New("send message no exist")
// }
func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
head, err := conn.ReadByte()
+8 -7
View File
@@ -151,16 +151,17 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
}
case "releaseStream":
cm := msg.MsgData.(*ReleaseStreamMessage)
amfobj := make(AMFObject)
p, ok := receivers[msg.MessageStreamID]
if ok {
amfobj["level"] = "_result"
m := &CommandMessage{
CommandName: "releaseStream",
TransactionId: cm.TransactionId,
}
if p, ok := receivers[msg.MessageStreamID]; ok {
m.CommandName += "_result"
p.Stop()
} else {
amfobj["level"] = "_error"
m.CommandName += "_error"
}
amfobj["tid"] = cm.TransactionId
err = nc.SendCommand(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj)
err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
}
case RTMP_MSG_AUDIO:
if r, ok := receivers[msg.MessageStreamID]; ok {