interface cleanup, segment transcoder

This commit is contained in:
Eric Tang
2017-07-02 18:59:06 -04:00
parent e97eb093f0
commit d85c95aa06
15 changed files with 640 additions and 338 deletions
+14 -42
View File
@@ -121,22 +121,6 @@ func (s *SegmentStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser
}
time.Sleep(time.Second * 1)
}
// for {
// packet := s.GetPacket()
// if len(packet.Data) == 0 {
// glog.Info("Reached the end...")
// dst.WriteTrailer()
// return io.EOF
// }
// err := dst.WritePacket(packet)
// if err != nil {
// glog.Infof("Error writing RTMP packet from Stream %v to mux", s.StreamID)
// return err
// }
// }
}
return nil
@@ -270,41 +254,29 @@ func main() {
streamDB := &StreamDB{db: make(map[string]stream.Stream)}
lpms.HandleRTMPPublish(
//getStreamID
func(url *url.URL) (string, error) {
return getStreamIDFromPath(url.Path), nil
//makeStreamID
func(url *url.URL) (strmID string) {
return getStreamIDFromPath(url.Path)
},
//getStream
func(url *url.URL) (stream.Stream, stream.Stream, error) {
//gotStream
func(url *url.URL, rtmpStrm *stream.VideoStream) error {
streamID := getStreamIDFromPath(url.Path)
stream1 := NewSegmentStream(streamID)
stream2 := NewSegmentStream(streamID)
// stream2 := NewSegmentStream(streamID)
streamDB.db[streamID] = stream1
return stream1, stream2, nil
return nil
},
//finishStream
func(strmID1 string, strmID2 string) {
// streamID := getStreamIDFromPath(reqPath)
// delete(streamDB.db, streamID)
// tranStreamID := streamID + "_tran"
// delete(streamDB.db, tranStreamID)
//endStream
func(url *url.URL, rtmpStrm *stream.VideoStream) error {
delete(streamDB.db, rtmpStrm.GetStreamID())
return nil
})
lpms.HandleRTMPPlay(
//getStream
func(ctx context.Context, reqPath string, dst av.MuxCloser) error {
glog.Infof("Got req: ", reqPath)
// streamID := getStreamIDFromPath(reqPath)
// src := streamDB.db[streamID]
func(url *url.URL) (stream.Stream, error) {
src := copyStream(&StagedStream)
// if src != nil {
src.ReadRTMPFromStream(ctx, dst)
// } else {
// glog.Error("Cannot find stream for ", streamID)
// return stream.ErrNotFound
// }
return nil
return src, nil
})
lpms.HandleTranscode(
@@ -324,7 +296,7 @@ func main() {
fileStream := stream.NewFileStream(streamID + "_file")
return fileStream, nil
})
lpms.Start()
lpms.Start(context.Background())
}
func getStreamIDFromPath(reqPath string) string {
+66 -81
View File
@@ -3,16 +3,16 @@ package main
import (
"context"
"flag"
"fmt"
"net/http"
"net/url"
"regexp"
"strings"
"time"
"github.com/ericxtang/m3u8"
"github.com/golang/glog"
"github.com/livepeer/lpms"
"github.com/livepeer/lpms/stream"
"github.com/nareix/joy4/av"
)
type StreamDB struct {
@@ -23,6 +23,25 @@ type BufferDB struct {
db map[string]*stream.HLSBuffer
}
//Trivial method for getting the id
func getStreamID(url *url.URL) string {
if strings.HasSuffix(url.Path, "m3u8") {
return "hlsStrmID"
} else {
return "rtmpStrmID"
}
}
func getHLSSegmentName(url *url.URL) string {
var segName string
regex, _ := regexp.Compile("\\/stream\\/.*\\.ts")
match := regex.FindString(url.Path)
if match != "" {
segName = strings.Replace(match, "/stream/", "", -1)
}
return segName
}
func main() {
flag.Set("logtostderr", "true")
flag.Parse()
@@ -32,89 +51,55 @@ func main() {
bufferDB := &BufferDB{db: make(map[string]*stream.HLSBuffer)}
lpms.HandleRTMPPublish(
//getStreamID
func(url *url.URL) (string, error) {
return getStreamIDFromPath(url.Path), nil
//makeStreamID
func(url *url.URL) (strmID string) {
//Give the stream a name
return getStreamID(url)
},
//getStream
func(url *url.URL) (stream.Stream, stream.Stream, error) {
rtmpStreamID := getStreamIDFromPath(url.Path)
hlsStreamID := rtmpStreamID + "_hls"
rtmpStream := stream.NewVideoStream(rtmpStreamID, stream.RTMP)
hlsStream := stream.NewVideoStream(hlsStreamID, stream.HLS)
streamDB.db[rtmpStreamID] = rtmpStream
streamDB.db[hlsStreamID] = hlsStream
return rtmpStream, hlsStream, nil
//gotStream
func(url *url.URL, rtmpStrm *stream.VideoStream) (err error) {
//Store the stream
streamDB.db[rtmpStrm.GetStreamID()] = rtmpStrm
return nil
},
//finishStream
func(rtmpID string, hlsID string) {
delete(streamDB.db, rtmpID)
delete(streamDB.db, hlsID)
//endStream
func(url *url.URL, rtmpStrm *stream.VideoStream) error {
//Remove the stream
delete(streamDB.db, rtmpStrm.GetStreamID())
return nil
})
//No transcoding for now until segment transcoder is finished.
// lpms.HandleTranscode(
// //getInStream
// func(ctx context.Context, streamID string) (stream.Stream, error) {
// if stream := streamDB.db[streamID]; stream != nil {
// return stream, nil
// }
// return nil, stream.ErrNotFound
// },
// //getOutStream
// func(ctx context.Context, streamID string) (stream.Stream, error) {
// //For this example, we'll name the transcoded stream "{streamID}_tran"
// newStream := stream.NewVideoStream(streamID + "_tran")
// streamDB.db[newStream.GetStreamID()] = newStream
// return newStream, nil
// // glog.Infof("Making File Stream")
// // fileStream := stream.NewFileStream(streamID + "_file")
// // return fileStream, nil
// })
lpms.HandleHLSPlay(
//getHLSBuffer
func(reqPath string) (*stream.HLSBuffer, error) {
streamID := getHLSStreamIDFromPath(reqPath)
// glog.Infof("Got HTTP Req for stream: %v", streamID)
buffer := bufferDB.db[streamID]
s := streamDB.db[streamID]
if s == nil {
return nil, stream.ErrNotFound
//getMasterPlaylist
func(url *url.URL) (*m3u8.MasterPlaylist, error) {
//No need to return a masterlist unless we are doing ABS
return nil, nil
},
//getMediaPlaylist
func(url *url.URL) (*m3u8.MediaPlaylist, error) {
buf, ok := bufferDB.db[getStreamID(url)]
if !ok {
return nil, fmt.Errorf("Cannot find video")
}
if buffer == nil {
//Create the buffer and start copying the stream into the buffer
buffer = stream.NewHLSBuffer(10, 100)
bufferDB.db[streamID] = buffer
sub := stream.NewStreamSubscriber(s)
go sub.StartHLSWorker(context.Background(), time.Second*1)
err := sub.SubscribeHLS(streamID, buffer)
if err != nil {
return nil, stream.ErrStreamSubscriber
}
return buf.LatestPlaylist()
},
//getSegment
func(url *url.URL) ([]byte, error) {
buf, ok := bufferDB.db[getStreamID(url)]
if !ok {
return nil, fmt.Errorf("Cannot find video")
}
return buffer, nil
return buf.WaitAndPopSegment(context.Background(), getHLSSegmentName(url))
})
lpms.HandleRTMPPlay(
//getStream
func(ctx context.Context, reqPath string, dst av.MuxCloser) error {
glog.Infof("Got req: ", reqPath)
streamID := getStreamIDFromPath(reqPath)
src := streamDB.db[streamID]
func(url *url.URL) (stream.Stream, error) {
glog.Infof("Got req: ", url.Path)
strmID := getStreamID(url)
src := streamDB.db[strmID]
if src != nil {
src.ReadRTMPFromStream(ctx, dst)
} else {
glog.Error("Cannot find stream for ", streamID)
return stream.ErrNotFound
}
return nil
return src, nil
})
//Helper function to print out all the streams
@@ -133,13 +118,13 @@ func main() {
w.Write([]byte(str))
})
lpms.Start()
lpms.Start(context.Background())
}
func getStreamIDFromPath(reqPath string) string {
return "test"
}
// func getStreamIDFromPath(reqPath string) string {
// return "test"
// }
func getHLSStreamIDFromPath(reqPath string) string {
return "test_hls"
}
// func getHLSStreamIDFromPath(reqPath string) string {
// return "test_hls"
// }
+87 -9
View File
@@ -8,13 +8,15 @@ import (
"encoding/json"
"net/http"
"net/url"
"os"
"github.com/ericxtang/m3u8"
"github.com/golang/glog"
"github.com/livepeer/lpms/segmenter"
"github.com/livepeer/lpms/stream"
"github.com/livepeer/lpms/transcoder"
"github.com/livepeer/lpms/vidlistener"
"github.com/livepeer/lpms/vidplayer"
"github.com/nareix/joy4/av"
joy4rtmp "github.com/nareix/joy4/format/rtmp"
)
@@ -46,7 +48,7 @@ func New(rtmpPort, httpPort, ffmpegPath, vodPath string) *LPMS {
}
//Start starts the rtmp and http server
func (l *LPMS) Start() error {
func (l *LPMS) Start(ctx context.Context) error {
ec := make(chan error, 1)
go func() {
glog.Infof("Starting LPMS Server at :%v", l.rtmpServer.Addr)
@@ -61,26 +63,102 @@ func (l *LPMS) Start() error {
case err := <-ec:
glog.Infof("LPMS Server Error: %v. Quitting...", err)
return err
case <-ctx.Done():
return ctx.Err()
}
}
//HandleRTMPPublish offload to the video listener
func (l *LPMS) HandleRTMPPublish(
getStreamID func(url *url.URL) (string, error),
getStream func(url *url.URL) (stream.Stream, stream.Stream, error),
endStream func(rtmpStrmID string, hlsStrmID string)) error {
makeStreamID func(url *url.URL) (strmID string),
gotStream func(url *url.URL, rtmpStrm *stream.VideoStream) (err error),
endStream func(url *url.URL, rtmpStrm *stream.VideoStream) error) {
return l.vidListen.HandleRTMPPublish(getStreamID, getStream, endStream)
l.vidListen.HandleRTMPPublish(makeStreamID, gotStream, endStream)
}
//HandleRTMPPlay offload to the video player
func (l *LPMS) HandleRTMPPlay(getStream func(ctx context.Context, reqPath string, dst av.MuxCloser) error) error {
func (l *LPMS) HandleRTMPPlay(getStream func(url *url.URL) (stream.Stream, error)) error {
return l.vidPlayer.HandleRTMPPlay(getStream)
}
//HandleHLSPlay offload to the video player
func (l *LPMS) HandleHLSPlay(getStream func(reqPath string) (*stream.HLSBuffer, error)) error {
return l.vidPlayer.HandleHLSPlay(getStream)
func (l *LPMS) HandleHLSPlay(
getMasterPlaylist func(url *url.URL) (*m3u8.MasterPlaylist, error),
getMediaPlaylist func(url *url.URL) (*m3u8.MediaPlaylist, error),
getSegment func(url *url.URL) ([]byte, error)) {
l.vidPlayer.HandleHLSPlay(getMasterPlaylist, getMediaPlaylist, getSegment)
}
//SegmentRTMPToHLS takes a rtmp stream and re-packages it into a HLS stream with the specified segmenter options
func (l *LPMS) SegmentRTMPToHLS(ctx context.Context, rs stream.Stream, hs stream.Stream, segOptions segmenter.SegmenterOptions) error {
//Invoke Segmenter
workDir, _ := os.Getwd()
workDir = workDir + "/tmp"
localRtmpUrl := "rtmp://localhost" + l.rtmpServer.Addr + "/stream/" + rs.GetStreamID()
glog.Infof("Segment RTMP Req: %v", localRtmpUrl)
s := segmenter.NewFFMpegVideoSegmenter(workDir, hs.GetStreamID(), localRtmpUrl, segOptions.SegLength, l.ffmpegPath)
c := make(chan error, 1)
ffmpegCtx, ffmpegCancel := context.WithCancel(context.Background())
go func() { c <- s.RTMPToHLS(ffmpegCtx, segOptions, true) }()
//Kick off go routine to write HLS playlist
plCtx, plCancel := context.WithCancel(context.Background())
go func() {
c <- func() error {
for {
pl, err := s.PollPlaylist(plCtx)
if err != nil {
glog.Errorf("Got error polling playlist: %v", err)
return err
}
// glog.Infof("Writing pl: %v", pl)
hs.WriteHLSPlaylistToStream(*pl.Data)
select {
case <-plCtx.Done():
return plCtx.Err()
default:
}
}
}()
}()
//Kick off go routine to write HLS segments
segCtx, segCancel := context.WithCancel(context.Background())
go func() {
c <- func() error {
for {
seg, err := s.PollSegment(segCtx)
if err != nil {
return err
}
ss := stream.HLSSegment{SeqNo: seg.SeqNo, Data: seg.Data, Name: seg.Name, Duration: seg.Length.Seconds()}
// glog.Infof("Writing stream: %v, duration:%v, len:%v", ss.Name, ss.Duration, len(seg.Data))
hs.WriteHLSSegmentToStream(ss)
select {
case <-segCtx.Done():
return segCtx.Err()
default:
}
}
}()
}()
select {
case err := <-c:
glog.Errorf("Error segmenting stream: %v", err)
ffmpegCancel()
plCancel()
segCancel()
return err
case <-ctx.Done():
ffmpegCancel()
plCancel()
segCancel()
return ctx.Err()
}
}
//HandleTranscode kicks off a transcoding process, keeps a local HLS buffer, and returns the new stream ID.
Executable → Regular
BIN
View File
Binary file not shown.
+5
View File
@@ -22,6 +22,7 @@ import (
)
var ErrSegmenterTimeout = errors.New("SegmenterTimeout")
var ErrFFMpegSegmenter = errors.New("FFMpegSegmenterError")
var PlaylistRetryCount = 5
var PlaylistRetryWait = 500 * time.Millisecond
@@ -106,6 +107,10 @@ func (s *FFMpegVideoSegmenter) RTMPToHLS(ctx context.Context, opt SegmenterOptio
select {
case ffmpege := <-ec:
glog.Errorf("Error from ffmpeg: %v", ffmpege)
//Sometimes ffmpeg doesn't return the correct error
if ffmpege == nil {
ffmpege = ErrFFMpegSegmenter
}
return ffmpege
case <-ctx.Done():
//Can't close RTMP server, joy4 doesn't support it.
+10 -5
View File
@@ -14,6 +14,7 @@ import (
"io/ioutil"
"github.com/ericxtang/m3u8"
"github.com/golang/glog"
"github.com/livepeer/lpms/stream"
"github.com/livepeer/lpms/vidplayer"
"github.com/nareix/joy4/av"
@@ -34,7 +35,11 @@ func (s *TestStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) e
fmt.Println("Error opening file: ", err)
return err
}
header, _ := file.Streams()
header, err := file.Streams()
if err != nil {
glog.Errorf("Error reading headers: %v", err)
return err
}
dst.WriteHeader(header)
for {
@@ -78,7 +83,7 @@ func TestSegmenter(t *testing.T) {
se := make(chan error, 1)
opt := SegmenterOptions{}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
//Kick off FFMpeg to create segments
@@ -109,11 +114,11 @@ func TestSegmenter(t *testing.T) {
t.Errorf("Segment counter should start with 0. But got: %v", vs.curSegment)
}
for i := 0; i < 4; i++ {
for i := 0; i < 2; i++ {
seg, err := vs.PollSegment(ctx)
if vs.curSegment != i+1 {
t.Errorf("Segment counter should move to 1. But got: %v", vs.curSegment)
t.Errorf("Segment counter should move to %v. But got: %v", i+1, vs.curSegment)
}
if err != nil {
@@ -128,7 +133,7 @@ func TestSegmenter(t *testing.T) {
t.Errorf("Expecting HLS segment, got %v", seg.Format)
}
timeDiff := seg.Length - time.Second*2
timeDiff := seg.Length - time.Second*8
if timeDiff > time.Millisecond*500 || timeDiff < -time.Millisecond*500 {
t.Errorf("Expecting 2 sec segments, got %v", seg.Length)
}
+23 -14
View File
@@ -11,6 +11,7 @@ import (
var ErrNotFound = errors.New("Not Found")
var ErrBadHLSBuffer = errors.New("BadHLSBuffer")
var ErrEOF = errors.New("ErrEOF")
type HLSDemuxer interface {
PollPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error)
@@ -22,42 +23,46 @@ type HLSMuxer interface {
WriteSegment(seqNo uint64, name string, duration float64, s []byte) error
}
//TODO: Add Master Playlist Methods
//TODO: Write tests, set buffer size, kick out segments / playlists if too full
type HLSBuffer struct {
plCache *m3u8.MediaPlaylist
sq *ConcurrentMap
lock sync.Locker
Capacity uint
masterPlCache *m3u8.MasterPlaylist
mediaPlCache *m3u8.MediaPlaylist
sq *ConcurrentMap
lock sync.Locker
Capacity uint
eof bool
}
func NewHLSBuffer(winSize, segCap uint) *HLSBuffer {
m := NewCMap()
// return &HLSBuffer{plCacheNew: false, segCache: &Queue{}, HoldTime: time.Second, sq: &m, lock: &sync.Mutex{}}
pl, _ := m3u8.NewMediaPlaylist(winSize, segCap)
return &HLSBuffer{plCache: pl, sq: &m, lock: &sync.Mutex{}, Capacity: segCap}
return &HLSBuffer{mediaPlCache: pl, sq: &m, lock: &sync.Mutex{}, Capacity: segCap}
}
func (b *HLSBuffer) WriteSegment(seqNo uint64, name string, duration float64, s []byte) error {
b.lock.Lock()
defer b.lock.Unlock()
b.sq.Set(name, &HLSSegment{SeqNo: seqNo, Name: name, Duration: duration, Data: s})
err := b.plCache.InsertSegment(seqNo, &m3u8.MediaSegment{SeqId: seqNo, Duration: duration, URI: name})
err := b.mediaPlCache.InsertSegment(seqNo, &m3u8.MediaSegment{SeqId: seqNo, Duration: duration, URI: name})
if err != nil {
return err
}
// if b.plCache.Count() > b.plCache.WinSize() { //Evit oldest segment
// toRm := b.plCache.Segments[b.plCache.Count()-b.plCache.WinSize()-1]
// // fmt.Println("Evicting %v", toRm)
// b.sq.Remove(toRm.URI)
// }
b.lock.Unlock()
return nil
}
func (b *HLSBuffer) WriteEOF() {
b.eof = true
}
func (b *HLSBuffer) LatestPlaylist() (*m3u8.MediaPlaylist, error) {
return b.plCache, nil
if b.eof {
return nil, ErrEOF
}
return b.mediaPlCache, nil
}
func (b *HLSBuffer) WaitAndPopSegment(ctx context.Context, name string) ([]byte, error) {
@@ -69,6 +74,10 @@ func (b *HLSBuffer) WaitAndPopSegment(ctx context.Context, name string) ([]byte,
}
func (b *HLSBuffer) WaitAndGetSegment(ctx context.Context, name string) ([]byte, error) {
if b.eof {
return nil, ErrEOF
}
for {
// fmt.Printf("HLSBuffer %v: segment keys: %v. Current name: %v\n", &b, b.sq.Keys(), name)
seg, found := b.sq.Get(name)
+1
View File
@@ -102,6 +102,7 @@ type VideoStream struct {
}
func (s *VideoStream) Len() int64 {
// glog.Infof("buffer.q: %v", s.buffer.q)
return s.buffer.len()
}
+72
View File
@@ -0,0 +1,72 @@
package transcoder
import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"os/exec"
"path"
"time"
"github.com/golang/glog"
)
//SegmentTranscoder transcodes segments individually. This is a simple wrapper for calling FFMpeg on the command line.
type FFMpegSegmentTranscoder struct {
Bitrate string
Framerate uint
Resolution string
ffmpegPath string
workDir string
}
func NewFFMpegSegmentTranscoder(bitr string, framer uint, res string, ffmpegp string, workd string) *FFMpegSegmentTranscoder {
return &FFMpegSegmentTranscoder{Bitrate: bitr, Framerate: framer, Resolution: res, ffmpegPath: ffmpegp, workDir: workd}
}
func (t *FFMpegSegmentTranscoder) Transcode(d []byte) ([]byte, error) {
//Assume d is in the right format, write it to disk
inName := randName()
outName := fmt.Sprintf("out%v", inName)
if _, err := os.Stat(t.workDir); os.IsNotExist(err) {
err := os.Mkdir(t.workDir, 0700)
if err != nil {
glog.Errorf("Transcoder cannot create workdir: %v", err)
return nil, err
}
}
if err := ioutil.WriteFile(path.Join(t.workDir, inName), d, 0644); err != nil {
glog.Errorf("Transcoder cannot write file: %v", err)
return nil, err
}
//Invoke ffmpeg
glog.Infof("Ffmpeg path: %v", t.ffmpegPath)
var cmd *exec.Cmd
//ffmpeg -i seg.ts -c:v libx264 -s 426:240 -r 30 -mpegts_copyts 1 -minrate 700k -maxrate 700k -bufsize 700k -threads 1 out3.ts
cmd = exec.Command(path.Join(t.ffmpegPath, "ffmpeg"), "-i", path.Join(t.workDir, inName), "-c:v", "libx264", "-s", t.Resolution, "-mpegts_copyts", "1", "-minrate", t.Bitrate, "-maxrate", t.Bitrate, "-bufsize", t.Bitrate, "-r", fmt.Sprintf("%d", t.Framerate), "-threads", "1", path.Join(t.workDir, outName))
if err := cmd.Run(); err != nil {
glog.Errorf("Cannot start ffmpeg command: %v", err)
return nil, err
}
dout, err := ioutil.ReadFile(path.Join(t.workDir, outName))
if err != nil {
glog.Errorf("Cannot read transcode output: %v", err)
}
os.Remove(path.Join(t.workDir, inName))
os.Remove(path.Join(t.workDir, outName))
return dout, nil
}
func randName() string {
rand.Seed(time.Now().UnixNano())
x := make([]byte, 10, 10)
for i := 0; i < len(x); i++ {
x[i] = byte(rand.Uint32())
}
return fmt.Sprintf("%x.ts", x)
}
@@ -0,0 +1,27 @@
package transcoder
import (
"io/ioutil"
"testing"
)
func TestTrans(t *testing.T) {
testSeg, err := ioutil.ReadFile("./test.ts")
if err != nil {
t.Errorf("Error reading test segment: %v", err)
}
tr := NewFFMpegSegmentTranscoder("700k", 30, "426:240", "", "./")
r, err := tr.Transcode(testSeg)
if err != nil {
t.Errorf("Error transcoding: %v", err)
}
if r == nil {
t.Errorf("Did not get output")
}
if len(r) != 523768 {
t.Errorf("Expecting output size to be 523768, got %v", len(r))
}
}
Binary file not shown.
+31 -1
View File
@@ -24,9 +24,38 @@ type VidListener struct {
FfmpegPath string
}
func (self *VidListener) HandleRTMPPublish(
makeStreamID func(url *url.URL) (strmID string),
gotStream func(url *url.URL, rtmpStrm *stream.VideoStream) error,
endStream func(url *url.URL, rtmpStrm *stream.VideoStream) error) {
self.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) {
glog.Infof("RTMP server got upstream: %v", conn.URL)
s := stream.NewVideoStream(makeStreamID(conn.URL), stream.RTMP)
ctx, cancel := context.WithCancel(context.Background())
ec := make(chan error)
go func() { ec <- s.WriteRTMPToStream(ctx, conn) }()
glog.Infof("Listner rtmp addr: %v", &s)
err := gotStream(conn.URL, s)
if err != nil {
glog.Errorf("Error RTMP gotStream handler: %v", err)
cancel()
return
}
select {
case err := <-ec:
endStream(conn.URL, s)
glog.Errorf("Got error writing RTMP: %v", err)
cancel()
}
}
}
//HandleRTMPPublish immediately turns the RTMP stream into segmented HLS, and writes it into a stream.
//It exposes getStreamID so the user can name the stream, and getStream so the user can keep track of all the streams.
func (self *VidListener) HandleRTMPPublish(
func (self *VidListener) HandleRTMPPublish_OLD(
getStreamID func(url *url.URL) (string, error),
getStream func(url *url.URL) (rtmpStrm stream.Stream, hlsStrm stream.Stream, err error),
endStream func(rtmpStrmID string, hlsStrmID string)) error {
@@ -69,6 +98,7 @@ func (self *VidListener) HandleRTMPPublish(
return nil
}
//I think this should be done by the application instead the SDK. It's unreasonable to expect the default LPMS behavior to segment the RTMP stream.
func (self *VidListener) segmentStream(ctx context.Context, rs stream.Stream, hs stream.Stream) error {
// //Invoke Segmenter
workDir, _ := os.Getwd()
+36 -51
View File
@@ -1,6 +1,7 @@
package vidlistener
import (
"context"
"fmt"
"net/url"
"os/exec"
@@ -8,73 +9,57 @@ import (
"time"
"github.com/livepeer/lpms/stream"
"github.com/nareix/joy4/av/pubsub"
joy4rtmp "github.com/nareix/joy4/format/rtmp"
)
func TestListener(t *testing.T) {
server := &joy4rtmp.Server{Addr: ":1937"}
listener := &VidListener{RtmpServer: server}
listener.HandleRTMPPublish(
func(url *url.URL) (string, error) {
return "test", nil
},
func(url *url.URL) (stream.Stream, stream.Stream, error) {
// return errors.New("Some Error")
return stream.NewVideoStream("test", stream.RTMP), stream.NewVideoStream("test", stream.HLS), nil
},
func(rtmpStrmID string, hlsStrmID string) {})
q := pubsub.NewQueue()
listener.HandleRTMPPublish(
//makeStreamID
func(url *url.URL) string {
return "testID"
},
//gotStream
func(url *url.URL, rtmpStrm *stream.VideoStream) (err error) {
//Read the stream into q
go rtmpStrm.ReadRTMPFromStream(context.Background(), q)
return nil
},
//endStream
func(url *url.URL, rtmpStrm *stream.VideoStream) error {
if rtmpStrm.GetStreamID() != "testID" {
t.Errorf("Expecting 'testID', found %v", rtmpStrm.GetStreamID())
}
return nil
})
//Stream test stream into the rtmp server
ffmpegCmd := "ffmpeg"
ffmpegArgs := []string{"-re", "-i", "../data/bunny2.mp4", "-c", "copy", "-f", "flv", "rtmp://localhost:1937/movie/stream"}
cmd := exec.Command(ffmpegCmd, ffmpegArgs...)
go cmd.Run()
//Start the server
go listener.RtmpServer.ListenAndServe()
//Wait for the stream to run for a little, then finish.
time.Sleep(time.Second * 1)
err := cmd.Process.Kill()
if err != nil {
fmt.Println("Error killing ffmpeg")
}
codecs, err := q.Oldest().Streams()
if err != nil || codecs == nil {
t.Errorf("Expecting codecs, got nil. Error: %v", err)
}
pkt, err := q.Oldest().ReadPacket()
if err != nil || len(pkt.Data) == 0 {
t.Errorf("Expecting pkt, got nil. Error: %v", err)
}
}
// Integration test.
// func TestRTMPWithServer(t *testing.T) {
// server := &joy4rtmp.Server{Addr: ":1936"}
// listener := &VidListener{RtmpServer: server}
// listener.HandleRTMPPublish(
// func(reqPath string) (string, error) {
// return "teststream", nil
// },
// func(reqPath string) (*lpmsio.Stream, error) {
// header, err := demux.Streams()
// if err != nil {
// t.Fatal("Failed ot read stream header")
// }
// fmt.Println("header: ", header)
// counter := 0
// fmt.Println("data: ")
// for {
// packet, err := demux.ReadPacket()
// if err != nil {
// t.Fatal("Failed to read packets")
// }
// fmt.Print("\r", len(packet.Data))
// counter = counter + 1
// }
// },
// func(reqPath string) {})
// ffmpegCmd := "ffmpeg"
// ffmpegArgs := []string{"-re", "-i", "../data/bunny2.mp4", "-c", "copy", "-f", "flv", "rtmp://localhost:1936/movie/stream"}
// go exec.Command(ffmpegCmd, ffmpegArgs...).Run()
// go listener.RtmpServer.ListenAndServe()
// time.Sleep(time.Second * 1)
// if stream := listener.Streams["teststream"]; stream.StreamID != "teststream" {
// t.Fatal("Server did not set stream")
// }
// time.Sleep(time.Second * 1)
// }
+214 -95
View File
@@ -2,9 +2,11 @@ package vidplayer
import (
"context"
"errors"
"io/ioutil"
"mime"
"net/http"
"net/url"
"path"
"path/filepath"
@@ -15,10 +17,11 @@ import (
"github.com/ericxtang/m3u8"
"github.com/golang/glog"
"github.com/livepeer/lpms/stream"
"github.com/nareix/joy4/av"
joy4rtmp "github.com/nareix/joy4/format/rtmp"
)
var ErrNotFound = errors.New("NotFound")
var ErrRTMP = errors.New("RTMP Error")
var PlaylistWaittime = 6 * time.Second
//VidPlayer is the module that handles playing video. For now we only support RTMP and HLS play.
@@ -29,62 +32,53 @@ type VidPlayer struct {
//HandleRTMPPlay is the handler when there is a RTMP request for a video. The source should write
//into the MuxCloser. The easiest way is through avutil.Copy.
func (s *VidPlayer) HandleRTMPPlay(getStream func(ctx context.Context, reqPath string, dst av.MuxCloser) error) error {
func (s *VidPlayer) HandleRTMPPlay(getStream func(url *url.URL) (stream.Stream, error)) error {
s.RtmpServer.HandlePlay = func(conn *joy4rtmp.Conn) {
glog.Infof("LPMS got RTMP request @ %v", conn.URL)
ctx := context.Background()
c := make(chan error, 1)
go func() { c <- getStream(ctx, conn.URL.Path, conn) }()
select {
case err := <-c:
glog.Errorf("Rtmp getStream Error: %v", err)
// ctx := context.Background()
// c := make(chan error, 1)
src, err := getStream(conn.URL)
if err != nil {
glog.Errorf("Error getting stream: %v", err)
return
}
err = src.ReadRTMPFromStream(context.Background(), conn)
if err != nil {
glog.Errorf("Error copying RTMP stream: %v", err)
return
}
// go func() { c <- getStream(ctx, conn.URL.Path, conn) }()
// select {
// case err := <-c:
// glog.Errorf("Rtmp getStream Error: %v", err)
// return
// }
}
return nil
}
//HandleHLSPlay is the handler when there is a HLA request. The source should write the raw bytes into the io.Writer,
//for either the playlist or the segment.
func (s *VidPlayer) HandleHLSPlay(getHLSBuffer func(reqPath string) (*stream.HLSBuffer, error)) error {
func (s *VidPlayer) HandleHLSPlay(
getMasterPlaylist func(url *url.URL) (*m3u8.MasterPlaylist, error),
getMediaPlaylist func(url *url.URL) (*m3u8.MediaPlaylist, error),
getSegment func(url *url.URL) ([]byte, error)) {
http.HandleFunc("/stream/", func(w http.ResponseWriter, r *http.Request) {
handleHLS(w, r, getHLSBuffer)
handleLive(w, r, getMasterPlaylist, getMediaPlaylist, getSegment)
})
//To play video from static files
http.HandleFunc("/vod/", func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, ".m3u8") {
plName := filepath.Join(s.VodPath, strings.Replace(r.URL.Path, "/vod/", "", -1))
dat, err := ioutil.ReadFile(plName)
if err != nil {
glog.Errorf("Cannot find file: %v", plName)
return
}
w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Cache-Control", "max-age=5")
w.Write(dat)
return
}
if strings.Contains(r.URL.Path, ".ts") {
segName := filepath.Join(s.VodPath, strings.Replace(r.URL.Path, "/vod/", "", -1))
dat, err := ioutil.ReadFile(segName)
if err != nil {
glog.Errorf("Cannot find file: %v", segName)
return
}
w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Write(dat)
return
}
handleVOD(r.URL, s.VodPath, w)
})
return nil
}
func handleHLS(w http.ResponseWriter, r *http.Request, getHLSBuffer func(reqPath string) (*stream.HLSBuffer, error)) {
func handleLive(w http.ResponseWriter, r *http.Request,
getMasterPlaylist func(url *url.URL) (*m3u8.MasterPlaylist, error),
getMediaPlaylist func(url *url.URL) (*m3u8.MediaPlaylist, error),
getSegment func(url *url.URL) ([]byte, error)) {
glog.Infof("LPMS got HTTP request @ %v", r.URL.Path)
w.Header().Set("Content-Type", "application/x-mpegURL")
w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -95,83 +89,208 @@ func handleHLS(w http.ResponseWriter, r *http.Request, getHLSBuffer func(reqPath
http.Error(w, "LPMS only accepts HLS requests over HTTP (m3u8, ts).", 500)
}
ctx := context.Background()
buffer, err := getHLSBuffer(r.URL.Path)
if err != nil {
glog.Errorf("Error getting HLS Buffer: %v", err)
return
}
if strings.HasSuffix(r.URL.Path, ".m3u8") {
// Just an experiment to create a fake master playlist. Commenting it out for now, maybe useful later when we re-visit adaptive bitrate streaming.
// if strings.Contains(r.URL.Path, "_master") {
// pl := *m3u8.NewMasterPlaylist()
// regex, _ := regexp.Compile("\\/stream\\/([[:alpha:]]|\\d)*")
// match := regex.FindString(r.URL.Path)
// uri := strings.Replace(match, "/stream/", "", -1)
// uri = strings.Replace(uri, "_master", "", -1)
// pl.Append(uri+".m3u8", nil, m3u8.VariantParams{Bandwidth: 520929})
// w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
// w.Header().Set("Access-Control-Allow-Origin", "*")
// w.Header().Set("Cache-Control", "max-age=5")
// w.Write(pl.Encode().Bytes())
// return
// }
var pl *m3u8.MediaPlaylist
sleepTime := 0 * time.Millisecond
for sleepTime < PlaylistWaittime { //Try to wait a little for the first segments
pl, err = buffer.LatestPlaylist()
if pl.Count() == 0 {
time.Sleep(100 * time.Millisecond)
sleepTime = sleepTime + 100*time.Millisecond
} else {
break
//First, assume it's the master playlist
var masterPl *m3u8.MasterPlaylist
var mediaPl *m3u8.MediaPlaylist
masterPl, err := getMasterPlaylist(r.URL)
if masterPl == nil || err != nil {
//Now try the media playlist
mediaPl, err = getMediaPlaylist(r.URL)
if err != nil {
http.Error(w, "Error getting HLS playlist", 500)
return
}
}
if err != nil {
glog.Errorf("Error getting HLS playlist %v: %v", r.URL.Path, err)
return
if masterPl != nil {
_, err = w.Write(masterPl.Encode().Bytes())
} else if mediaPl != nil {
_, err = w.Write(mediaPl.Encode().Bytes())
}
// segs := ""
// for _, s := range pl.Segments {
// segs = segs + ", " + strings.Split(s.URI, "_")[1]
// }
// glog.Infof("Writing playlist seg: %v", segs)
// pl.MediaType = m3u8.EVENT
_, err = w.Write(pl.Encode().Bytes())
if err != nil {
glog.Errorf("Error writing playlist to ResponseWriter: %v", err)
return
}
if err != nil {
glog.Errorf("Error writting HLS playlist %v: %v", r.URL.Path, err)
return
}
return
}
if strings.HasSuffix(r.URL.Path, ".ts") {
pathArr := strings.Split(r.URL.Path, "/")
segName := pathArr[len(pathArr)-1]
seg, err := buffer.WaitAndGetSegment(ctx, segName)
seg, err := getSegment(r.URL)
if err != nil {
glog.Errorf("Error getting HLS segment %v: %v", segName, err)
glog.Errorf("Error getting segment %v: %v", r.URL, err)
return
}
// glog.Infof("Writing seg: %v, len:%v", segName, len(seg))
w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
w.Header().Set("Access-Control-Allow-Origin", "*")
_, err = w.Write(seg)
if err != nil {
glog.Errorf("Error writting HLS segment %v: %v", segName, err)
glog.Errorf("Error writting HLS segment %v: %v", r.URL, err)
return
}
return
}
http.Error(w, "Cannot find HTTP video resource: "+r.URL.Path, 500)
http.Error(w, "Cannot find HTTP video resource: "+r.URL.String(), 500)
}
func handleVOD(url *url.URL, vodPath string, w http.ResponseWriter) error {
if strings.HasSuffix(url.Path, ".m3u8") {
plName := filepath.Join(vodPath, strings.Replace(url.Path, "/vod/", "", -1))
dat, err := ioutil.ReadFile(plName)
if err != nil {
glog.Errorf("Cannot find file: %v", plName)
return ErrNotFound
}
w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(url.Path)))
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Cache-Control", "max-age=5")
w.Write(dat)
}
if strings.Contains(url.Path, ".ts") {
segName := filepath.Join(vodPath, strings.Replace(url.Path, "/vod/", "", -1))
dat, err := ioutil.ReadFile(segName)
if err != nil {
glog.Errorf("Cannot find file: %v", segName)
return ErrNotFound
}
w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(url.Path)))
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Write(dat)
}
return nil
}
// //HandleHLSPlay is the handler when there is a HLA request. The source should write the raw bytes into the io.Writer,
// //for either the playlist or the segment.
// func (s *VidPlayer) HandleHLSPlay_OLD(getHLSBuffer func(reqPath string) (*stream.HLSBuffer, error)) error {
// http.HandleFunc("/stream/", func(w http.ResponseWriter, r *http.Request) {
// handleHLS(w, r, getHLSBuffer)
// })
// //To play video from static files
// http.HandleFunc("/vod/", func(w http.ResponseWriter, r *http.Request) {
// if strings.HasSuffix(r.URL.Path, ".m3u8") {
// plName := filepath.Join(s.VodPath, strings.Replace(r.URL.Path, "/vod/", "", -1))
// dat, err := ioutil.ReadFile(plName)
// if err != nil {
// glog.Errorf("Cannot find file: %v", plName)
// return
// }
// w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
// w.Header().Set("Access-Control-Allow-Origin", "*")
// w.Header().Set("Cache-Control", "max-age=5")
// w.Write(dat)
// return
// }
// if strings.Contains(r.URL.Path, ".ts") {
// segName := filepath.Join(s.VodPath, strings.Replace(r.URL.Path, "/vod/", "", -1))
// dat, err := ioutil.ReadFile(segName)
// if err != nil {
// glog.Errorf("Cannot find file: %v", segName)
// return
// }
// w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
// w.Header().Set("Access-Control-Allow-Origin", "*")
// w.Write(dat)
// return
// }
// })
// return nil
// }
// func handleHLS(w http.ResponseWriter, r *http.Request, getHLSBuffer func(reqPath string) (*stream.HLSBuffer, error)) {
// glog.Infof("LPMS got HTTP request @ %v", r.URL.Path)
// w.Header().Set("Content-Type", "application/x-mpegURL")
// w.Header().Set("Access-Control-Allow-Origin", "*")
// w.Header().Set("Connection", "keep-alive")
// w.Header().Set("Cache-Control", "max-age=5")
// if !strings.HasSuffix(r.URL.Path, ".m3u8") && !strings.HasSuffix(r.URL.Path, ".ts") {
// http.Error(w, "LPMS only accepts HLS requests over HTTP (m3u8, ts).", 500)
// }
// ctx := context.Background()
// buffer, err := getHLSBuffer(r.URL.Path)
// if err != nil {
// glog.Errorf("Error getting HLS Buffer: %v", err)
// return
// }
// if strings.HasSuffix(r.URL.Path, ".m3u8") {
// // Just an experiment to create a fake master playlist. Commenting it out for now, maybe useful later when we re-visit adaptive bitrate streaming.
// // if strings.Contains(r.URL.Path, "_master") {
// // pl := *m3u8.NewMasterPlaylist()
// // regex, _ := regexp.Compile("\\/stream\\/([[:alpha:]]|\\d)*")
// // match := regex.FindString(r.URL.Path)
// // uri := strings.Replace(match, "/stream/", "", -1)
// // uri = strings.Replace(uri, "_master", "", -1)
// // pl.Append(uri+".m3u8", nil, m3u8.VariantParams{Bandwidth: 520929})
// // w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
// // w.Header().Set("Access-Control-Allow-Origin", "*")
// // w.Header().Set("Cache-Control", "max-age=5")
// // w.Write(pl.Encode().Bytes())
// // return
// // }
// var pl *m3u8.MediaPlaylist
// sleepTime := 0 * time.Millisecond
// for sleepTime < PlaylistWaittime { //Try to wait a little for the first segments
// pl, err = buffer.LatestPlaylist()
// if pl.Count() == 0 {
// time.Sleep(100 * time.Millisecond)
// sleepTime = sleepTime + 100*time.Millisecond
// } else {
// break
// }
// }
// if err != nil {
// glog.Errorf("Error getting HLS playlist %v: %v", r.URL.Path, err)
// return
// }
// // segs := ""
// // for _, s := range pl.Segments {
// // segs = segs + ", " + strings.Split(s.URI, "_")[1]
// // }
// // glog.Infof("Writing playlist seg: %v", segs)
// // pl.MediaType = m3u8.EVENT
// _, err = w.Write(pl.Encode().Bytes())
// if err != nil {
// glog.Errorf("Error writing playlist to ResponseWriter: %v", err)
// return
// }
// if err != nil {
// glog.Errorf("Error writting HLS playlist %v: %v", r.URL.Path, err)
// return
// }
// return
// }
// if strings.HasSuffix(r.URL.Path, ".ts") {
// pathArr := strings.Split(r.URL.Path, "/")
// segName := pathArr[len(pathArr)-1]
// seg, err := buffer.WaitAndGetSegment(ctx, segName)
// if err != nil {
// glog.Errorf("Error getting HLS segment %v: %v", segName, err)
// return
// }
// // glog.Infof("Writing seg: %v, len:%v", segName, len(seg))
// w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
// w.Header().Set("Access-Control-Allow-Origin", "*")
// _, err = w.Write(seg)
// if err != nil {
// glog.Errorf("Error writting HLS segment %v: %v", segName, err)
// return
// }
// return
// }
// http.Error(w, "Cannot find HTTP video resource: "+r.URL.Path, 500)
// }
+54 -40
View File
@@ -66,46 +66,60 @@ func TestHLS(t *testing.T) {
s.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{})
s.WriteHLSSegmentToStream(stream.HLSSegment{})
var buffer *stream.HLSBuffer
player.HandleHLSPlay(func(reqPath string) (*stream.HLSBuffer, error) {
//if can't find local cache, start downloading, and store in cache.
if buffer == nil {
buffer := stream.NewHLSBuffer(10, 100)
ec := make(chan error, 1)
go func() { ec <- s.ReadHLSFromStream(context.Background(), buffer) }()
// select {
// case err := <-ec:
// return err
// }
}
return buffer, nil
player.HandleHLSPlay(
//getMasterPlaylist
func(url *url.URL) (*m3u8.MasterPlaylist, error) {
return nil, nil
},
//getMediaPlaylist
func(url *url.URL) (*m3u8.MediaPlaylist, error) {
return buffer.LatestPlaylist()
},
//getSegment
func(url *url.URL) ([]byte, error) {
return nil, nil
})
// if strings.HasSuffix(reqPath, ".m3u8") {
// pl, err := buffer.WaitAndPopPlaylist(ctx)
// if err != nil {
// return nil, err
// }
// _, err = writer.Write(pl.Encode().Bytes())
// if err != nil {
// return nil, err
// }
// return nil, nil
// }
// func(reqPath string) (*stream.HLSBuffer, error) {
// //if can't find local cache, start downloading, and store in cache.
// if buffer == nil {
// buffer := stream.NewHLSBuffer(10, 100)
// ec := make(chan error, 1)
// go func() { ec <- s.ReadHLSFromStream(context.Background(), buffer) }()
// // select {
// // case err := <-ec:
// // return err
// // }
// }
// return buffer, nil
// if strings.HasSuffix(reqPath, ".ts") {
// pathArr := strings.Split(reqPath, "/")
// segName := pathArr[len(pathArr)-1]
// seg, err := buffer.WaitAndPopSegment(ctx, segName)
// if err != nil {
// return nil, err
// }
// _, err = writer.Write(seg)
// if err != nil {
// return nil, err
// }
// }
// if strings.HasSuffix(reqPath, ".m3u8") {
// pl, err := buffer.WaitAndPopPlaylist(ctx)
// if err != nil {
// return nil, err
// }
// _, err = writer.Write(pl.Encode().Bytes())
// if err != nil {
// return nil, err
// }
// return nil, nil
// }
// return nil, lpmsio.ErrNotFound
})
// if strings.HasSuffix(reqPath, ".ts") {
// pathArr := strings.Split(reqPath, "/")
// segName := pathArr[len(pathArr)-1]
// seg, err := buffer.WaitAndPopSegment(ctx, segName)
// if err != nil {
// return nil, err
// }
// _, err = writer.Write(seg)
// if err != nil {
// return nil, err
// }
// }
// return nil, lpmsio.ErrNotFound
// })
// go http.ListenAndServe(":8000", nil)
@@ -142,9 +156,9 @@ func TestHandleHLS(t *testing.T) {
testBuf.WriteSegment(3, "url_3.ts", 2, []byte{0, 0})
testBuf.WriteSegment(4, "url_4.ts", 2, []byte{0, 0})
handleHLS(rw, req, func(reqPath string) (*stream.HLSBuffer, error) {
return testBuf, nil
})
// HandleHLSPlay(rw, req, func(reqPath string) (*stream.HLSBuffer, error) {
// return testBuf, nil
// })
p1, _ := m3u8.NewMediaPlaylist(10, 10)
err := p1.DecodeFrom(bytes.NewReader(pl.Encode().Bytes()), true)