fix(homekit): fix HKSV recording by correcting HDS protocol and adding GOP buffering

The HKSV recording was failing because:
1. The dataSend.data message structure was wrong - `packets` was a flat integer
   instead of an array of objects, each with `data` and `metadata` fields, as
   in the HAP-NodeJS reference implementation
2. Each video/audio frame was sent as a separate mediaFragment, but Home Hub
   expects GOP-based fragments (~2-4 seconds of accumulated data)
3. Large fragments were sent whole instead of being split into chunks of at
   most 256 KiB each

Changes:
- Fix HDS dataSend.data message structure to use proper packets array with
  nested data/metadata (dataType, dataSequenceNumber, dataChunkSequenceNumber,
  isLastDataChunk, dataTotalSize)
- Add 256 KiB chunking for large media fragments
- Buffer moof+mdat pairs in hksvConsumer and flush on keyframe boundaries
  (GOP-based fragmentation)
- Pre-start consumer at pair-verify for instant init segment delivery
- Add write-response support to HAP PUT handler for ch131 DataStream setup
- Fix HAP service linking to match HAP-NodeJS reference
- Add default SelectedCameraRecordingConfiguration (ch209) value
- Start continuous motion generator at pair-verify with dedup protection
This commit is contained in:
Sergey Krashevich
2026-03-05 06:25:00 +03:00
parent 35fd1383c8
commit 1856b7ace4
6 changed files with 322 additions and 67 deletions
+178 -36
View File
@@ -63,22 +63,38 @@ func (hs *hksvSession) handleOpen(streamID int) error {
hs.stopRecording()
}
consumer := newHKSVConsumer(hs.session, streamID)
hs.consumer = consumer
// Try to use the pre-started consumer from pair-verify
consumer := hs.server.takePreparedConsumer()
if consumer != nil {
log.Debug().Str("stream", hs.server.stream).Msg("[homekit] HKSV using prepared consumer")
hs.consumer = consumer
hs.server.AddConn(consumer)
// Activate: set the HDS session and send init + start streaming
if err := consumer.activate(hs.session, streamID); err != nil {
log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV activate failed")
hs.stopRecording()
return nil
}
return nil
}
// Fallback: create new consumer (will be slow ~3s)
log.Debug().Str("stream", hs.server.stream).Msg("[homekit] HKSV no prepared consumer, creating new")
consumer = newHKSVConsumer()
stream := streams.Get(hs.server.stream)
if err := stream.AddConsumer(consumer); err != nil {
log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV add consumer failed")
hs.consumer = nil
return nil // don't kill the session
return nil
}
hs.consumer = consumer
hs.server.AddConn(consumer)
// wait for tracks to be added, then send init
go func() {
if err := consumer.waitAndSendInit(); err != nil {
log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV send init failed")
if err := consumer.activate(hs.session, streamID); err != nil {
log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV activate failed")
}
}()
@@ -107,19 +123,31 @@ func (hs *hksvSession) stopRecording() {
hs.server.DelConn(consumer)
}
// hksvConsumer implements core.Consumer, generates fMP4 and sends over HDS
// hksvConsumer implements core.Consumer, generates fMP4 and sends over HDS.
// It can be pre-started without an HDS session, buffering init data until activated.
type hksvConsumer struct {
core.Connection
muxer *mp4.Muxer
mu sync.Mutex
done chan struct{}
// Set by activate() when HDS session is available
session *hds.Session
muxer *mp4.Muxer
streamID int
seqNum int
mu sync.Mutex
start bool
done chan struct{}
active bool
start bool // waiting for first keyframe
// GOP buffer - accumulate moof+mdat pairs, flush on next keyframe
fragBuf []byte
// Pre-built init segment (built when tracks connect)
initData []byte
initErr error
initDone chan struct{} // closed when init is ready
}
func newHKSVConsumer(session *hds.Session, streamID int) *hksvConsumer {
func newHKSVConsumer() *hksvConsumer {
medias := []*core.Media{
{
Kind: core.KindVideo,
@@ -143,35 +171,42 @@ func newHKSVConsumer(session *hds.Session, streamID int) *hksvConsumer {
Protocol: "hds",
Medias: medias,
},
session: session,
muxer: &mp4.Muxer{},
streamID: streamID,
done: make(chan struct{}),
initDone: make(chan struct{}),
}
}
func (c *hksvConsumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
trackID := byte(len(c.Senders))
log.Debug().Str("codec", track.Codec.Name).Uint8("trackID", trackID).Msg("[homekit] HKSV AddTrack")
codec := track.Codec.Clone()
handler := core.NewSender(media, codec)
switch track.Codec.Name {
case core.CodecH264:
handler.Handler = func(packet *rtp.Packet) {
c.mu.Lock()
if !c.active {
c.mu.Unlock()
return
}
if !c.start {
if !h264.IsKeyframe(packet.Payload) {
c.mu.Unlock()
return
}
c.start = true
log.Debug().Int("payloadLen", len(packet.Payload)).Msg("[homekit] HKSV first keyframe")
} else if h264.IsKeyframe(packet.Payload) && len(c.fragBuf) > 0 {
// New keyframe = flush previous GOP as one mediaFragment
c.flushFragment()
}
c.mu.Lock()
b := c.muxer.GetPayload(trackID, packet)
if err := c.session.SendMediaFragment(c.streamID, b, c.seqNum); err == nil {
c.Send += len(b)
c.seqNum++
}
c.fragBuf = append(c.fragBuf, b...)
c.mu.Unlock()
}
@@ -183,16 +218,14 @@ func (c *hksvConsumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Re
case core.CodecAAC:
handler.Handler = func(packet *rtp.Packet) {
if !c.start {
c.mu.Lock()
if !c.active || !c.start {
c.mu.Unlock()
return
}
c.mu.Lock()
b := c.muxer.GetPayload(trackID, packet)
if err := c.session.SendMediaFragment(c.streamID, b, c.seqNum); err == nil {
c.Send += len(b)
c.seqNum++
}
c.fragBuf = append(c.fragBuf, b...)
c.mu.Unlock()
}
@@ -208,23 +241,72 @@ func (c *hksvConsumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Re
handler.HandleRTP(track)
c.Senders = append(c.Senders, handler)
// Build init segment when all expected tracks are ready (video + audio)
select {
case <-c.initDone:
// already built
default:
if len(c.Senders) >= len(c.Medias) {
initData, err := c.muxer.GetInit()
c.initData = initData
c.initErr = err
close(c.initDone)
if err != nil {
log.Error().Err(err).Msg("[homekit] HKSV GetInit failed")
} else {
log.Debug().Int("initSize", len(initData)).Int("tracks", len(c.Senders)).Msg("[homekit] HKSV init segment ready")
}
}
}
return nil
}
func (c *hksvConsumer) waitAndSendInit() error {
// wait for at least one track to be added
for i := 0; i < 50; i++ {
if len(c.Senders) > 0 {
break
}
time.Sleep(100 * time.Millisecond)
// activate is called when the HDS session is ready (dataSend.open).
// It sends the pre-built init segment and starts streaming.
func (c *hksvConsumer) activate(session *hds.Session, streamID int) error {
// Wait for init to be ready (should already be done if consumer was pre-started)
select {
case <-c.initDone:
case <-time.After(5 * time.Second):
return io.ErrClosedPipe
}
init, err := c.muxer.GetInit()
if err != nil {
if c.initErr != nil {
return c.initErr
}
log.Debug().Int("initSize", len(c.initData)).Msg("[homekit] HKSV sending init segment")
if err := session.SendMediaInit(streamID, c.initData); err != nil {
return err
}
return c.session.SendMediaInit(c.streamID, init)
log.Debug().Msg("[homekit] HKSV init segment sent OK")
// Enable live streaming (seqNum=2 because init used seqNum=1)
c.mu.Lock()
c.session = session
c.streamID = streamID
c.seqNum = 2
c.active = true
c.mu.Unlock()
return nil
}
// flushFragment hands the buffered GOP (everything appended to fragBuf since
// the previous flush) to the HDS session as one mediaFragment and starts a new,
// empty buffer of the same capacity.
//
// The caller must hold c.mu. The sequence number advances whether or not the
// send succeeds; the Send byte counter only grows on success. A send error is
// not reported to the caller.
func (c *hksvConsumer) flushFragment() {
	pending := c.fragBuf
	size := len(pending)

	// Fresh backing array: the pending slice is handed to the session below,
	// so later appends must not write into it.
	c.fragBuf = make([]byte, 0, size)

	log.Debug().Int("fragSize", size).Int("seq", c.seqNum).Msg("[homekit] HKSV flush fragment")

	seq := c.seqNum
	err := c.session.SendMediaFragment(c.streamID, pending, seq)
	c.seqNum = seq + 1
	if err != nil {
		return
	}
	c.Send += size
}
func (c *hksvConsumer) WriteTo(io.Writer) (int64, error) {
@@ -238,6 +320,9 @@ func (c *hksvConsumer) Stop() error {
default:
close(c.done)
}
c.mu.Lock()
c.active = false
c.mu.Unlock()
return c.Connection.Stop()
}
@@ -287,3 +372,60 @@ func (s *server) acceptHDS(hapConn *hap.Conn, ln net.Listener, salt string) {
log.Debug().Err(err).Str("stream", s.stream).Msg("[homekit] HKSV session ended")
}
}
// prepareHKSVConsumer pre-starts an HKSV consumer, attaches it to the stream
// and publishes it as s.preparedConsumer so that a later dataSend.open can pick
// it up via takePreparedConsumer instead of creating one from scratch.
//
// The call blocks until the consumer is stopped or 60 seconds have passed, so
// it is meant to run in its own goroutine. If the consumer is still the
// published one when the timeout fires, it is detached from the stream and
// stopped. It returns early without publishing anything when the stream does
// not exist or the consumer cannot be added to it.
func (s *server) prepareHKSVConsumer() {
	stream := streams.Get(s.stream)
	if stream == nil {
		return
	}

	consumer := newHKSVConsumer()
	if err := stream.AddConsumer(consumer); err != nil {
		log.Debug().Err(err).Str("stream", s.stream).Msg("[homekit] HKSV prepare consumer failed")
		return
	}

	log.Debug().Str("stream", s.stream).Msg("[homekit] HKSV consumer prepared")

	// Swap the prepared consumer in a single critical section. Releasing the
	// lock between "take old" and "store new" would let a concurrent call
	// publish its own consumer in the gap and have it overwritten here without
	// ever being stopped, leaving it attached to the stream.
	s.mu.Lock()
	old := s.preparedConsumer
	s.preparedConsumer = consumer
	s.mu.Unlock()

	// Clean up the replaced consumer outside the lock.
	if old != nil {
		stream.RemoveConsumer(old)
		_ = old.Stop() // best-effort cleanup; nothing useful to do on error
	}

	// Keep the goroutine alive until the consumer is stopped or the timeout
	// fires. An explicit timer is used so it can be released on early exit.
	timer := time.NewTimer(60 * time.Second)
	defer timer.Stop()

	select {
	case <-consumer.done:
		// Consumer was stopped (done is closed by Stop); nothing left to do.
	case <-timer.C:
		// Timeout: only clean up if nobody took or replaced this consumer.
		s.mu.Lock()
		expired := s.preparedConsumer == consumer
		if expired {
			s.preparedConsumer = nil
		}
		s.mu.Unlock()

		if !expired {
			return
		}

		stream.RemoveConsumer(consumer)
		_ = consumer.Stop() // best-effort cleanup; nothing useful to do on error
		log.Debug().Str("stream", s.stream).Msg("[homekit] HKSV prepared consumer expired")
	}
}
// takePreparedConsumer returns the currently prepared HKSV consumer and clears
// the slot under s.mu, so two callers can never receive the same consumer.
// It returns nil when no consumer is prepared.
func (s *server) takePreparedConsumer() *hksvConsumer {
	s.mu.Lock()
	prepared := s.preparedConsumer
	s.preparedConsumer = nil
	s.mu.Unlock()

	return prepared
}
+26 -6
View File
@@ -45,10 +45,12 @@ type server struct {
stream string // stream name from YAML
// HKSV fields
motionMode string // "api", "continuous", "detect"
motionThreshold float64 // ratio threshold for "detect" mode (default 2.0)
motionDetector *motionDetector
hksvSession *hksvSession
motionMode string // "api", "continuous", "detect"
motionThreshold float64 // ratio threshold for "detect" mode (default 2.0)
motionDetector *motionDetector
hksvSession *hksvSession
continuousMotion bool
preparedConsumer *hksvConsumer
}
func (s *server) MarshalJSON() ([]byte, error) {
@@ -113,9 +115,13 @@ func (s *server) Handle(w http.ResponseWriter, r *http.Request) {
s.AddConn(controller)
defer s.DelConn(controller)
// start motion detector on first Home Hub connection
if s.motionMode == "detect" {
// start motion on first Home Hub connection
switch s.motionMode {
case "detect":
go s.startMotionDetector()
case "continuous":
go s.prepareHKSVConsumer()
go s.startContinuousMotion()
}
var handler homekit.HandlerFunc
@@ -510,7 +516,21 @@ func (s *server) stopMotionDetector() {
}
}
func (s *server) startContinuousMotion() {
s.mu.Lock()
if s.continuousMotion {
s.mu.Unlock()
return
}
s.continuousMotion = true
s.mu.Unlock()
log.Debug().Str("stream", s.stream).Msg("[homekit] continuous motion started")
// delay to allow Home Hub to subscribe to events
time.Sleep(5 * time.Second)
s.SetMotionDetected(true)
ticker := time.NewTicker(30 * time.Second)
+4 -8
View File
@@ -40,10 +40,8 @@ func NewHKSVAccessory(manuf, model, name, serial, firmware string) *hap.Accessor
}
acc.InitIID()
// CameraOperatingMode links to RTPStreamManagement and RecordingManagement
operatingMode.Linked = []int{int(rtpStream.IID), int(recordingMgmt.IID)}
// CameraEventRecordingManagement links to DataStreamManagement and MotionSensor
recordingMgmt.Linked = []int{int(dataStreamMgmt.IID), int(motionSensor.IID)}
// HAP-NodeJS: only RecordingManagement links to DataStreamManagement
recordingMgmt.Linked = []int{int(dataStreamMgmt.IID)}
return acc
}
@@ -71,10 +69,8 @@ func NewHKSVDoorbellAccessory(manuf, model, name, serial, firmware string) *hap.
}
acc.InitIID()
// CameraOperatingMode links to RTPStreamManagement and RecordingManagement
operatingMode.Linked = []int{int(rtpStream.IID), int(recordingMgmt.IID)}
// CameraEventRecordingManagement links to DataStreamManagement, MotionSensor, and Doorbell
recordingMgmt.Linked = []int{int(dataStreamMgmt.IID), int(motionSensor.IID), int(doorbell.IID)}
// HAP-NodeJS: only RecordingManagement links to DataStreamManagement
recordingMgmt.Linked = []int{int(dataStreamMgmt.IID)}
return acc
}
+44 -1
View File
@@ -104,6 +104,49 @@ func ServiceCameraEventRecordingManagement() *hap.Service {
},
})
// Default selected recording configuration (Home Hub expects this to persist)
val209, _ := tlv8.MarshalBase64(SelectedCameraRecordingConfiguration{
GeneralConfig: SupportedCameraRecordingConfiguration{
PrebufferLength: 4000,
EventTriggerOptions: 0x01, // motion
MediaContainerConfigurations: MediaContainerConfigurations{
MediaContainerType: 0,
MediaContainerParameters: MediaContainerParameters{
FragmentLength: 4000,
},
},
},
VideoConfig: SupportedVideoRecordingConfiguration{
CodecConfigs: []VideoRecordingCodecConfiguration{
{
CodecType: VideoCodecTypeH264,
CodecParams: VideoRecordingCodecParameters{
ProfileID: VideoCodecProfileHigh,
Level: VideoCodecLevel40,
Bitrate: 2000,
IFrameInterval: 4000,
},
CodecAttrs: VideoCodecAttributes{Width: 1920, Height: 1080, Framerate: 30},
},
},
},
AudioConfig: SupportedAudioRecordingConfiguration{
CodecConfigs: []AudioRecordingCodecConfiguration{
{
CodecType: AudioRecordingCodecTypeAACLC,
CodecParams: []AudioRecordingCodecParameters{
{
Channels: 1,
BitrateMode: []byte{AudioCodecBitrateVariable},
SampleRate: []byte{AudioRecordingSampleRate24Khz},
MaxAudioBitrate: []uint32{64},
},
},
},
},
},
})
return &hap.Service{
Type: "204",
Characters: []*hap.Character{
@@ -134,7 +177,7 @@ func ServiceCameraEventRecordingManagement() *hap.Service {
{
Type: TypeSelectedCameraRecordingConfiguration,
Format: hap.FormatTLV8,
Value: "",
Value: val209,
Perms: hap.EVPRPW,
},
{
+48 -16
View File
@@ -163,27 +163,59 @@ func (s *Session) WriteRequest(protocol, topic string, body map[string]any) (int
return id, s.WriteMessage(header, body)
}
// maxChunkSize is the maximum data chunk size for HDS media transfer (256 KiB)
const maxChunkSize = 0x40000
// SendMediaInit sends the fMP4 initialization segment (ftyp+moov)
func (s *Session) SendMediaInit(streamID int, initData []byte) error {
return s.WriteEvent(ProtoDataSend, TopicData, map[string]any{
"streamId": streamID,
"packets": 1,
"type": "mediaInitialization",
"data": initData,
})
return s.sendMediaData(streamID, "mediaInitialization", initData, 1)
}
// SendMediaFragment sends an fMP4 fragment (moof+mdat)
// SendMediaFragment sends an fMP4 fragment (moof+mdat), splitting into chunks if needed
func (s *Session) SendMediaFragment(streamID int, fragment []byte, sequence int) error {
return s.WriteEvent(ProtoDataSend, TopicData, map[string]any{
"streamId": streamID,
"packets": 1,
"type": "mediaFragment",
"data": fragment,
"dataSequenceNumber": sequence,
"isLastDataChunk": true,
"dataChunkSequenceNumber": 0,
})
return s.sendMediaData(streamID, "mediaFragment", fragment, sequence)
}
// sendMediaData writes data to the peer as one or more dataSend "data" events,
// using the HAP-NodeJS compatible layout: each event carries a "packets" array
// with a single packet made of the raw "data" chunk and its "metadata".
//
// data is cut into consecutive chunks of at most maxChunkSize bytes. All chunks
// share the same dataSequenceNumber (sequence); dataChunkSequenceNumber starts
// at 1 and grows by one per chunk; isLastDataChunk is true only on the final
// chunk; dataTotalSize (the full length of data) is attached to the first
// chunk only.
//
// Sending stops at the first WriteEvent error, which is returned as is. Empty
// data produces no events and a nil error.
func (s *Session) sendMediaData(streamID int, dataType string, data []byte, sequence int) error {
	total := len(data)
	rest := data

	for chunkNo := 1; len(rest) > 0; chunkNo++ {
		// Take the next chunk off the front of the remaining data.
		n := len(rest)
		if n > maxChunkSize {
			n = maxChunkSize
		}
		chunk := rest[:n]
		rest = rest[n:]

		meta := map[string]any{
			"dataType":                dataType,
			"dataSequenceNumber":      sequence,
			"dataChunkSequenceNumber": chunkNo,
			"isLastDataChunk":         len(rest) == 0,
		}
		if chunkNo == 1 {
			// The total size is announced once, with the first chunk.
			meta["dataTotalSize"] = total
		}

		packet := map[string]any{
			"data":     chunk,
			"metadata": meta,
		}
		event := map[string]any{
			"streamId": streamID,
			"packets":  []any{packet},
		}

		if err := s.WriteEvent(ProtoDataSend, TopicData, event); err != nil {
			return err
		}
	}

	return nil
}
// Run processes incoming HDS messages in a loop
+22
View File
@@ -69,12 +69,15 @@ func ServerHandler(server Server) HandlerFunc {
IID uint64 `json:"iid"`
Value any `json:"value"`
Event any `json:"ev"`
R *bool `json:"r,omitempty"`
} `json:"characteristics"`
}
if err := json.NewDecoder(req.Body).Decode(&v); err != nil {
return nil, err
}
var writeResponses []hap.JSONCharacter
for _, c := range v.Value {
if c.Value != nil {
server.SetCharacteristic(conn, c.AID, c.IID, c.Value)
@@ -93,6 +96,25 @@ func ServerHandler(server Server) HandlerFunc {
}
}
}
if c.R != nil && *c.R {
// write-response: return updated value
accs := server.GetAccessories(conn)
for _, acc := range accs {
if char := acc.GetCharacterByID(c.IID); char != nil {
writeResponses = append(writeResponses, hap.JSONCharacter{
AID: c.AID,
IID: c.IID,
Status: 0,
Value: char.Value,
})
break
}
}
}
}
if len(writeResponses) > 0 {
return makeResponse(hap.MimeJSON, hap.JSONCharacters{Value: writeResponses})
}
res := &http.Response{