Merge pull request #11 from livepeer/video_segment

Video segment
This commit is contained in:
Doug Petkanics
2017-04-07 23:44:59 -04:00
committed by GitHub
14 changed files with 941 additions and 23 deletions
+21
View File
@@ -0,0 +1,21 @@
listen 2435;
max_connections 200;
daemon off;
srs_log_tank console;
http_server {
enabled on;
listen 7935;
dir ./objs/nginx/html;
}
vhost __defaultVhost__ {
hls {
enabled on;
hls_fragment 3;
hls_window 30;
hls_path ./objs/nginx/html;
hls_m3u8_file [app]/[stream].m3u8;
hls_ts_file [app]/[stream]-[seq].ts;
}
}
+344
View File
@@ -0,0 +1,344 @@
package main
import (
"context"
"flag"
"io"
"strings"
"time"
"github.com/golang/glog"
"github.com/kz26/m3u8"
"github.com/livepeer/lpms"
"github.com/livepeer/lpms/stream"
"github.com/nareix/joy4/av"
)
//This is basically a test for segmenting videos. After much research, the current best approach is to use the segment function inside FFMpeg.
//However, this test serves as a document so we can revisit later.
var StagedStream SegmentStream
var transcodeCount int
type StreamDB struct {
db map[string]stream.Stream
}
type Segment struct {
availSpace time.Duration
packets []av.Packet
}
func NewSegment() *Segment {
return &Segment{time.Second * 2, make([]av.Packet, 0, 100)}
}
type SegmentStream struct {
StreamID string
Headers []av.CodecData
Segments []*Segment
RTMPTimeout time.Duration
}
func NewSegmentStream(streamID string) *SegmentStream {
return &SegmentStream{streamID, nil, make([]*Segment, 0, 100), time.Second * 10}
}
func (s *SegmentStream) GetStreamID() string {
return s.StreamID
}
func (s *SegmentStream) Len() int64 {
return int64(len(s.Segments))
}
func (s *SegmentStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error {
defer dst.Close()
dst.WriteHeader(s.Headers)
transcodeCount = transcodeCount + 1
if transcodeCount%2 == 0 {
glog.Infof("Writing seg 2")
for _, p := range s.Segments[2].packets {
err := dst.WritePacket(p)
time.Sleep(time.Millisecond * 5)
if err != nil {
glog.Infof("Error writing RTMP packet from Stream %v to mux", s.StreamID)
return err
}
}
glog.Infof("Writing seg 3")
for _, p := range s.Segments[3].packets {
err := dst.WritePacket(p)
time.Sleep(time.Millisecond * 5)
if err != nil {
glog.Infof("Error writing RTMP packet from Stream %v to mux", s.StreamID)
dst.WriteTrailer()
return err
}
}
glog.Infof("Writing seg 6")
for _, p := range s.Segments[6].packets {
err := dst.WritePacket(p)
time.Sleep(time.Millisecond * 5)
if err != nil {
glog.Infof("Error writing RTMP packet from Stream %v to mux", s.StreamID)
return err
}
}
glog.Infof("Writing seg 7")
for _, p := range s.Segments[7].packets {
err := dst.WritePacket(p)
time.Sleep(time.Millisecond * 5)
if err != nil {
glog.Infof("Error writing RTMP packet from Stream %v to mux", s.StreamID)
dst.WriteTrailer()
return err
}
}
} else {
// s.Segments = s.Segments[0:len(s.Segments)]
for i, seg := range s.Segments {
glog.Infof("Writing seg %v", i)
glog.Infof("Packet Keyframe: %v, %v", seg.packets[0].IsKeyFrame, len(seg.packets[0].Data))
for _, p := range seg.packets {
// glog.Infof("%v", j)
err := dst.WritePacket(p)
if err != nil {
glog.Infof("Error writing RTMP packet from Stream %v to mux", s.StreamID)
dst.WriteTrailer()
return err
}
}
time.Sleep(time.Second * 1)
}
// for {
// packet := s.GetPacket()
// if len(packet.Data) == 0 {
// glog.Info("Reached the end...")
// dst.WriteTrailer()
// return io.EOF
// }
// err := dst.WritePacket(packet)
// if err != nil {
// glog.Infof("Error writing RTMP packet from Stream %v to mux", s.StreamID)
// return err
// }
// }
}
return nil
}
func (s *SegmentStream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error {
defer src.Close()
c := make(chan error, 1)
go func() {
c <- func() error {
header, err := src.Streams()
if err != nil {
return err
}
s.Headers = header
// packet, err := src.ReadPacket()
// tag, _ := flv.PacketToTag(packet, header[packet.Idx])
// glog.Infof("Tag Type: %v", tag.Type)
// glog.Infof("Tag: %v", tag)
// var lastKeyframe av.Packet
for {
packet, err := src.ReadPacket()
if packet.IsKeyFrame {
glog.Infof("IsKeyFrame: %v\n", packet.IsKeyFrame)
// glog.Infof("Composition Time: %v\n", packet.CompositionTime)
glog.Infof("Time: %v\n", packet.Time)
}
if err == io.EOF {
StagedStream = *s
glog.Infof("Segments Len: %v", len(s.Segments))
for i, seg := range s.Segments {
glog.Infof("seg[%v] Len: %v", i, len(seg.packets))
}
// glog.Infof("%v", s.Segments[0].packets[0])
// glog.Infof("%v", s.Segments[1].packets[0])
// s.buffer.push(RTMPEOF{})
return err
} else if err != nil {
return err
} else if len(packet.Data) == 0 { //TODO: Investigate if it's possible for packet to be nil (what happens when RTMP stopped publishing because of a dropped connection? Is it possible to have err and packet both nil?)
return stream.ErrDroppedRTMPStream
}
insertPacket(s, packet)
// err = s.buffer.push(packet)
// if err == ErrBufferFull {
//TODO: Delete all packets until last keyframe, insert headers in front - trying to get rid of streaming artifacts.
// }
}
}()
}()
select {
case <-ctx.Done():
glog.Infof("Finished writing RTMP to Stream %v", s.StreamID)
return ctx.Err()
case err := <-c:
return err
}
}
func insertPacket(s *SegmentStream, pkt av.Packet) {
var lastSeg *Segment
if s.Len() == 0 || pkt.IsKeyFrame {
lastSeg = NewSegment()
s.Segments = append(s.Segments, lastSeg)
} else {
// glog.Infof("seg length: %v", s.Len())
lastSeg = s.Segments[s.Len()-1]
// if lastSeg.availSpace == 0 {
// lastSeg = NewSegment()
// s.Segments = append(s.Segments, lastSeg)
// }
}
// glog.Infof("Appending packet: %v to %v", len(pkt.Data), lastSeg.availSpace)
lastSeg.packets = append(lastSeg.packets, pkt)
lastSeg.availSpace = lastSeg.availSpace - pkt.CompositionTime
}
func (s *SegmentStream) GetPacket() (pkt av.Packet) {
if len(s.Segments) == 0 {
pkt = av.Packet{}
return
}
seg := s.Segments[0]
if len(seg.packets) > 1 {
pkt, seg.packets = seg.packets[0], seg.packets[1:len(seg.packets)]
} else {
glog.Infof("Seg len %v", len(seg.packets))
pkt = seg.packets[0]
if len(s.Segments) > 1 {
s.Segments = s.Segments[1:len(s.Segments)]
} else {
s.Segments = []*Segment{}
}
}
return
}
func (s *SegmentStream) WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error {
glog.Infof("Writing HLS Playlist")
return nil
}
func (s *SegmentStream) WriteHLSSegmentToStream(seg stream.HLSSegment) error {
glog.Infof("Writing HLS Segment")
return nil
}
func (s *SegmentStream) ReadHLSFromStream(buffer stream.HLSMuxer) error {
glog.Info("Reading HLS")
return nil
}
func main() {
flag.Set("logtostderr", "true")
flag.Parse()
lpms := lpms.New("1935", "8000", "2435", "7935")
streamDB := &StreamDB{db: make(map[string]stream.Stream)}
lpms.HandleRTMPPublish(
//getStreamID
func(reqPath string) (string, error) {
return getStreamIDFromPath(reqPath), nil
},
//getStream
func(reqPath string) (stream.Stream, stream.Stream, error) {
streamID := getStreamIDFromPath(reqPath)
stream1 := NewSegmentStream(streamID)
stream2 := NewSegmentStream(streamID)
streamDB.db[streamID] = stream1
return stream1, stream2, nil
},
//finishStream
func(reqPath string) {
// streamID := getStreamIDFromPath(reqPath)
// delete(streamDB.db, streamID)
// tranStreamID := streamID + "_tran"
// delete(streamDB.db, tranStreamID)
})
lpms.HandleRTMPPlay(
//getStream
func(ctx context.Context, reqPath string, dst av.MuxCloser) error {
glog.Infof("Got req: ", reqPath)
// streamID := getStreamIDFromPath(reqPath)
// src := streamDB.db[streamID]
src := copyStream(&StagedStream)
// if src != nil {
src.ReadRTMPFromStream(ctx, dst)
// } else {
// glog.Error("Cannot find stream for ", streamID)
// return stream.ErrNotFound
// }
return nil
})
lpms.HandleTranscode(
//getInStream
func(ctx context.Context, streamID string) (stream.Stream, error) {
s := copyStream(&StagedStream)
return s, nil
},
//getOutStream
func(ctx context.Context, streamID string) (stream.Stream, error) {
//For this example, we'll name the transcoded stream "{streamID}_tran"
// newStream := stream.NewVideoStream(streamID + "_tran")
// streamDB.db[newStream.GetStreamID()] = newStream
// return newStream, nil
glog.Infof("Making File Stream")
fileStream := stream.NewFileStream(streamID + "_file")
return fileStream, nil
})
lpms.Start()
}
func getStreamIDFromPath(reqPath string) string {
return "test"
}
func getHLSStreamIDFromPath(reqPath string) string {
if strings.HasSuffix(reqPath, ".m3u8") {
return "test_tran"
} else {
return "test_tran"
}
}
func copyStream(s *SegmentStream) *SegmentStream {
c := &SegmentStream{Headers: s.Headers, Segments: make([]*Segment, len(s.Segments))}
for i := range c.Segments {
seg := &Segment{packets: make([]av.Packet, len(s.Segments[i].packets))}
seg.packets = s.Segments[i].packets
c.Segments[i] = seg
}
return c
}
+60
View File
@@ -0,0 +1,60 @@
package main
import (
"context"
"io"
"testing"
"github.com/nareix/joy4/av"
)
type TestDemuxer struct{}
var count int
func (d TestDemuxer) Close() error { return nil }
func (d TestDemuxer) Streams() ([]av.CodecData, error) { return nil, nil }
func (d TestDemuxer) ReadPacket() (av.Packet, error) {
if count < 207 {
isKeyframe := false
if count == 3 || count == 53 || count == 140 || count == 185 {
isKeyframe = true
}
count = count + 1
return av.Packet{IsKeyFrame: isKeyframe, Data: []byte{0, 0}}, nil
}
return av.Packet{}, io.EOF
}
type TestMuxer struct{}
var Header []av.CodecData
var Packets []av.Packet
func (d TestMuxer) Close() error { return nil }
func (d TestMuxer) WriteHeader(h []av.CodecData) error {
Header = h
return nil
}
func (d TestMuxer) WriteTrailer() error { return nil }
func (d TestMuxer) WritePacket(p av.Packet) error {
Packets = append(Packets, p)
return nil
}
func TestSegmentStream(t *testing.T) {
// fmt.Printf("Testing Segment Stream")
s := NewSegmentStream("test")
s.WriteRTMPToStream(context.Background(), &TestDemuxer{})
if len(s.Segments) != 5 {
t.Errorf("Expecting 5 segments, got %v", len(s.Segments))
}
// glog.Infof("Done Inserting")
Packets = make([]av.Packet, 0, 207)
s.ReadRTMPFromStream(context.Background(), &TestMuxer{})
if len(Packets) != 207 {
t.Errorf("Expecting 207 packets, got %v", len(Packets))
}
}
+5 -4
View File
@@ -35,11 +35,12 @@ func main() {
return getStreamIDFromPath(reqPath), nil
},
//getStream
func(reqPath string) (stream.Stream, error) {
func(reqPath string) (stream.Stream, stream.Stream, error) {
streamID := getStreamIDFromPath(reqPath)
stream := stream.NewVideoStream(streamID)
streamDB.db[streamID] = stream
return stream, nil
stream1 := stream.NewVideoStream(streamID)
stream2 := stream.NewVideoStream(streamID)
streamDB.db[streamID] = stream1
return stream1, stream2, nil
},
//finishStream
func(reqPath string) {
+2 -2
View File
@@ -65,10 +65,10 @@ func (l *LPMS) Start() error {
//HandleRTMPPublish offload to the video listener
func (l *LPMS) HandleRTMPPublish(
getStreamID func(reqPath string) (string, error),
stream func(reqPath string) (stream.Stream, error),
getStream func(reqPath string) (stream.Stream, stream.Stream, error),
endStream func(reqPath string)) error {
return l.vidListen.HandleRTMPPublish(getStreamID, stream, endStream)
return l.vidListen.HandleRTMPPublish(getStreamID, getStream, endStream)
}
//HandleRTMPPlay offload to the video player
BIN
View File
Binary file not shown.
+223
View File
@@ -0,0 +1,223 @@
package segmenter
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
"os/exec"
"strconv"
"time"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/kz26/m3u8"
"github.com/nareix/joy4/av"
"github.com/nareix/joy4/format/rtmp"
)
type VideoFormat uint32
var (
HLS = MakeVideoFormatType(avFormatTypeMagic + 1)
RTMP = MakeVideoFormatType(avFormatTypeMagic + 1)
)
func MakeVideoFormatType(base uint32) (c VideoFormat) {
c = VideoFormat(base) << videoFormatOtherBits
return
}
const avFormatTypeMagic = 577777
const videoFormatOtherBits = 1
type SegmenterOptions struct {
EnforceKeyframe bool //Enforce each segment starts with a keyframe
SegLength time.Duration
}
type VideoSegment struct {
Codec av.CodecType
Format VideoFormat
Length time.Duration
Data []byte
Name string
}
type VideoPlaylist struct {
Format VideoFormat
// Data []byte
Data *m3u8.MediaPlaylist
}
type VideoSegmenter interface{}
//FFMpegVideoSegmenter segments a RTMP stream by invoking FFMpeg and monitoring the file system.
type FFMpegVideoSegmenter struct {
WorkDir string
LocalRtmpUrl string
StrmID string
curSegment int
curPlaylist *m3u8.MediaPlaylist
curWaitTime time.Duration
SegLen time.Duration
}
func NewFFMpegVideoSegmenter(workDir string, strmID string, localRtmpUrl string, segLen time.Duration) *FFMpegVideoSegmenter {
return &FFMpegVideoSegmenter{WorkDir: workDir, StrmID: strmID, LocalRtmpUrl: localRtmpUrl, SegLen: segLen}
}
//RTMPToHLS invokes the FFMpeg command to do the segmenting. This method blocks unless killed.
func (s *FFMpegVideoSegmenter) RTMPToHLS(ctx context.Context, opt SegmenterOptions) error {
//Set up local workdir
if _, err := os.Stat(s.WorkDir); os.IsNotExist(err) {
err := os.Mkdir(s.WorkDir, 0700)
if err != nil {
return err
}
}
//Test to make sure local RTMP is running.
rtmpMux, err := rtmp.Dial(s.LocalRtmpUrl)
if err != nil {
glog.Errorf("Video Segmenter Error: %v. Make sure local RTMP stream is available for segmenter.", err)
rtmpMux.Close()
return err
}
rtmpMux.Close()
//Invoke the FFMpeg command
// fmt.Println("ffmpeg", "-i", fmt.Sprintf("rtmp://localhost:%v/stream/%v", "1935", "test"), "-vcodec", "copy", "-acodec", "copy", "-bsf:v", "h264_mp4toannexb", "-f", "segment", "-muxdelay", "0", "-segment_list", "./tmp/stream.m3u8", "./tmp/stream_%d.ts")
plfn := fmt.Sprintf("%s/%s.m3u8", s.WorkDir, s.StrmID)
tsfn := s.WorkDir + "/" + s.StrmID + "_%d.ts"
//This command needs to be manually killed, because ffmpeg doesn't seem to quit after getting a rtmp EOF
cmd := exec.Command("ffmpeg", "-i", s.LocalRtmpUrl, "-vcodec", "copy", "-acodec", "copy", "-bsf:v", "h264_mp4toannexb", "-f", "segment", "-muxdelay", "0", "-segment_list", plfn, tsfn)
err = cmd.Start()
if err != nil {
glog.Errorf("Cannot start ffmpeg command.")
return err
}
ec := make(chan error, 1)
go func() { ec <- cmd.Wait() }()
select {
case ffmpege := <-ec:
glog.Errorf("Error from ffmpeg: %v", ffmpege)
return ffmpege
case <-ctx.Done():
//Can't close RTMP server, joy4 doesn't support it.
//server.Stop()
cmd.Process.Kill()
return ctx.Err()
}
}
//PollSegment monitors the filesystem and returns a new segment as it becomes available
func (s *FFMpegVideoSegmenter) PollSegment(ctx context.Context) (*VideoSegment, error) {
var length time.Duration
tsfn := s.WorkDir + "/" + s.StrmID + "_" + strconv.Itoa(s.curSegment) + ".ts"
seg, err := pollSegment(ctx, tsfn, time.Millisecond*100, s.SegLen)
if err != nil {
return nil, err
}
name := s.StrmID + "_" + strconv.Itoa(s.curSegment) + ".ts"
if s.curPlaylist != nil && s.curPlaylist.Segments[s.curSegment] != nil {
//This is ridiculous - but it's how we can round floats in Go
sec, _ := strconv.Atoi(fmt.Sprintf("%.0f", s.curPlaylist.Segments[s.curSegment].Duration))
length = time.Duration(sec) * 1000 * time.Millisecond
}
s.curSegment = s.curSegment + 1
glog.Infof("Segment: %v, len:%v", name, len(seg))
return &VideoSegment{Codec: av.H264, Format: HLS, Length: length, Data: seg, Name: name}, err
}
//PollPlaylist monitors the filesystem and returns a new playlist as it becomes available
func (s *FFMpegVideoSegmenter) PollPlaylist(ctx context.Context) (*VideoPlaylist, error) {
plfn := fmt.Sprintf("%s/%s.m3u8", s.WorkDir, s.StrmID)
var lastPl []byte
if s.curPlaylist == nil {
lastPl = nil
} else {
lastPl = s.curPlaylist.Encode().Bytes()
}
pl, err := pollPlaylist(ctx, plfn, time.Millisecond*100, lastPl)
if err != nil {
return nil, err
}
p, err := m3u8.NewMediaPlaylist(50000, 50000)
err = p.DecodeFrom(bytes.NewReader(pl), true)
if err != nil {
return nil, err
}
s.curPlaylist = p
return &VideoPlaylist{Format: HLS, Data: p}, err
}
func pollPlaylist(ctx context.Context, fn string, sleepTime time.Duration, lastFile []byte) (f []byte, err error) {
for {
if _, err := os.Stat(fn); err == nil {
if err != nil {
return nil, err
}
content, err := ioutil.ReadFile(fn)
if err != nil {
return nil, err
}
//The m3u8 package has some bugs, so the translation isn't 100% correct...
p, err := m3u8.NewMediaPlaylist(50000, 50000)
err = p.DecodeFrom(bytes.NewReader(content), true)
if err != nil {
return nil, err
}
curFile := p.Encode().Bytes()
// fmt.Printf("p.Segments: %v\n", p.Segments[0])
// fmt.Printf("lf: %s \ncf: %s \ncomp:%v\n\n", lastFile, curFile, bytes.Compare(lastFile, curFile))
if lastFile == nil || bytes.Compare(lastFile, curFile) != 0 {
return content, nil
}
}
select {
case <-ctx.Done():
fmt.Println("ctx.Done()!!!")
return nil, ctx.Err()
default:
}
time.Sleep(sleepTime)
}
}
func pollSegment(ctx context.Context, fn string, sleepTime time.Duration, segLen time.Duration) (f []byte, err error) {
for {
if _, err := os.Stat(fn); err == nil {
// fmt.Printf("FileName: %v, FileSize: %v \n\n", fn, info.Size())
time.Sleep(segLen)
// fmt.Printf("FileName: %v, FileSize: %v \n\n", fn, info.Size())
content, err := ioutil.ReadFile(fn)
if err != nil {
return nil, err
}
return content, err
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
time.Sleep(sleepTime)
}
}
+203
View File
@@ -0,0 +1,203 @@
package segmenter
import (
"context"
"fmt"
"io"
"os"
"testing"
"time"
"strconv"
"io/ioutil"
"github.com/kz26/m3u8"
"github.com/livepeer/lpms/stream"
"github.com/livepeer/lpms/vidplayer"
"github.com/nareix/joy4/av"
"github.com/nareix/joy4/av/avutil"
"github.com/nareix/joy4/format"
"github.com/nareix/joy4/format/rtmp"
)
type TestStream struct{}
func (s *TestStream) GetStreamID() string { return "test" }
func (s *TestStream) Len() int64 { return 0 }
func (s *TestStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) error {
format.RegisterAll()
wd, _ := os.Getwd()
file, err := avutil.Open(wd + "/test.flv")
if err != nil {
fmt.Println("Error opening file: ", err)
return err
}
header, _ := file.Streams()
dst.WriteHeader(header)
for {
pkt, err := file.ReadPacket()
if err == io.EOF {
dst.WriteTrailer()
return err
}
dst.WritePacket(pkt)
}
}
func (s *TestStream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) error { return nil }
func (s *TestStream) WriteHLSPlaylistToStream(pl m3u8.MediaPlaylist) error { return nil }
func (s *TestStream) WriteHLSSegmentToStream(seg stream.HLSSegment) error { return nil }
func (s *TestStream) ReadHLSFromStream(buffer stream.HLSMuxer) error { return nil }
func TestSegmenter(t *testing.T) {
wd, _ := os.Getwd()
workDir := wd + "/tmp"
os.RemoveAll(workDir)
strm := &TestStream{}
url := fmt.Sprintf("rtmp://localhost:%v/stream/%v", "1935", strm.GetStreamID())
vs := NewFFMpegVideoSegmenter(workDir, strm.GetStreamID(), url, time.Millisecond*10)
// server := New("1935", "", "", "")
server := &rtmp.Server{Addr: ":1935"}
player := vidplayer.VidPlayer{RtmpServer: server}
player.HandleRTMPPlay(
func(ctx context.Context, reqPath string, dst av.MuxCloser) error {
return strm.ReadRTMPFromStream(ctx, dst)
})
go player.RtmpServer.ListenAndServe()
se := make(chan error, 1)
opt := SegmenterOptions{}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
defer cancel()
go func() { se <- func() error { return vs.RTMPToHLS(ctx, opt) }() }()
select {
case err := <-se:
if err != context.DeadlineExceeded {
t.Errorf("Should exceed deadline (since it's not a real stream, ffmpeg should finish instantly). But instead got: %v", err)
}
}
pl, err := vs.PollPlaylist(ctx)
if err != nil {
t.Errorf("Got error: %v", err)
}
if pl.Format != HLS {
t.Errorf("Expecting HLS Playlist, got %v", pl.Format)
}
// p, err := m3u8.NewMediaPlaylist(100, 100)
// err = p.DecodeFrom(bytes.NewReader(pl.Data), true)
// if err != nil {
// t.Errorf("Error decoding HLS playlist: %v", err)
// }
if vs.curSegment != 0 {
t.Errorf("Segment counter should start with 0. But got: %v", vs.curSegment)
}
for i := 0; i < 4; i++ {
seg, err := vs.PollSegment(ctx)
if vs.curSegment != i+1 {
t.Errorf("Segment counter should move to 1. But got: %v", vs.curSegment)
}
if err != nil {
t.Errorf("Got error: %v", err)
}
if seg.Codec != av.H264 {
t.Errorf("Expecting H264 segment, got: %v", seg.Codec)
}
if seg.Format != HLS {
t.Errorf("Expecting HLS segment, got %v", seg.Format)
}
if seg.Length != time.Second*2 {
t.Errorf("Expecting 2 sec segments, got %v", seg.Length)
}
fn := "test_" + strconv.Itoa(i) + ".ts"
if seg.Name != fn {
t.Errorf("Expecting %v, got %v", fn, seg.Name)
}
segLen := len(seg.Data)
if segLen < 20000 {
t.Errorf("File size is too small: %v", segLen)
}
}
newPl := `#EXTM3U
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-ALLOW-CACHE:YES
#EXT-X-TARGETDURATION:7
#EXTINF:2.066000,
test_0.ts
#EXTINF:1.999000,
test_1.ts
#EXTINF:1.999000,
test_2.ts
#EXTINF:1.999000,
test_3.ts
#EXTINF:1.999000,
test_4.ts
#EXTINF:1.999000,
test_5.ts
#EXTINF:1.999000,
test_6.ts
`
// bf, _ := ioutil.ReadFile(workDir + "/test.m3u8")
// fmt.Printf("bf:%s\n", bf)
ioutil.WriteFile(workDir+"/test.m3u8", []byte(newPl), os.ModeAppend)
// af, _ := ioutil.ReadFile(workDir + "/test.m3u8")
// fmt.Printf("af:%s\n", af)
// fmt.Println("before:%v", pl.Data.Segments[0:10])
pl, err = vs.PollPlaylist(ctx)
if err != nil {
t.Errorf("Got error polling playlist: %v", err)
}
// fmt.Println("after:%v", pl.Data.Segments[0:10])
// segLen := len(pl.Data.Segments)
// if segLen != 4 {
// t.Errorf("Seglen should be 4. Got: %v", segLen)
// }
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*400)
defer cancel()
pl, err = vs.PollPlaylist(ctx)
if err == nil {
t.Errorf("Expecting timeout error...")
}
//Clean up
os.RemoveAll(workDir)
}
func TestPollPlaylistError(t *testing.T) {
vs := NewFFMpegVideoSegmenter("./sometestdir", "test", "", time.Millisecond*100)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err := vs.PollPlaylist(ctx)
if err != context.DeadlineExceeded {
t.Errorf("Expect to exceed deadline, but got: %v", err)
}
}
func TestPollSegmentError(t *testing.T) {
vs := NewFFMpegVideoSegmenter("./sometestdir", "test", "", time.Millisecond*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err := vs.PollSegment(ctx)
if err != context.DeadlineExceeded {
t.Errorf("Expect to exceed deadline, but got: %v", err)
}
}
+1 -2
View File
@@ -6,7 +6,6 @@ import (
"sync"
"time"
"github.com/golang/glog"
"github.com/kz26/m3u8"
)
@@ -77,7 +76,7 @@ func (b *HLSBuffer) WaitAndPopPlaylist(ctx context.Context) (m3u8.MediaPlaylist,
func (b *HLSBuffer) WaitAndPopSegment(ctx context.Context, name string) ([]byte, error) {
for {
seg, found := b.sq.Get(name)
glog.Infof("GetSegment: %v, %v", name, found)
// glog.Infof("GetSegment: %v, %v", name, found)
if found {
b.sq.Remove(name)
return seg.([]byte), nil
+3 -1
View File
@@ -5,6 +5,7 @@ import (
"errors"
"io"
"reflect"
"runtime/debug"
"time"
@@ -114,7 +115,7 @@ func (s *VideoStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser)
packet := item.(av.Packet)
err = dst.WritePacket(packet)
if err != nil {
glog.Infof("Error writing RTMP packet from Stream %v to mux", s.StreamID)
glog.Infof("Error writing RTMP packet from Stream %v to mux: %v", s.StreamID, err)
return err
}
case RTMPEOF:
@@ -126,6 +127,7 @@ func (s *VideoStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser)
return io.EOF
default:
glog.Infof("Cannot recognize buffer iteam type: ", reflect.TypeOf(item))
debug.PrintStack()
return ErrBufferItemType
}
}
+70 -12
View File
@@ -2,12 +2,17 @@ package vidlistener
import (
"context"
"os"
"time"
"github.com/golang/glog"
"github.com/livepeer/lpms/segmenter"
"github.com/livepeer/lpms/stream"
joy4rtmp "github.com/nareix/joy4/format/rtmp"
)
var segOptions = segmenter.SegmenterOptions{SegLength: time.Second * 2}
type LocalStream struct {
StreamID string
Timestamp int64
@@ -17,38 +22,91 @@ type VidListener struct {
RtmpServer *joy4rtmp.Server
}
//HandleRTMPPublish writes the published RTMP stream into a stream. It exposes getStreamID so the
//user can name the stream, and getStream so the user can keep track of all the streams.
func (s *VidListener) HandleRTMPPublish(
//HandleRTMPPublish immediately turns the RTMP stream into segmented HLS, and writes it into a stream.
//It exposes getStreamID so the user can name the stream, and getStream so the user can keep track of all the streams.
func (self *VidListener) HandleRTMPPublish(
getStreamID func(reqPath string) (string, error),
getStream func(reqPath string) (stream.Stream, error),
getStream func(reqPath string) (stream.Stream, stream.Stream, error),
endStream func(reqPath string)) error {
s.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) {
self.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) {
glog.Infof("RTMP server got upstream")
streamID, err := getStreamID(conn.URL.Path)
_, err := getStreamID(conn.URL.Path)
if err != nil {
glog.Errorf("RTMP Stream Publish Error: %v", err)
return
}
stream, err := getStream(conn.URL.Path)
rs, hs, err := getStream(conn.URL.Path)
if err != nil {
glog.Errorf("RTMP Publish couldn't get a destination stream for %v", conn.URL.Path)
return
}
glog.Infof("Got RTMP Stream: %v", streamID)
c := make(chan error, 0)
go func() { c <- stream.WriteRTMPToStream(context.Background(), conn) }()
glog.Infof("Got RTMP Stream: %v", rs.GetStreamID())
cew := make(chan error, 0)
cs := make(chan error, 0)
ctx, cancel := context.WithCancel(context.Background())
glog.Infof("Writing RTMP to stream")
go func() { cew <- rs.WriteRTMPToStream(ctx, conn) }()
go func() { cs <- self.segmentStream(ctx, rs, hs) }()
select {
case err := <-c:
case err := <-cew:
endStream(conn.URL.Path)
glog.Infof("Final stream length: %v", stream.Len())
glog.Infof("Final stream length: %v", rs.Len())
glog.Error("Got error writing RTMP: ", err)
cancel()
case err := <-cs:
glog.Errorf("Error segmenting, %v", err)
cancel()
}
}
return nil
}
func (self *VidListener) segmentStream(ctx context.Context, rs stream.Stream, hs stream.Stream) error {
// //Invoke Segmenter
workDir, _ := os.Getwd()
workDir = workDir + "/tmp"
localRtmpUrl := "rtmp://localhost" + self.RtmpServer.Addr + "/stream/" + rs.GetStreamID()
s := segmenter.NewFFMpegVideoSegmenter(workDir, rs.GetStreamID(), localRtmpUrl, segOptions.SegLength)
c := make(chan error, 1)
go func() { c <- s.RTMPToHLS(ctx, segOptions) }()
go func() {
c <- func() error {
for {
pl, err := s.PollPlaylist(ctx)
if err != nil {
glog.Errorf("Got error polling playlist: %v", err)
return err
}
// glog.Infof("Writing pl: %v", pl)
hs.WriteHLSPlaylistToStream(*pl.Data)
}
}()
}()
go func() {
c <- func() error {
for {
seg, err := s.PollSegment(ctx)
if err != nil {
return err
}
ss := stream.HLSSegment{Data: seg.Data, Name: seg.Name}
glog.Infof("Writing stream: %v, len:%v", ss.Name, len(seg.Data))
hs.WriteHLSSegmentToStream(ss)
}
}()
}()
select {
case err := <-c:
glog.Errorf("Error segmenting stream: %v", err)
return err
}
}
+2 -2
View File
@@ -17,9 +17,9 @@ func TestListener(t *testing.T) {
func(reqPath string) (string, error) {
return "test", nil
},
func(reqPath string) (stream.Stream, error) {
func(reqPath string) (stream.Stream, stream.Stream, error) {
// return errors.New("Some Error")
return stream.NewVideoStream("test"), nil
return stream.NewVideoStream("test"), stream.NewVideoStream("test"), nil
},
func(reqPath string) {})
+7
View File
@@ -2,7 +2,9 @@ package vidplayer
import (
"context"
"mime"
"net/http"
"path"
"strings"
@@ -29,6 +31,7 @@ func (s *VidPlayer) HandleRTMPPlay(getStream func(ctx context.Context, reqPath s
select {
case err := <-c:
glog.Errorf("Rtmp getStream Error: %v", err)
return
}
}
return nil
@@ -58,6 +61,8 @@ func (s *VidPlayer) HandleHLSPlay(getHLSBuffer func(reqPath string) (*stream.HLS
glog.Errorf("Error getting HLS playlist %v: %v", r.URL.Path, err)
return
}
w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
w.Header().Set("Access-Control-Allow-Origin", "*")
_, err = w.Write(pl.Encode().Bytes())
if err != nil {
glog.Errorf("Error writting HLS playlist %v: %v", r.URL.Path, err)
@@ -74,6 +79,8 @@ func (s *VidPlayer) HandleHLSPlay(getHLSBuffer func(reqPath string) (*stream.HLS
glog.Errorf("Error getting HLS segment %v: %v", segName, err)
return
}
glog.Infof("Writing seg: %v, len:%v", segName, len(seg))
w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
_, err = w.Write(seg)
if err != nil {
glog.Errorf("Error writting HLS segment %v: %v", segName, err)