重构 rtsp 播放

This commit is contained in:
xugo
2025-11-07 12:18:56 +08:00
parent d698127ee9
commit 530d36ff4d
6 changed files with 139 additions and 86 deletions
+4 -2
View File
@@ -14,7 +14,7 @@ go wvp 是 Go 语言实现的开源 GB28181 解决方案,基于 GB28181-2022
## 在线演示平台
+ [在线演示平台 :)](http://wvp.golang.space:15123/)
+ [在线演示平台 :) (服务器已过期,暂不提供演示)](http://wvp.golang.space:15123/)
![](./docs/demo/play.gif)
@@ -97,12 +97,14 @@ zlm 能否访问到 gowvp?? docker 合并版本填写 127.0.0.1 即可,分离
设计如此,超过 4 个要在管理页查看,或者点击右侧的 "查看更多"
> 使用了 nginx 反向代理,返回的播放地址无法播放
> 使用了 nginx 反向代理,返回的播放地址无法播放或不加载快照
在反向代理那里配置以下参数,其中域名根据实际的填写
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Prefix "https://gowvp.com";
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
+1 -1
View File
@@ -48,7 +48,7 @@ func DefaultConfig() Bootstrap {
Log: Log{
Dir: "./logs",
Level: "error",
MaxAge: Duration(7 * 24 * time.Hour),
MaxAge: Duration(3 * 24 * time.Hour),
RotationTime: Duration(8 * time.Hour),
RotationSize: 50,
},
+13
View File
@@ -45,6 +45,17 @@ func (c *Core) GetStreamProxy(ctx context.Context, id string) (*StreamProxy, err
return &out, nil
}
func (c *Core) GetStreamProxyByAppStream(ctx context.Context, app, stream string) (*StreamProxy, error) {
var out StreamProxy
if err := c.store.StreamProxy().Get(ctx, &out, orm.Where("app=? AND stream=?", app, stream)); err != nil {
if orm.IsErrRecordNotFound(err) {
return nil, reason.ErrNotFound.Withf(`Get err[%s]`, err.Error())
}
return nil, reason.ErrDB.Withf(`Get err[%s]`, err.Error())
}
return &out, nil
}
// AddStreamProxy Insert into database
func (c *Core) AddStreamProxy(ctx context.Context, in *AddStreamProxyInput) (*StreamProxy, error) {
var out StreamProxy
@@ -55,6 +66,8 @@ func (c *Core) AddStreamProxy(ctx context.Context, in *AddStreamProxyInput) (*St
return nil, reason.ErrBadRequest.With("请更换 app 参数")
}
out.ID = c.uniqueID.UniqueID(bz.IDPrefixRTSP)
out.Stream = out.ID
out.App = "pull"
if err := c.store.StreamProxy().Add(ctx, &out); err != nil {
if orm.IsDuplicatedKey(err) {
return nil, reason.ErrDB.SetMsg("stream 重复,请勿重复添加")
+7
View File
@@ -200,7 +200,14 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) error {
HookEnable: zlm.NewString("1"),
HookOnFlowReport: zlm.NewString(""),
HookOnPlay: zlm.NewString(fmt.Sprintf("%s/on_play", hookPrefix)),
// HookOnHTTPAccess: zlm.NewString(""),
// 仅开启 hls_fmp4
ProtocolEnableTs: zlm.NewString("0"),
ProtocolEnableFmp4: zlm.NewString("0"),
ProtocolEnableHls: zlm.NewString("0"),
ProtocolEnableHlsFmp4: zlm.NewString("1"),
HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)),
HookOnStreamNoneReader: zlm.NewString(fmt.Sprintf("%s/on_stream_none_reader", hookPrefix)),
GeneralStreamNoneReaderDelayMS: zlm.NewString("30000"),
+24 -48
View File
@@ -167,8 +167,7 @@ func (a GB28181API) editChannel(c *gin.Context, in *gb28181.EditChannelInput) (a
func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
channelID := c.Param("id")
var app, appStream, host, stream, session string
var svr *sms.MediaServer
var app, appStream, host, stream, session, mediaServerID string
// 国标逻辑
if strings.HasPrefix(channelID, bz.IDPrefixGBChannel) {
@@ -185,10 +184,7 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
app = "rtp"
appStream = ch.ID
svr, err = a.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), sms.DefaultMediaServerID)
if err != nil {
return nil, err
}
mediaServerID = sms.DefaultMediaServerID
} else if strings.HasPrefix(channelID, bz.IDPrefixRTMP) {
pu, err := a.uc.MediaAPI.pushCore.GetStreamPush(c.Request.Context(), channelID)
@@ -200,11 +196,7 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
}
app = pu.App
appStream = pu.Stream
svr, err = a.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), pu.MediaServerID)
if err != nil {
return nil, err
}
mediaServerID = pu.MediaServerID
if !pu.IsAuthDisabled && pu.Session != "" {
session = "session=" + pu.Session
@@ -216,33 +208,16 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
}
app = proxy.App
appStream = proxy.Stream
svr, err = a.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), sms.DefaultMediaServerID)
if err != nil {
return nil, err
}
resp, err := a.uc.SMSAPI.smsCore.AddStreamProxy(svr, zlm.AddStreamProxyRequest{
Vhost: "__defaultVhost__",
App: proxy.App,
Stream: proxy.Stream,
URL: proxy.SourceURL,
RetryCount: 3,
RTPType: proxy.Transport,
TimeoutSec: 10,
// EnableRTMP: zlm.NewBool(true),
// EnableRTSP: zlm.NewBool(true),
// EnableHLS: zlm.NewBool(true),
// EnableAudio: zlm.NewBool(true),
AddMuteAudio: zlm.NewBool(true),
// AutoClose: zlm.NewBool(false),
})
if err != nil {
return nil, reason.ErrServer.SetMsg(err.Error())
}
a.uc.ProxyAPI.proxyCore.EditStreamProxyKey(c.Request.Context(), resp.Data.Key, proxy.ID)
mediaServerID = sms.DefaultMediaServerID
} else {
return nil, reason.ErrNotFound.SetMsg("不支持的播放通道")
}
svr, err := a.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), mediaServerID)
if err != nil {
return nil, err
}
stream = app + "/" + appStream
host = c.Request.Host
@@ -267,15 +242,15 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
WebRTC: fmt.Sprintf("webrtc://%s:%d/proxy/sms/index/api/webrtc?app=%s&stream=%s&type=play", host, httpPort, app, stream) + "&" + session,
HLS: fmt.Sprintf("http://%s:%d/proxy/sms/%s/hls.fmp4.m3u8", host, httpPort, stream) + "?" + session,
},
{
Label: "SSL 线路",
WSFLV: fmt.Sprintf("wss://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session,
HTTPFLV: fmt.Sprintf("https://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session,
RTMP: fmt.Sprintf("rtmps://%s:%d/%s", host, svr.Ports.RTMPs, stream) + session,
RTSP: fmt.Sprintf("rtsps://%s:%d/%s", host, svr.Ports.RTSPs, stream) + session,
WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTPS, app, stream) + "&" + session,
HLS: fmt.Sprintf("https://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTPS, stream) + "?" + session,
},
// {
// Label: "SSL 线路",
// WSFLV: fmt.Sprintf("wss://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session,
// HTTPFLV: fmt.Sprintf("https://%s:%d/%s.live.flv", host, svr.Ports.HTTP, stream) + session,
// RTMP: fmt.Sprintf("rtmps://%s:%d/%s", host, svr.Ports.RTMPs, stream) + session,
// RTSP: fmt.Sprintf("rtsps://%s:%d/%s", host, svr.Ports.RTSPs, stream) + session,
// WebRTC: fmt.Sprintf("webrtc://%s:%d/index/api/webrtc?app=%s&stream=%s&type=play", host, svr.Ports.HTTPS, app, stream) + "&" + session,
// HLS: fmt.Sprintf("https://%s:%d/%s/hls.fmp4.m3u8", host, svr.Ports.HTTPS, stream) + "?" + session,
// },
},
}
@@ -284,14 +259,15 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
wsPrefix := strings.Replace(strings.Replace(prefix, "https", "wss", 1), "http", "ws", 1)
out.Items[0].WSFLV = fmt.Sprintf("%s/proxy/sms/%s.live.flv", wsPrefix, stream) + "?" + session
out.Items[0].HTTPFLV = fmt.Sprintf("%s/proxy/sms/%s.live.flv", prefix, stream) + "?" + session
host := c.Request.Header.Get("X-Forwarded-Host")
out.Items[0].RTMP = fmt.Sprintf("rtmp://%s:%d/proxy/sms/%s.live.flv", host, svr.Ports.RTMP, stream) + "?" + session
out.Items[0].RTSP = fmt.Sprintf("rtsp://%s:%d/proxy/sms/%s.live.flv", host, svr.Ports.RTSP, stream) + "?" + session
out.Items[0].HLS = fmt.Sprintf("%s/proxy/sms/%s/hls.fmp4.m3u8", prefix, stream) + "?" + session
rtcPrefix := strings.Replace(strings.Replace(prefix, "https", "webrtc", 1), "http", "webrtc", 1)
out.Items[0].WebRTC = fmt.Sprintf("%s/proxy/sms/index/api/webrtc?app=%s&stream=%s&type=play", rtcPrefix, app, stream) + "&" + session
host := c.Request.Header.Get("X-Forwarded-Host")
if host != "" {
out.Items[0].RTMP = fmt.Sprintf("rtmp://%s:%d/%s", host, svr.Ports.RTMP, stream) + "?" + session
out.Items[0].RTSP = fmt.Sprintf("rtsp://%s:%d/%s", host, svr.Ports.RTSP, stream) + "?" + session
}
}
// 取一张快照
+90 -35
View File
@@ -1,15 +1,19 @@
package api
import (
"context"
"log/slog"
"net/url"
"strings"
"github.com/gin-gonic/gin"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/push"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs"
"github.com/gowvp/gb28181/pkg/zlm"
"github.com/ixugo/goddd/pkg/web"
)
@@ -57,7 +61,7 @@ func (w WebHookAPI) onServerKeepalive(_ *gin.Context, in *onServerKeepaliveInput
// onPublish rtsp/rtmp/rtp 推流鉴权事件。
// https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_7%E3%80%81on-publish
func (w WebHookAPI) onPublish(c *gin.Context, in *onPublishInput) (*onPublishOutput, error) {
w.log.Info("推流鉴权", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID)
w.log.Info("webhook onPublish", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID)
if in.Schema == "rtmp" {
params, err := url.ParseQuery(in.Params)
if err != nil {
@@ -83,13 +87,13 @@ func (w WebHookAPI) onPublish(c *gin.Context, in *onPublishInput) (*onPublishOut
// onStreamChanged rtsp/rtmp 流注册或注销时触发此事件;此事件对回复不敏感。
// https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_12%E3%80%81on-stream-changed
func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (DefaultOutput, error) {
w.log.InfoContext(c.Request.Context(), "流状态变化", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID, "regist", in.Regist)
w.log.InfoContext(c.Request.Context(), "webhook onStreamChanged", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID, "regist", in.Regist)
if in.App == "rtp" {
// 防止多次触发
if in.Schema == "rtmp" && !in.Regist {
ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream)
if err != nil {
w.log.Warn("获取通道失败", "err", err)
w.log.WarnContext(c.Request.Context(), "webhook onStreamChanged", "err", err)
return newDefaultOutputOK(), nil
}
w.gbs.StopPlay(&gbs.StopPlayInput{Channel: ch})
@@ -97,14 +101,17 @@ func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (D
return newDefaultOutputOK(), nil
}
if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) {
return newDefaultOutputOK(), nil
}
switch in.Schema {
case "rtmp":
if !in.Regist {
if err := w.mediaCore.UnPublish(c.Request.Context(), in.App, in.Stream); err != nil {
w.log.ErrorContext(c.Request.Context(), "UnPublish", "err", err)
w.log.ErrorContext(c.Request.Context(), "webhook onStreamChanged", "err", err)
}
}
case "rtsp":
}
return newDefaultOutputOK(), nil
}
@@ -120,7 +127,7 @@ func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, e
case "rtmp":
params, err := url.ParseQuery(in.Params)
if err != nil {
w.log.InfoContext(c.Request.Context(), "onPlay 鉴权失败", "err", err)
w.log.InfoContext(c.Request.Context(), "webhook onPlay", "err", err)
return DefaultOutput{Code: 1, Msg: err.Error()}, nil
}
session := params.Get("session")
@@ -129,7 +136,7 @@ func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, e
Stream: in.Stream,
Session: session,
}); err != nil {
w.log.InfoContext(c.Request.Context(), "onPlay 鉴权失败", "err", err)
w.log.InfoContext(c.Request.Context(), "webhook onPlay", "err", err)
return DefaultOutput{Code: 1, Msg: err.Error()}, nil
}
case "rtsp":
@@ -147,15 +154,16 @@ func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, e
// https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_12%E3%80%81on-stream-changed
func (w WebHookAPI) onStreamNoneReader(c *gin.Context, in *onStreamNoneReaderInput) (onStreamNoneReaderOutput, error) {
// rtmp 无人观看时,也允许推流
w.log.InfoContext(c.Request.Context(), "无人观看", "app", in.App, "stream", in.Stream, "mediaServerID", in.MediaServerID)
w.log.InfoContext(c.Request.Context(), "webhook onStreamNoneReader", "app", in.App, "stream", in.Stream, "mediaServerID", in.MediaServerID)
if in.App == "rtp" {
ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream)
if err != nil {
w.log.WarnContext(c.Request.Context(), "获取通道失败", "err", err)
w.log.WarnContext(c.Request.Context(), "webhook onStreamNoneReader", "err", err)
return onStreamNoneReaderOutput{Close: true}, nil
}
_ = w.gbs.StopPlay(&gbs.StopPlayInput{Channel: ch})
} else if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) {
}
// 存在录像计划时,不关闭流
return onStreamNoneReaderOutput{Close: true}, nil
@@ -165,12 +173,12 @@ func (w WebHookAPI) onStreamNoneReader(c *gin.Context, in *onStreamNoneReaderInp
// 调用 openRtpServer 接口,rtp server 长时间未收到数据,执行此 web hook,对回复不敏感
// https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_17%E3%80%81on-rtp-server-timeout
func (w WebHookAPI) onRTPServerTimeout(c *gin.Context, in *onRTPServerTimeoutInput) (DefaultOutput, error) {
w.log.InfoContext(c.Request.Context(), "rtp 收流超时", "local_port", in.LocalPort, "ssrc", in.SSRC, "stream_id", in.StreamID, "mediaServerID", in.MediaServerID)
w.log.InfoContext(c.Request.Context(), "webhook onRTPServerTimeout", "local_port", in.LocalPort, "ssrc", in.SSRC, "stream_id", in.StreamID, "mediaServerID", in.MediaServerID)
return newDefaultOutputOK(), nil
}
func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput) (DefaultOutput, error) {
w.log.InfoContext(c.Request.Context(), "流不存在", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID)
w.log.InfoContext(c.Request.Context(), "webhook onStreamNotFound", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID)
// 国标流处理
if in.App == "rtp" {
@@ -178,32 +186,79 @@ func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput)
if in.Schema != "rtmp" {
return newDefaultOutputOK(), nil
}
ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream)
if err != nil {
// slog.Error("获取通道失败", "err", err)
return newDefaultOutputOK(), nil
v := RTPStream{uc: w.uc}
if err := v.onStreamNotFound(c.Request.Context(), in); err != nil {
slog.ErrorContext(c.Request.Context(), "webhook onStreamNotFound", "err", err)
}
dev, err := w.gb28181Core.GetDevice(c.Request.Context(), ch.DID)
if err != nil {
// slog.Error("获取设备失败", "err", err)
return newDefaultOutputOK(), nil
}
svr, err := w.uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), sms.DefaultMediaServerID)
if err != nil {
// slog.Error("GetMediaServer", "err", err)
return newDefaultOutputOK(), nil
}
if err := w.gbs.Play(&gbs.PlayInput{
Channel: ch,
StreamMode: dev.StreamMode,
SMS: svr,
}); err != nil {
w.log.ErrorContext(c.Request.Context(), "play", "err", err, "channel", ch.ID)
return newDefaultOutputOK(), nil
} else if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) {
v := RTSPStream{uc: w.uc}
if err := v.onStreamNotFound(c.Request.Context(), in); err != nil {
slog.ErrorContext(c.Request.Context(), "webhook onStreamNotFound", "err", err)
}
}
return newDefaultOutputOK(), nil
}
type RTPStream struct {
uc *Usecase
}
func (r RTPStream) onStreamNotFound(ctx context.Context, in *onStreamNotFoundInput) error {
ch, err := r.uc.GB28181API.gb28181Core.GetChannel(ctx, in.Stream)
if err != nil {
return err
}
dev, err := r.uc.GB28181API.gb28181Core.GetDevice(ctx, ch.DID)
if err != nil {
return err
}
svr, err := r.uc.SMSAPI.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID)
if err != nil {
return err
}
return r.uc.WebHookAPI.gbs.Play(&gbs.PlayInput{
Channel: ch,
StreamMode: dev.StreamMode,
SMS: svr,
})
}
type RTSPStream struct {
uc *Usecase
}
func (r RTSPStream) onStreamNotFound(ctx context.Context, in *onStreamNotFoundInput) error {
proxy, err := r.uc.ProxyAPI.proxyCore.GetStreamProxy(ctx, in.Stream)
if err != nil {
return err
}
svr, err := r.uc.SMSAPI.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID)
if err != nil {
return err
}
resp, err := r.uc.SMSAPI.smsCore.AddStreamProxy(svr, zlm.AddStreamProxyRequest{
Vhost: "__defaultVhost__",
App: proxy.App,
Stream: proxy.Stream,
URL: proxy.SourceURL,
RetryCount: 3,
RTPType: proxy.Transport,
TimeoutSec: 10,
EnableHLSFMP4: zlm.NewBool(true),
EnableAudio: zlm.NewBool(true),
EnableRTSP: zlm.NewBool(true),
EnableRTMP: zlm.NewBool(true),
AddMuteAudio: zlm.NewBool(true),
AutoClose: zlm.NewBool(true),
})
if err != nil {
return err
}
// 用于关闭
r.uc.ProxyAPI.proxyCore.EditStreamProxyKey(ctx, resp.Data.Key, proxy.ID)
return nil
}