mirror of
https://github.com/AlexxIT/go2rtc.git
synced 2026-04-22 15:47:06 +08:00
556 lines
13 KiB
Go
556 lines
13 KiB
Go
package tuya
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/AlexxIT/go2rtc/pkg/core"
|
|
"github.com/AlexxIT/go2rtc/pkg/webrtc"
|
|
"github.com/pion/rtp"
|
|
pion "github.com/pion/webrtc/v4"
|
|
)
|
|
|
|
type Client struct {
|
|
api TuyaAPI
|
|
conn *webrtc.Conn
|
|
pc *pion.PeerConnection
|
|
connected core.Waiter
|
|
closed bool
|
|
|
|
// HEVC only:
|
|
dc *pion.DataChannel
|
|
videoSSRC *uint32
|
|
audioSSRC *uint32
|
|
streamType int
|
|
isHEVC bool
|
|
handlersMu sync.RWMutex
|
|
handlers map[uint32]func(*rtp.Packet)
|
|
}
|
|
|
|
type DataChannelMessage struct {
|
|
Type string `json:"type"` // "codec", "start", "recv", "complete"
|
|
Msg string `json:"msg"`
|
|
}
|
|
|
|
// RecvMessage contains SSRC values for video/audio streams
|
|
type RecvMessage struct {
|
|
Video struct {
|
|
SSRC uint32 `json:"ssrc"`
|
|
} `json:"video"`
|
|
Audio struct {
|
|
SSRC uint32 `json:"ssrc"`
|
|
} `json:"audio"`
|
|
}
|
|
|
|
func Dial(rawURL string) (core.Producer, error) {
|
|
escapedURL := strings.ReplaceAll(rawURL, "#", "%23")
|
|
u, err := url.Parse(escapedURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
query := u.Query()
|
|
|
|
// Tuya Smart API
|
|
email := query.Get("email")
|
|
password := query.Get("password")
|
|
|
|
// Tuya Cloud API
|
|
uid := query.Get("uid")
|
|
clientId := query.Get("client_id")
|
|
clientSecret := query.Get("client_secret")
|
|
|
|
// Shared params
|
|
deviceId := query.Get("device_id")
|
|
|
|
// Stream params
|
|
streamResolution := query.Get("resolution")
|
|
|
|
useSmartApi := deviceId != "" && email != "" && password != ""
|
|
useCloudApi := deviceId != "" && uid != "" && clientId != "" && clientSecret != ""
|
|
|
|
if streamResolution == "" || (streamResolution != "hd" && streamResolution != "sd") {
|
|
streamResolution = "hd"
|
|
}
|
|
|
|
if !useSmartApi && !useCloudApi {
|
|
return nil, errors.New("tuya: wrong query params")
|
|
}
|
|
|
|
client := &Client{
|
|
handlers: make(map[uint32]func(*rtp.Packet)),
|
|
}
|
|
|
|
if useSmartApi {
|
|
if client.api, err = NewTuyaSmartApiClient(nil, u.Hostname(), email, password, deviceId); err != nil {
|
|
return nil, fmt.Errorf("tuya: %w", err)
|
|
}
|
|
} else {
|
|
if client.api, err = NewTuyaCloudApiClient(u.Hostname(), uid, deviceId, clientId, clientSecret); err != nil {
|
|
return nil, fmt.Errorf("tuya: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := client.api.Init(); err != nil {
|
|
return nil, fmt.Errorf("tuya: %w", err)
|
|
}
|
|
|
|
client.streamType = client.api.GetStreamType(streamResolution)
|
|
client.isHEVC = client.api.IsHEVC(client.streamType)
|
|
|
|
// Create a new PeerConnection
|
|
conf := pion.Configuration{
|
|
ICEServers: client.api.GetICEServers(),
|
|
ICETransportPolicy: pion.ICETransportPolicyAll,
|
|
BundlePolicy: pion.BundlePolicyMaxBundle,
|
|
}
|
|
|
|
api, err := webrtc.NewAPI()
|
|
if err != nil {
|
|
client.Close(err)
|
|
return nil, err
|
|
}
|
|
|
|
client.pc, err = api.NewPeerConnection(conf)
|
|
if err != nil {
|
|
client.Close(err)
|
|
return nil, err
|
|
}
|
|
|
|
// protect from sending ICE candidate before Offer
|
|
var sendOffer core.Waiter
|
|
|
|
// protect from blocking on errors
|
|
defer sendOffer.Done(nil)
|
|
|
|
// Create new WebRTC connection
|
|
client.conn = webrtc.NewConn(client.pc)
|
|
client.conn.FormatName = "tuya/webrtc"
|
|
client.conn.Mode = core.ModeActiveProducer
|
|
client.conn.Protocol = "mqtt"
|
|
|
|
mqttClient := client.api.GetMqtt()
|
|
if mqttClient == nil {
|
|
err = errors.New("tuya: no mqtt client")
|
|
client.Close(err)
|
|
return nil, err
|
|
}
|
|
|
|
// Set up MQTT handlers
|
|
mqttClient.handleAnswer = func(answer AnswerFrame) {
|
|
// fmt.Printf("tuya: answer: %s\n", answer.Sdp)
|
|
|
|
desc := pion.SessionDescription{
|
|
Type: pion.SDPTypePranswer,
|
|
SDP: answer.Sdp,
|
|
}
|
|
|
|
if err = client.pc.SetRemoteDescription(desc); err != nil {
|
|
client.Close(err)
|
|
return
|
|
}
|
|
|
|
if err = client.conn.SetAnswer(answer.Sdp); err != nil {
|
|
client.Close(err)
|
|
return
|
|
}
|
|
|
|
if client.isHEVC {
|
|
// Tuya responds with H264/90000 even for HEVC streams
|
|
// So we need to replace video codecs with HEVC ones from API
|
|
for _, media := range client.conn.Medias {
|
|
if media.Kind == core.KindVideo {
|
|
codecs := client.api.GetVideoCodecs()
|
|
if codecs != nil {
|
|
media.Codecs = codecs
|
|
}
|
|
}
|
|
}
|
|
|
|
// Audio codecs from API as well
|
|
// Tuya responds with multiple audio codecs (PCMU, PCMA)
|
|
// But the quality is bad if we use PCMU and skill only has PCMA
|
|
for _, media := range client.conn.Medias {
|
|
if media.Kind == core.KindAudio {
|
|
codecs := client.api.GetAudioCodecs()
|
|
if codecs != nil {
|
|
media.Codecs = codecs
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
mqttClient.handleCandidate = func(candidate CandidateFrame) {
|
|
// fmt.Printf("tuya: candidate: %s\n", candidate.Candidate)
|
|
|
|
if candidate.Candidate != "" {
|
|
client.conn.AddCandidate(candidate.Candidate)
|
|
if err != nil {
|
|
client.Close(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
mqttClient.handleDisconnect = func() {
|
|
// fmt.Println("tuya: disconnect")
|
|
client.Close(errors.New("mqtt: disconnect"))
|
|
}
|
|
|
|
mqttClient.handleError = func(err error) {
|
|
// fmt.Printf("tuya: error: %s\n", err.Error())
|
|
client.Close(err)
|
|
}
|
|
|
|
if client.isHEVC {
|
|
maxRetransmits := uint16(5)
|
|
ordered := true
|
|
client.dc, err = client.pc.CreateDataChannel("fmp4Stream", &pion.DataChannelInit{
|
|
MaxRetransmits: &maxRetransmits,
|
|
Ordered: &ordered,
|
|
})
|
|
|
|
// DataChannel receives two types of messages:
|
|
// 1. String messages: Control messages (codec, recv)
|
|
// 2. Binary messages: RTP packets with video/audio
|
|
client.dc.OnMessage(func(msg pion.DataChannelMessage) {
|
|
if msg.IsString {
|
|
// Handle control messages (codec, recv, etc.)
|
|
if connected, err := client.probe(msg); err != nil {
|
|
client.Close(err)
|
|
} else if connected {
|
|
client.connected.Done(nil)
|
|
}
|
|
} else {
|
|
// Handle RTP packets - Route by SSRC retrieved from "recv" message
|
|
packet := &rtp.Packet{}
|
|
if err := packet.Unmarshal(msg.Data); err != nil {
|
|
// Skip invalid packets
|
|
return
|
|
}
|
|
|
|
if handler, ok := client.getHandler(packet.SSRC); ok {
|
|
handler(packet)
|
|
}
|
|
}
|
|
})
|
|
|
|
client.dc.OnError(func(err error) {
|
|
// fmt.Printf("tuya: datachannel error: %s\n", err.Error())
|
|
client.Close(err)
|
|
})
|
|
|
|
client.dc.OnClose(func() {
|
|
// fmt.Println("tuya: datachannel closed")
|
|
client.Close(errors.New("datachannel: closed"))
|
|
})
|
|
|
|
client.dc.OnOpen(func() {
|
|
// fmt.Println("tuya: datachannel opened")
|
|
|
|
codecRequest, _ := json.Marshal(DataChannelMessage{
|
|
Type: "codec",
|
|
Msg: "",
|
|
})
|
|
|
|
if err := client.sendMessageToDataChannel(codecRequest); err != nil {
|
|
client.Close(fmt.Errorf("failed to send codec request: %w", err))
|
|
}
|
|
})
|
|
}
|
|
|
|
// Set up pc handler
|
|
client.conn.Listen(func(msg any) {
|
|
switch msg := msg.(type) {
|
|
case *pion.ICECandidate:
|
|
_ = sendOffer.Wait()
|
|
if err := mqttClient.SendCandidate("a=" + msg.ToJSON().Candidate); err != nil {
|
|
client.Close(err)
|
|
}
|
|
|
|
case pion.PeerConnectionState:
|
|
switch msg {
|
|
case pion.PeerConnectionStateNew:
|
|
break
|
|
case pion.PeerConnectionStateConnecting:
|
|
break
|
|
case pion.PeerConnectionStateConnected:
|
|
// On HEVC, wait for DataChannel to be opened and camera to send codec info
|
|
if !client.isHEVC {
|
|
if streamResolution == "hd" {
|
|
_ = mqttClient.SendResolution(0)
|
|
}
|
|
client.connected.Done(nil)
|
|
}
|
|
case pion.PeerConnectionStateClosed:
|
|
client.Close(errors.New("webrtc: " + msg.String()))
|
|
default:
|
|
// client.Close(errors.New("webrtc: " + msg.String()))
|
|
}
|
|
}
|
|
})
|
|
|
|
// Audio first, otherwise tuya will send corrupt sdp
|
|
medias := []*core.Media{
|
|
{Kind: core.KindAudio, Direction: core.DirectionSendRecv},
|
|
{Kind: core.KindVideo, Direction: core.DirectionRecvonly},
|
|
}
|
|
|
|
// Create offer
|
|
offer, err := client.conn.CreateOffer(medias)
|
|
if err != nil {
|
|
client.Close(err)
|
|
return nil, err
|
|
}
|
|
|
|
// horter sdp, remove a=extmap... line, device ONLY allow 8KB json payload
|
|
// https://github.com/tuya/webrtc-demo-go/blob/04575054f18ccccb6bc9d82939dd46d449544e20/static/js/main.js#L224
|
|
re := regexp.MustCompile(`\r\na=extmap[^\r\n]*`)
|
|
offer = re.ReplaceAllString(offer, "")
|
|
|
|
// Send offer
|
|
if err := mqttClient.SendOffer(offer, streamResolution, client.streamType, client.isHEVC); err != nil {
|
|
err = fmt.Errorf("tuya: %w", err)
|
|
client.Close(err)
|
|
return nil, err
|
|
}
|
|
|
|
sendOffer.Done(nil)
|
|
|
|
// Wait for connection
|
|
if err = client.connected.Wait(); err != nil {
|
|
err = fmt.Errorf("tuya: %w", err)
|
|
client.Close(err)
|
|
return nil, err
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
func (c *Client) GetMedias() []*core.Media {
|
|
return c.conn.GetMedias()
|
|
}
|
|
|
|
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
|
|
return c.conn.GetTrack(media, codec)
|
|
}
|
|
|
|
func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
|
|
localTrack := c.conn.GetSenderTrack(media.ID)
|
|
if localTrack == nil {
|
|
return errors.New("webrtc: can't get track")
|
|
}
|
|
|
|
// DISABLED: Speaker Protocol 312 command
|
|
// JavaScript client doesn't send this on first call either
|
|
// Only subsequent calls (when speakerChloron is set) send Protocol 312
|
|
// mqttClient := c.api.GetMqtt()
|
|
// if mqttClient != nil {
|
|
// _ = mqttClient.SendSpeaker(1)
|
|
// }
|
|
|
|
payloadType := codec.PayloadType
|
|
|
|
sender := core.NewSender(media, codec)
|
|
|
|
switch track.Codec.Name {
|
|
case core.CodecPCMA, core.CodecPCMU, core.CodecPCM, core.CodecPCML:
|
|
// Frame size affects audio delay with Tuya cameras:
|
|
// Browser sends standard 20ms frames (160 bytes for G.711), but this causes
|
|
// up to 4s delay on some Tuya cameras. Increasing to 240 bytes (30ms) reduces
|
|
// delay to ~2s. Higher values (320+ bytes) don't work and cause issues.
|
|
// Using 240 bytes (30ms) as optimal balance between latency and stability.
|
|
frameSize := 240
|
|
|
|
var buf []byte
|
|
var seq uint16
|
|
var ts uint32
|
|
|
|
sender.Handler = func(packet *rtp.Packet) {
|
|
buf = append(buf, packet.Payload...)
|
|
|
|
for len(buf) >= frameSize {
|
|
payload := buf[:frameSize]
|
|
|
|
pkt := &rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
Marker: true,
|
|
PayloadType: payloadType,
|
|
SequenceNumber: seq,
|
|
Timestamp: ts,
|
|
SSRC: packet.SSRC,
|
|
},
|
|
Payload: payload,
|
|
}
|
|
|
|
seq++
|
|
ts += uint32(frameSize)
|
|
buf = buf[frameSize:]
|
|
|
|
c.conn.Send += pkt.MarshalSize()
|
|
_ = localTrack.WriteRTP(payloadType, pkt)
|
|
}
|
|
}
|
|
|
|
default:
|
|
sender.Handler = func(packet *rtp.Packet) {
|
|
c.conn.Send += packet.MarshalSize()
|
|
_ = localTrack.WriteRTP(payloadType, packet)
|
|
}
|
|
}
|
|
|
|
sender.HandleRTP(track)
|
|
c.conn.Senders = append(c.conn.Senders, sender)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) Start() error {
|
|
if len(c.conn.Receivers) == 0 {
|
|
return errors.New("tuya: no receivers")
|
|
}
|
|
|
|
var video, audio *core.Receiver
|
|
for _, receiver := range c.conn.Receivers {
|
|
if receiver.Codec.IsVideo() {
|
|
video = receiver
|
|
} else if receiver.Codec.IsAudio() {
|
|
audio = receiver
|
|
}
|
|
}
|
|
|
|
if c.videoSSRC != nil {
|
|
c.setHandler(*c.videoSSRC, func(packet *rtp.Packet) {
|
|
if video != nil {
|
|
video.WriteRTP(packet)
|
|
}
|
|
})
|
|
}
|
|
|
|
if c.audioSSRC != nil {
|
|
c.setHandler(*c.audioSSRC, func(packet *rtp.Packet) {
|
|
if audio != nil {
|
|
audio.WriteRTP(packet)
|
|
}
|
|
})
|
|
}
|
|
|
|
return c.conn.Start()
|
|
}
|
|
|
|
func (c *Client) Stop() error {
|
|
if c.closed {
|
|
return nil
|
|
}
|
|
|
|
c.closed = true
|
|
|
|
c.clearHandlers()
|
|
|
|
if c.conn != nil {
|
|
_ = c.conn.Stop()
|
|
}
|
|
|
|
if c.api != nil {
|
|
c.api.Close()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) Close(err error) error {
|
|
c.connected.Done(err)
|
|
return c.Stop()
|
|
}
|
|
|
|
func (c *Client) MarshalJSON() ([]byte, error) {
|
|
return c.conn.MarshalJSON()
|
|
}
|
|
|
|
func (c *Client) setHandler(ssrc uint32, handler func(*rtp.Packet)) {
|
|
c.handlersMu.Lock()
|
|
defer c.handlersMu.Unlock()
|
|
c.handlers[ssrc] = handler
|
|
}
|
|
|
|
func (c *Client) getHandler(ssrc uint32) (func(*rtp.Packet), bool) {
|
|
c.handlersMu.RLock()
|
|
defer c.handlersMu.RUnlock()
|
|
handler, ok := c.handlers[ssrc]
|
|
return handler, ok
|
|
}
|
|
|
|
func (c *Client) clearHandlers() {
|
|
c.handlersMu.Lock()
|
|
defer c.handlersMu.Unlock()
|
|
for ssrc := range c.handlers {
|
|
delete(c.handlers, ssrc)
|
|
}
|
|
}
|
|
|
|
func (c *Client) probe(msg pion.DataChannelMessage) (bool, error) {
|
|
// fmt.Printf("[tuya] Received string message: %s\n", string(msg.Data))
|
|
|
|
var message DataChannelMessage
|
|
if err := json.Unmarshal([]byte(msg.Data), &message); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
switch message.Type {
|
|
case "codec":
|
|
// Camera responded to our codec request - now request frame start
|
|
frameRequest, _ := json.Marshal(DataChannelMessage{
|
|
Type: "start",
|
|
Msg: "frame",
|
|
})
|
|
|
|
err := c.sendMessageToDataChannel(frameRequest)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
case "recv":
|
|
// Camera sends SSRC values for video/audio streams
|
|
// We need these to route incoming RTP packets correctly
|
|
var recvMessage RecvMessage
|
|
if err := json.Unmarshal([]byte(message.Msg), &recvMessage); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
videoSSRC := recvMessage.Video.SSRC
|
|
audioSSRC := recvMessage.Audio.SSRC
|
|
c.videoSSRC = &videoSSRC
|
|
c.audioSSRC = &audioSSRC
|
|
|
|
// Send "complete" to tell camera we're ready to receive RTP packets
|
|
completeMsg, _ := json.Marshal(DataChannelMessage{
|
|
Type: "complete",
|
|
Msg: "",
|
|
})
|
|
|
|
err := c.sendMessageToDataChannel(completeMsg)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func (c *Client) sendMessageToDataChannel(message []byte) error {
|
|
if c.dc != nil {
|
|
// fmt.Printf("[tuya] sending message to data channel: %s\n", message)
|
|
return c.dc.Send(message)
|
|
}
|
|
|
|
return nil
|
|
}
|