From 8ab2fa29d1beb870bd966c0ea5bbfd5031e18a34 Mon Sep 17 00:00:00 2001 From: pg Date: Sat, 15 Feb 2025 19:03:13 +0800 Subject: [PATCH] feature: support on invite request --- plugin/gb28181pro/index.go | 123 +++++++++++++++++++++++++- plugin/gb28181pro/pkg/invite_info.go | 36 ++++++++ plugin/gb28181pro/pkg/sdputil.go | 127 +++++++++++++++++++++++++++ 3 files changed, 285 insertions(+), 1 deletion(-) create mode 100644 plugin/gb28181pro/pkg/invite_info.go create mode 100644 plugin/gb28181pro/pkg/sdputil.go diff --git a/plugin/gb28181pro/index.go b/plugin/gb28181pro/index.go index dd956c0..32991f1 100644 --- a/plugin/gb28181pro/index.go +++ b/plugin/gb28181pro/index.go @@ -143,7 +143,7 @@ type GB28181ProPlugin struct { m7s.Plugin AutoInvite bool `default:"true" desc:"自动邀请"` Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001 - Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 + Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 Username string Password string Sip SipConfig @@ -177,6 +177,8 @@ func (gb *GB28181ProPlugin) OnInit() (err error) { gb.server.OnMessage(gb.OnMessage) gb.server.OnBye(gb.OnBye) gb.devices.L = new(sync.RWMutex) + gb.server.OnInvite(gb.OnInvite) + gb.server.OnAck(gb.OnAck) if gb.MediaPort.Valid() { gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1])) @@ -641,3 +643,122 @@ func (gb *GB28181ProPlugin) OnBye(req *sip.Request, tx sip.ServerTransaction) { func (gb *GB28181ProPlugin) GetSerial() string { return gb.Serial } + +func (gb *GB28181ProPlugin) OnInvite(req *sip.Request, tx sip.ServerTransaction) { + // 解析 INVITE 请求 + inviteInfo, err := gb28181.DecodeSDP(req) + if err != nil { + gb.Error("OnInvite", "error", "decode sdp failed", "err", err.Error()) + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, err.Error(), nil)) + return + } + + // 检查设备是否存在 + d, ok := gb.devices.Get(inviteInfo.RequesterId) + if !ok { + gb.Error("OnInvite", "error", "device not found", "deviceId", inviteInfo.RequesterId) + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Device Not Found", nil)) + return + } + + // 检查通道是否存在 + _, ok = d.channels.Get(inviteInfo.TargetChannelId) + if !ok { + gb.Error("OnInvite", "error", "channel not found", "channelId", inviteInfo.TargetChannelId) + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Channel Not Found", nil)) + return + } + + gb.Info("OnInvite", "action", "start", "deviceId", inviteInfo.RequesterId, "channelId", inviteInfo.TargetChannelId) + + // 发送100 Trying响应 + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusTrying, "Trying", nil)) + + // 获取媒体信息 + mediaPort := uint16(0) + if gb.MediaPort.Valid() { + select { + case port := <-gb.tcpPorts: + mediaPort = port + gb.Debug("OnInvite", "action", "allocate port", "port", port) + default: + gb.Error("OnInvite", "error", "no available port") + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "No Available Port", nil)) + return + } + } else { + mediaPort = gb.MediaPort[0] + gb.Debug("OnInvite", "action", "use default port", "port", mediaPort) + } + + // 设置SSRC + ssrc := d.CreateSSRC(gb.Serial) + gb.Debug("OnInvite", "action", "create ssrc", "ssrc", ssrc) + + // 构建SDP响应 + sdpIP := d.LocalIP + if sdpIP == "" { + sdpIP = d.mediaIp + } + + responseContent := []string{ + "v=0", + fmt.Sprintf("o=%s 0 0 IN IP4 %s", gb.Serial, sdpIP), + "s=Play", + fmt.Sprintf("c=IN IP4 %s", sdpIP), + "t=0 0", + } + + // 根据传输模式添加媒体行 + var mediaLine string + switch strings.ToUpper(d.StreamMode) { + case "TCP-PASSIVE", "TCP-ACTIVE": + mediaLine = fmt.Sprintf("m=video %d TCP/RTP/AVP 96", mediaPort) + responseContent = append(responseContent, mediaLine) + if d.StreamMode == "TCP-PASSIVE" { + responseContent = append(responseContent, "a=setup:passive") + } else { + responseContent = append(responseContent, "a=setup:active") + } + responseContent = append(responseContent, "a=connection:new") + gb.Debug("OnInvite", "action", "create sdp", "mode", d.StreamMode) + default: + mediaLine = fmt.Sprintf("m=video %d RTP/AVP 96", mediaPort) + responseContent = append(responseContent, mediaLine) + gb.Debug("OnInvite", "action", "create sdp", "mode", "UDP") + } + + responseContent = append(responseContent, + "a=recvonly", + "a=rtpmap:96 PS/90000", + fmt.Sprintf("y=%010d", ssrc), + ) + + // 发送200 OK响应 + response := sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil) + contentType := sip.ContentTypeHeader("application/sdp") + response.AppendHeader(&contentType) + response.SetBody([]byte(strings.Join(responseContent, "\r\n") + "\r\n")) + + if err := tx.Respond(response); err != nil { + gb.Error("OnInvite", "error", "send response failed", "err", err.Error()) + return + } + + gb.Info("OnInvite", "action", "complete", "deviceId", inviteInfo.RequesterId, "channelId", inviteInfo.TargetChannelId, + "ip", inviteInfo.IP, "port", inviteInfo.Port, "tcp", inviteInfo.TCP, "tcpActive", inviteInfo.TCPActive) + + // TODO: 实现媒体流处理 + // 1. 创建合适的Publisher + // 2. 创建PS流接收器 + // 3. 配置接收参数: + // - 使用解析出的 inviteInfo.IP 和 inviteInfo.Port 作为目标地址 + // - 使用 inviteInfo.TCP 和 inviteInfo.TCPActive 确定传输模式 + // - 使用本地分配的 mediaPort 作为监听端口 + // 4. 启动接收任务 + // 5. 处理媒体流解复用 + // 6. 根据 inviteInfo.SessionName 判断是实时点播还是历史回放 + // - 如果是回放,使用 inviteInfo.StartTime 和 inviteInfo.StopTime + // 7. 使用 inviteInfo.SSRC 标识流 + // 8. 如果指定了 inviteInfo.DownloadSpeed,控制下载速度 +} diff --git a/plugin/gb28181pro/pkg/invite_info.go b/plugin/gb28181pro/pkg/invite_info.go new file mode 100644 index 0000000..f519b9f --- /dev/null +++ b/plugin/gb28181pro/pkg/invite_info.go @@ -0,0 +1,36 @@ +package gb28181 + +// InviteInfo 从INVITE消息中解析需要的信息 +type InviteInfo struct { + // 请求者ID + RequesterId string `json:"requesterId"` + // 目标通道ID + TargetChannelId string `json:"targetChannelId"` + // 源通道ID + SourceChannelId string `json:"sourceChannelId"` + // 会话名称 + SessionName string `json:"sessionName"` + // SSRC + SSRC string `json:"ssrc"` + // 是否使用TCP + TCP bool `json:"tcp"` + // TCP是否为主动模式 + TCPActive bool `json:"tcpActive"` + // 呼叫ID + CallId string `json:"callId"` + // 开始时间 + StartTime int64 `json:"startTime"` + // 结束时间 + StopTime int64 `json:"stopTime"` + // 下载速度 + DownloadSpeed string `json:"downloadSpeed"` + // IP地址 + IP string `json:"ip"` + // 端口 + Port int `json:"port"` +} + +// NewInviteInfo 创建一个新的 InviteInfo 实例 +func NewInviteInfo() *InviteInfo { + return &InviteInfo{} +} diff --git a/plugin/gb28181pro/pkg/sdputil.go b/plugin/gb28181pro/pkg/sdputil.go new file mode 100644 index 0000000..07b9085 --- /dev/null +++ b/plugin/gb28181pro/pkg/sdputil.go @@ -0,0 +1,127 @@ +package gb28181 + +import ( + "fmt" + "strconv" + "strings" + + "github.com/emiago/sipgo/sip" +) + +// DecodeSDP 从 SIP 请求中解析 SDP 信息 +func DecodeSDP(req *sip.Request) (*InviteInfo, error) { + inviteInfo := NewInviteInfo() + + // 获取请求者ID + from := req.From() + if from == nil || from.Address.User == "" { + return nil, fmt.Errorf("无法从请求中获取来源id") + } + inviteInfo.RequesterId = from.Address.User + + // 获取目标通道ID + channelIDArray := getChannelIDFromRequest(req) + if channelIDArray == nil { + return nil, fmt.Errorf("无法从请求中获取通道id") + } + inviteInfo.TargetChannelId = channelIDArray[0] + if len(channelIDArray) == 2 { + inviteInfo.SourceChannelId = channelIDArray[1] + } + + // 获取CallID + callID := req.CallID() + if callID != nil { + inviteInfo.CallId = callID.Value() + } + + // 解析SDP消息 + sdpStr := string(req.Body()) + if sdpStr == "" { + return nil, fmt.Errorf("SDP内容为空") + } + + // 解析SDP各个字段 + lines := strings.Split(sdpStr, "\r\n") + var mediaDesc []string + + for _, line := range lines { + if line == "" { + continue + } + + switch { + case strings.HasPrefix(line, "s="): + inviteInfo.SessionName = strings.TrimPrefix(line, "s=") + + case strings.HasPrefix(line, "c="): + // c=IN IP4 192.168.1.100 + parts := strings.Split(line, " ") + if len(parts) >= 3 { + inviteInfo.IP = parts[2] + } + + case strings.HasPrefix(line, "t="): + // t=开始时间 结束时间 + parts := strings.Split(strings.TrimPrefix(line, "t="), " ") + if len(parts) >= 2 { + startTime, err := strconv.ParseInt(parts[0], 10, 64) + if err == nil { + inviteInfo.StartTime = startTime + } + stopTime, err := strconv.ParseInt(parts[1], 10, 64) + if err == nil { + inviteInfo.StopTime = stopTime + } + } + + case strings.HasPrefix(line, "m="): + mediaDesc = strings.Split(strings.TrimPrefix(line, "m="), " ") + if len(mediaDesc) >= 3 { + port, err := strconv.Atoi(mediaDesc[1]) + if err == nil { + inviteInfo.Port = port + } + // 检查传输协议 + if strings.Contains(mediaDesc[2], "TCP") { + inviteInfo.TCP = true + } + } + + case strings.HasPrefix(line, "a=setup:"): + if strings.Contains(line, "active") { + inviteInfo.TCPActive = true + } else if strings.Contains(line, "passive") { + inviteInfo.TCPActive = false + } + + case strings.HasPrefix(line, "y="): + inviteInfo.SSRC = strings.TrimPrefix(line, "y=") + + case strings.HasPrefix(line, "a=downloadspeed:"): + inviteInfo.DownloadSpeed = strings.TrimPrefix(line, "a=downloadspeed:") + } + } + + return inviteInfo, nil +} + +// getChannelIDFromRequest 从请求中获取通道ID +func getChannelIDFromRequest(req *sip.Request) []string { + to := req.To() + if to == nil { + return nil + } + + channelID := to.Address.User + if channelID == "" { + return nil + } + + // 检查是否包含源通道ID + if strings.Contains(channelID, "@") { + return strings.Split(channelID, "@") + } + + return []string{channelID} +}