diff --git a/README.md b/README.md index e1fdbf8c30..458eb7ee79 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,167 @@ # LPMS - Livepeer media server -This is meant to be a standalone server, but at the moment it is -coupled with the go-livepeer repo. For the time being development will -proceed at the -[github.com/livepeer/go-livepeer/lpms package](https://github.com/livepeer/go-livepeer/lpms). +LPMS is a media server that can run independantly, or on top of the [Livepeer](https://livepeer.org) +network. It allows you to manipulate / broadcast a live video stream. Currently, LPMS supports RTMP +as input format and RTMP/HLS as output formats. + +LPMS can be integrated into another service, or run as a standalone service. To try LPMS as a +standalone service, simply get the package: +``` +go get github.com/livepeer/lpms +``` + +Go to the lpms root directory, and run +``` +./lpms +``` + +### Requirements + +LPMS requires ffmpeg. To install it on OSX, use homebrew. As a part of this installation, `ffmpeg` and `ffplay` should be installed as commandline utilities. + +``` +//This may take a few minutes +brew install ffmpeg --with-fdk-aac --with-ffplay --with-freetype --with-libass --with-libquvi --with-libvorbis --with-libvpx --with-opus --with-x265 +``` + +LPMS uses [SRS](http://ossrs.net/srs.release/releases/) as a transcoding backend. It's included in +the `/bin` directory for testing purposes. Make sure you are running SRS before testing out LPMS. + +To start srs, run +``` +./bin/srs -c ./bin/srs.conf +``` + +### Testing out LPMS + +The test LPMS server exposes a few different endpoints: +1. `rtmp://localhost:1936/stream/test` for uploading/viewing RTMP video stream. +2. `http://localhost:8000/transcode` for issuing transcode request. +3. `http://localhost:8000/stream/test_tran.m3u8` for consuming the transcoded video. + +Do the following steps to view a live stream video: +1. Upload an RTMP video stream to `rtmp://localhost:1936/stream/test`. We recommend using [OBS](https://obsproject.com/download). + + +![OBS Screenshot](https://s3.amazonaws.com/livepeer/obs_screenshot.png) + + +2. If you have successfully uploaded the stream, you should see something like this in the LPMS output +``` +I0324 09:44:14.639405 80673 listener.go:28] RTMP server got upstream +I0324 09:44:14.639429 80673 listener.go:42] Got RTMP Stream: test +``` +3. Now you have a RTMP video stream running, we can view it from the server. Simply run `ffplay rtmp://localhost:1936/stream/test`, you should see the rtmp video playback. +4. Let's transcode the video to HLS. Before issuing the transcoding request, make sure your SRS is running. +``` +//To start SRS +./bin/srs -c ./bin/srs.conf +``` + +![SRS Screenshot](https://s3.amazonaws.com/livepeer/srs_screenshot.png) + +5. To issue the transcoding request, we can use curl. +``` +curl -H "Content-Type: application/json" -X POST -d '{"StreamID":"test"}' http://localhost:8000/transcode +``` + +5. You should see your SRS console start logging. Now just open up `hlsVideo.html` in Safari, and you should see the HLS video. There may be a delay due to the video transcoding - we'll expose more parameters to lower that delay in the future. Note that in typical internet broadcasting today, there is usually a delay of 30 - 90 seconds. + + +### Integrating LPMS + +LPMS exposes a few different methods for customization. As an example, take a look at `cmd/lpms.go`. + +To create a new LPMS server: +``` +//Specify ports you want the server to run on, and which port SRS is running on (should be specified) +//in srs.conf +lpms := lpms.New("1936", "8000", "2436", "7936") +``` + +To handle RTMP publish: +``` +lpms.HandleRTMPPublish( + //getStreamID + func(reqPath string) (string, error) { + return getStreamIDFromPath(reqPath), nil + }, + //getStream + func(reqPath string) (*stream.Stream, error) { + streamID := getStreamIDFromPath(reqPath) + stream := stream.NewStream(streamID) + streamDB.db[streamID] = stream + return stream, nil + }, + //finishStream + func(reqPath string) { + delete(streamDB.db, getStreamIDFromPath(reqPath)) + }) +``` + +To handle RTMP playback: +``` +lpms.HandleRTMPPlay( + //getStream + func(ctx context.Context, reqPath string, dst av.MuxCloser) error { + glog.Infof("Got req: ", reqPath) + streamID := getStreamIDFromPath(reqPath) + src := streamDB.db[streamID] + + if src != nil { + src.ReadRTMPFromStream(ctx, dst) + } else { + glog.Error("Cannot find stream for ", streamID) + return stream.ErrNotFound + } + return nil + }) +``` + +To handle transcode request: +``` +lpms.HandleTranscode( + //getInStream + func(ctx context.Context, streamID string) (*stream.Stream, error) { + if stream := streamDB.db[streamID]; stream != nil { + return stream, nil + } + + return nil, stream.ErrNotFound + }, + //getOutStream + func(ctx context.Context, streamID string) (*stream.Stream, error) { + //For this example, we'll name the transcoded stream "{streamID}_tran" + newStream := stream.NewStream(streamID + "_tran") + streamDB.db[newStream.StreamID] = newStream + return newStream, nil + }) +``` + +To handle HLS playback: +``` +lpms.HandleHLSPlay( + //getHLSBuffer + func(reqPath string) (*stream.HLSBuffer, error) { + streamID := getHLSStreamIDFromPath(reqPath) + glog.Infof("Got HTTP Req for stream: %v", streamID) + buffer := bufferDB.db[streamID] + s := streamDB.db[streamID] + + if s == nil { + return nil, stream.ErrNotFound + } + + if buffer == nil { + //Create the buffer and start copying the stream into the buffer + buffer = stream.NewHLSBuffer() + bufferDB.db[streamID] = buffer + ec := make(chan error, 1) + go func() { ec <- s.ReadHLSFromStream(buffer) }() + } + return buffer, nil + + }) +``` + +You can follow the development of LPMS and Livepeer @ our [forum](http://forum.livepeer.org) diff --git a/bin/srs.conf b/bin/srs.conf new file mode 100755 index 0000000000..4cd177ac64 --- /dev/null +++ b/bin/srs.conf @@ -0,0 +1,64 @@ +listen 2435; +max_connections 200; +daemon off; +srs_log_tank console; +http_server { + enabled on; + listen 7935; + dir ./objs/nginx/html; +} + +vhost __defaultVhost__ { + hls { + enabled on; + hls_fragment 10; + hls_window 60; + hls_path ./objs/nginx/html; + hls_m3u8_file [app]/[stream].m3u8; + hls_ts_file [app]/[stream]-[seq].ts; + } + + transcode { + enabled on; + ffmpeg /usr/local/bin/ffmpeg; + engine hls500 { + enabled on; + vfilter { + } + vcodec libx264; + vbitrate 500; + vfps 25; + vwidth 720; + vheight 480; + vthreads 12; + vprofile main; + vpreset medium; + vparams { + } + acodec libfdk_aac; + aparams { + } + output rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine]; + } + engine hls1000 { + enabled on; + vfilter { + } + vcodec libx264; + vbitrate 1000; + vfps 25; + vwidth 720; + vheight 480; + vthreads 12; + vprofile main; + vpreset medium; + vparams { + } + acodec libfdk_aac; + aparams { + } + output rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine]; + } + } +} + diff --git a/cmd/lpms.go b/cmd/lpms.go new file mode 100644 index 0000000000..0412a0e4d6 --- /dev/null +++ b/cmd/lpms.go @@ -0,0 +1,138 @@ +package main + +import ( + "context" + "flag" + "net/http" + "strings" + + "github.com/golang/glog" + "github.com/livepeer/lpms" + "github.com/livepeer/lpms/stream" + + "github.com/nareix/joy4/av" +) + +type StreamDB struct { + db map[string]*stream.Stream +} + +type BufferDB struct { + db map[string]*stream.HLSBuffer +} + +func main() { + flag.Set("logtostderr", "true") + flag.Parse() + + lpms := lpms.New("1935", "8000", "2435", "7935") + streamDB := &StreamDB{db: make(map[string]*stream.Stream)} + bufferDB := &BufferDB{db: make(map[string]*stream.HLSBuffer)} + + lpms.HandleRTMPPublish( + //getStreamID + func(reqPath string) (string, error) { + return getStreamIDFromPath(reqPath), nil + }, + //getStream + func(reqPath string) (*stream.Stream, error) { + streamID := getStreamIDFromPath(reqPath) + stream := stream.NewStream(streamID) + streamDB.db[streamID] = stream + return stream, nil + }, + //finishStream + func(reqPath string) { + streamID := getStreamIDFromPath(reqPath) + delete(streamDB.db, streamID) + tranStreamID := streamID + "_tran" + delete(streamDB.db, tranStreamID) + }) + + lpms.HandleTranscode( + //getInStream + func(ctx context.Context, streamID string) (*stream.Stream, error) { + if stream := streamDB.db[streamID]; stream != nil { + return stream, nil + } + + return nil, stream.ErrNotFound + }, + //getOutStream + func(ctx context.Context, streamID string) (*stream.Stream, error) { + //For this example, we'll name the transcoded stream "{streamID}_tran" + newStream := stream.NewStream(streamID + "_tran") + streamDB.db[newStream.StreamID] = newStream + return newStream, nil + }) + + lpms.HandleHLSPlay( + //getHLSBuffer + func(reqPath string) (*stream.HLSBuffer, error) { + streamID := getHLSStreamIDFromPath(reqPath) + glog.Infof("Got HTTP Req for stream: %v", streamID) + buffer := bufferDB.db[streamID] + s := streamDB.db[streamID] + + if s == nil { + return nil, stream.ErrNotFound + } + + if buffer == nil { + //Create the buffer and start copying the stream into the buffer + buffer = stream.NewHLSBuffer() + bufferDB.db[streamID] = buffer + ec := make(chan error, 1) + go func() { ec <- s.ReadHLSFromStream(buffer) }() + //May want to handle the error here + } + return buffer, nil + + }) + + lpms.HandleRTMPPlay( + //getStream + func(ctx context.Context, reqPath string, dst av.MuxCloser) error { + glog.Infof("Got req: ", reqPath) + streamID := getStreamIDFromPath(reqPath) + src := streamDB.db[streamID] + + if src != nil { + src.ReadRTMPFromStream(ctx, dst) + } else { + glog.Error("Cannot find stream for ", streamID) + return stream.ErrNotFound + } + return nil + }) + + //Helper function to print out all the streams + http.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) { + streams := []string{} + + for k, _ := range streamDB.db { + streams = append(streams, k) + } + + if len(streams) == 0 { + w.Write([]byte("no streams")) + return + } + str := strings.Join(streams, ",") + w.Write([]byte(str)) + }) + + lpms.Start() +} + +func getStreamIDFromPath(reqPath string) string { + return "test" +} + +func getHLSStreamIDFromPath(reqPath string) string { + if strings.HasSuffix(reqPath, ".m3u8") { + return "test_tran" + } else { + return "test_tran" + } +} diff --git a/common/common.go b/common/common.go deleted file mode 100644 index 13589e439b..0000000000 --- a/common/common.go +++ /dev/null @@ -1,32 +0,0 @@ -package common - -import "sync" - -type config struct { - SrsRTMPPort string - SrsHTTPPort string - LpmsRTMPPort string - LpmsHTTPPort string -} - -var instance *config -var once sync.Once - -func GetConfig() *config { - once.Do(func() { - instance = &config{} - }) - return instance -} - -func SetConfig(srsRTMPPort string, srsHTTPPort string, lpmsRTMPPOrt string, lpmsHTTPPort string) { - c := GetConfig() - c.LpmsHTTPPort = lpmsHTTPPort - c.LpmsRTMPPort = lpmsRTMPPOrt - c.SrsHTTPPort = srsHTTPPort - c.SrsRTMPPort = srsRTMPPort -} - -// func (self *Config) GetSrsRTMPPort() string { -// return self.SrsRTMPPort -// } diff --git a/hlsVideo.html b/hlsVideo.html new file mode 100644 index 0000000000..ff6d002073 --- /dev/null +++ b/hlsVideo.html @@ -0,0 +1,9 @@ + + + + + LivePeer + + + + diff --git a/io/io.go b/io/io.go deleted file mode 100644 index f69a496386..0000000000 --- a/io/io.go +++ /dev/null @@ -1,378 +0,0 @@ -package io - -import ( - "bytes" - "crypto/sha256" - "fmt" - "io" - "log" - "net/http" - "net/url" - "regexp" - "strconv" - "strings" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "github.com/livepeer/go-livepeer/livepeer/storage/streaming" - "github.com/golang/groupcache/lru" - "github.com/kz26/m3u8" - lpmsCommon "github.com/livepeer/lpms/common" - "github.com/livepeer/lpms/types" - "github.com/nareix/joy4/av" - joy4rtmp "github.com/nareix/joy4/format/rtmp" -) - -func CopyChannelToChannel(inChan chan *streaming.VideoChunk, outChan chan *streaming.VideoChunk) { - for { - select { - case chunk := <-inChan: - outChan <- chunk - default: - } - } -} - -func Transcode(inChan chan *streaming.VideoChunk, outChan chan *streaming.VideoChunk, newStreamID streaming.StreamID, - format string, bitrate string, codecin string, codecout string, closeStreamC chan bool) (err error) { - if codecin != "RTMP" { - return fmt.Errorf("Only support RTMP as input stream") - } - - if format != "HLS" { - return fmt.Errorf("Only support HLS as output format") - } - - if bitrate != "1000" && bitrate != "500" { - return fmt.Errorf("Only support 500 and 1000 bitrate") - } - - dstConn, err := joy4rtmp.Dial("rtmp://localhost:" + lpmsCommon.GetConfig().SrsRTMPPort + "/stream/" + string(newStreamID)) - if err != nil { - glog.V(logger.Error).Infof("Error connecting to SRS server: ", err) - return err - } - - //Upload the video to SRS - go CopyRTMPFromChannel(dstConn, inChan, closeStreamC) - - msChan := make(chan *types.Download, 1024) - m3u8Chan := make(chan []byte) - hlsSegChan := make(chan streaming.HlsSegment) - //Download the playlist - go GetHlsPlaylist("http://localhost:"+lpmsCommon.GetConfig().SrsHTTPPort+"/stream/"+string(newStreamID)+"_hls"+bitrate+".m3u8", time.Duration(0), true, msChan, m3u8Chan) - //Download the segments - go DownloadHlsSegment(msChan, hlsSegChan) - //Copy the playlist and hls segments to a stream - go CopyHlsToChannel(m3u8Chan, hlsSegChan, outChan, closeStreamC) - - return -} - -//Copy packets from channels in the streamer to our destination muxer -func CopyRTMPFromStream(dst av.Muxer, stream *streaming.Stream, closeStreamC chan bool) (err error) { - if len(stream.SrcVideoChan) > 0 { - //First check SrcVideoChan, and then check DstVideoChan - CopyRTMPFromChannel(dst, stream.SrcVideoChan, closeStreamC) - } else { - CopyRTMPFromChannel(dst, stream.DstVideoChan, closeStreamC) - } - - return -} - -func CopyRTMPFromChannel(dst av.Muxer, videoChan chan *streaming.VideoChunk, closeStreamC chan bool) (err error) { - chunk := <-videoChan - if err := dst.WriteHeader(chunk.HeaderStreams); err != nil { - fmt.Println("Error writing header copying from channel") - return err - } - - for { - select { - case chunk := <-videoChan: - // fmt.Println("Copying from channel") - if chunk.ID == streaming.EOFStreamMsgID { - fmt.Println("Copying EOF from channel") - closeStreamC <- true - err := dst.WriteTrailer() - if err != nil { - fmt.Println("Error writing trailer: ", err) - return err - } - } - err := dst.WritePacket(chunk.Packet) - if chunk.Seq%100 == 0 { - glog.V(logger.Info).Infof("Copy RTMP to muxer from channel. %d", chunk.Seq) - } - if err != nil { - glog.V(logger.Error).Infof("Error writing packet to video player: %s", err) - return err - } - } - } -} - -//Copy HLS segments and playlist to the streamer channel. -func CopyHlsToChannel(m3u8Chan chan []byte, hlsSegChan chan streaming.HlsSegment, outChan chan *streaming.VideoChunk, closeStreamC chan bool) { - for { - select { - case m3u8 := <-m3u8Chan: - // stream.M3U8 = m3u8 //Just for testing - fmt.Printf("Sending HLS Playlist: %s\n", string(m3u8)) - CopyPacketsToChannel(1, nil, nil, m3u8, streaming.HlsSegment{}, outChan, closeStreamC) - case hlsSeg := <-hlsSegChan: - regex, _ := regexp.Compile("-(\\d)*") - match := regex.FindString(hlsSeg.Name) - segNumStr := match[1:len(match)] - segNum, _ := strconv.Atoi(segNumStr) - // stream.HlsSegNameMap[hlsSeg.Name] = hlsSeg.Data //Just for testing - fmt.Printf("Sending HLS Segment: %d, %s\n", segNum, segNumStr) - CopyPacketsToChannel(int64(segNum), nil, nil, nil, hlsSeg, outChan, closeStreamC) - } - } -} - -// func CopyHlsToChannel(stream *streaming.Stream) (err error) { -// for { -// select { -// case m3u8 := <-stream.M3U8Chan: -// // stream.M3U8 = m3u8 //Just for testing -// fmt.Printf("Sending HLS Playlist: %s\n", string(m3u8)) -// CopyPacketsToChannel(1, nil, nil, m3u8, streaming.HlsSegment{}, stream.SrcVideoChan) -// case hlsSeg := <-stream.HlsSegChan: -// regex, _ := regexp.Compile("-(\\d)*") -// match := regex.FindString(hlsSeg.Name) -// segNumStr := match[1:len(match)] -// segNum, _ := strconv.Atoi(segNumStr) -// // stream.HlsSegNameMap[hlsSeg.Name] = hlsSeg.Data //Just for testing -// fmt.Printf("Sending HLS Segment: %d, %s\n", segNum, segNumStr) -// CopyPacketsToChannel(int64(segNum), nil, nil, nil, hlsSeg, stream.SrcVideoChan) -// } -// } -// } - -//Copy packets from our source demuxer to the streamer channels. For now we put the header in every packet. We can -//optimize for packet size later. -func CopyToChannel(src av.Demuxer, stream *streaming.Stream, closeStreamC chan bool) (err error) { - var streams []av.CodecData - if streams, err = src.Streams(); err != nil { - return - } - for seq := int64(0); ; seq++ { - if err = CopyPacketsToChannel(seq, src, streams, nil, streaming.HlsSegment{}, stream.SrcVideoChan, closeStreamC); err != nil { - return - } - } - return -} - -// func CopyPacketsToChannel(seq int64, src av.PacketReader, headerStreams []av.CodecData, m3u8 []byte, hlsSeg streaming.HlsSegment, stream *streaming.Stream) (err error) { -func CopyPacketsToChannel(seq int64, src av.PacketReader, headerStreams []av.CodecData, m3u8 []byte, hlsSeg streaming.HlsSegment, outVideoChan chan *streaming.VideoChunk, closeStreamC chan bool) (err error) { - // for seq := int64(0); ; seq++ { - var pkt av.Packet - if src != nil { - if pkt, err = src.ReadPacket(); err != nil { - if err == io.EOF { - chunk := &streaming.VideoChunk{ - ID: streaming.EOFStreamMsgID, - Seq: seq, - HeaderStreams: headerStreams, - Packet: pkt, - } - // stream.SrcVideoChan <- chunk - outVideoChan <- chunk - fmt.Println("Done with packet reading: ", err) - - // Close the channel so that the protocol.go loop - // reading from the channel doesn't block - close(outVideoChan) - closeStreamC <- true - return fmt.Errorf("EOF") - } - return - } - } - - chunk := &streaming.VideoChunk{ - ID: streaming.DeliverStreamMsgID, - Seq: seq, - HeaderStreams: headerStreams, - Packet: pkt, - M3U8: m3u8, - HLSSegData: hlsSeg.Data, - HLSSegName: hlsSeg.Name, - } - - select { - // case stream.SrcVideoChan <- chunk: - case outVideoChan <- chunk: - if chunk.Seq%100 == 0 { - fmt.Printf("sent video chunk: %d, %s\n", chunk.Seq, hlsSeg.Name) - } - default: - } - - return -} - -func doRequest(c *http.Client, req *http.Request) (*http.Response, error) { - // req.Header.Set("User-Agent", USER_AGENT) - resp, err := c.Do(req) - return resp, err -} - -func DownloadHlsSegment(dlc chan *types.Download, segChan chan streaming.HlsSegment) { - for v := range dlc { - req, err := http.NewRequest("GET", v.URI, nil) - if err != nil { - log.Fatal(err) - } - resp, err := doRequest(&http.Client{}, req) - if err != nil { - log.Print(err) - continue - } - if resp.StatusCode != 200 { - log.Printf("Received HTTP %v for %v\n", resp.StatusCode, v.URI) - continue - } - - // Get the segment name - need to store in a map - match := strings.Split(v.URI, "/") - filename := match[len(match)-1] - buf := new(bytes.Buffer) - _, err = io.Copy(buf, resp.Body) - if err != nil { - log.Fatal(err) - } - - seg := &streaming.HlsSegment{ - Data: buf.Bytes(), - Name: filename, - } - // fmt.Println("Got HLS segment: ", filename) - - segChan <- *seg - resp.Body.Close() - // log.Printf("Downloaded %v\n", v.URI) - } -} - -func GetHlsPlaylist(urlStr string, recTime time.Duration, useLocalTime bool, dlc chan *types.Download, playlistChan chan []byte) { - fmt.Println("Getting playlist: ", urlStr) - startTime := time.Now() - var recDuration time.Duration = 0 - cache := lru.New(1024) - playlistUrl, err := url.Parse(urlStr) - if err != nil { - log.Fatal(err) - } - for { - req, err := http.NewRequest("GET", urlStr, nil) - if err != nil { - log.Fatal(err) - } - resp, err := doRequest(&http.Client{}, req) - if err != nil { - log.Print(err) - time.Sleep(time.Duration(3) * time.Second) - } - - playlist, listType, err := m3u8.DecodeFrom(resp.Body, true) - if playlist == nil { - //SRS doesn't serve the video right away. It take a few seconds. May be a param we can tune later. - waitTime := time.Second * 5 - fmt.Println("Cannot read playlist from ", urlStr, resp.StatusCode, "Waiting", waitTime) - time.Sleep(waitTime) - } else { - // fmt.Println("Got playlist", urlStr) - buf := playlist.Encode() - bytes := buf.Bytes() - // fmt.Println("sending playlist to playlistChan", bytes) - playlistChan <- bytes - resp.Body.Close() - if listType == m3u8.MEDIA { - mpl := playlist.(*m3u8.MediaPlaylist) - for _, v := range mpl.Segments { - if v != nil { - var msURI string - if strings.HasPrefix(v.URI, "http") { - msURI, err = url.QueryUnescape(v.URI) - if err != nil { - log.Fatal(err) - } - } else { - msUrl, err := playlistUrl.Parse(v.URI) - if err != nil { - log.Print(err) - continue - } - msURI, err = url.QueryUnescape(msUrl.String()) - if err != nil { - log.Fatal(err) - } - } - _, hit := cache.Get(msURI) - if !hit { - cache.Add(msURI, nil) - if useLocalTime { - recDuration = time.Now().Sub(startTime) - } else { - recDuration += time.Duration(int64(v.Duration * 1000000000)) - } - dlc <- &types.Download{ - URI: msURI, - TotalDuration: recDuration} - } - if recTime != 0 && recDuration != 0 && recDuration >= recTime { - close(dlc) - return - } - } - } - if mpl.Closed { - close(dlc) - return - } else { - time.Sleep(time.Duration(int64(mpl.TargetDuration * 1000000000))) - } - } else { - log.Fatal("Not a valid media playlist") - } - } - } -} - -func rememberHlsSegs(nameSegMap *map[string][]byte, segChan chan streaming.HlsSegment) { - for { - select { - case seg := <-segChan: - fmt.Println("Got a HLS segment:", seg.Name) - (*nameSegMap)[seg.Name] = seg.Data - } - } -} - -func createTranscodeId(streamID streaming.StreamID, bReq types.BroadcastReq) common.Hash { - //Create a "transcodeID" in the same keyspace as stream.ID - fmt.Println("Creating transcode ID with: ", streamID, bReq) - h := sha256.New() - h.Write([]byte(streamID)) - h.Write([]byte(fmt.Sprintf("%v", bReq))) - id := h.Sum(nil) - - var x common.Hash - if len(x) != len(id) { - panic("Error creating trasncode ID") - } - for i := 0; i < len(x); i++ { - x[i] = id[i] - } - - fmt.Println("Transcode ID: ", x) - - return x -} diff --git a/lpms b/lpms new file mode 100755 index 0000000000..915a5f8d2c Binary files /dev/null and b/lpms differ diff --git a/lpms.go b/lpms.go index 23a452d357..a347635e7b 100644 --- a/lpms.go +++ b/lpms.go @@ -1,26 +1,148 @@ -//Adding the RTMP server. This will put up a RTMP endpoint when starting up Swarm. -//It's a simple RTMP server that will take a video stream and play it right back out. -//After bringing up the Swarm node with RTMP enabled, try it out using: -// -//ffmpeg -re -i bunny.mp4 -c copy -f flv rtmp://localhost/movie -//ffplay rtmp://localhost/movie - +//The RTMP server. This will put up a RTMP endpoint when starting up Swarm. +//To integrate with LPMS means your code will become the source / destination of the media server. +//This RTMP endpoint is mainly used for video upload. The expected url is rtmp://localhost:port/livepeer/stream package lpms import ( - "github.com/livepeer/go-livepeer/livepeer/network" - "github.com/livepeer/go-livepeer/livepeer/storage" - "github.com/livepeer/go-livepeer/livepeer/storage/streaming" - "github.com/livepeer/lpms/common" - "github.com/livepeer/lpms/server" - streamingVizClient "github.com/livepeer/streamingviz/client" + "context" + "encoding/json" + "net/http" + + "github.com/golang/glog" + "github.com/livepeer/lpms/stream" + "github.com/livepeer/lpms/transcoder" + "github.com/livepeer/lpms/vidlistener" + "github.com/livepeer/lpms/vidplayer" + "github.com/nareix/joy4/av" + + joy4rtmp "github.com/nareix/joy4/format/rtmp" ) -func StartVideoServer(rtmpPort string, httpPort string, srsRtmpPort string, srsHttpPort string, streamer *streaming.Streamer, - forwarder storage.CloudStore, streamdb *network.StreamDB, viz *streamingVizClient.Client) { +type LPMS struct { + rtmpServer *joy4rtmp.Server + vidPlayer *vidplayer.VidPlayer + vidListen *vidlistener.VidListener + httpPort string + srsRTMPPort string + srsHTTPPort string +} - common.SetConfig(srsRtmpPort, srsHttpPort, rtmpPort, httpPort) - server.StartRTMPServer(rtmpPort, srsRtmpPort, srsHttpPort, streamer, forwarder, viz) - server.StartHTTPServer(rtmpPort, httpPort, srsRtmpPort, srsHttpPort, streamer, forwarder, streamdb, viz) +type transcodeReq struct { + Formats []string + Bitrates []string + Codecin string + Codecout []string + StreamID string +} + +//New creates a new LPMS server object. It really just brokers everything to the components. +func New(rtmpPort string, httpPort string, srsRTMPPort string, srsHTTPPort string) *LPMS { + server := &joy4rtmp.Server{Addr: (":" + rtmpPort)} + player := &vidplayer.VidPlayer{RtmpServer: server} + listener := &vidlistener.VidListener{RtmpServer: server} + return &LPMS{rtmpServer: server, vidPlayer: player, vidListen: listener, srsRTMPPort: srsRTMPPort, srsHTTPPort: srsHTTPPort, httpPort: httpPort} +} + +//Start starts the rtmp and http server +func (l *LPMS) Start() error { + ec := make(chan error, 1) + go func() { + glog.Infof("Starting LPMS Server at :%v", l.rtmpServer.Addr) + ec <- l.rtmpServer.ListenAndServe() + }() + go func() { + glog.Infof("Starting HTTP Server at :%v", l.httpPort) + ec <- http.ListenAndServe(":"+l.httpPort, nil) + }() + + select { + case err := <-ec: + glog.Infof("LPMS Server Error: %v. Quitting...", err) + return err + } +} + +//HandleRTMPPublish offload to the video listener +func (l *LPMS) HandleRTMPPublish( + getStreamID func(reqPath string) (string, error), + stream func(reqPath string) (*stream.Stream, error), + endStream func(reqPath string)) error { + + return l.vidListen.HandleRTMPPublish(getStreamID, stream, endStream) +} + +//HandleRTMPPlay offload to the video player +func (l *LPMS) HandleRTMPPlay(getStream func(ctx context.Context, reqPath string, dst av.MuxCloser) error) error { + return l.vidPlayer.HandleRTMPPlay(getStream) +} + +//HandleHLSPlay offload to the video player +func (l *LPMS) HandleHLSPlay(getStream func(reqPath string) (*stream.HLSBuffer, error)) error { + return l.vidPlayer.HandleHLSPlay(getStream) +} + +//HandleTranscode kicks off a transcoding process, keeps a local HLS buffer, and returns the new stream ID. +//stream is the video stream you want to be transcoded. getNewStreamID gives you a way to name the transcoded stream. +func (l *LPMS) HandleTranscode(getInStream func(ctx context.Context, streamID string) (*stream.Stream, error), getOutStream func(ctx context.Context, streamID string) (*stream.Stream, error)) { + http.HandleFunc("/transcode", func(w http.ResponseWriter, r *http.Request) { + ctx, _ := context.WithCancel(context.Background()) + // defer cancel() + + //parse transcode request + decoder := json.NewDecoder(r.Body) + var tReq transcodeReq + if r.Body == nil { + http.Error(w, "Please send a request body", 400) + return + } + err := decoder.Decode(&tReq) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + + //Get the RTMP Stream + inStream, err := getInStream(ctx, tReq.StreamID) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + + //Get the HLS Stream + newStream, err := getOutStream(ctx, tReq.StreamID) + if err != nil { + http.Error(w, err.Error(), 400) + } + + ec := make(chan error, 1) + go func() { ec <- l.doTranscoding(ctx, inStream, newStream) }() + + w.Write([]byte("New Stream: " + newStream.StreamID)) + }) +} + +func (l *LPMS) doTranscoding(ctx context.Context, inStream *stream.Stream, newStream *stream.Stream) error { + t := transcoder.New(l.srsRTMPPort, l.srsHTTPPort, newStream.StreamID) + //Should kick off a goroutine for this, so we can return the new streamID rightaway. + + tranMux, err := t.LocalSRSUploadMux() + if err != nil { + return err + // http.Error(w, "Cannot create a connection with local transcoder", 400) + } + + uec := make(chan error, 1) + go func() { uec <- t.StartUpload(ctx, tranMux, inStream) }() + dec := make(chan error, 1) + go func() { dec <- t.StartDownload(ctx, newStream) }() + + select { + case err := <-uec: + return err + // http.Error(w, "Cannot upload stream to transcoder: "+err.Error(), 400) + case err := <-dec: + return err + // http.Error(w, "Cannot download stream from transcoder: "+err.Error(), 400) + } } diff --git a/objs/srs.log b/objs/srs.log new file mode 100644 index 0000000000..e69de29bb2 diff --git a/server/httpServer.go b/server/httpServer.go deleted file mode 100644 index 0beb6a3488..0000000000 --- a/server/httpServer.go +++ /dev/null @@ -1,232 +0,0 @@ -package server - -import ( - "encoding/json" - "fmt" - "io" - "mime" - "net/http" - "path" - "regexp" - "strings" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "github.com/livepeer/go-livepeer/livepeer/network" - "github.com/ethereum/go-ethereum/swarm/network/kademlia" - "github.com/livepeer/go-livepeer/livepeer/storage" - "github.com/livepeer/go-livepeer/livepeer/storage/streaming" - lpmsIo "github.com/livepeer/lpms/io" - streamingVizClient "github.com/livepeer/streamingviz/client" - "github.com/nareix/joy4/format/flv" -) - -//This is for flushing to http request handlers (joy4 concept) -type writeFlusher struct { - httpflusher http.Flusher - io.Writer -} - -func (self writeFlusher) Flush() error { - self.httpflusher.Flush() - return nil -} - -type broadcastReq struct { - Formats []string - Bitrates []string - Codecin string - Codecout []string - StreamID string -} - -func StartHTTPServer(rtmpPort string, httpPort string, srsRtmpPort string, srsHttpPort string, streamer *streaming.Streamer, forwarder storage.CloudStore, streamdb *network.StreamDB, viz *streamingVizClient.Client) { - glog.V(logger.Info).Infof("Starting HTTP Server at port: ", httpPort) - - http.HandleFunc("/stream/", func(w http.ResponseWriter, r *http.Request) { - fmt.Println("In handleFunc, Path: ", r.URL.Path) - - var strmID string - //Example path: /stream/133bd3c4e543e3cd53e2cf2b366eeeace7eae483b651b8b1e2a2072b250864fc62b0bac9f64df186c4fb74d427f136647dcf0ead9198dc7d9f881b1d5c2d2132-0.ts - regex, _ := regexp.Compile("\\/stream\\/([[:alpha:]]|\\d)*") - match := regex.FindString(r.URL.Path) - if match != "" { - strmID = strings.Replace(match, "/stream/", "", -1) - } - - glog.V(logger.Info).Infof("Got streamID as %v", strmID) - - if strings.HasSuffix(r.URL.Path, ".m3u8") == true { - stream, err := streamer.GetStreamByStreamID(streaming.StreamID(strmID)) - if stream == nil { - stream, err = streamer.SubscribeToStream(strmID) - if err != nil { - glog.V(logger.Info).Infof("Error subscribing to stream %v", err) - return - } - //Send subscribe request - forwarder.Stream(strmID, kademlia.Address(common.HexToHash(""))) - } - - //HLS request. Example: http://localhost:8080/stream/streamid.m3u8 - countdown := 12 - for countdown > 0 { - if stream.M3U8 != nil { - break - } else { - fmt.Println("Waiting for playlist") - time.Sleep(time.Second * 5) - } - countdown = countdown - 1 - } - if countdown == 0 { - w.WriteHeader(404) - w.Write([]byte("Cannot find playlist for HLS")) - } - // w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") - w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path))) - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Write(stream.M3U8) - fmt.Println("Writing Playlist in handler: ", string(stream.M3U8)) - // go rememberHlsSegs(&stream.HlsSegNameMap, stream.HlsSegChan) // this is only used for testing viewer on publisher. Publisher doesn't need to remember HLS segments - // return - } else if strings.HasSuffix(r.URL.Path, ".ts") == true { - //HLS video segments - - stream, _ := streamer.GetStreamByStreamID(streaming.StreamID(strmID)) - fmt.Println("Got requests for: ", r.URL.Path) - match := strings.Split(r.URL.Path, "/") - filename := match[len(match)-1] - - countdown := 60 //Total wait time is 60 seconds. Make the single wait smaller to minimize total delay. - for countdown > 0 { - if stream.HlsSegNameMap[filename] != nil { - w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path))) - w.Write(stream.HlsSegNameMap[filename]) - break - } else { - fmt.Println("Waiting 1s for segment", filename, ", ", countdown) - time.Sleep(time.Second * 1) - } - countdown = countdown - 1 - } - - if countdown == 0 { - w.WriteHeader(500) - return - } - } else { - //Assume rtmp - fmt.Println("Assumign rtmp: ", r.URL.Path) - stream, err := streamer.GetStreamByStreamID(streaming.StreamID(strmID)) - if stream == nil { - stream, err = streamer.SubscribeToStream(strmID) - if err != nil { - glog.V(logger.Info).Infof("Error subscribing to stream %v", err) - return - } - //Send subscribe request - forwarder.Stream(strmID, kademlia.Address(common.HexToHash(""))) - } - - w.Header().Set("Content-Type", "video/x-flv") - w.Header().Set("Transfer-Encoding", "chunked") - w.WriteHeader(200) - flusher := w.(http.Flusher) - flusher.Flush() - - muxer := flv.NewMuxerWriteFlusher(writeFlusher{httpflusher: flusher, Writer: w}) - //Cannot kick off a go routine here because the ResponseWriter is not a pointer (so a copy of the writer doesn't make any sense) - lpmsIo.CopyRTMPFromStream(muxer, stream, stream.CloseChan) - } - }) - - http.HandleFunc("/broadcast", func(w http.ResponseWriter, r *http.Request) { - glog.V(logger.Info).Infof("Got broadcast request") - decoder := json.NewDecoder(r.Body) - var bReq broadcastReq - if r.Body == nil { - http.Error(w, "Please send a request body", 400) - return - } - err := decoder.Decode(&bReq) - // glog.V(logger.Info).Infof("http body: ", r.Body) - if err != nil { - http.Error(w, err.Error(), 400) - return - } - bReq.Codecin = "RTMP" - glog.V(logger.Info).Infof("Broadcast request: ", bReq) - - var transcodeId common.Hash - if len(r.URL.Query()["transcodeId"]) > 0 { - str := r.URL.Query()["transcodeId"][0] - transcodeId = common.HexToHash(str) - glog.V(logger.Info).Infof("transcodeId %x", transcodeId[:]) - } else { - //generate an completely random id - transcodeId = common.HexToHash(string(streaming.MakeStreamID(streaming.RandomStreamID(), fmt.Sprintf("%x", streaming.RandomStreamID())))) - } - - // streamID := r.URL.Query()["streamId"][0] - streamID := bReq.StreamID - stream, _ := streamer.GetStreamByStreamID(streaming.StreamID(streamID)) - if stream == nil { - // stream, _ = streamer.AddNewStream() - //Require a stream to exist first - w.WriteHeader(404) - w.Write([]byte("Cannot find stream with ID: " + streamID)) - } - forwarder.Transcode(string(stream.ID), transcodeId, bReq.Formats, bReq.Bitrates, bReq.Codecin, bReq.Codecout) - glog.V(logger.Info).Infof("Broadcast Original Stream: %s. Waiting for ack...", stream.ID) - }) - - http.HandleFunc("/transcodedVideo", func(w http.ResponseWriter, r *http.Request) { - glog.V(logger.Info).Infof("Getting transcoded video") - videos := streamdb.TranscodedStreams[streaming.StreamID(r.URL.Query()["originStreamID"][0])] - js, err := json.Marshal(videos) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - w.Write(js) - }) - - http.HandleFunc("/streamIDs", func(w http.ResponseWriter, r *http.Request) { - fmt.Println("Getting stream ids") - streams := streamer.GetAllStreams() - js, err := json.Marshal(streams) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - w.Write(js) - return - }) - - http.HandleFunc("/streamEndpoint", func(w http.ResponseWriter, r *http.Request) { - fmt.Println("Getting stream endpoint") - resp := map[string]string{"url": "rtmp://localhost:" + rtmpPort + "/live/stream"} - js, _ := json.Marshal(resp) - - w.Header().Set("Content-Type", "application/json") - w.Write(js) - }) - - //For serving static HTML files (web-based broadcaster and viewer) - fs := http.FileServer(http.Dir("static")) - fmt.Println("Serving static files from: ", fs) - http.Handle("/static/", http.StripPrefix("/static/", fs)) - - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - http.Redirect(w, r, "/static/broadcast.html", 301) - }) - - go http.ListenAndServe(":"+httpPort, nil) -} diff --git a/server/rtmpServer.go b/server/rtmpServer.go deleted file mode 100644 index 25f88f4b39..0000000000 --- a/server/rtmpServer.go +++ /dev/null @@ -1,121 +0,0 @@ -package server - -import ( - "fmt" - "regexp" - "strings" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "github.com/ethereum/go-ethereum/swarm/network/kademlia" - "github.com/livepeer/go-livepeer/livepeer/storage" - "github.com/livepeer/go-livepeer/livepeer/storage/streaming" - "github.com/livepeer/lpms/io" - "github.com/livepeer/lpms/types" - streamingVizClient "github.com/livepeer/streamingviz/client" - "github.com/nareix/joy4/av/avutil" - joy4rtmp "github.com/nareix/joy4/format/rtmp" -) - -var srsRTMPPort string - -func SrsRTMPPort() string { - return srsRTMPPort -} - -func StartRTMPServer(rtmpPort string, srsRtmpPort string, srsHttpPort string, streamer *streaming.Streamer, forwarder storage.CloudStore, viz *streamingVizClient.Client) { - if rtmpPort == "" { - rtmpPort = "1935" - } - fmt.Println("Starting RTMP Server on port: ", rtmpPort) - server := &joy4rtmp.Server{Addr: ":" + rtmpPort} - - srsRTMPPort = srsRtmpPort - - server.HandlePlay = func(conn *joy4rtmp.Conn) { - glog.V(logger.Info).Infof("Trying to play stream at %v", conn.URL) - - // Parse the streamID from the path host:port/stream/{streamID} - var strmID string - regex, _ := regexp.Compile("\\/stream\\/([[:alpha:]]|\\d)*") - match := regex.FindString(conn.URL.Path) - if match != "" { - strmID = strings.Replace(match, "/stream/", "", -1) - } - - glog.V(logger.Info).Infof("Got streamID as %v", strmID) - viz.LogConsume(strmID) - stream, err := streamer.GetStreamByStreamID(streaming.StreamID(strmID)) - if stream == nil { - stream, err = streamer.SubscribeToStream(strmID) - if err != nil { - glog.V(logger.Info).Infof("Error subscribing to stream %v", err) - return - } - } else { - fmt.Println("Found stream: ", strmID) - } - - //Send subscribe request - forwarder.Stream(strmID, kademlia.Address(common.HexToHash(""))) - - //Copy chunks to outgoing connection - go io.CopyRTMPFromStream(conn, stream, stream.CloseChan) - } - - server.HandlePublish = func(conn *joy4rtmp.Conn) { - transcodeParam := conn.URL.Query()["transcode"] - if (len(transcodeParam) > 0) && (transcodeParam[0] == "true") { - //For now, we rely on SRS. The next iteraion will be looking into directly integrating ffmpeg - //First, forward the rtmp stream to the local SRS server (always running on . - //Then, issue http req through the HLS endpoint. - stream, _ := streamer.AddNewStream() - glog.V(logger.Info).Infof("Added a new stream with id: %v", stream.ID) - viz.LogBroadcast(string(stream.ID)) - dstConn, err := joy4rtmp.Dial("rtmp://localhost:" + srsRtmpPort + "/stream/" + string(stream.ID)) - if err != nil { - glog.V(logger.Error).Infof("Error connecting to SRS server: ", err) - return - } - - //To pass segment name from the playlist to the segment download routine. - msChan := make(chan *types.Download, 1024) - - //Copy to SRS rtmp - go avutil.CopyFile(dstConn, conn) - //Kick off goroutine to listen for HLS playlist file - go io.GetHlsPlaylist("http://localhost:"+srsHttpPort+"/stream/"+string(stream.ID)+".m3u8", time.Duration(0), true, msChan, stream.M3U8Chan) - //Download the segments - go io.DownloadHlsSegment(msChan, stream.HlsSegChan) - //Copy Hls segments to swarm - go io.CopyHlsToChannel(stream.M3U8Chan, stream.HlsSegChan, stream.SrcVideoChan, stream.CloseChan) - // go io.CopyHlsToChannel(stream) - } else { - //Do regular RTMP stuff - create a new stream, copy the video to the stream. - var strmID string - var stream *streaming.Stream - regex, _ := regexp.Compile("\\/stream\\/([[:alpha:]]|\\d)*") - match := regex.FindString(conn.URL.Path) - if match != "" { - strmID = strings.Replace(match, "/stream/", "", -1) - stream, _ = streamer.GetStreamByStreamID(streaming.StreamID(strmID)) - } - - if stream == nil { - stream, _ = streamer.AddNewStream() - glog.V(logger.Info).Infof("Added a new stream with id: %v", stream.ID) - } else { - glog.V(logger.Info).Infof("Got streamID as %v", strmID) - } - - viz.LogBroadcast(string(stream.ID)) - - //Send video to streamer channels - go io.CopyToChannel(conn, stream, stream.CloseChan) - } - } - - go server.ListenAndServe() -} diff --git a/stream/cmap.go b/stream/cmap.go new file mode 100644 index 0000000000..ffd7e64530 --- /dev/null +++ b/stream/cmap.go @@ -0,0 +1,317 @@ +package stream + +//Borrowed from https://github.com/orcaman/concurrent-map + +import ( + "encoding/json" + "sync" +) + +var SHARD_COUNT = 32 + +// A "thread" safe map of type string:Anything. +// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards. +type ConcurrentMap []*ConcurrentMapShared + +// A "thread" safe string to anything map. +type ConcurrentMapShared struct { + items map[string]interface{} + sync.RWMutex // Read Write mutex, guards access to internal map. +} + +// Creates a new concurrent map. +func NewCMap() ConcurrentMap { + m := make(ConcurrentMap, SHARD_COUNT) + for i := 0; i < SHARD_COUNT; i++ { + m[i] = &ConcurrentMapShared{items: make(map[string]interface{})} + } + return m +} + +// Returns shard under given key +func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { + return m[uint(fnv32(key))%uint(SHARD_COUNT)] +} + +func (m ConcurrentMap) MSet(data map[string]interface{}) { + for key, value := range data { + shard := m.GetShard(key) + shard.Lock() + shard.items[key] = value + shard.Unlock() + } +} + +// Sets the given value under the specified key. +func (m *ConcurrentMap) Set(key string, value interface{}) { + // Get map shard. + shard := m.GetShard(key) + shard.Lock() + shard.items[key] = value + shard.Unlock() +} + +// Callback to return new element to be inserted into the map +// It is called while lock is held, therefore it MUST NOT +// try to access other keys in same map, as it can lead to deadlock since +// Go sync.RWLock is not reentrant +type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{} + +// Insert or Update - updates existing element or inserts a new one using UpsertCb +func (m *ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) { + shard := m.GetShard(key) + shard.Lock() + v, ok := shard.items[key] + res = cb(ok, v, value) + shard.items[key] = res + shard.Unlock() + return res +} + +// Sets the given value under the specified key if no value was associated with it. +func (m *ConcurrentMap) SetIfAbsent(key string, value interface{}) bool { + // Get map shard. + shard := m.GetShard(key) + shard.Lock() + _, ok := shard.items[key] + if !ok { + shard.items[key] = value + } + shard.Unlock() + return !ok +} + +// Retrieves an element from map under given key. +func (m ConcurrentMap) Get(key string) (interface{}, bool) { + // Get shard + shard := m.GetShard(key) + shard.RLock() + // Get item from shard. + val, ok := shard.items[key] + shard.RUnlock() + return val, ok +} + +// Returns the number of elements within the map. +func (m ConcurrentMap) Count() int { + count := 0 + for i := 0; i < SHARD_COUNT; i++ { + shard := m[i] + shard.RLock() + count += len(shard.items) + shard.RUnlock() + } + return count +} + +// Looks up an item under specified key +func (m *ConcurrentMap) Has(key string) bool { + // Get shard + shard := m.GetShard(key) + shard.RLock() + // See if element is within shard. + _, ok := shard.items[key] + shard.RUnlock() + return ok +} + +// Removes an element from the map. +func (m *ConcurrentMap) Remove(key string) { + // Try to get shard. + shard := m.GetShard(key) + shard.Lock() + delete(shard.items, key) + shard.Unlock() +} + +// Removes an element from the map and returns it +func (m *ConcurrentMap) Pop(key string) (v interface{}, exists bool) { + // Try to get shard. + shard := m.GetShard(key) + shard.Lock() + v, exists = shard.items[key] + delete(shard.items, key) + shard.Unlock() + return v, exists +} + +// Checks if map is empty. +func (m *ConcurrentMap) IsEmpty() bool { + return m.Count() == 0 +} + +// Used by the Iter & IterBuffered functions to wrap two variables together over a channel, +type Tuple struct { + Key string + Val interface{} +} + +// Returns an iterator which could be used in a for range loop. +// +// Deprecated: using IterBuffered() will get a better performence +func (m ConcurrentMap) Iter() <-chan Tuple { + chans := snapshot(&m) + ch := make(chan Tuple) + go fanIn(chans, ch) + return ch +} + +// Returns a buffered iterator which could be used in a for range loop. +func (m ConcurrentMap) IterBuffered() <-chan Tuple { + chans := snapshot(&m) + total := 0 + for _, c := range chans { + total += cap(c) + } + ch := make(chan Tuple, total) + go fanIn(chans, ch) + return ch +} + +// Returns a array of channels that contains elements in each shard, +// which likely takes a snapshot of `m`. +// It returns once the size of each buffered channel is determined, +// before all the channels are populated using goroutines. +func snapshot(m *ConcurrentMap) (chans []chan Tuple) { + chans = make([]chan Tuple, SHARD_COUNT) + wg := sync.WaitGroup{} + wg.Add(SHARD_COUNT) + // Foreach shard. + for index, shard := range *m { + go func(index int, shard *ConcurrentMapShared) { + // Foreach key, value pair. + shard.RLock() + chans[index] = make(chan Tuple, len(shard.items)) + wg.Done() + for key, val := range shard.items { + chans[index] <- Tuple{key, val} + } + shard.RUnlock() + close(chans[index]) + }(index, shard) + } + wg.Wait() + return chans +} + +// fanIn reads elements from channels `chans` into channel `out` +func fanIn(chans []chan Tuple, out chan Tuple) { + wg := sync.WaitGroup{} + wg.Add(len(chans)) + for _, ch := range chans { + go func(ch chan Tuple) { + for t := range ch { + out <- t + } + wg.Done() + }(ch) + } + wg.Wait() + close(out) +} + +// Returns all items as map[string]interface{} +func (m ConcurrentMap) Items() map[string]interface{} { + tmp := make(map[string]interface{}) + + // Insert items to temporary map. + for item := range m.IterBuffered() { + tmp[item.Key] = item.Val + } + + return tmp +} + +// Iterator callback,called for every key,value found in +// maps. RLock is held for all calls for a given shard +// therefore callback sess consistent view of a shard, +// but not across the shards +type IterCb func(key string, v interface{}) + +// Callback based iterator, cheapest way to read +// all elements in a map. +func (m *ConcurrentMap) IterCb(fn IterCb) { + for idx := range *m { + shard := (*m)[idx] + shard.RLock() + for key, value := range shard.items { + fn(key, value) + } + shard.RUnlock() + } +} + +// Return all keys as []string +func (m ConcurrentMap) Keys() []string { + count := m.Count() + ch := make(chan string, count) + go func() { + // Foreach shard. + wg := sync.WaitGroup{} + wg.Add(SHARD_COUNT) + for _, shard := range m { + go func(shard *ConcurrentMapShared) { + // Foreach key, value pair. + shard.RLock() + for key := range shard.items { + ch <- key + } + shard.RUnlock() + wg.Done() + }(shard) + } + wg.Wait() + close(ch) + }() + + // Generate keys + keys := make([]string, 0, count) + for k := range ch { + keys = append(keys, k) + } + return keys +} + +//Reviles ConcurrentMap "private" variables to json marshal. +func (m ConcurrentMap) MarshalJSON() ([]byte, error) { + // Create a temporary map, which will hold all item spread across shards. + tmp := make(map[string]interface{}) + + // Insert items to temporary map. + for item := range m.IterBuffered() { + tmp[item.Key] = item.Val + } + return json.Marshal(tmp) +} + +func fnv32(key string) uint32 { + hash := uint32(2166136261) + const prime32 = uint32(16777619) + for i := 0; i < len(key); i++ { + hash *= prime32 + hash ^= uint32(key[i]) + } + return hash +} + +// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal +// will probably won't know which to type to unmarshal into, in such case +// we'll end up with a value of type map[string]interface{}, In most cases this isn't +// out value type, this is why we've decided to remove this functionality. + +// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) { +// // Reverse process of Marshal. + +// tmp := make(map[string]interface{}) + +// // Unmarshal into a single map. +// if err := json.Unmarshal(b, &tmp); err != nil { +// return nil +// } + +// // foreach key,value pair in temporary map insert into our concurrent map. +// for key, val := range tmp { +// m.Set(key, val) +// } +// return nil +// } diff --git a/stream/hls.go b/stream/hls.go new file mode 100644 index 0000000000..4ba1e86cb6 --- /dev/null +++ b/stream/hls.go @@ -0,0 +1,94 @@ +package stream + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/golang/glog" + "github.com/kz26/m3u8" +) + +var ErrNotFound = errors.New("Not Found") + +type HLSDemuxer interface { + //This method should ONLY push a playlist onto a chan when it's a NEW playlist + WaitAndPopPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error) + //This method should ONLY push a segment onto a chan when it's a NEW segment + WaitAndPopSegment(ctx context.Context, name string) ([]byte, error) +} + +type HLSMuxer interface { + WritePlaylist(m3u8.MediaPlaylist) error + WriteSegment(name string, s []byte) error +} + +//TODO: Write tests, set buffer size, kick out segments / playlists if too full +type HLSBuffer struct { + HoldTime time.Duration + plCacheNew bool + segCache *Queue + // pq *Queue + plCache m3u8.MediaPlaylist + sq *ConcurrentMap + lock sync.Locker +} + +func NewHLSBuffer() *HLSBuffer { + m := NewCMap() + return &HLSBuffer{plCacheNew: false, segCache: &Queue{}, HoldTime: time.Second, sq: &m, lock: &sync.Mutex{}} +} + +func (b *HLSBuffer) WritePlaylist(p m3u8.MediaPlaylist) error { + + b.lock.Lock() + b.plCache = p + b.plCacheNew = true + b.lock.Unlock() + return nil +} + +func (b *HLSBuffer) WriteSegment(name string, s []byte) error { + b.lock.Lock() + b.segCache.Put(name) + b.sq.Set(name, s) + b.lock.Unlock() + return nil +} + +func (b *HLSBuffer) WaitAndPopPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error) { + for { + + if b.plCacheNew { + return b.plCache, nil + b.plCacheNew = false + } + time.Sleep(time.Second * 1) + select { + case <-ctx.Done(): + return m3u8.MediaPlaylist{}, ctx.Err() + default: + //Fall through here so we can loop back + } + } +} + +func (b *HLSBuffer) WaitAndPopSegment(ctx context.Context, name string) ([]byte, error) { + for { + seg, found := b.sq.Get(name) + glog.Infof("GetSegment: %v, %v", name, found) + if found { + b.sq.Remove(name) + return seg.([]byte), nil + } + + time.Sleep(time.Second * 1) + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + //Fall through here so we can loop back + } + } +} diff --git a/stream/queue.go b/stream/queue.go new file mode 100644 index 0000000000..7f189c5dd1 --- /dev/null +++ b/stream/queue.go @@ -0,0 +1,375 @@ +//Mostly take from github.com/Workiva/go-datastructures. +package stream + +import ( + "errors" + "runtime" + "sync" + "sync/atomic" + "time" +) + +var ( + // ErrDisposed is returned when an operation is performed on a disposed + // queue. + ErrDisposed = errors.New(`queue: disposed`) + + // ErrTimeout is returned when an applicable queue operation times out. + ErrTimeout = errors.New(`queue: poll timed out`) + + // ErrEmptyQueue is returned when an non-applicable queue operation was called + // due to the queue's empty item state + ErrEmptyQueue = errors.New(`queue: empty queue`) +) + +type waiters []*sema + +func (w *waiters) get() *sema { + if len(*w) == 0 { + return nil + } + + sema := (*w)[0] + copy((*w)[0:], (*w)[1:]) + (*w)[len(*w)-1] = nil // or the zero value of T + *w = (*w)[:len(*w)-1] + return sema +} + +func (w *waiters) put(sema *sema) { + *w = append(*w, sema) +} + +func (w *waiters) remove(sema *sema) { + if len(*w) == 0 { + return + } + // build new slice, copy all except sema + ws := *w + newWs := make(waiters, 0, len(*w)) + for i := range ws { + if ws[i] != sema { + newWs = append(newWs, ws[i]) + } + } + *w = newWs +} + +type items []interface{} + +func (items *items) get(number int64) []interface{} { + returnItems := make([]interface{}, 0, number) + index := int64(0) + for i := int64(0); i < number; i++ { + if i >= int64(len(*items)) { + break + } + + returnItems = append(returnItems, (*items)[i]) + (*items)[i] = nil + index++ + } + + *items = (*items)[index:] + return returnItems +} + +func (items *items) peek() (interface{}, bool) { + length := len(*items) + + if length == 0 { + return nil, false + } + + return (*items)[0], true +} + +func (items *items) getUntil(checker func(item interface{}) bool) []interface{} { + length := len(*items) + + if len(*items) == 0 { + // returning nil here actually wraps that nil in a list + // of interfaces... thanks go + return []interface{}{} + } + + returnItems := make([]interface{}, 0, length) + index := -1 + for i, item := range *items { + if !checker(item) { + break + } + + returnItems = append(returnItems, item) + index = i + (*items)[i] = nil // prevent memory leak + } + + *items = (*items)[index+1:] + return returnItems +} + +type sema struct { + ready chan bool + response *sync.WaitGroup +} + +func newSema() *sema { + return &sema{ + ready: make(chan bool, 1), + response: &sync.WaitGroup{}, + } +} + +// Queue is the struct responsible for tracking the state +// of the queue. +type Queue struct { + waiters waiters + items items + lock sync.Mutex + disposed bool +} + +// Put will add the specified items to the queue. +func (q *Queue) Put(items ...interface{}) error { + if len(items) == 0 { + return nil + } + + q.lock.Lock() + + if q.disposed { + q.lock.Unlock() + return ErrDisposed + } + + q.items = append(q.items, items...) + for { + sema := q.waiters.get() + if sema == nil { + break + } + sema.response.Add(1) + select { + case sema.ready <- true: + sema.response.Wait() + default: + // This semaphore timed out. + } + if len(q.items) == 0 { + break + } + } + + q.lock.Unlock() + return nil +} + +// Get retrieves items from the queue. If there are some items in the +// queue, get will return a number UP TO the number passed in as a +// parameter. If no items are in the queue, this method will pause +// until items are added to the queue. +func (q *Queue) Get(number int64) ([]interface{}, error) { + return q.Poll(number, 0) +} + +// Poll retrieves items from the queue. If there are some items in the queue, +// Poll will return a number UP TO the number passed in as a parameter. If no +// items are in the queue, this method will pause until items are added to the +// queue or the provided timeout is reached. A non-positive timeout will block +// until items are added. If a timeout occurs, ErrTimeout is returned. +func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error) { + if number < 1 { + // thanks again go + return []interface{}{}, nil + } + + q.lock.Lock() + + if q.disposed { + q.lock.Unlock() + return nil, ErrDisposed + } + + var items []interface{} + + if len(q.items) == 0 { + sema := newSema() + q.waiters.put(sema) + q.lock.Unlock() + + var timeoutC <-chan time.Time + if timeout > 0 { + timeoutC = time.After(timeout) + } + select { + case <-sema.ready: + // we are now inside the put's lock + if q.disposed { + return nil, ErrDisposed + } + items = q.items.get(number) + sema.response.Done() + return items, nil + case <-timeoutC: + // cleanup the sema that was added to waiters + select { + case sema.ready <- true: + // we called this before Put() could + // Remove sema from waiters. + q.lock.Lock() + q.waiters.remove(sema) + q.lock.Unlock() + default: + // Put() got it already, we need to call Done() so Put() can move on + sema.response.Done() + } + return nil, ErrTimeout + } + } + + items = q.items.get(number) + q.lock.Unlock() + return items, nil +} + +// Peek returns a the first item in the queue by value +// without modifying the queue. +func (q *Queue) Peek() (interface{}, error) { + q.lock.Lock() + defer q.lock.Unlock() + + if q.disposed { + return nil, ErrDisposed + } + + peekItem, ok := q.items.peek() + if !ok { + return nil, ErrEmptyQueue + } + + return peekItem, nil +} + +// TakeUntil takes a function and returns a list of items that +// match the checker until the checker returns false. This does not +// wait if there are no items in the queue. +func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error) { + if checker == nil { + return nil, nil + } + + q.lock.Lock() + + if q.disposed { + q.lock.Unlock() + return nil, ErrDisposed + } + + result := q.items.getUntil(checker) + q.lock.Unlock() + return result, nil +} + +// Empty returns a bool indicating if this bool is empty. +func (q *Queue) Empty() bool { + q.lock.Lock() + defer q.lock.Unlock() + + return len(q.items) == 0 +} + +// Len returns the number of items in this queue. +func (q *Queue) Len() int64 { + q.lock.Lock() + defer q.lock.Unlock() + + return int64(len(q.items)) +} + +// Disposed returns a bool indicating if this queue +// has had disposed called on it. +func (q *Queue) Disposed() bool { + q.lock.Lock() + defer q.lock.Unlock() + + return q.disposed +} + +// Dispose will dispose of this queue and returns +// the items disposed. Any subsequent calls to Get +// or Put will return an error. +func (q *Queue) Dispose() []interface{} { + q.lock.Lock() + defer q.lock.Unlock() + + q.disposed = true + for _, waiter := range q.waiters { + waiter.response.Add(1) + select { + case waiter.ready <- true: + // release Poll immediately + default: + // ignore if it's a timeout or in the get + } + } + + disposedItems := q.items + + q.items = nil + q.waiters = nil + + return disposedItems +} + +// New is a constructor for a new threadsafe queue. +func NewQueue(hint int64) *Queue { + return &Queue{ + items: make([]interface{}, 0, hint), + } +} + +// ExecuteInParallel will (in parallel) call the provided function +// with each item in the queue until the queue is exhausted. When the queue +// is exhausted execution is complete and all goroutines will be killed. +// This means that the queue will be disposed so cannot be used again. +func ExecuteInParallel(q *Queue, fn func(interface{})) { + if q == nil { + return + } + + q.lock.Lock() // so no one touches anything in the middle + // of this process + todo, done := uint64(len(q.items)), int64(-1) + // this is important or we might face an infinite loop + if todo == 0 { + return + } + + numCPU := 1 + if runtime.NumCPU() > 1 { + numCPU = runtime.NumCPU() - 1 + } + + var wg sync.WaitGroup + wg.Add(numCPU) + items := q.items + + for i := 0; i < numCPU; i++ { + go func() { + for { + index := atomic.AddInt64(&done, 1) + if index >= int64(todo) { + wg.Done() + break + } + + fn(items[index]) + items[index] = 0 + } + }() + } + wg.Wait() + q.lock.Unlock() + q.Dispose() +} diff --git a/stream/stream.go b/stream/stream.go new file mode 100644 index 0000000000..eda35e7db0 --- /dev/null +++ b/stream/stream.go @@ -0,0 +1,193 @@ +package stream + +import ( + "context" + "errors" + "io" + "reflect" + + "time" + + "github.com/golang/glog" + "github.com/kz26/m3u8" + "github.com/nareix/joy4/av" +) + +var ErrBufferFull = errors.New("Stream Buffer Full") +var ErrBufferEmpty = errors.New("Stream Buffer Empty") +var ErrBufferItemType = errors.New("Buffer Item Type Not Recognized") +var ErrDroppedRTMPStream = errors.New("RTMP Stream Stopped Without EOF") +var ErrHttpReqFailed = errors.New("Http Request Failed") + +type RTMPEOF struct{} + +type streamBuffer struct { + q *Queue +} + +func newStreamBuffer() *streamBuffer { + return &streamBuffer{q: NewQueue(1000)} +} + +func (b *streamBuffer) push(in interface{}) error { + b.q.Put(in) + return nil +} + +func (b *streamBuffer) poll(wait time.Duration) (interface{}, error) { + results, err := b.q.Poll(1, wait) + if err != nil { + return nil, err + } + result := results[0] + return result, nil +} + +func (b *streamBuffer) pop() (interface{}, error) { + results, err := b.q.Get(1) + if err != nil { + return nil, err + } + result := results[0] + return result, nil +} + +func (b *streamBuffer) len() int64 { + return b.q.Len() +} + +type HLSSegment struct { + Name string + Data []byte +} + +type Stream struct { + StreamID string + RTMPTimeout time.Duration + HLSTimeout time.Duration + buffer *streamBuffer +} + +func (s *Stream) Len() int64 { + return s.buffer.len() +} + +func NewStream(id string) *Stream { + return &Stream{buffer: newStreamBuffer(), StreamID: id} +} + +//ReadRTMPFromStream reads the content from the RTMP stream out into the dst. +func (s *Stream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error { + defer dst.Close() + + //TODO: Make sure to listen to ctx.Done() + for { + item, err := s.buffer.poll(s.RTMPTimeout) + if err != nil { + return err + } + + switch item.(type) { + case []av.CodecData: + headers := item.([]av.CodecData) + err = dst.WriteHeader(headers) + if err != nil { + glog.Infof("Error writing RTMP header from Stream %v to mux", s.StreamID) + return err + } + case av.Packet: + packet := item.(av.Packet) + err = dst.WritePacket(packet) + if err != nil { + glog.Infof("Error writing RTMP packet from Stream %v to mux", s.StreamID) + return err + } + case RTMPEOF: + err := dst.WriteTrailer() + if err != nil { + glog.Infof("Error writing RTMP trailer from Stream %v", s.StreamID) + return err + } + return io.EOF + default: + glog.Infof("Cannot recognize buffer iteam type: ", reflect.TypeOf(item)) + return ErrBufferItemType + } + } +} + +//WriteRTMPToStream writes a video stream from src into the stream. +func (s *Stream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error { + defer src.Close() + + c := make(chan error, 1) + go func() { + c <- func() error { + header, err := src.Streams() + if err != nil { + return err + } + err = s.buffer.push(header) + if err != nil { + return err + } + + // var lastKeyframe av.Packet + for { + packet, err := src.ReadPacket() + if err == io.EOF { + s.buffer.push(RTMPEOF{}) + return err + } else if err != nil { + return err + } else if len(packet.Data) == 0 { //TODO: Investigate if it's possible for packet to be nil (what happens when RTMP stopped publishing because of a dropped connection? Is it possible to have err and packet both nil?) + return ErrDroppedRTMPStream + } + + if packet.IsKeyFrame { + // lastKeyframe = packet + } + + err = s.buffer.push(packet) + if err == ErrBufferFull { + //TODO: Delete all packets until last keyframe, insert headers in front - trying to get rid of streaming artifacts. + } + } + }() + }() + + select { + case <-ctx.Done(): + glog.Infof("Finished writing RTMP to Stream %v", s.StreamID) + return ctx.Err() + case err := <-c: + return err + } +} + +func (s *Stream) WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error { + return s.buffer.push(pl) +} + +func (s *Stream) WriteHLSSegmentToStream(seg HLSSegment) error { + return s.buffer.push(seg) +} + +//ReadHLSFromStream reads an HLS stream into an HLSBuffer +func (s *Stream) ReadHLSFromStream(buffer HLSMuxer) error { + for { + item, err := s.buffer.poll(s.HLSTimeout) + if err != nil { + return err + } + + switch item.(type) { + case m3u8.MediaPlaylist: + buffer.WritePlaylist(item.(m3u8.MediaPlaylist)) + case HLSSegment: + buffer.WriteSegment(item.(HLSSegment).Name, item.(HLSSegment).Data) + default: + return ErrBufferItemType + } + } +} diff --git a/stream/stream_test.go b/stream/stream_test.go new file mode 100644 index 0000000000..897f113d9b --- /dev/null +++ b/stream/stream_test.go @@ -0,0 +1,351 @@ +package stream + +import ( + "context" + "errors" + "io" + "runtime" + "testing" + + "time" + + "github.com/kz26/m3u8" + "github.com/nareix/joy4/av" +) + +//Testing WriteRTMP errors +var ErrPacketRead = errors.New("packet read error") +var ErrStreams = errors.New("streams error") + +type BadStreamsDemuxer struct{} + +func (d BadStreamsDemuxer) Close() error { return nil } +func (d BadStreamsDemuxer) Streams() ([]av.CodecData, error) { return nil, ErrStreams } +func (d BadStreamsDemuxer) ReadPacket() (av.Packet, error) { return av.Packet{Data: []byte{0, 0}}, nil } + +type BadPacketsDemuxer struct{} + +func (d BadPacketsDemuxer) Close() error { return nil } +func (d BadPacketsDemuxer) Streams() ([]av.CodecData, error) { return nil, nil } +func (d BadPacketsDemuxer) ReadPacket() (av.Packet, error) { + return av.Packet{Data: []byte{0, 0}}, ErrPacketRead +} + +type NoEOFDemuxer struct { + c *Counter +} + +type Counter struct { + Count int +} + +func (d NoEOFDemuxer) Close() error { return nil } +func (d NoEOFDemuxer) Streams() ([]av.CodecData, error) { return nil, nil } +func (d NoEOFDemuxer) ReadPacket() (av.Packet, error) { + if d.c.Count == 10 { + return av.Packet{}, nil + } + + d.c.Count = d.c.Count + 1 + return av.Packet{Data: []byte{0}}, nil +} + +func TestWriteRTMPErrors(t *testing.T) { + // stream := Stream{Buffer: &StreamBuffer{}, StreamID: "test"} + stream := NewStream("test") + err := stream.WriteRTMPToStream(context.Background(), BadStreamsDemuxer{}) + if err != ErrStreams { + t.Error("Expecting Streams Error, but got: ", err) + } + + err = stream.WriteRTMPToStream(context.Background(), BadPacketsDemuxer{}) + if err != ErrPacketRead { + t.Error("Expecting Packet Read Error, but got: ", err) + } + + err = stream.WriteRTMPToStream(context.Background(), NoEOFDemuxer{c: &Counter{Count: 0}}) + if err != ErrDroppedRTMPStream { + t.Error("Expecting RTMP Dropped Error, but got: ", err) + } +} + +//Testing WriteRTMP +type PacketsDemuxer struct { + c *Counter +} + +func (d PacketsDemuxer) Close() error { return nil } +func (d PacketsDemuxer) Streams() ([]av.CodecData, error) { return nil, nil } +func (d PacketsDemuxer) ReadPacket() (av.Packet, error) { + if d.c.Count == 10 { + return av.Packet{Data: []byte{0, 0}}, io.EOF + } + + d.c.Count = d.c.Count + 1 + return av.Packet{Data: []byte{0, 0}}, nil +} + +func TestWriteRTMP(t *testing.T) { + // stream := Stream{Buffer: NewStreamBuffer(), StreamID: "test"} + stream := NewStream("test") + err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}}) + + if err != io.EOF { + t.Error("Expecting EOF, but got: ", err) + } + + if stream.Len() != 12 { //10 packets, 1 header, 1 trailer + t.Error("Expecting buffer length to be 12, but got: ", stream.Len()) + } + + // fmt.Println(stream.buffer.q.Get(12)) + + //TODO: Test what happens when the buffer is full (should evict everything before the last keyframe) +} + +var ErrBadHeader = errors.New("BadHeader") +var ErrBadPacket = errors.New("BadPacket") + +type BadHeaderMuxer struct{} + +func (d BadHeaderMuxer) Close() error { return nil } +func (d BadHeaderMuxer) WriteHeader([]av.CodecData) error { return ErrBadHeader } +func (d BadHeaderMuxer) WriteTrailer() error { return nil } +func (d BadHeaderMuxer) WritePacket(av.Packet) error { return nil } + +type BadPacketMuxer struct{} + +func (d BadPacketMuxer) Close() error { return nil } +func (d BadPacketMuxer) WriteHeader([]av.CodecData) error { return nil } +func (d BadPacketMuxer) WriteTrailer() error { return nil } +func (d BadPacketMuxer) WritePacket(av.Packet) error { return ErrBadPacket } + +func TestReadRTMPError(t *testing.T) { + stream := NewStream("test") + err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}}) + if err != io.EOF { + t.Error("Error setting up the test - while inserting packet.") + } + err = stream.ReadRTMPFromStream(context.Background(), BadHeaderMuxer{}) + + if err != ErrBadHeader { + t.Error("Expecting bad header error, but got ", err) + } + + err = stream.ReadRTMPFromStream(context.Background(), BadPacketMuxer{}) + if err != ErrBadPacket { + t.Error("Expecting bad packet error, but got ", err) + } +} + +//Test ReadRTMP +type PacketsMuxer struct{} + +func (d PacketsMuxer) Close() error { return nil } +func (d PacketsMuxer) WriteHeader([]av.CodecData) error { return nil } +func (d PacketsMuxer) WriteTrailer() error { return nil } +func (d PacketsMuxer) WritePacket(av.Packet) error { return nil } + +func TestReadRTMP(t *testing.T) { + stream := NewStream("test") + err := stream.WriteRTMPToStream(context.Background(), PacketsDemuxer{c: &Counter{Count: 0}}) + if err != io.EOF { + t.Error("Error setting up the test - while inserting packet.") + } + readErr := stream.ReadRTMPFromStream(context.Background(), PacketsMuxer{}) + + if readErr != io.EOF { + t.Error("Expecting buffer to be empty, but got ", err) + } + + if stream.Len() != 0 { + t.Error("Expecting buffer length to be 0, but got ", stream.Len()) + } + + stream2 := NewStream("test2") + stream2.RTMPTimeout = time.Millisecond * 50 + err2 := stream.WriteRTMPToStream(context.Background(), NoEOFDemuxer{c: &Counter{Count: 0}}) + if err2 != ErrDroppedRTMPStream { + t.Error("Error setting up the test - while inserting packet.") + } + err2 = stream2.ReadRTMPFromStream(context.Background(), PacketsMuxer{}) + if err2 != ErrTimeout { + t.Error("Expecting timeout, but got", err2) + } +} + +func TestWriteHLS(t *testing.T) { + stream := NewStream("test") + err1 := stream.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{}) + err2 := stream.WriteHLSSegmentToStream(HLSSegment{}) + if err1 != nil { + t.Error("Shouldn't be error writing playlist, but got:", err1) + } + if err2 != nil { + t.Error("Shouldn't be error writing segment, but got:", err2) + } + if stream.buffer.len() != 2 { + t.Error("Should have 2 packet, but got:", stream.buffer.len()) + } +} + +// struct TestHLSBuffer struct{} +// func (b *TestHLSBuffer) WritePlaylist(m3u8.MediaPlaylist) error { + +// } + +// func (b *TestHLSBuffer) WriteSegment(name string, s []byte) error { + +// } + +func TestReadHLS(t *testing.T) { + stream := NewStream("test") + stream.HLSTimeout = time.Millisecond * 100 + buffer := NewHLSBuffer() + grBefore := runtime.NumGoroutine() + stream.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{SeqNo: 100}) + for i := 0; i < 9; i++ { + stream.WriteHLSSegmentToStream(HLSSegment{Name: "test" + string(i), Data: []byte{0}}) + } + + ec := make(chan error, 1) + go func() { ec <- stream.ReadHLSFromStream(buffer) }() + + time.Sleep(time.Millisecond * 100) + if buffer.sq.Count() != 9 { + t.Error("Should have 9 packets in the buffer, but got:", buffer.sq.Count()) + } + + if buffer.plCache.SeqNo != 100 { + t.Error("Should have inserted a playlist with SeqNo of 100") + } + + time.Sleep(time.Millisecond * 100) + grAfter := runtime.NumGoroutine() + if grBefore != grAfter { + t.Errorf("Should have %v Go routines, but have %v", grBefore, grAfter) + } +} + +// type GoodHLSDemux struct{} + +// func (d GoodHLSDemux) WaitAndPopPlaylist(ctx context.Context) (m3u8.MediaPlaylist, error) { +// return m3u8.MediaPlaylist{}, nil +// for i := 0; i < 2; i++ { +// pc <- m3u8.MediaPlaylist{} +// time.Sleep(time.Millisecond * 50) +// } + +// select { +// case <-ctx.Done(): +// return ctx.Err() +// } +// } +// func (d GoodHLSDemux) WaitAndGetSegment(ctx context.Context, name string) ([]byte, error) { +// return nil, nil +// } + +// func (d GoodHLSDemux) PollPlaylist(ctx context.Context, pc chan m3u8.MediaPlaylist) error { +// for i := 0; i < 2; i++ { +// pc <- m3u8.MediaPlaylist{} +// time.Sleep(time.Millisecond * 50) +// } + +// select { +// case <-ctx.Done(): +// return ctx.Err() +// } +// } + +// func (d GoodHLSDemux) PollSegment(ctx context.Context, sc chan m3u8.MediaSegment) error { +// for i := 0; i < 4; i++ { +// sc <- m3u8.MediaSegment{} +// time.Sleep(time.Millisecond * 50) +// } + +// return io.EOF +// } + +// func TestWriteHLS(t *testing.T) { +// stream := NewStream("test") +// numGR := runtime.NumGoroutine() +// ctx, cancel := context.WithCancel(context.Background()) +// err := stream.WriteHLSToStream(ctx, GoodHLSDemux{}) +// cancel() + +// if err != io.EOF { +// t.Error("Expecting EOF, but got:", err) +// } + +// if stream.buffer.len() != 6 { +// t.Error("Expecting 6 packets in buffer, but got:", stream.buffer.len()) +// } + +// time.Sleep(time.Millisecond * 100) +// if numGR != runtime.NumGoroutine() { +// t.Errorf("NumGoroutine not equal. Before:%v, After:%v", numGR, runtime.NumGoroutine()) +// } +// } + +// type TimeoutHLSDemux struct{} + +// func (d TimeoutHLSDemux) PollPlaylist(ctx context.Context, pc chan m3u8.MediaPlaylist) error { +// select { +// case <-ctx.Done(): +// return ctx.Err() +// } +// } + +// func (d TimeoutHLSDemux) PollSegment(ctx context.Context, sc chan m3u8.MediaSegment) error { +// select { +// case <-ctx.Done(): +// return ctx.Err() +// } +// } + +// //This test is more for documentation - this is how timeout works here. +// func TestWriteHLSTimeout(t *testing.T) { +// stream := NewStream("test") +// numGR := runtime.NumGoroutine() +// ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) +// defer cancel() +// err := stream.WriteHLSToStream(ctx, TimeoutHLSDemux{}) + +// if err != context.DeadlineExceeded { +// t.Error("Expecting EOF, but got:", err) +// } + +// if stream.buffer.len() != 0 { +// t.Error("Expecting 0 packets in buffer, but got:", stream.buffer.len()) +// } + +// if numGR != runtime.NumGoroutine() { +// t.Errorf("NumGoroutine not equal. Before:%v, After:%v", numGR, runtime.NumGoroutine()) +// } +// } + +// //Test ReadRTMP Errors +// type FakeStreamBuffer struct { +// c *Counter +// } + +// func (b *FakeStreamBuffer) Push(in interface{}) error { return nil } +// func (b *FakeStreamBuffer) Pop() (interface{}, error) { +// // fmt.Println("pop, count:", b.c.Count) +// switch b.c.Count { +// case 10: +// b.c.Count = b.c.Count - 1 +// // i := &BufferItem{Type: RTMPHeader, Data: []av.CodecData{}} +// // h, _ := Serialize(i) +// // return h, nil +// return []av.CodecData{}, nil +// case 0: +// return nil, ErrBufferEmpty +// default: +// b.c.Count = b.c.Count - 1 +// // i := &BufferItem{Type: RTMPPacket, Data: av.Packet{}} +// // p, _ := Serialize(i) +// // return p, nil +// return av.Packet{}, nil +// } +// } diff --git a/transcoder/external.go b/transcoder/external.go new file mode 100644 index 0000000000..f08ce70f93 --- /dev/null +++ b/transcoder/external.go @@ -0,0 +1,202 @@ +package transcoder + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "time" + + "github.com/golang/glog" + "github.com/kz26/m3u8" + "github.com/livepeer/lpms/stream" + "github.com/nareix/joy4/av" + joy4rtmp "github.com/nareix/joy4/format/rtmp" + cmap "github.com/orcaman/concurrent-map" +) + +var ErrTranscoderConnRefused = errors.New("Connection Refused for Local External Transcoder") +var ErrHLSDownloadTimeout = errors.New("HLS Download Timeout") +var ErrUnsupportFormat = errors.New("Unsupported Format") +var ErrNotFound = errors.New("Not Found") + +type ExternalTranscoder struct { + localSRSRTMPPort string + localSRSHTTPPort string + streamID string + downloader HLSDownloader + + //TODO: Keep track of local SRS instance +} + +func New(rtmpPort string, srsHTTPPort string, streamID string) *ExternalTranscoder { + m := cmap.New() + d := SRSHLSDownloader{cache: &m, localEndpoint: "http://localhost:" + srsHTTPPort + "/stream/", streamID: streamID, startDownloadWaitTime: time.Second * 10, hlsIntervalWaitTime: time.Second} + return &ExternalTranscoder{localSRSRTMPPort: rtmpPort, localSRSHTTPPort: srsHTTPPort, streamID: streamID, downloader: d} +} + +func (et *ExternalTranscoder) StartService() { + //Start SRS +} + +//LocalSRSUploadMux Convenience method to get a mux +func (et *ExternalTranscoder) LocalSRSUploadMux() (av.MuxCloser, error) { + url := "rtmp://localhost:" + et.localSRSRTMPPort + "/stream/" + et.streamID + glog.Infof("SRS Upload path: %v", url) + rtmpMux, err := joy4rtmp.Dial(url) + if err != nil { + glog.Errorf("Transcoder RTMP Stream Publish Error: %v. Make sure you have started your local SRS instance correctly.", err) + return nil, err + } + return rtmpMux, nil +} + +//StartUpload takes a io.Stream of RTMP stream, and loads it into a local RTMP endpoint. The streamID will be used as the streaming endpoint. +//So if you want to create a new stream, make sure to do that before passing in the stream. +func (et *ExternalTranscoder) StartUpload(ctx context.Context, rtmpMux av.MuxCloser, src *stream.Stream) error { + upErrC := make(chan error, 1) + + go func() { upErrC <- src.ReadRTMPFromStream(ctx, rtmpMux) }() + + select { + case err := <-upErrC: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +//StartDownload pushes hls playlists and segments into the stream as they become available from the transcoder. +func (et *ExternalTranscoder) StartDownload(ctx context.Context, hlsMux *stream.Stream) error { + pc := make(chan *m3u8.MediaPlaylist) + sc := make(chan *stream.HLSSegment) + ec := make(chan error) + go func() { ec <- et.downloader.Download(pc, sc) }() + for { + select { + case pl := <-pc: + err := hlsMux.WriteHLSPlaylistToStream(*pl) + if err != nil { + return err + } + case seg := <-sc: + err := hlsMux.WriteHLSSegmentToStream(*seg) + if err != nil { + return err + } + case err := <-ec: + glog.Errorf("HLS Download Error: %v", err) + return err + case <-ctx.Done(): + return ctx.Err() + } + } +} + +//HLSDownloader doesn't take m3u8.MediaSegment because it doesn't contain the actual data +type HLSDownloader interface { + Download(pc chan *m3u8.MediaPlaylist, sc chan *stream.HLSSegment) error +} + +type SRSHLSDownloader struct { + cache *cmap.ConcurrentMap + localEndpoint string + streamID string + startDownloadWaitTime time.Duration + hlsIntervalWaitTime time.Duration +} + +//Download only pushes a playlist onto the channel when there is a new segment in it. +func (d SRSHLSDownloader) Download(pc chan *m3u8.MediaPlaylist, sc chan *stream.HLSSegment) error { + before := time.Now() + plURL := d.localEndpoint + d.streamID + ".m3u8" + glog.Infof("SRS Playlist Download Path: ", plURL) + + for { + pl, errp := DownloadPlaylist(plURL) + if errp == ErrNotFound && time.Since(before) < d.startDownloadWaitTime { //only sleep wait for until the start download time + time.Sleep(time.Second * 5) + continue + } else if errp != nil { + glog.Errorf("Transcoder HLS Playlist Download Error: %v", errp) + return errp + } + + sendpl := false + for _, seginfo := range pl.Segments { + if seginfo == nil { + continue + } + if _, found := d.cache.Get(seginfo.URI); found == false { + seg, errs := DownloadSegment(d.localEndpoint, seginfo) + if errs != nil { + glog.Errorf("Transcoder HLS Segment Download Error: %v", errp) + return errs + } + sc <- &stream.HLSSegment{Name: seginfo.URI, Data: seg} + + d.cache.Set(seginfo.URI, true) + sendpl = true + } + } + + if sendpl { + pc <- pl + } + + time.Sleep(d.hlsIntervalWaitTime) + } +} + +func DownloadSegment(endpoint string, seginfo *m3u8.MediaSegment) ([]byte, error) { + req, err := http.NewRequest("GET", endpoint+seginfo.URI, nil) + if err != nil { + glog.Errorf("Transcoder HLS Segment Download Error: %v", err) + return nil, err + } + client := http.Client{} + + resp, err := client.Do(req) + if err != nil { + glog.Errorf("Transcoder HLS Segment Download Error: %v", err) + return nil, err + } + + buf := new(bytes.Buffer) + _, err = io.Copy(buf, resp.Body) + if err != nil { + glog.Errorf("Segment Download Error: %v", err) + return nil, err + } + + return buf.Bytes(), nil +} + +func DownloadPlaylist(endpointUrl string) (*m3u8.MediaPlaylist, error) { + req, err := http.NewRequest("GET", endpointUrl, nil) + if err != nil { + glog.Errorf("Transcoder HLS Download Error: %v", err) + return nil, err + } + client := http.Client{} + + resp, err := client.Do(req) + if err != nil { + glog.Errorf("Transcoder HLS Download Error: %v", err) + return nil, err + } + + playlist, listType, err := m3u8.DecodeFrom(resp.Body, true) + + if playlist == nil { + return nil, ErrNotFound + } + + if listType == m3u8.MEDIA { + mpl := playlist.(*m3u8.MediaPlaylist) + return mpl, nil + } + + return nil, ErrUnsupportFormat +} diff --git a/transcoder/external_test.go b/transcoder/external_test.go new file mode 100644 index 0000000000..1dedc93364 --- /dev/null +++ b/transcoder/external_test.go @@ -0,0 +1,171 @@ +package transcoder + +import ( + "context" + "io" + "testing" + + "github.com/kz26/m3u8" + "github.com/livepeer/lpms/stream" + "github.com/nareix/joy4/av" +) + +type Counter struct { + Count int +} +type PacketsDemuxer struct { + c *Counter +} + +func (d PacketsDemuxer) Close() error { return nil } +func (d PacketsDemuxer) Streams() ([]av.CodecData, error) { return []av.CodecData{}, nil } +func (d PacketsDemuxer) ReadPacket() (av.Packet, error) { + if d.c.Count == 10 { + return av.Packet{}, io.EOF + } + + d.c.Count = d.c.Count + 1 + return av.Packet{Data: []byte{0, 0}}, nil +} + +type PacketsMuxer struct{ NumWrites int32 } + +func (d *PacketsMuxer) Close() error { return nil } +func (d *PacketsMuxer) WriteHeader([]av.CodecData) error { + d.NumWrites = d.NumWrites + 1 + return nil +} +func (d *PacketsMuxer) WriteTrailer() error { + // fmt.Println("writing Trailer") + d.NumWrites = d.NumWrites + 1 + return nil +} +func (d *PacketsMuxer) WritePacket(av.Packet) error { + // fmt.Println("writing packet") + d.NumWrites = d.NumWrites + 1 + return nil +} + +func TestStartUpload(t *testing.T) { + tr := &ExternalTranscoder{} + mux := &PacketsMuxer{} + demux := &PacketsDemuxer{c: &Counter{}} + stream := stream.NewStream("test") + stream.WriteRTMPToStream(context.Background(), demux) + ctx := context.Background() + + err := tr.StartUpload(ctx, mux, stream) + if err != io.EOF { + t.Error("Should have gotten EOF, but got:", err) + } + + if mux.NumWrites != 12 { + t.Error("Should have written 12 packets. Instead we got:", mux.NumWrites) + } +} + +type Downloader struct{} + +func (d Downloader) Download(pc chan *m3u8.MediaPlaylist, sc chan *stream.HLSSegment) error { + pl := m3u8.MediaPlaylist{} + pc <- &pl + for i := 0; i < 9; i++ { + seg := stream.HLSSegment{} + sc <- &seg + } + return io.EOF +} + +func TestStartDownload(t *testing.T) { + // fmt.Println("Testing Download") + d := Downloader{} + s := stream.NewStream("test") + tr := &ExternalTranscoder{downloader: d} + err := tr.StartDownload(context.Background(), s) + + if err != io.EOF { + t.Error("Expecting EOF, got", err) + } + + if s.Len() != 10 { + t.Error("Expecting 10 packets, got ", s.Len()) + } + +} + +//Be running SRS when doing this integration test +// func TestDownloader(t *testing.T) { +// fmt.Println("Testing Downloader - Integration Test") +// m := cmap.New() +// d := SRSHLSDownloader{cache: &m, localEndpoint: "http://localhost:7936/stream/", streamID: "live.m3u8", startDownloadWaitTime: time.Second, hlsIntervalWaitTime: time.Second * 5} +// pc := make(chan *m3u8.MediaPlaylist) +// sc := make(chan *lpmsio.HLSSegment) +// ec := make(chan error, 1) +// hlsBuffer := lpmsio.NewHLSBuffer() + +// //Do the download into the channel (refer to the end of the method for copying into hlsBuffer) +// go func() { ec <- d.Download(pc, sc) }() + +// //Set up the player +// player := vidplayer.VidPlayer{} +// player.HandleHTTPPlay(func(ctx context.Context, reqPath string, writer io.Writer) error { +// if strings.HasSuffix(reqPath, ".m3u8") { +// fmt.Println("Got m3u8 req:", reqPath) +// pl, err := hlsBuffer.WaitAndGetPlaylist(ctx) +// buf := pl.Encode() +// bytes := buf.Bytes() +// _, werr := writer.Write(bytes) +// if werr != nil { +// fmt.Println("Error Writing m3u8 playlist: ", err) +// } +// return nil + +// } + +// if strings.HasSuffix(reqPath, ".ts") { +// fmt.Println("Got ts req:", reqPath) +// segID := strings.Split(reqPath, "/")[2] +// seg, err := hlsBuffer.WaitAndGetSegment(ctx, segID) +// fmt.Println("Got seg: ", len(seg)) +// if err != nil { +// fmt.Println("Error Writing ts segs: ", err) +// } +// _, werr := writer.Write(seg) +// if werr != nil { +// fmt.Println("Error Writing ts segs: ", err) +// } +// return nil + +// } + +// return errors.New("Unrecognized req string: " + reqPath) +// }) + +// //Get the server running +// go http.ListenAndServe(":8000", nil) + +// //Do the copying into the buffer +// for { +// select { +// case e := <-ec: +// fmt.Println(e) +// return +// case pl := <-pc: +// for _, s := range pl.Segments { +// if s != nil { +// fmt.Println(s) +// } +// } +// fmt.Println("Writing playlist to hlsBuffer") +// hlsBuffer.WritePlaylist(*pl) +// case seg := <-sc: +// if seg.Name != "" { +// fmt.Printf("Writing %v to hlsBuffer\n", seg.Name) +// hlsBuffer.WriteSegment(seg.Name, seg.Data) +// } else { +// fmt.Printf("Skipping writting %v:%v to hlsBuffer\n", seg.Name, len(seg.Data)) +// } +// } +// } + +// } diff --git a/types/types.go b/types/types.go deleted file mode 100644 index 9ef38fd207..0000000000 --- a/types/types.go +++ /dev/null @@ -1,20 +0,0 @@ -package types - -import "time" - -type HlsSegment struct { - Data []byte - Name string -} - -type Download struct { - URI string - TotalDuration time.Duration -} - -type BroadcastReq struct { - formats []string - bitrates []string - codecin string - codecout []string -} diff --git a/vidlistener/listener.go b/vidlistener/listener.go new file mode 100644 index 0000000000..6a59c7a530 --- /dev/null +++ b/vidlistener/listener.go @@ -0,0 +1,53 @@ +package vidlistener + +import ( + "context" + + "github.com/golang/glog" + "github.com/livepeer/lpms/stream" + joy4rtmp "github.com/nareix/joy4/format/rtmp" +) + +type LocalStream struct { + StreamID string + Timestamp int64 +} + +type VidListener struct { + RtmpServer *joy4rtmp.Server +} + +//HandleRTMPPublish writes the published RTMP stream into a stream. It exposes getStreamID so the +//user can name the stream, and getStream so the user can keep track of all the streams. +func (s *VidListener) HandleRTMPPublish( + getStreamID func(reqPath string) (string, error), + getStream func(reqPath string) (*stream.Stream, error), + endStream func(reqPath string)) error { + + s.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) { + glog.Infof("RTMP server got upstream") + + streamID, err := getStreamID(conn.URL.Path) + if err != nil { + glog.Errorf("RTMP Stream Publish Error: %v", err) + return + } + + stream, err := getStream(conn.URL.Path) + if err != nil { + glog.Errorf("RTMP Publish couldn't get a destination stream for %v", conn.URL.Path) + return + } + + glog.Infof("Got RTMP Stream: %v", streamID) + c := make(chan error, 0) + go func() { c <- stream.WriteRTMPToStream(context.Background(), conn) }() + select { + case err := <-c: + endStream(conn.URL.Path) + glog.Error("Got error writing RTMP: ", err) + } + + } + return nil +} diff --git a/vidlistener/listener_test.go b/vidlistener/listener_test.go new file mode 100644 index 0000000000..927d04e3b2 --- /dev/null +++ b/vidlistener/listener_test.go @@ -0,0 +1,73 @@ +package vidlistener + +import ( + "os/exec" + "testing" + "time" + + "github.com/livepeer/lpms/stream" + joy4rtmp "github.com/nareix/joy4/format/rtmp" +) + +func TestError(t *testing.T) { + server := &joy4rtmp.Server{Addr: ":1937"} + listener := &VidListener{RtmpServer: server} + listener.HandleRTMPPublish( + func(reqPath string) (string, error) { + return "test", nil + }, + func(reqPath string) (*stream.Stream, error) { + // return errors.New("Some Error") + return &stream.Stream{}, nil + }, + func(reqPath string) {}) + + ffmpegCmd := "ffmpeg" + ffmpegArgs := []string{"-re", "-i", "../data/bunny2.mp4", "-c", "copy", "-f", "flv", "rtmp://localhost:1937/movie/stream"} + go exec.Command(ffmpegCmd, ffmpegArgs...).Run() + + go listener.RtmpServer.ListenAndServe() + + time.Sleep(time.Second * 1) +} + +// Integration test. +// func TestRTMPWithServer(t *testing.T) { +// server := &joy4rtmp.Server{Addr: ":1936"} +// listener := &VidListener{RtmpServer: server} +// listener.HandleRTMPPublish( +// func(reqPath string) (string, error) { +// return "teststream", nil +// }, +// func(reqPath string) (*lpmsio.Stream, error) { +// header, err := demux.Streams() +// if err != nil { +// t.Fatal("Failed ot read stream header") +// } +// fmt.Println("header: ", header) + +// counter := 0 +// fmt.Println("data: ") +// for { +// packet, err := demux.ReadPacket() +// if err != nil { +// t.Fatal("Failed to read packets") +// } +// fmt.Print("\r", len(packet.Data)) +// counter = counter + 1 +// } +// }, +// func(reqPath string) {}) +// ffmpegCmd := "ffmpeg" +// ffmpegArgs := []string{"-re", "-i", "../data/bunny2.mp4", "-c", "copy", "-f", "flv", "rtmp://localhost:1936/movie/stream"} +// go exec.Command(ffmpegCmd, ffmpegArgs...).Run() + +// go listener.RtmpServer.ListenAndServe() + +// time.Sleep(time.Second * 1) +// if stream := listener.Streams["teststream"]; stream.StreamID != "teststream" { +// t.Fatal("Server did not set stream") +// } + +// time.Sleep(time.Second * 1) +// } diff --git a/vidplayer/player.go b/vidplayer/player.go new file mode 100644 index 0000000000..bb05aed3b3 --- /dev/null +++ b/vidplayer/player.go @@ -0,0 +1,88 @@ +package vidplayer + +import ( + "context" + "net/http" + + "strings" + + "github.com/golang/glog" + "github.com/livepeer/lpms/stream" + "github.com/nareix/joy4/av" + joy4rtmp "github.com/nareix/joy4/format/rtmp" +) + +//VidPlayer is the module that handles playing video. For now we only support RTMP and HLS play. +type VidPlayer struct { + RtmpServer *joy4rtmp.Server +} + +//HandleRTMPPlay is the handler when there is a RTMP request for a video. The source should write +//into the MuxCloser. The easiest way is through avutil.Copy. +func (s *VidPlayer) HandleRTMPPlay(getStream func(ctx context.Context, reqPath string, dst av.MuxCloser) error) error { + s.RtmpServer.HandlePlay = func(conn *joy4rtmp.Conn) { + glog.Infof("LPMS got RTMP request @ %v", conn.URL) + + ctx := context.Background() + c := make(chan error, 1) + go func() { c <- getStream(ctx, conn.URL.Path, conn) }() + select { + case err := <-c: + glog.Errorf("Rtmp getStream Error: %v", err) + } + } + return nil +} + +//HandleHLSPlay is the handler when there is a HLA request. The source should write the raw bytes into the io.Writer, +//for either the playlist or the segment. +func (s *VidPlayer) HandleHLSPlay(getHLSBuffer func(reqPath string) (*stream.HLSBuffer, error)) error { + http.HandleFunc("/stream/", func(w http.ResponseWriter, r *http.Request) { + glog.Infof("LPMS got HTTP request @ %v", r.URL.Path) + + if !strings.HasSuffix(r.URL.Path, ".m3u8") && !strings.HasSuffix(r.URL.Path, ".ts") { + http.Error(w, "LPMS only accepts HLS requests over HTTP (m3u8, ts).", 500) + } + + ctx := context.Background() + // c := make(chan error, 1) + // go func() { c <- getStream(ctx, r.URL.Path, w) }() + buffer, err := getHLSBuffer(r.URL.Path) + if err != nil { + glog.Errorf("Error getting HLS Buffer: %v", err) + } + + if strings.HasSuffix(r.URL.Path, ".m3u8") { + pl, err := buffer.WaitAndPopPlaylist(ctx) + if err != nil { + glog.Errorf("Error getting HLS playlist %v: %v", r.URL.Path, err) + return + } + _, err = w.Write(pl.Encode().Bytes()) + if err != nil { + glog.Errorf("Error writting HLS playlist %v: %v", r.URL.Path, err) + return + } + return + } + + if strings.HasSuffix(r.URL.Path, ".ts") { + pathArr := strings.Split(r.URL.Path, "/") + segName := pathArr[len(pathArr)-1] + seg, err := buffer.WaitAndPopSegment(ctx, segName) + if err != nil { + glog.Errorf("Error getting HLS segment %v: %v", segName, err) + return + } + _, err = w.Write(seg) + if err != nil { + glog.Errorf("Error writting HLS segment %v: %v", segName, err) + return + } + return + } + + http.Error(w, "Cannot find HTTP video resource: "+r.URL.Path, 500) + }) + return nil +} diff --git a/vidplayer/player_test.go b/vidplayer/player_test.go new file mode 100644 index 0000000000..38d9447a94 --- /dev/null +++ b/vidplayer/player_test.go @@ -0,0 +1,109 @@ +package vidplayer + +import ( + "context" + "fmt" + "testing" + + "time" + + "github.com/kz26/m3u8" + "github.com/livepeer/lpms/stream" + "github.com/nareix/joy4/av" + "github.com/nareix/joy4/av/avutil" + joy4rtmp "github.com/nareix/joy4/format/rtmp" +) + +func TestRTMP(t *testing.T) { + server := &joy4rtmp.Server{Addr: ":1936"} + player := &VidPlayer{RtmpServer: server} + var demuxer av.Demuxer + gotUpvid := false + gotPlayvid := false + player.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) { + gotUpvid = true + demuxer = conn + } + + player.HandleRTMPPlay(func(ctx context.Context, reqPath string, dst av.MuxCloser) error { + gotPlayvid = true + fmt.Println(reqPath) + avutil.CopyFile(dst, demuxer) + return nil + }) + + // go server.ListenAndServe() + + // ffmpegCmd := "ffmpeg" + // ffmpegArgs := []string{"-re", "-i", "../data/bunny2.mp4", "-c", "copy", "-f", "flv", "rtmp://localhost:1936/movie/stream"} + // go exec.Command(ffmpegCmd, ffmpegArgs...).Run() + + // time.Sleep(time.Second * 1) + + // if gotUpvid == false { + // t.Fatal("Didn't get the upstream video") + // } + + // ffplayCmd := "ffplay" + // ffplayArgs := []string{"rtmp://localhost:1936/movie/stream"} + // go exec.Command(ffplayCmd, ffplayArgs...).Run() + + // time.Sleep(time.Second * 1) + // if gotPlayvid == false { + // t.Fatal("Didn't get the downstream video") + // } +} + +func TestHLS(t *testing.T) { + player := &VidPlayer{} + s := stream.NewStream("test") + s.HLSTimeout = time.Second * 5 + //Write some packets into the stream + s.WriteHLSPlaylistToStream(m3u8.MediaPlaylist{}) + s.WriteHLSSegmentToStream(stream.HLSSegment{}) + var buffer *stream.HLSBuffer + player.HandleHLSPlay(func(reqPath string) (*stream.HLSBuffer, error) { + //if can't find local cache, start downloading, and store in cache. + if buffer == nil { + buffer := stream.NewHLSBuffer() + ec := make(chan error, 1) + go func() { ec <- s.ReadHLSFromStream(buffer) }() + // select { + // case err := <-ec: + // return err + // } + } + return buffer, nil + + // if strings.HasSuffix(reqPath, ".m3u8") { + // pl, err := buffer.WaitAndPopPlaylist(ctx) + // if err != nil { + // return nil, err + // } + // _, err = writer.Write(pl.Encode().Bytes()) + // if err != nil { + // return nil, err + // } + // return nil, nil + // } + + // if strings.HasSuffix(reqPath, ".ts") { + // pathArr := strings.Split(reqPath, "/") + // segName := pathArr[len(pathArr)-1] + // seg, err := buffer.WaitAndPopSegment(ctx, segName) + // if err != nil { + // return nil, err + // } + // _, err = writer.Write(seg) + // if err != nil { + // return nil, err + // } + // } + + // return nil, lpmsio.ErrNotFound + }) + + // go http.ListenAndServe(":8000", nil) + + //TODO: Add tests for checking if packets were written, etc. +}