diff --git a/.gitignore b/.gitignore index 9eda369..113b7b4 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ build/ __debug_* www/ *.db -tables +tables/ *.tar -*.zip \ No newline at end of file +*.zip +.idea/ diff --git a/README.md b/README.md index 3aa64dd..eda7ae0 100644 --- a/README.md +++ b/README.md @@ -110,7 +110,9 @@ ZLM使用文档 [github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit [GoWVP 全栈开发日记[2]:搭建服务端,解决跨域,接口联调](https://juejin.cn/post/7456796962120417314) -[GoWVP 全栈开发日记[3]:使用 React 组件构建监控数据面板](https://juejin.cn/spost/7457228085826764834) +[GoWVP 全栈开发日记[3]:使用 React 组件构建监控数据面板](https://juejin.cn/post/7457228085826764834) + +[GoWVP 全栈开发日记[4]:使用 ESlint 辅助开发](https://juejin.cn/post/7461539078111789108) 开发中... diff --git a/cmd/server/main.go b/cmd/server/main.go index 3e53e63..a75bac0 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -36,12 +36,14 @@ func getBuildRelease() bool { func main() { flag.Parse() + // 以可执行文件所在目录为工作目录,防止以服务方式运行时,工作目录切换到其它位置 bin, _ := os.Executable() if err := os.Chdir(filepath.Dir(bin)); err != nil { slog.Error("change dir error") } // 初始化配置 var bc conf.Bootstrap + // 获取配置目录绝对路径 filedir, _ := abs(*configDir) filePath := filepath.Join(filedir, "config.toml") if err := conf.SetupConfig(&bc, filePath); err != nil { diff --git a/configs/config.toml b/configs/config.toml index 5b3e6db..14bb67d 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -18,10 +18,10 @@ version = 1 SlowThreshold = '200ms' [Media] - IP = "127.0.0.1" + IP = "192.168.1.10" HTTPPort = 8080 - Secret = "OHvo86N9Ww6V8mHPWMisxNgkb8dvqAV420241107" - WebHookIP = "192.168.10.25" + Secret = "s1kPE7bzqKeHUaVcp8dCA0jeB8yxyFq4" + WebHookIP = "192.168.1.10" RTPPortRange = "20000,20500" [Log] diff --git a/internal/core/media/stream_push.go b/internal/core/media/stream_push.go index 880f1e9..75ad472 100755 --- a/internal/core/media/stream_push.go +++ b/internal/core/media/stream_push.go @@ -22,7 +22,16 @@ type StreamPushStorer interface { // FindStreamPush Paginated search func (c Core) FindStreamPush(ctx context.Context, in *FindStreamPushInput) ([]*StreamPush, int64, error) { items := make([]*StreamPush, 0) - total, err := c.store.StreamPush().Find(ctx, &items, in) + args := make([]orm.QueryOption, 0, 2) + args = append(args, orm.OrderBy("created_at DESC")) + if in.Status != "" { + args = append(args, orm.Where("status=?", in.Status)) + } + if in.Key != "" { + args = append(args, orm.Where("id=? OR app LIKE ? OR stream LIKE ?", in.Key, "%"+in.Key+"%", "%"+in.Key+"%")) + } + + total, err := c.store.StreamPush().Find(ctx, &items, in, args...) if err != nil { return nil, 0, web.ErrDB.Withf(`Find err[%s]`, err.Error()) } diff --git a/internal/core/media/stream_push.param.go b/internal/core/media/stream_push.param.go index acacedb..e6a8841 100755 --- a/internal/core/media/stream_push.param.go +++ b/internal/core/media/stream_push.param.go @@ -8,13 +8,14 @@ import ( type FindStreamPushInput struct { web.PagerFilter - App string `form:"app"` // 应用名 - PushedAt *orm.Time `form:"pushed_at"` // 最后一次推流时间 - StoppedAt *orm.Time `form:"stopped_at"` // 最后一次停止时间 - Stream string `form:"stream"` // 流 ID - MediaServerID string `form:"media_server_id"` // 媒体服务器 ID - ServerID string `form:"server_id"` // 服务器 ID - Status string `form:"status"` // 推流状态(PUSHING) + // App string `form:"app"` // 应用名 + // PushedAt *orm.Time `form:"pushed_at"` // 最后一次推流时间 + // StoppedAt *orm.Time `form:"stopped_at"` // 最后一次停止时间 + // Stream string `form:"stream"` // 流 ID + // MediaServerID string `form:"media_server_id"` // 媒体服务器 ID + // ServerID string `form:"server_id"` // 服务器 ID + Status string `form:"status"` // 推流状态(PUSHING) + Key string `form:"key"` } type EditStreamPushInput struct { diff --git a/internal/core/sms/media_server.go b/internal/core/sms/media_server.go index d1b532e..7be86cc 100755 --- a/internal/core/sms/media_server.go +++ b/internal/core/sms/media_server.go @@ -30,7 +30,7 @@ func (c *Core) FindMediaServer(ctx context.Context, in *FindMediaServerInput) ([ } // GetMediaServer Query a single object -func (c *Core) GetMediaServer(ctx context.Context, id int) (*MediaServer, error) { +func (c *Core) GetMediaServer(ctx context.Context, id string) (*MediaServer, error) { var out MediaServer if err := c.storer.MediaServer().Get(ctx, &out, orm.Where("id=?", id)); err != nil { if orm.IsErrRecordNotFound(err) { @@ -54,7 +54,7 @@ func (c *Core) AddMediaServer(ctx context.Context, in *AddMediaServerInput) (*Me } // EditMediaServer Update object information -func (c *Core) EditMediaServer(ctx context.Context, in *EditMediaServerInput, id int) (*MediaServer, error) { +func (c *Core) EditMediaServer(ctx context.Context, in *EditMediaServerInput, id string) (*MediaServer, error) { var out MediaServer if err := c.storer.MediaServer().Edit(ctx, &out, func(b *MediaServer) { if err := copier.Copy(b, in); err != nil { @@ -67,7 +67,7 @@ func (c *Core) EditMediaServer(ctx context.Context, in *EditMediaServerInput, id } // DelMediaServer Delete object -func (c *Core) DelMediaServer(ctx context.Context, id int) (*MediaServer, error) { +func (c *Core) DelMediaServer(ctx context.Context, id string) (*MediaServer, error) { var out MediaServer if err := c.storer.MediaServer().Del(ctx, &out, orm.Where("id=?", id)); err != nil { return nil, web.ErrDB.Withf(`Del err[%s]`, err.Error()) diff --git a/internal/core/sms/node_manager.go b/internal/core/sms/node_manager.go index b8e5b63..6e34afe 100644 --- a/internal/core/sms/node_manager.go +++ b/internal/core/sms/node_manager.go @@ -136,8 +136,11 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) { zlmConfig := resp.Data[0] var ms MediaServer if err := n.storer.MediaServer().Edit(context.Background(), &ms, func(b *MediaServer) { - b.Ports.FLV = zlmConfig.HTTPPort - b.Ports.WsFLV = zlmConfig.HTTPSslport + // b.Ports.FLV = zlmConfig.HTTPPort + // TODO: 映射的端口,会导致获取配置文件的端口不一定能访问 + http := server.Ports.HTTP + b.Ports.FLV = http + b.Ports.WsFLV = http // zlmConfig.HTTPSslport b.Ports.HTTPS = zlmConfig.HTTPSslport b.Ports.RTMP = zlmConfig.RtmpPort b.Ports.RTMPS = zlmConfig.RtmpSslport @@ -159,12 +162,12 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) { GeneralMediaServerID: zlm.NewString(server.ID), HookEnable: zlm.NewString("1"), HookOnFlowReport: zlm.NewString(""), - // HookOnPlay: , - HookOnHTTPAccess: zlm.NewString(""), - HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)), - HookOnRecordTs: zlm.NewString(""), - HookOnRtspAuth: zlm.NewString(""), - HookOnRtspRealm: zlm.NewString(""), + HookOnPlay: zlm.NewString(fmt.Sprintf("%s/on_play", hookPrefix)), + // HookOnHTTPAccess: zlm.NewString(""), + HookOnPublish: zlm.NewString(fmt.Sprintf("%s/on_publish", hookPrefix)), + HookOnRecordTs: zlm.NewString(""), + HookOnRtspAuth: zlm.NewString(""), + HookOnRtspRealm: zlm.NewString(""), // HookOnServerStarted: , HookOnShellLogin: zlm.NewString(""), HookOnStreamChanged: zlm.NewString(fmt.Sprintf("%s/on_stream_changed", hookPrefix)), diff --git a/internal/web/api/api.go b/internal/web/api/api.go index f490ea0..4ab22e6 100644 --- a/internal/web/api/api.go +++ b/internal/web/api/api.go @@ -2,6 +2,7 @@ package api import ( "expvar" + "fmt" "log/slog" "net/http" "path/filepath" @@ -13,6 +14,7 @@ import ( "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" + "github.com/gowvp/gb28181/internal/core/media" "github.com/gowvp/gb28181/plugin/stat" "github.com/gowvp/gb28181/plugin/stat/statapi" "github.com/ixugo/goweb/pkg/system" @@ -66,10 +68,61 @@ func setupRouter(r *gin.Engine, uc *Usecase) { registerZLMWebhookAPI(r, uc.WebHookAPI) // TODO: 待增加鉴权 registerMediaAPI(r, uc.MediaAPI) + // TODO: 临时播放接口,待重构 - r.POST("/channels/:id/play", func(ctx *gin.Context) { - web.Success(ctx, gin.H{"msg": "ok"}) - }) + r.POST("/channels/:id/play", web.WarpH(func(c *gin.Context, _ *struct{}) (*playOutput, error) { + channelID := c.Param("id") + + // TODO: 目前仅开发到 rtsp,待扩展 rtsp/gb 等 + if !strings.HasPrefix(channelID, media.RTMPIDPrefix) { + return nil, web.ErrNotFound.Msg("不支持的播放通道") + } + + push, err := uc.MediaAPI.mediaCore.GetStreamPush(c.Request.Context(), channelID) + if err != nil { + return nil, err + } + if push.Status != media.StatusPushing { + return nil, web.ErrNotFound.Msg("未推流") + } + + svr, err := uc.SMSAPI.smsCore.GetMediaServer(c.Request.Context(), push.MediaServerID) + if err != nil { + return nil, err + } + + stream := push.App + "/" + push.Stream + + host := c.Request.Host + if l := strings.Split(c.Request.Host, ":"); len(l) == 2 { + host = l[0] + } + + return &playOutput{ + App: push.App, + Stream: push.Stream, + Items: []streamAddrItem{ + { + WSFLV: fmt.Sprintf("ws://%s:%d/%s.live.flv", host, svr.Ports.WsFLV, stream), + HTTPFLV: fmt.Sprintf("http://%s:%d/%s.live.flv", host, svr.Ports.FLV, stream), + RTMP: fmt.Sprintf("rtmp://%s:%d/%s", host, svr.Ports.RTMP, stream), + RTSP: fmt.Sprintf("rtsp://%s:%d/%s", host, svr.Ports.RTSP, stream), + }, + }, + }, nil + })) +} + +type playOutput struct { + App string `json:"app"` + Stream string `json:"stream"` + Items []streamAddrItem `json:"items"` +} +type streamAddrItem struct { + WSFLV string `json:"ws_flv"` + HTTPFLV string `json:"http_flv"` + RTMP string `json:"rtmp"` + RTSP string `json:"rtsp"` } type getHealthOutput struct { diff --git a/internal/web/api/sms.go b/internal/web/api/sms.go index f063a69..86461db 100755 --- a/internal/web/api/sms.go +++ b/internal/web/api/sms.go @@ -2,8 +2,6 @@ package api import ( - "strconv" - "github.com/gin-gonic/gin" "github.com/gowvp/gb28181/internal/conf" "github.com/gowvp/gb28181/internal/core/sms" @@ -54,12 +52,12 @@ func (a SmsAPI) findMediaServer(c *gin.Context, in *sms.FindMediaServerInput) (a } func (a SmsAPI) getMediaServer(c *gin.Context, _ *struct{}) (any, error) { - mediaServerID, _ := strconv.Atoi(c.Param("id")) + mediaServerID := c.Param("id") return a.smsCore.GetMediaServer(c.Request.Context(), mediaServerID) } func (a SmsAPI) editMediaServer(c *gin.Context, in *sms.EditMediaServerInput) (any, error) { - mediaServerID, _ := strconv.Atoi(c.Param("id")) + mediaServerID := c.Param("id") return a.smsCore.EditMediaServer(c.Request.Context(), in, mediaServerID) } @@ -68,6 +66,6 @@ func (a SmsAPI) addMediaServer(c *gin.Context, in *sms.AddMediaServerInput) (any } func (a SmsAPI) delMediaServer(c *gin.Context, _ *struct{}) (any, error) { - mediaServerID, _ := strconv.Atoi(c.Param("id")) + mediaServerID := c.Param("id") return a.smsCore.DelMediaServer(c.Request.Context(), mediaServerID) } diff --git a/internal/web/api/zlm_webhook.go b/internal/web/api/zlm_webhook.go index b250d83..b257425 100644 --- a/internal/web/api/zlm_webhook.go +++ b/internal/web/api/zlm_webhook.go @@ -22,6 +22,7 @@ func registerZLMWebhookAPI(r gin.IRouter, api WebHookAPI, handler ...gin.Handler group.POST("/on_server_keepalive", web.WarpH(api.onServerKeepalive)) group.POST("/on_stream_changed", web.WarpH(api.onStreamChanged)) group.POST("/on_publish", web.WarpH(api.onPublish)) + group.POST("/on_play", web.WarpH(api.onPlay)) } } @@ -47,6 +48,14 @@ func (w WebHookAPI) onPublish(c *gin.Context, in *onPublishInput) (*onPublishOut // onStreamChanged rtsp/rtmp 流注册或注销时触发此事件;此事件对回复不敏感。 // https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_12%E3%80%81on-stream-changed -func (w WebHookAPI) onStreamChanged(_ *gin.Context, in *onStreamChangedInput) (DefaultOutput, error) { +func (w WebHookAPI) onStreamChanged(_ *gin.Context, in *struct{}) (DefaultOutput, error) { + return newDefaultOutputOK(), nil +} + +// onPlay rtsp/rtmp/http-flv/ws-flv/hls 播放触发播放器身份验证事件。 +// 播放流时会触发此事件。如果流不存在,则首先触发 on_play 事件,然后触发 on_stream_not_found 事件。 +// 播放rtsp流时,如果该流开启了rtsp专用认证(on_rtsp_realm),则不会触发on_play事件。 +// https://docs.zlmediakit.com/guide/media_server/web_hook_api.html#_6-on-play +func (w WebHookAPI) onPlay(_ *gin.Context, in *onPublishInput) (DefaultOutput, error) { return newDefaultOutputOK(), nil }