代码重构

This commit is contained in:
xugo
2025-11-21 21:52:47 +08:00
parent 530d36ff4d
commit 08c988f4cc
44 changed files with 719 additions and 233 deletions
+5 -2
View File
@@ -14,7 +14,7 @@ go wvp 是 Go 语言实现的开源 GB28181 解决方案,基于 GB28181-2022
## 在线演示平台
+ [在线演示平台 :) (服务器已过期,暂不提供演示)](http://wvp.golang.space:15123/)
+ [在线演示平台 :) ](http://gowvp.golang.space:15123/)
![](./docs/demo/play.gif)
@@ -35,7 +35,7 @@ go wvp 是 Go 语言实现的开源 GB28181 解决方案,基于 GB28181-2022
## 开源库
感谢 @panjjo 大佬的开源库 [panjjo/gosip](https://github.com/panjjo/gosip),GoWVP 的 sip 信令基于此库,出于底层封装需要,并非直接 go mod 依赖该项目,而是源代码放到了 pkg 包中。
感谢 @panjjo 大佬的开源库 [panjjo/gosip](https://github.com/panjjo/gosip),GoWVP 的 sip 信令基于此库,出于底层封装需要,并非直接依赖该项目,而是源代码放到了 pkg 包中。
流媒体服务基于@夏楚 [ZLMediaKit](https://github.com/ZLMediaKit/ZLMediaKit)
@@ -102,8 +102,11 @@ zlm 能否访问到 gowvp?? docker 合并版本填写 127.0.0.1 即可,分离
在反向代理那里配置以下参数,其中域名根据实际的填写
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Prefix "https://gowvp.com";
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
+1 -1
View File
@@ -35,7 +35,7 @@ func Run(bc *conf.Bootstrap) {
go setupZLM(ctx, bc.ConfigDir)
// 如果需要执行表迁移,递增此版本号和表更新说明
versionapi.DBVersion = "0.0.12"
versionapi.DBVersion = "0.0.14"
versionapi.DBRemark = "add stream proxy"
handler, cleanUp, err := wireApp(bc, log)
+10 -10
View File
@@ -7,14 +7,13 @@
package app
import (
"log/slog"
"net/http"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/data"
"github.com/gowvp/gb28181/internal/web/api"
"github.com/gowvp/gb28181/pkg/gbs"
"github.com/ixugo/goddd/domain/version/versionapi"
"log/slog"
"net/http"
)
// Injectors from wire.go:
@@ -30,13 +29,14 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error)
smsAPI := api.NewSmsAPI(smsCore)
uniqueidCore := api.NewUniqueID(db)
pushCore := api.NewPushCore(db, uniqueidCore)
storer := api.NewGB28181Store(db)
gb28181 := api.NewGB28181(storer, uniqueidCore)
server, cleanup := gbs.NewServer(bc, gb28181, smsCore)
gb28181Core := api.NewGB28181Core(storer, uniqueidCore)
webHookAPI := api.NewWebHookAPI(smsCore, pushCore, bc, server, gb28181Core)
storer := api.NewIPCStore(db)
gbdAdapter := api.NewGBAdapter(storer, uniqueidCore)
server, cleanup := gbs.NewServer(bc, gbdAdapter, smsCore)
v := api.NewProtocols(storer)
ipcCore := api.NewIPCCore(storer, uniqueidCore, v)
webHookAPI := api.NewWebHookAPI(smsCore, pushCore, bc, server, ipcCore)
pushAPI := api.NewPushAPI(pushCore, smsCore, bc)
gb28181API := api.NewGB28181API(gb28181Core)
ipcapi := api.NewIPCAPI(ipcCore)
proxyAPI := api.NewProxyAPI(db, uniqueidCore)
configAPI := api.NewConfigAPI(db, bc)
userAPI := api.NewUserAPI(bc)
@@ -48,7 +48,7 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error)
WebHookAPI: webHookAPI,
UniqueID: uniqueidCore,
MediaAPI: pushAPI,
GB28181API: gb28181API,
GB28181API: ipcapi,
ProxyAPI: proxyAPI,
ConfigAPI: configAPI,
SipServer: server,
+1
View File
@@ -2,6 +2,7 @@ package bz
const (
IDPrefixGB = "gb" // 国标设备
IDPrefixOnvif = "on" // 国标设备
IDPrefixGBChannel = "ch" // 国标通道 id 前缀
IDPrefixRTMP = "mp" // rtmp ID 前缀,取 rtmp 后缀的 mp,不好记但是清晰
IDPrefixRTSP = "sp" // rtsp ID 前缀,取 rtsp 后缀的 sp,不好记但是清晰
+1 -1
View File
@@ -10,7 +10,7 @@ type Ext struct {
// Scan implements orm.Scaner.
func (i *Ext) Scan(input interface{}) error {
return orm.JsonUnmarshal(input, i)
return orm.JSONUnmarshal(input, i)
}
type SIPConfig struct {
-21
View File
@@ -1,21 +0,0 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181
import "github.com/ixugo/goddd/domain/uniqueid"
// Storer data persistence
type Storer interface {
Device() DeviceStorer
Channel() ChannelStorer
}
// Core business domain
type Core struct {
store Storer
uniqueID uniqueid.Core
}
// NewCore create business domain
func NewCore(store Storer, uni uniqueid.Core) Core {
return Core{store: store, uniqueID: uni}
}
@@ -1,5 +1,5 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181
package ipc
import (
"context"
@@ -1,5 +1,5 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181
package ipc
import "github.com/ixugo/goddd/pkg/orm"
@@ -22,3 +22,16 @@ type Channel struct {
func (*Channel) TableName() string {
return "channels"
}
// 实现 port.Channel 接口
func (c *Channel) GetID() string {
return c.ID
}
func (c *Channel) GetChannelID() string {
return c.ChannelID
}
func (c *Channel) GetDeviceID() string {
return c.DeviceID
}
@@ -1,5 +1,5 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181
package ipc
import "github.com/ixugo/goddd/pkg/web"
+29
View File
@@ -0,0 +1,29 @@
// Code generated by godddx, DO AVOID EDIT.
package ipc
import (
"github.com/gowvp/gb28181/internal/core/port"
"github.com/ixugo/goddd/domain/uniqueid"
)
// Storer data persistence
type Storer interface {
Device() DeviceStorer
Channel() ChannelStorer
}
// Core business domain
type Core struct {
store Storer
uniqueID uniqueid.Core
protocols map[string]port.Protocol // 协议映射
}
// NewCore create business domain
func NewCore(store Storer, uni uniqueid.Core, protocols map[string]port.Protocol) Core {
return Core{
store: store,
uniqueID: uni,
protocols: protocols,
}
}
@@ -1,5 +1,5 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181
package ipc
import (
"context"
@@ -102,18 +102,40 @@ func (c Core) AddDevice(ctx context.Context, in *AddDeviceInput) (*Device, error
if err := copier.Copy(&out, in); err != nil {
slog.ErrorContext(ctx, "Copy", "err", err)
}
out.ID = c.uniqueID.UniqueID(bz.IDPrefixGB)
// 协议验证(通过接口调用)
if protocol, ok := c.protocols[out.Type]; ok {
if err := protocol.ValidateDevice(ctx, &out); err != nil {
return nil, reason.ErrBadRequest.SetMsg(err.Error())
}
}
if out.IsOnvif() {
out.ID = c.uniqueID.UniqueID(bz.IDPrefixOnvif)
out.DeviceID = out.ID
} else {
out.ID = c.uniqueID.UniqueID(bz.IDPrefixGB)
}
if err := out.Check(); err != nil {
return nil, reason.ErrBadRequest.SetMsg(err.Error())
}
// 持久化到数据库
if err := c.store.Device().Add(ctx, &out); err != nil {
if orm.IsDuplicatedKey(err) {
return nil, reason.ErrDB.SetMsg("国标 ID 重复,请勿重复添加")
}
return nil, reason.ErrDB.Withf(`Add err[%s]`, err.Error())
}
// 初始化协议连接(失败不影响设备添加)
if protocol, ok := c.protocols[out.Type]; ok {
if err := protocol.InitDevice(ctx, &out); err != nil {
slog.WarnContext(ctx, "初始化协议失败", "err", err, "device_id", out.ID)
}
}
return &out, nil
}
@@ -1,8 +1,9 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181
package ipc
import (
"fmt"
"strings"
"github.com/ixugo/goddd/pkg/orm"
)
@@ -10,9 +11,10 @@ import (
// Device domain model
type Device struct {
ID string `gorm:"primaryKey" json:"id"`
Type string `gorm:"column:type;notNull;default:'';comment:设备类型(onvif)" json:"type"` // 设备类型(onvif/gb28181)
DeviceID string `gorm:"column:device_id;notNull;uniqueIndex;default:'';comment:20 位国标编号" json:"device_id"` // 20 位国标编号
Name string `gorm:"column:name;notNull;default:'';comment:设备名称" json:"name"` // 设备名称
Trasnport string `gorm:"column:trasnport;notNull;default:'';comment:传输协议(tcp/udp)" json:"trasnport"` // 传输协议(TCP/UDP)
Transport string `gorm:"column:transport;notNull;default:'';comment:传输协议(tcp/udp)" json:"transport"` // 传输协议(TCP/UDP)
StreamMode int8 `gorm:"column:stream_mode;notNull;default:1;comment:数据传输模式(0:UDP; 1:TCP_PASSIVE; 2:TCP_ACTIVE)" json:"stream_mode"` // 数据传输模式
IP string `gorm:"column:ip;notNull;default:''" json:"ip"`
Port int `gorm:"column:port;notNull;default:0" json:"port"`
@@ -27,19 +29,43 @@ type Device struct {
Password string `gorm:"column:password;notNull;default:'';comment:注册密码" json:"password"`
Address string `gorm:"column:address;notNull;default:'';comment:设备网络地址" json:"address"`
Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb;comment:设备属性" json:"ext"` // 设备属性
// onvif
Username string `gorm:"column:username;notNull;default:'';comment:用户名" json:"username"`
Children []*Channel `gorm:"-" json:"children,omitzero"`
}
func (d *Device) IsOnvif() bool {
return strings.ToUpper(d.Type) == "ONVIF"
}
func (d *Device) IsGB28181() bool {
return strings.ToUpper(d.Type) == "GB28181" || d.Type == ""
}
// TableName database table name
func (*Device) TableName() string {
return "devices"
}
func (d Device) Check() error {
if len(d.DeviceID) < 18 {
if d.IsGB28181() && len(d.DeviceID) < 18 {
return fmt.Errorf("国标 ID 长度应大于等于 18 位")
}
if d.IsOnvif() {
if d.Username == "" {
return fmt.Errorf("用户名不能为空")
}
if d.Password == "" {
return fmt.Errorf("密码不能为空")
}
if d.IP == "" {
return fmt.Errorf("IP不能为空")
}
if d.Port == 0 {
return fmt.Errorf("端口不能为空")
}
}
return nil
}
@@ -49,5 +75,34 @@ func (d *Device) init(id, deviceID string) {
}
func (d *Device) NetworkAddress() string {
return d.Trasnport + "://" + d.Address
return d.Transport + "://" + d.Address
}
// 实现 port.Device 接口
func (d *Device) GetID() string {
return d.ID
}
func (d *Device) GetDeviceID() string {
return d.DeviceID
}
func (d *Device) GetType() string {
return d.Type
}
func (d *Device) GetIP() string {
return d.IP
}
func (d *Device) GetPort() int {
return d.Port
}
func (d *Device) GetUsername() string {
return d.Username
}
func (d *Device) GetPassword() string {
return d.Password
}
@@ -1,5 +1,5 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181
package ipc
import (
"github.com/ixugo/goddd/pkg/web"
@@ -11,7 +11,7 @@ type FindDeviceInput struct {
// DeviceID string `form:"device_id"` // 20 位国标编号
// Name string `form:"name"` // 设备名称
// ID string `form:"id"`
// Trasnport string `form:"trasnport"` // 传输协议(TCP/UDP)
// Transport string `form:"Transport"` // 传输协议(TCP/UDP)
// StreamMode string `form:"stream_mode"` // 数据传输模式(UDP/TCP_PASSIVE,TCP_ACTIVE)
// IP string `form:"ip"`
// Port int `form:"port"`
@@ -41,12 +41,22 @@ type EditDeviceInput struct {
// Ext DeviceExt `json:"ext"` // 设备属性
}
// 目前仅应用于 onvif 添加
type AddDeviceInput struct {
DeviceID string `json:"device_id"` // 20 位国标编号
Name string `json:"name"` // 设备名称
Password string `json:"password"` // 注册密码
// onvif
Username string `json:"username"` // 用户名
IP string `json:"ip"` // ip
Port int `json:"port"` // port
// 通用
Name string `json:"name"` // 设备名称
Password string `json:"password"` // 注册密码
// Trasnport string `json:"trasnport"` // 传输协议(TCP/UDP)
Type string `json:"type"` // 设备类型(onvif/gb28181)
// Addr string `json:"addr"` // 地址(ip:port)
// Transport string `json:"transport"` // 传输协议(TCP/UDP)
// StreamMode string `json:"stream_mode"` // 数据传输模式(UDP/TCP_PASSIVE,TCP_ACTIVE)
// IP string `json:"ip"`
// Port int `json:"port"`
@@ -1,4 +1,4 @@
package gb28181
package ipc
import (
"context"
@@ -9,25 +9,25 @@ import (
"github.com/ixugo/goddd/pkg/web"
)
type GB28181 struct {
type GBDAdapter struct {
// deviceStore DeviceStorer
// channelStore ChannelStorer
store Storer
uni uniqueid.Core
}
func NewGB28181(store Storer, uni uniqueid.Core) GB28181 {
return GB28181{
func NewGBAdapter(store Storer, uni uniqueid.Core) GBDAdapter {
return GBDAdapter{
store: store,
uni: uni,
}
}
func (g GB28181) Store() Storer {
func (g GBDAdapter) Store() Storer {
return g.store
}
func (g GB28181) GetDeviceByDeviceID(deviceID string) (*Device, error) {
func (g GBDAdapter) GetDeviceByDeviceID(deviceID string) (*Device, error) {
ctx := context.TODO()
var d Device
if err := g.store.Device().Get(ctx, &d, orm.Where("device_id=?", deviceID)); err != nil {
@@ -42,7 +42,7 @@ func (g GB28181) GetDeviceByDeviceID(deviceID string) (*Device, error) {
return &d, nil
}
func (g GB28181) Logout(deviceID string, changeFn func(*Device)) error {
func (g GBDAdapter) Logout(deviceID string, changeFn func(*Device)) error {
var d Device
if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) {
changeFn(d)
@@ -53,7 +53,7 @@ func (g GB28181) Logout(deviceID string, changeFn func(*Device)) error {
return nil
}
func (g GB28181) Edit(deviceID string, changeFn func(*Device)) error {
func (g GBDAdapter) Edit(deviceID string, changeFn func(*Device)) error {
var d Device
if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) {
changeFn(d)
@@ -64,7 +64,7 @@ func (g GB28181) Edit(deviceID string, changeFn func(*Device)) error {
return nil
}
func (g GB28181) EditPlaying(deviceID, channelID string, playing bool) error {
func (g GBDAdapter) EditPlaying(deviceID, channelID string, playing bool) error {
var ch Channel
if err := g.store.Channel().Edit(context.TODO(), &ch, func(c *Channel) {
c.IsPlaying = playing
@@ -74,7 +74,7 @@ func (g GB28181) EditPlaying(deviceID, channelID string, playing bool) error {
return nil
}
func (g GB28181) SaveChannels(channels []*Channel) error {
func (g GBDAdapter) SaveChannels(channels []*Channel) error {
if len(channels) <= 0 {
return nil
}
@@ -106,7 +106,7 @@ func (g GB28181) SaveChannels(channels []*Channel) error {
}
// FindDevices 获取所有设备
func (g GB28181) FindDevices(ctx context.Context) ([]*Device, error) {
func (g GBDAdapter) FindDevices(ctx context.Context) ([]*Device, error) {
var devices []*Device
if _, err := g.store.Device().Find(ctx, &devices, web.NewPagerFilterMaxSize()); err != nil {
return nil, err
@@ -1,5 +1,5 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181
package ipc
import (
"database/sql/driver"
@@ -14,11 +14,12 @@ type DeviceExt struct {
Model string `json:"model"` // 型号
Firmware string `json:"firmware"` // 固件版本
Name string `json:"name"` // 设备名
GBVersion string `json:"gb_version"` // GB版本
}
// Scan implements orm.Scaner.
func (i *DeviceExt) Scan(input interface{}) error {
return orm.JsonUnmarshal(input, i)
return orm.JSONUnmarshal(input, i)
}
func (i DeviceExt) Value() (driver.Value, error) {
@@ -1,4 +1,4 @@
package gb28181cache
package ipccache
import (
"context"
@@ -6,7 +6,7 @@ import (
"log/slog"
"strings"
"github.com/gowvp/gb28181/internal/core/gb28181"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/pkg/gbs"
"github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goddd/pkg/conc"
@@ -54,7 +54,7 @@ func (c *Cache) LoadDeviceToMemory(conn sip.Connection) {
}
for _, d := range devices {
if strings.ToLower(d.Trasnport) == "tcp" {
if strings.ToLower(d.Transport) == "tcp" {
// 通知相关设备/通道离线
c.Change(d.DeviceID, func(d *gb28181.Device) {
d.IsOnline = false
@@ -1,9 +1,9 @@
package gb28181cache
package ipccache
import (
"context"
"github.com/gowvp/gb28181/internal/core/gb28181"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/ixugo/goddd/pkg/orm"
)
@@ -1,11 +1,11 @@
package gb28181cache
package ipccache
import (
"context"
"fmt"
"log/slog"
"github.com/gowvp/gb28181/internal/core/gb28181"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/pkg/gbs"
"github.com/ixugo/goddd/pkg/orm"
"gorm.io/gorm"
@@ -1,10 +1,10 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181db
package ipcdb
import (
"context"
"github.com/gowvp/gb28181/internal/core/gb28181"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/ixugo/goddd/pkg/orm"
"gorm.io/gorm"
)
@@ -1,10 +1,10 @@
package gb28181db
package ipcdb
import (
"context"
"testing"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/ixugo/goddd/pkg/orm"
)
@@ -16,7 +16,7 @@ func TestChannelGet(t *testing.T) {
userDB := NewChannel(db)
mock.ExpectQuery(`SELECT \* FROM "channels" WHERE id=\$1 (.+) LIMIT \$2`).WithArgs("jack", 1)
var out gb28181.Channel
var out ipc.Channel
if err := userDB.Get(context.Background(), &out, orm.Where("id=?", "jack")); err != nil {
t.Fatal(err)
}
@@ -1,8 +1,8 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181db
package ipcdb
import (
"github.com/gowvp/gb28181/internal/core/gb28181"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"gorm.io/gorm"
)
@@ -1,4 +1,4 @@
package gb28181db
package ipcdb
import (
"github.com/DATA-DOG/go-sqlmock"
@@ -1,10 +1,10 @@
// Code generated by godddx, DO AVOID EDIT.
package gb28181db
package ipcdb
import (
"context"
"github.com/gowvp/gb28181/internal/core/gb28181"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/ixugo/goddd/pkg/orm"
"gorm.io/gorm"
)
@@ -1,10 +1,10 @@
package gb28181db
package ipcdb
import (
"context"
"testing"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/ixugo/goddd/pkg/orm"
)
@@ -16,7 +16,7 @@ func TestDeviceGet(t *testing.T) {
userDB := NewDevice(db)
mock.ExpectQuery(`SELECT \* FROM "devices" WHERE id=\$1 (.+) LIMIT \$2`).WithArgs("jack", 1)
var out gb28181.Device
var out ipc.Device
if err := userDB.Get(context.Background(), &out, orm.Where("id=?", "jack")); err != nil {
t.Fatal(err)
}
+50
View File
@@ -0,0 +1,50 @@
package port
import (
"context"
)
// Device 设备接口(避免循环依赖)
// 协议适配器的实现会接收具体的 gb28181.Device 类型
type Device interface {
GetID() string
GetDeviceID() string
GetType() string
GetIP() string
GetPort() int
GetUsername() string
GetPassword() string
}
// Channel 通道接口(避免循环依赖)
type Channel interface {
GetID() string
GetChannelID() string
GetDeviceID() string
}
// Protocol 协议抽象接口(端口)
type Protocol interface {
// ValidateDevice 验证设备连接(添加设备前调用)
ValidateDevice(ctx context.Context, device Device) error
// InitDevice 初始化设备连接(添加设备后调用)
// 例如: GB28181 不需要主动初始化,ONVIF 需要查询 Profiles 作为通道
InitDevice(ctx context.Context, device Device) error
// QueryCatalog 查询设备目录/通道
QueryCatalog(ctx context.Context, device Device) error
// StartPlay 开始播放
StartPlay(ctx context.Context, device Device, channel Channel) (*PlayResponse, error)
// StopPlay 停止播放
StopPlay(ctx context.Context, device Device, channel Channel) error
}
// PlayResponse 播放响应
type PlayResponse struct {
SSRC string // GB28181 SSRC
Stream string // 流 ID
RTSP string // RTSP 地址 (ONVIF)
}
+1 -1
View File
@@ -25,7 +25,7 @@ type MediaServerPorts struct {
// Scan implements orm.Scaner.
func (i *MediaServerPorts) Scan(input interface{}) error {
return orm.JsonUnmarshal(input, i)
return orm.JSONUnmarshal(input, i)
}
// Value implements driver.Valuer.
+157
View File
@@ -0,0 +1,157 @@
package onvifadapter
import (
"context"
"fmt"
"log/slog"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/port"
"github.com/gowvp/onvif"
m "github.com/gowvp/onvif/media"
sdkmedia "github.com/gowvp/onvif/sdk/media"
"github.com/ixugo/goddd/pkg/orm"
)
var _ port.Protocol = (*Adapter)(nil)
// Adapter ONVIF 协议适配器
type Adapter struct {
devices map[string]*onvif.Device // ONVIF 设备连接缓存
store gb28181.Storer // 协议适配器可以依赖领域的存储接口
}
func NewAdapter(store gb28181.Storer) *Adapter {
return &Adapter{
devices: make(map[string]*onvif.Device),
store: store,
}
}
// ValidateDevice 实现 port.Protocol 接口 - ONVIF 设备验证
func (a *Adapter) ValidateDevice(ctx context.Context, device port.Device) error {
// 尝试连接 ONVIF 设备并验证可以获取 Profiles
dev, err := onvif.NewDevice(onvif.DeviceParams{
Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()),
Username: device.GetUsername(),
Password: device.GetPassword(),
})
if err != nil {
return fmt.Errorf("ONVIF 连接失败: %w", err)
}
// 验证可以获取 Profiles
_, err = sdkmedia.Call_GetProfiles(ctx, dev, m.GetProfiles{})
if err != nil {
return fmt.Errorf("获取 ONVIF Profiles 失败: %w", err)
}
return nil
}
// InitDevice 实现 port.Protocol 接口 - 初始化 ONVIF 设备
// ONVIF 设备初始化时,自动查询 Profiles 并创建为通道
func (a *Adapter) InitDevice(ctx context.Context, device port.Device) error {
// 创建 ONVIF 连接
dev, err := onvif.NewDevice(onvif.DeviceParams{
Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()),
Username: device.GetUsername(),
Password: device.GetPassword(),
})
if err != nil {
return err
}
// 缓存设备连接
a.devices[device.GetID()] = dev
// 自动查询 Profiles 作为通道
return a.queryAndSaveProfiles(ctx, device, dev)
}
// QueryCatalog 实现 port.Protocol 接口 - ONVIF 查询 Profiles
func (a *Adapter) QueryCatalog(ctx context.Context, device port.Device) error {
dev, ok := a.devices[device.GetID()]
if !ok {
// 设备连接不在缓存中,尝试重新连接
var err error
dev, err = onvif.NewDevice(onvif.DeviceParams{
Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()),
Username: device.GetUsername(),
Password: device.GetPassword(),
})
if err != nil {
return fmt.Errorf("ONVIF 设备未初始化: %w", err)
}
a.devices[device.GetID()] = dev
}
return a.queryAndSaveProfiles(ctx, device, dev)
}
// StartPlay 实现 port.Protocol 接口 - ONVIF 播放
func (a *Adapter) StartPlay(ctx context.Context, device port.Device, channel port.Channel) (*port.PlayResponse, error) {
dev, ok := a.devices[device.GetID()]
if !ok {
return nil, fmt.Errorf("ONVIF 设备未初始化")
}
// 获取 RTSP 地址
streamURI, err := a.getStreamURI(ctx, dev, channel.GetChannelID())
if err != nil {
return nil, err
}
return &port.PlayResponse{
RTSP: streamURI,
}, nil
}
// StopPlay 实现 port.Protocol 接口 - ONVIF 停止播放
func (a *Adapter) StopPlay(ctx context.Context, device port.Device, channel port.Channel) error {
// ONVIF 通常不需要显式停止播放
return nil
}
// queryAndSaveProfiles 查询 ONVIF Profiles 并保存为通道
func (a *Adapter) queryAndSaveProfiles(ctx context.Context, device port.Device, dev *onvif.Device) error {
// 查询 ONVIF Profiles
resp, err := sdkmedia.Call_GetProfiles(ctx, dev, m.GetProfiles{})
if err != nil {
return fmt.Errorf("获取 ONVIF Profiles 失败: %w", err)
}
// 将 Profiles 转换为通道并保存
for _, profile := range resp.Profiles {
channel := &gb28181.Channel{
DeviceID: device.GetDeviceID(),
ChannelID: string(profile.Token),
Name: string(profile.Name),
DID: device.GetID(),
}
// 保存到数据库(使用领域层的存储接口)
if err := a.store.Channel().Add(ctx, channel); err != nil {
// 如果是重复错误,忽略
if orm.IsDuplicatedKey(err) {
slog.DebugContext(ctx, "通道已存在", "channel_id", channel.ChannelID)
continue
}
slog.ErrorContext(ctx, "保存通道失败", "err", err, "channel_id", channel.ChannelID)
continue
}
slog.InfoContext(ctx, "ONVIF Profile 保存为通道", "channel_id", channel.ChannelID, "name", channel.Name)
}
return nil
}
// getStreamURI 获取 RTSP 流地址
func (a *Adapter) getStreamURI(ctx context.Context, dev *onvif.Device, profileToken string) (string, error) {
// TODO: 调用 ONVIF GetStreamUri 方法
// 这里需要根据 onvif SDK 的实际 API 来实现
// 临时实现:假设 profileToken 可以直接构造 RTSP 地址
params := dev.GetDeviceParams()
return fmt.Sprintf("rtsp://%s:%s@%s/stream/%s", params.Username, params.Password, params.Xaddr, profileToken), nil
}
+1 -1
View File
@@ -21,7 +21,7 @@ type ConfigAPI struct {
}
func NewConfigAPI(db *gorm.DB, conf *conf.Bootstrap) ConfigAPI {
core := config.NewCore(configdb.NewDB(db).AutoMigrate(orm.EnabledAutoMigrate))
core := config.NewCore(configdb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()))
return ConfigAPI{configCore: core, conf: conf}
}
@@ -2,6 +2,7 @@
package api
import (
"encoding/json"
"fmt"
"io"
"log/slog"
@@ -12,11 +13,14 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/port"
"github.com/gowvp/gb28181/internal/core/push"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/zlm"
"github.com/gowvp/onvif"
"github.com/ixugo/goddd/domain/uniqueid"
"github.com/ixugo/goddd/pkg/hook"
"github.com/ixugo/goddd/pkg/orm"
@@ -33,7 +37,9 @@ const (
// TODO: 快照不会删除,只会覆盖,设备删除时也不会删除快照,待实现
func writeCover(dataDir, channelID string, body []byte) error {
coverPath := filepath.Join(dataDir, coverDir)
os.MkdirAll(coverPath, 0o755)
if err := os.MkdirAll(coverPath, 0o777); err != nil {
return err
}
return os.WriteFile(filepath.Join(coverPath, channelID+".jpg"), body, 0o644)
}
@@ -46,80 +52,100 @@ func readCover(dataDir, channelID string) ([]byte, error) {
return os.ReadFile(readCoverPath(dataDir, channelID))
}
type GB28181API struct {
gb28181Core gb28181.Core
uc *Usecase
type IPCAPI struct {
ipc ipc.Core
uc *Usecase
}
func NewGB28181API(core gb28181.Core) GB28181API {
return GB28181API{gb28181Core: core}
func NewIPCAPI(core ipc.Core) IPCAPI {
return IPCAPI{ipc: core}
}
func NewGB28181Core(store gb28181.Storer, uni uniqueid.Core) gb28181.Core {
return gb28181.NewCore(store, uni)
func NewIPCCore(store ipc.Storer, uni uniqueid.Core, protocols map[string]port.Protocol) ipc.Core {
return ipc.NewCore(store, uni, protocols)
}
func registerGB28181(g gin.IRouter, api GB28181API, handler ...gin.HandlerFunc) {
func registerGB28181(g gin.IRouter, api IPCAPI, handler ...gin.HandlerFunc) {
// GB28181 协议特有的回调接口
g.Any("/gb28181/snapshot", func(c *gin.Context) {
b, err := io.ReadAll(c.Request.Body)
if err != nil {
panic(err)
}
os.WriteFile(orm.GenerateRandomString(10)+".jpg", b, 0o644)
if err := os.WriteFile(orm.GenerateRandomString(10)+".jpg", b, 0o644); err != nil {
slog.ErrorContext(c.Request.Context(), "write cover", "err", err)
}
c.JSON(200, gin.H{"msg": "ok"})
})
// 统一的设备管理 API(支持所有协议)
{
group := g.Group("/devices", handler...)
group.GET("", web.WrapH(api.findDevice)) // 设备列表
group.GET("/:id", web.WrapH(api.getDevice)) // 设备详情
group.PUT("/:id", web.WrapH(api.editDevice)) // 修改设备详情
group.POST("", web.WrapH(api.addDevice)) // 添加设备
group.DELETE("/:id", web.WrapH(api.delDevice)) // 删除设备
group.POST("/:id/catalog", web.WrapH(api.queryCatalog)) // 刷新通道
group.GET("/channels", web.WrapH(api.FindChannelsForDevice)) // 设备与通道列表
group.GET("", web.WrapH(api.findDevice)) // 设备列表(所有协议)
group.GET("/:id", web.WrapH(api.getDevice)) // 设备详情(所有协议)
group.PUT("/:id", web.WrapH(api.editDevice)) // 修改设备(所有协议)
group.POST("", web.WrapH(api.addDevice)) // 添加设备(所有协议,通过 type 区分)
group.DELETE("/:id", web.WrapH(api.delDevice)) // 删除设备(所有协议)
group.GET("/channels", web.WrapH(api.FindChannelsForDevice)) // 设备与通道列表(所有协议)
// GB28181 特有功能
group.POST("/:id/catalog", web.WrapH(api.queryCatalog)) // 刷新通道(GB28181 特有)
}
{
group := g.Group("/onvif", handler...)
group.GET("/discover", api.discover) // ONVIF 设备发现(ONVIF 特有)
}
// 统一的通道管理 API(支持所有协议)
{
group := g.Group("/channels", handler...)
group.GET("", web.WrapH(api.findChannel))
group.PUT("/:id", web.WrapH(api.editChannel))
group.POST("/:id/play", web.WrapH(api.play))
group.POST("/:id/snapshot", web.WrapH(api.refreshSnapshot)) // 图像抓拍
group.GET("/:id/snapshot", api.getSnapshot) // 获取图像
// group.GET("/:id", web.WrapH(api.getChannel))
// group.POST("", web.WrapH(api.addChannel))
// group.DELETE("/:id", web.WrapH(api.delChannel))
group.GET("", web.WrapH(api.findChannel)) // 通道列表(所有协议)
group.PUT("/:id", web.WrapH(api.editChannel)) // 修改通道(所有协议)
group.POST("/:id/play", web.WrapH(api.play)) // 播放(所有协议)
group.POST("/:id/snapshot", web.WrapH(api.refreshSnapshot)) // 图像抓拍(所有协议)
group.GET("/:id/snapshot", api.getSnapshot) // 获取图像(所有协议)
}
}
// >>> device >>>>>>>>>>>>>>>>>>>>
func (a GB28181API) findDevice(c *gin.Context, in *gb28181.FindDeviceInput) (any, error) {
items, total, err := a.gb28181Core.FindDevice(c.Request.Context(), in)
func (a IPCAPI) findDevice(c *gin.Context, in *ipc.FindDeviceInput) (any, error) {
items, total, err := a.ipc.FindDevice(c.Request.Context(), in)
return gin.H{"items": items, "total": total}, err
}
func (a GB28181API) getDevice(c *gin.Context, _ *struct{}) (any, error) {
func (a IPCAPI) getDevice(c *gin.Context, _ *struct{}) (any, error) {
deviceID := c.Param("id")
return a.gb28181Core.GetDevice(c.Request.Context(), deviceID)
return a.ipc.GetDevice(c.Request.Context(), deviceID)
}
func (a GB28181API) editDevice(c *gin.Context, in *gb28181.EditDeviceInput) (any, error) {
func (a IPCAPI) editDevice(c *gin.Context, in *ipc.EditDeviceInput) (any, error) {
deviceID := c.Param("id")
return a.gb28181Core.EditDevice(c.Request.Context(), in, deviceID)
return a.ipc.EditDevice(c.Request.Context(), in, deviceID)
}
func (a GB28181API) addDevice(c *gin.Context, in *gb28181.AddDeviceInput) (any, error) {
return a.gb28181Core.AddDevice(c.Request.Context(), in)
// addDevice 添加设备(支持所有协议类型)
// 通过 type 字段区分协议: "GB28181" 或 "ONVIF"
//
// 示例1 - 添加 GB28181 设备:
//
// POST /devices
// { "type": "GB28181", "device_id": "34020000001320000001", "name": "摄像头1" }
//
// 示例2 - 添加 ONVIF 设备:
//
// POST /devices
// { "type": "ONVIF", "ip": "192.168.1.100", "port": 80, "username": "admin", "password": "12345" }
func (a IPCAPI) addDevice(c *gin.Context, in *ipc.AddDeviceInput) (any, error) {
return a.ipc.AddDevice(c.Request.Context(), in)
}
func (a GB28181API) delDevice(c *gin.Context, _ *struct{}) (any, error) {
func (a IPCAPI) delDevice(c *gin.Context, _ *struct{}) (any, error) {
did := c.Param("id")
return a.gb28181Core.DelDevice(c.Request.Context(), did)
return a.ipc.DelDevice(c.Request.Context(), did)
}
func (a GB28181API) queryCatalog(c *gin.Context, _ *struct{}) (any, error) {
func (a IPCAPI) queryCatalog(c *gin.Context, _ *struct{}) (any, error) {
did := c.Param("id")
if err := a.uc.SipServer.QueryCatalog(did); err != nil {
return nil, ErrDevice.SetMsg(err.Error())
@@ -127,8 +153,8 @@ func (a GB28181API) queryCatalog(c *gin.Context, _ *struct{}) (any, error) {
return gin.H{"msg": "ok"}, nil
}
func (a GB28181API) FindChannelsForDevice(c *gin.Context, in *gb28181.FindDeviceInput) (any, error) {
items, total, err := a.gb28181Core.FindChannelsForDevice(c.Request.Context(), in)
func (a IPCAPI) FindChannelsForDevice(c *gin.Context, in *ipc.FindDeviceInput) (any, error) {
items, total, err := a.ipc.FindChannelsForDevice(c.Request.Context(), in)
// 按照在线优先排序
sort.SliceStable(items, func(i, j int) bool {
@@ -140,8 +166,8 @@ func (a GB28181API) FindChannelsForDevice(c *gin.Context, in *gb28181.FindDevice
// >>> channel >>>>>>>>>>>>>>>>>>>>
func (a GB28181API) findChannel(c *gin.Context, in *gb28181.FindChannelInput) (any, error) {
items, total, err := a.gb28181Core.FindChannel(c.Request.Context(), in)
func (a IPCAPI) findChannel(c *gin.Context, in *ipc.FindChannelInput) (any, error) {
items, total, err := a.ipc.FindChannel(c.Request.Context(), in)
return gin.H{"items": items, "total": total}, err
}
@@ -150,9 +176,9 @@ func (a GB28181API) findChannel(c *gin.Context, in *gb28181.FindChannelInput) (a
// return a.gb28181Core.GetChannel(c.Request.Context(), channelID)
// }
func (a GB28181API) editChannel(c *gin.Context, in *gb28181.EditChannelInput) (any, error) {
func (a IPCAPI) editChannel(c *gin.Context, in *ipc.EditChannelInput) (any, error) {
cid := c.Param("id")
return a.gb28181Core.EditChannel(c.Request.Context(), in, cid)
return a.ipc.EditChannel(c.Request.Context(), in, cid)
}
// func (a GB28181API) addChannel(c *gin.Context, in *gb28181.AddChannelInput) (any, error) {
@@ -164,7 +190,7 @@ func (a GB28181API) editChannel(c *gin.Context, in *gb28181.EditChannelInput) (a
// return a.gb28181Core.DelChannel(c.Request.Context(), channelID)
// }
func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
channelID := c.Param("id")
var app, appStream, host, stream, session, mediaServerID string
@@ -176,7 +202,7 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
return nil, reason.ErrUsedLogic.SetMsg("请先配置流媒体 SDP 收流地址")
}
// a.uc.SipServer.
ch, err := a.gb28181Core.GetChannel(c.Request.Context(), channelID)
ch, err := a.ipc.GetChannel(c.Request.Context(), channelID)
if err != nil {
return nil, err
}
@@ -272,15 +298,22 @@ func (a GB28181API) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
// 取一张快照
go func() {
body, err := a.uc.SMSAPI.smsCore.GetSnapshot(svr, zlm.GetSnapRequest{
URL: out.Items[0].RTSP,
TimeoutSec: 10,
ExpireSec: 15,
})
if err != nil {
slog.ErrorContext(c.Request.Context(), "get snapshot", "err", err)
} else {
writeCover(a.uc.Conf.ConfigDir, channelID, body)
for range 2 {
time.Sleep(5 * time.Second)
rtsp := fmt.Sprintf("rtsp://%s:%d/%s", "127.0.0.1", svr.Ports.RTSP, stream) + "?" + session
body, err := a.uc.SMSAPI.smsCore.GetSnapshot(svr, zlm.GetSnapRequest{
URL: rtsp,
TimeoutSec: 10,
ExpireSec: 15,
})
if err != nil {
slog.ErrorContext(c.Request.Context(), "get snapshot", "err", err)
continue
}
if err := writeCover(a.uc.Conf.ConfigDir, channelID, body); err != nil {
slog.ErrorContext(c.Request.Context(), "write cover", "err", err)
}
break
}
}()
return &out, nil
@@ -293,7 +326,7 @@ type refreshSnapshotInput struct {
URL string `json:"url"`
}
func (a GB28181API) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (any, error) {
func (a IPCAPI) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (any, error) {
channelID := c.Param("id")
path := readCoverPath(a.uc.Conf.ConfigDir, channelID)
@@ -329,7 +362,9 @@ func (a GB28181API) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (a
// return nil, reason.ErrBadRequest.Msg(err.Error())
} else {
if hook.MD5FromBytes(img) != "" {
writeCover(a.uc.Conf.ConfigDir, channelID, img)
if err := writeCover(a.uc.Conf.ConfigDir, channelID, img); err != nil {
slog.ErrorContext(c.Request.Context(), "write cover", "err", err)
}
}
}
}
@@ -337,12 +372,68 @@ func (a GB28181API) refreshSnapshot(c *gin.Context, in *refreshSnapshotInput) (a
return gin.H{"link": fmt.Sprintf("%s/channels/%s/snapshot?token=%s", prefix, channelID, token)}, nil
}
func (a GB28181API) getSnapshot(c *gin.Context) {
func (a IPCAPI) getSnapshot(c *gin.Context) {
channelID := c.Param("id")
body, err := readCover(a.uc.Conf.ConfigDir, channelID)
if err != nil {
reason.ErrNotFound.SetMsg(err.Error())
web.Fail(c, reason.ErrNotFound.SetMsg(err.Error()))
return
}
c.Data(200, "image/jpeg", body)
}
type DiscoverResponse struct {
Addr string `json:"addr"`
}
func toDiscoverResponse(dev *onvif.Device) *DiscoverResponse {
addr := dev.GetDeviceParams().Xaddr
if !strings.Contains(addr, ":") {
addr += ":80"
}
return &DiscoverResponse{
Addr: addr,
}
}
func (a IPCAPI) discover(c *gin.Context) {
recv, cancel, err := onvif.AllAvailableDevicesAtSpecificEthernetInterfaces()
if err != nil {
web.Fail(c, err)
return
}
defer cancel()
se := web.NewSSE(64, time.Minute)
go func() {
defer func() {
se.Publish(web.Event{
ID: uuid.NewString(),
Event: "end",
})
se.Close()
}()
for {
select {
case dev := <-recv:
if dev == nil {
return
}
// TODO: 已经添加的设备需要过滤掉
b, _ := json.Marshal(toDiscoverResponse(dev))
se.Publish(web.Event{
ID: uuid.NewString(),
Event: "discover",
Data: b,
})
time.Sleep(time.Millisecond * 200)
case <-c.Request.Context().Done():
return
case <-time.After(3 * time.Second):
slog.DebugContext(c.Request.Context(), "discover timeout")
return
}
}
}()
se.ServeHTTP(c.Writer, c.Request)
}
+25 -13
View File
@@ -6,11 +6,13 @@ import (
"github.com/gin-gonic/gin"
"github.com/google/wire"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/gb28181/store/gb28181cache"
"github.com/gowvp/gb28181/internal/core/gb28181/store/gb28181db"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/ipc/store/ipccache"
"github.com/gowvp/gb28181/internal/core/ipc/store/ipcdb"
"github.com/gowvp/gb28181/internal/core/port"
"github.com/gowvp/gb28181/internal/core/push"
"github.com/gowvp/gb28181/internal/core/push/store/pushdb"
onvifadapter "github.com/gowvp/gb28181/internal/protocol/onvif"
"github.com/gowvp/gb28181/pkg/gbs"
"github.com/ixugo/goddd/domain/uniqueid"
"github.com/ixugo/goddd/domain/uniqueid/store/uniqueiddb"
@@ -31,10 +33,7 @@ var (
NewUniqueID,
NewPushCore, NewPushAPI,
gbs.NewServer,
NewGB28181Store,
NewGB28181API,
NewGB28181Core,
NewGB28181,
NewIPCStore, NewProtocols, NewIPCCore, NewIPCAPI, NewGBAdapter,
NewProxyAPI,
NewConfigAPI,
NewUserAPI,
@@ -49,7 +48,7 @@ type Usecase struct {
WebHookAPI WebHookAPI
UniqueID uniqueid.Core
MediaAPI PushAPI
GB28181API GB28181API
GB28181API IPCAPI
ProxyAPI ProxyAPI
ConfigAPI ConfigAPI
@@ -79,7 +78,7 @@ func NewHTTPHandler(uc *Usecase) http.Handler {
}
setupRouter(g, uc) // 设置路由处理函数
uc.Version.RecordVersion()
return g // 返回配置好的 Gin 实例作为 http.Handler
}
@@ -92,13 +91,26 @@ func NewPushCore(db *gorm.DB, uni uniqueid.Core) push.Core {
return push.NewCore(pushdb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()), uni)
}
func NewGB28181Store(db *gorm.DB) gb28181.Storer {
return gb28181cache.NewCache(gb28181db.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()))
func NewIPCStore(db *gorm.DB) ipc.Storer {
return ipccache.NewCache(ipcdb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()))
}
func NewGB28181(store gb28181.Storer, uni uniqueid.Core) gb28181.GB28181 {
return gb28181.NewGB28181(
func NewGBAdapter(store ipc.Storer, uni uniqueid.Core) ipc.GBDAdapter {
return ipc.NewGBAdapter(
store,
uni,
)
}
// NewProtocols 创建协议适配器映射
func NewProtocols(store ipc.Storer) map[string]port.Protocol {
protocols := make(map[string]port.Protocol)
// 注册 ONVIF 协议适配器
protocols["ONVIF"] = onvifadapter.NewAdapter(store)
// TODO: 注册 GB28181 协议适配器
// protocols["GB28181"] = gb28181adapter.NewAdapter(sipServer, store)
return protocols
}
+1 -1
View File
@@ -16,7 +16,7 @@ type ProxyAPI struct {
}
func NewProxyAPI(db *gorm.DB, uni uniqueid.Core) ProxyAPI {
core := proxy.NewCore(proxydb.NewDB(db).AutoMigrate(orm.EnabledAutoMigrate), uni)
core := proxy.NewCore(proxydb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()), uni)
return ProxyAPI{proxyCore: core}
}
+3 -3
View File
@@ -9,7 +9,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/gowvp/gb28181/internal/core/gb28181"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/push"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs"
@@ -204,12 +204,12 @@ type RTPStream struct {
}
func (r RTPStream) onStreamNotFound(ctx context.Context, in *onStreamNotFoundInput) error {
ch, err := r.uc.GB28181API.gb28181Core.GetChannel(ctx, in.Stream)
ch, err := r.uc.GB28181API.ipc.GetChannel(ctx, in.Stream)
if err != nil {
return err
}
dev, err := r.uc.GB28181API.gb28181Core.GetDevice(ctx, ch.DID)
dev, err := r.uc.GB28181API.ipc.GetDevice(ctx, ch.DID)
if err != nil {
return err
}
+3 -3
View File
@@ -8,7 +8,7 @@ import (
"sync"
"time"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/pkg/gbs/m"
"github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goddd/pkg/conc"
@@ -43,7 +43,7 @@ type Device struct {
keepaliveTimeout uint16
}
func NewDevice(conn sip.Connection, d *gb28181.Device) *Device {
func NewDevice(conn sip.Connection, d *ipc.Device) *Device {
uri, err := sip.ParseURI(fmt.Sprintf("sip:%s@%s", d.DeviceID, d.Address))
if err != nil {
slog.Error("parse uri", "err", err, "did", d.ID)
@@ -89,7 +89,7 @@ func (d *Device) CheckConnection() error {
return nil
}
func (d *Device) LoadChannels(channels ...*gb28181.Channel) {
func (d *Device) LoadChannels(channels ...*ipc.Channel) {
for _, channel := range channels {
ch := Channel{
ChannelID: channel.ChannelID,
+3 -3
View File
@@ -3,7 +3,7 @@ package gbs
import (
"encoding/hex"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/pkg/gbs/sip"
)
@@ -43,14 +43,14 @@ func (g GB28181API) sipMessageDeviceInfo(ctx *sip.Context) {
return
}
if err := g.core.Edit(ctx.DeviceID, func(d *gb28181.Device) {
if err := g.core.Edit(ctx.DeviceID, func(d *ipc.Device) {
d.Ext.Firmware = msg.Firmware
d.Ext.Manufacturer = msg.Manufacturer
d.Ext.Model = msg.Model
d.Ext.Name = msg.DeviceName
d.Address = ctx.Source.String()
d.Trasnport = ctx.Source.Network()
d.Transport = ctx.Source.Network()
}); err != nil {
ctx.Log.Error("Edit", "err", err)
ctx.String(500, ErrDatabase.Error())
+3 -3
View File
@@ -1,7 +1,7 @@
package gbs
import (
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goddd/pkg/orm"
// "github.com/panjjo/gosip/db"
@@ -31,11 +31,11 @@ func (g *GB28181API) sipMessageKeepalive(ctx *sip.Context) {
to: ctx.To,
})
if err := g.svr.memoryStorer.Change(ctx.DeviceID, func(d *gb28181.Device) {
if err := g.svr.memoryStorer.Change(ctx.DeviceID, func(d *ipc.Device) {
d.KeepaliveAt = orm.Now()
d.IsOnline = msg.Status == "OK" || msg.Status == "ON"
d.Address = ctx.Source.String()
d.Trasnport = ctx.Source.Network()
d.Transport = ctx.Source.Network()
}, func(d *Device) {
d.conn = ctx.Request.GetConnection()
d.source = ctx.Source
+6 -7
View File
@@ -7,7 +7,7 @@ import (
"strings"
"sync"
"github.com/gowvp/gb28181/internal/core/gb28181"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs/m"
"github.com/gowvp/gb28181/pkg/gbs/sip"
@@ -16,13 +16,13 @@ import (
)
type PlayInput struct {
Channel *gb28181.Channel
Channel *ipc.Channel
SMS *sms.MediaServer
StreamMode int8
}
type StopPlayInput struct {
Channel *gb28181.Channel
Channel *ipc.Channel
}
// stopPlay 不加锁的
@@ -158,7 +158,7 @@ func GetIP(input string) (string, error) {
return ips[0].String(), nil
}
slog.Error("域名没有解析到任何IP地址", "域名", input)
slog.Error("域名没有解析到任何IP地址", "域名", input)
return input, fmt.Errorf("域名没有解析到IP地址")
}
@@ -196,11 +196,10 @@ func (g *GB28181API) sipPlayPush2(ch *Channel, in *PlayInput, port int, stream *
video.AddAttribute("rtpmap", "97", "MPEG4/90000")
video.AddAttribute("rtpmap", "98", "H264/90000")
//获取配置值
// 获取配置值
ipstr := in.SMS.GetSDPIP()
//进行IP解析
// 进行IP解析
ip4str, err := GetIP(ipstr)
if err != nil {
slog.Error("域名解析失败", "域名", ipstr, "错误", err)
return err
-1
View File
@@ -139,7 +139,6 @@ type RecordDate struct {
Items []RecordInfo `json:"items"`
}
// RecordInfo RecordInfo
type RecordInfo struct {
Start int64 `json:"start" bson:"start"`
End int64 `json:"end" bson:"end"`
+25 -19
View File
@@ -9,7 +9,7 @@ import (
"unicode"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/gb28181"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goddd/pkg/conc"
@@ -20,7 +20,7 @@ const ignorePassword = "#"
type GB28181API struct {
cfg *conf.SIP
core gb28181.GB28181
core gb28181.GBDAdapter
catalog *sip.Collector[Channels]
@@ -32,7 +32,7 @@ type GB28181API struct {
sms *sms.NodeManager
}
func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GB28181, sms *sms.NodeManager) *GB28181API {
func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sms *sms.NodeManager) *GB28181API {
g := GB28181API{
cfg: &cfg.Sip,
core: store,
@@ -130,14 +130,16 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) {
if dev.Password == ignorePassword {
password = ""
}
hdrs := ctx.Request.GetHeaders("Authorization")
if len(hdrs) == 0 {
resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil)
resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf(`Digest realm="%s",qop="auth",nonce="%s"`, g.cfg.Domain, sip.RandString(32))})
_ = ctx.Tx.Respond(resp)
return
}
if password != "" {
hdrs := ctx.Request.GetHeaders("Authorization")
if len(hdrs) == 0 {
resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil)
resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf("Digest nonce=\"%s\", algorithm=MD5, realm=\"%s\",qop=\"auth\"", sip.RandString(32), g.cfg.Domain)})
_ = ctx.Tx.Respond(resp)
return
}
authenticateHeader := hdrs[0].(*sip.GenericHeader)
auth := sip.AuthFromValue(authenticateHeader.Contents)
auth.SetPassword(password)
@@ -170,10 +172,19 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) {
respFn()
return
}
g.login(ctx, expire)
g.login(ctx, func(b *gb28181.Device) {
b.IsOnline = true
b.RegisteredAt = orm.Now()
b.KeepaliveAt = orm.Now()
b.Expires, _ = strconv.Atoi(expire)
b.Address = ctx.Source.String()
b.Transport = ctx.Source.Network()
b.Ext.GBVersion = ctx.XGBVer
})
// conn := ctx.Request.GetConnection()
// fmt.Printf(">>> %p\n", conn)
// fmt.Printf(">>> %p\n", conn
ctx.Log.Info("设备注册成功")
// ctx.Log.Debug("device info", "source", ctx.Source, "host", ctx.Host)
@@ -185,14 +196,9 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) {
_ = g.QueryConfigDownloadBasic(dev.DeviceID)
}
func (g GB28181API) login(ctx *sip.Context, expire string) {
func (g GB28181API) login(ctx *sip.Context, fn func(d *gb28181.Device)) {
slog.Info("status change 设备上线", "device_id", ctx.DeviceID)
g.svr.memoryStorer.Change(ctx.DeviceID, func(d *gb28181.Device) {
d.IsOnline = true
d.RegisteredAt = orm.Now()
d.KeepaliveAt = orm.Now()
d.Expires, _ = strconv.Atoi(expire)
}, func(d *Device) {
g.svr.memoryStorer.Change(ctx.DeviceID, fn, func(d *Device) {
d.conn = ctx.Request.GetConnection()
d.source = ctx.Source
d.to = ctx.To
+5 -6
View File
@@ -11,12 +11,12 @@ import (
"time"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/gb28181"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs/m"
"github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goddd/pkg/conc"
"github.com/ixugo/goddd/pkg/system"
"github.com/ixugo/netpulse/ip"
)
type MemoryStorer interface {
@@ -42,11 +42,11 @@ type Server struct {
memoryStorer MemoryStorer
}
func NewServer(cfg *conf.Bootstrap, store gb28181.GB28181, sc sms.Core) (*Server, func()) {
func NewServer(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sc sms.Core) (*Server, func()) {
api := NewGB28181API(cfg, store, sc.NodeManager)
ip := system.LocalIP()
uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s:%d", cfg.Sip.ID, ip, cfg.Sip.Port))
iip := ip.InternalIP()
uri, _ := sip.ParseSipURI(fmt.Sprintf("sip:%s@%s:%d", cfg.Sip.ID, iip, cfg.Sip.Port))
from := sip.Address{
DisplayName: sip.String{Str: "gowvp"},
URI: &uri,
@@ -61,7 +61,6 @@ func NewServer(cfg *conf.Bootstrap, store gb28181.GB28181, sc sms.Core) (*Server
msg.Handle("DeviceInfo", api.sipMessageDeviceInfo)
msg.Handle("ConfigDownload", api.sipMessageConfigDownload)
msg.Handle("DeviceConfig", api.handleDeviceConfig)
// msg.Handle("RecordInfo", api.handlerMessage)
c := Server{
+19 -1
View File
@@ -30,7 +30,8 @@ type Context struct {
Log *slog.Logger
svr *Server
svr *Server
XGBVer string
}
func newContext(req *Request, tx *Transaction) *Context {
@@ -76,6 +77,23 @@ func (c *Context) parserRequest() error {
slog.Error(">>>>>>>> to is nil", "header", header)
}
xgbVer := req.GetHeaders("X-GB-Ver")
if len(xgbVer) > 0 {
h := xgbVer[0]
parts := strings.Split(h.String(), ":")
if len(parts) == 2 {
switch strings.TrimSpace(parts[1]) {
case "3.0":
c.XGBVer = "2022"
case "2.0":
c.XGBVer = "2016"
case "1.0":
c.XGBVer = "2011"
}
}
}
c.Log = slog.Default().With("deviceID", c.DeviceID, "host", c.Host)
return nil
}
+31
View File
@@ -26,6 +26,7 @@ type HeadersBuilder struct {
maxForwards *MaxForwards
allow *AllowHeader
supported *SupportedHeader
XGBVer *XGBVer
// recipient *URI
}
@@ -34,6 +35,7 @@ func NewHeaderBuilder() *HeadersBuilder {
callID := CallID(RandString(32))
maxForwards := MaxForwards(70)
userAgent := UserAgentHeader("GoWVP")
xgbVer := XGBVer("3.0")
return &HeadersBuilder{
protocol: "SIP",
protocolVersion: "2.0",
@@ -47,6 +49,7 @@ func NewHeaderBuilder() *HeadersBuilder {
generic: make(map[string]Header),
allow: defaultAllowMethods,
supported: &SupportedHeader{Options: []string{}},
XGBVer: &xgbVer,
}
}
@@ -89,6 +92,9 @@ func (hb *HeadersBuilder) Build() []Header {
if hb.contentType != nil {
hdrs = append(hdrs, hb.contentType)
}
if hb.XGBVer != nil {
hdrs = append(hdrs, hb.XGBVer)
}
// for _, header := range hb.generic {
// hdrs = append(hdrs, header)
@@ -96,6 +102,12 @@ func (hb *HeadersBuilder) Build() []Header {
return hdrs
}
func (hb *HeadersBuilder) SetXGBVer() *HeadersBuilder {
s := XGBVer("3.0")
hb.XGBVer = &s
return hb
}
// SetMethod SetMethod
func (hb *HeadersBuilder) SetMethod(method string) *HeadersBuilder {
hb.method = method
@@ -1667,3 +1679,22 @@ func (header *GenericHeader) Equals(other interface{}) bool {
return false
}
type XGBVer string
func (ct XGBVer) String() string { return "X-GB-Ver: " + string(ct) }
// Name Name
func (ct XGBVer) Name() string { return "X-GB-Ver" }
// Clone Clone
func (ct XGBVer) Clone() Header { return &ct }
// Equals Equals
func (ct *XGBVer) Equals(other interface{}) bool {
h, ok := other.(XGBVer)
if !ok {
return false
}
return h.String() == ct.String()
}
+15 -14
View File
@@ -33,6 +33,7 @@ func NewRequest(
req.SetSipVersion(sipVersion)
req.startLine = req.StartLine
req.headers = newHeaders(hdrs)
req.SetMethod(method)
req.SetRecipient(recipient)
@@ -43,26 +44,26 @@ func NewRequest(
}
// NewRequestFromResponse NewRequestFromResponse
func NewRequestFromResponse(method string, inviteResponse *Response) *Request {
contact, _ := inviteResponse.Contact()
func NewRequestFromResponse(method string, resp *Response) *Request {
contact, _ := resp.Contact()
ackRequest := NewRequest(
inviteResponse.MessageID(),
resp.MessageID(),
method,
contact.Address,
inviteResponse.SipVersion(),
resp.SipVersion(),
[]Header{},
[]byte{},
)
CopyHeaders("Via", inviteResponse, ackRequest)
CopyHeaders("Via", resp, ackRequest)
viaHop, _ := ackRequest.ViaHop()
// update branch, 2xx ACK is separate Tx
viaHop.Params.Add("branch", String{Str: GenerateBranch()})
if len(inviteResponse.GetHeaders("Route")) > 0 {
CopyHeaders("Route", inviteResponse, ackRequest)
if len(resp.GetHeaders("Route")) > 0 {
CopyHeaders("Route", resp, ackRequest)
} else {
for _, h := range inviteResponse.GetHeaders("Record-Route") {
for _, h := range resp.GetHeaders("Record-Route") {
uris := make([]*URI, 0)
for _, u := range h.(*RecordRouteHeader).Addresses {
uris = append(uris, u.Clone())
@@ -73,10 +74,10 @@ func NewRequestFromResponse(method string, inviteResponse *Response) *Request {
}
}
CopyHeaders("From", inviteResponse, ackRequest)
CopyHeaders("To", inviteResponse, ackRequest)
CopyHeaders("Call-ID", inviteResponse, ackRequest)
cseq, _ := inviteResponse.CSeq()
CopyHeaders("From", resp, ackRequest)
CopyHeaders("To", resp, ackRequest)
CopyHeaders("Call-ID", resp, ackRequest)
cseq, _ := resp.CSeq()
cseq.MethodName = method
// https://www.rfc-editor.org/rfc/rfc3261.html#section-12.2.1.1
@@ -92,8 +93,8 @@ func NewRequestFromResponse(method string, inviteResponse *Response) *Request {
cseq.SeqNo++
}
ackRequest.AppendHeader(cseq)
ackRequest.SetSource(inviteResponse.Destination())
ackRequest.SetDestination(inviteResponse.Source())
ackRequest.SetSource(resp.Destination())
ackRequest.SetDestination(resp.Source())
return ackRequest
}
+15 -5
View File
@@ -34,9 +34,16 @@ func NewResponseFromRequest(
CopyHeaders("Record-Route", req, res)
CopyHeaders("Via", req, res)
CopyHeaders("From", req, res)
CopyHeaders("To", req, res)
CopyHeaders("Call-ID", req, res)
to, ok := req.To()
if ok {
if _, ok := to.Params.Get("tag"); !ok {
to.Params.Add("tag", String{Str: RandString(32)})
}
}
CopyHeaders("CSeq", req, res)
CopyHeaders("Call-ID", req, res)
res.AppendHeader(to)
if statusCode == 100 {
CopyHeaders("Timestamp", req, res)
@@ -45,9 +52,12 @@ func NewResponseFromRequest(
res.SetSource(req.Destination())
res.SetDestination(req.Source())
if len(body) > 0 {
res.SetBody(body, true)
}
res.SetBody(body, true)
res.AppendHeader(&GenericHeader{
HeaderName: "User-Agent",
Contents: "GoWVP/1.0",
})
return res
}
+11 -11
View File
@@ -64,18 +64,18 @@ var StreamList streamsList
func (g *GB28181API) getSSRC(t int) string {
r := false
for {
StreamList.ssrc++
// ssrc最大为四位数,超过时从1开始重新计算
if StreamList.ssrc > 9000 && !r {
StreamList.ssrc = 0
r = true
}
key := fmt.Sprintf("%d%s%04d", t, g.cfg.Domain[3:8], StreamList.ssrc)
// stream := Streams{StreamID: ssrc2stream(key), Stop: false}
// if err := db.Get(db.DBClient, &stream); db.RecordNotFound(err) || stream.CreatedAt == 0 {
return key
// for {
StreamList.ssrc++
// ssrc最大为四位数,超过时从1开始重新计算
if StreamList.ssrc > 9000 && !r {
StreamList.ssrc = 0
r = true
}
key := fmt.Sprintf("%d%s%04d", t, g.cfg.Domain[3:8], StreamList.ssrc)
// stream := Streams{StreamID: ssrc2stream(key), Stop: false}
// if err := db.Get(db.DBClient, &stream); db.RecordNotFound(err) || stream.CreatedAt == 0 {
return key
// }
}
// 定时检查未关闭的流