diff --git a/pkg/bits/reader.go b/pkg/bits/reader.go index 7832e0f2..10ea2253 100644 --- a/pkg/bits/reader.go +++ b/pkg/bits/reader.go @@ -127,3 +127,7 @@ func (r *Reader) ReadSEGolomb() int32 { return int32(b >> 1) } } + +func (r *Reader) Left() []byte { + return r.buf[r.pos:] +} diff --git a/pkg/mpegts/demuxer.go b/pkg/mpegts/demuxer.go index 83160105..a3efc2c9 100644 --- a/pkg/mpegts/demuxer.go +++ b/pkg/mpegts/demuxer.go @@ -1,6 +1,7 @@ package mpegts import ( + "bytes" "errors" "io" @@ -98,6 +99,11 @@ func (d *Demuxer) skip(i byte) { d.pos += i } +func (d *Demuxer) readBytes(i byte) []byte { + d.pos += i + return d.buf[d.pos-i : d.pos] +} + func (d *Demuxer) readPSIHeader() { // https://en.wikipedia.org/wiki/Program-specific_information#Table_Sections pointer := d.readByte() // Pointer field @@ -159,7 +165,11 @@ func (d *Demuxer) readPMT() { _ = d.readBits(4) // Reserved bits _ = d.readBits(2) // ES Info length unused bits size = d.readBits(10) // ES Info length - d.skip(byte(size)) + info := d.readBytes(byte(size)) + + if streamType == StreamTypePrivate && bytes.HasPrefix(info, opusInfo) { + streamType = StreamTypePrivateOPUS + } d.pes[pid] = &PES{StreamType: streamType} } @@ -175,7 +185,7 @@ func (d *Demuxer) readPES(pid uint16, start bool) *rtp.Packet { // if new payload beging if start { - if pes.Payload != nil { + if len(pes.Payload) != 0 { d.pos = skipRead return pes.GetPacket() // finish previous packet } @@ -314,12 +324,13 @@ const ( // https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types const ( - StreamTypeMetadata = 0 // Reserved - StreamTypePrivate = 0x06 // PCMU or PCMA or FLAC from FFmpeg - StreamTypeAAC = 0x0F - StreamTypeH264 = 0x1B - StreamTypeH265 = 0x24 - StreamTypePCMATapo = 0x90 + StreamTypeMetadata = 0 // Reserved + StreamTypePrivate = 0x06 // PCMU or PCMA or FLAC from FFmpeg + StreamTypeAAC = 0x0F + StreamTypeH264 = 0x1B + StreamTypeH265 = 0x24 + StreamTypePCMATapo = 0x90 + StreamTypePrivateOPUS = 0xEB ) // PES - Packetized Elementary Stream @@ -397,6 +408,23 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) { } //p.Timestamp += uint32(len(p.Payload)) // update next timestamp! + + case StreamTypePrivateOPUS: + p.Sequence++ + + pkt = &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: p.StreamType, + SequenceNumber: p.Sequence, + Timestamp: p.PTS, + }, + } + + pkt.Payload, p.Payload = CutOPUSPacket(p.Payload) + p.PTS += opusDT + return } p.Payload = nil diff --git a/pkg/mpegts/opus.go b/pkg/mpegts/opus.go new file mode 100644 index 00000000..d6077ea4 --- /dev/null +++ b/pkg/mpegts/opus.go @@ -0,0 +1,66 @@ +package mpegts + +import ( + "github.com/AlexxIT/go2rtc/pkg/bits" +) + +// opusDT - each AU from FFmpeg has 5 OPUS packets. Each packet len = 960 in the 48000 clock. +const opusDT = 960 * ClockRate / 48000 + +// https://opus-codec.org/docs/ +var opusInfo = []byte{ // registration_descriptor + 0x05, // descriptor_tag + 0x04, // descriptor_length + 'O', 'p', 'u', 's', // format_identifier +} + +//goland:noinspection GoSnakeCaseUsage +func CutOPUSPacket(b []byte) (packet []byte, left []byte) { + r := bits.NewReader(b) + + size := opus_control_header(r) + if size == 0 { + return nil, nil + } + + packet = r.ReadBytes(size) + left = r.Left() + return +} + +//goland:noinspection GoSnakeCaseUsage +func opus_control_header(r *bits.Reader) int { + control_header_prefix := r.ReadBits(11) + if control_header_prefix != 0x3FF { + return 0 + } + + start_trim_flag := r.ReadBit() + end_trim_flag := r.ReadBit() + control_extension_flag := r.ReadBit() + _ = r.ReadBits(2) // reserved + + var payload_size int + for { + i := r.ReadByte() + payload_size += int(i) + if i < 255 { + break + } + } + + if start_trim_flag != 0 { + _ = r.ReadBits(3) + _ = r.ReadBits(13) + } + if end_trim_flag != 0 { + _ = r.ReadBits(3) + _ = r.ReadBits(13) + } + if control_extension_flag != 0 { + control_extension_length := r.ReadByte() + _ = r.ReadBytes(int(control_extension_length)) // reserved + } + + return payload_size +} diff --git a/pkg/mpegts/producer.go b/pkg/mpegts/producer.go index c7484cf0..78f320a2 100644 --- a/pkg/mpegts/producer.go +++ b/pkg/mpegts/producer.go @@ -87,7 +87,7 @@ func (c *Producer) probe() error { case StreamTypeMetadata: for _, streamType := range pkt.Payload { switch streamType { - case StreamTypeH264, StreamTypeH265, StreamTypeAAC: + case StreamTypeH264, StreamTypeH265, StreamTypeAAC, StreamTypePrivateOPUS: waitType = append(waitType, streamType) } } @@ -118,6 +118,19 @@ func (c *Producer) probe() error { Codecs: []*core.Codec{codec}, } c.Medias = append(c.Medias, media) + + case StreamTypePrivateOPUS: + codec := &core.Codec{ + Name: core.CodecOpus, + ClockRate: 48000, + Channels: 2, + } + media := &core.Media{ + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + } + c.Medias = append(c.Medias, media) } } @@ -134,6 +147,8 @@ func StreamType(codec *core.Codec) uint8 { return StreamTypeAAC case core.CodecPCMA: return StreamTypePCMATapo + case core.CodecOpus: + return StreamTypePrivateOPUS } return 0 }