Total rework FLV client

This commit is contained in:
Alexey Khit
2023-08-13 15:42:42 +03:00
parent e9795e7521
commit 0b6fda2af5
11 changed files with 703 additions and 8 deletions
+200
View File
@@ -0,0 +1,200 @@
package amf
import (
"encoding/binary"
"errors"
"math"
)
const (
TypeNumber byte = iota
TypeBoolean
TypeString
TypeObject
TypeNull = 5
TypeEcmaArray = 8
TypeObjectEnd = 9
)
// AMF spec: http://download.macromedia.com/pub/labs/amf/amf0_spec_121207.pdf
type AMF struct {
buf []byte
pos int
}
var ErrRead = errors.New("amf: read error")
func NewReader(b []byte) *AMF {
return &AMF{buf: b}
}
func (a *AMF) ReadItems() ([]any, error) {
var items []any
for a.pos < len(a.buf) {
v, err := a.ReadItem()
if err != nil {
return nil, err
}
items = append(items, v)
}
return items, nil
}
func (a *AMF) ReadItem() (any, error) {
dataType, err := a.ReadByte()
if err != nil {
return nil, err
}
switch dataType {
case TypeNumber:
return a.ReadNumber()
case TypeBoolean:
b, err := a.ReadByte()
return b != 0, err
case TypeString:
return a.ReadString()
case TypeObject:
return a.ReadObject()
case TypeNull:
return nil, nil
case TypeObjectEnd:
return nil, nil
}
return nil, ErrRead
}
func (a *AMF) ReadByte() (byte, error) {
if a.pos >= len(a.buf) {
return 0, ErrRead
}
v := a.buf[a.pos]
a.pos++
return v, nil
}
func (a *AMF) ReadNumber() (float64, error) {
if a.pos+8 > len(a.buf) {
return 0, ErrRead
}
v := binary.BigEndian.Uint64(a.buf[a.pos : a.pos+8])
a.pos += 8
return math.Float64frombits(v), nil
}
func (a *AMF) ReadString() (string, error) {
if a.pos+2 > len(a.buf) {
return "", ErrRead
}
size := int(binary.BigEndian.Uint16(a.buf[a.pos:]))
a.pos += 2
if a.pos+size > len(a.buf) {
return "", ErrRead
}
s := string(a.buf[a.pos : a.pos+size])
a.pos += size
return s, nil
}
func (a *AMF) ReadObject() (map[string]any, error) {
obj := make(map[string]any)
for {
k, err := a.ReadString()
if err != nil {
return nil, err
}
v, err := a.ReadItem()
if err != nil {
return nil, err
}
if k == "" {
break
}
obj[k] = v
}
return obj, nil
}
func (a *AMF) ReadEcmaArray() (map[string]any, error) {
if a.pos+4 > len(a.buf) {
return nil, ErrRead
}
a.pos += 4 // skip size
return a.ReadObject()
}
func NewWriter() *AMF {
return &AMF{}
}
func (a *AMF) Bytes() []byte {
return a.buf
}
func (a *AMF) WriteNumber(n float64) {
b := math.Float64bits(n)
a.buf = append(
a.buf, TypeNumber,
byte(b>>56), byte(b>>48), byte(b>>40), byte(b>>32),
byte(b>>24), byte(b>>16), byte(b>>8), byte(b),
)
}
func (a *AMF) WriteBool(b bool) {
if b {
a.buf = append(a.buf, TypeBoolean, 1)
} else {
a.buf = append(a.buf, TypeBoolean, 0)
}
}
func (a *AMF) WriteString(s string) {
n := len(s)
a.buf = append(a.buf, TypeString, byte(n>>8), byte(n))
a.buf = append(a.buf, s...)
}
func (a *AMF) WriteObject(obj map[string]any) {
a.buf = append(a.buf, TypeObject)
for k, v := range obj {
n := len(k)
a.buf = append(a.buf, byte(n>>8), byte(n))
a.buf = append(a.buf, k...)
switch v := v.(type) {
case string:
a.WriteString(v)
case int:
a.WriteNumber(float64(v))
case bool:
a.WriteBool(v)
default:
panic(v)
}
}
a.buf = append(a.buf, 0, 0, TypeObjectEnd)
}
func (a *AMF) WriteNull() {
a.buf = append(a.buf, TypeNull)
}
+162
View File
@@ -0,0 +1,162 @@
package flv
import (
"bytes"
"io"
"time"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264/avc"
"github.com/pion/rtp"
)
type Client struct {
URL string
rd io.Reader
medias []*core.Media
receivers []*core.Receiver
recv int
}
func NewClient(rd io.Reader) *Client {
return &Client{rd: rd}
}
func (c *Client) Describe() error {
if err := c.ReadHeader(); err != nil {
return err
}
// Normal software sends:
// 1. Video/audio flag in header
// 2. MetaData as first tag (with video/audio codec info)
// 3. Video/audio headers in 2nd and 3rd tag
// Reolink camera sends:
// 1. Empty video/audio flag
// 2. MedaData without stereo key for AAC
// 3. Audio header after Video keyframe tag
waitVideo := true
waitAudio := true
timeout := time.Now().Add(core.ProbeTimeout)
for (waitVideo || waitAudio) && time.Now().Before(timeout) {
tagType, _, b, err := c.ReadTag()
if err != nil {
return err
}
c.recv += len(b)
switch tagType {
case TagAudio:
if !waitAudio {
continue
}
waitAudio = false
codecID := b[0] >> 4 // SoundFormat
_ = b[0] & 0b1100 // SoundRate
_ = b[0] & 0b0010 // SoundSize
_ = b[0] & 0b0001 // SoundType
if codecID != CodecAAC {
continue
}
if b[1] != 0 { // check if header
continue
}
codec := aac.ConfigToCodec(b[2:])
media := &core.Media{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.medias = append(c.medias, media)
case TagVideo:
if !waitVideo {
continue
}
waitVideo = false
_ = b[0] >> 4 // FrameType
codecID := b[0] & 0b1111 // CodecID
if codecID != CodecAVC {
continue
}
if b[1] != 0 { // check if header
continue
}
codec := avc.ConfigToCodec(b[5:])
media := &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.medias = append(c.medias, media)
case TagData:
if !bytes.Contains(b, []byte("onMetaData")) {
continue
}
waitVideo = bytes.Contains(b, []byte("videocodecid"))
waitAudio = bytes.Contains(b, []byte("audiocodecid"))
}
}
return nil
}
func (c *Client) Play() error {
video, audio := core.VA(c.receivers)
for {
tagType, timeMS, b, err := c.ReadTag()
if err != nil {
return err
}
c.recv += len(b)
switch tagType {
case TagAudio:
if audio == nil || b[1] == 0 {
continue
}
pkt := &rtp.Packet{
Header: rtp.Header{
Timestamp: TimeToRTP(timeMS, audio.Codec.ClockRate),
},
Payload: b[2:],
}
audio.WriteRTP(pkt)
case TagVideo:
// frame type 4b, codecID 4b, avc packet type 8b, composition time 24b
if video == nil || b[1] == 0 {
continue
}
pkt := &rtp.Packet{
Header: rtp.Header{
Timestamp: TimeToRTP(timeMS, video.Codec.ClockRate),
},
Payload: b[5:],
}
video.WriteRTP(pkt)
}
}
}
+62
View File
@@ -0,0 +1,62 @@
package flv
import (
"encoding/binary"
"errors"
"io"
)
const (
TagAudio = 8
TagVideo = 9
TagData = 18
CodecAAC = 10
CodecAVC = 7
)
func (c *Client) ReadHeader() error {
b := make([]byte, 9)
if _, err := io.ReadFull(c.rd, b); err != nil {
return err
}
if string(b[:3]) != "FLV" {
return errors.New("flv: wrong header")
}
_ = b[4] // flags (skip because unsupported by Reolink cameras)
if skip := binary.BigEndian.Uint32(b[5:]) - 9; skip > 0 {
if _, err := io.ReadFull(c.rd, make([]byte, skip)); err != nil {
return err
}
}
return nil
}
func (c *Client) ReadTag() (byte, uint32, []byte, error) {
// https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf
b := make([]byte, 4+11)
if _, err := io.ReadFull(c.rd, b); err != nil {
return 0, 0, nil, err
}
b = b[4 : 4+11] // skip previous tag size
tagType := b[0]
size := uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3])
timeMS := uint32(b[4])<<16 | uint32(b[5])<<8 | uint32(b[6]) | uint32(b[7])<<24
b = make([]byte, size)
if _, err := io.ReadFull(c.rd, b); err != nil {
return 0, 0, nil, err
}
return tagType, timeMS, b, nil
}
func TimeToRTP(timeMS uint32, clockRate uint32) uint32 {
return timeMS * clockRate / 1000
}
+45
View File
@@ -0,0 +1,45 @@
package flv
import (
"encoding/json"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (c *Client) GetMedias() []*core.Media {
return c.medias
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
for _, track := range c.receivers {
if track.Codec == codec {
return track, nil
}
}
track := core.NewReceiver(media, codec)
c.receivers = append(c.receivers, track)
return track, nil
}
func (c *Client) Start() error {
return c.Play()
}
func (c *Client) Stop() error {
if closer, ok := c.rd.(io.Closer); ok {
return closer.Close()
}
return nil
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "FLV active producer",
URL: c.URL,
Medias: c.medias,
Receivers: c.receivers,
Recv: c.recv,
}
return json.Marshal(info)
}