fix: 修改通知关键帧事件, 使用请求体携带关键帧数据, 替代之前的转json携带;

This commit is contained in:
ydajiang
2025-12-12 14:29:21 +08:00
parent 6310103891
commit 96bbaea91d
6 changed files with 80 additions and 48 deletions
+13 -3
View File
@@ -1,9 +1,9 @@
package gb28181
import (
"bytes"
"encoding/json"
"fmt"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/transport"
"net"
"net/http"
@@ -13,6 +13,16 @@ import (
"time"
)
func DoPost(url string, body []byte) (*http.Response, error) {
client := &http.Client{}
request, err := http.NewRequest("post", url, bytes.NewBuffer(body))
if err != nil {
return nil, err
}
request.Header.Set("Content-Type", "application/json")
return client.Do(request)
}
func callForward(source, setup, addr string) string {
v := &struct {
Source string `json:"source"` //GetSourceID
@@ -31,7 +41,7 @@ func callForward(source, setup, addr string) string {
panic(err)
}
response, err := stream.SendHookEvent("http://localhost:8080/api/v1/gb28181/forward", body)
response, err := DoPost("http://localhost:8080/api/v1/gb28181/forward", body)
if err != nil {
panic(err)
}
@@ -76,7 +86,7 @@ func closeForwardSink(source, sink string) {
panic(err)
}
_, err = stream.SendHookEvent("http://localhost:8080/api/v1/sink/close", body)
_, err = DoPost("http://localhost:8080/api/v1/sink/close", body)
if err != nil {
panic(err)
}
+22
View File
@@ -364,3 +364,25 @@ func TestDecode(t *testing.T) {
}
})
}
func TestDecodeRtpOverTCPRaw(t *testing.T) {
file, err := os.ReadFile("./ptz_pw.raw")
if err != nil {
panic(err)
}
f, err := os.OpenFile("./ptz.ps", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
for i := 2; i < len(file); i += 2 {
size := binary.BigEndian.Uint16(file[i-2:])
if len(file)-i < int(size) {
break
}
f.Write(file[i+12 : i+int(size)])
i += int(size)
}
f.Close()
}
+1 -1
View File
@@ -159,7 +159,7 @@ func main() {
if stream.AppConfig.Hooks.IsEnableOnStarted() {
go func() {
_, _ = stream.PostHookEvent(stream.HookEventStarted, "", nil)
_, _ = stream.PostHookEventWithJson(stream.HookEventStarted, "", nil)
}()
}
+27 -27
View File
@@ -28,36 +28,13 @@ func responseBodyToString(resp *http.Response) string {
return string(bodyBytes)
}
func DoPost(url string, body []byte) (*http.Response, error) {
func DoPostHookEvent(event HookEvent, req *http.Request, dumpBody []byte) (*http.Response, error) {
client := &http.Client{
Timeout: time.Duration(AppConfig.Hooks.Timeout),
}
request, err := http.NewRequest("post", url, bytes.NewBuffer(body))
if err != nil {
return nil, err
}
request.Header.Set("Content-Type", "application/json")
return client.Do(request)
}
func PostHookEvent(event HookEvent, params string, body interface{}) (*http.Response, error) {
url, ok := hookUrls[event]
if url == "" || !ok {
return nil, fmt.Errorf("the url for this %s event does not exist", event.ToString())
}
bytes, err := json.Marshal(body)
if err != nil {
return nil, err
}
if "" != params {
url += "?" + params
}
log.Sugar.Infof("sent a hook event for %s. url: %s body: %s", event.ToString(), url, bytes)
response, err := DoPost(url, bytes)
log.Sugar.Infof("sent a hook event for %s. url: %s body: %s", event.ToString(), req.URL.String(), dumpBody)
response, err := client.Do(req)
if err != nil {
log.Sugar.Errorf("failed to %s the hook event. err: %s", event.ToString(), err.Error())
return response, err
@@ -68,10 +45,33 @@ func PostHookEvent(event HookEvent, params string, body interface{}) (*http.Resp
if http.StatusOK != response.StatusCode {
return response, fmt.Errorf("unexpected response status: %s", response.Status)
}
return response, nil
}
func PostHookEventWithJson(event HookEvent, params string, body interface{}) (*http.Response, error) {
url, ok := hookUrls[event]
if url == "" || !ok {
return nil, fmt.Errorf("the url for this %s event does not exist", event.ToString())
}
jsonBody, err := json.Marshal(body)
if err != nil {
return nil, err
}
if "" != params {
url += "?" + params
}
request, err := http.NewRequest("post", url, bytes.NewBuffer(jsonBody))
if err != nil {
return nil, err
}
request.Header.Set("Content-Type", "application/json")
return DoPostHookEvent(event, request, jsonBody)
}
func NewHookPlayEventInfo(sink Sink) eventInfo {
return eventInfo{Stream: sink.GetSourceID(), Protocol: int(sink.GetProtocol()), RemoteAddr: sink.RemoteAddr()}
}
+2 -2
View File
@@ -24,7 +24,7 @@ func PreparePlaySink(sink Sink, waitTimeout bool) (*http.Response, utils.HookSta
Sink: SinkID2String(sink.GetID()),
}
hook, err := PostHookEvent(HookEventPlay, sink.UrlValues().Encode(), body)
hook, err := PostHookEventWithJson(HookEventPlay, sink.UrlValues().Encode(), body)
if err != nil {
log.Sugar.Errorf("播放事件-通知失败 err: %s sink: %s-%v source: %s", err.Error(), sink.GetProtocol().String(), sink.GetID(), sink.GetSourceID())
@@ -79,7 +79,7 @@ func HookPlayDoneEvent(sink Sink) (*http.Response, bool) {
Sink: SinkID2String(sink.GetID()),
}
hook, err := PostHookEvent(HookEventPlayDone, sink.UrlValues().Encode(), body)
hook, err := PostHookEventWithJson(HookEventPlayDone, sink.UrlValues().Encode(), body)
if err != nil {
log.Sugar.Errorf("播放结束事件-通知失败 err: %s sink: %s-%v source: %s", err.Error(), sink.GetProtocol().String(), sink.GetID(), sink.GetSourceID())
return hook, false
+15 -15
View File
@@ -1,6 +1,7 @@
package stream
import (
"bytes"
"encoding/json"
"fmt"
"github.com/lkmio/avformat/utils"
@@ -78,7 +79,7 @@ func PreparePublishSourceWithAsync(source Source, add bool) {
func NotifyPublishEvent(source Source) (*http.Response, error) {
if AppConfig.Hooks.IsEnablePublishEvent() {
return PostHookEvent(HookEventPublish, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
return PostHookEventWithJson(HookEventPublish, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
}
return nil, nil
@@ -86,38 +87,37 @@ func NotifyPublishEvent(source Source) (*http.Response, error) {
func NotifyPublishDoneEvent(source Source) {
if AppConfig.Hooks.IsEnablePublishEvent() {
_, _ = PostHookEvent(HookEventPublishDone, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
_, _ = PostHookEventWithJson(HookEventPublishDone, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
}
}
func NotifyReceiveTimeoutEvent(source Source) (*http.Response, error) {
utils.Assert(AppConfig.Hooks.IsEnableOnReceiveTimeout())
return PostHookEvent(HookEventReceiveTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
return PostHookEventWithJson(HookEventReceiveTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
}
func NotifyIdleTimeoutEvent(source Source) (*http.Response, error) {
utils.Assert(AppConfig.Hooks.IsEnableOnIdleTimeout())
return PostHookEvent(HookEventIdleTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
return PostHookEventWithJson(HookEventIdleTimeout, source.UrlValues().Encode(), NewHookPublishEventInfo(source))
}
func NotifyRecordEvent(source Source, path string) {
if AppConfig.Hooks.IsEnableOnRecord() {
_, _ = PostHookEvent(HookEventRecord, "", NewRecordEventInfo(source, path))
_, _ = PostHookEventWithJson(HookEventRecord, "", NewRecordEventInfo(source, path))
}
}
func NotifySnapshotEvent(source Source, codec string, keyFrameData []byte) {
if AppConfig.Hooks.IsEnableOnSnapshot() {
data := struct {
eventInfo
Codec string
KeyFrameData []byte `json:"key_frame_data"`
}{
eventInfo: NewHookPublishEventInfo(source),
Codec: codec,
KeyFrameData: keyFrameData,
}
req, _ := http.NewRequest("POST", AppConfig.Hooks.OnSnapshotUrl, bytes.NewReader(keyFrameData))
_, _ = PostHookEvent(HookEventSnapshot, "", &data)
req.Header.Add("stream", source.GetID())
req.Header.Add("session", source.GetSessionID())
req.Header.Add("protocol", source.GetType().String())
req.Header.Add("remote_addr", source.RemoteAddr())
req.Header.Add("codec", codec)
req.Header.Set("Content-Type", "application/octet-stream")
_, _ = DoPostHookEvent(HookEventSnapshot, req, nil)
}
}