diff --git a/internal/homekit/hksv.go b/internal/homekit/hksv.go index 81380970..d2656d3b 100644 --- a/internal/homekit/hksv.go +++ b/internal/homekit/hksv.go @@ -63,22 +63,38 @@ func (hs *hksvSession) handleOpen(streamID int) error { hs.stopRecording() } - consumer := newHKSVConsumer(hs.session, streamID) - hs.consumer = consumer + // Try to use the pre-started consumer from pair-verify + consumer := hs.server.takePreparedConsumer() + if consumer != nil { + log.Debug().Str("stream", hs.server.stream).Msg("[homekit] HKSV using prepared consumer") + hs.consumer = consumer + hs.server.AddConn(consumer) + + // Activate: set the HDS session and send init + start streaming + if err := consumer.activate(hs.session, streamID); err != nil { + log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV activate failed") + hs.stopRecording() + return nil + } + return nil + } + + // Fallback: create new consumer (will be slow ~3s) + log.Debug().Str("stream", hs.server.stream).Msg("[homekit] HKSV no prepared consumer, creating new") + consumer = newHKSVConsumer() stream := streams.Get(hs.server.stream) if err := stream.AddConsumer(consumer); err != nil { log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV add consumer failed") - hs.consumer = nil - return nil // don't kill the session + return nil } + hs.consumer = consumer hs.server.AddConn(consumer) - // wait for tracks to be added, then send init go func() { - if err := consumer.waitAndSendInit(); err != nil { - log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV send init failed") + if err := consumer.activate(hs.session, streamID); err != nil { + log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV activate failed") } }() @@ -107,19 +123,31 @@ func (hs *hksvSession) stopRecording() { hs.server.DelConn(consumer) } -// hksvConsumer implements core.Consumer, generates fMP4 and sends over HDS +// hksvConsumer implements core.Consumer, generates fMP4 and sends over HDS. +// It can be pre-started without an HDS session, buffering init data until activated. type hksvConsumer struct { core.Connection + muxer *mp4.Muxer + mu sync.Mutex + done chan struct{} + + // Set by activate() when HDS session is available session *hds.Session - muxer *mp4.Muxer streamID int seqNum int - mu sync.Mutex - start bool - done chan struct{} + active bool + start bool // waiting for first keyframe + + // GOP buffer - accumulate moof+mdat pairs, flush on next keyframe + fragBuf []byte + + // Pre-built init segment (built when tracks connect) + initData []byte + initErr error + initDone chan struct{} // closed when init is ready } -func newHKSVConsumer(session *hds.Session, streamID int) *hksvConsumer { +func newHKSVConsumer() *hksvConsumer { medias := []*core.Media{ { Kind: core.KindVideo, @@ -143,35 +171,42 @@ func newHKSVConsumer(session *hds.Session, streamID int) *hksvConsumer { Protocol: "hds", Medias: medias, }, - session: session, muxer: &mp4.Muxer{}, - streamID: streamID, done: make(chan struct{}), + initDone: make(chan struct{}), } } func (c *hksvConsumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { trackID := byte(len(c.Senders)) + log.Debug().Str("codec", track.Codec.Name).Uint8("trackID", trackID).Msg("[homekit] HKSV AddTrack") + codec := track.Codec.Clone() handler := core.NewSender(media, codec) switch track.Codec.Name { case core.CodecH264: handler.Handler = func(packet *rtp.Packet) { + c.mu.Lock() + if !c.active { + c.mu.Unlock() + return + } if !c.start { if !h264.IsKeyframe(packet.Payload) { + c.mu.Unlock() return } c.start = true + log.Debug().Int("payloadLen", len(packet.Payload)).Msg("[homekit] HKSV first keyframe") + } else if h264.IsKeyframe(packet.Payload) && len(c.fragBuf) > 0 { + // New keyframe = flush previous GOP as one mediaFragment + c.flushFragment() } - c.mu.Lock() b := c.muxer.GetPayload(trackID, packet) - if err := c.session.SendMediaFragment(c.streamID, b, c.seqNum); err == nil { - c.Send += len(b) - c.seqNum++ - } + c.fragBuf = append(c.fragBuf, b...) c.mu.Unlock() } @@ -183,16 +218,14 @@ func (c *hksvConsumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Re case core.CodecAAC: handler.Handler = func(packet *rtp.Packet) { - if !c.start { + c.mu.Lock() + if !c.active || !c.start { + c.mu.Unlock() return } - c.mu.Lock() b := c.muxer.GetPayload(trackID, packet) - if err := c.session.SendMediaFragment(c.streamID, b, c.seqNum); err == nil { - c.Send += len(b) - c.seqNum++ - } + c.fragBuf = append(c.fragBuf, b...) c.mu.Unlock() } @@ -208,23 +241,72 @@ func (c *hksvConsumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Re handler.HandleRTP(track) c.Senders = append(c.Senders, handler) + // Build init segment when all expected tracks are ready (video + audio) + select { + case <-c.initDone: + // already built + default: + if len(c.Senders) >= len(c.Medias) { + initData, err := c.muxer.GetInit() + c.initData = initData + c.initErr = err + close(c.initDone) + if err != nil { + log.Error().Err(err).Msg("[homekit] HKSV GetInit failed") + } else { + log.Debug().Int("initSize", len(initData)).Int("tracks", len(c.Senders)).Msg("[homekit] HKSV init segment ready") + } + } + } + return nil } -func (c *hksvConsumer) waitAndSendInit() error { - // wait for at least one track to be added - for i := 0; i < 50; i++ { - if len(c.Senders) > 0 { - break - } - time.Sleep(100 * time.Millisecond) +// activate is called when the HDS session is ready (dataSend.open). +// It sends the pre-built init segment and starts streaming. +func (c *hksvConsumer) activate(session *hds.Session, streamID int) error { + // Wait for init to be ready (should already be done if consumer was pre-started) + select { + case <-c.initDone: + case <-time.After(5 * time.Second): + return io.ErrClosedPipe } - init, err := c.muxer.GetInit() - if err != nil { + if c.initErr != nil { + return c.initErr + } + + log.Debug().Int("initSize", len(c.initData)).Msg("[homekit] HKSV sending init segment") + + if err := session.SendMediaInit(streamID, c.initData); err != nil { return err } - return c.session.SendMediaInit(c.streamID, init) + + log.Debug().Msg("[homekit] HKSV init segment sent OK") + + // Enable live streaming (seqNum=2 because init used seqNum=1) + c.mu.Lock() + c.session = session + c.streamID = streamID + c.seqNum = 2 + c.active = true + c.mu.Unlock() + + return nil +} + +// flushFragment sends the accumulated GOP buffer as a single mediaFragment. +// Must be called while holding c.mu. +func (c *hksvConsumer) flushFragment() { + fragment := c.fragBuf + c.fragBuf = make([]byte, 0, len(fragment)) + + log.Debug().Int("fragSize", len(fragment)).Int("seq", c.seqNum).Msg("[homekit] HKSV flush fragment") + + if err := c.session.SendMediaFragment(c.streamID, fragment, c.seqNum); err == nil { + c.Send += len(fragment) + } + c.seqNum++ } func (c *hksvConsumer) WriteTo(io.Writer) (int64, error) { @@ -238,6 +320,9 @@ func (c *hksvConsumer) Stop() error { default: close(c.done) } + c.mu.Lock() + c.active = false + c.mu.Unlock() return c.Connection.Stop() } @@ -287,3 +372,60 @@ func (s *server) acceptHDS(hapConn *hap.Conn, ln net.Listener, salt string) { log.Debug().Err(err).Str("stream", s.stream).Msg("[homekit] HKSV session ended") } } + +// prepareHKSVConsumer pre-starts a consumer and adds it to the stream. +// When dataSend.open arrives, the consumer is ready immediately. +func (s *server) prepareHKSVConsumer() { + stream := streams.Get(s.stream) + if stream == nil { + return + } + + consumer := newHKSVConsumer() + + if err := stream.AddConsumer(consumer); err != nil { + log.Debug().Err(err).Str("stream", s.stream).Msg("[homekit] HKSV prepare consumer failed") + return + } + + log.Debug().Str("stream", s.stream).Msg("[homekit] HKSV consumer prepared") + + s.mu.Lock() + // Clean up any previous prepared consumer + if s.preparedConsumer != nil { + old := s.preparedConsumer + s.preparedConsumer = nil + s.mu.Unlock() + stream.RemoveConsumer(old) + _ = old.Stop() + s.mu.Lock() + } + s.preparedConsumer = consumer + s.mu.Unlock() + + // Keep alive until used or timeout (60 seconds) + select { + case <-consumer.done: + // consumer was stopped (used or server closed) + case <-time.After(60 * time.Second): + // timeout: clean up unused prepared consumer + s.mu.Lock() + if s.preparedConsumer == consumer { + s.preparedConsumer = nil + s.mu.Unlock() + stream.RemoveConsumer(consumer) + _ = consumer.Stop() + log.Debug().Str("stream", s.stream).Msg("[homekit] HKSV prepared consumer expired") + } else { + s.mu.Unlock() + } + } +} + +func (s *server) takePreparedConsumer() *hksvConsumer { + s.mu.Lock() + defer s.mu.Unlock() + consumer := s.preparedConsumer + s.preparedConsumer = nil + return consumer +} diff --git a/internal/homekit/server.go b/internal/homekit/server.go index 44c92578..47f445c1 100644 --- a/internal/homekit/server.go +++ b/internal/homekit/server.go @@ -45,10 +45,12 @@ type server struct { stream string // stream name from YAML // HKSV fields - motionMode string // "api", "continuous", "detect" - motionThreshold float64 // ratio threshold for "detect" mode (default 2.0) - motionDetector *motionDetector - hksvSession *hksvSession + motionMode string // "api", "continuous", "detect" + motionThreshold float64 // ratio threshold for "detect" mode (default 2.0) + motionDetector *motionDetector + hksvSession *hksvSession + continuousMotion bool + preparedConsumer *hksvConsumer } func (s *server) MarshalJSON() ([]byte, error) { @@ -113,9 +115,13 @@ func (s *server) Handle(w http.ResponseWriter, r *http.Request) { s.AddConn(controller) defer s.DelConn(controller) - // start motion detector on first Home Hub connection - if s.motionMode == "detect" { + // start motion on first Home Hub connection + switch s.motionMode { + case "detect": go s.startMotionDetector() + case "continuous": + go s.prepareHKSVConsumer() + go s.startContinuousMotion() } var handler homekit.HandlerFunc @@ -510,7 +516,21 @@ func (s *server) stopMotionDetector() { } } + func (s *server) startContinuousMotion() { + s.mu.Lock() + if s.continuousMotion { + s.mu.Unlock() + return + } + s.continuousMotion = true + s.mu.Unlock() + + log.Debug().Str("stream", s.stream).Msg("[homekit] continuous motion started") + + // delay to allow Home Hub to subscribe to events + time.Sleep(5 * time.Second) + s.SetMotionDetected(true) ticker := time.NewTicker(30 * time.Second) diff --git a/pkg/hap/camera/accessory.go b/pkg/hap/camera/accessory.go index 35f4a0de..da03d522 100644 --- a/pkg/hap/camera/accessory.go +++ b/pkg/hap/camera/accessory.go @@ -40,10 +40,8 @@ func NewHKSVAccessory(manuf, model, name, serial, firmware string) *hap.Accessor } acc.InitIID() - // CameraOperatingMode links to RTPStreamManagement and RecordingManagement - operatingMode.Linked = []int{int(rtpStream.IID), int(recordingMgmt.IID)} - // CameraEventRecordingManagement links to DataStreamManagement and MotionSensor - recordingMgmt.Linked = []int{int(dataStreamMgmt.IID), int(motionSensor.IID)} + // HAP-NodeJS: only RecordingManagement links to DataStreamManagement + recordingMgmt.Linked = []int{int(dataStreamMgmt.IID)} return acc } @@ -71,10 +69,8 @@ func NewHKSVDoorbellAccessory(manuf, model, name, serial, firmware string) *hap. } acc.InitIID() - // CameraOperatingMode links to RTPStreamManagement and RecordingManagement - operatingMode.Linked = []int{int(rtpStream.IID), int(recordingMgmt.IID)} - // CameraEventRecordingManagement links to DataStreamManagement, MotionSensor, and Doorbell - recordingMgmt.Linked = []int{int(dataStreamMgmt.IID), int(motionSensor.IID), int(doorbell.IID)} + // HAP-NodeJS: only RecordingManagement links to DataStreamManagement + recordingMgmt.Linked = []int{int(dataStreamMgmt.IID)} return acc } diff --git a/pkg/hap/camera/services_hksv.go b/pkg/hap/camera/services_hksv.go index d05c44c4..c858e893 100644 --- a/pkg/hap/camera/services_hksv.go +++ b/pkg/hap/camera/services_hksv.go @@ -104,6 +104,49 @@ func ServiceCameraEventRecordingManagement() *hap.Service { }, }) + // Default selected recording configuration (Home Hub expects this to persist) + val209, _ := tlv8.MarshalBase64(SelectedCameraRecordingConfiguration{ + GeneralConfig: SupportedCameraRecordingConfiguration{ + PrebufferLength: 4000, + EventTriggerOptions: 0x01, // motion + MediaContainerConfigurations: MediaContainerConfigurations{ + MediaContainerType: 0, + MediaContainerParameters: MediaContainerParameters{ + FragmentLength: 4000, + }, + }, + }, + VideoConfig: SupportedVideoRecordingConfiguration{ + CodecConfigs: []VideoRecordingCodecConfiguration{ + { + CodecType: VideoCodecTypeH264, + CodecParams: VideoRecordingCodecParameters{ + ProfileID: VideoCodecProfileHigh, + Level: VideoCodecLevel40, + Bitrate: 2000, + IFrameInterval: 4000, + }, + CodecAttrs: VideoCodecAttributes{Width: 1920, Height: 1080, Framerate: 30}, + }, + }, + }, + AudioConfig: SupportedAudioRecordingConfiguration{ + CodecConfigs: []AudioRecordingCodecConfiguration{ + { + CodecType: AudioRecordingCodecTypeAACLC, + CodecParams: []AudioRecordingCodecParameters{ + { + Channels: 1, + BitrateMode: []byte{AudioCodecBitrateVariable}, + SampleRate: []byte{AudioRecordingSampleRate24Khz}, + MaxAudioBitrate: []uint32{64}, + }, + }, + }, + }, + }, + }) + return &hap.Service{ Type: "204", Characters: []*hap.Character{ @@ -134,7 +177,7 @@ func ServiceCameraEventRecordingManagement() *hap.Service { { Type: TypeSelectedCameraRecordingConfiguration, Format: hap.FormatTLV8, - Value: "", + Value: val209, Perms: hap.EVPRPW, }, { diff --git a/pkg/hap/hds/protocol.go b/pkg/hap/hds/protocol.go index 42520332..5a3934e7 100644 --- a/pkg/hap/hds/protocol.go +++ b/pkg/hap/hds/protocol.go @@ -163,27 +163,59 @@ func (s *Session) WriteRequest(protocol, topic string, body map[string]any) (int return id, s.WriteMessage(header, body) } +// maxChunkSize is the maximum data chunk size for HDS media transfer (256 KiB) +const maxChunkSize = 0x40000 + // SendMediaInit sends the fMP4 initialization segment (ftyp+moov) func (s *Session) SendMediaInit(streamID int, initData []byte) error { - return s.WriteEvent(ProtoDataSend, TopicData, map[string]any{ - "streamId": streamID, - "packets": 1, - "type": "mediaInitialization", - "data": initData, - }) + return s.sendMediaData(streamID, "mediaInitialization", initData, 1) } -// SendMediaFragment sends an fMP4 fragment (moof+mdat) +// SendMediaFragment sends an fMP4 fragment (moof+mdat), splitting into chunks if needed func (s *Session) SendMediaFragment(streamID int, fragment []byte, sequence int) error { - return s.WriteEvent(ProtoDataSend, TopicData, map[string]any{ - "streamId": streamID, - "packets": 1, - "type": "mediaFragment", - "data": fragment, - "dataSequenceNumber": sequence, - "isLastDataChunk": true, - "dataChunkSequenceNumber": 0, - }) + return s.sendMediaData(streamID, "mediaFragment", fragment, sequence) +} + +// sendMediaData sends media data with proper HAP-NodeJS compatible packet structure. +// Large data is split into chunks of maxChunkSize bytes. +func (s *Session) sendMediaData(streamID int, dataType string, data []byte, sequence int) error { + totalSize := len(data) + chunkSeq := 1 + + for offset := 0; offset < totalSize; offset += maxChunkSize { + end := offset + maxChunkSize + if end > totalSize { + end = totalSize + } + chunk := data[offset:end] + isLast := end >= totalSize + + metadata := map[string]any{ + "dataType": dataType, + "dataSequenceNumber": sequence, + "dataChunkSequenceNumber": chunkSeq, + "isLastDataChunk": isLast, + } + if chunkSeq == 1 { + metadata["dataTotalSize"] = totalSize + } + + body := map[string]any{ + "streamId": streamID, + "packets": []any{ + map[string]any{ + "data": chunk, + "metadata": metadata, + }, + }, + } + + if err := s.WriteEvent(ProtoDataSend, TopicData, body); err != nil { + return err + } + chunkSeq++ + } + return nil } // Run processes incoming HDS messages in a loop diff --git a/pkg/homekit/server.go b/pkg/homekit/server.go index cf8aaeb0..257245a8 100644 --- a/pkg/homekit/server.go +++ b/pkg/homekit/server.go @@ -69,12 +69,15 @@ func ServerHandler(server Server) HandlerFunc { IID uint64 `json:"iid"` Value any `json:"value"` Event any `json:"ev"` + R *bool `json:"r,omitempty"` } `json:"characteristics"` } if err := json.NewDecoder(req.Body).Decode(&v); err != nil { return nil, err } + var writeResponses []hap.JSONCharacter + for _, c := range v.Value { if c.Value != nil { server.SetCharacteristic(conn, c.AID, c.IID, c.Value) @@ -93,6 +96,25 @@ func ServerHandler(server Server) HandlerFunc { } } } + if c.R != nil && *c.R { + // write-response: return updated value + accs := server.GetAccessories(conn) + for _, acc := range accs { + if char := acc.GetCharacterByID(c.IID); char != nil { + writeResponses = append(writeResponses, hap.JSONCharacter{ + AID: c.AID, + IID: c.IID, + Status: 0, + Value: char.Value, + }) + break + } + } + } + } + + if len(writeResponses) > 0 { + return makeResponse(hap.MimeJSON, hap.JSONCharacters{Value: writeResponses}) } res := &http.Response{