diff --git a/pkg/webrtc/client.go b/pkg/webrtc/client.go index 658573ca..8a5e2562 100644 --- a/pkg/webrtc/client.go +++ b/pkg/webrtc/client.go @@ -7,32 +7,37 @@ import ( ) func (c *Conn) CreateOffer(medias []*streamer.Media) (string, error) { + // 1. Create transeivers with proper kind and direction for _, media := range medias { + var err error switch media.Direction { case streamer.DirectionRecvonly: - if _, err := c.pc.AddTransceiverFromKind( + _, err = c.pc.AddTransceiverFromKind( webrtc.NewRTPCodecType(media.Kind), webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}, - ); err != nil { - return "", err - } + ) case streamer.DirectionSendonly: - if _, err := c.pc.AddTransceiverFromTrack( + _, err = c.pc.AddTransceiverFromTrack( NewTrack(media.Kind), webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}, - ); err != nil { - return "", err - } + ) case streamer.DirectionSendRecv: - panic("not implemented") + // default transceiver is sendrecv + _, err = c.pc.AddTransceiverFromTrack(NewTrack(media.Kind)) + } + + if err != nil { + return "", err } } + // 2. Create local offer desc, err := c.pc.CreateOffer(nil) if err != nil { return "", err } + // 3. Start gathering phase if err = c.pc.SetLocalDescription(desc); err != nil { return "", err } @@ -82,6 +87,9 @@ func (c *Conn) SetAnswer(answer string) (err error) { return nil } +// fakeFormatsInAnswer - fix pion bug with remote SDP parsing: +// pion will process formats only from first media of each kind +// so we add all formats from first offer media to the first answer media func fakeFormatsInAnswer(offer, answer string) string { sd2 := &sdp.SessionDescription{} if err := sd2.Unmarshal([]byte(answer)); err != nil { diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index 5b314e74..2b60721f 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -2,8 +2,11 @@ package webrtc import ( "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/h265" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/pion/rtcp" + "github.com/pion/rtp" "github.com/pion/webrtc/v3" "time" ) @@ -116,6 +119,54 @@ func (c *Conn) AddCandidate(candidate string) error { return c.pc.AddICECandidate(webrtc.ICECandidateInit{Candidate: candidate}) } +func (c *Conn) getTranseiver(mid string) *webrtc.RTPTransceiver { + for _, tr := range c.pc.GetTransceivers() { + if tr.Mid() == mid { + return tr + } + } + return nil +} +func (c *Conn) addSendTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { + tr := c.getTranseiver(media.MID) + sender := tr.Sender() + localTrack := sender.Track().(*Track) + + codec := track.Codec + + // important to get remote PayloadType + payloadType := media.MatchCodec(codec).PayloadType + + push := func(packet *rtp.Packet) error { + c.send += packet.MarshalSize() + return localTrack.WriteRTP(payloadType, packet) + } + + switch codec.Name { + case streamer.CodecH264: + wrapper := h264.RTPPay(1200) + push = wrapper(push) + + if codec.IsRTP() { + wrapper = h264.RTPDepay(track) + } else { + wrapper = h264.RepairAVC(track) + } + push = wrapper(push) + + case streamer.CodecH265: + // SafariPay because it is the only browser in the world + // that supports WebRTC + H265 + wrapper := h265.SafariPay(1200) + push = wrapper(push) + + wrapper = h265.RTPDepay(track) + push = wrapper(push) + } + + return track.Bind(push) +} + func (c *Conn) getRecvTrack(remote *webrtc.TrackRemote) *streamer.Track { payloadType := uint8(remote.PayloadType()) diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index 5ffdcf82..4b74cd97 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -2,8 +2,6 @@ package webrtc import ( "encoding/json" - "github.com/AlexxIT/go2rtc/pkg/h264" - "github.com/AlexxIT/go2rtc/pkg/h265" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/pion/rtp" "github.com/pion/webrtc/v3" @@ -19,7 +17,7 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer. switch track.Direction { case streamer.DirectionSendonly: // send our track to WebRTC consumer - return c.addConsumerSendTrack(media, track) + return c.addSendTrack(media, track) case streamer.DirectionRecvonly: // receive track from WebRTC consumer (microphone, backchannel, two way audio) @@ -42,101 +40,20 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer. panic("not implemented") } -func (c *Conn) addConsumerSendTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { - codec := track.Codec - - // webrtc.codecParametersFuzzySearch - caps := webrtc.RTPCodecCapability{ - MimeType: MimeType(codec), - Channels: codec.Channels, - ClockRate: codec.ClockRate, - } - - if codec.Name == streamer.CodecH264 { - // don't know if this really neccessary - // I have tested multiple browsers and H264 profile has no effect on anything - caps.SDPFmtpLine = "packetization-mode=1;profile-level-id=42e01f" - } - - // important to use same streamID so JS will automatically - // join two tracks as one source/stream - trackLocal, err := webrtc.NewTrackLocalStaticRTP( - caps, caps.MimeType[:5], "go2rtc", - ) - if err != nil { - return nil - } - - init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly} - tr, err := c.pc.AddTransceiverFromTrack(trackLocal, init) - if err != nil { - return nil - } - - if err = tr.SetMid(media.MID); err != nil { - return nil - } - - codecs := []webrtc.RTPCodecParameters{{RTPCodecCapability: caps}} - if err = tr.SetCodecPreferences(codecs); err != nil { - return nil - } - - push := func(packet *rtp.Packet) error { - c.send += packet.MarshalSize() - return trackLocal.WriteRTP(packet) - } - - switch codec.Name { - case streamer.CodecH264: - wrapper := h264.RTPPay(1200) - push = wrapper(push) - - if codec.IsRTP() { - wrapper = h264.RTPDepay(track) - } else { - wrapper = h264.RepairAVC(track) - } - push = wrapper(push) - - case streamer.CodecH265: - // SafariPay because it is the only browser in the world - // that supports WebRTC + H265 - wrapper := h265.SafariPay(1200) - push = wrapper(push) - - wrapper = h265.RTPDepay(track) - push = wrapper(push) - } - - track = track.Bind(push) - c.tracks = append(c.tracks, track) - return track -} - func (c *Conn) addConsumerRecvTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { - caps := webrtc.RTPCodecCapability{ - MimeType: MimeType(track.Codec), - ClockRate: track.Codec.ClockRate, - Channels: track.Codec.Channels, + params := webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: MimeType(track.Codec), + ClockRate: track.Codec.ClockRate, + Channels: track.Codec.Channels, + }, + PayloadType: 0, // don't know if this necessary } - init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly} - tr, err := c.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, init) - if err != nil { - return nil - } + tr := c.getTranseiver(media.MID) - if err = tr.SetMid(media.MID); err != nil { - return nil - } - - codecs := []webrtc.RTPCodecParameters{ - {RTPCodecCapability: caps, PayloadType: webrtc.PayloadType(track.Codec.PayloadType)}, - } - if err = tr.SetCodecPreferences(codecs); err != nil { - return nil - } + // set codec for consumer recv track so remote peer should send media with this codec + _ = tr.SetCodecPreferences([]webrtc.RTPCodecParameters{params}) c.tracks = append(c.tracks, track) return track diff --git a/pkg/webrtc/producer.go b/pkg/webrtc/producer.go index acd45fd2..b2c33048 100644 --- a/pkg/webrtc/producer.go +++ b/pkg/webrtc/producer.go @@ -1,10 +1,7 @@ package webrtc import ( - "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/streamer" - "github.com/pion/rtp" - "github.com/pion/webrtc/v3" ) func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { @@ -18,14 +15,10 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer. } } - var track *streamer.Track - if media.Direction == streamer.DirectionSendonly { - track = streamer.NewTrack(media, codec) - } else { - track = c.getProducerSendTrack(media, codec) - if track == nil { - panic("getProducerSendTrack return nil track") - } + track := streamer.NewTrack(media, codec) + + if media.Direction == streamer.DirectionRecvonly { + track = c.addSendTrack(media, track) } c.tracks = append(c.tracks, track) @@ -40,97 +33,3 @@ func (c *Conn) Start() error { func (c *Conn) Stop() error { return c.pc.Close() } - -func (c *Conn) getProducerSendTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { - tr := c.getTranseiver(media.MID) - if tr == nil { - return nil - } - - sender := tr.Sender() - if sender == nil { - return nil - } - - track, ok := sender.Track().(*Track) - if !ok { - return nil - } - - push := func(packet *rtp.Packet) error { - c.send += packet.MarshalSize() - return track.WriteRTP(codec.PayloadType, packet) - } - - return streamer.NewTrack(media, codec).Bind(push) -} - -func (c *Conn) getTranseiver(mid string) *webrtc.RTPTransceiver { - for _, tr := range c.pc.GetTransceivers() { - if tr.Mid() == mid { - return tr - } - } - return nil -} - -type Track struct { - kind string - id string - streamID string - sequence uint16 - ssrc uint32 - writer webrtc.TrackLocalWriter -} - -func NewTrack(kind string) *Track { - return &Track{ - kind: kind, - id: core.RandString(16), - streamID: core.RandString(16), - } -} - -func (t *Track) Bind(context webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) { - t.ssrc = uint32(context.SSRC()) - t.writer = context.WriteStream() - - for _, parameters := range context.CodecParameters() { - // return first parameters - return parameters, nil - } - - return webrtc.RTPCodecParameters{}, nil -} - -func (t *Track) Unbind(context webrtc.TrackLocalContext) error { - return nil -} - -func (t *Track) ID() string { - return t.id -} - -func (t *Track) RID() string { - return "" // don't know what it is -} - -func (t *Track) StreamID() string { - return t.streamID -} - -func (t *Track) Kind() webrtc.RTPCodecType { - return webrtc.NewRTPCodecType(t.kind) -} - -func (t *Track) WriteRTP(payloadType uint8, packet *rtp.Packet) error { - // important to have internal counter if input packets from different sources - t.sequence++ - - header := packet.Header - header.SSRC = t.ssrc - header.PayloadType = payloadType - header.SequenceNumber = t.sequence - _, err := t.writer.WriteRTP(&header, packet.Payload) - return err -} diff --git a/pkg/webrtc/server.go b/pkg/webrtc/server.go index d72505cb..aba95468 100644 --- a/pkg/webrtc/server.go +++ b/pkg/webrtc/server.go @@ -14,6 +14,34 @@ func (c *Conn) SetOffer(offer string) (err error) { return } + // create transceivers with opposite direction + for _, md := range sd.MediaDescriptions { + var mid string + var tr *webrtc.RTPTransceiver + for _, attr := range md.Attributes { + switch attr.Key { + case streamer.DirectionSendRecv: + tr, _ = c.pc.AddTransceiverFromTrack(NewTrack(md.MediaName.Media)) + case streamer.DirectionSendonly: + tr, _ = c.pc.AddTransceiverFromKind( + webrtc.NewRTPCodecType(md.MediaName.Media), + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}, + ) + case streamer.DirectionRecvonly: + tr, _ = c.pc.AddTransceiverFromTrack( + NewTrack(md.MediaName.Media), + webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}, + ) + case "mid": + mid = attr.Value + } + } + + if mid != "" && tr != nil { + _ = tr.SetMid(mid) + } + } + medias := streamer.UnmarshalMedias(sd.MediaDescriptions) // sort medias, so video will always be before audio @@ -33,51 +61,6 @@ func (c *Conn) SetOffer(offer string) (err error) { } func (c *Conn) GetAnswer() (answer string, err error) { - switch c.Mode { - case streamer.ModePassiveProducer: - // init all Sender(s) for passive producer or they will be nil - // sender for passive producer is backchannel - sd := &sdp.SessionDescription{} - if err = sd.Unmarshal([]byte(c.offer)); err != nil { - return - } - - for _, md := range sd.MediaDescriptions { - for _, attr := range md.Attributes { - var direction webrtc.RTPTransceiverDirection - switch attr.Key { - case "recvonly": - direction = webrtc.RTPTransceiverDirectionSendonly - case "sendrecv": - direction = webrtc.RTPTransceiverDirectionSendrecv - } - - if direction > 0 { - _, _ = c.pc.AddTransceiverFromTrack( - NewTrack(md.MediaName.Media), - webrtc.RTPTransceiverInit{Direction: direction}, - ) - } - } - } - - case streamer.ModePassiveConsumer: - // fix sendrecv transeivers - set for sendonly codecs from recvonly - for _, tr1 := range c.pc.GetTransceivers() { - for _, tr2 := range c.pc.GetTransceivers() { - if tr1 == tr2 { - continue - } - if tr1.Mid() == tr2.Mid() && tr2.Direction() == webrtc.RTPTransceiverDirectionRecvonly { - codecs := tr2.Receiver().GetParameters().Codecs - if err = tr1.SetCodecPreferences(codecs); err != nil { - return "", err - } - } - } - } - } - // we need to process remote offer after we create transeivers desc := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: c.offer} if err = c.pc.SetRemoteDescription(desc); err != nil { diff --git a/pkg/webrtc/track.go b/pkg/webrtc/track.go new file mode 100644 index 00000000..676da2dd --- /dev/null +++ b/pkg/webrtc/track.go @@ -0,0 +1,74 @@ +package webrtc + +import ( + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" + "github.com/pion/webrtc/v3" +) + +type Track struct { + kind string + id string + streamID string + sequence uint16 + ssrc uint32 + writer webrtc.TrackLocalWriter +} + +func NewTrack(kind string) *Track { + return &Track{ + kind: kind, + id: core.RandString(16), + streamID: core.RandString(16), + } +} + +func (t *Track) Bind(context webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) { + t.ssrc = uint32(context.SSRC()) + t.writer = context.WriteStream() + + for _, parameters := range context.CodecParameters() { + // return first parameters + return parameters, nil + } + + return webrtc.RTPCodecParameters{}, nil +} + +func (t *Track) Unbind(context webrtc.TrackLocalContext) error { + t.writer = nil + return nil +} + +func (t *Track) ID() string { + return t.id +} + +func (t *Track) RID() string { + return "" // don't know what it is +} + +func (t *Track) StreamID() string { + return t.streamID +} + +func (t *Track) Kind() webrtc.RTPCodecType { + return webrtc.NewRTPCodecType(t.kind) +} + +func (t *Track) WriteRTP(payloadType uint8, packet *rtp.Packet) error { + // in case when we start WriteRTP before Track.Bind + if t.writer == nil { + return nil + } + + // important to have internal counter if input packets from different sources + t.sequence++ + + header := packet.Header + header.SSRC = t.ssrc + header.PayloadType = payloadType + header.SequenceNumber = t.sequence + _, err := t.writer.WriteRTP(&header, packet.Payload) + return err +} diff --git a/www/webrtc-sync.html b/www/webrtc-sync.html index 4f818f3f..463a0f09 100644 --- a/www/webrtc-sync.html +++ b/www/webrtc-sync.html @@ -19,45 +19,44 @@