From 9f6958ff5c3cfaf51db15876ad0385ac2f2ec3a4 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Sun, 13 Mar 2022 23:23:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client.go | 8 +++----- handshake.go | 2 +- media.go | 4 ++-- netConnection.go | 8 ++++---- server.go | 4 ++-- 5 files changed, 12 insertions(+), 14 deletions(-) diff --git a/client.go b/client.go index e8ec6df..ff11073 100644 --- a/client.go +++ b/client.go @@ -31,10 +31,9 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) { writeChunkSize: conf.ChunkSize, readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, rtmpHeader: make(map[uint32]*ChunkHeader), - incompleteRtmpBody: make(map[uint32]util.Buffer), + incompleteRtmpBody: make(map[uint32]*util.Buffer), bandwidth: RTMP_MAX_CHUNK_SIZE << 3, tmpBuf: make([]byte, 4), - // subscribers: make(map[uint32]*engine.Subscriber), } err = client.ClientHandshake() if err != nil { @@ -104,7 +103,7 @@ func (pusher *RTMPPusher) Push() { URL, _ := url.Parse(pusher.RemoteURL) ps := strings.Split(URL.Path, "/") pusher.Args = URL.Query() - m := &PublishMessage{ + pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &PublishMessage{ CURDStreamMessage{ CommandMessage{ "publish", @@ -114,8 +113,7 @@ func (pusher *RTMPPusher) Push() { }, ps[len(ps)-1], "live", - } - pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, m) + }) } else if response, ok := msg.MsgData.(*ResponsePublishMessage); ok { if response.Infomation["code"] == NetStream_Publish_Start { go pusher.PlayBlock(pusher) diff --git a/handshake.go b/handshake.go index f0af9f4..2673075 100644 --- a/handshake.go +++ b/handshake.go @@ -121,7 +121,7 @@ func (nc *NetConnection) complex_handshake(C1 []byte) error { return err } - fmt.Sprintf("digested handshake, scheme : %v\nchallenge : %v\ndigest : %v\nok : %v\nerr : %v\n", scheme, challenge, digest, ok, err) + fmt.Printf("digested handshake, scheme : %v\nchallenge : %v\ndigest : %v\nok : %v\nerr : %v\n", scheme, challenge, digest, ok, err) if !ok { return errors.New("validateClient failed") diff --git a/media.go b/media.go index 1ca238b..e990217 100644 --- a/media.go +++ b/media.go @@ -19,9 +19,9 @@ func (rtmp *RTMPSender) OnEvent(event any) { case VideoDeConf: rtmp.sendAVMessage(0, v.AVCC, false, true) case *AudioFrame: - rtmp.sendAVMessage(v.DeltaTime, v.GetAVCC(), true, false) + rtmp.sendAVMessage(v.DeltaTime, v.AVCC, true, false) case *VideoFrame: - rtmp.sendAVMessage(v.DeltaTime, v.GetAVCC(), false, false) + rtmp.sendAVMessage(v.DeltaTime, v.AVCC, false, false) default: rtmp.Subscriber.OnEvent(event) } diff --git a/netConnection.go b/netConnection.go index 08d1178..dbe6446 100644 --- a/netConnection.go +++ b/netConnection.go @@ -84,7 +84,7 @@ type NetConnection struct { totalRead uint32 // 总共读了多少字节 writeChunkSize int readChunkSize int - incompleteRtmpBody map[uint32]util.Buffer // 完整的RtmpBody,在网络上是被分成一块一块的,需要将其组装起来 + incompleteRtmpBody map[uint32]*util.Buffer // 完整的RtmpBody,在网络上是被分成一块一块的,需要将其组装起来 rtmpHeader map[uint32]*ChunkHeader // RtmpHeader objectEncoding float64 appName string @@ -160,8 +160,9 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { } msgLen := int(h.MessageLength) - if !ok || currentBody.Len() == 0 { - currentBody = util.Buffer(make([]byte, 0, msgLen)) + if !ok { + newBuffer := util.Buffer(make([]byte, 0, msgLen)) + currentBody = &newBuffer conn.incompleteRtmpBody[ChunkStreamID] = currentBody } @@ -182,7 +183,6 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { } err = GetRtmpMessage(msg) } - conn.incompleteRtmpBody[ChunkStreamID] = currentBody return } diff --git a/server.go b/server.go index f272c11..b6f4feb 100644 --- a/server.go +++ b/server.go @@ -44,7 +44,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE, readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, rtmpHeader: make(map[uint32]*ChunkHeader), - incompleteRtmpBody: make(map[uint32]util.Buffer), + incompleteRtmpBody: make(map[uint32]*util.Buffer), bandwidth: RTMP_MAX_CHUNK_SIZE << 3, tmpBuf: make([]byte, 4), } @@ -104,7 +104,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { plugin.Info("createStream:", zap.Uint32("streamId", streamId)) nc.ResponseCreateStream(cmd.TransactionId, streamId) case *CURDStreamMessage: - if stream, ok := senders[cmd.StreamId]; ok { + if stream, ok := receivers[cmd.StreamId]; ok { stream.Stop() delete(senders, cmd.StreamId) }