Overhaul rtmp to add synchronous invokes, asynchronous routing, proper invoke encoding, better errors, more.

This commit is contained in:
Jason Coene
2013-06-08 22:00:51 -05:00
parent ed1034ed7e
commit fc1c33a476
10 changed files with 341 additions and 218 deletions
+99 -45
View File
@@ -5,19 +5,14 @@ import (
"github.com/elobuff/goamf"
"net"
"net/url"
"sync"
"sync/atomic"
"time"
)
type ClientHandler interface {
OnRtmpConnect()
OnRtmpDisconnect()
OnRtmpCommand(command *Command)
}
type Client struct {
url string
handler ClientHandler
connected bool
conn net.Conn
@@ -29,42 +24,73 @@ type Client struct {
outWindowSize uint32
outChunkSize uint32
outChunkStreams map[uint32]*OutboundChunkStream
inBytes uint32
inMessages chan *Message
inNotify chan uint8
inWindowSize uint32
inChunkSize uint32
inChunkStreams map[uint32]*InboundChunkStream
inBytes uint32
inMessages chan *Message
inNotify chan uint8
inWindowSize uint32
inChunkSize uint32
inChunkStreams map[uint32]*InboundChunkStream
results map[uint32]*Result
resultsMutex sync.Mutex
lastTransactionId uint32
connectionId string
}
func NewClient(url string, handler ClientHandler) (*Client, error) {
c := &Client{
func NewClient(url string) (c *Client, err error) {
c = &Client{
url: url,
connected: false,
handler: handler,
enc: *new(amf.Encoder),
dec: *new(amf.Decoder),
outMessages: make(chan *Message, 100),
outChunkSize: DEFAULT_CHUNK_SIZE,
outWindowSize: DEFAULT_WINDOW_SIZE,
outChunkStreams: make(map[uint32]*OutboundChunkStream),
inMessages: make(chan *Message, 100),
inChunkSize: DEFAULT_CHUNK_SIZE,
inWindowSize: DEFAULT_WINDOW_SIZE,
inChunkStreams: make(map[uint32]*InboundChunkStream),
}
err := c.Connect()
if err != nil {
return c, err
c.Reset()
err = c.Connect()
return
}
func (c *Client) IsAlive() bool {
if c.connected != true {
return false
}
return c, err
return true
}
func (c *Client) Reset() {
c.connected = false
if c.conn != nil {
c.conn.Close()
}
if c.outMessages != nil {
close(c.outMessages)
}
if c.inMessages != nil {
close(c.inMessages)
}
c.enc = *new(amf.Encoder)
c.dec = *new(amf.Decoder)
c.outBytes = 0
c.outMessages = make(chan *Message, 100)
c.outChunkSize = DEFAULT_CHUNK_SIZE
c.outWindowSize = DEFAULT_WINDOW_SIZE
c.outChunkStreams = make(map[uint32]*OutboundChunkStream)
c.inBytes = 0
c.inMessages = make(chan *Message, 100)
c.inChunkSize = DEFAULT_CHUNK_SIZE
c.inWindowSize = DEFAULT_WINDOW_SIZE
c.inChunkStreams = make(map[uint32]*InboundChunkStream)
c.results = make(map[uint32]*Result)
c.lastTransactionId = 0
c.connectionId = ""
}
func (c *Client) Disconnect() {
c.Reset()
log.Info("disconnected from %s", c.url, c.outBytes, c.inBytes)
}
func (c *Client) Connect() (err error) {
@@ -89,33 +115,61 @@ func (c *Client) Connect() (err error) {
err = c.handshake()
if err != nil {
return err
return Error("client connect: could not complete handshake: %s", err)
}
log.Debug("sending connect command to %s", c.url)
err = c.invokeConnect()
if err != nil {
return err
}
go c.dispatchLoop()
go c.receiveLoop()
go c.sendLoop()
go c.routeLoop()
return nil
var id string
id, err = c.connect()
if err != nil {
return Error("client connect: could not complete connect: %s", err)
}
c.connected = true
c.connectionId = id
log.Info("connected to %s (%s)", c.url, c.connectionId)
return
}
func (c *Client) NextTransactionId() uint32 {
return atomic.AddUint32(&c.lastTransactionId, 1)
}
func (c *Client) Disconnect() {
c.connected = false
c.conn.Close()
c.handler.OnRtmpDisconnect()
func (c *Client) Call(msg *Message, t uint32) (result *Result, err error) {
c.outMessages <- msg
log.Info("disconnected from %s", c.url, c.outBytes, c.inBytes)
tid := msg.TransactionId
poll := time.Tick(time.Duration(5) * time.Millisecond)
timeout := time.After(time.Duration(t) * time.Second)
for {
select {
case <-poll:
log.Trace("client call: polling for result for tid %d", tid)
c.resultsMutex.Lock()
result = c.results[tid]
if result != nil {
log.Trace("client call: found result for %d", tid)
delete(c.results, tid)
}
c.resultsMutex.Unlock()
if result != nil {
return
}
case <-timeout:
return result, Error("timed out after %d seconds", t)
}
}
return
}
func (c *Client) Read(p []byte) (n int, err error) {
+62 -43
View File
@@ -6,12 +6,39 @@ import (
"github.com/elobuff/goamf"
)
func (c *Client) invokeConnect() (err error) {
buf := new(bytes.Buffer)
tid := c.NextTransactionId()
func (c *Client) connect() (id string, err error) {
var msg *Message
msg, err = EncodeConnect(c)
if err != nil {
return id, Error("client connect: unable to encode connect command: %s", err)
}
c.enc.Encode(buf, "connect", 0)
c.enc.Encode(buf, float64(tid), 0)
var result *Result
result, err = c.Call(msg, 10)
if err != nil {
return id, Error("client connect: unable to complete connect: %s", err)
}
if !result.IsResult() {
return id, Error("client connect: connect result unsuccessful: %#v", result)
}
obj, ok := result.Objects[1].(amf.Object)
if !ok {
return id, Error("client connect: unable to find connect response: %#v", result)
}
if obj["code"] != "NetConnection.Connect.Success" {
return id, Error("client connect: connection was unsuccessful: %#v", result)
}
id = obj["id"].(string)
return
}
func EncodeConnect(c *Client) (msg *Message, err error) {
tid := c.NextTransactionId()
opts := make(amf.Object)
opts["app"] = ""
@@ -26,16 +53,11 @@ func (c *Client) invokeConnect() (err error) {
opts["pageUrl"] = nil
opts["objectEncoding"] = 3
c.enc.Encode(buf, opts, 0)
c.enc.Encode(buf, false, 0)
c.enc.Encode(buf, "nil", 0)
c.enc.Encode(buf, "", 0)
cmh := make(amf.Object)
cmh["DSMessagingVersion"] = 1
cmh["DSId"] = "my-rtmps"
cm := amf.NewTypedObject()
cm := *amf.NewTypedObject()
cm.Type = "flex.messaging.messages.CommandMessage"
cm.Object["destination"] = ""
cm.Object["operation"] = 5
@@ -46,40 +68,37 @@ func (c *Client) invokeConnect() (err error) {
cm.Object["body"] = nil
cm.Object["headers"] = cmh
c.enc.Encode(buf, cm, 0)
buf := new(bytes.Buffer)
if _, err = c.enc.Encode(buf, "connect", 0); err != nil {
return
}
if _, err = c.enc.Encode(buf, tid, 0); err != nil {
return
}
if _, err = c.enc.Encode(buf, opts, 0); err != nil {
return
}
if _, err = c.enc.Encode(buf, false, 0); err != nil {
return
}
if _, err = c.enc.Encode(buf, "nil", 0); err != nil {
return
}
if _, err = c.enc.Encode(buf, "", 0); err != nil {
return
}
if err = c.enc.EncodeAmf0Amf3Marker(buf); err != nil {
return
}
if _, err = c.enc.Encode(buf, cm, 3); err != nil {
return
}
m := &Message{
ChunkStreamId: CHUNK_STREAM_ID_COMMAND,
return &Message{
Type: MESSAGE_TYPE_AMF0,
ChunkStreamId: CHUNK_STREAM_ID_COMMAND,
TransactionId: tid,
Length: uint32(buf.Len()),
Buffer: buf,
}
c.outMessages <- m
return
}
func (c *Client) handleConnectResult(cmd *Command) {
log.Debug("connect response received from %s", c.url)
obj, ok := cmd.Objects[1].(amf.Object)
if !ok {
log.Error("unable to find connect result in command response")
c.Disconnect()
return
}
if obj["code"] != "NetConnection.Connect.Success" {
log.Error("connect unsuccessful: %s", obj["code"])
c.Disconnect()
return
}
c.connected = true
c.connectionId = obj["id"].(string)
c.handler.OnRtmpConnect()
log.Info("connected to %s (%s)", c.url, c.connectionId)
return
}, err
}
-61
View File
@@ -1,61 +0,0 @@
package rtmp
import (
"encoding/binary"
)
func (c *Client) dispatchLoop() {
for {
m := <-c.inMessages
switch m.ChunkStreamId {
case CHUNK_STREAM_ID_PROTOCOL:
c.dispatchProtocolMessage(m)
case CHUNK_STREAM_ID_COMMAND:
c.dispatchCommandMessage(m)
default:
log.Warn("discarding message on unknown chunk stream %d: +%v", m.ChunkStreamId, m)
}
}
}
func (c *Client) dispatchProtocolMessage(m *Message) {
switch m.Type {
case MESSAGE_TYPE_CHUNK_SIZE:
var err error
var size uint32
err = binary.Read(m.Buffer, binary.BigEndian, &size)
if err != nil {
log.Error("error decoding chunk size: %s", err)
return
}
c.inChunkSize = size
log.Debug("received chunk size, setting to %d", c.inChunkSize, size)
case MESSAGE_TYPE_ACK_SIZE:
log.Debug("received ack size, discarding")
case MESSAGE_TYPE_BANDWIDTH:
log.Debug("received bandwidth, discarding")
default:
log.Debug("received protocol message %d, discarding", m.Type)
}
}
func (c *Client) dispatchCommandMessage(m *Message) {
cmd, err := m.DecodeCommand(&c.dec)
if err != nil {
log.Error("unable to decode message type %d on stream %d into command, discarding: %s", m.Type, m.ChunkStreamId, err)
return
}
if cmd.Name == "_result" && cmd.TransactionId == 1 {
c.handleConnectResult(cmd)
} else {
c.handler.OnRtmpCommand(cmd)
return
}
}
+62 -39
View File
@@ -1,49 +1,72 @@
package rtmp
/*
func (c *Client) Invoke(destination string, operation string, objects ...interface{}) (tid uint32, err error) {
rmh := *amf.MakeObject()
rmh["DSRequestTimeout"] = 60
rmh["DSId"] = c.connectionId
rmh["DSEndpoint"] = "my-rtmps"
import (
"bytes"
"code.google.com/p/go-uuid/uuid"
"github.com/elobuff/goamf"
)
rm := *amf.MakeTypedObject()
rm.Type = "flex.messaging.messages.RemotingMessage"
rm.Object["destination"] = destination
rm.Object["operation"] = operation
rm.Object["messageId"] = uuid.New()
rm.Object["source"] = nil
rm.Object["timestamp"] = 0
rm.Object["timeToLive"] = 0
rm.Object["headers"] = rmh
rm.Object["body"] = objects
func (c *Client) EncodeInvoke(destination string, operation string, vals ...interface{}) (msg *Message, err error) {
tid := c.NextTransactionId()
buf := new(bytes.Buffer)
tid = c.NextTransactionId()
rmh := make(amf.Object)
rmh["DSRequestTimeout"] = 60
rmh["DSId"] = c.connectionId
rmh["DSEndpoint"] = "my-rtmps"
amf.WriteMarker(buf, 0x00) // AMF3
amf.WriteNull(buf) // command
amf.WriteDouble(buf, float64(tid)) // tid
amf.WriteMarker(buf, amf.AMF0_ACMPLUS_OBJECT_MARKER) // amf3
var body []interface{}
for _, val := range vals {
body = append(body, val)
}
log.Debug("rm: %+v", rm)
log.Debug("buf before: %+v", buf.Bytes())
_, err = amf.AMF3_WriteValue(buf, rm)
if err != nil {
log.Debug("error: %s", err)
return
}
log.Debug("buf after: %s", buf.Bytes())
rm := *amf.NewTypedObject()
rm.Type = "flex.messaging.messages.RemotingMessage"
rm.Object["destination"] = destination
rm.Object["operation"] = operation
rm.Object["messageId"] = uuid.New()
rm.Object["source"] = nil
rm.Object["timestamp"] = 0
rm.Object["timeToLive"] = 0
rm.Object["headers"] = rmh
rm.Object["body"] = body
m := &Message{
Type: MESSAGE_TYPE_AMF3,
ChunkStreamId: CHUNK_STREAM_ID_COMMAND,
Length: uint32(buf.Len()),
Buffer: buf,
}
buf := new(bytes.Buffer)
c.outMessages <- msg
// amf3 empty byte
if err = amf.WriteMarker(buf, 0x00); err != nil {
return msg, Error("client invoke: could not encode amf3 0x00 byte: %s", err)
}
return
// amf0 command
if _, err = c.enc.EncodeAmf0Null(buf, true); err != nil {
return msg, Error("client invoke: could not encode amf0 command: %s", err)
}
// amf0 tid
if _, err = c.enc.EncodeAmf0Number(buf, float64(tid), true); err != nil {
return msg, Error("client invoke: could not encode amf0 tid: %s", err)
}
// amf0 nil?
if err = amf.WriteMarker(buf, 0x05); err != nil {
return msg, Error("client invoke: could not encode amf3 0x05 byte: %s", err)
}
// amf0 amf3
if err = c.enc.EncodeAmf0Amf3Marker(buf); err != nil {
return msg, Error("client invoke: could not encode amf3 0x11 byte: %s", err)
}
// amf3 object
if _, err = c.enc.EncodeAmf3(buf, rm); err != nil {
return msg, Error("client invoke: could not encode amf3 object: %s", err)
}
return &Message{
Type: MESSAGE_TYPE_AMF3,
ChunkStreamId: CHUNK_STREAM_ID_COMMAND,
TransactionId: tid,
Length: uint32(buf.Len()),
Buffer: buf,
}, nil
}
*/
+31
View File
@@ -0,0 +1,31 @@
package rtmp
import (
"encoding/binary"
)
func (c *Client) handleProtocolMessage(m *Message) {
switch m.Type {
case MESSAGE_TYPE_CHUNK_SIZE:
var err error
var size uint32
err = binary.Read(m.Buffer, binary.BigEndian, &size)
if err != nil {
log.Error("error decoding chunk size: %s", err)
return
}
c.inChunkSize = size
log.Debug("received chunk size, setting to %d", c.inChunkSize, size)
case MESSAGE_TYPE_ACK_SIZE:
log.Debug("received ack size, discarding")
case MESSAGE_TYPE_BANDWIDTH:
log.Debug("received bandwidth, discarding")
default:
log.Debug("received protocol message %d, discarding", m.Type)
}
}
+5 -4
View File
@@ -10,9 +10,9 @@ func (c *Client) receiveLoop() {
// Read the next header from the connection
h, err := ReadHeader(c)
if err != nil {
if c.connected {
if c.IsAlive() {
log.Warn("unable to receive next header while connected")
c.Disconnect()
c.Reset()
}
return
}
@@ -30,7 +30,7 @@ func (c *Client) receiveLoop() {
if (cs.lastHeader == nil) && (h.Format != HEADER_FORMAT_FULL) {
log.Warn("unable to find previous header on chunk stream %d", h.ChunkStreamId)
c.Disconnect()
c.Reset()
return
}
@@ -95,7 +95,7 @@ func (c *Client) receiveLoop() {
if err != nil {
if c.connected {
log.Warn("unable to copy %d message bytes from buffer", rs)
c.Disconnect()
c.Reset()
}
return
@@ -103,6 +103,7 @@ func (c *Client) receiveLoop() {
if m.RemainingBytes() == 0 {
cs.currentMessage = nil
log.Trace("receive sending message to router: %#v", m)
c.inMessages <- m
} else {
cs.currentMessage = m
+37
View File
@@ -0,0 +1,37 @@
package rtmp
func (c *Client) routeLoop() {
for {
msg, open := <-c.inMessages
log.Trace("client route: received message: %#v", msg)
if !open {
log.Trace("client route: channel closed, exiting")
return
}
switch msg.ChunkStreamId {
case CHUNK_STREAM_ID_PROTOCOL:
c.handleProtocolMessage(msg)
case CHUNK_STREAM_ID_COMMAND:
c.routeCommandMessage(msg)
default:
log.Warn("discarding message on unknown chunk stream %d: +%v", msg.ChunkStreamId, msg)
}
}
}
func (c *Client) routeCommandMessage(msg *Message) {
result, err := msg.DecodeResult(&c.dec)
if err != nil {
log.Error("unable to decode message type %d on stream %d into command, discarding: %s", msg.Type, msg.ChunkStreamId, err)
return
}
tid := uint32(result.TransactionId)
c.resultsMutex.Lock()
c.results[tid] = result
c.resultsMutex.Unlock()
}
+15 -8
View File
@@ -6,7 +6,14 @@ import (
func (c *Client) sendLoop() {
for {
m := <-c.outMessages
m, open := <-c.outMessages
if !open {
log.Trace("client send: channel closed, exiting")
return
}
log.Trace("client send: processing message: %#v", m)
var cs *OutboundChunkStream = c.outChunkStreams[m.ChunkStreamId]
if cs == nil {
@@ -21,12 +28,12 @@ func (c *Client) sendLoop() {
var rem uint32 = m.Length
for rem > 0 {
log.Debug("send header: %+v", h)
log.Trace("client send: send header: %+v", h)
_, err = h.Write(c)
if err != nil {
if c.connected {
if c.IsAlive() {
log.Warn("unable to send header: %v", err)
c.Disconnect()
c.Reset()
}
return
}
@@ -36,13 +43,13 @@ func (c *Client) sendLoop() {
ws = c.outChunkSize
}
log.Debug("send bytes: %d", ws)
log.Trace("client send: send bytes: %d", ws)
n, err = io.CopyN(c, m.Buffer, int64(ws))
if err != nil {
if c.connected {
if c.IsAlive() {
log.Warn("unable to send message")
c.Disconnect()
c.Reset()
}
return
}
@@ -54,7 +61,7 @@ func (c *Client) sendLoop() {
h.Format = HEADER_FORMAT_CONTINUATION
}
log.Debug("send complete")
log.Trace("client send: send complete")
}
}
+15 -18
View File
@@ -11,6 +11,7 @@ type Message struct {
StreamId uint32
Timestamp uint32
AbsoluteTimestamp uint32
TransactionId uint32
Length uint32
Buffer *bytes.Buffer
}
@@ -23,50 +24,46 @@ func (m *Message) RemainingBytes() uint32 {
return m.Length - uint32(m.Buffer.Len())
}
func (m *Message) DecodeCommand(dec *amf.Decoder) (*Command, error) {
var err error
var obj interface{}
cmd := new(Command)
cmd.Version = AMF0
func (m *Message) DecodeResult(dec *amf.Decoder) (result *Result, err error) {
result = new(Result)
if m.ChunkStreamId != CHUNK_STREAM_ID_COMMAND {
return cmd, Error("message is not a command message")
return result, Error("message is not a command message")
}
switch m.Type {
case MESSAGE_TYPE_AMF3:
cmd.Version = AMF3
_, err = m.Buffer.ReadByte()
if err != nil {
return cmd, Error("unable to read first byte of amf3 message")
return result, Error("unable to read first byte of amf3 message")
}
fallthrough
case MESSAGE_TYPE_AMF0:
cmd.Name, err = dec.DecodeAmf0String(m.Buffer, true)
result.Name, err = dec.DecodeAmf0String(m.Buffer, true)
if err != nil {
return cmd, Error("unable to read command from amf message")
return result, Error("unable to read command from amf message")
}
cmd.TransactionId, err = dec.DecodeAmf0Number(m.Buffer, true)
result.TransactionId, err = dec.DecodeAmf0Number(m.Buffer, true)
if err != nil {
return cmd, Error("unable to read tid from amf message")
return result, Error("unable to read tid from amf message")
}
var obj interface{}
for m.Buffer.Len() > 0 {
obj, err = dec.Decode(m.Buffer, 0)
if err != nil {
return cmd, Error("unable to read object from amf message: %s", err)
return result, Error("unable to read object from amf message: %s", err)
}
cmd.Objects = append(cmd.Objects, obj)
result.Objects = append(result.Objects, obj)
}
default:
return cmd, Error("unable to decode message: %+v", m)
return result, Error("unable to decode message: %+v", m)
}
log.Debug("command decoded: %+v", cmd)
log.Debug("command decoded: %+v", result)
return cmd, err
return result, err
}
+15
View File
@@ -0,0 +1,15 @@
package rtmp
type Result struct {
Name string
TransactionId float64
Objects []interface{}
}
func (r *Result) IsResult() bool {
return r.Name == "_result"
}
func (r *Result) IsError() bool {
return r.Name == "_error"
}