mirror of
https://github.com/livepeer/lpms
synced 2026-04-23 00:07:25 +08:00
vidlistener: Support opaque user data.
This allows us to pass around per-stream data from MakeStreamID without storing it outside LPMS.
This commit is contained in:
+9
-2
@@ -27,6 +27,12 @@ import (
|
||||
|
||||
var HLSWaitTime = time.Second * 10
|
||||
|
||||
type exampleStream string
|
||||
|
||||
func (t *exampleStream) StreamID() string {
|
||||
return string(*t)
|
||||
}
|
||||
|
||||
func randString(n int) string {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
x := make([]byte, n, n)
|
||||
@@ -75,8 +81,9 @@ func main() {
|
||||
|
||||
lpms.HandleRTMPPublish(
|
||||
//makeStreamID (give the stream an ID)
|
||||
func(url *url.URL) (strmID string) {
|
||||
return randString(10)
|
||||
func(url *url.URL) stream.AppData {
|
||||
s := exampleStream(randString(10))
|
||||
return &s
|
||||
},
|
||||
|
||||
//gotStream
|
||||
|
||||
+1
-1
@@ -116,7 +116,7 @@ func (l *LPMS) Start(ctx context.Context) error {
|
||||
|
||||
//HandleRTMPPublish offload to the video listener. To understand how it works, look at videoListener.HandleRTMPPublish.
|
||||
func (l *LPMS) HandleRTMPPublish(
|
||||
makeStreamID func(url *url.URL) (strmID string),
|
||||
makeStreamID func(url *url.URL) (strmID stream.AppData),
|
||||
gotStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error),
|
||||
endStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error) {
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ import (
|
||||
|
||||
type TestStream struct{}
|
||||
|
||||
func (s TestStream) AppData() stream.AppData { return nil }
|
||||
func (s TestStream) String() string { return "" }
|
||||
func (s *TestStream) GetStreamFormat() stream.VideoFormat { return stream.RTMP }
|
||||
func (s *TestStream) GetStreamID() string { return "test" }
|
||||
|
||||
@@ -53,6 +53,8 @@ func (s *BasicHLSVideoStream) SetSubscriber(f func(seg *HLSSegment, eof bool)) {
|
||||
//GetStreamID returns the streamID
|
||||
func (s *BasicHLSVideoStream) GetStreamID() string { return s.strmID }
|
||||
|
||||
func (s *BasicHLSVideoStream) AppData() AppData { return nil }
|
||||
|
||||
//GetStreamFormat always returns HLS
|
||||
func (s *BasicHLSVideoStream) GetStreamFormat() VideoFormat { return HLS }
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
)
|
||||
|
||||
type BasicRTMPVideoStream struct {
|
||||
streamID string
|
||||
appData AppData // opaque app-supplied data
|
||||
ch chan *av.Packet
|
||||
listeners map[string]av.MuxCloser
|
||||
listnersLock *sync.Mutex
|
||||
@@ -26,14 +26,14 @@ type BasicRTMPVideoStream struct {
|
||||
}
|
||||
|
||||
//NewBasicRTMPVideoStream creates a new BasicRTMPVideoStream. The default RTMPTimeout is set to 10 milliseconds because we assume all RTMP streams are local.
|
||||
func NewBasicRTMPVideoStream(id string) *BasicRTMPVideoStream {
|
||||
func NewBasicRTMPVideoStream(data AppData) *BasicRTMPVideoStream {
|
||||
ch := make(chan *av.Packet)
|
||||
eof := make(chan struct{})
|
||||
listeners := make(map[string]av.MuxCloser)
|
||||
lLock := &sync.Mutex{}
|
||||
cLock := &sync.Mutex{}
|
||||
|
||||
s := &BasicRTMPVideoStream{streamID: id, listeners: listeners, listnersLock: lLock, ch: ch, EOF: eof, closeLock: cLock, closed: false}
|
||||
s := &BasicRTMPVideoStream{appData: data, listeners: listeners, listnersLock: lLock, ch: ch, EOF: eof, closeLock: cLock, closed: false}
|
||||
//Automatically start a worker that reads packets. There is no buffering of the video packets.
|
||||
go func(strm *BasicRTMPVideoStream) {
|
||||
var cache map[string]av.MuxCloser
|
||||
@@ -64,7 +64,14 @@ func NewBasicRTMPVideoStream(id string) *BasicRTMPVideoStream {
|
||||
}
|
||||
|
||||
func (s *BasicRTMPVideoStream) GetStreamID() string {
|
||||
return s.streamID
|
||||
if s.appData == nil {
|
||||
return ""
|
||||
}
|
||||
return s.appData.StreamID()
|
||||
}
|
||||
|
||||
func (s *BasicRTMPVideoStream) AppData() AppData {
|
||||
return s.appData
|
||||
}
|
||||
|
||||
func (s *BasicRTMPVideoStream) GetStreamFormat() VideoFormat {
|
||||
@@ -153,7 +160,7 @@ func (s *BasicRTMPVideoStream) Close() {
|
||||
return
|
||||
}
|
||||
s.closed = true
|
||||
glog.V(2).Infof("Closing RTMP %v", s.streamID)
|
||||
glog.V(2).Infof("Closing RTMP %v", s.appData.StreamID())
|
||||
close(s.EOF)
|
||||
}
|
||||
|
||||
|
||||
@@ -52,8 +52,18 @@ func (d NoEOFDemuxer) ReadPacket() (av.Packet, error) {
|
||||
return av.Packet{Data: []byte{0}}, nil
|
||||
}
|
||||
|
||||
type testStream string
|
||||
|
||||
func (t *testStream) StreamID() string {
|
||||
return string(*t)
|
||||
}
|
||||
func newTestStream(s string) *testStream {
|
||||
t := testStream(s)
|
||||
return &t
|
||||
}
|
||||
|
||||
func TestWriteBasicRTMPErrors(t *testing.T) {
|
||||
stream := NewBasicRTMPVideoStream("test")
|
||||
stream := NewBasicRTMPVideoStream(newTestStream(t.Name()))
|
||||
_, err := stream.WriteRTMPToStream(context.Background(), BadStreamsDemuxer{})
|
||||
if err != ErrStreams {
|
||||
t.Error("Expecting Streams Error, but got: ", err)
|
||||
@@ -92,7 +102,7 @@ func (d *PacketsDemuxer) ReadPacket() (av.Packet, error) {
|
||||
|
||||
func TestWriteBasicRTMP(t *testing.T) {
|
||||
glog.Infof("\n\nTestWriteBasicRTMP\n\n")
|
||||
stream := NewBasicRTMPVideoStream("test")
|
||||
stream := NewBasicRTMPVideoStream(newTestStream(t.Name()))
|
||||
//Add a listener
|
||||
l := &PacketsMuxer{}
|
||||
stream.listeners["rand"] = l
|
||||
@@ -118,7 +128,7 @@ func TestRTMPConcurrency(t *testing.T) {
|
||||
// Run under -race
|
||||
|
||||
glog.Infof("\n\nTest%s\n", t.Name())
|
||||
st := NewBasicRTMPVideoStream(t.Name())
|
||||
st := NewBasicRTMPVideoStream(newTestStream(t.Name()))
|
||||
demux := &PacketsDemuxer{c: &Counter{Count: 0}, wait: true, startReading: make(chan struct{})}
|
||||
eof, err := st.WriteRTMPToStream(context.Background(), demux)
|
||||
if err != nil {
|
||||
@@ -179,7 +189,7 @@ func (d *BadPacketMuxer) WritePacket(av.Packet) error { return ErrBadPacket
|
||||
|
||||
func TestReadBasicRTMPError(t *testing.T) {
|
||||
glog.Infof("\nTestReadBasicRTMPError\n\n")
|
||||
stream := NewBasicRTMPVideoStream("test")
|
||||
stream := NewBasicRTMPVideoStream(newTestStream(t.Name()))
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
if _, err := stream.ReadRTMPFromStream(context.Background(), &BadHeaderMuxer{}); err != ErrBadHeader {
|
||||
@@ -198,7 +208,7 @@ func TestReadBasicRTMPError(t *testing.T) {
|
||||
case <-done:
|
||||
}
|
||||
|
||||
stream = NewBasicRTMPVideoStream("test")
|
||||
stream = NewBasicRTMPVideoStream(newTestStream(t.Name()))
|
||||
done = make(chan struct{})
|
||||
go func() {
|
||||
eof, err := stream.ReadRTMPFromStream(context.Background(), &BadPacketMuxer{})
|
||||
@@ -276,7 +286,7 @@ func (d *PacketsMuxer) numPackets() int {
|
||||
}
|
||||
|
||||
func TestReadBasicRTMP(t *testing.T) {
|
||||
stream := NewBasicRTMPVideoStream("test")
|
||||
stream := NewBasicRTMPVideoStream(newTestStream(t.Name()))
|
||||
_, err := stream.WriteRTMPToStream(context.Background(), &PacketsDemuxer{c: &Counter{Count: 0}})
|
||||
if err != nil {
|
||||
t.Error("Error setting up the test - while inserting packet.")
|
||||
|
||||
@@ -7,7 +7,12 @@ import (
|
||||
"github.com/nareix/joy4/av"
|
||||
)
|
||||
|
||||
type AppData interface {
|
||||
StreamID() string
|
||||
}
|
||||
|
||||
type VideoStream interface {
|
||||
AppData() AppData
|
||||
GetStreamID() string
|
||||
GetStreamFormat() VideoFormat
|
||||
String() string
|
||||
|
||||
@@ -27,7 +27,7 @@ type VidListener struct {
|
||||
//gotStream is called when the stream starts. It gives you access to the stream.
|
||||
//endStream is called when the stream ends. It gives you access to the stream.
|
||||
func (self *VidListener) HandleRTMPPublish(
|
||||
makeStreamID func(url *url.URL) (strmID string),
|
||||
makeStreamID func(url *url.URL) (strmID stream.AppData),
|
||||
gotStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error,
|
||||
endStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error) {
|
||||
|
||||
@@ -36,7 +36,7 @@ func (self *VidListener) HandleRTMPPublish(
|
||||
glog.V(2).Infof("RTMP server got upstream: %v", conn.URL)
|
||||
|
||||
strmID := makeStreamID(conn.URL)
|
||||
if strmID == "" {
|
||||
if strmID == nil || strmID.StreamID() == "" {
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -14,6 +14,16 @@ import (
|
||||
joy4rtmp "github.com/nareix/joy4/format/rtmp"
|
||||
)
|
||||
|
||||
type testStream string
|
||||
|
||||
func (t *testStream) StreamID() string {
|
||||
return string(*t)
|
||||
}
|
||||
func newTestStream() *testStream {
|
||||
t := testStream("testID")
|
||||
return &t
|
||||
}
|
||||
|
||||
func TestListener(t *testing.T) {
|
||||
server := &joy4rtmp.Server{Addr: ":1937"}
|
||||
listener := &VidListener{RtmpServer: server}
|
||||
@@ -21,8 +31,8 @@ func TestListener(t *testing.T) {
|
||||
|
||||
listener.HandleRTMPPublish(
|
||||
//makeStreamID
|
||||
func(url *url.URL) string {
|
||||
return "testID"
|
||||
func(url *url.URL) stream.AppData {
|
||||
return newTestStream()
|
||||
},
|
||||
//gotStream
|
||||
func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error) {
|
||||
@@ -72,8 +82,8 @@ func TestListenerError(t *testing.T) {
|
||||
failures := 0
|
||||
badListener.HandleRTMPPublish(
|
||||
//makeStreamID
|
||||
func(url *url.URL) string {
|
||||
return "testID"
|
||||
func(url *url.URL) stream.AppData {
|
||||
return newTestStream()
|
||||
},
|
||||
//gotStream
|
||||
func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error {
|
||||
@@ -113,9 +123,9 @@ func TestListenerEmptyStreamID(t *testing.T) {
|
||||
|
||||
badListener.HandleRTMPPublish(
|
||||
//makeStreamID
|
||||
func(url *url.URL) string {
|
||||
func(url *url.URL) stream.AppData {
|
||||
// On returning empty stream id connection should be closed
|
||||
return ""
|
||||
return newTestStream()
|
||||
},
|
||||
//gotStream
|
||||
func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error {
|
||||
|
||||
Reference in New Issue
Block a user