diff --git a/segmenter/video_segmenter_test.go b/segmenter/video_segmenter_test.go index 49a22c3d1f..9375faa3d5 100644 --- a/segmenter/video_segmenter_test.go +++ b/segmenter/video_segmenter_test.go @@ -59,6 +59,8 @@ func (s *TestStream) WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error func (s *TestStream) WriteHLSSegmentToStream(seg stream.HLSSegment) error { return nil } func (s *TestStream) ReadHLSFromStream(ctx context.Context, buffer stream.HLSMuxer) error { return nil } func (s *TestStream) ReadHLSSegment() (stream.HLSSegment, error) { return stream.HLSSegment{}, nil } +func (s *TestStream) Width() int { return 0 } +func (s *TestStream) Height() int { return 0 } func TestSegmenter(t *testing.T) { wd, _ := os.Getwd() diff --git a/stream/basic_hls_video_manifest.go b/stream/basic_hls_video_manifest.go index 7ee817e6df..07a17134fb 100644 --- a/stream/basic_hls_video_manifest.go +++ b/stream/basic_hls_video_manifest.go @@ -2,6 +2,8 @@ package stream import ( "errors" + "fmt" + "strings" "github.com/ericxtang/m3u8" "github.com/golang/glog" @@ -11,18 +13,18 @@ var ErrVideoManifest = errors.New("ErrVideoManifest") type BasicHLSVideoManifest struct { streamMap map[string]HLSVideoStream + variantMap map[string]*m3u8.Variant manifestCache *m3u8.MasterPlaylist id string - winSize uint } -func NewBasicHLSVideoManifest(id string, wSize uint) *BasicHLSVideoManifest { +func NewBasicHLSVideoManifest(id string) *BasicHLSVideoManifest { pl := m3u8.NewMasterPlaylist() return &BasicHLSVideoManifest{ streamMap: make(map[string]HLSVideoStream), + variantMap: make(map[string]*m3u8.Variant), manifestCache: pl, id: id, - winSize: wSize, } } @@ -42,26 +44,53 @@ func (m *BasicHLSVideoManifest) GetVideoStream(strmID string) (HLSVideoStream, e return strm, nil } -func (m *BasicHLSVideoManifest) AddVideoStream(strmID string, variant *m3u8.Variant) (*BasicHLSVideoStream, error) { - _, ok := m.streamMap[strmID] +func (m *BasicHLSVideoManifest) GetVideoStreams() []HLSVideoStream { + res := []HLSVideoStream{} + for _, s := range m.streamMap { + res = append(res, s) + } + return res +} + +func (m *BasicHLSVideoManifest) AddVideoStream(strm HLSVideoStream, variant *m3u8.Variant) error { + _, ok := m.streamMap[strm.GetStreamID()] if ok { - return nil, ErrVideoManifest + return ErrVideoManifest } //Check if the same Bandwidth & Resolution already exists - for _, strm := range m.streamMap { - v := strm.GetStreamVariant() + for _, v := range m.variantMap { + // v := mStrm.GetStreamVariant() if v.Bandwidth == variant.Bandwidth && v.Resolution == variant.Resolution { + // if v.Bandwidth == strm.GetStreamVariant().Bandwidth && v.Resolution == strm.GetStreamVariant().Resolution { glog.Errorf("Variant with Bandwidth %v and Resolution %v already exists", v.Bandwidth, v.Resolution) - return nil, ErrVideoManifest + return ErrVideoManifest } } //Add to the map + // m.manifestCache.Append(strm.GetStreamVariant().URI, strm.GetStreamVariant().Chunklist, strm.GetStreamVariant().VariantParams) m.manifestCache.Append(variant.URI, variant.Chunklist, variant.VariantParams) - strm := NewBasicHLSVideoStream(strmID, variant, m.winSize) - m.streamMap[strmID] = strm - return strm, nil + m.streamMap[strm.GetStreamID()] = strm + m.variantMap[strm.GetStreamID()] = variant + return nil +} + +func (m *BasicHLSVideoManifest) GetStreamVariant(strmID string) (*m3u8.Variant, error) { + //Try from the variant map + v, ok := m.variantMap[strmID] + if ok { + return v, nil + } + + //Try from the playlist itself + for _, v := range m.manifestCache.Variants { + vsid := strings.Split(v.URI, ".")[0] + if vsid == strmID { + return v, nil + } + } + return nil, ErrNotFound } func (m *BasicHLSVideoManifest) DeleteVideoStream(strmID string) error { @@ -69,4 +98,6 @@ func (m *BasicHLSVideoManifest) DeleteVideoStream(strmID string) error { return nil } -func (m *BasicHLSVideoManifest) String() string { return "" } +func (m BasicHLSVideoManifest) String() string { + return fmt.Sprintf("id:%v, streams:%v", m.id, m.streamMap) +} diff --git a/stream/basic_hls_video_test.go b/stream/basic_hls_video_test.go index 3a2a69979d..008c993571 100644 --- a/stream/basic_hls_video_test.go +++ b/stream/basic_hls_video_test.go @@ -7,9 +7,9 @@ import ( ) func TestAddAndRemove(t *testing.T) { - manifest := NewBasicHLSVideoManifest("test_m", 3) - strm, err := manifest.AddVideoStream("test_s", &m3u8.Variant{URI: "test_s", Chunklist: nil, VariantParams: m3u8.VariantParams{Bandwidth: 100}}) - if err != nil { + manifest := NewBasicHLSVideoManifest("test_m") + strm := NewBasicHLSVideoStream("test_s", DefaultHLSStreamWin) + if err := manifest.AddVideoStream(strm, &m3u8.Variant{URI: "test_s", Chunklist: nil, VariantParams: m3u8.VariantParams{Bandwidth: 100}}); err != nil { t.Errorf("Error: %v", err) } ml, err := manifest.GetManifest() @@ -65,12 +65,12 @@ func TestAddAndRemove(t *testing.T) { //Add a stream pl, _ = m3u8.NewMediaPlaylist(3, 10) - _, err = manifest.AddVideoStream("test2", &m3u8.Variant{URI: "test2.m3u8", Chunklist: pl, VariantParams: m3u8.VariantParams{Bandwidth: 10, Resolution: "10x10"}}) - if err != nil { + strm2 := NewBasicHLSVideoStream("test2", DefaultHLSStreamWin) + if err := manifest.AddVideoStream(strm2, &m3u8.Variant{URI: "test2.m3u8", Chunklist: pl, VariantParams: m3u8.VariantParams{Bandwidth: 10, Resolution: "10x10"}}); err != nil { t.Errorf("Error adding media playlist: %v", err) } - _, err = manifest.AddVideoStream("test3", &m3u8.Variant{URI: "test3.m3u8", Chunklist: pl, VariantParams: m3u8.VariantParams{Bandwidth: 10, Resolution: "10x10"}}) - if err == nil { + strm3 := NewBasicHLSVideoStream("test3", DefaultHLSStreamWin) + if err := manifest.AddVideoStream(strm3, &m3u8.Variant{URI: "test3.m3u8", Chunklist: pl, VariantParams: m3u8.VariantParams{Bandwidth: 10, Resolution: "10x10"}}); err == nil { t.Errorf("Expecting error because of duplicate variant params") } vstrm, err := manifest.GetVideoStream("wrongName") diff --git a/stream/basic_hls_videostream.go b/stream/basic_hls_videostream.go index f228f8d017..45065d2f26 100644 --- a/stream/basic_hls_videostream.go +++ b/stream/basic_hls_videostream.go @@ -11,19 +11,18 @@ import ( "github.com/livepeer/go-livepeer/common" ) -const DefaultMediaPlLen = uint(500) +const DefaultHLSStreamCap = uint(500) +const DefaultHLSStreamWin = uint(3) // const DefaultMediaWinLen = uint(5) const DefaultSegWaitTime = time.Second * 10 const SegWaitInterval = time.Second -var ErrAddVariant = errors.New("ErrAddVariant") var ErrAddHLSSegment = errors.New("ErrAddHLSSegment") //BasicHLSVideoStream is a basic implementation of HLSVideoStream type BasicHLSVideoStream struct { plCache *m3u8.MediaPlaylist //StrmID -> MediaPlaylist - variant *m3u8.Variant sqMap map[string]*HLSSegment lock sync.Locker strmID string @@ -31,15 +30,15 @@ type BasicHLSVideoStream struct { winSize uint } -func NewBasicHLSVideoStream(strmID string, variant *m3u8.Variant, wSize uint) *BasicHLSVideoStream { - pl, err := m3u8.NewMediaPlaylist(wSize, DefaultMediaPlLen) +func NewBasicHLSVideoStream(strmID string, wSize uint) *BasicHLSVideoStream { + pl, err := m3u8.NewMediaPlaylist(wSize, DefaultHLSStreamCap) if err != nil { return nil } return &BasicHLSVideoStream{ plCache: pl, - variant: variant, + // variant: variant, sqMap: make(map[string]*HLSSegment), lock: &sync.Mutex{}, strmID: strmID, @@ -58,7 +57,7 @@ func (s *BasicHLSVideoStream) GetStreamID() string { return s.strmID } //GetStreamFormat always returns HLS func (s *BasicHLSVideoStream) GetStreamFormat() VideoFormat { return HLS } -//GetVariantPlaylist returns the media playlist represented by the streamID +//GetStreamPlaylist returns the media playlist represented by the streamID func (s *BasicHLSVideoStream) GetStreamPlaylist() (*m3u8.MediaPlaylist, error) { if s.plCache.Count() < s.winSize { return nil, nil @@ -67,9 +66,9 @@ func (s *BasicHLSVideoStream) GetStreamPlaylist() (*m3u8.MediaPlaylist, error) { return s.plCache, nil } -func (s *BasicHLSVideoStream) GetStreamVariant() *m3u8.Variant { - return s.variant -} +// func (s *BasicHLSVideoStream) GetStreamVariant() *m3u8.Variant { +// return s.variant +// } //GetHLSSegment gets the HLS segment. It blocks until something is found, or timeout happens. func (s *BasicHLSVideoStream) GetHLSSegment(segName string) (*HLSSegment, error) { diff --git a/stream/basic_rtmp_videostream.go b/stream/basic_rtmp_videostream.go index 21cf5b603b..8adcbd4239 100644 --- a/stream/basic_rtmp_videostream.go +++ b/stream/basic_rtmp_videostream.go @@ -16,6 +16,7 @@ type BasicRTMPVideoStream struct { streamID string buffer *streamBuffer RTMPTimeout time.Duration + header []av.CodecData } //NewBasicRTMPVideoStream creates a new BasicRTMPVideoStream. The default RTMPTimeout is set to 10 milliseconds because we assume all RTMP streams are local. @@ -77,6 +78,13 @@ func (s *BasicRTMPVideoStream) ReadRTMPFromStream(ctx context.Context, dst av.Mu func (s *BasicRTMPVideoStream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error { defer src.Close() + //Set header in case we want to use it. + h, err := src.Streams() + if err != nil { + return err + } + s.header = h + c := make(chan error, 1) go func() { c <- func() error { @@ -125,3 +133,23 @@ func (s *BasicRTMPVideoStream) WriteRTMPToStream(ctx context.Context, src av.Dem func (s BasicRTMPVideoStream) String() string { return fmt.Sprintf("StreamID: %v, Type: %v", s.GetStreamID(), s.GetStreamFormat()) } + +func (s BasicRTMPVideoStream) Height() int { + for _, cd := range s.header { + if cd.Type().IsVideo() { + return cd.(av.VideoCodecData).Height() + } + } + + return 0 +} + +func (s BasicRTMPVideoStream) Width() int { + for _, cd := range s.header { + if cd.Type().IsVideo() { + return cd.(av.VideoCodecData).Width() + } + } + + return 0 +} diff --git a/stream/basic_rtmp_videostream_test.go b/stream/basic_rtmp_videostream_test.go index 87550facdd..275a30eafd 100644 --- a/stream/basic_rtmp_videostream_test.go +++ b/stream/basic_rtmp_videostream_test.go @@ -5,8 +5,10 @@ import ( "errors" "io" "testing" + "time" "github.com/nareix/joy4/av" + "github.com/nareix/joy4/codec/h264parser" ) //Testing WriteRTMP errors @@ -69,8 +71,10 @@ type PacketsDemuxer struct { c *Counter } -func (d PacketsDemuxer) Close() error { return nil } -func (d PacketsDemuxer) Streams() ([]av.CodecData, error) { return nil, nil } +func (d PacketsDemuxer) Close() error { return nil } +func (d PacketsDemuxer) Streams() ([]av.CodecData, error) { + return []av.CodecData{h264parser.CodecData{}}, nil +} func (d PacketsDemuxer) ReadPacket() (av.Packet, error) { if d.c.Count == 10 { return av.Packet{Data: []byte{0, 0}}, io.EOF @@ -93,6 +97,18 @@ func TestWriteBasicRTMP(t *testing.T) { t.Error("Expecting buffer length to be 12, but got: ", stream.buffer.len()) } + start := time.Now() + for time.Since(start) < time.Second { + if len(stream.header) == 0 { + time.Sleep(time.Millisecond * 100) + continue + } + break + } + if len(stream.header) == 0 { + t.Errorf("Expecting header to be set") + } + // fmt.Println(stream.buffer.q.Get(12)) //TODO: Test what happens when the buffer is full (should evict everything before the last keyframe) diff --git a/stream/interface.go b/stream/interface.go index 28e7af3d14..21796e03f2 100644 --- a/stream/interface.go +++ b/stream/interface.go @@ -13,14 +13,21 @@ type VideoStream interface { String() string } -type HLSVideoManifest interface { +type VideoManifest interface { GetManifestID() string GetVideoFormat() VideoFormat + String() string +} + +type HLSVideoManifest interface { + VideoManifest GetManifest() (*m3u8.MasterPlaylist, error) GetVideoStream(strmID string) (HLSVideoStream, error) - AddVideoStream(strmID string, variant *m3u8.Variant) error + // AddVideoStream(strmID string, variant *m3u8.Variant) (HLSVideoStream, error) + AddVideoStream(strm HLSVideoStream, variant *m3u8.Variant) error + GetStreamVariant(strmID string) (*m3u8.Variant, error) + GetVideoStreams() []HLSVideoStream DeleteVideoStream(strmID string) error - String() string } //HLSVideoStream contains the master playlist, media playlists in it, and the segments in them. Each media playlist also has a streamID. @@ -28,7 +35,7 @@ type HLSVideoManifest interface { type HLSVideoStream interface { VideoStream GetStreamPlaylist() (*m3u8.MediaPlaylist, error) - GetStreamVariant() *m3u8.Variant + // GetStreamVariant() *m3u8.Variant GetHLSSegment(segName string) (*HLSSegment, error) AddHLSSegment(seg *HLSSegment) error SetSubscriber(f func(seg *HLSSegment, eof bool)) @@ -39,4 +46,6 @@ type RTMPVideoStream interface { VideoStream ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error + Height() int + Width() int } diff --git a/vidlistener/listener.go b/vidlistener/listener.go index a664625ef2..2e9f448c01 100644 --- a/vidlistener/listener.go +++ b/vidlistener/listener.go @@ -32,7 +32,11 @@ func (self *VidListener) HandleRTMPPublish( self.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) { glog.V(2).Infof("RTMP server got upstream: %v", conn.URL) - s := stream.NewBasicRTMPVideoStream(makeStreamID(conn.URL)) + strmID := makeStreamID(conn.URL) + if strmID == "" { + return + } + s := stream.NewBasicRTMPVideoStream(strmID) ctx, cancel := context.WithCancel(context.Background()) ec := make(chan error) go func() { ec <- s.WriteRTMPToStream(ctx, conn) }() diff --git a/vidplayer/player.go b/vidplayer/player.go index 73d6cbfa1f..5e26b190a8 100644 --- a/vidplayer/player.go +++ b/vidplayer/player.go @@ -22,8 +22,10 @@ import ( joy4rtmp "github.com/nareix/joy4/format/rtmp" ) -var ErrNotFound = errors.New("NotFound") -var ErrRTMP = errors.New("RTMP Error") +var ErrMasterPlaylistNotFound = errors.New("ErrMasterPlaylistNotFound") +var ErrNotMasterPlaylistID = errors.New("ErrNotMasterPlaylistID") +var ErrRTMP = errors.New("ErrRTMP") +var ErrHLS = errors.New("ErrHLS") var PlaylistWaittime = 6 * time.Second //VidPlayer is the module that handles playing video. For now we only support RTMP and HLS play. @@ -33,7 +35,7 @@ type VidPlayer struct { VodPath string } -func defaultRtmpPlayHandler(url *url.URL) (stream.RTMPVideoStream, error) { return nil, ErrNotFound } +func defaultRtmpPlayHandler(url *url.URL) (stream.RTMPVideoStream, error) { return nil, ErrRTMP } //NewVidPlayer creates a new video player func NewVidPlayer(rtmpS *joy4rtmp.Server, vodPath string) *VidPlayer { @@ -102,11 +104,13 @@ func handleLive(w http.ResponseWriter, r *http.Request, //Could be a master playlist, or a media playlist var masterPl *m3u8.MasterPlaylist var mediaPl *m3u8.MediaPlaylist - masterPl, err := getMasterPlaylist(r.URL) + masterPl, err := getMasterPlaylist(r.URL) //Return ErrNotMasterPlaylistID to by past the error case if err != nil { - glog.Errorf("Error getting HLS master playlist: %v", err) - http.Error(w, "Error getting HLS master playlist", 500) - return + if err != ErrNotMasterPlaylistID { + glog.Errorf("Error getting HLS master playlist: %v", err) + http.Error(w, "Error getting HLS master playlist", 500) + return + } } if masterPl != nil && len(masterPl.Variants) > 0 { w.Header().Set("Connection", "keep-alive") @@ -154,7 +158,7 @@ func handleVOD(url *url.URL, vodPath string, w http.ResponseWriter) error { dat, err := ioutil.ReadFile(plName) if err != nil { glog.Errorf("Cannot find file: %v", plName) - return ErrNotFound + return ErrHLS } w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(url.Path))) w.Header().Set("Access-Control-Allow-Origin", "*") @@ -167,7 +171,7 @@ func handleVOD(url *url.URL, vodPath string, w http.ResponseWriter) error { dat, err := ioutil.ReadFile(segName) if err != nil { glog.Errorf("Cannot find file: %v", segName) - return ErrNotFound + return ErrHLS } w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(url.Path))) w.Header().Set("Access-Control-Allow-Origin", "*")