feature: support on invite request

This commit is contained in:
pg
2025-02-15 19:03:13 +08:00
committed by pggiroro
parent 84f4390834
commit 8ab2fa29d1
3 changed files with 285 additions and 1 deletions
+122 -1
View File
@@ -143,7 +143,7 @@ type GB28181ProPlugin struct {
m7s.Plugin
AutoInvite bool `default:"true" desc:"自动邀请"`
Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Username string
Password string
Sip SipConfig
@@ -177,6 +177,8 @@ func (gb *GB28181ProPlugin) OnInit() (err error) {
gb.server.OnMessage(gb.OnMessage)
gb.server.OnBye(gb.OnBye)
gb.devices.L = new(sync.RWMutex)
gb.server.OnInvite(gb.OnInvite)
gb.server.OnAck(gb.OnAck)
if gb.MediaPort.Valid() {
gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1]))
@@ -641,3 +643,122 @@ func (gb *GB28181ProPlugin) OnBye(req *sip.Request, tx sip.ServerTransaction) {
func (gb *GB28181ProPlugin) GetSerial() string {
return gb.Serial
}
func (gb *GB28181ProPlugin) OnInvite(req *sip.Request, tx sip.ServerTransaction) {
// 解析 INVITE 请求
inviteInfo, err := gb28181.DecodeSDP(req)
if err != nil {
gb.Error("OnInvite", "error", "decode sdp failed", "err", err.Error())
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, err.Error(), nil))
return
}
// 检查设备是否存在
d, ok := gb.devices.Get(inviteInfo.RequesterId)
if !ok {
gb.Error("OnInvite", "error", "device not found", "deviceId", inviteInfo.RequesterId)
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Device Not Found", nil))
return
}
// 检查通道是否存在
_, ok = d.channels.Get(inviteInfo.TargetChannelId)
if !ok {
gb.Error("OnInvite", "error", "channel not found", "channelId", inviteInfo.TargetChannelId)
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Channel Not Found", nil))
return
}
gb.Info("OnInvite", "action", "start", "deviceId", inviteInfo.RequesterId, "channelId", inviteInfo.TargetChannelId)
// 发送100 Trying响应
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusTrying, "Trying", nil))
// 获取媒体信息
mediaPort := uint16(0)
if gb.MediaPort.Valid() {
select {
case port := <-gb.tcpPorts:
mediaPort = port
gb.Debug("OnInvite", "action", "allocate port", "port", port)
default:
gb.Error("OnInvite", "error", "no available port")
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "No Available Port", nil))
return
}
} else {
mediaPort = gb.MediaPort[0]
gb.Debug("OnInvite", "action", "use default port", "port", mediaPort)
}
// 设置SSRC
ssrc := d.CreateSSRC(gb.Serial)
gb.Debug("OnInvite", "action", "create ssrc", "ssrc", ssrc)
// 构建SDP响应
sdpIP := d.LocalIP
if sdpIP == "" {
sdpIP = d.mediaIp
}
responseContent := []string{
"v=0",
fmt.Sprintf("o=%s 0 0 IN IP4 %s", gb.Serial, sdpIP),
"s=Play",
fmt.Sprintf("c=IN IP4 %s", sdpIP),
"t=0 0",
}
// 根据传输模式添加媒体行
var mediaLine string
switch strings.ToUpper(d.StreamMode) {
case "TCP-PASSIVE", "TCP-ACTIVE":
mediaLine = fmt.Sprintf("m=video %d TCP/RTP/AVP 96", mediaPort)
responseContent = append(responseContent, mediaLine)
if d.StreamMode == "TCP-PASSIVE" {
responseContent = append(responseContent, "a=setup:passive")
} else {
responseContent = append(responseContent, "a=setup:active")
}
responseContent = append(responseContent, "a=connection:new")
gb.Debug("OnInvite", "action", "create sdp", "mode", d.StreamMode)
default:
mediaLine = fmt.Sprintf("m=video %d RTP/AVP 96", mediaPort)
responseContent = append(responseContent, mediaLine)
gb.Debug("OnInvite", "action", "create sdp", "mode", "UDP")
}
responseContent = append(responseContent,
"a=recvonly",
"a=rtpmap:96 PS/90000",
fmt.Sprintf("y=%010d", ssrc),
)
// 发送200 OK响应
response := sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil)
contentType := sip.ContentTypeHeader("application/sdp")
response.AppendHeader(&contentType)
response.SetBody([]byte(strings.Join(responseContent, "\r\n") + "\r\n"))
if err := tx.Respond(response); err != nil {
gb.Error("OnInvite", "error", "send response failed", "err", err.Error())
return
}
gb.Info("OnInvite", "action", "complete", "deviceId", inviteInfo.RequesterId, "channelId", inviteInfo.TargetChannelId,
"ip", inviteInfo.IP, "port", inviteInfo.Port, "tcp", inviteInfo.TCP, "tcpActive", inviteInfo.TCPActive)
// TODO: 实现媒体流处理
// 1. 创建合适的Publisher
// 2. 创建PS流接收器
// 3. 配置接收参数:
// - 使用解析出的 inviteInfo.IP 和 inviteInfo.Port 作为目标地址
// - 使用 inviteInfo.TCP 和 inviteInfo.TCPActive 确定传输模式
// - 使用本地分配的 mediaPort 作为监听端口
// 4. 启动接收任务
// 5. 处理媒体流解复用
// 6. 根据 inviteInfo.SessionName 判断是实时点播还是历史回放
// - 如果是回放,使用 inviteInfo.StartTime 和 inviteInfo.StopTime
// 7. 使用 inviteInfo.SSRC 标识流
// 8. 如果指定了 inviteInfo.DownloadSpeed,控制下载速度
}
+36
View File
@@ -0,0 +1,36 @@
package gb28181
// InviteInfo 从INVITE消息中解析需要的信息
type InviteInfo struct {
// 请求者ID
RequesterId string `json:"requesterId"`
// 目标通道ID
TargetChannelId string `json:"targetChannelId"`
// 源通道ID
SourceChannelId string `json:"sourceChannelId"`
// 会话名称
SessionName string `json:"sessionName"`
// SSRC
SSRC string `json:"ssrc"`
// 是否使用TCP
TCP bool `json:"tcp"`
// TCP是否为主动模式
TCPActive bool `json:"tcpActive"`
// 呼叫ID
CallId string `json:"callId"`
// 开始时间
StartTime int64 `json:"startTime"`
// 结束时间
StopTime int64 `json:"stopTime"`
// 下载速度
DownloadSpeed string `json:"downloadSpeed"`
// IP地址
IP string `json:"ip"`
// 端口
Port int `json:"port"`
}
// NewInviteInfo 创建一个新的 InviteInfo 实例
func NewInviteInfo() *InviteInfo {
return &InviteInfo{}
}
+127
View File
@@ -0,0 +1,127 @@
package gb28181
import (
"fmt"
"strconv"
"strings"
"github.com/emiago/sipgo/sip"
)
// DecodeSDP 从 SIP 请求中解析 SDP 信息
func DecodeSDP(req *sip.Request) (*InviteInfo, error) {
inviteInfo := NewInviteInfo()
// 获取请求者ID
from := req.From()
if from == nil || from.Address.User == "" {
return nil, fmt.Errorf("无法从请求中获取来源id")
}
inviteInfo.RequesterId = from.Address.User
// 获取目标通道ID
channelIDArray := getChannelIDFromRequest(req)
if channelIDArray == nil {
return nil, fmt.Errorf("无法从请求中获取通道id")
}
inviteInfo.TargetChannelId = channelIDArray[0]
if len(channelIDArray) == 2 {
inviteInfo.SourceChannelId = channelIDArray[1]
}
// 获取CallID
callID := req.CallID()
if callID != nil {
inviteInfo.CallId = callID.Value()
}
// 解析SDP消息
sdpStr := string(req.Body())
if sdpStr == "" {
return nil, fmt.Errorf("SDP内容为空")
}
// 解析SDP各个字段
lines := strings.Split(sdpStr, "\r\n")
var mediaDesc []string
for _, line := range lines {
if line == "" {
continue
}
switch {
case strings.HasPrefix(line, "s="):
inviteInfo.SessionName = strings.TrimPrefix(line, "s=")
case strings.HasPrefix(line, "c="):
// c=IN IP4 192.168.1.100
parts := strings.Split(line, " ")
if len(parts) >= 3 {
inviteInfo.IP = parts[2]
}
case strings.HasPrefix(line, "t="):
// t=开始时间 结束时间
parts := strings.Split(strings.TrimPrefix(line, "t="), " ")
if len(parts) >= 2 {
startTime, err := strconv.ParseInt(parts[0], 10, 64)
if err == nil {
inviteInfo.StartTime = startTime
}
stopTime, err := strconv.ParseInt(parts[1], 10, 64)
if err == nil {
inviteInfo.StopTime = stopTime
}
}
case strings.HasPrefix(line, "m="):
mediaDesc = strings.Split(strings.TrimPrefix(line, "m="), " ")
if len(mediaDesc) >= 3 {
port, err := strconv.Atoi(mediaDesc[1])
if err == nil {
inviteInfo.Port = port
}
// 检查传输协议
if strings.Contains(mediaDesc[2], "TCP") {
inviteInfo.TCP = true
}
}
case strings.HasPrefix(line, "a=setup:"):
if strings.Contains(line, "active") {
inviteInfo.TCPActive = true
} else if strings.Contains(line, "passive") {
inviteInfo.TCPActive = false
}
case strings.HasPrefix(line, "y="):
inviteInfo.SSRC = strings.TrimPrefix(line, "y=")
case strings.HasPrefix(line, "a=downloadspeed:"):
inviteInfo.DownloadSpeed = strings.TrimPrefix(line, "a=downloadspeed:")
}
}
return inviteInfo, nil
}
// getChannelIDFromRequest 从请求中获取通道ID
func getChannelIDFromRequest(req *sip.Request) []string {
to := req.To()
if to == nil {
return nil
}
channelID := to.Address.User
if channelID == "" {
return nil
}
// 检查是否包含源通道ID
if strings.Contains(channelID, "@") {
return strings.Split(channelID, "@")
}
return []string{channelID}
}