From e553c415b767d4ec4fddebf42834e6bd5ad8b93e Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Mon, 28 Jun 2021 20:22:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0ChunkType=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- msg.go | 21 ++++----------------- netConnection.go | 5 +++-- netStream.go | 10 ++++++++-- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/msg.go b/msg.go index 663bf20..a2d3bfa 100644 --- a/msg.go +++ b/msg.go @@ -64,11 +64,6 @@ const ( ) var ( - rtmpHeaderPool = &sync.Pool{ - New: func() interface{} { - return new(ChunkHeader) - }, - } chunkMsgPool = &sync.Pool{ New: func() interface{} { return new(Chunk) @@ -77,7 +72,7 @@ var ( ) func newChunkHeader(messageType byte) *ChunkHeader { - head := rtmpHeaderPool.Get().(*ChunkHeader) + head := new(ChunkHeader) head.ChunkStreamID = RTMP_CSID_CONTROL if messageType == RTMP_MSG_AMF0_COMMAND { head.ChunkStreamID = RTMP_CSID_COMMAND @@ -89,7 +84,7 @@ func newChunkHeader(messageType byte) *ChunkHeader { return head } func newRtmpHeader(chunkID uint32, timestamp uint32, messageLength uint32, messageType byte, messageStreamID uint32, extendTimestamp uint32) *ChunkHeader { - head := rtmpHeaderPool.Get().(*ChunkHeader) + head := new(ChunkHeader) head.ChunkStreamID = chunkID head.Timestamp = timestamp head.MessageLength = messageLength @@ -99,16 +94,8 @@ func newRtmpHeader(chunkID uint32, timestamp uint32, messageLength uint32, messa return head } -func (h *ChunkHeader) Clone() *ChunkHeader { - head := rtmpHeaderPool.Get().(*ChunkHeader) - head.ChunkStreamID = h.ChunkStreamID - head.Timestamp = h.Timestamp - head.MessageLength = h.MessageLength - head.MessageTypeID = h.MessageTypeID - head.MessageStreamID = h.MessageStreamID - head.ExtendTimestamp = h.ExtendTimestamp - - return head +func (h ChunkHeader) Clone() *ChunkHeader { + return &h } type RtmpMessage interface { diff --git a/netConnection.go b/netConnection.go index d4c2d1e..4f28726 100644 --- a/netConnection.go +++ b/netConnection.go @@ -416,7 +416,7 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { } ChunkStreamID := uint32(head & 0x3f) // 0011 1111 - ChunkType := (head & 0xc0) >> 6 // 1100 0000 + ChunkType := head >> 6 // 1100 0000 // 如果块流ID为0,1的话,就需要计算. ChunkStreamID, err = conn.readChunkStreamID(ChunkStreamID) @@ -438,6 +438,7 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { } chunkHead, err := conn.readChunkType(h, ChunkType) + if err != nil { return nil, errors.New("get chunk type error :" + err.Error()) } @@ -615,7 +616,7 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (head * } case 3: { - h.ChunkType = chunkType + //h.ChunkType = chunkType } } diff --git a/netStream.go b/netStream.go index efd451b..28526b0 100644 --- a/netStream.go +++ b/netStream.go @@ -109,18 +109,24 @@ func processRtmp(conn net.Conn) { vt := stream.NewVideoTrack(0) at := stream.NewAudioTrack(0) rec_audio = func(msg *Chunk) { + if msg.ChunkType == 0 { + absTs[msg.ChunkStreamID] = 0 + } if msg.Timestamp == 0xffffff { absTs[msg.ChunkStreamID] += msg.ExtendTimestamp } else { - absTs[msg.ChunkStreamID] += msg.Timestamp // 绝对时间戳 + absTs[msg.ChunkStreamID] += msg.Timestamp } at.PushByteStream(engine.AudioPack{Timestamp: absTs[msg.ChunkStreamID], Payload: msg.Body}) } rec_video = func(msg *Chunk) { + if msg.ChunkType == 0 { + absTs[msg.ChunkStreamID] = 0 + } if msg.Timestamp == 0xffffff { absTs[msg.ChunkStreamID] += msg.ExtendTimestamp } else { - absTs[msg.ChunkStreamID] += msg.Timestamp // 绝对时间戳 + absTs[msg.ChunkStreamID] += msg.Timestamp } vt.PushByteStream(engine.VideoPack{Timestamp: absTs[msg.ChunkStreamID], Payload: msg.Body}) }