From efe5c2b0ee2081e0829c09333b0878d7011b26c5 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Thu, 7 Mar 2024 20:48:58 +0800 Subject: [PATCH] fix: appName --- client.go | 2 +- msg.go | 51 +++++++++++++++++++++++++----------------------- netConnection.go | 12 ++++++++++-- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/client.go b/client.go index 15702ed..48fb810 100644 --- a/client.go +++ b/client.go @@ -54,7 +54,7 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) { RTMPPlugin.Error("handshake", zap.Error(err)) return nil, err } - client.appName = ps[1] + client.appName = strings.Join(ps[1:len(ps)-1], "/") err = client.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(conf.ChunkSize)) if err != nil { return diff --git a/msg.go b/msg.go index 2474185..b980073 100644 --- a/msg.go +++ b/msg.go @@ -80,7 +80,7 @@ func (h ChunkHeader) Clone() *ChunkHeader { } type RtmpMessage interface { - Encode(*util.Buffer) + Encode(util.IAMF) } type HaveStreamID interface { GetStreamID() uint32 @@ -293,6 +293,9 @@ func decodeCommandAMF0(chunk *Chunk, body []byte) { amf.ReadObject(), amf.ReadObject(), "", } + if response.Infomation == nil && response.Properties != nil { + response.Infomation = response.Properties + } codef := zap.String("code", response.Infomation["code"].(string)) switch response.Infomation["level"] { case Level_Status: @@ -339,8 +342,8 @@ func (cmd *CommandMessage) GetCommand() *CommandMessage { return cmd } -func (msg *CommandMessage) Encode(buf *util.Buffer) { - buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil) +func (msg *CommandMessage) Encode(buf util.IAMF) { + buf.Marshals(msg.CommandName, msg.TransactionId, nil) } // Protocol control message 1. @@ -349,7 +352,7 @@ func (msg *CommandMessage) Encode(buf *util.Buffer) { // chunk size (31 bits): This field holds the new maximum chunk size,in bytes, which will be used for all of the sender’s subsequent chunks until further notice type Uint32Message uint32 -func (msg Uint32Message) Encode(buf *util.Buffer) { +func (msg Uint32Message) Encode(buf util.IAMF) { binary.BigEndian.PutUint32(buf.Malloc(4), uint32(msg)) } @@ -374,7 +377,7 @@ type SetPeerBandwidthMessage struct { LimitType byte } -func (msg *SetPeerBandwidthMessage) Encode(buf *util.Buffer) { +func (msg *SetPeerBandwidthMessage) Encode(buf util.IAMF) { buf.WriteUint32(msg.AcknowledgementWindowsize) buf.WriteByte(msg.LimitType) } @@ -406,10 +409,10 @@ type CallMessage struct { Optional map[string]any `json:",omitempty"` } -func (msg *CallMessage) Encode(buf *util.Buffer) { - buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Object) +func (msg *CallMessage) Encode(buf util.IAMF) { + buf.Marshals(msg.CommandName, msg.TransactionId, msg.Object) if msg.Optional != nil { - buf.MarshalAMFs(msg.Optional) + buf.Marshals(msg.Optional) } } @@ -465,7 +468,7 @@ type PlayMessage struct { // Duration -> 可选的参数,以秒为单位定义了回放的持续时间.默认值为 -1.-1 值意味着一个直播流会一直播放直到它不再可用或者一个录制流一直播放直到结束 // Reset -> 可选的布尔值或者数字定义了是否对以前的播放列表进行 flush -func (msg *PlayMessage) Encode(buf *util.Buffer) { +func (msg *PlayMessage) Encode(buf util.IAMF) { // if msg.Start > 0 { // amf.writeNumber(msg.Start) // } @@ -475,7 +478,7 @@ func (msg *PlayMessage) Encode(buf *util.Buffer) { // } // amf.writeBool(msg.Reset) - buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamName, -2000) + buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.StreamName, -2000) } /* @@ -548,8 +551,8 @@ type PublishMessage struct { // “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件. // “live”:发布直播数据而不录制到文件 -func (msg *PublishMessage) Encode(buf *util.Buffer) { - buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.PublishingName, msg.PublishingType) +func (msg *PublishMessage) Encode(buf util.IAMF) { + buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.PublishingName, msg.PublishingType) } // Seek Message @@ -590,8 +593,8 @@ type ResponseConnectMessage struct { Infomation map[string]any `json:",omitempty"` } -func (msg *ResponseConnectMessage) Encode(buf *util.Buffer) { - buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) +func (msg *ResponseConnectMessage) Encode(buf util.IAMF) { + buf.Marshals(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) } /* @@ -616,8 +619,8 @@ type ResponseCreateStreamMessage struct { StreamId uint32 } -func (msg *ResponseCreateStreamMessage) Encode(buf *util.Buffer) { - buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamId) +func (msg *ResponseCreateStreamMessage) Encode(buf util.IAMF) { + buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.StreamId) } /* @@ -647,8 +650,8 @@ type ResponsePlayMessage struct { func (msg *ResponsePlayMessage) GetStreamID() uint32 { return msg.StreamID } -func (msg *ResponsePlayMessage) Encode(buf *util.Buffer) { - buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.Infomation) +func (msg *ResponsePlayMessage) Encode(buf util.IAMF) { + buf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.Infomation) } /* @@ -684,8 +687,8 @@ func (msg *ResponsePublishMessage) GetStreamID() uint32 { // 属性 -> null // 信息 -> level, code, description -func (msg *ResponsePublishMessage) Encode(buf *util.Buffer) { - buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) +func (msg *ResponsePublishMessage) Encode(buf util.IAMF) { + buf.Marshals(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) } /* @@ -744,7 +747,7 @@ type StreamIDMessage struct { StreamID uint32 } -func (msg *StreamIDMessage) Encode(buffer *util.Buffer) { +func (msg *StreamIDMessage) Encode(buffer util.IAMF) { buffer.WriteUint16(msg.EventType) msg.EventData = buffer.Malloc(4) binary.BigEndian.PutUint32(msg.EventData, msg.StreamID) @@ -760,7 +763,7 @@ type SetBufferMessage struct { Millisecond uint32 } -func (msg *SetBufferMessage) Encode(buf *util.Buffer) { +func (msg *SetBufferMessage) Encode(buf util.IAMF) { buf.WriteUint16(msg.EventType) msg.EventData = buf.Malloc(8) binary.BigEndian.PutUint32(msg.EventData, msg.StreamID) @@ -776,12 +779,12 @@ type PingRequestMessage struct { Timestamp uint32 } -func (msg *PingRequestMessage) Encode(buf *util.Buffer) { +func (msg *PingRequestMessage) Encode(buf util.IAMF) { buf.WriteUint16(msg.EventType) msg.EventData = buf.Malloc(4) binary.BigEndian.PutUint32(msg.EventData, msg.Timestamp) } -func (msg *UserControlMessage) Encode(buf *util.Buffer) { +func (msg *UserControlMessage) Encode(buf util.IAMF) { buf.WriteUint16(msg.EventType) } diff --git a/netConnection.go b/netConnection.go index 76a9dc2..cd1160e 100644 --- a/netConnection.go +++ b/netConnection.go @@ -9,6 +9,7 @@ import ( "runtime" "sync/atomic" + "go.uber.org/zap" "m7s.live/engine/v4/util" ) @@ -259,7 +260,7 @@ func (conn *NetConnection) RecvMessage() (msg *Chunk, err error) { switch msg.MessageTypeID { case RTMP_MSG_CHUNK_SIZE: conn.readChunkSize = int(msg.MsgData.(Uint32Message)) - println("read chunk size", conn.readChunkSize) + RTMPPlugin.Info("msg read chunk size", zap.Int("readChunkSize", conn.readChunkSize)) case RTMP_MSG_ABORT: delete(conn.incommingChunks, uint32(msg.MsgData.(Uint32Message))) case RTMP_MSG_ACK, RTMP_MSG_EDGE: @@ -293,7 +294,14 @@ func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) { } defer conn.writing.Store(false) conn.tmpBuf.Reset() - msg.Encode(&conn.tmpBuf) + amf := util.AMF{conn.tmpBuf} + if conn.objectEncoding == 0 { + msg.Encode(&amf) + } else { + amf := util.AMF3{AMF: amf} + msg.Encode(&amf) + } + conn.tmpBuf = amf.Buffer head := newChunkHeader(t) head.MessageLength = uint32(conn.tmpBuf.Len()) if sid, ok := msg.(HaveStreamID); ok {