Create a new encoder/decoder for each transaction.

This commit is contained in:
Jason Coene
2013-06-11 12:00:25 -05:00
parent c2688c7098
commit 2e33e8baaf
5 changed files with 17 additions and 19 deletions
-5
View File
@@ -2,7 +2,6 @@ package rtmp
import (
"crypto/tls"
"github.com/elobuff/goamf"
"net"
"net/url"
"sync"
@@ -16,8 +15,6 @@ type Client struct {
connected bool
conn net.Conn
enc amf.Encoder
dec amf.Decoder
outBytes uint32
outMessages chan *Message
@@ -71,8 +68,6 @@ func (c *Client) Reset() {
close(c.inMessages)
}
c.enc = *new(amf.Encoder)
c.dec = *new(amf.Decoder)
c.outBytes = 0
c.outMessages = make(chan *Message, 100)
c.outChunkSize = DEFAULT_CHUNK_SIZE
+9 -8
View File
@@ -68,29 +68,30 @@ func EncodeConnect(c *Client) (msg *Message, err error) {
cm.Object["body"] = nil
cm.Object["headers"] = cmh
enc := new(amf.Encoder)
buf := new(bytes.Buffer)
if _, err = c.enc.Encode(buf, "connect", 0); err != nil {
if _, err = enc.Encode(buf, "connect", 0); err != nil {
return
}
if _, err = c.enc.Encode(buf, tid, 0); err != nil {
if _, err = enc.Encode(buf, tid, 0); err != nil {
return
}
if _, err = c.enc.Encode(buf, opts, 0); err != nil {
if _, err = enc.Encode(buf, opts, 0); err != nil {
return
}
if _, err = c.enc.Encode(buf, false, 0); err != nil {
if _, err = enc.Encode(buf, false, 0); err != nil {
return
}
if _, err = c.enc.Encode(buf, "nil", 0); err != nil {
if _, err = enc.Encode(buf, "nil", 0); err != nil {
return
}
if _, err = c.enc.Encode(buf, "", 0); err != nil {
if _, err = enc.Encode(buf, "", 0); err != nil {
return
}
if err = c.enc.EncodeAmf0Amf3Marker(buf); err != nil {
if err = enc.EncodeAmf0Amf3Marker(buf); err != nil {
return
}
if _, err = c.enc.Encode(buf, cm, 3); err != nil {
if _, err = enc.Encode(buf, cm, 3); err != nil {
return
}
+5 -4
View File
@@ -33,6 +33,7 @@ func (c *Client) EncodeInvoke(className string, destination interface{}, operati
rm.Object["headers"] = rmh
rm.Object["body"] = body
enc := new(amf.Encoder)
buf := new(bytes.Buffer)
// amf3 empty byte
@@ -41,12 +42,12 @@ func (c *Client) EncodeInvoke(className string, destination interface{}, operati
}
// amf0 command
if _, err = c.enc.EncodeAmf0Null(buf, true); err != nil {
if _, err = enc.EncodeAmf0Null(buf, true); err != nil {
return msg, Error("client invoke: could not encode amf0 command: %s", err)
}
// amf0 tid
if _, err = c.enc.EncodeAmf0Number(buf, float64(tid), true); err != nil {
if _, err = enc.EncodeAmf0Number(buf, float64(tid), true); err != nil {
return msg, Error("client invoke: could not encode amf0 tid: %s", err)
}
@@ -56,12 +57,12 @@ func (c *Client) EncodeInvoke(className string, destination interface{}, operati
}
// amf0 amf3
if err = c.enc.EncodeAmf0Amf3Marker(buf); err != nil {
if err = enc.EncodeAmf0Amf3Marker(buf); err != nil {
return msg, Error("client invoke: could not encode amf3 0x11 byte: %s", err)
}
// amf3 object
if _, err = c.enc.EncodeAmf3(buf, rm); err != nil {
if _, err = enc.EncodeAmf3(buf, rm); err != nil {
return msg, Error("client invoke: could not encode amf3 object: %s", err)
}
+1 -1
View File
@@ -23,7 +23,7 @@ func (c *Client) routeLoop() {
}
func (c *Client) routeCommandMessage(msg *Message) {
response, err := msg.DecodeResponse(&c.dec)
response, err := msg.DecodeResponse()
if err != nil {
log.Error("unable to decode message type %d on stream %d into command, discarding: %s", msg.Type, msg.ChunkStreamId, err)
return
+2 -1
View File
@@ -24,7 +24,8 @@ func (m *Message) RemainingBytes() uint32 {
return m.Length - uint32(m.Buffer.Len())
}
func (m *Message) DecodeResponse(dec *amf.Decoder) (response *Response, err error) {
func (m *Message) DecodeResponse() (response *Response, err error) {
dec := new(amf.Decoder)
response = new(Response)
if m.ChunkStreamId != CHUNK_STREAM_ID_COMMAND {