mirror of
https://github.com/pion/mediadevices.git
synced 2026-04-22 15:57:27 +08:00
cd6aaa1393
Also make the ReadCloser a Controllable, which decouples the controller implementation from the encoder. This is not needed for the two codec controllers already implemented (openh264 and vpx), but it is more future-proof in case it is required for other codecs.
538 lines
15 KiB
Go
538 lines
15 KiB
Go
package mediadevices
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"github.com/pion/interceptor"
|
|
"github.com/pion/rtcp"
|
|
"image"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pion/mediadevices/pkg/codec"
|
|
"github.com/pion/mediadevices/pkg/driver"
|
|
"github.com/pion/mediadevices/pkg/io/audio"
|
|
"github.com/pion/mediadevices/pkg/io/video"
|
|
"github.com/pion/mediadevices/pkg/wave"
|
|
"github.com/pion/rtp"
|
|
"github.com/pion/webrtc/v3"
|
|
)
|
|
|
|
// rtpOutboundMTU is the mtu that bind passes to NewRTPReader, i.e. the maximum
// RTP packet size handed to the packetizer for tracks bound to a peer connection.
const rtpOutboundMTU = 1200
|
|
|
|
// errInvalidDriverType reports a driver that is neither a video nor an audio recorder.
var errInvalidDriverType = errors.New("invalid driver type")

// errNotFoundPeerConnection reports that no active binding exists for a peer connection ID.
var errNotFoundPeerConnection = errors.New("failed to find given peer connection")
|
|
|
|
// Source is the minimal contract every media source fulfils: it can be
// identified and it can be released.
type Source interface {
	// ID returns the identifier of the source.
	ID() string
	// Close releases the source.
	Close() error
}
|
|
|
|
// VideoSource is a specific type of media source that emits a series of video frames
|
|
type VideoSource interface {
|
|
video.Reader
|
|
Source
|
|
}
|
|
|
|
// AudioSource is a specific type of media source that emits a series of audio chunks
|
|
type AudioSource interface {
|
|
audio.Reader
|
|
Source
|
|
}
|
|
|
|
// Track is an interface that represent MediaStreamTrack
|
|
// Reference: https://w3c.github.io/mediacapture-main/#mediastreamtrack
|
|
type Track interface {
|
|
Source
|
|
// OnEnded registers a handler to receive an error from the media stream track.
|
|
// If the error is already occured before registering, the handler will be
|
|
// immediately called.
|
|
OnEnded(func(error))
|
|
Kind() webrtc.RTPCodecType
|
|
// StreamID is the group this track belongs too. This must be unique
|
|
StreamID() string
|
|
// RID is the RTP Stearm ID for this track. This is only used for Simulcast
|
|
RID() string
|
|
// Bind binds the current track source to the given peer connection. In Pion/webrtc v3, the bind
|
|
// call will happen automatically after the SDP negotiation. Users won't need to call this manually.
|
|
Bind(webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error)
|
|
// Unbind is the clean up operation that should be called after Bind. Similar to Bind, unbind will
|
|
// be called automatically in Pion/webrtc v3.
|
|
Unbind(webrtc.TrackLocalContext) error
|
|
// NewRTPReader creates a new reader from the source. The reader will encode the source, and packetize
|
|
// the encoded data in RTP format with given mtu size.
|
|
//
|
|
// Note: `mtu int` will be changed to `mtu uint16` in a future update.
|
|
NewRTPReader(codecName string, ssrc uint32, mtu int) (RTPReadCloser, error)
|
|
// NewEncodedReader creates a EncodedReadCloser that reads the encoded data in codecName format
|
|
NewEncodedReader(codecName string) (EncodedReadCloser, error)
|
|
// NewEncodedReader creates a new Go standard io.ReadCloser that reads the encoded data in codecName format
|
|
NewEncodedIOReader(codecName string) (io.ReadCloser, error)
|
|
}
|
|
|
|
type baseTrack struct {
|
|
Source
|
|
err error
|
|
onErrorHandler func(error)
|
|
mu sync.Mutex
|
|
endOnce sync.Once
|
|
kind MediaDeviceType
|
|
selector *CodecSelector
|
|
activePeerConnections map[string]chan<- chan<- struct{}
|
|
}
|
|
|
|
func newBaseTrack(source Source, kind MediaDeviceType, selector *CodecSelector) *baseTrack {
|
|
return &baseTrack{
|
|
Source: source,
|
|
kind: kind,
|
|
selector: selector,
|
|
activePeerConnections: make(map[string]chan<- chan<- struct{}),
|
|
}
|
|
}
|
|
|
|
// Kind returns track's kind
|
|
func (track *baseTrack) Kind() webrtc.RTPCodecType {
|
|
switch track.kind {
|
|
case VideoInput:
|
|
return webrtc.RTPCodecTypeVideo
|
|
case AudioInput:
|
|
return webrtc.RTPCodecTypeAudio
|
|
default:
|
|
panic("invalid track kind: only support VideoInput and AudioInput")
|
|
}
|
|
}
|
|
|
|
func (track *baseTrack) StreamID() string {
|
|
// TODO: StreamID should be used to group multiple tracks. Should get this information from mediastream instead.
|
|
generator, err := uuid.NewRandom()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return generator.String()
|
|
}
|
|
|
|
// RID is only relevant if you wish to use Simulcast
|
|
func (track *baseTrack) RID() string {
|
|
return ""
|
|
}
|
|
|
|
// OnEnded sets an error handler. When a track has been created and started, if an
|
|
// error occurs, handler will get called with the error given to the parameter.
|
|
func (track *baseTrack) OnEnded(handler func(error)) {
|
|
track.mu.Lock()
|
|
track.onErrorHandler = handler
|
|
err := track.err
|
|
track.mu.Unlock()
|
|
|
|
if err != nil && handler != nil {
|
|
// Already errored.
|
|
track.endOnce.Do(func() {
|
|
handler(err)
|
|
})
|
|
}
|
|
}
|
|
|
|
// onError is a callback when an error occurs
|
|
func (track *baseTrack) onError(err error) {
|
|
track.mu.Lock()
|
|
track.err = err
|
|
handler := track.onErrorHandler
|
|
track.mu.Unlock()
|
|
|
|
if handler != nil {
|
|
track.endOnce.Do(func() {
|
|
handler(err)
|
|
})
|
|
}
|
|
}
|
|
|
|
func (track *baseTrack) bind(ctx webrtc.TrackLocalContext, specializedTrack Track) (webrtc.RTPCodecParameters, error) {
|
|
track.mu.Lock()
|
|
defer track.mu.Unlock()
|
|
|
|
signalCh := make(chan chan<- struct{})
|
|
var stopRead chan struct{}
|
|
track.activePeerConnections[ctx.ID()] = signalCh
|
|
|
|
var encodedReader RTPReadCloser
|
|
var selectedCodec webrtc.RTPCodecParameters
|
|
var err error
|
|
var errReasons []string
|
|
for _, wantedCodec := range ctx.CodecParameters() {
|
|
logger.Debugf("trying to build %s rtp reader", wantedCodec.MimeType)
|
|
encodedReader, err = specializedTrack.NewRTPReader(wantedCodec.MimeType, uint32(ctx.SSRC()), rtpOutboundMTU)
|
|
if err == nil {
|
|
selectedCodec = wantedCodec
|
|
break
|
|
}
|
|
|
|
errReasons = append(errReasons, fmt.Sprintf("%s: %s", wantedCodec.MimeType, err))
|
|
}
|
|
|
|
if encodedReader == nil {
|
|
return webrtc.RTPCodecParameters{}, errors.New(strings.Join(errReasons, "\n\n"))
|
|
}
|
|
|
|
go func() {
|
|
var doneCh chan<- struct{}
|
|
writer := ctx.WriteStream()
|
|
defer func() {
|
|
close(stopRead)
|
|
encodedReader.Close()
|
|
|
|
// When there's another call to unbind, it won't block since we mark the signalCh to be closed
|
|
close(signalCh)
|
|
if doneCh != nil {
|
|
close(doneCh)
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case doneCh = <-signalCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
pkts, _, err := encodedReader.Read()
|
|
if err != nil {
|
|
// explicitly ignore this error since the higher level should've reported this
|
|
return
|
|
}
|
|
|
|
for _, pkt := range pkts {
|
|
_, err = writer.WriteRTP(&pkt.Header, pkt.Payload)
|
|
if err != nil {
|
|
track.onError(err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
keyFrameController, ok := encodedReader.Controller().(codec.KeyFrameController)
|
|
if ok {
|
|
stopRead = make(chan struct{})
|
|
go func() {
|
|
reader := ctx.RTCPReader()
|
|
for {
|
|
select {
|
|
case <-stopRead:
|
|
return
|
|
default:
|
|
}
|
|
|
|
var readerBuffer []byte
|
|
_, _, err := reader.Read(readerBuffer, interceptor.Attributes{})
|
|
if err != nil {
|
|
track.onError(err)
|
|
return
|
|
}
|
|
|
|
pkts, err := rtcp.Unmarshal(readerBuffer)
|
|
|
|
for _, pkt := range pkts {
|
|
switch pkt.(type) {
|
|
case *rtcp.PictureLossIndication, *rtcp.FullIntraRequest:
|
|
if err := keyFrameController.ForceKeyFrame(); err != nil {
|
|
track.onError(err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
return selectedCodec, nil
|
|
}
|
|
|
|
func (track *baseTrack) unbind(ctx webrtc.TrackLocalContext) error {
|
|
ch, err := track.removeActivePeerConnection(ctx.ID())
|
|
if err != err {
|
|
return err
|
|
}
|
|
|
|
doneCh := make(chan struct{})
|
|
ch <- doneCh
|
|
<-doneCh
|
|
return nil
|
|
}
|
|
|
|
func (track *baseTrack) removeActivePeerConnection(id string) (chan<- chan<- struct{}, error) {
|
|
track.mu.Lock()
|
|
defer track.mu.Unlock()
|
|
|
|
ch, ok := track.activePeerConnections[id]
|
|
if !ok {
|
|
return nil, errNotFoundPeerConnection
|
|
}
|
|
delete(track.activePeerConnections, id)
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
func newTrackFromDriver(d driver.Driver, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
|
|
if err := d.Open(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch recorder := d.(type) {
|
|
case driver.VideoRecorder:
|
|
return newVideoTrackFromDriver(d, recorder, constraints, selector)
|
|
case driver.AudioRecorder:
|
|
return newAudioTrackFromDriver(d, recorder, constraints, selector)
|
|
default:
|
|
panic(errInvalidDriverType)
|
|
}
|
|
}
|
|
|
|
// VideoTrack is a specific track type that contains video source which allows multiple readers to access, and manipulate.
|
|
type VideoTrack struct {
|
|
*baseTrack
|
|
*video.Broadcaster
|
|
}
|
|
|
|
// NewVideoTrack constructs a new VideoTrack
|
|
func NewVideoTrack(source VideoSource, selector *CodecSelector) Track {
|
|
return newVideoTrackFromReader(source, source, selector)
|
|
}
|
|
|
|
func newVideoTrackFromReader(source Source, reader video.Reader, selector *CodecSelector) Track {
|
|
base := newBaseTrack(source, VideoInput, selector)
|
|
wrappedReader := video.ReaderFunc(func() (img image.Image, release func(), err error) {
|
|
img, _, err = reader.Read()
|
|
if err != nil {
|
|
base.onError(err)
|
|
}
|
|
return img, func() {}, err
|
|
})
|
|
|
|
// TODO: Allow users to configure broadcaster
|
|
broadcaster := video.NewBroadcaster(wrappedReader, nil)
|
|
|
|
return &VideoTrack{
|
|
baseTrack: base,
|
|
Broadcaster: broadcaster,
|
|
}
|
|
}
|
|
|
|
// newVideoTrackFromDriver is an internal video track creation from driver
|
|
func newVideoTrackFromDriver(d driver.Driver, recorder driver.VideoRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
|
|
reader, err := recorder.VideoRecord(constraints.selectedMedia)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return newVideoTrackFromReader(d, reader, selector), nil
|
|
}
|
|
|
|
// Transform transforms the underlying source by applying the given fns in serial order
|
|
func (track *VideoTrack) Transform(fns ...video.TransformFunc) {
|
|
src := track.Broadcaster.Source()
|
|
track.Broadcaster.ReplaceSource(video.Merge(fns...)(src))
|
|
}
|
|
|
|
func (track *VideoTrack) Bind(ctx webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) {
|
|
return track.bind(ctx, track)
|
|
}
|
|
|
|
func (track *VideoTrack) Unbind(ctx webrtc.TrackLocalContext) error {
|
|
return track.unbind(ctx)
|
|
}
|
|
|
|
func (track *VideoTrack) newEncodedReader(codecNames ...string) (EncodedReadCloser, *codec.RTPCodec, error) {
|
|
reader := track.NewReader(false)
|
|
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
encodedReader, selectedCodec, err := track.selector.selectVideoCodecByNames(reader, inputProp, codecNames...)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
sample := newVideoSampler(selectedCodec.ClockRate)
|
|
|
|
return &encodedReadCloserImpl{
|
|
readFn: func() (EncodedBuffer, func(), error) {
|
|
data, release, err := encodedReader.Read()
|
|
buffer := EncodedBuffer{
|
|
Data: data,
|
|
Samples: sample(),
|
|
}
|
|
return buffer, release, err
|
|
},
|
|
closeFn: encodedReader.Close,
|
|
controllerFn: encodedReader.Controller,
|
|
}, selectedCodec, nil
|
|
}
|
|
|
|
func (track *VideoTrack) NewEncodedReader(codecName string) (EncodedReadCloser, error) {
|
|
reader, _, err := track.newEncodedReader(codecName)
|
|
return reader, err
|
|
}
|
|
|
|
func (track *VideoTrack) NewEncodedIOReader(codecName string) (io.ReadCloser, error) {
|
|
encodedReader, _, err := track.newEncodedReader(codecName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return newEncodedIOReadCloserImpl(encodedReader), nil
|
|
}
|
|
|
|
func (track *VideoTrack) NewRTPReader(codecName string, ssrc uint32, mtu int) (RTPReadCloser, error) {
|
|
encodedReader, selectedCodec, err := track.newEncodedReader(codecName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
packetizer := rtp.NewPacketizer(uint16(mtu), uint8(selectedCodec.PayloadType), ssrc, selectedCodec.Payloader, rtp.NewRandomSequencer(), selectedCodec.ClockRate)
|
|
|
|
return &rtpReadCloserImpl{
|
|
readFn: func() ([]*rtp.Packet, func(), error) {
|
|
encoded, release, err := encodedReader.Read()
|
|
if err != nil {
|
|
encodedReader.Close()
|
|
track.onError(err)
|
|
return nil, func() {}, err
|
|
}
|
|
defer release()
|
|
|
|
pkts := packetizer.Packetize(encoded.Data, encoded.Samples)
|
|
return pkts, release, err
|
|
},
|
|
closeFn: encodedReader.Close,
|
|
controllerFn: encodedReader.Controller,
|
|
}, nil
|
|
}
|
|
|
|
// AudioTrack is a specific track type that contains audio source which allows multiple readers to access, and
|
|
// manipulate.
|
|
type AudioTrack struct {
|
|
*baseTrack
|
|
*audio.Broadcaster
|
|
}
|
|
|
|
// NewAudioTrack constructs a new AudioTrack
|
|
func NewAudioTrack(source AudioSource, selector *CodecSelector) Track {
|
|
return newAudioTrackFromReader(source, source, selector)
|
|
}
|
|
|
|
func newAudioTrackFromReader(source Source, reader audio.Reader, selector *CodecSelector) Track {
|
|
base := newBaseTrack(source, AudioInput, selector)
|
|
wrappedReader := audio.ReaderFunc(func() (chunk wave.Audio, release func(), err error) {
|
|
chunk, _, err = reader.Read()
|
|
if err != nil {
|
|
base.onError(err)
|
|
}
|
|
return chunk, func() {}, err
|
|
})
|
|
|
|
// TODO: Allow users to configure broadcaster
|
|
broadcaster := audio.NewBroadcaster(wrappedReader, nil)
|
|
|
|
return &AudioTrack{
|
|
baseTrack: base,
|
|
Broadcaster: broadcaster,
|
|
}
|
|
}
|
|
|
|
// newAudioTrackFromDriver is an internal audio track creation from driver
|
|
func newAudioTrackFromDriver(d driver.Driver, recorder driver.AudioRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
|
|
reader, err := recorder.AudioRecord(constraints.selectedMedia)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return newAudioTrackFromReader(d, reader, selector), nil
|
|
}
|
|
|
|
// Transform transforms the underlying source by applying the given fns in serial order
|
|
func (track *AudioTrack) Transform(fns ...audio.TransformFunc) {
|
|
src := track.Broadcaster.Source()
|
|
track.Broadcaster.ReplaceSource(audio.Merge(fns...)(src))
|
|
}
|
|
|
|
func (track *AudioTrack) Bind(ctx webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) {
|
|
return track.bind(ctx, track)
|
|
}
|
|
|
|
func (track *AudioTrack) Unbind(ctx webrtc.TrackLocalContext) error {
|
|
return track.unbind(ctx)
|
|
}
|
|
|
|
func (track *AudioTrack) newEncodedReader(codecNames ...string) (EncodedReadCloser, *codec.RTPCodec, error) {
|
|
reader := track.NewReader(false)
|
|
inputProp, err := detectCurrentAudioProp(track.Broadcaster)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
encodedReader, selectedCodec, err := track.selector.selectAudioCodecByNames(reader, inputProp, codecNames...)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
sample := newAudioSampler(selectedCodec.ClockRate, selectedCodec.Latency)
|
|
|
|
return &encodedReadCloserImpl{
|
|
readFn: func() (EncodedBuffer, func(), error) {
|
|
data, release, err := encodedReader.Read()
|
|
buffer := EncodedBuffer{
|
|
Data: data,
|
|
Samples: sample(),
|
|
}
|
|
return buffer, release, err
|
|
},
|
|
closeFn: encodedReader.Close,
|
|
}, selectedCodec, nil
|
|
}
|
|
|
|
func (track *AudioTrack) NewEncodedReader(codecName string) (EncodedReadCloser, error) {
|
|
reader, _, err := track.newEncodedReader(codecName)
|
|
return reader, err
|
|
}
|
|
|
|
func (track *AudioTrack) NewEncodedIOReader(codecName string) (io.ReadCloser, error) {
|
|
encodedReader, _, err := track.newEncodedReader(codecName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return newEncodedIOReadCloserImpl(encodedReader), nil
|
|
}
|
|
|
|
func (track *AudioTrack) NewRTPReader(codecName string, ssrc uint32, mtu int) (RTPReadCloser, error) {
|
|
encodedReader, selectedCodec, err := track.newEncodedReader(codecName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
packetizer := rtp.NewPacketizer(uint16(mtu), uint8(selectedCodec.PayloadType), ssrc, selectedCodec.Payloader, rtp.NewRandomSequencer(), selectedCodec.ClockRate)
|
|
|
|
return &rtpReadCloserImpl{
|
|
readFn: func() ([]*rtp.Packet, func(), error) {
|
|
encoded, release, err := encodedReader.Read()
|
|
if err != nil {
|
|
encodedReader.Close()
|
|
track.onError(err)
|
|
return nil, func() {}, err
|
|
}
|
|
defer release()
|
|
|
|
pkts := packetizer.Packetize(encoded.Data, encoded.Samples)
|
|
return pkts, release, err
|
|
},
|
|
closeFn: encodedReader.Close,
|
|
}, nil
|
|
}
|