diff --git a/README.md b/README.md index 170b408..7f8471a 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ go wvp 是 Go 语言实现的开源 GB28181 解决方案,基于 GB28181-2022 ## 在线演示平台 -+ [在线演示平台 :)](http://wvp.golang.space:15123/) ++ [在线演示平台 :) (服务器已过期,暂不提供演示)](http://wvp.golang.space:15123/) ![](./docs/demo/play.gif) @@ -97,12 +97,14 @@ zlm 能否访问到 gowvp?? docker 合并版本填写 127.0.0.1 即可,分离 设计如此,超过 4 个要在管理页查看,或者点击右侧的 "查看更多" -> 使用了 nginx 反向代理,返回的播放地址无法播放 +> 使用了 nginx 反向代理,返回的播放地址无法播放或不加载快照 在反向代理那里配置以下参数,其中域名根据实际的填写 proxy_set_header X-Forwarded-Host $host; proxy_set_header X-Forwarded-Prefix "https://gowvp.com"; +proxy_set_header Upgrade $http_upgrade; +proxy_set_header Connection "upgrade"; diff --git a/internal/conf/default.go b/internal/conf/default.go index bd41d20..ae64e88 100644 --- a/internal/conf/default.go +++ b/internal/conf/default.go @@ -48,7 +48,7 @@ func DefaultConfig() Bootstrap { Log: Log{ Dir: "./logs", Level: "error", - MaxAge: Duration(7 * 24 * time.Hour), + MaxAge: Duration(3 * 24 * time.Hour), RotationTime: Duration(8 * time.Hour), RotationSize: 50, }, diff --git a/internal/core/proxy/stream_proxy.go b/internal/core/proxy/stream_proxy.go index 0a099da..77a545d 100755 --- a/internal/core/proxy/stream_proxy.go +++ b/internal/core/proxy/stream_proxy.go @@ -45,6 +45,17 @@ func (c *Core) GetStreamProxy(ctx context.Context, id string) (*StreamProxy, err return &out, nil } +func (c *Core) GetStreamProxyByAppStream(ctx context.Context, app, stream string) (*StreamProxy, error) { + var out StreamProxy + if err := c.store.StreamProxy().Get(ctx, &out, orm.Where("app=? AND stream=?", app, stream)); err != nil { + if orm.IsErrRecordNotFound(err) { + return nil, reason.ErrNotFound.Withf(`Get err[%s]`, err.Error()) + } + return nil, reason.ErrDB.Withf(`Get err[%s]`, err.Error()) + } + return &out, nil +} + // AddStreamProxy Insert into database func (c *Core) AddStreamProxy(ctx context.Context, in *AddStreamProxyInput) (*StreamProxy, error) { var out StreamProxy @@ -55,6 +66,8 @@ func (c *Core) AddStreamProxy(ctx context.Context, in *AddStreamProxyInput) (*St return nil, reason.ErrBadRequest.With("请更换 app 参数") } out.ID = c.uniqueID.UniqueID(bz.IDPrefixRTSP) + out.Stream = out.ID + out.App = "pull" if err := c.store.StreamProxy().Add(ctx, &out); err != nil { if orm.IsDuplicatedKey(err) { return nil, reason.ErrDB.SetMsg("stream 重复,请勿重复添加") diff --git a/internal/core/sms/node_manager.go b/internal/core/sms/node_manager.go index a3e8f44..e360a2f 100644 --- a/internal/core/sms/node_manager.go +++ b/internal/core/sms/node_manager.go @@ -200,7 +200,14 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) error { HookEnable: zlm.NewString("1"), HookOnFlowReport: zlm.NewString(""), HookOnPlay: zlm.NewString(fmt.Sprintf("%s/on_play", hookPrefix)), + // HookOnHTTPAccess: zlm.NewString(""), + // 仅开启 hls_fmp4 + ProtocolEnableTs: zlm.NewString("0"), + ProtocolEnableFmp4: zlm.NewString("0"), + ProtocolEnableHls: zlm.NewString("0"), + ProtocolEnableHlsFmp4: zlm.NewString("1"), + HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)), HookOnStreamNoneReader: zlm.NewString(fmt.Sprintf("%s/on_stream_none_reader", hookPrefix)), GeneralStreamNoneReaderDelayMS: zlm.NewString("30000"), diff --git a/internal/web/api/gb28181.go b/internal/web/api/gb28181.go index ae399fb..2078605 100755 --- a/internal/web/api/gb28181.go +++ b/internal/web/api/gb28181.go @@ -167,8 +167,7 @@ func (a GB28181API) editChannel(c *gin.Context, in *gb28181.EditChannelInput) (a func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { channelID := c.Param("id") - var app, appStream, host, stream, session string - var svr *sms.MediaServer + var app, appStream, host, stream, session, mediaServerID string // 国标逻辑 if strings.HasPrefix(channelID, bz.IDPrefixGBChannel) { @@ -185,10 +184,7 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { app = "rtp" appStream = ch.ID - svr, err = a.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), sms.DefaultMediaServerID) - if err != nil { - return nil, err - } + mediaServerID = sms.DefaultMediaServerID } else if strings.HasPrefix(channelID, bz.IDPrefixRTMP) { pu, err := a.uc.MediaAPI.pushCore.GetStreamPush(c.Request.Context(), channelID) @@ -200,11 +196,7 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { } app = pu.App appStream = pu.Stream - - svr, err = a.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), pu.MediaServerID) - if err != nil { - return nil, err - } + mediaServerID = pu.MediaServerID if !pu.IsAuthDisabled && pu.Session != "" { session = "session=" + pu.Session @@ -216,33 +208,16 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { } app = proxy.App appStream = proxy.Stream - - svr, err = a.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), sms.DefaultMediaServerID) - if err != nil { - return nil, err - } - resp, err := a.uc.SMSAPI.smsCore.AddStreamProxy(svr, zlm.AddStreamProxyRequest{ - Vhost: "__defaultVhost__", - App: proxy.App, - Stream: proxy.Stream, - URL: proxy.SourceURL, - RetryCount: 3, - RTPType: proxy.Transport, - TimeoutSec: 10, - // EnableRTMP: zlm.NewBool(true), - // EnableRTSP: zlm.NewBool(true), - // EnableHLS: zlm.NewBool(true), - // EnableAudio: zlm.NewBool(true), - AddMuteAudio: zlm.NewBool(true), - // AutoClose: zlm.NewBool(false), - }) - if err != nil { - return nil, reason.ErrServer.SetMsg(err.Error()) - } - a.uc.ProxyAPI.proxyCore.EditStreamProxyKey(c.Request.Context(), resp.Data.Key, proxy.ID) + mediaServerID = sms.DefaultMediaServerID } else { return nil, reason.ErrNotFound.SetMsg("不支持的播放通道") } + + svr, err := a.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), mediaServerID) + if err != nil { + return nil, err + } + stream = app + "/" + appStream host = c.Request.Host @@ -267,15 +242,15 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { WebRTC: fmt.Sprintf("webrtc://%s:%d/proxy/sms/index/api/webrtc?app=%s&stream=%s&type=play", host, httpPort, app, stream) + "&" + session, HLS: fmt.Sprintf("http://%s:%d/proxy/sms/%s/hls.fmp4.m3u8", host, httpPort, stream) + "?" + session, }, - { - Label: "SSL 线路", - WSFLV: fmt.Sprintf("wss://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session, - HTTPFLV: fmt.Sprintf("https://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session, - RTMP: fmt.Sprintf("rtmps://%s:%d/%s", host, svr.Ports.RTMPs, stream) + session, - RTSP: fmt.Sprintf("rtsps://%s:%d/%s", host, svr.Ports.RTSPs, stream) + session, - WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTPS, app, stream) + "&" + session, - HLS: fmt.Sprintf("https://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTPS, stream) + "?" + session, - }, + // { + // Label: "SSL 线路", + // WSFLV: fmt.Sprintf("wss://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session, + // HTTPFLV: fmt.Sprintf("https://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session, + // RTMP: fmt.Sprintf("rtmps://%s:%d/%s", host, svr.Ports.RTMPs, stream) + session, + // RTSP: fmt.Sprintf("rtsps://%s:%d/%s", host, svr.Ports.RTSPs, stream) + session, + // WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTPS, app, stream) + "&" + session, + // HLS: fmt.Sprintf("https://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTPS, stream) + "?" + session, + // }, }, } @@ -284,14 +259,15 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { wsPrefix := strings.Replace(strings.Replace(prefix, "https", "wss", 1), "http", "ws", 1) out.Items[0].WSFLV = fmt.Sprintf("%s/proxy/sms/%s.live.flv", wsPrefix, stream) + "?" + session out.Items[0].HTTPFLV = fmt.Sprintf("%s/proxy/sms/%s.live.flv", prefix, stream) + "?" + session - - host := c.Request.Header.Get("X-Forwarded-Host") - out.Items[0].RTMP = fmt.Sprintf("rtmp://%s:%d/proxy/sms/%s.live.flv", host, svr.Ports.RTMP, stream) + "?" + session - out.Items[0].RTSP = fmt.Sprintf("rtsp://%s:%d/proxy/sms/%s.live.flv", host, svr.Ports.RTSP, stream) + "?" + session out.Items[0].HLS = fmt.Sprintf("%s/proxy/sms/%s/hls.fmp4.m3u8", prefix, stream) + "?" + session rtcPrefix := strings.Replace(strings.Replace(prefix, "https", "webrtc", 1), "http", "webrtc", 1) out.Items[0].WebRTC = fmt.Sprintf("%s/proxy/sms/index/api/webrtc?app=%s&stream=%s&type=play", rtcPrefix, app, stream) + "&" + session + host := c.Request.Header.Get("X-Forwarded-Host") + if host != "" { + out.Items[0].RTMP = fmt.Sprintf("rtmp://%s:%d/%s", host, svr.Ports.RTMP, stream) + "?" + session + out.Items[0].RTSP = fmt.Sprintf("rtsp://%s:%d/%s", host, svr.Ports.RTSP, stream) + "?" + session + } } // 取一张快照 diff --git a/internal/web/api/zlm_webhook.go b/internal/web/api/zlm_webhook.go index f10d7ba..65d2200 100644 --- a/internal/web/api/zlm_webhook.go +++ b/internal/web/api/zlm_webhook.go @@ -1,15 +1,19 @@ package api import ( + "context" "log/slog" "net/url" + "strings" "github.com/gin-gonic/gin" "github.com/gowvp/gb28181/internal/conf" + "github.com/gowvp/gb28181/internal/core/bz" "github.com/gowvp/gb28181/internal/core/gb28181" "github.com/gowvp/gb28181/internal/core/push" "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/gbs" + "github.com/gowvp/gb28181/pkg/zlm" "github.com/ixugo/goddd/pkg/web" ) @@ -57,7 +61,7 @@ func (w WebHookAPI) onServerKeepalive(_ *gin.Context, in *onServerKeepaliveInput // onPublish rtsp/rtmp/rtp 推流鉴权事件。 // https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_7%E3%80%81on-publish func (w WebHookAPI) onPublish(c *gin.Context, in *onPublishInput) (*onPublishOutput, error) { - w.log.Info("推流鉴权", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID) + w.log.Info("webhook onPublish", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID) if in.Schema == "rtmp" { params, err := url.ParseQuery(in.Params) if err != nil { @@ -83,13 +87,13 @@ func (w WebHookAPI) onPublish(c *gin.Context, in *onPublishInput) (*onPublishOut // onStreamChanged rtsp/rtmp 流注册或注销时触发此事件;此事件对回复不敏感。 // https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_12%E3%80%81on-stream-changed func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (DefaultOutput, error) { - w.log.InfoContext(c.Request.Context(), "流状态变化", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID, "regist", in.Regist) + w.log.InfoContext(c.Request.Context(), "webhook onStreamChanged", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID, "regist", in.Regist) if in.App == "rtp" { // 防止多次触发 if in.Schema == "rtmp" && !in.Regist { ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream) if err != nil { - w.log.Warn("获取通道失败", "err", err) + w.log.WarnContext(c.Request.Context(), "webhook onStreamChanged", "err", err) return newDefaultOutputOK(), nil } w.gbs.StopPlay(&gbs.StopPlayInput{Channel: ch}) @@ -97,14 +101,17 @@ func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (D return newDefaultOutputOK(), nil } + if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) { + return newDefaultOutputOK(), nil + } + switch in.Schema { case "rtmp": if !in.Regist { if err := w.mediaCore.UnPublish(c.Request.Context(), in.App, in.Stream); err != nil { - w.log.ErrorContext(c.Request.Context(), "UnPublish", "err", err) + w.log.ErrorContext(c.Request.Context(), "webhook onStreamChanged", "err", err) } } - case "rtsp": } return newDefaultOutputOK(), nil } @@ -120,7 +127,7 @@ func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, e case "rtmp": params, err := url.ParseQuery(in.Params) if err != nil { - w.log.InfoContext(c.Request.Context(), "onPlay 鉴权失败", "err", err) + w.log.InfoContext(c.Request.Context(), "webhook onPlay", "err", err) return DefaultOutput{Code: 1, Msg: err.Error()}, nil } session := params.Get("session") @@ -129,7 +136,7 @@ func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, e Stream: in.Stream, Session: session, }); err != nil { - w.log.InfoContext(c.Request.Context(), "onPlay 鉴权失败", "err", err) + w.log.InfoContext(c.Request.Context(), "webhook onPlay", "err", err) return DefaultOutput{Code: 1, Msg: err.Error()}, nil } case "rtsp": @@ -147,15 +154,16 @@ func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, e // https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_12%E3%80%81on-stream-changed func (w WebHookAPI) onStreamNoneReader(c *gin.Context, in *onStreamNoneReaderInput) (onStreamNoneReaderOutput, error) { // rtmp 无人观看时,也允许推流 - w.log.InfoContext(c.Request.Context(), "无人观看", "app", in.App, "stream", in.Stream, "mediaServerID", in.MediaServerID) + w.log.InfoContext(c.Request.Context(), "webhook onStreamNoneReader", "app", in.App, "stream", in.Stream, "mediaServerID", in.MediaServerID) if in.App == "rtp" { ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream) if err != nil { - w.log.WarnContext(c.Request.Context(), "获取通道失败", "err", err) + w.log.WarnContext(c.Request.Context(), "webhook onStreamNoneReader", "err", err) return onStreamNoneReaderOutput{Close: true}, nil } _ = w.gbs.StopPlay(&gbs.StopPlayInput{Channel: ch}) + } else if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) { } // 存在录像计划时,不关闭流 return onStreamNoneReaderOutput{Close: true}, nil @@ -165,12 +173,12 @@ func (w WebHookAPI) onStreamNoneReader(c *gin.Context, in *onStreamNoneReaderInp // 调用 openRtpServer 接口,rtp server 长时间未收到数据,执行此 web hook,对回复不敏感 // https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_17%E3%80%81on-rtp-server-timeout func (w WebHookAPI) onRTPServerTimeout(c *gin.Context, in *onRTPServerTimeoutInput) (DefaultOutput, error) { - w.log.InfoContext(c.Request.Context(), "rtp 收流超时", "local_port", in.LocalPort, "ssrc", in.SSRC, "stream_id", in.StreamID, "mediaServerID", in.MediaServerID) + w.log.InfoContext(c.Request.Context(), "webhook onRTPServerTimeout", "local_port", in.LocalPort, "ssrc", in.SSRC, "stream_id", in.StreamID, "mediaServerID", in.MediaServerID) return newDefaultOutputOK(), nil } func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput) (DefaultOutput, error) { - w.log.InfoContext(c.Request.Context(), "流不存在", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID) + w.log.InfoContext(c.Request.Context(), "webhook onStreamNotFound", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID) // 国标流处理 if in.App == "rtp" { @@ -178,32 +186,79 @@ func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput) if in.Schema != "rtmp" { return newDefaultOutputOK(), nil } - ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream) - if err != nil { - // slog.Error("获取通道失败", "err", err) - return newDefaultOutputOK(), nil + v := RTPStream{uc: w.uc} + if err := v.onStreamNotFound(c.Request.Context(), in); err != nil { + slog.ErrorContext(c.Request.Context(), "webhook onStreamNotFound", "err", err) } - - dev, err := w.gb28181Core.GetDevice(c.Request.Context(), ch.DID) - if err != nil { - // slog.Error("获取设备失败", "err", err) - return newDefaultOutputOK(), nil - } - - svr, err := w.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), sms.DefaultMediaServerID) - if err != nil { - // slog.Error("GetMediaServer", "err", err) - return newDefaultOutputOK(), nil - } - - if err := w.gbs.Play(&gbs.PlayInput{ - Channel: ch, - StreamMode: dev.StreamMode, - SMS: svr, - }); err != nil { - w.log.ErrorContext(c.Request.Context(), "play", "err", err, "channel", ch.ID) - return newDefaultOutputOK(), nil + } else if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) { + v := RTSPStream{uc: w.uc} + if err := v.onStreamNotFound(c.Request.Context(), in); err != nil { + slog.ErrorContext(c.Request.Context(), "webhook onStreamNotFound", "err", err) } } return newDefaultOutputOK(), nil } + +type RTPStream struct { + uc *Usecase +} + +func (r RTPStream) onStreamNotFound(ctx context.Context, in *onStreamNotFoundInput) error { + ch, err := r.uc.GB28181API.gb28181Core.GetChannel(ctx, in.Stream) + if err != nil { + return err + } + + dev, err := r.uc.GB28181API.gb28181Core.GetDevice(ctx, ch.DID) + if err != nil { + return err + } + + svr, err := r.uc.SMSAPI.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID) + if err != nil { + return err + } + + return r.uc.WebHookAPI.gbs.Play(&gbs.PlayInput{ + Channel: ch, + StreamMode: dev.StreamMode, + SMS: svr, + }) +} + +type RTSPStream struct { + uc *Usecase +} + +func (r RTSPStream) onStreamNotFound(ctx context.Context, in *onStreamNotFoundInput) error { + proxy, err := r.uc.ProxyAPI.proxyCore.GetStreamProxy(ctx, in.Stream) + if err != nil { + return err + } + + svr, err := r.uc.SMSAPI.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID) + if err != nil { + return err + } + resp, err := r.uc.SMSAPI.smsCore.AddStreamProxy(svr, zlm.AddStreamProxyRequest{ + Vhost: "__defaultVhost__", + App: proxy.App, + Stream: proxy.Stream, + URL: proxy.SourceURL, + RetryCount: 3, + RTPType: proxy.Transport, + TimeoutSec: 10, + EnableHLSFMP4: zlm.NewBool(true), + EnableAudio: zlm.NewBool(true), + EnableRTSP: zlm.NewBool(true), + EnableRTMP: zlm.NewBool(true), + AddMuteAudio: zlm.NewBool(true), + AutoClose: zlm.NewBool(true), + }) + if err != nil { + return err + } + // 用于关闭 + r.uc.ProxyAPI.proxyCore.EditStreamProxyKey(ctx, resp.Data.Key, proxy.ID) + return nil +}