diff --git a/.gitignore b/.gitignore index 2db6339..7333020 100644 --- a/.gitignore +++ b/.gitignore @@ -43,4 +43,5 @@ __pycache__/ *.pt *.onnx *.remember/ -*.mp4 \ No newline at end of file +*.mp4 +owl \ No newline at end of file diff --git a/Makefile b/Makefile index 05b2b4b..bace8e6 100644 --- a/Makefile +++ b/Makefile @@ -205,10 +205,11 @@ docker/push: docker/build/test: build/clean build/linux @docker build --force-rm=true -t $(IMAGE_NAME) -f Dockerfile_zlm . +# 不包含 ai 功能的融合镜像 docker/build/zlm: build/clean build/linux - #@docker build --force-rm=true --push --platform linux/amd64,linux/arm64 -t $(IMAGE_NAME) -f Dockerfile_zlm . - @docker build --force-rm=true --push --platform linux/amd64,linux/arm64 -t registry.cn-shanghai.aliyuncs.com/ixugo/homenvr:latest -f Dockerfile_zlm . + @docker build --force-rm=true --push --platform linux/amd64,linux/arm64 -t $(IMAGE_NAME) -t registry.cn-shanghai.aliyuncs.com/ixugo/homenvr:latest -f Dockerfile_zlm . +# beta 版发布时用,不稳定的 docker/build/ai: build/clean build/linux @docker build --push --platform linux/amd64,linux/arm64 -t registry.cn-shanghai.aliyuncs.com/ixugo/homenvr:beta -f Dockerfile_ai . @@ -216,9 +217,9 @@ docker/build/ai: build/clean build/linux docker/publish: build/clean build/linux @docker build --force-rm=true --push --platform linux/amd64,linux/arm64 -t registry.cn-shanghai.aliyuncs.com/ixugo/homenvr:latest -t $(IMAGE_NAME) -f Dockerfile_ai . -# 构建 gowvp 独立镜像 +# 构建 gowvp 独立镜像(弃用) docker/build/gowvp: build/clean build/linux - @docker build --force-rm=true --push --platform linux/amd64 -t registry.cn-shanghai.aliyuncs.com/ixugo/gowvp:latest -f Dockerfile . + @docker build --force-rm=true --push --platform linux/amd64,linux/arm64 -t registry.cn-shanghai.aliyuncs.com/ixugo/gowvp:latest -f Dockerfile . # ==================================================================================== # diff --git a/internal/adapter/gbadapter/gb.go b/internal/adapter/gbadapter/gb.go index 6530193..ab0ecce 100644 --- a/internal/adapter/gbadapter/gb.go +++ b/internal/adapter/gbadapter/gb.go @@ -32,7 +32,8 @@ func (a *Adapter) InitDevice(ctx context.Context, device *ipc.Device) error { // OnStreamChanged implements ipc.Protocoler. // 流注销时停止播放并更新播放状态(仅在 regist=false 时由 zlm_webhook 调用) -func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error { +// GB28181 协议的 stream 就是 channel.ID,app 固定为 rtp +func (a *Adapter) OnStreamChanged(ctx context.Context, app, stream string) error { ch, err := a.adapter.GetChannel(ctx, stream) if err != nil { return err diff --git a/internal/adapter/onvifadapter/hook.go b/internal/adapter/onvifadapter/hook.go index 6bfe7fe..8f7b3ef 100644 --- a/internal/adapter/onvifadapter/hook.go +++ b/internal/adapter/onvifadapter/hook.go @@ -10,7 +10,9 @@ import ( "github.com/ixugo/goddd/pkg/orm" ) -func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error { +// OnStreamChanged implements ipc.Protocoler. +// ONVIF 协议的 stream 就是 channel.ID,app 固定为 live +func (a *Adapter) OnStreamChanged(ctx context.Context, app, stream string) error { var ch ipc.Channel if err := a.adapter.Store().Channel().Get(ctx, &ch, orm.Where("id=?", stream)); err != nil { return err diff --git a/internal/adapter/rtmpadapter/rtmp.go b/internal/adapter/rtmpadapter/rtmp.go index 3426969..5760f54 100644 --- a/internal/adapter/rtmpadapter/rtmp.go +++ b/internal/adapter/rtmpadapter/rtmp.go @@ -38,17 +38,22 @@ func (a *Adapter) InitDevice(ctx context.Context, device *ipc.Device) error { // OnStreamChanged implements ipc.Protocoler. // RTMP 推流断开时更新通道状态(IsOnline=false, IsPlaying=false) -func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error { +func (a *Adapter) OnStreamChanged(ctx context.Context, app, stream string) error { now := orm.Now() - // stream 就是 channel.ID,直接查询 - _, err := a.ipcCore.EditChannelConfigAndOnline(ctx, stream, false, func(cfg *ipc.StreamConfig) { + // 通过 app+stream 查询通道,支持自定义 app/stream + ch, err := a.ipcCore.GetChannelByAppStreamOrID(ctx, app, stream) + if err != nil { + slog.WarnContext(ctx, "RTMP 通道未找到", "app", app, "stream", stream, "err", err) + return nil + } + _, err = a.ipcCore.EditChannelConfigAndOnline(ctx, ch.ID, false, func(cfg *ipc.StreamConfig) { cfg.StoppedAt = &now }) if err != nil { - slog.WarnContext(ctx, "更新 RTMP 通道停流状态失败", "stream", stream, "err", err) + slog.WarnContext(ctx, "更新 RTMP 通道停流状态失败", "app", app, "stream", stream, "err", err) } // 同时更新 IsPlaying - if _, err := a.ipcCore.EditChannelPlaying(ctx, stream, false); err != nil { + if _, err := a.ipcCore.EditChannelPlaying(ctx, ch.Stream, false); err != nil { slog.WarnContext(ctx, "更新 RTMP 通道播放状态失败", "stream", stream, "err", err) } return nil @@ -62,9 +67,9 @@ func (a *Adapter) OnStreamNotFound(ctx context.Context, app string, stream strin // OnPublish 处理 RTMP 推流鉴权 // 验证推流参数中的 sign 字段是否与配置的 RTMPSecret MD5 一致 -func (a *Adapter) OnPublish(ctx context.Context, stream string, params map[string]string) (bool, error) { - // stream 就是 channel.ID,直接查询 - ch, err := a.ipcCore.GetChannel(ctx, stream) +func (a *Adapter) OnPublish(ctx context.Context, app, stream string, params map[string]string) (bool, error) { + // 通过 app+stream 查询通道,支持自定义 app/stream + ch, err := a.ipcCore.GetChannelByAppStreamOrID(ctx, app, stream) if err != nil { return false, err } diff --git a/internal/adapter/rtspadapter/rtsp.go b/internal/adapter/rtspadapter/rtsp.go index 8370324..5b5fa9d 100644 --- a/internal/adapter/rtspadapter/rtsp.go +++ b/internal/adapter/rtspadapter/rtsp.go @@ -36,10 +36,15 @@ func (a *Adapter) InitDevice(ctx context.Context, device *ipc.Device) error { // OnStreamChanged implements ipc.Protocoler. // RTSP 拉流断开时更新通道状态(IsOnline=false, IsPlaying=false) -func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error { - // stream 就是 channel.ID,直接更新状态 - if _, err := a.ipcCore.EditChannelOnlineAndPlaying(ctx, stream, false, false); err != nil { - slog.WarnContext(ctx, "更新 RTSP 通道状态失败", "stream", stream, "err", err) +func (a *Adapter) OnStreamChanged(ctx context.Context, app, stream string) error { + // 通过 app+stream 查询通道,支持自定义 app/stream + ch, err := a.ipcCore.GetChannelByAppStreamOrID(ctx, app, stream) + if err != nil { + slog.WarnContext(ctx, "RTSP 通道未找到", "app", app, "stream", stream, "err", err) + return nil + } + if _, err := a.ipcCore.EditChannelOnlineAndPlaying(ctx, ch.Stream, false, false); err != nil { + slog.WarnContext(ctx, "更新 RTSP 通道状态失败", "app", app, "stream", stream, "err", err) } return nil } @@ -47,7 +52,8 @@ func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error { // OnStreamNotFound implements ipc.Protocoler. // 当流不存在时,从 Channel 获取配置并启动拉流代理 func (a *Adapter) OnStreamNotFound(ctx context.Context, app string, stream string) error { - ch, err := a.ipcCore.GetChannel(ctx, stream) + // 通过 app+stream 查询通道,支持自定义 app/stream + ch, err := a.ipcCore.GetChannelByAppStreamOrID(ctx, app, stream) if err != nil { return err } diff --git a/internal/app/app.go b/internal/app/app.go index 04def95..8ec3603 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -20,6 +20,19 @@ import ( ) func Run(bc *conf.Bootstrap) { + if bc.Server.Recording.DiskUsageThreshold <= 0 { + bc.Server.Recording.DiskUsageThreshold = 95.0 + } + if bc.Server.Recording.SegmentSeconds <= 0 { + bc.Server.Recording.SegmentSeconds = 300 + } + if bc.Server.Recording.RetainDays <= 0 { + bc.Server.Recording.RetainDays = 3 + } + if bc.Server.Recording.StorageDir == "" { + bc.Server.Recording.StorageDir = "./configs/recordings" + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/conf/config.go b/internal/conf/config.go index 88c8d80..ceb1360 100644 --- a/internal/conf/config.go +++ b/internal/conf/config.go @@ -35,10 +35,6 @@ type ServerRecording struct { RetainDays int `comment:"录像保留天数(超过则清理)"` DiskUsageThreshold float64 `comment:"磁盘使用率阈值(百分比),超过则触发循环覆盖"` SegmentSeconds int `comment:"MP4 切片时长(秒)"` - DisabledGB28181 bool `comment:"是否禁用 GB28181 通道录制(true=禁用)"` - DisabledRTMP bool `comment:"是否禁用 RTMP 通道录制(true=禁用)"` - DisabledRTSP bool `comment:"是否禁用 RTSP 通道录制(true=禁用)"` - DisabledONVIF bool `comment:"是否禁用 ONVIF 通道录制(true=禁用)"` } type ServerAI struct { diff --git a/internal/conf/default.go b/internal/conf/default.go index 38afa7a..714da84 100644 --- a/internal/conf/default.go +++ b/internal/conf/default.go @@ -30,11 +30,7 @@ func DefaultConfig() Bootstrap { StorageDir: "./configs/recordings", RetainDays: 3, DiskUsageThreshold: 95.0, - SegmentSeconds: 600, - DisabledGB28181: false, - DisabledRTMP: false, - DisabledRTSP: false, - DisabledONVIF: false, + SegmentSeconds: 300, }, }, Data: Data{ diff --git a/internal/core/ipc/channel.go b/internal/core/ipc/channel.go index 4222d5b..5e7bcc4 100755 --- a/internal/core/ipc/channel.go +++ b/internal/core/ipc/channel.go @@ -91,8 +91,7 @@ func (c *Core) GetChannel(ctx context.Context, id string) (*Channel, error) { } // AddChannel 添加 RTMP/RTSP 通道,支持自动创建虚拟设备 -// RTMP: app 固定为 "push",stream 固定为 channel.ID -// RTSP: app 固定为 "pull",stream 固定为 channel.ID +// RTMP/RTSP: 支持自定义 app 和 stream,但禁止使用 app=rtp(rtp 专用于 GB28181) func (c *Core) AddChannel(ctx context.Context, in *AddChannelInput) (*Channel, error) { // 仅支持 RTMP/RTSP 类型 if in.Type != TypeRTMP && in.Type != TypeRTSP { @@ -104,6 +103,11 @@ func (c *Core) AddChannel(ctx context.Context, in *AddChannelInput) (*Channel, e return nil, reason.ErrBadRequest.SetMsg("通道名称不能为空") } + // 禁止 app=rtp,rtp 专用于 GB28181 协议 + if strings.EqualFold(in.App, "rtp") { + return nil, reason.ErrBadRequest.SetMsg("app=rtp 为 GB28181 专用,RTMP/RTSP 不可使用") + } + var deviceID string var needUpdateChannelCount bool // 是否需要更新设备的通道计数 @@ -149,12 +153,17 @@ func (c *Core) AddChannel(ctx context.Context, in *AddChannelInput) (*Channel, e } out.ID = GenerateChannelID(&out, c.uniqueID) - // RTMP/RTSP 通道的 app 和 stream 固定,不允许自定义 - // stream 与 ID 一致,ZLM 回调时可通过 stream 直接查询通道(WHERE id = stream) + // RTMP/RTSP 通道:支持自定义 app 和 stream,若未指定则使用默认值 switch in.Type { case TypeRTMP: - out.App = "push" - out.Stream = out.ID + out.App = in.App + if out.App == "" { + out.App = "push" + } + out.Stream = in.Stream + if out.Stream == "" { + out.Stream = out.ID + } case TypeRTSP: out.App = "pull" out.Stream = out.ID @@ -195,6 +204,11 @@ func getDevicePrefix(t string) string { // EditChannel Update object information func (c *Core) EditChannel(ctx context.Context, in *EditChannelInput, id string) (*Channel, error) { + // 禁止 app=rtp,rtp 专用于 GB28181 协议 + if strings.EqualFold(in.App, "rtp") { + return nil, reason.ErrBadRequest.SetMsg("app=rtp 为 GB28181 专用,RTMP/RTSP 不可使用") + } + // TODO: 修改 onvif 的账号/密码 后需要重新连接设备 var out Channel if err := c.store.Channel().Edit(ctx, &out, func(b *Channel) error { @@ -279,6 +293,18 @@ func (c *Core) SetAIEnabled(ctx context.Context, channelID string, enabled bool) return &out, nil } +// SetRecordMode 设置通道的录像模式,支持 always/ai/none 三种模式 +func (c *Core) SetRecordMode(ctx context.Context, channelID string, mode string) (*Channel, error) { + var out Channel + if err := c.store.Channel().Edit(ctx, &out, func(b *Channel) error { + b.Ext.RecordMode = mode + return nil + }, orm.Where("id=?", channelID)); err != nil { + return nil, reason.ErrDB.Withf(`Edit err[%s]`, err.Error()) + } + return &out, nil +} + // GetChannelByAppStream 通过 app 和 stream 获取通道 func (c *Core) GetChannelByAppStream(ctx context.Context, app, stream string) (*Channel, error) { var out Channel @@ -291,6 +317,25 @@ func (c *Core) GetChannelByAppStream(ctx context.Context, app, stream string) (* return &out, nil } +// GetChannelByAppStreamOrID 通过 app+stream 或 id=stream 获取通道 +// 用于 ZLM 回调时识别通道:先按 app+stream 查找,查不到再按 id=stream 查找 +// 支持自定义 app/stream 的 RTMP/RTSP 通道以及使用默认 ID 作为 stream 的旧通道 +func (c *Core) GetChannelByAppStreamOrID(ctx context.Context, app, stream string) (*Channel, error) { + var out Channel + // 先按 app+stream 查找 + if err := c.store.Channel().Get(ctx, &out, orm.Where("app=? AND stream=?", app, stream)); err == nil { + return &out, nil + } + // 再按 id=stream 查找(兼容旧逻辑) + if err := c.store.Channel().Get(ctx, &out, orm.Where("id=?", stream)); err != nil { + if orm.IsErrRecordNotFound(err) { + return nil, reason.ErrNotFound.Withf(`Channel not found app[%s] stream[%s]`, app, stream) + } + return nil, reason.ErrDB.Withf(`Get err[%s]`, err.Error()) + } + return &out, nil +} + // GetChannelByStream 通过 stream ID 获取通道 // 先按 stream 字段查找,找不到再按 ID 查找 func (c *Core) GetChannelByStream(ctx context.Context, stream string) (*Channel, error) { diff --git a/internal/core/ipc/channel.model.go b/internal/core/ipc/channel.model.go index b609fb0..3c3754c 100755 --- a/internal/core/ipc/channel.model.go +++ b/internal/core/ipc/channel.model.go @@ -65,3 +65,23 @@ func (c *Channel) IsRTMP() bool { func (c *Channel) IsRTSP() bool { return c.Type == TypeRTSP || bz.IsRTSP(c.ID) } + +func (c *Channel) GetApp() string { + if c.IsGB28181() { + return "rtp" + } + if c.IsOnvif() { + return "live" + } + return c.App +} + +func (c *Channel) GetStream() string { + if c.IsRTMP() { + return c.Stream + } + if c.IsRTSP() { + return c.Stream + } + return c.ID +} diff --git a/internal/core/ipc/channel.param.go b/internal/core/ipc/channel.param.go index 3130ae5..efb4893 100755 --- a/internal/core/ipc/channel.param.go +++ b/internal/core/ipc/channel.param.go @@ -22,8 +22,8 @@ type EditChannelInput struct { Ext DeviceExt `json:"ext"` // RTMP/RTSP 配置 - // App string `json:"app"` // 应用名 - // Stream string `json:"stream"` // 流 ID + App string `json:"app"` // 应用名(RTMP/RTSP 可自定义,但不能为 rtp) + Stream string `json:"stream"` // 流 ID(RTMP/RTSP 可自定义) Config StreamConfig `json:"config"` // 流配置 } diff --git a/internal/core/ipc/model.go b/internal/core/ipc/model.go index b7261a5..fccc961 100755 --- a/internal/core/ipc/model.go +++ b/internal/core/ipc/model.go @@ -40,6 +40,28 @@ type DeviceExt struct { GBVersion string `json:"gb_version"` // GB版本 Zones []Zone `json:"zones"` // 区域 EnabledAI bool `json:"enabled_ai"` // 是否启用 AI + + // 空串表示 always + RecordMode string `json:"record_mode"` // 录像模式, 一直录制:always, 按AI触发:ai, 不录制:none +} + +func (e *DeviceExt) GetRecordMode() string { + if e.RecordMode == "" { + return "always" + } + return e.RecordMode +} + +func (e *DeviceExt) IsAlwaysRecord() bool { + return e.RecordMode == "always" || e.RecordMode == "" +} + +func (e *DeviceExt) IsAIRecord() bool { + return e.RecordMode == "ai" +} + +func (e *DeviceExt) IsNoneRecord() bool { + return e.RecordMode == "none" } // Scan implements orm.Scaner. diff --git a/internal/core/ipc/port.go b/internal/core/ipc/port.go index d7a1ad0..3ac6947 100644 --- a/internal/core/ipc/port.go +++ b/internal/core/ipc/port.go @@ -41,7 +41,9 @@ type Protocoler interface { type Hooker interface { OnStreamNotFound(ctx context.Context, app, stream string) error - OnStreamChanged(ctx context.Context, stream string) error + // OnStreamChanged 流注销时调用,用于更新通道状态 + // app/stream 用于支持自定义 app/stream 的 RTMP/RTSP 通道 + OnStreamChanged(ctx context.Context, app, stream string) error } // OnPublisher 推流鉴权接口(可选实现) @@ -49,7 +51,8 @@ type Hooker interface { type OnPublisher interface { // OnPublish 处理推流鉴权 // 返回 true 表示鉴权通过,false 表示鉴权失败 - OnPublish(ctx context.Context, stream string, params map[string]string) (bool, error) + // app/stream 用于支持自定义 app/stream 的 RTMP/RTSP 通道 + OnPublish(ctx context.Context, app, stream string, params map[string]string) (bool, error) } // PlayResponse 播放响应 diff --git a/internal/core/recording/core.go b/internal/core/recording/core.go index f9c8ce3..ca4186e 100755 --- a/internal/core/recording/core.go +++ b/internal/core/recording/core.go @@ -55,26 +55,6 @@ func (c Core) IsEnabled() bool { return c.conf != nil && !c.conf.Disabled } -// IsTypeEnabled 检查指定通道类型是否启用录制 -// 使用反转逻辑:DisabledXXX=false 表示启用该类型录制 -func (c Core) IsTypeEnabled(channelType string) bool { - if !c.IsEnabled() { - return false - } - switch channelType { - case "GB28181", "": - return !c.conf.DisabledGB28181 - case "RTMP": - return !c.conf.DisabledRTMP - case "RTSP": - return !c.conf.DisabledRTSP - case "ONVIF": - return !c.conf.DisabledONVIF - default: - return false - } -} - // GetFullPath 获取录像文件的完整路径 // relativePath 可能是相对于 StorageDir 的路径,也可能是完整路径 func (c Core) GetFullPath(relativePath string) string { diff --git a/internal/core/recording/manager.go b/internal/core/recording/manager.go index d990535..dd32ed0 100644 --- a/internal/core/recording/manager.go +++ b/internal/core/recording/manager.go @@ -8,8 +8,8 @@ import ( // StartRecording 启动录制,在流注册时调用 // 根据配置决定是否录制该流,并通知 ZLM 开始 MP4 录制 func (c Core) StartRecording(ctx context.Context, channelType, app, stream string) error { - if !c.IsTypeEnabled(channelType) { - slog.DebugContext(ctx, "录制未启用或该类型未开启录制", "type", channelType, "app", app, "stream", stream) + if !c.IsEnabled() { + slog.DebugContext(ctx, "录制未启用", "app", app, "stream", stream) return nil } diff --git a/internal/web/api/ipc.go b/internal/web/api/ipc.go index 8c173a8..25aac0a 100755 --- a/internal/web/api/ipc.go +++ b/internal/web/api/ipc.go @@ -95,17 +95,18 @@ func registerGB28181(g gin.IRouter, api IPCAPI, handler ...gin.HandlerFunc) { // 统一的通道管理 API(支持所有协议) { group := g.Group("/channels", handler...) - group.GET("", web.WrapH(api.findChannel)) // 通道列表(所有协议) - group.POST("", web.WrapH(api.addChannel)) // 添加通道(RTMP/RTSP) - group.PUT("/:id", web.WrapH(api.editChannel)) // 修改通道(所有协议) - group.DELETE("/:id", web.WrapH(api.delChannel)) // 删除通道(RTMP/RTSP) - group.POST("/:id/play", web.WrapH(api.play)) // 播放(所有协议) - group.POST("/:id/snapshot", web.WrapH(api.refreshSnapshot)) // 图像抓拍(所有协议) - group.GET("/:id/snapshot", api.getSnapshot) // 获取图像(所有协议) - group.POST("/:id/zones", web.WrapH(api.addZone)) // 添加区域(所有协议) - group.GET("/:id/zones", web.WrapH(api.getZones)) // 获取区域(所有协议) - group.POST("/:id/ai/enable", web.WrapH(api.enableAI)) // 启用 AI 检测 - group.POST("/:id/ai/disable", web.WrapH(api.disableAI)) // 禁用 AI 检测 + group.GET("", web.WrapH(api.findChannel)) // 通道列表(所有协议) + group.POST("", web.WrapH(api.addChannel)) // 添加通道(RTMP/RTSP) + group.PUT("/:id", web.WrapH(api.editChannel)) // 修改通道(所有协议) + group.DELETE("/:id", web.WrapH(api.delChannel)) // 删除通道(RTMP/RTSP) + group.POST("/:id/play", web.WrapH(api.play)) // 播放(所有协议) + group.POST("/:id/snapshot", web.WrapH(api.refreshSnapshot)) // 图像抓拍(所有协议) + group.GET("/:id/snapshot", api.getSnapshot) // 获取图像(所有协议) + group.POST("/:id/zones", web.WrapH(api.addZone)) // 添加区域(所有协议) + group.GET("/:id/zones", web.WrapH(api.getZones)) // 获取区域(所有协议) + group.POST("/:id/ai/enable", web.WrapH(api.enableAI)) // 启用 AI 检测 + group.POST("/:id/ai/disable", web.WrapH(api.disableAI)) // 禁用 AI 检测 + group.POST("/:id/record_mode", web.WrapH(api.setRecordMode)) // 设置录像模式 } } @@ -651,3 +652,44 @@ func (a IPCAPI) buildRTSPURL(ctx context.Context, channelID string) (string, err return fmt.Sprintf("rtsp://%s:%d/%s/%s", "127.0.0.1", svr.Ports.RTSP, app, stream), nil } + +// setRecordModeInput 设置录像模式请求参数 +type setRecordModeInput struct { + // 录像模式:always-一直录制,ai-按AI触发录制,none-不录制 + Mode string `json:"mode" binding:"required,oneof=always ai none"` +} + +// setRecordMode 设置通道的录像模式,支持三种模式:always(一直录制)、ai(AI触发录制)、none(不录制) +// always 和 ai 都会启用录制 +func (a IPCAPI) setRecordMode(c *gin.Context, in *setRecordModeInput) (gin.H, error) { + channelID := c.Param("id") + ctx := c.Request.Context() + + // 更新通道的录像模式 + channel, err := a.ipc.SetRecordMode(ctx, channelID, in.Mode) + if err != nil { + return nil, err + } + + // 根据录像模式控制 ZLM 录制: + // - always/ai: 如果流在线则启动录制 + // - none: 停止录制 + if channel.Ext.IsNoneRecord() { + // none 模式:停止录制 + if err := a.recordingCore.StopRecording(ctx, channel.GetApp(), channel.GetStream()); err != nil { + slog.WarnContext(ctx, "停止录制失败", "channel", channelID, "err", err) + } + } else { + // always/ai 模式:如果流在线则启动录制 + if channel.IsOnline { + if err := a.recordingCore.StartRecording(ctx, channel.Type, channel.GetApp(), channel.GetStream()); err != nil { + slog.WarnContext(ctx, "启动录制失败", "channel", channelID, "err", err) + } + } + } + + return gin.H{ + "id": channel.ID, + "record_mode": channel.Ext.GetRecordMode(), + }, nil +} diff --git a/internal/web/api/zlm_webhook.go b/internal/web/api/zlm_webhook.go index 3048734..3e35be5 100644 --- a/internal/web/api/zlm_webhook.go +++ b/internal/web/api/zlm_webhook.go @@ -57,6 +57,18 @@ func registerZLMWebhookAPI(r gin.IRouter, api WebHookAPI, handler ...gin.Handler } } +// getChannelType 通过 app+stream 查询通道获取类型 +// 支持自定义 app/stream 的 RTMP/RTSP 通道:先按 app+stream 查询,查不到再按 id=stream 查询 +// 如果都找不到,则回退到使用 stream 前缀判断类型 +func (w WebHookAPI) getChannelType(ctx context.Context, app, stream string) string { + ch, err := w.ipcCore.GetChannelByAppStreamOrID(ctx, app, stream) + if err == nil { + return ch.Type + } + // 回退:使用 stream 前缀判断类型(兼容旧逻辑) + return ipc.GetType(stream) +} + func (w WebHookAPI) onServerStarted(c *gin.Context, _ *struct{}) (DefaultOutput, error) { w.log.InfoContext(c.Request.Context(), "webhook onServerStarted") // 所有 rtmp 通道离线 @@ -81,8 +93,8 @@ func (w WebHookAPI) onPublish(c *gin.Context, in *onPublishInput) (*onPublishOut ctx := c.Request.Context() w.log.Info("webhook onPublish", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID) - // 通过 stream 获取通道类型 - channelType := ipc.GetType(in.Stream) + // 通过 app+stream 查询通道获取类型,支持自定义 app/stream + channelType := w.getChannelType(ctx, in.App, in.Stream) // 获取协议适配器,检查是否实现了 OnPublisher 接口 protocol, ok := w.protocols[channelType] @@ -112,7 +124,7 @@ func (w WebHookAPI) onPublish(c *gin.Context, in *onPublishInput) (*onPublishOut paramsMap["media_server_id"] = in.MediaServerID // 调用协议适配器的 OnPublish 方法 - allowed, err := publisher.OnPublish(ctx, in.Stream, paramsMap) + allowed, err := publisher.OnPublish(ctx, in.App, in.Stream, paramsMap) if err != nil { return &onPublishOutput{DefaultOutput: DefaultOutput{Code: 1, Msg: err.Error()}}, nil } @@ -138,13 +150,27 @@ func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (D app = in.AppName } - // 获取通道类型 - channelType := ipc.GetType(stream) + // 通过 app+stream 查询通道获取类型,支持自定义 app/stream + channelType := w.getChannelType(ctx, app, stream) if in.Regist { - // 流注册时启动录制 - if err := w.recordingCore.StartRecording(ctx, channelType, app, stream); err != nil { - w.log.WarnContext(ctx, "启动录制失败", "stream", stream, "err", err) + // 流注册时根据录像模式决定是否启动录制 + ch, err := w.ipcCore.GetChannelByAppStreamOrID(ctx, app, stream) + if err != nil { + w.log.WarnContext(ctx, "获取通道信息失败,尝试启动录制", "stream", stream, "err", err) + // 找不到通道时仍尝试按旧逻辑启动录制 + if err := w.recordingCore.StartRecording(ctx, channelType, app, stream); err != nil { + w.log.WarnContext(ctx, "启动录制失败", "stream", stream, "err", err) + } + return newDefaultOutputOK(), nil + } + + if !ch.Ext.IsNoneRecord() { + // always 模式:自动启动录制 + if err := w.recordingCore.StartRecording(ctx, channelType, app, stream); err != nil { + w.log.WarnContext(ctx, "启动录制失败", "stream", stream, "err", err) + } + w.log.InfoContext(ctx, "自动启动录制(always模式)", "stream", stream) } return newDefaultOutputOK(), nil } @@ -158,7 +184,7 @@ func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (D // 每个协议适配器在 OnStreamChanged 中处理自己的状态逻辑 protocol, ok := w.protocols[channelType] if ok { - if err := protocol.OnStreamChanged(ctx, stream); err != nil { + if err := protocol.OnStreamChanged(ctx, app, stream); err != nil { slog.ErrorContext(ctx, "webhook onStreamChanged", "err", err) } } @@ -197,8 +223,21 @@ func (w WebHookAPI) onStreamNoneReader(c *gin.Context, in *onStreamNoneReaderInp w.log.WarnContext(ctx, "更新播放状态失败", "stream", in.Stream, "err", err) } - // 存在录像计划时,不关闭流 - return onStreamNoneReaderOutput{Close: true}, nil + // 根据录像模式判断是否关闭流: + // - none(不录制): 无人观看时关闭流 + // - always/ai(有录像计划): 无人观看时保持流不关闭 + ch, err := w.ipcCore.GetChannelByAppStreamOrID(ctx, in.App, in.Stream) + if err != nil { + // 找不到通道时默认关闭流 + w.log.WarnContext(ctx, "获取通道失败,默认关闭流", "stream", in.Stream, "err", err) + return onStreamNoneReaderOutput{Close: true}, nil + } + + // 如果录像模式为 none,则关闭流;否则保持流不关闭以继续录制 + shouldClose := ch.Ext.IsNoneRecord() + w.log.InfoContext(ctx, "无人观看判断", "stream", in.Stream, "record_mode", ch.Ext.GetRecordMode(), "close", shouldClose) + + return onStreamNoneReaderOutput{Close: shouldClose}, nil } // onRTPServerTimeout RTP 服务器超时事件 @@ -212,7 +251,8 @@ func (w WebHookAPI) onRTPServerTimeout(c *gin.Context, in *onRTPServerTimeoutInp // onStreamNotFound 流不存在事件 // TODO: 重启后立即播放,会出发 "channel not exist" 待处理 func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput) (DefaultOutput, error) { - w.log.InfoContext(c.Request.Context(), "webhook onStreamNotFound", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID) + ctx := c.Request.Context() + w.log.InfoContext(ctx, "webhook onStreamNotFound", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID) stream := in.StreamName app := in.AppName @@ -225,11 +265,12 @@ func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput) } } - r := ipc.GetType(stream) - protocol, ok := w.protocols[r] + // 通过 app+stream 查询通道获取类型,支持自定义 app/stream + channelType := w.getChannelType(ctx, app, stream) + protocol, ok := w.protocols[channelType] if ok { - if err := protocol.OnStreamNotFound(c.Request.Context(), app, stream); err != nil { - slog.InfoContext(c.Request.Context(), "webhook onStreamNotFound", "err", err) + if err := protocol.OnStreamNotFound(ctx, app, stream); err != nil { + slog.InfoContext(ctx, "webhook onStreamNotFound", "err", err) } } @@ -267,15 +308,15 @@ func (w WebHookAPI) onRecordMP4(c *gin.Context, in *onRecordMP4Input) (DefaultOu startTime := time.Unix(in.StartTime, 0) endTime := startTime.Add(time.Duration(in.TimeLen * float64(time.Second))) - // 通过 stream 查找 channel ID + // 通过 app+stream 查找 channel ID,支持自定义 app/stream var cid string - ch, err := w.ipcCore.GetChannelByStream(ctx, in.Stream) + ch, err := w.ipcCore.GetChannelByAppStreamOrID(ctx, in.App, in.Stream) if err == nil { cid = ch.ID } else { // 如果找不到通道,使用 stream 作为 CID 的标识 cid = in.Stream - w.log.WarnContext(ctx, "未找到对应通道,使用 stream 作为 CID", "stream", in.Stream) + w.log.WarnContext(ctx, "未找到对应通道,使用 stream 作为 CID", "app", in.App, "stream", in.Stream) } // 入库 diff --git a/pkg/zlm/config.go b/pkg/zlm/config.go index 0bba479..4dfce80 100644 --- a/pkg/zlm/config.go +++ b/pkg/zlm/config.go @@ -372,6 +372,7 @@ func (e *Engine) SetServerConfig(in *SetServerConfigRequest) (*SetServerConfigRe if err := e.ErrHandle(resp.Code, resp.Msg); err != nil { return nil, err } + return &resp, nil }