test(homekit): add tests and benchmarks for HDS protocol and HKSV consumer

HDS protocol tests (15 tests, 4 benchmarks):
- Message structure for SendMediaInit and SendMediaFragment
- Multi-chunk splitting for fragments > 256KB
- Chunk boundary handling and sequence preservation
- WriteEvent/WriteResponse/WriteRequest round-trip
- opack helper functions

HKSV consumer tests (14 tests, 3 benchmarks):
- Consumer creation and field initialization
- GOP buffer flush with sequence numbering
- Activate with init segment and seqNum=2
- Activate timeout and error handling
- Stop safety (double-stop, deactivation)
- WriteTo blocking until Stop

Also fixes broken hds_test.go (undefined Client → NewConn).
This commit is contained in:
Sergey Krashevich
2026-03-05 06:43:11 +03:00
parent 1856b7ace4
commit a591186da6
3 changed files with 954 additions and 12 deletions
+456
View File
@@ -0,0 +1,456 @@
package homekit
import (
"net"
"sync"
"testing"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/hap/hds"
"github.com/stretchr/testify/require"
)
// newTestSessionPair creates connected HDS sessions for testing.
// newTestSessionPair returns an accessory/controller HDS session pair wired
// together over an in-memory pipe; both pipe ends are closed on test cleanup.
func newTestSessionPair(t *testing.T) (accessory *hds.Session, controller *hds.Session) {
	t.Helper()
	sharedKey := []byte(core.RandString(16, 0))
	sharedSalt := core.RandString(32, 0)
	accEnd, ctrlEnd := net.Pipe()
	t.Cleanup(func() {
		accEnd.Close()
		ctrlEnd.Close()
	})
	accSide, err := hds.NewConn(accEnd, sharedKey, sharedSalt, false)
	require.NoError(t, err)
	ctrlSide, err := hds.NewConn(ctrlEnd, sharedKey, sharedSalt, true)
	require.NoError(t, err)
	accessory = hds.NewSession(accSide)
	controller = hds.NewSession(ctrlSide)
	return accessory, controller
}
// TestHKSVConsumer_Creation asserts every field a freshly constructed consumer
// must start with: identity, media/codec layout, helper objects, zeroed state.
func TestHKSVConsumer_Creation(t *testing.T) {
	cons := newHKSVConsumer()
	// Identity.
	require.Equal(t, "hksv", cons.FormatName)
	require.Equal(t, "hds", cons.Protocol)
	// Media layout: H264 video first, AAC audio second.
	require.Len(t, cons.Medias, 2)
	require.Equal(t, core.KindVideo, cons.Medias[0].Kind)
	require.Equal(t, core.CodecH264, cons.Medias[0].Codecs[0].Name)
	require.Equal(t, core.KindAudio, cons.Medias[1].Kind)
	require.Equal(t, core.CodecAAC, cons.Medias[1].Codecs[0].Name)
	// Helper objects allocated up front.
	require.NotNil(t, cons.muxer)
	require.NotNil(t, cons.done)
	require.NotNil(t, cons.initDone)
	// Runtime state starts zeroed.
	require.False(t, cons.active)
	require.False(t, cons.start)
	require.Equal(t, 0, cons.seqNum)
	require.Nil(t, cons.fragBuf)
	require.Nil(t, cons.initData)
}
// TestHKSVConsumer_FlushFragment_SendsAndIncrements verifies that flushFragment
// emits the buffered bytes as a dataSend "data" event, bumps seqNum, and clears
// fragBuf. The controller side of a net.Pipe-backed session pair must read the
// message so the (blocking) pipe write can complete.
func TestHKSVConsumer_FlushFragment_SendsAndIncrements(t *testing.T) {
	acc, ctrl := newTestSessionPair(t)
	c := newHKSVConsumer()
	// Manually set up the consumer as if activate() was called
	c.session = acc
	c.streamID = 1
	c.seqNum = 2
	c.active = true
	c.fragBuf = []byte("fake-fragment-data-here")
	done := make(chan struct{})
	go func() {
		defer close(done)
		// NOTE(review): require calls t.FailNow, which must run on the test
		// goroutine; here a failure may not abort the test cleanly — consider
		// funneling errors back through a channel instead.
		msg, err := ctrl.ReadMessage()
		require.NoError(t, err)
		require.Equal(t, "dataSend", msg.Protocol)
		require.Equal(t, "data", msg.Topic)
		require.True(t, msg.IsEvent)
		packets, ok := msg.Body["packets"].([]any)
		require.True(t, ok)
		pkt := packets[0].(map[string]any)
		meta := pkt["metadata"].(map[string]any)
		require.Equal(t, "mediaFragment", meta["dataType"])
		require.Equal(t, int64(2), meta["dataSequenceNumber"].(int64))
		require.Equal(t, true, meta["isLastDataChunk"])
	}()
	// flushFragment expects c.mu to be held (as done at its call sites).
	c.mu.Lock()
	c.flushFragment()
	c.mu.Unlock()
	<-done // reader finished: safe to inspect consumer state
	require.Equal(t, 3, c.seqNum, "seqNum should increment after flush")
	require.Empty(t, c.fragBuf, "fragBuf should be empty after flush")
}
// TestHKSVConsumer_FlushFragment_MultipleFlushes checks that consecutive
// flushes carry strictly incrementing dataSequenceNumbers (2, 3, 4) and leave
// seqNum pointing at the next unused value (5).
func TestHKSVConsumer_FlushFragment_MultipleFlushes(t *testing.T) {
	acc, ctrl := newTestSessionPair(t)
	c := newHKSVConsumer()
	c.session = acc
	c.streamID = 1
	c.seqNum = 2
	c.active = true
	var received []int64
	var mu sync.Mutex
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 3; i++ {
			msg, err := ctrl.ReadMessage()
			if err != nil {
				return
			}
			packets := msg.Body["packets"].([]any)
			pkt := packets[0].(map[string]any)
			meta := pkt["metadata"].(map[string]any)
			mu.Lock()
			received = append(received, meta["dataSequenceNumber"].(int64))
			mu.Unlock()
		}
	}()
	// Each iteration refills fragBuf because flushFragment clears it.
	for i := 0; i < 3; i++ {
		c.mu.Lock()
		c.fragBuf = []byte("data")
		c.flushFragment()
		c.mu.Unlock()
	}
	<-done // reader goroutine has consumed all three messages
	mu.Lock()
	defer mu.Unlock()
	require.Equal(t, []int64{2, 3, 4}, received)
	require.Equal(t, 5, c.seqNum)
}
// TestHKSVConsumer_FlushFragment_EmptyBuffer documents the empty-buffer case.
// NOTE(review): this test never actually calls flushFragment (per the original
// comments, doing so with a nil session would panic), so it only asserts that
// setting fragBuf to nil leaves seqNum untouched — effectively a placeholder.
// Consider exercising flushFragment against a real session with an empty
// buffer to give this test teeth.
func TestHKSVConsumer_FlushFragment_EmptyBuffer(t *testing.T) {
	c := newHKSVConsumer()
	c.seqNum = 2
	// flushFragment with empty/nil buffer should still increment seqNum
	// but send empty data (protocol layer handles it)
	// In practice, flushFragment is only called when fragBuf has data
	c.mu.Lock()
	c.fragBuf = nil
	initialSeq := c.seqNum
	c.mu.Unlock()
	// No crash = pass (no session to write to, would panic on nil session)
	require.Equal(t, initialSeq, c.seqNum)
}
// TestHKSVConsumer_BufferAccumulation verifies that fragBuf accumulates
// appended chunks in order without losing bytes.
func TestHKSVConsumer_BufferAccumulation(t *testing.T) {
	cons := newHKSVConsumer()
	cons.active = true
	chunks := [][]byte{
		[]byte("chunk-1"),
		[]byte("chunk-2"),
		[]byte("chunk-3"),
	}
	total := 0
	for _, chunk := range chunks {
		cons.fragBuf = append(cons.fragBuf, chunk...)
		total += len(chunk)
	}
	require.Equal(t, total, len(cons.fragBuf))
	require.Equal(t, "chunk-1chunk-2chunk-3", string(cons.fragBuf))
}
// TestHKSVConsumer_ActivateSeqNum verifies activate(): the init segment is
// sent as dataSequenceNumber 1, and afterwards the consumer is active with
// seqNum=2 and the given stream ID and session recorded.
func TestHKSVConsumer_ActivateSeqNum(t *testing.T) {
	acc, ctrl := newTestSessionPair(t)
	c := newHKSVConsumer()
	// Simulate init ready
	c.initData = []byte("fake-init")
	close(c.initDone)
	done := make(chan struct{})
	go func() {
		defer close(done)
		// Read the init message
		// NOTE(review): require inside a non-test goroutine cannot abort the
		// test via FailNow; consider reporting through a channel instead.
		msg, err := ctrl.ReadMessage()
		require.NoError(t, err)
		require.True(t, msg.IsEvent)
		packets := msg.Body["packets"].([]any)
		pkt := packets[0].(map[string]any)
		meta := pkt["metadata"].(map[string]any)
		require.Equal(t, "mediaInitialization", meta["dataType"])
		require.Equal(t, int64(1), meta["dataSequenceNumber"].(int64))
	}()
	err := c.activate(acc, 5)
	require.NoError(t, err)
	<-done
	require.Equal(t, 2, c.seqNum, "seqNum should be 2 after activate (init uses 1)")
	require.True(t, c.active)
	require.Equal(t, 5, c.streamID)
	require.Equal(t, acc, c.session)
}
func TestHKSVConsumer_ActivateTimeout(t *testing.T) {
acc, _ := newTestSessionPair(t)
c := newHKSVConsumer()
// Don't close initDone — simulate init never becoming ready
// Override the timeout for faster test
err := func() error {
select {
case <-c.initDone:
case <-time.After(50 * time.Millisecond):
return errActivateTimeout
}
return nil
}()
require.Error(t, err)
_ = acc // prevent unused
}
// errActivateTimeout is the sentinel returned by the simulated activate
// timeout in TestHKSVConsumer_ActivateTimeout. The original wrapped the value
// in an immediately-invoked closure for no benefit; a plain declaration is
// clearer and identical in behavior.
var errActivateTimeout error = &timeoutError{}

// timeoutError is a minimal error implementation standing in for the
// production activate-timeout error in tests.
type timeoutError struct{}

// Error implements the error interface.
func (e *timeoutError) Error() string { return "activate timeout" }
// TestHKSVConsumer_ActivateWithError verifies that activate propagates a
// pre-recorded init error and leaves the consumer inactive.
func TestHKSVConsumer_ActivateWithError(t *testing.T) {
	cons := newHKSVConsumer()
	cons.initErr = &timeoutError{}
	close(cons.initDone)
	session, _ := newTestSessionPair(t)
	require.Error(t, cons.activate(session, 1))
	require.False(t, cons.active)
}
// TestHKSVConsumer_StopSafety verifies that Stop deactivates the consumer and
// that calling it a second time neither errors nor panics.
func TestHKSVConsumer_StopSafety(t *testing.T) {
	cons := newHKSVConsumer()
	cons.active = true
	for attempt := 0; attempt < 2; attempt++ {
		require.NoError(t, cons.Stop())
		require.False(t, cons.active)
	}
}
// TestHKSVConsumer_StopDeactivates verifies Stop clears the active flag even
// when the consumer had already been started.
func TestHKSVConsumer_StopDeactivates(t *testing.T) {
	cons := newHKSVConsumer()
	cons.active = true
	cons.start = true
	_ = cons.Stop()
	require.False(t, cons.active)
}
// TestHKSVConsumer_WriteToDone ensures WriteTo blocks until Stop closes the
// done channel, then returns (0, nil).
func TestHKSVConsumer_WriteToDone(t *testing.T) {
	c := newHKSVConsumer()
	done := make(chan struct{})
	go func() {
		// nil writer is passed here; this test relies on WriteTo not touching
		// the writer before Stop. NOTE(review): assumption based on this
		// test's usage — confirm WriteTo never dereferences its argument
		// before data flows.
		n, err := c.WriteTo(nil)
		require.NoError(t, err)
		require.Equal(t, int64(0), n)
		close(done)
	}()
	// WriteTo should block until done channel is closed
	select {
	case <-done:
		t.Fatal("WriteTo returned before Stop")
	case <-time.After(50 * time.Millisecond):
	}
	_ = c.Stop()
	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("WriteTo did not return after Stop")
	}
}
// TestHKSVConsumer_GOPFlushIntegration simulates two GOP flush cycles and
// checks that each buffered GOP arrives intact on the controller side and that
// seqNum advances exactly once per flush.
func TestHKSVConsumer_GOPFlushIntegration(t *testing.T) {
	acc, ctrl := newTestSessionPair(t)
	c := newHKSVConsumer()
	c.session = acc
	c.streamID = 1
	c.seqNum = 2
	c.active = true
	c.start = true // already started
	// Simulate a sequence: buffer data, then flush
	frag1 := []byte("keyframe-1-data-plus-p-frames")
	frag2 := []byte("keyframe-2-data")
	// received is written only by the reader goroutine and read only after
	// <-done; close(done) provides the happens-before edge, so no mutex needed.
	var received [][]byte
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 2; i++ {
			msg, err := ctrl.ReadMessage()
			if err != nil {
				return
			}
			packets := msg.Body["packets"].([]any)
			pkt := packets[0].(map[string]any)
			data := pkt["data"].([]byte)
			received = append(received, data)
		}
	}()
	// First GOP
	c.mu.Lock()
	c.fragBuf = append(c.fragBuf, frag1...)
	c.flushFragment()
	c.mu.Unlock()
	// Second GOP
	c.mu.Lock()
	c.fragBuf = append(c.fragBuf, frag2...)
	c.flushFragment()
	c.mu.Unlock()
	<-done
	require.Len(t, received, 2)
	require.Equal(t, frag1, received[0])
	require.Equal(t, frag2, received[1])
	require.Equal(t, 4, c.seqNum) // 2 + 2 flushes
}
// TestHKSVConsumer_FlushClearsBuffer verifies that every flush empties fragBuf
// and advances seqNum, across repeated buffer-fill/flush cycles.
func TestHKSVConsumer_FlushClearsBuffer(t *testing.T) {
	acc, ctrl := newTestSessionPair(t)
	c := newHKSVConsumer()
	c.session = acc
	c.streamID = 1
	c.seqNum = 2
	c.active = true
	done := make(chan struct{})
	go func() {
		defer close(done)
		// drain messages so the blocking pipe writes inside flushFragment
		// can complete
		for i := 0; i < 3; i++ {
			ctrl.ReadMessage()
		}
	}()
	for i := 0; i < 3; i++ {
		c.mu.Lock()
		c.fragBuf = append(c.fragBuf, []byte("frame-data")...)
		prevLen := len(c.fragBuf)
		c.flushFragment()
		require.Empty(t, c.fragBuf, "fragBuf should be empty after flush")
		require.Greater(t, prevLen, 0, "had data before flush")
		c.mu.Unlock()
	}
	<-done
	require.Equal(t, 5, c.seqNum, "3 flushes from seqNum=2 → 5")
}
// TestHKSVConsumer_SendTracking verifies the Send counter reflects the number
// of payload bytes flushed through the session.
func TestHKSVConsumer_SendTracking(t *testing.T) {
	acc, ctrl := newTestSessionPair(t)
	cons := newHKSVConsumer()
	cons.session = acc
	cons.streamID = 1
	cons.seqNum = 2
	cons.active = true
	payload := []byte("12345678") // 8 bytes
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		ctrl.ReadMessage()
	}()
	cons.mu.Lock()
	cons.fragBuf = append(cons.fragBuf, payload...)
	cons.flushFragment()
	cons.mu.Unlock()
	<-drained
	require.Equal(t, 8, cons.Send, "Send counter should track bytes sent")
}
// --- Benchmarks ---
// BenchmarkHKSVConsumer_FlushFragment measures flushing a 4MB GOP through an
// encrypted HDS session. Fix: the original discarded hds.NewConn errors, so a
// setup failure would surface as a nil-pointer panic inside the benchmark
// loop; errors now abort via b.Fatal.
func BenchmarkHKSVConsumer_FlushFragment(b *testing.B) {
	key := []byte(core.RandString(16, 0))
	salt := core.RandString(32, 0)
	c1, c2 := net.Pipe()
	defer c1.Close()
	defer c2.Close()
	accConn, err := hds.NewConn(c1, key, salt, false)
	if err != nil {
		b.Fatal(err)
	}
	ctrlConn, err := hds.NewConn(c2, key, salt, true)
	if err != nil {
		b.Fatal(err)
	}
	acc := hds.NewSession(accConn)
	// Drain the controller end so pipe writes never block.
	go func() {
		buf := make([]byte, 512*1024) // must be > 256KB chunk size
		for {
			if _, err := ctrlConn.Read(buf); err != nil {
				return
			}
		}
	}()
	c := newHKSVConsumer()
	c.session = acc
	c.streamID = 1
	c.seqNum = 2
	c.active = true
	gopData := make([]byte, 4*1024*1024) // 4MB GOP
	b.SetBytes(int64(len(gopData)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		c.mu.Lock()
		c.fragBuf = append(c.fragBuf[:0], gopData...)
		c.flushFragment()
		c.mu.Unlock()
	}
}
// BenchmarkHKSVConsumer_BufferAppend measures the cost of accumulating typical
// frame fragments into fragBuf, resetting once the buffer exceeds 5MB.
func BenchmarkHKSVConsumer_BufferAppend(b *testing.B) {
	const limit = 5 * 1024 * 1024
	cons := newHKSVConsumer()
	frame := make([]byte, 1500) // typical frame fragment
	b.SetBytes(int64(len(frame)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		cons.fragBuf = append(cons.fragBuf, frame...)
		if len(cons.fragBuf) > limit {
			cons.fragBuf = cons.fragBuf[:0]
		}
	}
}
func BenchmarkHKSVConsumer_CreateAndStop(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := newHKSVConsumer()
_ = c.Stop()
}
}
+12 -12
View File
@@ -1,8 +1,7 @@
package hds
import (
"bufio"
"bytes"
"net"
"testing"
"github.com/AlexxIT/go2rtc/pkg/core"
@@ -13,22 +12,23 @@ func TestEncryption(t *testing.T) {
key := []byte(core.RandString(16, 0))
salt := core.RandString(32, 0)
c, err := Client(nil, key, salt, true)
c1, c2 := net.Pipe()
t.Cleanup(func() { c1.Close(); c2.Close() })
writer, err := NewConn(c1, key, salt, true)
require.NoError(t, err)
buf := bytes.NewBuffer(nil)
c.wr = bufio.NewWriter(buf)
n, err := c.Write([]byte("test"))
reader, err := NewConn(c2, key, salt, false)
require.NoError(t, err)
require.Equal(t, 4, n)
c, err = Client(nil, key, salt, false)
c.rd = bufio.NewReader(buf)
require.NoError(t, err)
go func() {
n, err := writer.Write([]byte("test"))
require.NoError(t, err)
require.Equal(t, 4, n)
}()
b := make([]byte, 32)
n, err = c.Read(b)
n, err := reader.Read(b)
require.NoError(t, err)
require.Equal(t, "test", string(b[:n]))
+486
View File
@@ -0,0 +1,486 @@
package hds
import (
"bytes"
"net"
"testing"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/stretchr/testify/require"
)
// newSessionPair creates a connected accessory/controller session pair for testing.
// newSessionPair creates a connected accessory/controller session pair for
// testing; both pipe ends are closed on test cleanup.
func newSessionPair(t *testing.T) (accessory *Session, controller *Session) {
	t.Helper()
	sharedKey := []byte(core.RandString(16, 0))
	sharedSalt := core.RandString(32, 0)
	accEnd, ctrlEnd := net.Pipe()
	t.Cleanup(func() {
		accEnd.Close()
		ctrlEnd.Close()
	})
	accConn, err := NewConn(accEnd, sharedKey, sharedSalt, false) // accessory side
	require.NoError(t, err)
	ctrlConn, err := NewConn(ctrlEnd, sharedKey, sharedSalt, true) // controller side
	require.NoError(t, err)
	return NewSession(accConn), NewSession(ctrlConn)
}
// readLargeMsg reads a message using a large buffer (for messages with 256KB+ chunks).
// Session.ReadMessage uses 64KB which is too small for media chunks in tests.
func readLargeMsg(t *testing.T, s *Session) *Message {
t.Helper()
buf := make([]byte, 512*1024) // 512KB
n, err := s.conn.Read(buf)
require.NoError(t, err)
data := buf[:n]
require.GreaterOrEqual(t, len(data), 2)
headerLen := int(data[0])
require.GreaterOrEqual(t, len(data), 1+headerLen)
headerVal, err := OpackUnmarshal(data[1 : 1+headerLen])
require.NoError(t, err)
header := headerVal.(map[string]any)
msg := &Message{Protocol: opackString(header["protocol"])}
if topic, ok := header["event"]; ok {
msg.IsEvent = true
msg.Topic = opackString(topic)
} else if topic, ok := header["response"]; ok {
msg.Topic = opackString(topic)
msg.ID = opackInt(header["id"])
msg.Status = opackInt(header["status"])
} else if topic, ok := header["request"]; ok {
msg.Topic = opackString(topic)
msg.ID = opackInt(header["id"])
}
bodyData := data[1+headerLen:]
if len(bodyData) > 0 {
bodyVal, err := OpackUnmarshal(bodyData)
require.NoError(t, err)
if m, ok := bodyVal.(map[string]any); ok {
msg.Body = m
}
}
return msg
}
// extractPacket extracts data and metadata from a dataSend.data message body.
func extractPacket(t *testing.T, body map[string]any) (data []byte, metadata map[string]any) {
t.Helper()
packets, ok := body["packets"].([]any)
require.True(t, ok, "packets must be array")
require.Len(t, packets, 1)
pkt, ok := packets[0].(map[string]any)
require.True(t, ok, "packet element must be dict")
data, ok = pkt["data"].([]byte)
require.True(t, ok, "data must be []byte")
metadata, ok = pkt["metadata"].(map[string]any)
require.True(t, ok, "metadata must be dict")
return
}
// --- SendMediaInit tests ---
// TestSendMediaInit_Structure verifies the full wire shape of a media
// initialization event: protocol/topic, streamId, packet data, and metadata.
// Fix: the send result is now reported over a channel instead of calling
// require inside the goroutine (require.FailNow must run on the test
// goroutine).
func TestSendMediaInit_Structure(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	initData := bytes.Repeat([]byte{0xAB}, 100)
	// Send asynchronously — pipe writes block until the controller reads.
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.SendMediaInit(1, initData)
	}()
	msg, err := ctrl.ReadMessage()
	require.NoError(t, err)
	require.NoError(t, <-errCh)
	require.Equal(t, ProtoDataSend, msg.Protocol)
	require.Equal(t, TopicData, msg.Topic)
	require.True(t, msg.IsEvent)
	require.Equal(t, int64(1), opackInt(msg.Body["streamId"]))
	data, meta := extractPacket(t, msg.Body)
	require.Equal(t, initData, data)
	require.Equal(t, "mediaInitialization", opackString(meta["dataType"]))
	require.Equal(t, int64(1), opackInt(meta["dataSequenceNumber"]))
	require.Equal(t, int64(1), opackInt(meta["dataChunkSequenceNumber"]))
	require.Equal(t, true, meta["isLastDataChunk"])
	require.Equal(t, int64(len(initData)), opackInt(meta["dataTotalSize"]))
}
// TestSendMediaInit_AlwaysSeqOne verifies that init segments always carry
// dataSequenceNumber 1 regardless of stream ID. Fix: send errors are checked
// on the test goroutine via a channel rather than require inside the goroutine.
func TestSendMediaInit_AlwaysSeqOne(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.SendMediaInit(42, []byte{1, 2, 3})
	}()
	msg, err := ctrl.ReadMessage()
	require.NoError(t, err)
	require.NoError(t, <-errCh)
	_, meta := extractPacket(t, msg.Body)
	require.Equal(t, int64(1), opackInt(meta["dataSequenceNumber"]))
	require.Equal(t, int64(42), opackInt(msg.Body["streamId"]))
}
// --- SendMediaFragment single chunk tests ---
// TestSendMediaFragment_SingleChunk verifies a fragment under the chunk limit
// is delivered in one packet with the expected metadata. Fix: the send error
// is checked on the test goroutine via a channel (require.FailNow must not
// run on another goroutine).
func TestSendMediaFragment_SingleChunk(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	fragment := bytes.Repeat([]byte{0xCD}, 1000) // well under 256KB
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.SendMediaFragment(5, fragment, 3)
	}()
	msg, err := ctrl.ReadMessage()
	require.NoError(t, err)
	require.NoError(t, <-errCh)
	data, meta := extractPacket(t, msg.Body)
	require.Equal(t, fragment, data)
	require.Equal(t, "mediaFragment", opackString(meta["dataType"]))
	require.Equal(t, int64(3), opackInt(meta["dataSequenceNumber"]))
	require.Equal(t, int64(1), opackInt(meta["dataChunkSequenceNumber"]))
	require.Equal(t, true, meta["isLastDataChunk"])
	require.Equal(t, int64(1000), opackInt(meta["dataTotalSize"]))
}
// --- SendMediaFragment multi-chunk tests (using readLargeMsg) ---
// TestSendMediaFragment_MultipleChunks sends 2*maxChunkSize+100 bytes and
// verifies the split into three chunks: correct chunk sequence numbers,
// last-chunk flags, dataTotalSize only on the first chunk, and byte-exact
// reassembly. Fix: the send error is collected over a channel and asserted on
// the test goroutine (after all chunks are read, since SendMediaFragment
// blocks until the last chunk is written).
func TestSendMediaFragment_MultipleChunks(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	totalSize := maxChunkSize*2 + 100 // 2 full chunks + partial
	fragment := make([]byte, totalSize)
	for i := range fragment {
		fragment[i] = byte(i % 251) // use prime to verify no data corruption
	}
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.SendMediaFragment(1, fragment, 7)
	}()
	var assembled []byte
	// Chunk 1: full 256KB
	msg1 := readLargeMsg(t, ctrl)
	data1, meta1 := extractPacket(t, msg1.Body)
	require.Len(t, data1, maxChunkSize)
	require.Equal(t, int64(1), opackInt(meta1["dataChunkSequenceNumber"]))
	require.Equal(t, false, meta1["isLastDataChunk"])
	require.Equal(t, int64(totalSize), opackInt(meta1["dataTotalSize"]))
	require.Equal(t, int64(7), opackInt(meta1["dataSequenceNumber"]))
	assembled = append(assembled, data1...)
	// Chunk 2: full 256KB
	msg2 := readLargeMsg(t, ctrl)
	data2, meta2 := extractPacket(t, msg2.Body)
	require.Len(t, data2, maxChunkSize)
	require.Equal(t, int64(2), opackInt(meta2["dataChunkSequenceNumber"]))
	require.Equal(t, false, meta2["isLastDataChunk"])
	// dataTotalSize only in first chunk
	_, hasTotalSize := meta2["dataTotalSize"]
	require.False(t, hasTotalSize, "dataTotalSize should only be in first chunk")
	assembled = append(assembled, data2...)
	// Chunk 3: remaining 100 bytes
	msg3 := readLargeMsg(t, ctrl)
	data3, meta3 := extractPacket(t, msg3.Body)
	require.Len(t, data3, 100)
	require.Equal(t, int64(3), opackInt(meta3["dataChunkSequenceNumber"]))
	require.Equal(t, true, meta3["isLastDataChunk"])
	assembled = append(assembled, data3...)
	require.NoError(t, <-errCh)
	require.Equal(t, fragment, assembled, "reassembled data must match original")
}
// TestSendMediaFragment_ExactChunkBoundary verifies a fragment of exactly
// maxChunkSize is sent as one chunk marked last. Fix: send error asserted on
// the test goroutine via a channel.
func TestSendMediaFragment_ExactChunkBoundary(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	fragment := bytes.Repeat([]byte{0xAA}, maxChunkSize) // exactly 256KB
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.SendMediaFragment(1, fragment, 2)
	}()
	msg := readLargeMsg(t, ctrl)
	require.NoError(t, <-errCh)
	data, meta := extractPacket(t, msg.Body)
	require.Len(t, data, maxChunkSize)
	require.Equal(t, int64(1), opackInt(meta["dataChunkSequenceNumber"]))
	require.Equal(t, true, meta["isLastDataChunk"]) // single chunk
}
// TestSendMediaFragment_TwoExactChunks verifies a fragment of exactly
// 2*maxChunkSize splits into two chunks with only the second marked last.
// Fix: send error asserted on the test goroutine via a channel.
func TestSendMediaFragment_TwoExactChunks(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	fragment := bytes.Repeat([]byte{0xBB}, maxChunkSize*2) // exactly 2 chunks
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.SendMediaFragment(1, fragment, 4)
	}()
	msg1 := readLargeMsg(t, ctrl)
	_, meta1 := extractPacket(t, msg1.Body)
	require.Equal(t, false, meta1["isLastDataChunk"])
	require.Equal(t, int64(1), opackInt(meta1["dataChunkSequenceNumber"]))
	msg2 := readLargeMsg(t, ctrl)
	require.NoError(t, <-errCh)
	_, meta2 := extractPacket(t, msg2.Body)
	require.Equal(t, true, meta2["isLastDataChunk"])
	require.Equal(t, int64(2), opackInt(meta2["dataChunkSequenceNumber"]))
}
// TestSendMediaFragment_SequencePreserved verifies all chunks of one fragment
// share the same dataSequenceNumber. Fix: send error asserted on the test
// goroutine via a channel.
func TestSendMediaFragment_SequencePreserved(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	// All chunks of a multi-chunk fragment share the same dataSequenceNumber
	totalSize := maxChunkSize + 50
	fragment := bytes.Repeat([]byte{0x11}, totalSize)
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.SendMediaFragment(1, fragment, 42)
	}()
	msg1 := readLargeMsg(t, ctrl)
	_, meta1 := extractPacket(t, msg1.Body)
	require.Equal(t, int64(42), opackInt(meta1["dataSequenceNumber"]))
	msg2, err := ctrl.ReadMessage() // second chunk is small (50 bytes)
	require.NoError(t, err)
	require.NoError(t, <-errCh)
	_, meta2 := extractPacket(t, msg2.Body)
	require.Equal(t, int64(42), opackInt(meta2["dataSequenceNumber"]))
}
// --- WriteEvent / WriteResponse / WriteRequest round-trip tests ---
// TestWriteEvent_ReadMessage round-trips an event through the session pair.
// Fix: the write error is reported over a channel rather than calling require
// inside the goroutine.
func TestWriteEvent_ReadMessage(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.WriteEvent("testProto", "testTopic", map[string]any{
			"key": "value",
		})
	}()
	msg, err := ctrl.ReadMessage()
	require.NoError(t, err)
	require.NoError(t, <-errCh)
	require.Equal(t, "testProto", msg.Protocol)
	require.Equal(t, "testTopic", msg.Topic)
	require.True(t, msg.IsEvent)
	require.Equal(t, "value", msg.Body["key"])
}
// TestWriteResponse_ReadMessage round-trips a response (ID + status) through
// the session pair. Fix: write error checked on the test goroutine.
func TestWriteResponse_ReadMessage(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.WriteResponse("proto", "topic", 5, 0, map[string]any{"ok": true})
	}()
	msg, err := ctrl.ReadMessage()
	require.NoError(t, err)
	require.NoError(t, <-errCh)
	require.Equal(t, "proto", msg.Protocol)
	require.Equal(t, "topic", msg.Topic)
	require.Equal(t, int64(5), msg.ID)
	require.Equal(t, int64(0), msg.Status)
	require.False(t, msg.IsEvent)
	require.Equal(t, true, msg.Body["ok"])
}
// TestWriteRequest_ReadMessage round-trips a request and checks the first
// assigned ID is 1. Fix: the ID and error are carried back over a channel so
// all require calls run on the test goroutine.
func TestWriteRequest_ReadMessage(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	type result struct {
		id  int64
		err error
	}
	resCh := make(chan result, 1)
	go func() {
		id, err := acc.WriteRequest("proto", "topic", map[string]any{"x": int64(10)})
		resCh <- result{id, err}
	}()
	msg, err := ctrl.ReadMessage()
	require.NoError(t, err)
	res := <-resCh
	require.NoError(t, res.err)
	require.Equal(t, int64(1), res.id) // first request
	require.Equal(t, "proto", msg.Protocol)
	require.Equal(t, "topic", msg.Topic)
	require.Equal(t, int64(1), msg.ID)
	require.False(t, msg.IsEvent)
}
// TestWriteRequest_IncrementingIDs verifies request IDs increment 1, 2, 3.
// Fix: IDs are sent back over a buffered channel and asserted on the test
// goroutine instead of calling require inside the writer goroutine.
func TestWriteRequest_IncrementingIDs(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	ids := make(chan int64, 3)
	go func() {
		for i := 0; i < 3; i++ {
			// Write error is intentionally ignored, matching the original:
			// a failed write surfaces as a wrong/missing ID below.
			id, _ := acc.WriteRequest("p", "t", nil)
			ids <- id
		}
	}()
	for expected := int64(1); expected <= 3; expected++ {
		msg, err := ctrl.ReadMessage()
		require.NoError(t, err)
		require.Equal(t, expected, msg.ID)
	}
	for expected := int64(1); expected <= 3; expected++ {
		require.Equal(t, expected, <-ids)
	}
}
// TestWriteEvent_NilBody verifies a nil body is delivered as a non-nil (empty)
// map. Fix: write error checked on the test goroutine via a channel.
func TestWriteEvent_NilBody(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.WriteEvent("p", "t", nil)
	}()
	msg, err := ctrl.ReadMessage()
	require.NoError(t, err)
	require.NoError(t, <-errCh)
	require.NotNil(t, msg.Body) // nil is replaced with empty map
}
// TestWriteResponse_NilBody verifies a nil response body arrives as a non-nil
// map. Fix: write error checked on the test goroutine via a channel.
func TestWriteResponse_NilBody(t *testing.T) {
	acc, ctrl := newSessionPair(t)
	errCh := make(chan error, 1)
	go func() {
		errCh <- acc.WriteResponse("p", "t", 1, 0, nil)
	}()
	msg, err := ctrl.ReadMessage()
	require.NoError(t, err)
	require.NoError(t, <-errCh)
	require.NotNil(t, msg.Body)
}
// --- Helper tests ---
// TestOpackHelpers exercises the opackString/opackInt coercion helpers with
// table-driven cases covering nil, wrong-type, and truncation behavior.
func TestOpackHelpers(t *testing.T) {
	for _, tc := range []struct {
		in   any
		want string
	}{
		{nil, ""},
		{42, ""},
		{"hello", "hello"},
	} {
		require.Equal(t, tc.want, opackString(tc.in))
	}
	for _, tc := range []struct {
		in   any
		want int64
	}{
		{nil, 0},
		{"not a number", 0},
		{int64(42), 42},
		{int(7), 7},
		{float64(3.9), 3}, // floats truncate toward zero
	} {
		require.Equal(t, tc.want, opackInt(tc.in))
	}
}
// --- Benchmarks ---
// BenchmarkSendMediaFragment_Small measures sending a typical 2KB P-frame
// fragment. Fix: the original discarded NewConn errors, which would surface as
// a nil-pointer panic later; errors now abort via b.Fatal.
func BenchmarkSendMediaFragment_Small(b *testing.B) {
	key := []byte(core.RandString(16, 0))
	salt := core.RandString(32, 0)
	c1, c2 := net.Pipe()
	defer c1.Close()
	defer c2.Close()
	accConn, err := NewConn(c1, key, salt, false)
	if err != nil {
		b.Fatal(err)
	}
	ctrlConn, err := NewConn(c2, key, salt, true)
	if err != nil {
		b.Fatal(err)
	}
	acc := NewSession(accConn)
	fragment := bytes.Repeat([]byte{0xAA}, 2000) // 2KB typical P-frame fragment
	// Drain the controller end so pipe writes never block.
	go func() {
		buf := make([]byte, 64*1024)
		for {
			if _, err := ctrlConn.Read(buf); err != nil {
				return
			}
		}
	}()
	b.SetBytes(int64(len(fragment)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = acc.SendMediaFragment(1, fragment, i)
	}
}
// BenchmarkSendMediaFragment_Large measures sending a 5MB GOP-sized fragment
// (split by the protocol layer into multiple chunks). Fix: NewConn errors are
// no longer silently discarded; setup failures abort via b.Fatal.
func BenchmarkSendMediaFragment_Large(b *testing.B) {
	key := []byte(core.RandString(16, 0))
	salt := core.RandString(32, 0)
	c1, c2 := net.Pipe()
	defer c1.Close()
	defer c2.Close()
	accConn, err := NewConn(c1, key, salt, false)
	if err != nil {
		b.Fatal(err)
	}
	ctrlConn, err := NewConn(c2, key, salt, true)
	if err != nil {
		b.Fatal(err)
	}
	acc := NewSession(accConn)
	fragment := bytes.Repeat([]byte{0xBB}, 5*1024*1024) // 5MB typical GOP
	// Drain the controller end so pipe writes never block.
	go func() {
		buf := make([]byte, 512*1024)
		for {
			if _, err := ctrlConn.Read(buf); err != nil {
				return
			}
		}
	}()
	b.SetBytes(int64(len(fragment)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = acc.SendMediaFragment(1, fragment, i)
	}
}
// BenchmarkOpackMarshal_MediaBody measures opack-encoding a full media-chunk
// message body (maxChunkSize payload plus metadata).
func BenchmarkOpackMarshal_MediaBody(b *testing.B) {
	payload := bytes.Repeat([]byte{0xCC}, maxChunkSize)
	metadata := map[string]any{
		"dataType":                "mediaFragment",
		"dataSequenceNumber":      42,
		"dataChunkSequenceNumber": 1,
		"isLastDataChunk":         true,
		"dataTotalSize":           len(payload),
	}
	body := map[string]any{
		"streamId": 1,
		"packets": []any{
			map[string]any{"data": payload, "metadata": metadata},
		},
	}
	b.SetBytes(int64(len(payload)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		OpackMarshal(body)
	}
}
// BenchmarkWriteMessage measures the raw header+body write path through an
// encrypted session. Fix: NewConn errors are no longer silently discarded;
// setup failures abort via b.Fatal instead of panicking later.
func BenchmarkWriteMessage(b *testing.B) {
	key := []byte(core.RandString(16, 0))
	salt := core.RandString(32, 0)
	c1, c2 := net.Pipe()
	defer c1.Close()
	defer c2.Close()
	accConn, err := NewConn(c1, key, salt, false)
	if err != nil {
		b.Fatal(err)
	}
	ctrlConn, err := NewConn(c2, key, salt, true)
	if err != nil {
		b.Fatal(err)
	}
	acc := NewSession(accConn)
	// Drain the controller end so pipe writes never block.
	go func() {
		buf := make([]byte, 64*1024)
		for {
			if _, err := ctrlConn.Read(buf); err != nil {
				return
			}
		}
	}()
	header := map[string]any{"protocol": "dataSend", "event": "data"}
	body := map[string]any{"streamId": 1, "test": true}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = acc.WriteMessage(header, body)
	}
}