diff --git a/README.md b/README.md index 7f8471a..1629568 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ go wvp 是 Go 语言实现的开源 GB28181 解决方案,基于 GB28181-2022 ## 在线演示平台 -+ [在线演示平台 :) (服务器已过期,暂不提供演示)](http://wvp.golang.space:15123/) ++ [在线演示平台 :) ](http://gowvp.golang.space:15123/) ![](./docs/demo/play.gif) @@ -35,7 +35,7 @@ go wvp 是 Go 语言实现的开源 GB28181 解决方案,基于 GB28181-2022 ## 开源库 -感谢 @panjjo 大佬的开源库 [panjjo/gosip](https://github.com/panjjo/gosip),GoWVP 的 sip 信令基于此库,出于底层封装需要,并非直接 go mod 依赖该项目,而是源代码放到了 pkg 包中。 +感谢 @panjjo 大佬的开源库 [panjjo/gosip](https://github.com/panjjo/gosip),GoWVP 的 sip 信令基于此库,出于底层封装需要,并非直接依赖该项目,而是源代码放到了 pkg 包中。 流媒体服务基于@夏楚 [ZLMediaKit](https://github.com/ZLMediaKit/ZLMediaKit) @@ -102,8 +102,11 @@ zlm 能否访问到 gowvp?? docker 合并版本填写 127.0.0.1 即可,分离 在反向代理那里配置以下参数,其中域名根据实际的填写 proxy_set_header X-Forwarded-Host $host; + proxy_set_header X-Forwarded-Prefix "https://gowvp.com"; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; diff --git a/internal/app/app.go b/internal/app/app.go index 4f11dde..b54c6f4 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -35,7 +35,7 @@ func Run(bc *conf.Bootstrap) { go setupZLM(ctx, bc.ConfigDir) // 如果需要执行表迁移,递增此版本号和表更新说明 - versionapi.DBVersion = "0.0.12" + versionapi.DBVersion = "0.0.14" versionapi.DBRemark = "add stream proxy" handler, cleanUp, err := wireApp(bc, log) diff --git a/internal/app/wire_gen.go b/internal/app/wire_gen.go index a9457d8..98bea50 100644 --- a/internal/app/wire_gen.go +++ b/internal/app/wire_gen.go @@ -7,14 +7,13 @@ package app import ( - "log/slog" - "net/http" - "github.com/gowvp/gb28181/internal/conf" "github.com/gowvp/gb28181/internal/data" "github.com/gowvp/gb28181/internal/web/api" "github.com/gowvp/gb28181/pkg/gbs" "github.com/ixugo/goddd/domain/version/versionapi" + "log/slog" + "net/http" ) // Injectors from wire.go: @@ -30,13 +29,14 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error) smsAPI := api.NewSmsAPI(smsCore) uniqueidCore := api.NewUniqueID(db) pushCore := api.NewPushCore(db, uniqueidCore) - storer := api.NewGB28181Store(db) - gb28181 := api.NewGB28181(storer, uniqueidCore) - server, cleanup := gbs.NewServer(bc, gb28181, smsCore) - gb28181Core := api.NewGB28181Core(storer, uniqueidCore) - webHookAPI := api.NewWebHookAPI(smsCore, pushCore, bc, server, gb28181Core) + storer := api.NewIPCStore(db) + gbdAdapter := api.NewGBAdapter(storer, uniqueidCore) + server, cleanup := gbs.NewServer(bc, gbdAdapter, smsCore) + v := api.NewProtocols(storer) + ipcCore := api.NewIPCCore(storer, uniqueidCore, v) + webHookAPI := api.NewWebHookAPI(smsCore, pushCore, bc, server, ipcCore) pushAPI := api.NewPushAPI(pushCore, smsCore, bc) - gb28181API := api.NewGB28181API(gb28181Core) + ipcapi := api.NewIPCAPI(ipcCore) proxyAPI := api.NewProxyAPI(db, uniqueidCore) configAPI := api.NewConfigAPI(db, bc) userAPI := api.NewUserAPI(bc) @@ -48,7 +48,7 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error) WebHookAPI: webHookAPI, UniqueID: uniqueidCore, MediaAPI: pushAPI, - GB28181API: gb28181API, + GB28181API: ipcapi, ProxyAPI: proxyAPI, ConfigAPI: configAPI, SipServer: server, diff --git a/internal/core/bz/param.go b/internal/core/bz/param.go index 225b494..040c021 100644 --- a/internal/core/bz/param.go +++ b/internal/core/bz/param.go @@ -2,6 +2,7 @@ package bz const ( IDPrefixGB = "gb" // 国标设备 + IDPrefixOnvif = "on" // 国标设备 IDPrefixGBChannel = "ch" // 国标通道 id 前缀 IDPrefixRTMP = "mp" // rtmp ID 前缀,取 rtmp 后缀的 mp,不好记但是清晰 IDPrefixRTSP = "sp" // rtsp ID 前缀,取 rtsp 后缀的 sp,不好记但是清晰 diff --git a/internal/core/config/model.go b/internal/core/config/model.go index b127eb7..86e847c 100755 --- a/internal/core/config/model.go +++ b/internal/core/config/model.go @@ -10,7 +10,7 @@ type Ext struct { // Scan implements orm.Scaner. func (i *Ext) Scan(input interface{}) error { - return orm.JsonUnmarshal(input, i) + return orm.JSONUnmarshal(input, i) } type SIPConfig struct { diff --git a/internal/core/gb28181/core.go b/internal/core/gb28181/core.go deleted file mode 100755 index 49b333a..0000000 --- a/internal/core/gb28181/core.go +++ /dev/null @@ -1,21 +0,0 @@ -// Code generated by godddx, DO AVOID EDIT. -package gb28181 - -import "github.com/ixugo/goddd/domain/uniqueid" - -// Storer data persistence -type Storer interface { - Device() DeviceStorer - Channel() ChannelStorer -} - -// Core business domain -type Core struct { - store Storer - uniqueID uniqueid.Core -} - -// NewCore create business domain -func NewCore(store Storer, uni uniqueid.Core) Core { - return Core{store: store, uniqueID: uni} -} diff --git a/internal/core/gb28181/channel.go b/internal/core/ipc/channel.go similarity index 99% rename from internal/core/gb28181/channel.go rename to internal/core/ipc/channel.go index d794b45..5b17039 100755 --- a/internal/core/gb28181/channel.go +++ b/internal/core/ipc/channel.go @@ -1,5 +1,5 @@ // Code generated by godddx, DO AVOID EDIT. -package gb28181 +package ipc import ( "context" diff --git a/internal/core/gb28181/channel.model.go b/internal/core/ipc/channel.model.go similarity index 87% rename from internal/core/gb28181/channel.model.go rename to internal/core/ipc/channel.model.go index 3465872..1d071f5 100755 --- a/internal/core/gb28181/channel.model.go +++ b/internal/core/ipc/channel.model.go @@ -1,5 +1,5 @@ // Code generated by godddx, DO AVOID EDIT. -package gb28181 +package ipc import "github.com/ixugo/goddd/pkg/orm" @@ -22,3 +22,16 @@ type Channel struct { func (*Channel) TableName() string { return "channels" } + +// 实现 port.Channel 接口 +func (c *Channel) GetID() string { + return c.ID +} + +func (c *Channel) GetChannelID() string { + return c.ChannelID +} + +func (c *Channel) GetDeviceID() string { + return c.DeviceID +} diff --git a/internal/core/gb28181/channel.param.go b/internal/core/ipc/channel.param.go similarity index 98% rename from internal/core/gb28181/channel.param.go rename to internal/core/ipc/channel.param.go index f47107e..df68fb1 100755 --- a/internal/core/gb28181/channel.param.go +++ b/internal/core/ipc/channel.param.go @@ -1,5 +1,5 @@ // Code generated by godddx, DO AVOID EDIT. -package gb28181 +package ipc import "github.com/ixugo/goddd/pkg/web" diff --git a/internal/core/ipc/core.go b/internal/core/ipc/core.go new file mode 100755 index 0000000..e0f75ff --- /dev/null +++ b/internal/core/ipc/core.go @@ -0,0 +1,29 @@ +// Code generated by godddx, DO AVOID EDIT. +package ipc + +import ( + "github.com/gowvp/gb28181/internal/core/port" + "github.com/ixugo/goddd/domain/uniqueid" +) + +// Storer data persistence +type Storer interface { + Device() DeviceStorer + Channel() ChannelStorer +} + +// Core business domain +type Core struct { + store Storer + uniqueID uniqueid.Core + protocols map[string]port.Protocol // 协议映射 +} + +// NewCore create business domain +func NewCore(store Storer, uni uniqueid.Core, protocols map[string]port.Protocol) Core { + return Core{ + store: store, + uniqueID: uni, + protocols: protocols, + } +} diff --git a/internal/core/gb28181/device.go b/internal/core/ipc/device.go similarity index 86% rename from internal/core/gb28181/device.go rename to internal/core/ipc/device.go index 69a273e..01f165d 100755 --- a/internal/core/gb28181/device.go +++ b/internal/core/ipc/device.go @@ -1,5 +1,5 @@ // Code generated by godddx, DO AVOID EDIT. -package gb28181 +package ipc import ( "context" @@ -102,18 +102,40 @@ func (c Core) AddDevice(ctx context.Context, in *AddDeviceInput) (*Device, error if err := copier.Copy(&out, in); err != nil { slog.ErrorContext(ctx, "Copy", "err", err) } - out.ID = c.uniqueID.UniqueID(bz.IDPrefixGB) + + // 协议验证(通过接口调用) + if protocol, ok := c.protocols[out.Type]; ok { + if err := protocol.ValidateDevice(ctx, &out); err != nil { + return nil, reason.ErrBadRequest.SetMsg(err.Error()) + } + } + + if out.IsOnvif() { + out.ID = c.uniqueID.UniqueID(bz.IDPrefixOnvif) + out.DeviceID = out.ID + } else { + out.ID = c.uniqueID.UniqueID(bz.IDPrefixGB) + } if err := out.Check(); err != nil { return nil, reason.ErrBadRequest.SetMsg(err.Error()) } + // 持久化到数据库 if err := c.store.Device().Add(ctx, &out); err != nil { if orm.IsDuplicatedKey(err) { return nil, reason.ErrDB.SetMsg("国标 ID 重复,请勿重复添加") } return nil, reason.ErrDB.Withf(`Add err[%s]`, err.Error()) } + + // 初始化协议连接(失败不影响设备添加) + if protocol, ok := c.protocols[out.Type]; ok { + if err := protocol.InitDevice(ctx, &out); err != nil { + slog.WarnContext(ctx, "初始化协议失败", "err", err, "device_id", out.ID) + } + } + return &out, nil } diff --git a/internal/core/gb28181/device.model.go b/internal/core/ipc/device.model.go similarity index 66% rename from internal/core/gb28181/device.model.go rename to internal/core/ipc/device.model.go index e697ce6..eed89fa 100755 --- a/internal/core/gb28181/device.model.go +++ b/internal/core/ipc/device.model.go @@ -1,8 +1,9 @@ // Code generated by godddx, DO AVOID EDIT. -package gb28181 +package ipc import ( "fmt" + "strings" "github.com/ixugo/goddd/pkg/orm" ) @@ -10,9 +11,10 @@ import ( // Device domain model type Device struct { ID string `gorm:"primaryKey" json:"id"` + Type string `gorm:"column:type;notNull;default:'';comment:设备类型(onvif)" json:"type"` // 设备类型(onvif/gb28181) DeviceID string `gorm:"column:device_id;notNull;uniqueIndex;default:'';comment:20 位国标编号" json:"device_id"` // 20 位国标编号 Name string `gorm:"column:name;notNull;default:'';comment:设备名称" json:"name"` // 设备名称 - Trasnport string `gorm:"column:trasnport;notNull;default:'';comment:传输协议(tcp/udp)" json:"trasnport"` // 传输协议(TCP/UDP) + Transport string `gorm:"column:transport;notNull;default:'';comment:传输协议(tcp/udp)" json:"transport"` // 传输协议(TCP/UDP) StreamMode int8 `gorm:"column:stream_mode;notNull;default:1;comment:数据传输模式(0:UDP; 1:TCP_PASSIVE; 2:TCP_ACTIVE)" json:"stream_mode"` // 数据传输模式 IP string `gorm:"column:ip;notNull;default:''" json:"ip"` Port int `gorm:"column:port;notNull;default:0" json:"port"` @@ -27,19 +29,43 @@ type Device struct { Password string `gorm:"column:password;notNull;default:'';comment:注册密码" json:"password"` Address string `gorm:"column:address;notNull;default:'';comment:设备网络地址" json:"address"` Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb;comment:设备属性" json:"ext"` // 设备属性 + // onvif + Username string `gorm:"column:username;notNull;default:'';comment:用户名" json:"username"` Children []*Channel `gorm:"-" json:"children,omitzero"` } +func (d *Device) IsOnvif() bool { + return strings.ToUpper(d.Type) == "ONVIF" +} + +func (d *Device) IsGB28181() bool { + return strings.ToUpper(d.Type) == "GB28181" || d.Type == "" +} + // TableName database table name func (*Device) TableName() string { return "devices" } func (d Device) Check() error { - if len(d.DeviceID) < 18 { + if d.IsGB28181() && len(d.DeviceID) < 18 { return fmt.Errorf("国标 ID 长度应大于等于 18 位") } + if d.IsOnvif() { + if d.Username == "" { + return fmt.Errorf("用户名不能为空") + } + if d.Password == "" { + return fmt.Errorf("密码不能为空") + } + if d.IP == "" { + return fmt.Errorf("IP不能为空") + } + if d.Port == 0 { + return fmt.Errorf("端口不能为空") + } + } return nil } @@ -49,5 +75,34 @@ func (d *Device) init(id, deviceID string) { } func (d *Device) NetworkAddress() string { - return d.Trasnport + "://" + d.Address + return d.Transport + "://" + d.Address +} + +// 实现 port.Device 接口 +func (d *Device) GetID() string { + return d.ID +} + +func (d *Device) GetDeviceID() string { + return d.DeviceID +} + +func (d *Device) GetType() string { + return d.Type +} + +func (d *Device) GetIP() string { + return d.IP +} + +func (d *Device) GetPort() int { + return d.Port +} + +func (d *Device) GetUsername() string { + return d.Username +} + +func (d *Device) GetPassword() string { + return d.Password } diff --git a/internal/core/gb28181/device.param.go b/internal/core/ipc/device.param.go similarity index 81% rename from internal/core/gb28181/device.param.go rename to internal/core/ipc/device.param.go index 5671fbf..bc78edd 100755 --- a/internal/core/gb28181/device.param.go +++ b/internal/core/ipc/device.param.go @@ -1,5 +1,5 @@ // Code generated by godddx, DO AVOID EDIT. -package gb28181 +package ipc import ( "github.com/ixugo/goddd/pkg/web" @@ -11,7 +11,7 @@ type FindDeviceInput struct { // DeviceID string `form:"device_id"` // 20 位国标编号 // Name string `form:"name"` // 设备名称 // ID string `form:"id"` - // Trasnport string `form:"trasnport"` // 传输协议(TCP/UDP) + // Transport string `form:"Transport"` // 传输协议(TCP/UDP) // StreamMode string `form:"stream_mode"` // 数据传输模式(UDP/TCP_PASSIVE,TCP_ACTIVE) // IP string `form:"ip"` // Port int `form:"port"` @@ -41,12 +41,22 @@ type EditDeviceInput struct { // Ext DeviceExt `json:"ext"` // 设备属性 } +// 目前仅应用于 onvif 添加 type AddDeviceInput struct { DeviceID string `json:"device_id"` // 20 位国标编号 - Name string `json:"name"` // 设备名称 - Password string `json:"password"` // 注册密码 + // onvif + Username string `json:"username"` // 用户名 + IP string `json:"ip"` // ip + Port int `json:"port"` // port + // 通用 + Name string `json:"name"` // 设备名称 + Password string `json:"password"` // 注册密码 - // Trasnport string `json:"trasnport"` // 传输协议(TCP/UDP) + Type string `json:"type"` // 设备类型(onvif/gb28181) + + // Addr string `json:"addr"` // 地址(ip:port) + + // Transport string `json:"transport"` // 传输协议(TCP/UDP) // StreamMode string `json:"stream_mode"` // 数据传输模式(UDP/TCP_PASSIVE,TCP_ACTIVE) // IP string `json:"ip"` // Port int `json:"port"` diff --git a/internal/core/gb28181/gbs.go b/internal/core/ipc/gbs.go similarity index 78% rename from internal/core/gb28181/gbs.go rename to internal/core/ipc/gbs.go index 4a79de3..c6df210 100644 --- a/internal/core/gb28181/gbs.go +++ b/internal/core/ipc/gbs.go @@ -1,4 +1,4 @@ -package gb28181 +package ipc import ( "context" @@ -9,25 +9,25 @@ import ( "github.com/ixugo/goddd/pkg/web" ) -type GB28181 struct { +type GBDAdapter struct { // deviceStore DeviceStorer // channelStore ChannelStorer store Storer uni uniqueid.Core } -func NewGB28181(store Storer, uni uniqueid.Core) GB28181 { - return GB28181{ +func NewGBAdapter(store Storer, uni uniqueid.Core) GBDAdapter { + return GBDAdapter{ store: store, uni: uni, } } -func (g GB28181) Store() Storer { +func (g GBDAdapter) Store() Storer { return g.store } -func (g GB28181) GetDeviceByDeviceID(deviceID string) (*Device, error) { +func (g GBDAdapter) GetDeviceByDeviceID(deviceID string) (*Device, error) { ctx := context.TODO() var d Device if err := g.store.Device().Get(ctx, &d, orm.Where("device_id=?", deviceID)); err != nil { @@ -42,7 +42,7 @@ func (g GB28181) GetDeviceByDeviceID(deviceID string) (*Device, error) { return &d, nil } -func (g GB28181) Logout(deviceID string, changeFn func(*Device)) error { +func (g GBDAdapter) Logout(deviceID string, changeFn func(*Device)) error { var d Device if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) { changeFn(d) @@ -53,7 +53,7 @@ func (g GB28181) Logout(deviceID string, changeFn func(*Device)) error { return nil } -func (g GB28181) Edit(deviceID string, changeFn func(*Device)) error { +func (g GBDAdapter) Edit(deviceID string, changeFn func(*Device)) error { var d Device if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) { changeFn(d) @@ -64,7 +64,7 @@ func (g GB28181) Edit(deviceID string, changeFn func(*Device)) error { return nil } -func (g GB28181) EditPlaying(deviceID, channelID string, playing bool) error { +func (g GBDAdapter) EditPlaying(deviceID, channelID string, playing bool) error { var ch Channel if err := g.store.Channel().Edit(context.TODO(), &ch, func(c *Channel) { c.IsPlaying = playing @@ -74,7 +74,7 @@ func (g GB28181) EditPlaying(deviceID, channelID string, playing bool) error { return nil } -func (g GB28181) SaveChannels(channels []*Channel) error { +func (g GBDAdapter) SaveChannels(channels []*Channel) error { if len(channels) <= 0 { return nil } @@ -106,7 +106,7 @@ func (g GB28181) SaveChannels(channels []*Channel) error { } // FindDevices 获取所有设备 -func (g GB28181) FindDevices(ctx context.Context) ([]*Device, error) { +func (g GBDAdapter) FindDevices(ctx context.Context) ([]*Device, error) { var devices []*Device if _, err := g.store.Device().Find(ctx, &devices, web.NewPagerFilterMaxSize()); err != nil { return nil, err diff --git a/internal/core/gb28181/model.go b/internal/core/ipc/model.go similarity index 84% rename from internal/core/gb28181/model.go rename to internal/core/ipc/model.go index 73e825c..f778a5b 100755 --- a/internal/core/gb28181/model.go +++ b/internal/core/ipc/model.go @@ -1,5 +1,5 @@ // Code generated by godddx, DO AVOID EDIT. -package gb28181 +package ipc import ( "database/sql/driver" @@ -14,11 +14,12 @@ type DeviceExt struct { Model string `json:"model"` // 型号 Firmware string `json:"firmware"` // 固件版本 Name string `json:"name"` // 设备名 + GBVersion string `json:"gb_version"` // GB版本 } // Scan implements orm.Scaner. func (i *DeviceExt) Scan(input interface{}) error { - return orm.JsonUnmarshal(input, i) + return orm.JSONUnmarshal(input, i) } func (i DeviceExt) Value() (driver.Value, error) { diff --git a/internal/core/gb28181/store/gb28181cache/cache.go b/internal/core/ipc/store/ipccache/cache.go similarity index 96% rename from internal/core/gb28181/store/gb28181cache/cache.go rename to internal/core/ipc/store/ipccache/cache.go index a03c808..7adbb16 100644 --- a/internal/core/gb28181/store/gb28181cache/cache.go +++ b/internal/core/ipc/store/ipccache/cache.go @@ -1,4 +1,4 @@ -package gb28181cache +package ipccache import ( "context" @@ -6,7 +6,7 @@ import ( "log/slog" "strings" - "github.com/gowvp/gb28181/internal/core/gb28181" + gb28181 "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/pkg/gbs" "github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/ixugo/goddd/pkg/conc" @@ -54,7 +54,7 @@ func (c *Cache) LoadDeviceToMemory(conn sip.Connection) { } for _, d := range devices { - if strings.ToLower(d.Trasnport) == "tcp" { + if strings.ToLower(d.Transport) == "tcp" { // 通知相关设备/通道离线 c.Change(d.DeviceID, func(d *gb28181.Device) { d.IsOnline = false diff --git a/internal/core/gb28181/store/gb28181cache/channel.go b/internal/core/ipc/store/ipccache/channel.go similarity index 95% rename from internal/core/gb28181/store/gb28181cache/channel.go rename to internal/core/ipc/store/ipccache/channel.go index f815ca2..15cb336 100644 --- a/internal/core/gb28181/store/gb28181cache/channel.go +++ b/internal/core/ipc/store/ipccache/channel.go @@ -1,9 +1,9 @@ -package gb28181cache +package ipccache import ( "context" - "github.com/gowvp/gb28181/internal/core/gb28181" + gb28181 "github.com/gowvp/gb28181/internal/core/ipc" "github.com/ixugo/goddd/pkg/orm" ) diff --git a/internal/core/gb28181/store/gb28181cache/device.go b/internal/core/ipc/store/ipccache/device.go similarity index 96% rename from internal/core/gb28181/store/gb28181cache/device.go rename to internal/core/ipc/store/ipccache/device.go index 331528b..9b6726a 100644 --- a/internal/core/gb28181/store/gb28181cache/device.go +++ b/internal/core/ipc/store/ipccache/device.go @@ -1,11 +1,11 @@ -package gb28181cache +package ipccache import ( "context" "fmt" "log/slog" - "github.com/gowvp/gb28181/internal/core/gb28181" + gb28181 "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/pkg/gbs" "github.com/ixugo/goddd/pkg/orm" "gorm.io/gorm" diff --git a/internal/core/gb28181/store/gb28181db/channel.go b/internal/core/ipc/store/ipcdb/channel.go similarity index 96% rename from internal/core/gb28181/store/gb28181db/channel.go rename to internal/core/ipc/store/ipcdb/channel.go index fc687d0..e263bf9 100755 --- a/internal/core/gb28181/store/gb28181db/channel.go +++ b/internal/core/ipc/store/ipcdb/channel.go @@ -1,10 +1,10 @@ // Code generated by godddx, DO AVOID EDIT. -package gb28181db +package ipcdb import ( "context" - "github.com/gowvp/gb28181/internal/core/gb28181" + gb28181 "github.com/gowvp/gb28181/internal/core/ipc" "github.com/ixugo/goddd/pkg/orm" "gorm.io/gorm" ) diff --git a/internal/core/gb28181/store/gb28181db/channel_test.go b/internal/core/ipc/store/ipcdb/channel_test.go similarity index 84% rename from internal/core/gb28181/store/gb28181db/channel_test.go rename to internal/core/ipc/store/ipcdb/channel_test.go index cd9015b..3a48850 100755 --- a/internal/core/gb28181/store/gb28181db/channel_test.go +++ b/internal/core/ipc/store/ipcdb/channel_test.go @@ -1,10 +1,10 @@ -package gb28181db +package ipcdb import ( "context" "testing" - "github.com/gowvp/gb28181/internal/core/gb28181" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/ixugo/goddd/pkg/orm" ) @@ -16,7 +16,7 @@ func TestChannelGet(t *testing.T) { userDB := NewChannel(db) mock.ExpectQuery(`SELECT \* FROM "channels" WHERE id=\$1 (.+) LIMIT \$2`).WithArgs("jack", 1) - var out gb28181.Channel + var out ipc.Channel if err := userDB.Get(context.Background(), &out, orm.Where("id=?", "jack")); err != nil { t.Fatal(err) } diff --git a/internal/core/gb28181/store/gb28181db/db.go b/internal/core/ipc/store/ipcdb/db.go similarity index 90% rename from internal/core/gb28181/store/gb28181db/db.go rename to internal/core/ipc/store/ipcdb/db.go index 94fd084..f8d78e3 100755 --- a/internal/core/gb28181/store/gb28181db/db.go +++ b/internal/core/ipc/store/ipcdb/db.go @@ -1,8 +1,8 @@ // Code generated by godddx, DO AVOID EDIT. -package gb28181db +package ipcdb import ( - "github.com/gowvp/gb28181/internal/core/gb28181" + gb28181 "github.com/gowvp/gb28181/internal/core/ipc" "gorm.io/gorm" ) diff --git a/internal/core/gb28181/store/gb28181db/db_test.go b/internal/core/ipc/store/ipcdb/db_test.go similarity index 94% rename from internal/core/gb28181/store/gb28181db/db_test.go rename to internal/core/ipc/store/ipcdb/db_test.go index 2bf421c..d442ac8 100755 --- a/internal/core/gb28181/store/gb28181db/db_test.go +++ b/internal/core/ipc/store/ipcdb/db_test.go @@ -1,4 +1,4 @@ -package gb28181db +package ipcdb import ( "github.com/DATA-DOG/go-sqlmock" diff --git a/internal/core/gb28181/store/gb28181db/device.go b/internal/core/ipc/store/ipcdb/device.go similarity index 96% rename from internal/core/gb28181/store/gb28181db/device.go rename to internal/core/ipc/store/ipcdb/device.go index 5b48f5f..635b6b6 100755 --- a/internal/core/gb28181/store/gb28181db/device.go +++ b/internal/core/ipc/store/ipcdb/device.go @@ -1,10 +1,10 @@ // Code generated by godddx, DO AVOID EDIT. -package gb28181db +package ipcdb import ( "context" - "github.com/gowvp/gb28181/internal/core/gb28181" + gb28181 "github.com/gowvp/gb28181/internal/core/ipc" "github.com/ixugo/goddd/pkg/orm" "gorm.io/gorm" ) diff --git a/internal/core/gb28181/store/gb28181db/device_test.go b/internal/core/ipc/store/ipcdb/device_test.go similarity index 84% rename from internal/core/gb28181/store/gb28181db/device_test.go rename to internal/core/ipc/store/ipcdb/device_test.go index 10e973a..cfd5159 100755 --- a/internal/core/gb28181/store/gb28181db/device_test.go +++ b/internal/core/ipc/store/ipcdb/device_test.go @@ -1,10 +1,10 @@ -package gb28181db +package ipcdb import ( "context" "testing" - "github.com/gowvp/gb28181/internal/core/gb28181" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/ixugo/goddd/pkg/orm" ) @@ -16,7 +16,7 @@ func TestDeviceGet(t *testing.T) { userDB := NewDevice(db) mock.ExpectQuery(`SELECT \* FROM "devices" WHERE id=\$1 (.+) LIMIT \$2`).WithArgs("jack", 1) - var out gb28181.Device + var out ipc.Device if err := userDB.Get(context.Background(), &out, orm.Where("id=?", "jack")); err != nil { t.Fatal(err) } diff --git a/internal/core/port/protocol.go b/internal/core/port/protocol.go new file mode 100644 index 0000000..2636c6b --- /dev/null +++ b/internal/core/port/protocol.go @@ -0,0 +1,50 @@ +package port + +import ( + "context" +) + +// Device 设备接口(避免循环依赖) +// 协议适配器的实现会接收具体的 gb28181.Device 类型 +type Device interface { + GetID() string + GetDeviceID() string + GetType() string + GetIP() string + GetPort() int + GetUsername() string + GetPassword() string +} + +// Channel 通道接口(避免循环依赖) +type Channel interface { + GetID() string + GetChannelID() string + GetDeviceID() string +} + +// Protocol 协议抽象接口(端口) +type Protocol interface { + // ValidateDevice 验证设备连接(添加设备前调用) + ValidateDevice(ctx context.Context, device Device) error + + // InitDevice 初始化设备连接(添加设备后调用) + // 例如: GB28181 不需要主动初始化,ONVIF 需要查询 Profiles 作为通道 + InitDevice(ctx context.Context, device Device) error + + // QueryCatalog 查询设备目录/通道 + QueryCatalog(ctx context.Context, device Device) error + + // StartPlay 开始播放 + StartPlay(ctx context.Context, device Device, channel Channel) (*PlayResponse, error) + + // StopPlay 停止播放 + StopPlay(ctx context.Context, device Device, channel Channel) error +} + +// PlayResponse 播放响应 +type PlayResponse struct { + SSRC string // GB28181 SSRC + Stream string // 流 ID + RTSP string // RTSP 地址 (ONVIF) +} diff --git a/internal/core/sms/model.go b/internal/core/sms/model.go index af3baa2..ea63f12 100755 --- a/internal/core/sms/model.go +++ b/internal/core/sms/model.go @@ -25,7 +25,7 @@ type MediaServerPorts struct { // Scan implements orm.Scaner. func (i *MediaServerPorts) Scan(input interface{}) error { - return orm.JsonUnmarshal(input, i) + return orm.JSONUnmarshal(input, i) } // Value implements driver.Valuer. diff --git a/internal/protocol/onvif/adapter.go b/internal/protocol/onvif/adapter.go new file mode 100644 index 0000000..0bfc06c --- /dev/null +++ b/internal/protocol/onvif/adapter.go @@ -0,0 +1,157 @@ +package onvifadapter + +import ( + "context" + "fmt" + "log/slog" + + gb28181 "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/port" + "github.com/gowvp/onvif" + m "github.com/gowvp/onvif/media" + sdkmedia "github.com/gowvp/onvif/sdk/media" + "github.com/ixugo/goddd/pkg/orm" +) + +var _ port.Protocol = (*Adapter)(nil) + +// Adapter ONVIF 协议适配器 +type Adapter struct { + devices map[string]*onvif.Device // ONVIF 设备连接缓存 + store gb28181.Storer // 协议适配器可以依赖领域的存储接口 +} + +func NewAdapter(store gb28181.Storer) *Adapter { + return &Adapter{ + devices: make(map[string]*onvif.Device), + store: store, + } +} + +// ValidateDevice 实现 port.Protocol 接口 - ONVIF 设备验证 +func (a *Adapter) ValidateDevice(ctx context.Context, device port.Device) error { + // 尝试连接 ONVIF 设备并验证可以获取 Profiles + dev, err := onvif.NewDevice(onvif.DeviceParams{ + Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()), + Username: device.GetUsername(), + Password: device.GetPassword(), + }) + if err != nil { + return fmt.Errorf("ONVIF 连接失败: %w", err) + } + + // 验证可以获取 Profiles + _, err = sdkmedia.Call_GetProfiles(ctx, dev, m.GetProfiles{}) + if err != nil { + return fmt.Errorf("获取 ONVIF Profiles 失败: %w", err) + } + + return nil +} + +// InitDevice 实现 port.Protocol 接口 - 初始化 ONVIF 设备 +// ONVIF 设备初始化时,自动查询 Profiles 并创建为通道 +func (a *Adapter) InitDevice(ctx context.Context, device port.Device) error { + // 创建 ONVIF 连接 + dev, err := onvif.NewDevice(onvif.DeviceParams{ + Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()), + Username: device.GetUsername(), + Password: device.GetPassword(), + }) + if err != nil { + return err + } + + // 缓存设备连接 + a.devices[device.GetID()] = dev + + // 自动查询 Profiles 作为通道 + return a.queryAndSaveProfiles(ctx, device, dev) +} + +// QueryCatalog 实现 port.Protocol 接口 - ONVIF 查询 Profiles +func (a *Adapter) QueryCatalog(ctx context.Context, device port.Device) error { + dev, ok := a.devices[device.GetID()] + if !ok { + // 设备连接不在缓存中,尝试重新连接 + var err error + dev, err = onvif.NewDevice(onvif.DeviceParams{ + Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()), + Username: device.GetUsername(), + Password: device.GetPassword(), + }) + if err != nil { + return fmt.Errorf("ONVIF 设备未初始化: %w", err) + } + a.devices[device.GetID()] = dev + } + + return a.queryAndSaveProfiles(ctx, device, dev) +} + +// StartPlay 实现 port.Protocol 接口 - ONVIF 播放 +func (a *Adapter) StartPlay(ctx context.Context, device port.Device, channel port.Channel) (*port.PlayResponse, error) { + dev, ok := a.devices[device.GetID()] + if !ok { + return nil, fmt.Errorf("ONVIF 设备未初始化") + } + + // 获取 RTSP 地址 + streamURI, err := a.getStreamURI(ctx, dev, channel.GetChannelID()) + if err != nil { + return nil, err + } + + return &port.PlayResponse{ + RTSP: streamURI, + }, nil +} + +// StopPlay 实现 port.Protocol 接口 - ONVIF 停止播放 +func (a *Adapter) StopPlay(ctx context.Context, device port.Device, channel port.Channel) error { + // ONVIF 通常不需要显式停止播放 + return nil +} + +// queryAndSaveProfiles 查询 ONVIF Profiles 并保存为通道 +func (a *Adapter) queryAndSaveProfiles(ctx context.Context, device port.Device, dev *onvif.Device) error { + // 查询 ONVIF Profiles + resp, err := sdkmedia.Call_GetProfiles(ctx, dev, m.GetProfiles{}) + if err != nil { + return fmt.Errorf("获取 ONVIF Profiles 失败: %w", err) + } + + // 将 Profiles 转换为通道并保存 + for _, profile := range resp.Profiles { + channel := &gb28181.Channel{ + DeviceID: device.GetDeviceID(), + ChannelID: string(profile.Token), + Name: string(profile.Name), + DID: device.GetID(), + } + + // 保存到数据库(使用领域层的存储接口) + if err := a.store.Channel().Add(ctx, channel); err != nil { + // 如果是重复错误,忽略 + if orm.IsDuplicatedKey(err) { + slog.DebugContext(ctx, "通道已存在", "channel_id", channel.ChannelID) + continue + } + slog.ErrorContext(ctx, "保存通道失败", "err", err, "channel_id", channel.ChannelID) + continue + } + slog.InfoContext(ctx, "ONVIF Profile 保存为通道", "channel_id", channel.ChannelID, "name", channel.Name) + } + + return nil +} + +// getStreamURI 获取 RTSP 流地址 +func (a *Adapter) getStreamURI(ctx context.Context, dev *onvif.Device, profileToken string) (string, error) { + // TODO: 调用 ONVIF GetStreamUri 方法 + // 这里需要根据 onvif SDK 的实际 API 来实现 + + // 临时实现:假设 profileToken 可以直接构造 RTSP 地址 + params := dev.GetDeviceParams() + return fmt.Sprintf("rtsp://%s:%s@%s/stream/%s", params.Username, params.Password, params.Xaddr, profileToken), nil +} diff --git a/internal/web/api/config.go b/internal/web/api/config.go index 11a40a8..b49140d 100755 --- a/internal/web/api/config.go +++ b/internal/web/api/config.go @@ -21,7 +21,7 @@ type ConfigAPI struct { } func NewConfigAPI(db *gorm.DB, conf *conf.Bootstrap) ConfigAPI { - core := config.NewCore(configdb.NewDB(db).AutoMigrate(orm.EnabledAutoMigrate)) + core := config.NewCore(configdb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate())) return ConfigAPI{configCore: core, conf: conf} } diff --git a/internal/web/api/gb28181.go b/internal/web/api/ipc.go similarity index 61% rename from internal/web/api/gb28181.go rename to internal/web/api/ipc.go index 2078605..05a425c 100755 --- a/internal/web/api/gb28181.go +++ b/internal/web/api/ipc.go @@ -2,6 +2,7 @@ package api import ( + "encoding/json" "fmt" "io" "log/slog" @@ -12,11 +13,14 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/google/uuid" "github.com/gowvp/gb28181/internal/core/bz" - "github.com/gowvp/gb28181/internal/core/gb28181" + "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/port" "github.com/gowvp/gb28181/internal/core/push" "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/zlm" + "github.com/gowvp/onvif" "github.com/ixugo/goddd/domain/uniqueid" "github.com/ixugo/goddd/pkg/hook" "github.com/ixugo/goddd/pkg/orm" @@ -33,7 +37,9 @@ const ( // TODO: 快照不会删除,只会覆盖,设备删除时也不会删除快照,待实现 func writeCover(dataDir, channelID string, body []byte) error { coverPath := filepath.Join(dataDir, coverDir) - os.MkdirAll(coverPath, 0o755) + if err := os.MkdirAll(coverPath, 0o777); err != nil { + return err + } return os.WriteFile(filepath.Join(coverPath, channelID+".jpg"), body, 0o644) } @@ -46,80 +52,100 @@ func readCover(dataDir, channelID string) ([]byte, error) { return os.ReadFile(readCoverPath(dataDir, channelID)) } -type GB28181API struct { - gb28181Core gb28181.Core - uc *Usecase +type IPCAPI struct { + ipc ipc.Core + uc *Usecase } -func NewGB28181API(core gb28181.Core) GB28181API { - return GB28181API{gb28181Core: core} +func NewIPCAPI(core ipc.Core) IPCAPI { + return IPCAPI{ipc: core} } -func NewGB28181Core(store gb28181.Storer, uni uniqueid.Core) gb28181.Core { - return gb28181.NewCore(store, uni) +func NewIPCCore(store ipc.Storer, uni uniqueid.Core, protocols map[string]port.Protocol) ipc.Core { + return ipc.NewCore(store, uni, protocols) } -func registerGB28181(g gin.IRouter, api GB28181API, handler ...gin.HandlerFunc) { +func registerGB28181(g gin.IRouter, api IPCAPI, handler ...gin.HandlerFunc) { + // GB28181 协议特有的回调接口 g.Any("/gb28181/snapshot", func(c *gin.Context) { b, err := io.ReadAll(c.Request.Body) if err != nil { panic(err) } - os.WriteFile(orm.GenerateRandomString(10)+".jpg", b, 0o644) + if err := os.WriteFile(orm.GenerateRandomString(10)+".jpg", b, 0o644); err != nil { + slog.ErrorContext(c.Request.Context(), "write cover", "err", err) + } c.JSON(200, gin.H{"msg": "ok"}) }) + + // 统一的设备管理 API(支持所有协议) { group := g.Group("/devices", handler...) - group.GET("", web.WrapH(api.findDevice)) // 设备列表 - group.GET("/:id", web.WrapH(api.getDevice)) // 设备详情 - group.PUT("/:id", web.WrapH(api.editDevice)) // 修改设备详情 - group.POST("", web.WrapH(api.addDevice)) // 添加设备 - group.DELETE("/:id", web.WrapH(api.delDevice)) // 删除设备 - group.POST("/:id/catalog", web.WrapH(api.queryCatalog)) // 刷新通道 - group.GET("/channels", web.WrapH(api.FindChannelsForDevice)) // 设备与通道列表 + group.GET("", web.WrapH(api.findDevice)) // 设备列表(所有协议) + group.GET("/:id", web.WrapH(api.getDevice)) // 设备详情(所有协议) + group.PUT("/:id", web.WrapH(api.editDevice)) // 修改设备(所有协议) + group.POST("", web.WrapH(api.addDevice)) // 添加设备(所有协议,通过 type 区分) + group.DELETE("/:id", web.WrapH(api.delDevice)) // 删除设备(所有协议) + group.GET("/channels", web.WrapH(api.FindChannelsForDevice)) // 设备与通道列表(所有协议) + + // GB28181 特有功能 + group.POST("/:id/catalog", web.WrapH(api.queryCatalog)) // 刷新通道(GB28181 特有) + } + { + group := g.Group("/onvif", handler...) + group.GET("/discover", api.discover) // ONVIF 设备发现(ONVIF 特有) } + // 统一的通道管理 API(支持所有协议) { group := g.Group("/channels", handler...) - group.GET("", web.WrapH(api.findChannel)) - group.PUT("/:id", web.WrapH(api.editChannel)) - group.POST("/:id/play", web.WrapH(api.play)) - - group.POST("/:id/snapshot", web.WrapH(api.refreshSnapshot)) // 图像抓拍 - group.GET("/:id/snapshot", api.getSnapshot) // 获取图像 - // group.GET("/:id", web.WrapH(api.getChannel)) - // group.POST("", web.WrapH(api.addChannel)) - // group.DELETE("/:id", web.WrapH(api.delChannel)) + group.GET("", web.WrapH(api.findChannel)) // 通道列表(所有协议) + group.PUT("/:id", web.WrapH(api.editChannel)) // 修改通道(所有协议) + group.POST("/:id/play", web.WrapH(api.play)) // 播放(所有协议) + group.POST("/:id/snapshot", web.WrapH(api.refreshSnapshot)) // 图像抓拍(所有协议) + group.GET("/:id/snapshot", api.getSnapshot) // 获取图像(所有协议) } } // >>> device >>>>>>>>>>>>>>>>>>>> -func (a GB28181API) findDevice(c *gin.Context, in *gb28181.FindDeviceInput) (any, error) { - items, total, err := a.gb28181Core.FindDevice(c.Request.Context(), in) +func (a IPCAPI) findDevice(c *gin.Context, in *ipc.FindDeviceInput) (any, error) { + items, total, err := a.ipc.FindDevice(c.Request.Context(), in) return gin.H{"items": items, "total": total}, err } -func (a GB28181API) getDevice(c *gin.Context, _ *struct{}) (any, error) { +func (a IPCAPI) getDevice(c *gin.Context, _ *struct{}) (any, error) { deviceID := c.Param("id") - return a.gb28181Core.GetDevice(c.Request.Context(), deviceID) + return a.ipc.GetDevice(c.Request.Context(), deviceID) } -func (a GB28181API) editDevice(c *gin.Context, in *gb28181.EditDeviceInput) (any, error) { +func (a IPCAPI) editDevice(c *gin.Context, in *ipc.EditDeviceInput) (any, error) { deviceID := c.Param("id") - return a.gb28181Core.EditDevice(c.Request.Context(), in, deviceID) + return a.ipc.EditDevice(c.Request.Context(), in, deviceID) } -func (a GB28181API) addDevice(c *gin.Context, in *gb28181.AddDeviceInput) (any, error) { - return a.gb28181Core.AddDevice(c.Request.Context(), in) +// addDevice 添加设备(支持所有协议类型) +// 通过 type 字段区分协议: "GB28181" 或 "ONVIF" +// +// 示例1 - 添加 GB28181 设备: +// +// POST /devices +// { "type": "GB28181", "device_id": "34020000001320000001", "name": "摄像头1" } +// +// 示例2 - 添加 ONVIF 设备: +// +// POST /devices +// { "type": "ONVIF", "ip": "192.168.1.100", "port": 80, "username": "admin", "password": "12345" } +func (a IPCAPI) addDevice(c *gin.Context, in *ipc.AddDeviceInput) (any, error) { + return a.ipc.AddDevice(c.Request.Context(), in) } -func (a GB28181API) delDevice(c *gin.Context, _ *struct{}) (any, error) { +func (a IPCAPI) delDevice(c *gin.Context, _ *struct{}) (any, error) { did := c.Param("id") - return a.gb28181Core.DelDevice(c.Request.Context(), did) + return a.ipc.DelDevice(c.Request.Context(), did) } -func (a GB28181API) queryCatalog(c *gin.Context, _ *struct{}) (any, error) { +func (a IPCAPI) queryCatalog(c *gin.Context, _ *struct{}) (any, error) { did := c.Param("id") if err := a.uc.SipServer.QueryCatalog(did); err != nil { return nil, ErrDevice.SetMsg(err.Error()) @@ -127,8 +153,8 @@ func (a GB28181API) queryCatalog(c *gin.Context, _ *struct{}) (any, error) { return gin.H{"msg": "ok"}, nil } -func (a GB28181API) FindChannelsForDevice(c *gin.Context, in *gb28181.FindDeviceInput) (any, error) { - items, total, err := a.gb28181Core.FindChannelsForDevice(c.Request.Context(), in) +func (a IPCAPI) FindChannelsForDevice(c *gin.Context, in *ipc.FindDeviceInput) (any, error) { + items, total, err := a.ipc.FindChannelsForDevice(c.Request.Context(), in) // 按照在线优先排序 sort.SliceStable(items, func(i, j int) bool { @@ -140,8 +166,8 @@ func (a GB28181API) FindChannelsForDevice(c *gin.Context, in *gb28181.FindDevice // >>> channel >>>>>>>>>>>>>>>>>>>> -func (a GB28181API) findChannel(c *gin.Context, in *gb28181.FindChannelInput) (any, error) { - items, total, err := a.gb28181Core.FindChannel(c.Request.Context(), in) +func (a IPCAPI) findChannel(c *gin.Context, in *ipc.FindChannelInput) (any, error) { + items, total, err := a.ipc.FindChannel(c.Request.Context(), in) return gin.H{"items": items, "total": total}, err } @@ -150,9 +176,9 @@ func (a GB28181API) findChannel(c *gin.Context, in *gb28181.FindChannelInput) (a // return a.gb28181Core.GetChannel(c.Request.Context(), channelID) // } -func (a GB28181API) editChannel(c *gin.Context, in *gb28181.EditChannelInput) (any, error) { +func (a IPCAPI) editChannel(c *gin.Context, in *ipc.EditChannelInput) (any, error) { cid := c.Param("id") - return a.gb28181Core.EditChannel(c.Request.Context(), in, cid) + return a.ipc.EditChannel(c.Request.Context(), in, cid) } // func (a GB28181API) addChannel(c *gin.Context, in *gb28181.AddChannelInput) (any, error) { @@ -164,7 +190,7 @@ func (a GB28181API) editChannel(c *gin.Context, in *gb28181.EditChannelInput) (a // return a.gb28181Core.DelChannel(c.Request.Context(), channelID) // } -func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { +func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) { channelID := c.Param("id") var app, appStream, host, stream, session, mediaServerID string @@ -176,7 +202,7 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { return nil, reason.ErrUsedLogic.SetMsg("请先配置流媒体 SDP 收流地址") } // a.uc.SipServer. - ch, err := a.gb28181Core.GetChannel(c.Request.Context(), channelID) + ch, err := a.ipc.GetChannel(c.Request.Context(), channelID) if err != nil { return nil, err } @@ -272,15 +298,22 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) { // 取一张快照 go func() { - body, err := a.uc.SMSAPI.smsCore.GetSnapshot(svr, zlm.GetSnapRequest{ - URL: out.Items[0].RTSP, - TimeoutSec: 10, - ExpireSec: 15, - }) - if err != nil { - slog.ErrorContext(c.Request.Context(), "get snapshot", "err", err) - } else { - writeCover(a.uc.Conf.ConfigDir, channelID, body) + for range 2 { + time.Sleep(5 * time.Second) + rtsp := fmt.Sprintf("rtsp://%s:%d/%s", "127.0.0.1", svr.Ports.RTSP, stream) + "?" + session + body, err := a.uc.SMSAPI.smsCore.GetSnapshot(svr, zlm.GetSnapRequest{ + URL: rtsp, + TimeoutSec: 10, + ExpireSec: 15, + }) + if err != nil { + slog.ErrorContext(c.Request.Context(), "get snapshot", "err", err) + continue + } + if err := writeCover(a.uc.Conf.ConfigDir, channelID, body); err != nil { + slog.ErrorContext(c.Request.Context(), "write cover", "err", err) + } + break } }() return &out, nil @@ -293,7 +326,7 @@ type refreshSnapshotInput struct { URL string `json:"url"` } -func (a GB28181API) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (any, error) { +func (a IPCAPI) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (any, error) { channelID := c.Param("id") path := readCoverPath(a.uc.Conf.ConfigDir, channelID) @@ -329,7 +362,9 @@ func (a GB28181API) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (a // return nil, reason.ErrBadRequest.Msg(err.Error()) } else { if hook.MD5FromBytes(img) != "" { - writeCover(a.uc.Conf.ConfigDir, channelID, img) + if err := writeCover(a.uc.Conf.ConfigDir, channelID, img); err != nil { + slog.ErrorContext(c.Request.Context(), "write cover", "err", err) + } } } } @@ -337,12 +372,68 @@ func (a GB28181API) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (a return gin.H{"link": fmt.Sprintf("%s/channels/%s/snapshot?token=%s", prefix, channelID, token)}, nil } -func (a GB28181API) getSnapshot(c *gin.Context) { +func (a IPCAPI) getSnapshot(c *gin.Context) { channelID := c.Param("id") body, err := readCover(a.uc.Conf.ConfigDir, channelID) if err != nil { - reason.ErrNotFound.SetMsg(err.Error()) + web.Fail(c, reason.ErrNotFound.SetMsg(err.Error())) return } c.Data(200, "image/jpeg", body) } + +type DiscoverResponse struct { + Addr string `json:"addr"` +} + +func toDiscoverResponse(dev *onvif.Device) *DiscoverResponse { + addr := dev.GetDeviceParams().Xaddr + if !strings.Contains(addr, ":") { + addr += ":80" + } + return &DiscoverResponse{ + Addr: addr, + } +} + +func (a IPCAPI) discover(c *gin.Context) { + recv, cancel, err := onvif.AllAvailableDevicesAtSpecificEthernetInterfaces() + if err != nil { + web.Fail(c, err) + return + } + defer cancel() + + se := web.NewSSE(64, time.Minute) + go func() { + defer func() { + se.Publish(web.Event{ + ID: uuid.NewString(), + Event: "end", + }) + se.Close() + }() + for { + select { + case dev := <-recv: + if dev == nil { + return + } + // TODO: 已经添加的设备需要过滤掉 + b, _ := json.Marshal(toDiscoverResponse(dev)) + se.Publish(web.Event{ + ID: uuid.NewString(), + Event: "discover", + Data: b, + }) + time.Sleep(time.Millisecond * 200) + case <-c.Request.Context().Done(): + return + case <-time.After(3 * time.Second): + slog.DebugContext(c.Request.Context(), "discover timeout") + return + } + } + }() + se.ServeHTTP(c.Writer, c.Request) +} diff --git a/internal/web/api/provider.go b/internal/web/api/provider.go index f71070e..1dc8dbb 100644 --- a/internal/web/api/provider.go +++ b/internal/web/api/provider.go @@ -6,11 +6,13 @@ import ( "github.com/gin-gonic/gin" "github.com/google/wire" "github.com/gowvp/gb28181/internal/conf" - "github.com/gowvp/gb28181/internal/core/gb28181" - "github.com/gowvp/gb28181/internal/core/gb28181/store/gb28181cache" - "github.com/gowvp/gb28181/internal/core/gb28181/store/gb28181db" + "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/ipc/store/ipccache" + "github.com/gowvp/gb28181/internal/core/ipc/store/ipcdb" + "github.com/gowvp/gb28181/internal/core/port" "github.com/gowvp/gb28181/internal/core/push" "github.com/gowvp/gb28181/internal/core/push/store/pushdb" + onvifadapter "github.com/gowvp/gb28181/internal/protocol/onvif" "github.com/gowvp/gb28181/pkg/gbs" "github.com/ixugo/goddd/domain/uniqueid" "github.com/ixugo/goddd/domain/uniqueid/store/uniqueiddb" @@ -31,10 +33,7 @@ var ( NewUniqueID, NewPushCore, NewPushAPI, gbs.NewServer, - NewGB28181Store, - NewGB28181API, - NewGB28181Core, - NewGB28181, + NewIPCStore, NewProtocols, NewIPCCore, NewIPCAPI, NewGBAdapter, NewProxyAPI, NewConfigAPI, NewUserAPI, @@ -49,7 +48,7 @@ type Usecase struct { WebHookAPI WebHookAPI UniqueID uniqueid.Core MediaAPI PushAPI - GB28181API GB28181API + GB28181API IPCAPI ProxyAPI ProxyAPI ConfigAPI ConfigAPI @@ -79,7 +78,7 @@ func NewHTTPHandler(uc *Usecase) http.Handler { } setupRouter(g, uc) // 设置路由处理函数 - + uc.Version.RecordVersion() return g // 返回配置好的 Gin 实例作为 http.Handler } @@ -92,13 +91,26 @@ func NewPushCore(db *gorm.DB, uni uniqueid.Core) push.Core { return push.NewCore(pushdb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()), uni) } -func NewGB28181Store(db *gorm.DB) gb28181.Storer { - return gb28181cache.NewCache(gb28181db.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate())) +func NewIPCStore(db *gorm.DB) ipc.Storer { + return ipccache.NewCache(ipcdb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate())) } -func NewGB28181(store gb28181.Storer, uni uniqueid.Core) gb28181.GB28181 { - return gb28181.NewGB28181( +func NewGBAdapter(store ipc.Storer, uni uniqueid.Core) ipc.GBDAdapter { + return ipc.NewGBAdapter( store, uni, ) } + +// NewProtocols 创建协议适配器映射 +func NewProtocols(store ipc.Storer) map[string]port.Protocol { + protocols := make(map[string]port.Protocol) + + // 注册 ONVIF 协议适配器 + protocols["ONVIF"] = onvifadapter.NewAdapter(store) + + // TODO: 注册 GB28181 协议适配器 + // protocols["GB28181"] = gb28181adapter.NewAdapter(sipServer, store) + + return protocols +} diff --git a/internal/web/api/proxy.go b/internal/web/api/proxy.go index 26e4933..b9a8fb0 100755 --- a/internal/web/api/proxy.go +++ b/internal/web/api/proxy.go @@ -16,7 +16,7 @@ type ProxyAPI struct { } func NewProxyAPI(db *gorm.DB, uni uniqueid.Core) ProxyAPI { - core := proxy.NewCore(proxydb.NewDB(db).AutoMigrate(orm.EnabledAutoMigrate), uni) + core := proxy.NewCore(proxydb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()), uni) return ProxyAPI{proxyCore: core} } diff --git a/internal/web/api/zlm_webhook.go b/internal/web/api/zlm_webhook.go index 65d2200..0af6380 100644 --- a/internal/web/api/zlm_webhook.go +++ b/internal/web/api/zlm_webhook.go @@ -9,7 +9,7 @@ import ( "github.com/gin-gonic/gin" "github.com/gowvp/gb28181/internal/conf" "github.com/gowvp/gb28181/internal/core/bz" - "github.com/gowvp/gb28181/internal/core/gb28181" + gb28181 "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/internal/core/push" "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/gbs" @@ -204,12 +204,12 @@ type RTPStream struct { } func (r RTPStream) onStreamNotFound(ctx context.Context, in *onStreamNotFoundInput) error { - ch, err := r.uc.GB28181API.gb28181Core.GetChannel(ctx, in.Stream) + ch, err := r.uc.GB28181API.ipc.GetChannel(ctx, in.Stream) if err != nil { return err } - dev, err := r.uc.GB28181API.gb28181Core.GetDevice(ctx, ch.DID) + dev, err := r.uc.GB28181API.ipc.GetDevice(ctx, ch.DID) if err != nil { return err } diff --git a/pkg/gbs/devices.go b/pkg/gbs/devices.go index 77d741f..47f03be 100644 --- a/pkg/gbs/devices.go +++ b/pkg/gbs/devices.go @@ -8,7 +8,7 @@ import ( "sync" "time" - "github.com/gowvp/gb28181/internal/core/gb28181" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/pkg/gbs/m" "github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/ixugo/goddd/pkg/conc" @@ -43,7 +43,7 @@ type Device struct { keepaliveTimeout uint16 } -func NewDevice(conn sip.Connection, d *gb28181.Device) *Device { +func NewDevice(conn sip.Connection, d *ipc.Device) *Device { uri, err := sip.ParseURI(fmt.Sprintf("sip:%s@%s", d.DeviceID, d.Address)) if err != nil { slog.Error("parse uri", "err", err, "did", d.ID) @@ -89,7 +89,7 @@ func (d *Device) CheckConnection() error { return nil } -func (d *Device) LoadChannels(channels ...*gb28181.Channel) { +func (d *Device) LoadChannels(channels ...*ipc.Channel) { for _, channel := range channels { ch := Channel{ ChannelID: channel.ChannelID, diff --git a/pkg/gbs/info.go b/pkg/gbs/info.go index 5b19769..cfd64bf 100644 --- a/pkg/gbs/info.go +++ b/pkg/gbs/info.go @@ -3,7 +3,7 @@ package gbs import ( "encoding/hex" - "github.com/gowvp/gb28181/internal/core/gb28181" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/pkg/gbs/sip" ) @@ -43,14 +43,14 @@ func (g GB28181API) sipMessageDeviceInfo(ctx *sip.Context) { return } - if err := g.core.Edit(ctx.DeviceID, func(d *gb28181.Device) { + if err := g.core.Edit(ctx.DeviceID, func(d *ipc.Device) { d.Ext.Firmware = msg.Firmware d.Ext.Manufacturer = msg.Manufacturer d.Ext.Model = msg.Model d.Ext.Name = msg.DeviceName d.Address = ctx.Source.String() - d.Trasnport = ctx.Source.Network() + d.Transport = ctx.Source.Network() }); err != nil { ctx.Log.Error("Edit", "err", err) ctx.String(500, ErrDatabase.Error()) diff --git a/pkg/gbs/keepalive.go b/pkg/gbs/keepalive.go index 79d019f..d36aa19 100644 --- a/pkg/gbs/keepalive.go +++ b/pkg/gbs/keepalive.go @@ -1,7 +1,7 @@ package gbs import ( - "github.com/gowvp/gb28181/internal/core/gb28181" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/ixugo/goddd/pkg/orm" // "github.com/panjjo/gosip/db" @@ -31,11 +31,11 @@ func (g *GB28181API) sipMessageKeepalive(ctx *sip.Context) { to: ctx.To, }) - if err := g.svr.memoryStorer.Change(ctx.DeviceID, func(d *gb28181.Device) { + if err := g.svr.memoryStorer.Change(ctx.DeviceID, func(d *ipc.Device) { d.KeepaliveAt = orm.Now() d.IsOnline = msg.Status == "OK" || msg.Status == "ON" d.Address = ctx.Source.String() - d.Trasnport = ctx.Source.Network() + d.Transport = ctx.Source.Network() }, func(d *Device) { d.conn = ctx.Request.GetConnection() d.source = ctx.Source diff --git a/pkg/gbs/play.go b/pkg/gbs/play.go index 6f6cd7e..936691c 100644 --- a/pkg/gbs/play.go +++ b/pkg/gbs/play.go @@ -7,7 +7,7 @@ import ( "strings" "sync" - "github.com/gowvp/gb28181/internal/core/gb28181" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/gbs/m" "github.com/gowvp/gb28181/pkg/gbs/sip" @@ -16,13 +16,13 @@ import ( ) type PlayInput struct { - Channel *gb28181.Channel + Channel *ipc.Channel SMS *sms.MediaServer StreamMode int8 } type StopPlayInput struct { - Channel *gb28181.Channel + Channel *ipc.Channel } // stopPlay 不加锁的 @@ -158,7 +158,7 @@ func GetIP(input string) (string, error) { return ips[0].String(), nil } - slog.Error("域名没有解析到任何IP地址", "域名", input) + slog.Error("域名没有解析到任何IP地址", "域名", input) return input, fmt.Errorf("域名没有解析到IP地址") } @@ -196,11 +196,10 @@ func (g *GB28181API) sipPlayPush2(ch *Channel, in *PlayInput, port int, stream * video.AddAttribute("rtpmap", "97", "MPEG4/90000") video.AddAttribute("rtpmap", "98", "H264/90000") - //获取配置值 + // 获取配置值 ipstr := in.SMS.GetSDPIP() - //进行IP解析 + // 进行IP解析 ip4str, err := GetIP(ipstr) - if err != nil { slog.Error("域名解析失败", "域名", ipstr, "错误", err) return err diff --git a/pkg/gbs/record.go b/pkg/gbs/record.go index d48d438..b57d712 100644 --- a/pkg/gbs/record.go +++ b/pkg/gbs/record.go @@ -139,7 +139,6 @@ type RecordDate struct { Items []RecordInfo `json:"items"` } -// RecordInfo RecordInfo type RecordInfo struct { Start int64 `json:"start" bson:"start"` End int64 `json:"end" bson:"end"` diff --git a/pkg/gbs/register.go b/pkg/gbs/register.go index b016cd6..90a1f44 100644 --- a/pkg/gbs/register.go +++ b/pkg/gbs/register.go @@ -9,7 +9,7 @@ import ( "unicode" "github.com/gowvp/gb28181/internal/conf" - "github.com/gowvp/gb28181/internal/core/gb28181" + gb28181 "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/ixugo/goddd/pkg/conc" @@ -20,7 +20,7 @@ const ignorePassword = "#" type GB28181API struct { cfg *conf.SIP - core gb28181.GB28181 + core gb28181.GBDAdapter catalog *sip.Collector[Channels] @@ -32,7 +32,7 @@ type GB28181API struct { sms *sms.NodeManager } -func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GB28181, sms *sms.NodeManager) *GB28181API { +func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sms *sms.NodeManager) *GB28181API { g := GB28181API{ cfg: &cfg.Sip, core: store, @@ -130,14 +130,16 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) { if dev.Password == ignorePassword { password = "" } + + hdrs := ctx.Request.GetHeaders("Authorization") + if len(hdrs) == 0 { + resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil) + resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf(`Digest realm="%s",qop="auth",nonce="%s"`, g.cfg.Domain, sip.RandString(32))}) + _ = ctx.Tx.Respond(resp) + return + } + if password != "" { - hdrs := ctx.Request.GetHeaders("Authorization") - if len(hdrs) == 0 { - resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil) - resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf("Digest nonce=\"%s\", algorithm=MD5, realm=\"%s\",qop=\"auth\"", sip.RandString(32), g.cfg.Domain)}) - _ = ctx.Tx.Respond(resp) - return - } authenticateHeader := hdrs[0].(*sip.GenericHeader) auth := sip.AuthFromValue(authenticateHeader.Contents) auth.SetPassword(password) @@ -170,10 +172,19 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) { respFn() return } - g.login(ctx, expire) + + g.login(ctx, func(b *gb28181.Device) { + b.IsOnline = true + b.RegisteredAt = orm.Now() + b.KeepaliveAt = orm.Now() + b.Expires, _ = strconv.Atoi(expire) + b.Address = ctx.Source.String() + b.Transport = ctx.Source.Network() + b.Ext.GBVersion = ctx.XGBVer + }) // conn := ctx.Request.GetConnection() - // fmt.Printf(">>> %p\n", conn) + // fmt.Printf(">>> %p\n", conn ctx.Log.Info("设备注册成功") // ctx.Log.Debug("device info", "source", ctx.Source, "host", ctx.Host) @@ -185,14 +196,9 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) { _ = g.QueryConfigDownloadBasic(dev.DeviceID) } -func (g GB28181API) login(ctx *sip.Context, expire string) { +func (g GB28181API) login(ctx *sip.Context, fn func(d *gb28181.Device)) { slog.Info("status change 设备上线", "device_id", ctx.DeviceID) - g.svr.memoryStorer.Change(ctx.DeviceID, func(d *gb28181.Device) { - d.IsOnline = true - d.RegisteredAt = orm.Now() - d.KeepaliveAt = orm.Now() - d.Expires, _ = strconv.Atoi(expire) - }, func(d *Device) { + g.svr.memoryStorer.Change(ctx.DeviceID, fn, func(d *Device) { d.conn = ctx.Request.GetConnection() d.source = ctx.Source d.to = ctx.To diff --git a/pkg/gbs/server.go b/pkg/gbs/server.go index 121345c..0833c6b 100644 --- a/pkg/gbs/server.go +++ b/pkg/gbs/server.go @@ -11,12 +11,12 @@ import ( "time" "github.com/gowvp/gb28181/internal/conf" - "github.com/gowvp/gb28181/internal/core/gb28181" + gb28181 "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/gbs/m" "github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/ixugo/goddd/pkg/conc" - "github.com/ixugo/goddd/pkg/system" + "github.com/ixugo/netpulse/ip" ) type MemoryStorer interface { @@ -42,11 +42,11 @@ type Server struct { memoryStorer MemoryStorer } -func NewServer(cfg *conf.Bootstrap, store gb28181.GB28181, sc sms.Core) (*Server, func()) { +func NewServer(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sc sms.Core) (*Server, func()) { api := NewGB28181API(cfg, store, sc.NodeManager) - ip := system.LocalIP() - uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s:%d", cfg.Sip.ID, ip, cfg.Sip.Port)) + iip := ip.InternalIP() + uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s:%d", cfg.Sip.ID, iip, cfg.Sip.Port)) from := sip.Address{ DisplayName: sip.String{Str: "gowvp"}, URI: &uri, @@ -61,7 +61,6 @@ func NewServer(cfg *conf.Bootstrap, store gb28181.GB28181, sc sms.Core) (*Server msg.Handle("DeviceInfo", api.sipMessageDeviceInfo) msg.Handle("ConfigDownload", api.sipMessageConfigDownload) msg.Handle("DeviceConfig", api.handleDeviceConfig) - // msg.Handle("RecordInfo", api.handlerMessage) c := Server{ diff --git a/pkg/gbs/sip/context.go b/pkg/gbs/sip/context.go index 01c4059..20b8222 100644 --- a/pkg/gbs/sip/context.go +++ b/pkg/gbs/sip/context.go @@ -30,7 +30,8 @@ type Context struct { Log *slog.Logger - svr *Server + svr *Server + XGBVer string } func newContext(req *Request, tx *Transaction) *Context { @@ -76,6 +77,23 @@ func (c *Context) parserRequest() error { slog.Error(">>>>>>>> to is nil", "header", header) } + xgbVer := req.GetHeaders("X-GB-Ver") + if len(xgbVer) > 0 { + h := xgbVer[0] + parts := strings.Split(h.String(), ":") + if len(parts) == 2 { + switch strings.TrimSpace(parts[1]) { + case "3.0": + c.XGBVer = "2022" + case "2.0": + c.XGBVer = "2016" + case "1.0": + c.XGBVer = "2011" + } + } + + } + c.Log = slog.Default().With("deviceID", c.DeviceID, "host", c.Host) return nil } diff --git a/pkg/gbs/sip/header.go b/pkg/gbs/sip/header.go index 3b6e52e..f33a839 100644 --- a/pkg/gbs/sip/header.go +++ b/pkg/gbs/sip/header.go @@ -26,6 +26,7 @@ type HeadersBuilder struct { maxForwards *MaxForwards allow *AllowHeader supported *SupportedHeader + XGBVer *XGBVer // recipient *URI } @@ -34,6 +35,7 @@ func NewHeaderBuilder() *HeadersBuilder { callID := CallID(RandString(32)) maxForwards := MaxForwards(70) userAgent := UserAgentHeader("GoWVP") + xgbVer := XGBVer("3.0") return &HeadersBuilder{ protocol: "SIP", protocolVersion: "2.0", @@ -47,6 +49,7 @@ func NewHeaderBuilder() *HeadersBuilder { generic: make(map[string]Header), allow: defaultAllowMethods, supported: &SupportedHeader{Options: []string{}}, + XGBVer: &xgbVer, } } @@ -89,6 +92,9 @@ func (hb *HeadersBuilder) Build() []Header { if hb.contentType != nil { hdrs = append(hdrs, hb.contentType) } + if hb.XGBVer != nil { + hdrs = append(hdrs, hb.XGBVer) + } // for _, header := range hb.generic { // hdrs = append(hdrs, header) @@ -96,6 +102,12 @@ func (hb *HeadersBuilder) Build() []Header { return hdrs } +func (hb *HeadersBuilder) SetXGBVer() *HeadersBuilder { + s := XGBVer("3.0") + hb.XGBVer = &s + return hb +} + // SetMethod SetMethod func (hb *HeadersBuilder) SetMethod(method string) *HeadersBuilder { hb.method = method @@ -1667,3 +1679,22 @@ func (header *GenericHeader) Equals(other interface{}) bool { return false } + +type XGBVer string + +func (ct XGBVer) String() string { return "X-GB-Ver: " + string(ct) } + +// Name Name +func (ct XGBVer) Name() string { return "X-GB-Ver" } + +// Clone Clone +func (ct XGBVer) Clone() Header { return &ct } + +// Equals Equals +func (ct *XGBVer) Equals(other interface{}) bool { + h, ok := other.(XGBVer) + if !ok { + return false + } + return h.String() == ct.String() +} diff --git a/pkg/gbs/sip/request.go b/pkg/gbs/sip/request.go index 55e6984..ba9595c 100644 --- a/pkg/gbs/sip/request.go +++ b/pkg/gbs/sip/request.go @@ -33,6 +33,7 @@ func NewRequest( req.SetSipVersion(sipVersion) req.startLine = req.StartLine req.headers = newHeaders(hdrs) + req.SetMethod(method) req.SetRecipient(recipient) @@ -43,26 +44,26 @@ func NewRequest( } // NewRequestFromResponse NewRequestFromResponse -func NewRequestFromResponse(method string, inviteResponse *Response) *Request { - contact, _ := inviteResponse.Contact() +func NewRequestFromResponse(method string, resp *Response) *Request { + contact, _ := resp.Contact() ackRequest := NewRequest( - inviteResponse.MessageID(), + resp.MessageID(), method, contact.Address, - inviteResponse.SipVersion(), + resp.SipVersion(), []Header{}, []byte{}, ) - CopyHeaders("Via", inviteResponse, ackRequest) + CopyHeaders("Via", resp, ackRequest) viaHop, _ := ackRequest.ViaHop() // update branch, 2xx ACK is separate Tx viaHop.Params.Add("branch", String{Str: GenerateBranch()}) - if len(inviteResponse.GetHeaders("Route")) > 0 { - CopyHeaders("Route", inviteResponse, ackRequest) + if len(resp.GetHeaders("Route")) > 0 { + CopyHeaders("Route", resp, ackRequest) } else { - for _, h := range inviteResponse.GetHeaders("Record-Route") { + for _, h := range resp.GetHeaders("Record-Route") { uris := make([]*URI, 0) for _, u := range h.(*RecordRouteHeader).Addresses { uris = append(uris, u.Clone()) @@ -73,10 +74,10 @@ func NewRequestFromResponse(method string, inviteResponse *Response) *Request { } } - CopyHeaders("From", inviteResponse, ackRequest) - CopyHeaders("To", inviteResponse, ackRequest) - CopyHeaders("Call-ID", inviteResponse, ackRequest) - cseq, _ := inviteResponse.CSeq() + CopyHeaders("From", resp, ackRequest) + CopyHeaders("To", resp, ackRequest) + CopyHeaders("Call-ID", resp, ackRequest) + cseq, _ := resp.CSeq() cseq.MethodName = method // https://www.rfc-editor.org/rfc/rfc3261.html#section-12.2.1.1 @@ -92,8 +93,8 @@ func NewRequestFromResponse(method string, inviteResponse *Response) *Request { cseq.SeqNo++ } ackRequest.AppendHeader(cseq) - ackRequest.SetSource(inviteResponse.Destination()) - ackRequest.SetDestination(inviteResponse.Source()) + ackRequest.SetSource(resp.Destination()) + ackRequest.SetDestination(resp.Source()) return ackRequest } diff --git a/pkg/gbs/sip/response.go b/pkg/gbs/sip/response.go index 65d3f81..3496ecb 100644 --- a/pkg/gbs/sip/response.go +++ b/pkg/gbs/sip/response.go @@ -34,9 +34,16 @@ func NewResponseFromRequest( CopyHeaders("Record-Route", req, res) CopyHeaders("Via", req, res) CopyHeaders("From", req, res) - CopyHeaders("To", req, res) - CopyHeaders("Call-ID", req, res) + to, ok := req.To() + if ok { + if _, ok := to.Params.Get("tag"); !ok { + to.Params.Add("tag", String{Str: RandString(32)}) + } + } CopyHeaders("CSeq", req, res) + CopyHeaders("Call-ID", req, res) + + res.AppendHeader(to) if statusCode == 100 { CopyHeaders("Timestamp", req, res) @@ -45,9 +52,12 @@ func NewResponseFromRequest( res.SetSource(req.Destination()) res.SetDestination(req.Source()) - if len(body) > 0 { - res.SetBody(body, true) - } + res.SetBody(body, true) + + res.AppendHeader(&GenericHeader{ + HeaderName: "User-Agent", + Contents: "GoWVP/1.0", + }) return res } diff --git a/pkg/gbs/stream.go b/pkg/gbs/stream.go index 4c8a59d..18522ee 100644 --- a/pkg/gbs/stream.go +++ b/pkg/gbs/stream.go @@ -64,18 +64,18 @@ var StreamList streamsList func (g *GB28181API) getSSRC(t int) string { r := false - for { - StreamList.ssrc++ - // ssrc最大为四位数,超过时从1开始重新计算 - if StreamList.ssrc > 9000 && !r { - StreamList.ssrc = 0 - r = true - } - key := fmt.Sprintf("%d%s%04d", t, g.cfg.Domain[3:8], StreamList.ssrc) - // stream := Streams{StreamID: ssrc2stream(key), Stop: false} - // if err := db.Get(db.DBClient, &stream); db.RecordNotFound(err) || stream.CreatedAt == 0 { - return key + // for { + StreamList.ssrc++ + // ssrc最大为四位数,超过时从1开始重新计算 + if StreamList.ssrc > 9000 && !r { + StreamList.ssrc = 0 + r = true } + key := fmt.Sprintf("%d%s%04d", t, g.cfg.Domain[3:8], StreamList.ssrc) + // stream := Streams{StreamID: ssrc2stream(key), Stop: false} + // if err := db.Get(db.DBClient, &stream); db.RecordNotFound(err) || stream.CreatedAt == 0 { + return key + // } } // 定时检查未关闭的流