diff --git a/api.go b/api.go
index 30378b3..5983116 100644
--- a/api.go
+++ b/api.go
@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
+ "gb-cms/hook"
"github.com/ghettovoice/gosip/sip"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
@@ -31,7 +32,7 @@ type InviteParams struct {
type StreamParams struct {
Stream StreamID `json:"stream"` // Source
- Protocol string `json:"protocol"` // 推拉流协议
+ Protocol int `json:"protocol"` // 推拉流协议
RemoteAddr string `json:"remote_addr"` // peer地址
}
@@ -161,8 +162,8 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/ptz/control", apiServer.OnPTZControl) // 云台控制
apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 级联设备列表
- apiServer.router.HandleFunc("/api/v1/platform/add", withJsonResponse(apiServer.OnPlatformAdd, &SIPUAParams{})) // 添加级联设备
- apiServer.router.HandleFunc("/api/v1/platform/remove", withJsonResponse(apiServer.OnPlatformRemove, &SIPUAParams{})) // 删除级联设备
+ apiServer.router.HandleFunc("/api/v1/platform/add", withJsonResponse(apiServer.OnPlatformAdd, &PlatformModel{})) // 添加级联设备
+ apiServer.router.HandleFunc("/api/v1/platform/remove", withJsonResponse(apiServer.OnPlatformRemove, &PlatformModel{})) // 删除级联设备
apiServer.router.HandleFunc("/api/v1/platform/channel/bind", withJsonResponse(apiServer.OnPlatformChannelBind, &PlatformChannel{})) // 级联绑定通道
apiServer.router.HandleFunc("/api/v1/platform/channel/unbind", withJsonResponse(apiServer.OnPlatformChannelUnbind, &PlatformChannel{})) // 级联解绑通道
@@ -170,6 +171,14 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/broadcast/hangup", withJsonResponse(apiServer.OnHangup, &BroadcastParams{})) // 挂断广播会话
apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) // 语音对讲
+ apiServer.router.HandleFunc("/api/v1/jt/device/add", withJsonResponse(apiServer.OnVirtualDeviceAdd, &JTDeviceModel{}))
+ apiServer.router.HandleFunc("/api/v1/jt/device/edit", withJsonResponse(apiServer.OnVirtualDeviceEdit, &JTDeviceModel{}))
+ apiServer.router.HandleFunc("/api/v1/jt/device/remove", withJsonResponse(apiServer.OnVirtualDeviceRemove, &JTDeviceModel{}))
+
+ apiServer.router.HandleFunc("/api/v1/jt/channel/add", withJsonResponse(apiServer.OnVirtualChannelAdd, &Channel{}))
+ apiServer.router.HandleFunc("/api/v1/jt/channel/edit", withJsonResponse(apiServer.OnVirtualChannelEdit, &Channel{}))
+ apiServer.router.HandleFunc("/api/v1/jt/channel/remove", withJsonResponse(apiServer.OnVirtualChannelRemove, &Channel{}))
+
http.Handle("/", apiServer.router)
srv := &http.Server{
@@ -201,9 +210,13 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive"&"stream_type=playback"&"start_time=2024-06-18T15:20:56"&"end_time=2024-06-18T15:25:56
//ffplay -i rtmp://127.0.0.1/34020000001320000001/34020000001310000001.session_id_0?setup=passive&stream_type=playback&start_time=2024-06-18T15:20:56&end_time=2024-06-18T15:25:56
+ // 拉流地址携带的参数
+ query := r.URL.Query()
+ jtSource := query.Get("forward_type") == "gateway_1078"
+
// 跳过非国标拉流
sourceStream := strings.Split(string(params.Stream), "/")
- if len(sourceStream) != 2 || len(sourceStream[0]) != 20 || len(sourceStream[1]) < 20 {
+ if !jtSource && (len(sourceStream) != 2 || len(sourceStream[0]) != 20 || len(sourceStream[1]) < 20) {
Sugar.Infof("跳过非国标拉流 stream: %s", params.Stream)
return
}
@@ -211,6 +224,7 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
// 已经存在,累加计数
if stream, _ := StreamDao.QueryStream(params.Stream); stream != nil {
stream.IncreaseSinkCount()
+ return
}
deviceId := sourceStream[0]
@@ -219,35 +233,52 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
channelId = channelId[:20]
}
- // 发起invite的参数
- query := r.URL.Query()
- inviteParams := &InviteParams{
- DeviceID: deviceId,
- ChannelID: channelId,
- StartTime: query.Get("start_time"),
- EndTime: query.Get("end_time"),
- Setup: strings.ToLower(query.Get("setup")),
- Speed: query.Get("speed"),
- streamId: params.Stream,
- }
-
var code int
- var stream *Stream
- var err error
- streamType := strings.ToLower(query.Get("stream_type"))
- if "playback" == streamType {
- code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r)
- } else if "download" == streamType {
- code, stream, err = api.DoInvite(InviteTypeDownload, inviteParams, false, w, r)
- } else {
- code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r)
- }
+ // 通知1078信令服务器
+ if jtSource {
+ if len(sourceStream) != 2 {
+ code = http.StatusBadRequest
+ Sugar.Errorf("1078信令服务器转发请求参数错误")
+ return
+ }
- if err != nil {
- Sugar.Errorf("请求流失败 err: %s", err.Error())
- utils.Assert(http.StatusOK != code)
- } else if http.StatusOK == code {
- stream.IncreaseSinkCount()
+ simNumber := sourceStream[0]
+ channelNumber := sourceStream[1]
+ response, err := hook.PostOnInviteEvent(simNumber, channelNumber)
+ if err != nil {
+ code = http.StatusInternalServerError
+ Sugar.Errorf("通知1078信令服务器失败 err: %s sim number: %s channel number: %s", err.Error(), simNumber, channelNumber)
+ } else if code = response.StatusCode; code != http.StatusOK {
+ Sugar.Errorf("通知1078信令服务器失败. 响应状态码: %d sim number: %s channel number: %s", response.StatusCode, simNumber, channelNumber)
+ }
+ } else {
+ inviteParams := &InviteParams{
+ DeviceID: deviceId,
+ ChannelID: channelId,
+ StartTime: query.Get("start_time"),
+ EndTime: query.Get("end_time"),
+ Setup: strings.ToLower(query.Get("setup")),
+ Speed: query.Get("speed"),
+ streamId: params.Stream,
+ }
+
+ var stream *Stream
+ var err error
+ streamType := strings.ToLower(query.Get("stream_type"))
+ if "playback" == streamType {
+ code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r)
+ } else if "download" == streamType {
+ code, stream, err = api.DoInvite(InviteTypeDownload, inviteParams, false, w, r)
+ } else {
+ code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r)
+ }
+
+ if err != nil {
+ Sugar.Errorf("请求流失败 err: %s", err.Error())
+ utils.Assert(http.StatusOK != code)
+ } else if http.StatusOK == code {
+ stream.IncreaseSinkCount()
+ }
}
w.WriteHeader(code)
@@ -256,63 +287,50 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
- //stream := StreamManager.Find(params.StreamID)
- //if stream == nil {
- // Sugar.Errorf("处理播放结束事件失败, stream不存在. id: %s", params.StreamID)
- // return
- //}
-
- //if 0 == stream.DecreaseSinkCount() && Config.AutoCloseOnIdle {
- // CloseStream(params.StreamID, true)
- //}
-
- if !strings.HasPrefix(params.Protocol, "gb") {
- return
- }
-
sink := RemoveForwardSink(params.Stream, params.Sink)
if sink == nil {
- Sugar.Errorf("处理转发结束事件失败, 找不到sink. stream: %s sink: %s", params.Stream, params.Sink)
return
}
// 级联断开连接, 向上级发送Bye请求
- if params.Protocol == "gb_cascaded_forward" {
+ if params.Protocol == TransStreamGBCascaded {
if platform := PlatformManager.Find(sink.ServerAddr); platform != nil {
callID, _ := sink.Dialog.CallID()
- platform.CloseStream(callID.Value(), true, false)
+ platform.(*Platform).CloseStream(callID.Value(), true, false)
}
- } else if params.Protocol == "gb_talk_forward" {
- // 对讲设备断开连接
+ } else {
+ sink.Close(true, false)
}
-
- sink.Close(true, false)
}
func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("推流事件. protocol: %s stream: %s", params.Protocol, params.Stream)
- stream := Dialogs.Find(string(params.Stream))
+ if SourceTypeRtmp == params.Protocol {
+ return
+ }
+
+ stream := EarlyDialogs.Find(string(params.Stream))
if stream != nil {
stream.Put(200)
}
// 对讲websocket已连接
// 创建stream
- if "gb_talk" == params.Protocol {
+ if params.Protocol == SourceTypeGBTalk {
Sugar.Infof("对讲websocket已连接, stream: %s", params.Stream)
+ }
- s := &Stream{
- StreamID: params.Stream,
- Protocol: params.Protocol,
- }
+ s := &Stream{
+ StreamID: params.Stream,
+ Protocol: params.Protocol,
+ }
- _, ok := StreamDao.SaveStream(s)
- if !ok {
- Sugar.Errorf("处理推流事件失败, stream已存在. id: %s", params.Stream)
- w.WriteHeader(http.StatusBadRequest)
- return
- }
+ _, ok := StreamDao.SaveStream(s)
+ if !ok {
+ Sugar.Errorf("处理推流事件失败, stream已存在. id: %s", params.Stream)
+ w.WriteHeader(http.StatusBadRequest)
+ return
}
}
@@ -321,7 +339,7 @@ func (api *ApiServer) OnPublishDone(params *StreamParams, w http.ResponseWriter,
CloseStream(params.Stream, false)
// 对讲websocket断开连接
- if "gb_talk" == params.Protocol {
+ if SourceTypeGBTalk == params.Protocol {
}
}
@@ -330,7 +348,7 @@ func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter,
Sugar.Infof("推流空闲超时事件. protocol: %s stream: %s", params.Protocol, params.Stream)
// 非rtmp空闲超时, 返回非200应答, 删除会话
- if params.Protocol != "rtmp" {
+ if SourceTypeRtmp != params.Protocol {
w.WriteHeader(http.StatusForbidden)
CloseStream(params.Stream, false)
}
@@ -340,7 +358,7 @@ func (api *ApiServer) OnReceiveTimeout(params *StreamParams, w http.ResponseWrit
Sugar.Infof("收流超时事件. protocol: %s stream: %s", params.Protocol, params.Stream)
// 非rtmp推流超时, 返回非200应答, 删除会话
- if params.Protocol != "rtmp" {
+ if SourceTypeRtmp != params.Protocol {
w.WriteHeader(http.StatusForbidden)
CloseStream(params.Stream, false)
}
@@ -576,7 +594,7 @@ func (api *ApiServer) OnSeekPlayback(v *SeekParams, w http.ResponseWriter, r *ht
seekRequest.RemoveHeader(RtspMessageType.Name())
seekRequest.AppendHeader(&RtspMessageType)
- SipUA.SendRequest(seekRequest)
+ SipStack.SendRequest(seekRequest)
return nil, nil
}
@@ -605,7 +623,7 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
defer func() {
if !ok {
if InviteSourceId != "" {
- Dialogs.Remove(InviteSourceId)
+ EarlyDialogs.Remove(InviteSourceId)
}
if sinkStreamId != "" {
@@ -642,7 +660,7 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
sink := &Sink{
StreamID: v.StreamId,
SinkStreamID: sinkStreamId,
- Protocol: "gb_talk_forward",
+ Protocol: "gb_talk",
CreateTime: time.Now().Unix(),
SetupType: setupType,
}
@@ -651,7 +669,7 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
if err := SinkDao.SaveForwardSink(v.StreamId, sink); err != nil {
Sugar.Errorf("广播失败, 设备正在广播中. stream: %s", sinkStreamId)
return nil, fmt.Errorf("设备正在广播中")
- } else if _, ok = Dialogs.Add(InviteSourceId, streamWaiting); !ok {
+ } else if _, ok = EarlyDialogs.Add(InviteSourceId, streamWaiting); !ok {
Sugar.Errorf("广播失败, id冲突. id: %s", InviteSourceId)
return nil, fmt.Errorf("id冲突")
}
@@ -682,7 +700,8 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
Sugar.Errorf("广播失败, 下级设备invite失败. stream: %s", sinkStreamId)
return nil, fmt.Errorf("错误应答 code: %d", code)
} else {
- ok = AddForwardSink(v.StreamId, sink)
+ //ok = AddForwardSink(v.StreamId, sink)
+ ok = true
}
break
case <-cancel.Done():
@@ -712,7 +731,7 @@ func (api *ApiServer) OnStarted(w http.ResponseWriter, req *http.Request) {
}
}
-func (api *ApiServer) OnPlatformAdd(v *SIPUAParams, w http.ResponseWriter, r *http.Request) (interface{}, error) {
+func (api *ApiServer) OnPlatformAdd(v *PlatformModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
Sugar.Infof("添加级联设备 %v", *v)
if v.Username == "" {
@@ -731,27 +750,32 @@ func (api *ApiServer) OnPlatformAdd(v *SIPUAParams, w http.ResponseWriter, r *ht
}
v.Status = "OFF"
-
- platform, err := NewGBPlatform(v, SipUA)
- if err == nil {
- err = AddPlatform(platform)
+ platform, err := NewPlatform(&v.SIPUAOptions, SipStack)
+ if err != nil {
+ Sugar.Errorf("创建级联设备失败 err: %s", err.Error())
+ return nil, err
}
- if err == nil {
- platform.Start()
+ if !PlatformManager.Add(v.ServerAddr, platform) {
+ Sugar.Errorf("ua添加失败, id冲突. key: %s", v.ServerAddr)
+ return fmt.Errorf("ua添加失败, id冲突. key: %s", v.ServerAddr), nil
+ } else if err = PlatformDao.SavePlatform(v); err != nil {
+ PlatformManager.Remove(v.ServerAddr)
+ Sugar.Errorf("保存级联设备失败 err: %s", err.Error())
+ return nil, err
}
+ platform.Start()
return nil, err
}
-func (api *ApiServer) OnPlatformRemove(v *SIPUAParams, w http.ResponseWriter, r *http.Request) (interface{}, error) {
+func (api *ApiServer) OnPlatformRemove(v *PlatformModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
Sugar.Infof("删除级联设备 %v", *v)
- platform, err := RemovePlatform(v.ServerAddr)
+ err := PlatformDao.DeleteUAByAddr(v.ServerAddr)
if err != nil {
- Sugar.Errorf("删除级联设备失败 err: %s", err.Error())
return nil, err
- } else if platform != nil {
+ } else if platform := PlatformManager.Remove(v.ServerAddr); platform != nil {
platform.Stop()
}
@@ -759,8 +783,8 @@ func (api *ApiServer) OnPlatformRemove(v *SIPUAParams, w http.ResponseWriter, r
}
func (api *ApiServer) OnPlatformList(w http.ResponseWriter, r *http.Request) {
- platforms := LoadPlatforms()
- httpResponseOK(w, platforms)
+ //platforms := LoadPlatforms()
+ //httpResponseOK(w, platforms)
}
func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
diff --git a/api_jt.go b/api_jt.go
new file mode 100644
index 0000000..203b53e
--- /dev/null
+++ b/api_jt.go
@@ -0,0 +1,116 @@
+package main
+
+import (
+ "fmt"
+ "net/http"
+)
+
+func (api *ApiServer) OnVirtualDeviceAdd(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
+ Sugar.Infof("add virtual device: %v", *device)
+
+ if len(device.Username) != 20 {
+ Sugar.Errorf("invalid username: %s", device.Username)
+ return nil, fmt.Errorf("invalid username: %s", device.Username)
+ } else if len(device.SeverID) != 20 {
+ Sugar.Errorf("invalid server id: %s", device.SeverID)
+ return nil, fmt.Errorf("invalid server id: %s", device.SeverID)
+ } else if device.SimNumber == "" {
+ // sim卡号必选项
+ Sugar.Errorf("sim number is required")
+ return nil, fmt.Errorf("sim number is required")
+ }
+
+ if JTDeviceDao.ExistDevice(device.Username, device.SimNumber) {
+ // 用户名或sim卡号已存在
+ Sugar.Errorf("username or sim number already exists")
+ return nil, fmt.Errorf("username or sim number already exists")
+ } else if DeviceDao.ExistDevice(device.Username) {
+ // 用户名与下级设备冲突
+ Sugar.Errorf("username already exists")
+ return nil, fmt.Errorf("username already exists")
+ }
+
+ jtDevice, err := NewJTDevice(device, SipStack)
+ if err != nil {
+ Sugar.Errorf("create virtual device failed: %s", err.Error())
+ return nil, err
+ }
+
+ if !JTDeviceManager.Add(device.Username, jtDevice) {
+ return nil, fmt.Errorf("ua添加失败, id冲突. key: %s", device.Username)
+ } else if err = JTDeviceDao.SaveDevice(device); err != nil {
+ JTDeviceManager.Remove(device.Username)
+ Sugar.Errorf("save device failed: %s", err.Error())
+ return nil, err
+ }
+
+ jtDevice.Start()
+
+ if err != nil {
+ Sugar.Errorf("add jt device failed: %s", err.Error())
+ return nil, err
+ }
+
+ return nil, nil
+}
+
+func (api *ApiServer) OnVirtualDeviceEdit(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
+
+ return nil, nil
+}
+
+func (api *ApiServer) OnVirtualDeviceRemove(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
+ err := JTDeviceDao.DeleteDevice(device.Username)
+ if err != nil {
+ return nil, err
+ } else if client := JTDeviceManager.Remove(device.Username); client != nil {
+ client.Stop()
+ }
+
+ return nil, nil
+}
+
+func (api *ApiServer) OnVirtualChannelAdd(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
+ Sugar.Infof("add virtual channel: %v", *channel)
+
+ device, err := JTDeviceDao.QueryDevice(channel.RootID)
+ if err != nil {
+ Sugar.Errorf("query jt device failed: %s device: %s ", err.Error(), channel.RootID)
+ return nil, err
+ }
+
+ if len(channel.DeviceID) != 20 {
+ Sugar.Errorf("invalid channel id: %s", channel.DeviceID)
+ return nil, fmt.Errorf("invalid channel id: %s", channel.DeviceID)
+ }
+
+ channel.ParentID = device.Username
+ channel.RootID = device.Username
+ channel.GroupID = device.Username
+ err = ChannelDao.SaveJTChannel(channel)
+ if err != nil {
+ Sugar.Errorf("save channel failed: %s", err.Error())
+ }
+ return nil, err
+}
+
+func (api *ApiServer) OnVirtualChannelEdit(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
+
+ return nil, nil
+}
+
+func (api *ApiServer) OnVirtualChannelRemove(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
+ Sugar.Infof("remove virtual channel: %v", *channel)
+
+ device, err := JTDeviceDao.QueryDevice(channel.RootID)
+ if err != nil {
+ Sugar.Errorf("query jt device failed: %s device: %s ", err.Error(), channel.RootID)
+ return nil, err
+ }
+
+ err = ChannelDao.DeleteChannel(device.Username, channel.DeviceID)
+ if err != nil {
+ Sugar.Errorf("delete channel failed: %s", err.Error())
+ }
+ return nil, err
+}
diff --git a/broadcast.go b/broadcast.go
index b9c0114..cbb7811 100644
--- a/broadcast.go
+++ b/broadcast.go
@@ -2,12 +2,8 @@ package main
import (
"fmt"
- "gb-cms/sdp"
"github.com/ghettovoice/gosip/sip"
- "net"
"net/http"
- "strconv"
- "strings"
)
const (
@@ -18,107 +14,50 @@ const (
"%s\r\n" +
"%s\r\n" +
"\r\n"
-
- AnswerFormat = "v=0\r\n" +
- "o=%s 0 0 IN IP4 %s\r\n" +
- "s=Play\r\n" +
- "c=IN IP4 %s\r\n" +
- "t=0 0\r\n" +
- "m=audio %d %s 8\r\n" +
- "a=sendonly\r\n" +
- "a=rtpmap:8 PCMA/8000\r\n"
)
-func findSetup(descriptor *sdp.SDP) SetupType {
- var tcp bool
- if descriptor.Audio != nil {
- tcp = strings.Contains(descriptor.Audio.Proto, "TCP")
- }
-
- if !tcp && descriptor.Video != nil {
- tcp = strings.Contains(descriptor.Video.Proto, "TCP")
- }
-
- setup := SetupTypeUDP
- if tcp {
- for _, attr := range descriptor.Attrs {
- if "setup" == attr[0] {
- if SetupTypePassive.String() == attr[1] {
- setup = SetupTypePassive
- } else if SetupTypeActive.String() == attr[1] {
- setup = SetupTypeActive
- }
- }
- }
- }
-
- return setup
-}
-
func (d *Device) DoBroadcast(sourceId, channelId string) error {
body := fmt.Sprintf(BroadcastFormat, 1, sourceId, channelId)
request := d.BuildMessageRequest(channelId, body)
- SipUA.SendRequest(request)
+ SipStack.SendRequest(request)
return nil
}
// OnInvite 语音广播
func (d *Device) OnInvite(request sip.Request, user string) sip.Response {
- streamWaiting := Dialogs.Find(user)
+ // 会话是否存在
+ streamWaiting := EarlyDialogs.Find(user)
if streamWaiting == nil {
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
+ // 解析offer
sink := streamWaiting.data.(*Sink)
body := request.Body()
- offer, err := sdp.Parse(body)
+ offer, err := ParseGBSDP(body)
if err != nil {
Sugar.Infof("广播失败, 解析sdp发生err: %s sink: %s sdp: %s", err.Error(), sink.SinkID, body)
streamWaiting.Put(http.StatusBadRequest)
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
- } else if offer.Audio == nil {
+ } else if offer.media == nil {
Sugar.Infof("广播失败, offer中缺少audio字段. sink: %s sdp: %s", sink.SinkID, body)
streamWaiting.Put(http.StatusBadRequest)
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
- // 通知流媒体服务器创建answer
- offerSetup := findSetup(offer)
- answerSetup := sink.SetupType
- finalSetup := offerSetup
- if answerSetup != offerSetup {
- finalSetup = answerSetup
+ // http接口中设置的setup优先级高于sdp中的setup
+ if offer.answerSetup != sink.SetupType {
+ offer.answerSetup = sink.SetupType
}
- addr := net.JoinHostPort(offer.Addr, strconv.Itoa(int(offer.Audio.Port)))
- host, port, sinkId, err := CreateAnswer(string(sink.StreamID), addr, offerSetup.String(), answerSetup.String(), "", string(InviteTypeBroadcast))
+ response, err := AddForwardSink(TransStreamGBTalk, request, user, sink, sink.StreamID, offer, InviteTypeBroadcast, "8 PCMA/8000")
if err != nil {
Sugar.Errorf("广播失败, 流媒体创建answer发生err: %s sink: %s ", err.Error(), sink.SinkID)
streamWaiting.Put(http.StatusInternalServerError)
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
- var answerSDP string
- // UDP广播
- if SetupTypeUDP == finalSetup {
- answerSDP = fmt.Sprintf(AnswerFormat, Config.SipID, host, host, port, "RTP/AVP")
- } else {
- // TCP广播
- answerSDP = fmt.Sprintf(AnswerFormat, Config.SipID, host, host, port, "TCP/RTP/AVP")
- }
-
- // 创建answer和dialog
- response := CreateResponseWithStatusCode(request, http.StatusOK)
- setToTag(response)
-
- sink.SinkID = sinkId
- sink.SetDialog(d.CreateDialogRequestFromAnswer(response, true))
-
- response.SetBody(answerSDP, true)
- response.AppendHeader(&SDPMessageType)
- response.AppendHeader(GlobalContactAddress.AsContactHeader())
-
streamWaiting.Put(http.StatusOK)
return response
}
diff --git a/client.go b/client.go
index 5646fb5..aefab1e 100644
--- a/client.go
+++ b/client.go
@@ -2,14 +2,22 @@ package main
import (
"encoding/xml"
+ "fmt"
"gb-cms/sdp"
"github.com/ghettovoice/gosip/sip"
"strconv"
"strings"
)
+const (
+ DefaultDomainName = "本域"
+ DefaultManufacturer = "github/lkmio"
+ DefaultModel = "gb-cms"
+ DefaultFirmware = "dev"
+)
+
type GBClient interface {
- SipClient
+ SIPUA
GBDevice
@@ -22,19 +30,21 @@ type GBClient interface {
OnQueryDeviceInfo(sn int)
OnSubscribeCatalog(sn int)
+
+ CloseStream(callId string, bye, ms bool)
}
-type Client struct {
- *sipClient
+type gbClient struct {
+ *sipUA
Device
deviceInfo *DeviceInfoResponse
}
-func (g *Client) OnQueryCatalog(sn int, channels []*Channel) {
+func (g *gbClient) OnQueryCatalog(sn int, channels []*Channel) {
response := CatalogResponse{}
response.SN = sn
response.CmdType = CmdCatalog
- response.DeviceID = g.sipClient.Username
+ response.DeviceID = g.sipUA.Username
response.SumNum = len(channels)
if response.SumNum < 1 {
@@ -48,60 +58,78 @@ func (g *Client) OnQueryCatalog(sn int, channels []*Channel) {
response.DeviceList.Devices = nil
response.DeviceList.Num = 1 // 一次发一个通道
response.DeviceList.Devices = append(response.DeviceList.Devices, &channel)
- response.DeviceList.Devices[0].ParentID = g.sipClient.Username
+ response.DeviceList.Devices[0].ParentID = g.sipUA.Username
g.SendMessage(&response)
}
}
-func (g *Client) SendMessage(msg interface{}) {
+func (g *gbClient) SendMessage(msg interface{}) {
marshal, err := xml.MarshalIndent(msg, "", " ")
if err != nil {
panic(err)
}
- request, err := BuildMessageRequest(g.sipClient.Username, g.sipClient.ListenAddr, g.sipClient.SeverID, g.sipClient.ServerAddr, g.sipClient.Transport, string(marshal))
+ request, err := BuildMessageRequest(g.sipUA.Username, g.sipUA.ListenAddr, g.sipUA.SeverID, g.sipUA.ServerAddr, g.sipUA.Transport, string(marshal))
if err != nil {
panic(err)
}
- g.sipClient.ua.SendRequest(request)
+ g.sipUA.stack.SendRequest(request)
}
-func (g *Client) OnQueryDeviceInfo(sn int) {
+func (g *gbClient) OnQueryDeviceInfo(sn int) {
g.deviceInfo.SN = sn
g.SendMessage(&g.deviceInfo)
}
-func (g *Client) OnInvite(request sip.Request, user string) sip.Response {
+func (g *gbClient) OnInvite(request sip.Request, user string) sip.Response {
return nil
}
-func (g *Client) SetDeviceInfo(name, manufacturer, model, firmware string) {
+func (g *gbClient) SetDeviceInfo(name, manufacturer, model, firmware string) {
g.deviceInfo.DeviceName = name
g.deviceInfo.Manufacturer = manufacturer
g.deviceInfo.Model = model
g.deviceInfo.Firmware = firmware
}
-func (g *Client) OnSubscribeCatalog(sn int) {
+func (g *gbClient) OnSubscribeCatalog(sn int) {
}
-func ParseGBSDP(body string) (offer *sdp.SDP, ssrc string, speed int, media *sdp.Media, offerSetup, answerSetup string, err error) {
- offer, err = sdp.Parse(body)
+func (g *gbClient) CloseStream(callId string, bye, ms bool) {
+
+}
+
+type GBSDP struct {
+ sdp *sdp.SDP
+ ssrc string
+ speed int
+ media *sdp.Media
+ mediaType string
+ offerSetup, answerSetup SetupType
+ startTime, stopTime string
+ connectionAddr string
+ isTcpTransport bool
+}
+
+func ParseGBSDP(body string) (*GBSDP, error) {
+ offer, err := sdp.Parse(body)
if err != nil {
- return nil, "", 0, nil, "", "", err
+ return nil, err
}
+ gbSdp := &GBSDP{sdp: offer}
// 解析设置下载速度
var setup string
for _, attr := range offer.Attrs {
if "downloadspeed" == attr[0] {
- speed, err = strconv.Atoi(attr[1])
+ speed, err := strconv.Atoi(attr[1])
if err != nil {
- return nil, "", 0, nil, "", "", err
+ return nil, err
}
+ gbSdp.speed = speed
} else if "setup" == attr[0] {
setup = attr[1]
}
@@ -110,35 +138,51 @@ func ParseGBSDP(body string) (offer *sdp.SDP, ssrc string, speed int, media *sdp
// 解析ssrc
for _, attr := range offer.Other {
if "y" == attr[0] {
- ssrc = attr[1]
+ gbSdp.ssrc = attr[1]
}
}
if offer.Video != nil {
- media = offer.Video
+ gbSdp.media = offer.Video
+ gbSdp.mediaType = "video"
} else if offer.Audio != nil {
- media = offer.Audio
+ gbSdp.media = offer.Audio
+ gbSdp.mediaType = "audio"
}
- tcp := strings.HasPrefix(media.Proto, "TCP")
+ tcp := strings.HasPrefix(gbSdp.media.Proto, "TCP")
if "passive" == setup && tcp {
- offerSetup = "passive"
- answerSetup = "active"
+ gbSdp.offerSetup = SetupTypePassive
+ gbSdp.answerSetup = SetupTypeActive
} else if "active" == setup && tcp {
- offerSetup = "active"
- answerSetup = "passive"
+ gbSdp.offerSetup = SetupTypeActive
+ gbSdp.answerSetup = SetupTypePassive
}
- return
+ time := strings.Split(gbSdp.sdp.Time, " ")
+ if len(time) < 2 {
+ return nil, fmt.Errorf("sdp的时间范围格式错误 time: %s sdp: %s", gbSdp.sdp.Time, body)
+ }
+
+ gbSdp.startTime = time[0]
+ gbSdp.stopTime = time[1]
+ gbSdp.isTcpTransport = tcp
+ gbSdp.connectionAddr = fmt.Sprintf("%s:%d", gbSdp.sdp.Addr, gbSdp.media.Port)
+ return gbSdp, nil
}
-func NewGBClient(params *SIPUAParams, ua SipServer) GBClient {
- sip := &sipClient{
- SIPUAParams: *params,
- ListenAddr: ua.ListenAddr(),
- ua: ua,
+func NewGBClient(params *SIPUAOptions, stack SipServer) GBClient {
+ ua := &sipUA{
+ SIPUAOptions: *params,
+ ListenAddr: stack.ListenAddr(),
+ stack: stack,
}
- client := &Client{sip, Device{DeviceID: params.Username}, &DeviceInfoResponse{BaseResponse: BaseResponse{BaseMessage: BaseMessage{DeviceID: params.Username, CmdType: CmdDeviceInfo}, Result: "OK"}}}
+ // 心跳间隔最低10秒
+ if ua.SIPUAOptions.KeepaliveInterval < 10 {
+ ua.SIPUAOptions.KeepaliveInterval = 10
+ }
+
+ client := &gbClient{ua, Device{DeviceID: params.Username}, &DeviceInfoResponse{BaseResponse: BaseResponse{BaseMessage: BaseMessage{DeviceID: params.Username, CmdType: CmdDeviceInfo}, Result: "OK"}}}
return client
}
diff --git a/client_benchmark_test.go b/client_benchmark_test.go
index d0abe5f..d932d22 100644
--- a/client_benchmark_test.go
+++ b/client_benchmark_test.go
@@ -100,8 +100,8 @@ package main
// m.Close(true)
//}
//
-//type VirtualDevice struct {
-// *Client
+//type Platform struct {
+// *gbClient
// streams map[string]*MediaStream
// lock sync.Locker
//}
@@ -126,7 +126,7 @@ package main
// }
//}
//
-//func (v VirtualDevice) OnInvite(request sip.Request, user string) sip.Response {
+//func (v Platform) OnInvite(request sip.Request, user string) sip.Response {
// if len(rtpPackets) < 1 {
// return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
// }
@@ -150,10 +150,10 @@ package main
// var ip string
// var port sip.Port
// var contactAddr string
-// if v.sipClient.NatAddr != "" {
-// contactAddr = v.sipClient.NatAddr
+// if v.sipUA.NatAddr != "" {
+// contactAddr = v.sipUA.NatAddr
// } else {
-// contactAddr = v.sipClient.ListenAddr
+// contactAddr = v.sipUA.ListenAddr
// }
//
// host, p, _ := net.SplitHostPort(contactAddr)
@@ -180,7 +180,7 @@ package main
// i, _ := strconv.Atoi(ssrc)
// stream.ssrc = uint32(i)
// stream.tcp = tcp
-// stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipClient.Domain)
+// stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipUA.Domain)
// callId, _ := response.CallID()
//
// {
@@ -203,7 +203,7 @@ package main
//
// if sendBye {
// bye := CreateRequestFromDialog(stream.dialog, sip.BYE)
-// v.sipClient.ua.SendRequest(bye)
+// v.sipUA.stack.SendRequest(bye)
// }
//
// stream.dialog = nil
@@ -219,7 +219,7 @@ package main
// stream.Start()
//
// // 绑定到StreamManager, bye请求才会找到设备回调
-// streamId := GenerateStreamID(InviteTypePlay, v.sipClient.Username, user, "", "")
+// streamId := GenerateStreamID(InviteTypePlay, v.sipUA.Username, user, "", "")
// s := StreamID{StreamID: streamId, Dialog: stream.dialog}
// StreamManager.Add(&s)
//
@@ -228,7 +228,7 @@ package main
// return response
//}
//
-//func (v VirtualDevice) OnBye(request sip.Request) {
+//func (v Platform) OnBye(request sip.Request) {
// id, _ := request.CallID()
// stream, ok := v.streams[id.Value()]
// if !ok {
@@ -245,7 +245,7 @@ package main
// stream.Close(false)
//}
//
-//func (v VirtualDevice) Offline() {
+//func (v Platform) Offline() {
// for _, stream := range v.streams {
// stream.Close(true)
// }
@@ -333,7 +333,7 @@ package main
// channelId := clientConfig.ChannelIDPrefix + fmt.Sprintf("%07d", i+1)
// client := NewGBClient(deviceId, clientConfig.ServerAddr, clientConfig.Domain, "UDP", clientConfig.Password, 500, 40, server)
//
-// device := VirtualDevice{client.(*Client), map[string]*MediaStream{}, &sync.Mutex{}}
+// device := Platform{client.(*gbClient), map[string]*MediaStream{}, &sync.Mutex{}}
// device.SetDeviceInfo(fmt.Sprintf("测试设备%d", i+1), "lkmio", "lkmio_gb", "dev-0.0.1")
//
// channel := &Channel{
diff --git a/client_manager.go b/client_manager.go
new file mode 100644
index 0000000..d9dd10f
--- /dev/null
+++ b/client_manager.go
@@ -0,0 +1,94 @@
+package main
+
+import (
+ "sync"
+)
+
+var (
+ // PlatformManager 管理级联设备
+ PlatformManager = &ClientManager{
+ clients: make(map[string]GBClient, 8), // server addr->client
+ addrMap: make(map[string]int, 8),
+ }
+
+ // JTDeviceManager 管理1078设备
+ JTDeviceManager = &ClientManager{
+ clients: make(map[string]GBClient, 8), // username->client
+ addrMap: make(map[string]int, 8),
+ }
+)
+
+type ClientManager struct {
+ clients map[string]GBClient
+ addrMap map[string]int
+ lock sync.RWMutex
+}
+
+func (p *ClientManager) Add(key string, client GBClient) bool {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ if _, ok := p.clients[key]; ok {
+ return false
+ }
+
+ p.clients[key] = client
+ p.addrMap[client.GetDomain()]++
+ return true
+}
+
+func (p *ClientManager) Find(key string) GBClient {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ if client, ok := p.clients[key]; ok {
+ return client
+ }
+ return nil
+}
+
+func (p *ClientManager) Remove(addr string) GBClient {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ client, ok := p.clients[addr]
+ if !ok {
+ return nil
+ }
+
+ p.addrMap[client.GetDomain()]++
+ if p.addrMap[client.GetDomain()] < 1 {
+ delete(p.addrMap, client.GetDomain())
+ }
+
+ delete(p.clients, addr)
+ return client
+}
+
+func (p *ClientManager) All() []GBClient {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ clients := make([]GBClient, 0, len(p.clients))
+ for _, client := range p.clients {
+ clients = append(clients, client)
+ }
+
+ return clients
+}
+
+func (p *ClientManager) ExistClientByServerAddr(addr string) bool {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ _, ok := p.addrMap[addr]
+ return ok
+}
+
+func RemovePlatform(key string) (GBClient, error) {
+ err := PlatformDao.DeleteUAByAddr(key)
+ if err != nil {
+ return nil, err
+ }
+
+ platform := PlatformManager.Remove(key)
+ return platform, nil
+}
diff --git a/config.go b/config.go
index 8836b5a..b7eec6a 100644
--- a/config.go
+++ b/config.go
@@ -25,6 +25,13 @@ type Config_ struct {
Addr string `json:"addr"`
Password string `json:"password"`
}
+
+ Hooks struct {
+ Online string `json:"online"`
+ Offline string `json:"offline"`
+ Position string `json:"position"`
+ OnInvite string `json:"on_invite"`
+ }
}
type LogConfig struct {
diff --git a/config.json b/config.json
index fa4c2f0..978bc64 100644
--- a/config.json
+++ b/config.json
@@ -22,6 +22,12 @@
"offline": "",
"?position" : "设备位置通知",
- "position": ""
+ "position": "",
+
+ "?on_invite": "被邀请, 用于通知1078信令服务器, 向设备下发推流指令",
+ "on_invite": "http://localhost:8081/api/v1/jt1078/on_invite",
+
+ "?on_answer": "被查询录像,用于通知1078信令服务器",
+ "on_query_record": ""
}
}
\ No newline at end of file
diff --git a/dao_channel.go b/dao_channel.go
index f96a397..49890ea 100644
--- a/dao_channel.go
+++ b/dao_channel.go
@@ -1,6 +1,7 @@
package main
import (
+ "fmt"
"gorm.io/gorm"
)
@@ -13,11 +14,25 @@ type DaoChannel interface {
QueryChannels(deviceId, groupId, string, page, size int) ([]*Channel, int, error)
+ QueryChannelsByRootID(rootId string) ([]*Channel, error)
+
+ QueryChannelsByChannelID(channelId string) ([]*Channel, error)
+
QueryChanelCount(deviceId string) (int, error)
QueryOnlineChanelCount(deviceId string) (int, error)
QueryChannelByTypeCode(codecs ...int) ([]*Channel, error)
+
+ ExistChannel(channelId string) bool
+
+ SaveJTChannel(channel *Channel) error
+
+ ExistJTChannel(simNumber string, channelNumber int) bool
+
+ QueryJTChannelBySimNumber(simNumber string) (*Channel, error)
+
+ DeleteChannel(deviceId string, channelId string) error
}
type daoChannel struct {
@@ -68,6 +83,15 @@ func (d *daoChannel) QueryChannels(deviceId, groupId string, page, size int) ([]
return channels, int(total), nil
}
+func (d *daoChannel) QueryChannelsByRootID(rootId string) ([]*Channel, error) {
+ var channels []*Channel
+ tx := db.Where("root_id =?", rootId).Find(&channels)
+ if tx.Error != nil {
+ return nil, tx.Error
+ }
+ return channels, nil
+}
+
func (d *daoChannel) QueryChanelCount(deviceId string) (int, error) {
var total int64
tx := db.Model(&Channel{}).Where("root_id =?", deviceId).Count(&total)
@@ -95,3 +119,37 @@ func (d *daoChannel) QueryChannelByTypeCode(codecs ...int) ([]*Channel, error) {
}
return channels, nil
}
+
+func (d *daoChannel) ExistChannel(channelId string) bool {
+ var channel Channel
+ if db.Select("id").Where("device_id =?", channelId).Take(&channel).Error == nil {
+ return true
+ }
+
+ return false
+}
+
+func (d *daoChannel) SaveJTChannel(channel *Channel) error {
+ return DBTransaction(func(tx *gorm.DB) error {
+ var old Channel
+ if tx.Select("id").Where("root_id =? and channel_number =?", channel.RootID, channel.ChannelNumber).Take(&old).Error == nil {
+ return fmt.Errorf("channel number %d already exist", channel.ChannelNumber)
+ } else if tx.Select("id").Where("device_id =?", channel.DeviceID).Take(&old).Error == nil {
+ return fmt.Errorf("channel id %s already exist", channel.DeviceID)
+ }
+ return tx.Save(channel).Error
+ })
+}
+
+func (d *daoChannel) DeleteChannel(deviceId string, channelId string) error {
+ return db.Where("root_id =? and device_id =?", deviceId, channelId).Unscoped().Delete(&Channel{}).Error
+}
+
+func (d *daoChannel) QueryChannelsByChannelID(channelId string) ([]*Channel, error) {
+ var channels []*Channel
+ tx := db.Where("device_id =?", channelId).Find(&channels)
+ if tx.Error != nil {
+ return nil, tx.Error
+ }
+ return channels, nil
+}
diff --git a/dao_jt.go b/dao_jt.go
new file mode 100644
index 0000000..a3c29c0
--- /dev/null
+++ b/dao_jt.go
@@ -0,0 +1,118 @@
+package main
+
+import (
+ "fmt"
+ "gorm.io/gorm"
+)
+
+// JTDeviceModel 数据库表结构
+type JTDeviceModel struct {
+ GBModel
+ SIPUAOptions
+ Manufacturer string `json:"manufacturer"`
+ Model string `json:"model"`
+ Firmware string `json:"firmware"`
+ SimNumber string `json:"sim_number"`
+}
+
+func (g *JTDeviceModel) TableName() string {
+ return "lkm_jt_device"
+}
+
+// DaoJTDevice 保存级联和1078设备的sipua参数项
+type DaoJTDevice interface {
+ LoadDevices() ([]*JTDeviceModel, error)
+
+ UpdateOnlineStatus(status OnlineStatus, username string) error
+
+ QueryDevice(user string) (*JTDeviceModel, error)
+
+ QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel, error)
+
+ ExistDevice(username, simNumber string) bool
+
+ DeleteDevice(username string) error
+
+ SaveDevice(model *JTDeviceModel) error
+
+ UpdateDevice(model *JTDeviceModel) error
+}
+
+type daoJTDevice struct {
+}
+
+func (d *daoJTDevice) LoadDevices() ([]*JTDeviceModel, error) {
+ var devices []*JTDeviceModel
+ tx := db.Find(&devices)
+ if tx.Error != nil {
+ return nil, tx.Error
+ }
+
+ return devices, nil
+}
+
+func (d *daoJTDevice) UpdateOnlineStatus(status OnlineStatus, username string) error {
+ return DBTransaction(func(tx *gorm.DB) error {
+ return tx.Model(&JTDeviceModel{}).Where("username =?", username).Update("status", status).Error
+ })
+}
+
+func (d *daoJTDevice) ExistDevice(id, simNumber string) bool {
+ var device JTDeviceModel
+ if db.Where("username =? or sim_number =?", id, simNumber).Select("id").Take(&device).Error == nil {
+ return true
+ }
+
+ return false
+}
+
+func (d *daoJTDevice) QueryDevice(id string) (*JTDeviceModel, error) {
+ var device JTDeviceModel
+ tx := db.Where("username =?", id).Take(&device)
+ if tx.Error != nil {
+ return nil, tx.Error
+ }
+ return &device, nil
+}
+
+func (d *daoJTDevice) DeleteDevice(id string) error {
+ return DBTransaction(func(tx *gorm.DB) error {
+ return tx.Where("username =?", id).Unscoped().Delete(&JTDeviceModel{}).Error
+ })
+}
+
+func (d *daoJTDevice) QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel, error) {
+ var device JTDeviceModel
+ tx := db.Where("sim_number =?", simNumber).Take(&device)
+ if tx.Error != nil {
+ return nil, tx.Error
+ }
+
+ return &device, nil
+}
+
+func (d *daoJTDevice) SaveDevice(model *JTDeviceModel) error {
+ return DBTransaction(func(tx *gorm.DB) error {
+ var old JTDeviceModel
+ tx = tx.Where("username =? or sim_number =?", model.Username, model.SimNumber).Select("id").First(&old)
+ if tx.Error == nil {
+ return fmt.Errorf("username or sim number already exists")
+ }
+
+ return db.Save(model).Error
+ })
+}
+
+func (d *daoJTDevice) UpdateDevice(model *JTDeviceModel) error {
+ return DBTransaction(func(tx *gorm.DB) error {
+ var old JTDeviceModel
+ tx = tx.Where("username =? or sim_number =?", model.Username, model.SimNumber).Select("id").First(&old)
+ if tx.Error != nil {
+ return tx.Error
+ } else {
+ model.ID = old.ID
+ }
+
+ return db.Save(model).Error
+ })
+}
diff --git a/dao_platform.go b/dao_platform.go
index adef939..c9e1531 100644
--- a/dao_platform.go
+++ b/dao_platform.go
@@ -1,17 +1,26 @@
package main
-type DaoPlatform interface {
- LoadPlatforms() ([]*SIPUAParams, error)
+// PlatformModel 数据库表结构
+type PlatformModel struct {
+ GBModel
+ SIPUAOptions
+}
- QueryPlatform(addr string) (*SIPUAParams, error)
+func (g *PlatformModel) TableName() string {
+ return "lkm_platform"
+}
- SavePlatform(platform *SIPUAParams) error
+// DaoVirtualDevice 保存级联和1078设备的sipua参数项
+type DaoVirtualDevice interface {
+ LoadPlatforms() ([]*PlatformModel, error)
+
+ QueryPlatform(addr string) (*PlatformModel, error)
+
+ SavePlatform(platform *PlatformModel) error
DeletePlatform(addr string) error
- UpdatePlatform(platform *SIPUAParams) error
-
- UpdatePlatformStatus(addr string, status OnlineStatus) error
+ UpdatePlatform(platform *PlatformModel) error
BindChannels(addr string, channels [][2]string) ([][2]string, error)
@@ -26,8 +35,8 @@ type DaoPlatform interface {
type daoPlatform struct {
}
-func (d *daoPlatform) LoadPlatforms() ([]*SIPUAParams, error) {
- var platforms []*SIPUAParams
+func (d *daoPlatform) LoadPlatforms() ([]*PlatformModel, error) {
+ var platforms []*PlatformModel
tx := db.Find(&platforms)
if tx.Error != nil {
return nil, tx.Error
@@ -36,8 +45,8 @@ func (d *daoPlatform) LoadPlatforms() ([]*SIPUAParams, error) {
return platforms, nil
}
-func (d *daoPlatform) QueryPlatform(addr string) (*SIPUAParams, error) {
- var platform SIPUAParams
+func (d *daoPlatform) QueryUAByAddr(addr string) (*PlatformModel, error) {
+ var platform PlatformModel
tx := db.Where("server_addr =?", addr).First(&platform)
if tx.Error != nil {
return nil, tx.Error
@@ -46,8 +55,8 @@ func (d *daoPlatform) QueryPlatform(addr string) (*SIPUAParams, error) {
return &platform, nil
}
-func (d *daoPlatform) SavePlatform(platform *SIPUAParams) error {
- var old SIPUAParams
+func (d *daoPlatform) SavePlatform(platform *PlatformModel) error {
+ var old PlatformModel
tx := db.Where("server_addr =?", platform.ServerAddr).First(&old)
if tx.Error == nil {
platform.ID = old.ID
@@ -55,27 +64,27 @@ func (d *daoPlatform) SavePlatform(platform *SIPUAParams) error {
return db.Save(platform).Error
}
-func (d *daoPlatform) DeletePlatform(addr string) error {
- return db.Where("server_addr =?", addr).Unscoped().Delete(&SIPUAParams{}).Error
+func (d *daoPlatform) DeleteUAByAddr(addr string) error {
+ return db.Where("server_addr =?", addr).Unscoped().Delete(&PlatformModel{}).Error
}
-func (d *daoPlatform) UpdatePlatform(platform *SIPUAParams) error {
+func (d *daoPlatform) UpdatePlatform(platform *PlatformModel) error {
//TODO implement me
panic("implement me")
}
-func (d *daoPlatform) UpdatePlatformStatus(addr string, status OnlineStatus) error {
- return db.Model(&SIPUAParams{}).Where("server_addr =?", addr).Update("status", status).Error
+func (d *daoPlatform) UpdateOnlineStatus(status OnlineStatus, addr string) error {
+ return db.Model(&PlatformModel{}).Where("server_addr =?", addr).Update("status", status).Error
}
-type DBPlatformChannel struct {
+type PlatformChannelModel struct {
GBModel
DeviceID string `json:"device_id"`
Channel string `json:"channel_id"`
ServerAddr string `json:"server_addr"`
}
-func (d *DBPlatformChannel) TableName() string {
+func (d *PlatformChannelModel) TableName() string {
return "lkm_platform_channel"
}
@@ -83,10 +92,10 @@ func (d *daoPlatform) BindChannels(addr string, channels [][2]string) ([][2]stri
var res [][2]string
for _, channel := range channels {
- var old DBPlatformChannel
+ var old PlatformChannelModel
_ = db.Where("device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr).First(&old)
if old.ID == 0 {
- _ = db.Create(&DBPlatformChannel{
+ _ = db.Create(&PlatformChannelModel{
DeviceID: channel[0],
Channel: channel[1],
})
@@ -100,7 +109,7 @@ func (d *daoPlatform) BindChannels(addr string, channels [][2]string) ([][2]stri
func (d *daoPlatform) UnbindChannels(addr string, channels [][2]string) ([][2]string, error) {
var res [][2]string
for _, channel := range channels {
- tx := db.Unscoped().Delete(&DBPlatformChannel{}, "device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr)
+ tx := db.Unscoped().Delete(&PlatformChannelModel{}, "device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr)
if tx.Error == nil {
res = append(res, channel)
} else {
@@ -112,8 +121,8 @@ func (d *daoPlatform) UnbindChannels(addr string, channels [][2]string) ([][2]st
}
func (d *daoPlatform) QueryPlatformChannel(addr string, channelId string) (string, *Channel, error) {
- var platformChannel DBPlatformChannel
- tx := db.Model(&DBPlatformChannel{}).Where("channel_id =? and server_addr =?", channelId, addr).First(&platformChannel)
+ var platformChannel PlatformChannelModel
+ tx := db.Model(&PlatformChannelModel{}).Where("channel_id =? and server_addr =?", channelId, addr).First(&platformChannel)
if tx.Error != nil {
return "", nil, tx.Error
}
@@ -128,7 +137,7 @@ func (d *daoPlatform) QueryPlatformChannel(addr string, channelId string) (strin
}
func (d *daoPlatform) QueryPlatformChannels(addr string) ([]*Channel, error) {
- var platformChannels []*DBPlatformChannel
+ var platformChannels []*PlatformChannelModel
tx := db.Where("server_addr =?", addr).Find(&platformChannels)
if tx.Error != nil {
return nil, tx.Error
@@ -143,7 +152,6 @@ func (d *daoPlatform) QueryPlatformChannels(addr string) ([]*Channel, error) {
} else {
Sugar.Errorf("查询级联设备通道失败. device_id: %s, channel_id: %s err: %s", platformChannel.DeviceID, platformChannel.Channel, tx.Error)
}
-
}
return channels, nil
diff --git a/db_sqlite.go b/db_sqlite.go
index 61a676c..f03236d 100644
--- a/db_sqlite.go
+++ b/db_sqlite.go
@@ -21,6 +21,7 @@ var (
PlatformDao = &daoPlatform{}
StreamDao = &daoStream{}
SinkDao = &daoSink{}
+ JTDeviceDao = &daoJTDevice{}
)
func init() {
@@ -61,13 +62,15 @@ func init() {
panic(err)
} else if err = db.AutoMigrate(&Channel{}); err != nil {
panic(err)
- } else if err = db.AutoMigrate(&SIPUAParams{}); err != nil {
+ } else if err = db.AutoMigrate(&PlatformModel{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&Stream{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&Sink{}); err != nil {
panic(err)
- } else if err = db.AutoMigrate(&DBPlatformChannel{}); err != nil {
+ } else if err = db.AutoMigrate(&PlatformChannelModel{}); err != nil {
+ panic(err)
+ } else if err = db.AutoMigrate(&JTDeviceModel{}); err != nil {
panic(err)
}
diff --git a/device.go b/device.go
index 5cca9bd..6636daf 100644
--- a/device.go
+++ b/device.go
@@ -127,19 +127,19 @@ func (d *Device) BuildMessageRequest(to, body string) sip.Request {
func (d *Device) QueryDeviceInfo() {
body := fmt.Sprintf(DeviceInfoFormat, "1", d.DeviceID)
request := d.BuildMessageRequest(d.DeviceID, body)
- SipUA.SendRequest(request)
+ SipStack.SendRequest(request)
}
func (d *Device) QueryCatalog() {
body := fmt.Sprintf(CatalogFormat, "1", d.DeviceID)
request := d.BuildMessageRequest(d.DeviceID, body)
- SipUA.SendRequest(request)
+ SipStack.SendRequest(request)
}
func (d *Device) QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error {
body := fmt.Sprintf(QueryRecordFormat, sn, channelId, startTime, endTime, type_)
request := d.BuildMessageRequest(channelId, body)
- SipUA.SendRequest(request)
+ SipStack.SendRequest(request)
return nil
}
@@ -169,7 +169,7 @@ func (d *Device) SubscribePosition(channelId string) error {
event := Event("Catalog;id=2")
request.AppendHeader(&event)
- response, err := SipUA.SendRequestWithTimeout(5, request)
+ response, err := SipStack.SendRequestWithTimeout(5, request)
if err != nil {
return err
}
@@ -184,7 +184,7 @@ func (d *Device) SubscribePosition(channelId string) error {
func (d *Device) Broadcast(sourceId, channelId string) sip.ClientTransaction {
body := fmt.Sprintf(BroadcastFormat, 1, sourceId, channelId)
request := d.BuildMessageRequest(channelId, body)
- return SipUA.SendRequest(request)
+ return SipStack.SendRequest(request)
}
func (d *Device) UpdateChannel(id string, event string) {
@@ -241,7 +241,7 @@ func (d *Device) NewRequestBuilder(method sip.RequestMethod, fromUser, realm, to
func (d *Device) BuildInviteRequest(sessionName, channelId, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string) (sip.Request, error) {
builder := d.NewRequestBuilder(sip.INVITE, Config.SipID, Config.SipContactAddr, channelId)
- sdp := BuildSDP(Config.SipID, sessionName, ip, port, startTime, stopTime, setup, speed, ssrc)
+ sdp := BuildSDP("video", Config.SipID, sessionName, ip, port, startTime, stopTime, setup, speed, ssrc, "96 PS/90000")
builder.SetContentType(&SDPMessageType)
builder.SetContact(GlobalContactAddress)
builder.SetBody(sdp)
diff --git a/dialogs.go b/dialogs.go
index 44e299c..9041f08 100644
--- a/dialogs.go
+++ b/dialogs.go
@@ -10,7 +10,7 @@ import (
)
var (
- Dialogs = NewDialogManager[*StreamWaiting]()
+ EarlyDialogs = NewDialogManager[*StreamWaiting]()
)
type StreamWaiting struct {
diff --git a/hook/event.go b/hook/event.go
new file mode 100644
index 0000000..437245c
--- /dev/null
+++ b/hook/event.go
@@ -0,0 +1,50 @@
+package hook
+
+import (
+ "bytes"
+ "encoding/json"
+ "net/http"
+)
+
+const (
+ EventTypeDeviceOnline = iota + 1
+ EventTypeDeviceOffline
+ EventTypeDevicePosition
+ EventTypeDeviceOnInvite
+)
+
+var (
+ EventUrls = make(map[int]string)
+)
+
+func RegisterEventUrl(event int, url string) {
+ EventUrls[event] = url
+}
+
+func PostEvent(url string, body []byte) (*http.Response, error) {
+ client := &http.Client{
+ //Timeout: time.Duration(AppConfig.Hooks.Timeout),
+ }
+
+ request, err := http.NewRequest("post", url, bytes.NewBuffer(body))
+ if err != nil {
+ return nil, err
+ }
+
+ request.Header.Set("Content-Type", "application/json")
+ return client.Do(request)
+}
+
+func PostOnInviteEvent(simNumber, channelNumber string) (*http.Response, error) {
+ params := map[string]string{
+ "sim_number": simNumber,
+ "channel_number": channelNumber,
+ }
+
+ body, err := json.Marshal(params)
+ if err != nil {
+ return nil, err
+ }
+
+ return PostEvent(EventUrls[EventTypeDeviceOnInvite], body)
+}
diff --git a/jt_device.go b/jt_device.go
new file mode 100644
index 0000000..a86cb00
--- /dev/null
+++ b/jt_device.go
@@ -0,0 +1,70 @@
+package main
+
+import (
+ "github.com/ghettovoice/gosip/sip"
+ "net/http"
+ "strconv"
+ "strings"
+)
+
+type JTDevice struct {
+ *Platform
+ username string
+ simNumber string
+}
+
+func (g *JTDevice) OnInvite(request sip.Request, user string) sip.Response {
+ // 通知1078的信令服务器
+ channels, _ := ChannelDao.QueryChannelsByChannelID(user)
+ if len(channels) < 1 {
+ Sugar.Errorf("处理1078的invite失败. 通道不存在 channel: %s device: %s", user, g.Username)
+ return CreateResponseWithStatusCode(request, http.StatusNotFound)
+ } else if channels[0].RootID != g.username {
+ Sugar.Errorf("处理1078的invite失败. 设备和通道不匹配 channel: %s device: %s", user, g.Username)
+ return CreateResponseWithStatusCode(request, http.StatusNotFound)
+ }
+
+ channel := channels[0]
+ gbsdp, err := ParseGBSDP(request.Body())
+ if err != nil {
+ Sugar.Errorf("处理上级Invite失败, 解析上级SDP发生错误 err: %s sdp: %s", err.Error(), request.Body())
+ return CreateResponseWithStatusCode(request, http.StatusBadRequest)
+ }
+
+ var inviteType InviteType
+ inviteType.SessionName2Type(strings.ToLower(gbsdp.sdp.Session))
+ if InviteTypePlay != inviteType {
+ Sugar.Warnf("处理上级Invite失败, 1078暂不支持非实时预览流 inviteType: %s channel: %s device: %s", inviteType, user, g.Username)
+ return CreateResponseWithStatusCode(request, http.StatusNotImplemented)
+ }
+
+ streamId := GenerateStreamID(inviteType, g.simNumber, strconv.Itoa(channel.ChannelNumber), gbsdp.startTime, gbsdp.stopTime)
+
+ sink := &Sink{
+ StreamID: streamId,
+ ServerAddr: g.ServerAddr,
+ Protocol: "gb_gateway"}
+
+ response, err := AddForwardSink(TransStreamGBGateway, request, user, sink, streamId, gbsdp, inviteType, "96 PS/90000")
+ if err != nil {
+ Sugar.Errorf("处理1078的invite失败. 发送hook失败 err: %s channel: %s device: %s", err.Error(), user, g.Username)
+ return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
+ }
+
+ return response
+}
+
+func NewJTDevice(model *JTDeviceModel, ua SipServer) (*JTDevice, error) {
+ platform, err := NewPlatform(&model.SIPUAOptions, ua)
+ if err != nil {
+ return nil, err
+ }
+
+ platform.SetDeviceInfo(model.Name, model.Manufacturer, model.Model, model.Firmware)
+
+ return &JTDevice{
+ Platform: platform,
+ username: model.Username,
+ simNumber: model.SimNumber,
+ }, nil
+}
diff --git a/live.go b/live.go
index 100ca1e..e3fe76a 100644
--- a/live.go
+++ b/live.go
@@ -41,7 +41,7 @@ func (i *InviteType) SessionName2Type(name string) {
func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId, startTime, stopTime, setup string, speed int, sync bool) (*Stream, error) {
stream := &Stream{
StreamID: streamId,
- Protocol: "28181",
+ Protocol: SourceType28181,
}
// 先添加占位置, 防止重复请求
@@ -64,8 +64,8 @@ func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId
// 等待流媒体服务发送推流通知
wait := func() bool {
waiting := StreamWaiting{}
- _, _ = Dialogs.Add(string(streamId), &waiting)
- defer Dialogs.Remove(string(streamId))
+ _, _ = EarlyDialogs.Add(string(streamId), &waiting)
+ defer EarlyDialogs.Remove(string(streamId))
ok := http.StatusOK == waiting.Receive(10)
if !ok {
@@ -95,12 +95,12 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
defer func() {
// 如果失败, 告知流媒体服务释放国标源
if err != nil {
- go CloseSource(string(streamId))
+ go MSCloseSource(string(streamId))
}
}()
// 告知流媒体服务创建国标源, 返回收流地址信息
- ip, port, urls, ssrc, msErr := CreateGBSource(string(streamId), setup, "", string(inviteType))
+ ip, port, urls, ssrc, msErr := MSCreateGBSource(string(streamId), setup, "", string(inviteType))
if msErr != nil {
Sugar.Errorf("创建GBSource失败 err: %s", msErr.Error())
return nil, nil, msErr
@@ -126,7 +126,7 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
var body string
reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// invite信令交互
- SipUA.SendRequestWithContext(reqCtx, inviteRequest, gosip.WithResponseHandler(func(res sip.Response, request sip.Request) {
+ SipStack.SendRequestWithContext(reqCtx, inviteRequest, gosip.WithResponseHandler(func(res sip.Response, request sip.Request) {
if res.StatusCode() < 200 {
} else if res.StatusCode() == 200 {
@@ -144,7 +144,7 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
Sugar.Infof("send ack %s", ackRequest.String())
- err = SipUA.Send(ackRequest)
+ err = SipStack.Send(ackRequest)
if err != nil {
cancel()
Sugar.Errorf("send ack error %s %s", err.Error(), ackRequest.String())
@@ -172,7 +172,7 @@ func (d *Device) Invite(inviteType InviteType, streamId StreamID, channelId, sta
}
addr := fmt.Sprintf("%s:%d", answer.Addr, answer.Video.Port)
- if err = ConnectGBSource(string(streamId), addr); err != nil {
+ if err = MSConnectGBSource(string(streamId), addr); err != nil {
Sugar.Errorf("设置GB28181连接地址失败 err: %s addr: %s", err.Error(), addr)
return nil, nil, err
}
diff --git a/main.go b/main.go
index 9390b0c..bd78b24 100644
--- a/main.go
+++ b/main.go
@@ -2,6 +2,7 @@ package main
import (
"encoding/json"
+ "gb-cms/hook"
"go.uber.org/zap/zapcore"
"net"
"net/http"
@@ -11,14 +12,14 @@ import (
)
var (
- Config *Config_
- SipUA SipServer
+ Config *Config_
+ SipStack SipServer
)
func init() {
logConfig := LogConfig{
Level: int(zapcore.DebugLevel),
- Name: "./logs/cms.log",
+ Name: "./logs/clog",
MaxSize: 10,
MaxBackup: 100,
MaxAge: 7,
@@ -38,6 +39,10 @@ func main() {
indent, _ := json.MarshalIndent(Config, "", "\t")
Sugar.Infof("server config:\r\n%s", indent)
+ if config.Hooks.OnInvite != "" {
+ hook.RegisterEventUrl(hook.EventTypeDeviceOnInvite, config.Hooks.OnInvite)
+ }
+
OnlineDeviceManager.Start(time.Duration(Config.AliveExpires)*time.Second/4, time.Duration(Config.AliveExpires)*time.Second, OnExpires)
// 从数据库中恢复会话
@@ -58,7 +63,7 @@ func main() {
Sugar.Infof("启动sip server成功. addr: %s:%d", config.ListenIP, config.SipPort)
Config.SipContactAddr = net.JoinHostPort(config.PublicIP, strconv.Itoa(config.SipPort))
- SipUA = server
+ SipStack = server
// 在sip启动后, 关闭无效的流
for _, stream := range streams {
@@ -71,6 +76,8 @@ func main() {
// 启动级联设备
startPlatformDevices()
+ // 启动1078设备
+ startJTDevices()
httpAddr := net.JoinHostPort(config.ListenIP, strconv.Itoa(config.HttpPort))
Sugar.Infof("启动http server. addr: %s", httpAddr)
diff --git a/media_server.go b/media_server.go
index e49cae5..5c11e12 100644
--- a/media_server.go
+++ b/media_server.go
@@ -6,10 +6,29 @@ import (
"fmt"
"net"
"net/http"
+ "net/url"
"strconv"
"time"
)
+const (
+ TransStreamRtmp = iota + 1
+ TransStreamFlv = 2
+ TransStreamRtsp = 3
+ TransStreamHls = 4
+ TransStreamRtc = 5
+ TransStreamGBCascaded = 6 // 国标级联转发
+ TransStreamGBTalk = 7 // 国标广播/对讲转发
+ TransStreamGBGateway = 8 // 国标网关
+)
+
+const (
+ SourceTypeRtmp = iota + 1
+ SourceType28181
+ SourceType1078
+ SourceTypeGBTalk
+)
+
type SourceDetails struct {
ID string `json:"id"`
Protocol string `json:"protocol"` // 推流协议
@@ -43,10 +62,22 @@ type SourceSDP struct {
type GBOffer struct {
SourceSDP
- AnswerSetup string `json:"answer_setup,omitempty"` // 希望应答的连接方式
+ AnswerSetup string `json:"answer_setup,omitempty"` // 希望应答的连接方式
+ TransStreamProtocol int `json:"trans_stream_protocol,omitempty"`
}
func Send(path string, body interface{}) (*http.Response, error) {
+ return SendWithUrlParams(path, body, nil)
+}
+
+func SendWithUrlParams(path string, body interface{}, values url.Values) (*http.Response, error) {
+ if values != nil {
+ params := values.Encode()
+ if len(params) > 0 {
+ path = fmt.Sprintf("%s?%s", path, params)
+ }
+ }
+
url := fmt.Sprintf("http://%s/%s", Config.MediaServer, path)
data, err := json.Marshal(body)
@@ -67,7 +98,7 @@ func Send(path string, body interface{}) (*http.Response, error) {
return client.Do(request)
}
-func CreateGBSource(id, setup string, ssrc string, sessionName string) (string, uint16, []string, string, error) {
+func MSCreateGBSource(id, setup string, ssrc string, sessionName string) (string, uint16, []string, string, error) {
v := &SourceSDP{
Source: id,
SDP: SDP{
@@ -102,7 +133,7 @@ func CreateGBSource(id, setup string, ssrc string, sessionName string) (string,
return host, uint16(port), data.Data.Urls, data.Data.SSRC, err
}
-func ConnectGBSource(id, addr string) error {
+func MSConnectGBSource(id, addr string) error {
v := &SourceSDP{
Source: id,
SDP: SDP{
@@ -114,7 +145,7 @@ func ConnectGBSource(id, addr string) error {
return err
}
-func CloseSource(id string) error {
+func MSCloseSource(id string) error {
v := &struct {
Source string `json:"source"`
}{
@@ -125,10 +156,53 @@ func CloseSource(id string) error {
return err
}
-func CreateAnswer(id, addr, offerSetup, answerSetup, ssrc, sessionName string) (string, uint16, string, error) {
+func MSCloseSink(sourceId string, sinkId string) {
+ v := struct {
+ SourceID string `json:"source"`
+ SinkID string `json:"sink"` // sink id
+ }{
+ sourceId, sinkId,
+ }
+
+ _, _ = Send("api/v1/sink/close", v)
+}
+
+func MSQuerySourceList() ([]*SourceDetails, error) {
+ response, err := Send("api/v1/source/list", nil)
+ if err != nil {
+ return nil, err
+ }
+
+ data := &Response[[]*SourceDetails]{}
+ if err = DecodeJSONBody(response.Body, data); err != nil {
+ return nil, err
+ }
+
+ return data.Data, err
+}
+
+func MSQuerySinkList(source string) ([]*SinkDetails, error) {
+ id := struct {
+ Source string `json:"source"`
+ }{source}
+
+ response, err := Send("api/v1/sink/list", id)
+ if err != nil {
+ return nil, err
+ }
+
+ data := &Response[[]*SinkDetails]{}
+ if err = DecodeJSONBody(response.Body, data); err != nil {
+ return nil, err
+ }
+
+ return data.Data, err
+}
+
+func MSAddForwardSink(protocol int, source, addr, offerSetup, answerSetup, ssrc, sessionName string, values url.Values) (string, uint16, string, error) {
offer := &GBOffer{
SourceSDP: SourceSDP{
- Source: id,
+ Source: source,
SDP: SDP{
Addr: addr,
Setup: offerSetup,
@@ -136,10 +210,12 @@ func CreateAnswer(id, addr, offerSetup, answerSetup, ssrc, sessionName string) (
SessionName: sessionName,
},
},
- AnswerSetup: answerSetup,
+ AnswerSetup: answerSetup,
+ TransStreamProtocol: protocol,
}
- response, err := Send("api/v1/gb28181/answer/create", offer)
+ var err error
+ response, err := SendWithUrlParams("api/v1/sink/add", offer, values)
if err != nil {
return "", 0, "", err
}
@@ -163,46 +239,3 @@ func CreateAnswer(id, addr, offerSetup, answerSetup, ssrc, sessionName string) (
port, _ := strconv.Atoi(p)
return host, uint16(port), data.Data.Sink, nil
}
-
-func CloseSink(sourceId string, sinkId string) {
- v := struct {
- SourceID string `json:"source"`
- SinkID string `json:"sink"` // sink id
- }{
- sourceId, sinkId,
- }
-
- _, _ = Send("api/v1/sink/close", v)
-}
-
-func QuerySourceList() ([]*SourceDetails, error) {
- response, err := Send("api/v1/source/list", nil)
- if err != nil {
- return nil, err
- }
-
- data := &Response[[]*SourceDetails]{}
- if err = DecodeJSONBody(response.Body, data); err != nil {
- return nil, err
- }
-
- return data.Data, err
-}
-
-func QuerySinkList(source string) ([]*SinkDetails, error) {
- id := struct {
- Source string `json:"source"`
- }{source}
-
- response, err := Send("api/v1/sink/list", id)
- if err != nil {
- return nil, err
- }
-
- data := &Response[[]*SinkDetails]{}
- if err = DecodeJSONBody(response.Body, data); err != nil {
- return nil, err
- }
-
- return data.Data, err
-}
diff --git a/message_factory.go b/message_factory.go
index ceafa1d..2a0fd0a 100644
--- a/message_factory.go
+++ b/message_factory.go
@@ -12,15 +12,14 @@ const (
XmlHeaderGBK = `` + "\r\n"
)
-func BuildSDP(userName, sessionName, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string) string {
+func BuildSDP(media, userName, sessionName, ip string, port uint16, startTime, stopTime, setup string, speed int, ssrc string, attrs ...string) string {
format := "v=0\r\n" +
"o=%s 0 0 IN IP4 %s\r\n" +
"s=%s\r\n" +
"c=IN IP4 %s\r\n" +
"t=%s %s\r\n" +
- "m=video %d %s 96\r\n" +
- "a=%s\r\n" +
- "a=rtpmap:96 PS/90000\r\n"
+ "m=%s %d %s %s\r\n" +
+ "a=%s\r\n"
tcpFormat := "a=setup:%s\r\n" +
"a=connection:new\r\n"
@@ -34,7 +33,16 @@ func BuildSDP(userName, sessionName, ip string, port uint16, startTime, stopTime
mediaProtocol = "RTP/AVP"
}
- sdp := fmt.Sprintf(format, userName, ip, sessionName, ip, startTime, stopTime, port, mediaProtocol, "recvonly")
+ var mediaFormats []string
+ for _, attr := range attrs {
+ mediaFormats = append(mediaFormats, strings.Split(attr, " ")[0])
+ }
+
+ sdp := fmt.Sprintf(format, userName, ip, sessionName, ip, startTime, stopTime, media, port, mediaProtocol, strings.Join(mediaFormats, " "), "recvonly")
+ for _, attr := range attrs {
+ sdp += fmt.Sprintf("a=rtpmap:%s\r\n", attr)
+ }
+
if tcp {
sdp += fmt.Sprintf(tcpFormat, setup)
}
@@ -54,6 +62,7 @@ func NewSIPRequestBuilderWithTransport(transport string) *sip.RequestBuilder {
}
builder.AddVia(&hop)
+ builder.SetUserAgent(nil)
return builder
}
diff --git a/platform.go b/platform.go
index a995a03..448aa12 100644
--- a/platform.go
+++ b/platform.go
@@ -10,19 +10,24 @@ import (
"sync"
)
-type GBPlatform struct {
- *Client
+const (
+ UATypeGB = iota + 1
+ UATypeJT
+)
+
+type Platform struct {
+ *gbClient
lock sync.Mutex
sinks map[string]StreamID // 保存级联转发的sink, 方便离线的时候关闭sink
}
-func (g *GBPlatform) addSink(callId string, stream StreamID) {
+func (g *Platform) addSink(callId string, stream StreamID) {
g.lock.Lock()
defer g.lock.Unlock()
g.sinks[callId] = stream
}
-func (g *GBPlatform) removeSink(callId string) StreamID {
+func (g *Platform) removeSink(callId string) StreamID {
g.lock.Lock()
defer g.lock.Unlock()
stream := g.sinks[callId]
@@ -31,17 +36,17 @@ func (g *GBPlatform) removeSink(callId string) StreamID {
}
// OnBye 被上级挂断
-func (g *GBPlatform) OnBye(request sip.Request) {
+func (g *Platform) OnBye(request sip.Request) {
id, _ := request.CallID()
g.CloseStream(id.Value(), false, true)
}
// CloseStream 关闭级联会话
-func (g *GBPlatform) CloseStream(callId string, bye, ms bool) {
+func (g *Platform) CloseStream(callId string, bye, ms bool) {
_ = g.removeSink(callId)
sink := RemoveForwardSinkWithCallId(callId)
if sink == nil {
- Sugar.Errorf("关闭级联转发sink失败, 找不到sink. callid: %s", callId)
+ Sugar.Errorf("关闭转发sink失败, 找不到sink. callid: %s", callId)
return
}
@@ -49,7 +54,7 @@ func (g *GBPlatform) CloseStream(callId string, bye, ms bool) {
}
// CloseStreams 关闭所有级联会话
-func (g *GBPlatform) CloseStreams(bye, ms bool) {
+func (g *Platform) CloseStreams(bye, ms bool) {
var callIds []string
g.lock.Lock()
@@ -66,8 +71,8 @@ func (g *GBPlatform) CloseStreams(bye, ms bool) {
}
// OnInvite 被上级呼叫
-func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
- Sugar.Infof("收到级联Invite请求 platform: %s channel: %s sdp: %s", g.SeverID, user, request.Body())
+func (g *Platform) OnInvite(request sip.Request, user string) sip.Response {
+ Sugar.Infof("收到上级Invite请求 platform: %s channel: %s sdp: %s", g.SeverID, user, request.Body())
source := request.Source()
platform := PlatformManager.Find(source)
@@ -75,124 +80,84 @@ func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
deviceId, channel, err := PlatformDao.QueryPlatformChannel(g.ServerAddr, user)
if err != nil {
- Sugar.Errorf("级联转发失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.SeverID, user)
+ Sugar.Errorf("处理上级Invite失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.SeverID, user)
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
// 查找通道对应的设备
device, _ := DeviceDao.QueryDevice(deviceId)
if device == nil {
- Sugar.Errorf("级联转发失败, 设备不存在 device: %s channel: %s", device, user)
+ Sugar.Errorf("处理上级Invite失败, 设备不存在 device: %s channel: %s", device, user)
return CreateResponseWithStatusCode(request, http.StatusNotFound)
}
- parse, ssrc, speed, media, offerSetup, answerSetup, err := ParseGBSDP(request.Body())
+ gbSdp, err := ParseGBSDP(request.Body())
if err != nil {
- Sugar.Errorf("级联转发失败, 解析上级SDP发生错误 err: %s sdp: %s", err.Error(), request.Body())
+ Sugar.Errorf("处理上级Invite失败,err: %s sdp: %s", err.Error(), request.Body())
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
- // 解析时间范围
- time := strings.Split(parse.Time, " ")
- if len(time) < 2 {
- Sugar.Errorf("级联转发失败 上级sdp的时间范围格式错误 time: %s sdp: %s", parse.Time, request.Body())
- return CreateResponseWithStatusCode(request, http.StatusBadRequest)
- }
-
- var streamId StreamID
var inviteType InviteType
- inviteType.SessionName2Type(strings.ToLower(parse.Session))
- switch inviteType {
- case InviteTypePlay:
- streamId = GenerateStreamID(InviteTypePlay, channel.ParentID, user, "", "")
- break
- case InviteTypePlayback:
- // 级联下载和回放不限制路数,也不共享流
- streamId = GenerateStreamID(InviteTypePlayback, channel.ParentID, user, time[0], time[1]) + StreamID("."+utils.RandStringBytes(10))
- break
- case InviteTypeDownload:
- streamId = GenerateStreamID(InviteTypeDownload, channel.ParentID, user, time[0], time[1]) + StreamID("."+utils.RandStringBytes(10))
- break
- }
+ inviteType.SessionName2Type(strings.ToLower(gbSdp.sdp.Session))
+ streamId := GenerateStreamID(inviteType, channel.RootID, channel.DeviceID, gbSdp.startTime, gbSdp.stopTime)
+ // 如果流不存在, 向通道发送Invite请求
stream, _ := StreamDao.QueryStream(streamId)
- addr := fmt.Sprintf("%s:%d", parse.Addr, media.Port)
if stream == nil {
- s := channel.SetupType.String()
- println(s)
- stream, err = device.StartStream(inviteType, streamId, user, time[0], time[1], channel.SetupType.String(), 0, true)
+ stream, err = device.StartStream(inviteType, streamId, user, gbSdp.startTime, gbSdp.stopTime, channel.SetupType.String(), 0, true)
if err != nil {
- Sugar.Errorf("级联转发失败 err: %s stream: %s", err.Error(), streamId)
+ Sugar.Errorf("处理上级Invite失败 err: %s stream: %s", err.Error(), streamId)
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
}
- ip, port, sinkID, err := CreateAnswer(string(streamId), addr, offerSetup, answerSetup, ssrc, string(inviteType))
- if err != nil {
- Sugar.Errorf("级联转发失败,向流媒体服务添加转发Sink失败 err: %s", err.Error())
-
- if "play" != parse.Session {
- CloseStream(streamId, true)
- }
-
- return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
- }
-
- // answer添加contact头域
- answer := BuildSDP(user, parse.Session, ip, port, time[0], time[1], answerSetup, speed, ssrc)
- response := CreateResponseWithStatusCode(request, http.StatusOK)
- response.RemoveHeader("Contact")
- response.AppendHeader(GlobalContactAddress.AsContactHeader())
- response.AppendHeader(&SDPMessageType)
- response.SetBody(answer, true)
-
- setToTag(response)
-
sink := &Sink{
- SinkID: sinkID,
StreamID: streamId,
ServerAddr: g.ServerAddr,
- Protocol: "gb_cascaded_forward"}
- sink.SetDialog(g.CreateDialogRequestFromAnswer(response, true))
+ Protocol: "gb_cascaded"}
+
+ response, err := AddForwardSink(TransStreamGBCascaded, request, user, sink, streamId, gbSdp, inviteType, "96 PS/90000")
+ if err != nil {
+ Sugar.Errorf("处理上级Invite失败 err: %s stream: %s", err.Error(), streamId)
+ }
- AddForwardSink(streamId, sink)
return response
}
-func (g *GBPlatform) Start() {
- Sugar.Infof("启动级联设备, deivce: %s transport: %s addr: %s", g.Username, g.sipClient.Transport, g.sipClient.ServerAddr)
- g.sipClient.Start()
- g.sipClient.SetOnRegisterHandler(g.onlineCB, g.offlineCB)
+func (g *Platform) Start() {
+ Sugar.Infof("启动级联设备, deivce: %s transport: %s addr: %s", g.Username, g.sipUA.Transport, g.sipUA.ServerAddr)
+ g.sipUA.Start()
+ g.sipUA.SetOnRegisterHandler(g.Online, g.Offline)
}
-func (g *GBPlatform) Stop() {
- g.sipClient.Stop()
- g.sipClient.SetOnRegisterHandler(nil, nil)
+func (g *Platform) Stop() {
+ g.sipUA.Stop()
+ g.sipUA.SetOnRegisterHandler(nil, nil)
// 释放所有推流
g.CloseStreams(true, true)
}
-func (g *GBPlatform) Online() {
- Sugar.Infof("级联设备上线 device: %s", g.SeverID)
+func (g *Platform) Online() {
+ Sugar.Infof("ua上线 device: %s server addr: %s", g.Username, g.ServerAddr)
- if err := PlatformDao.UpdatePlatformStatus(g.SeverID, ON); err != nil {
- Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), g.SeverID)
+ if err := PlatformDao.UpdateOnlineStatus(ON, g.ServerAddr); err != nil {
+ Sugar.Infof("ua状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr)
}
}
-func (g *GBPlatform) Offline() {
- Sugar.Infof("级联设备离线 device: %s", g.SeverID)
+func (g *Platform) Offline() {
+ Sugar.Infof("ua离线 device: %s server addr: %s", g.Username, g.ServerAddr)
- if err := PlatformDao.UpdatePlatformStatus(g.SeverID, OFF); err != nil {
- Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), g.SeverID)
+ if err := PlatformDao.UpdateOnlineStatus(OFF, g.ServerAddr); err != nil {
+ Sugar.Infof("ua状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr)
}
// 释放所有推流
g.CloseStreams(true, true)
}
-func NewGBPlatform(record *SIPUAParams, ua SipServer) (*GBPlatform, error) {
+func NewPlatform(record *SIPUAOptions, ua SipServer) (*Platform, error) {
if len(record.SeverID) != 20 {
return nil, fmt.Errorf("SeverID must be exactly 20 characters long")
}
@@ -201,6 +166,6 @@ func NewGBPlatform(record *SIPUAParams, ua SipServer) (*GBPlatform, error) {
return nil, err
}
- gbClient := NewGBClient(record, ua)
- return &GBPlatform{Client: gbClient.(*Client), sinks: make(map[string]StreamID, 8)}, nil
+ client := NewGBClient(record, ua)
+ return &Platform{gbClient: client.(*gbClient), sinks: make(map[string]StreamID, 8)}, nil
}
diff --git a/platform_manager.go b/platform_manager.go
deleted file mode 100644
index d8b4a21..0000000
--- a/platform_manager.go
+++ /dev/null
@@ -1,119 +0,0 @@
-package main
-
-import (
- "fmt"
- "sync"
-)
-
-var (
- PlatformManager = &platformManager{
- addrMap: make(map[string]*GBPlatform, 8),
- }
-)
-
-type platformManager struct {
- addrMap map[string]*GBPlatform //上级地址->平台
- lock sync.RWMutex
-}
-
-func (p *platformManager) Add(platform *GBPlatform) bool {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- if _, ok := p.addrMap[platform.sipClient.ServerAddr]; ok {
- return false
- }
-
- p.addrMap[platform.sipClient.ServerAddr] = platform
- return true
-}
-
-func (p *platformManager) Find(addr string) *GBPlatform {
- p.lock.RLock()
- defer p.lock.RUnlock()
- if platform, ok := p.addrMap[addr]; ok {
- return platform
- }
- return nil
-}
-
-func (p *platformManager) Remove(addr string) *GBPlatform {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- platform, ok := p.addrMap[addr]
- if !ok {
- return nil
- }
-
- delete(p.addrMap, addr)
- return platform
-}
-func (p *platformManager) Platforms() []*GBPlatform {
- p.lock.RLock()
- defer p.lock.RUnlock()
-
- platforms := make([]*GBPlatform, 0, len(p.addrMap))
- for _, platform := range p.addrMap {
- platforms = append(platforms, platform)
- }
-
- return platforms
-}
-
-func AddPlatform(platform *GBPlatform) error {
- ok := PlatformManager.Add(platform)
- if !ok {
- return fmt.Errorf("平台添加失败, 地址冲突. addr: %s", platform.sipClient.ServerAddr)
- }
-
- err := PlatformDao.SavePlatform(&platform.SIPUAParams)
- if err != nil {
- PlatformManager.Remove(platform.sipClient.ServerAddr)
- return fmt.Errorf("平台保存到数据库失败, err: %s", err.Error())
- }
-
- return nil
-}
-
-func RemovePlatform(addr string) (*GBPlatform, error) {
- err := PlatformDao.DeletePlatform(addr)
- if err != nil {
- return nil, err
- }
-
- platform := PlatformManager.Remove(addr)
- return platform, nil
-}
-
-func LoadPlatforms() []*SIPUAParams {
- platforms := PlatformManager.Platforms()
- params := make([]*SIPUAParams, 0, len(platforms))
- for _, platform := range platforms {
- params = append(params, &platform.SIPUAParams)
- }
-
- return params
-}
-
-func QueryPlatform(add string) *GBPlatform {
- return PlatformManager.Find(add)
-}
-
-func UpdatePlatformStatus(addr string, status OnlineStatus) error {
- platform := PlatformManager.Find(addr)
- if platform == nil {
- return fmt.Errorf("平台不存在. addr: %s", addr)
- }
-
- //old := platform.Device.Status
- platform.Device.Status = status
-
- err := PlatformDao.UpdatePlatformStatus(addr, status)
- // platform.Device.Status = old
- if err != nil {
- return err
- }
-
- return nil
-}
diff --git a/position.go b/position.go
index 73c46c5..dd9afa5 100644
--- a/position.go
+++ b/position.go
@@ -47,7 +47,7 @@ func (d *Device) DoSubscribePosition(channelId string) error {
event := Event("Catalog;id=2")
request.AppendHeader(&event)
- response, err := SipUA.SendRequestWithTimeout(5, request)
+ response, err := SipStack.SendRequestWithTimeout(5, request)
if err != nil {
return err
}
diff --git a/recover.go b/recover.go
index ee5167c..44baf46 100644
--- a/recover.go
+++ b/recover.go
@@ -14,34 +14,40 @@ func startPlatformDevices() {
}
for _, record := range platforms {
- platform, err := NewGBPlatform(record, SipUA)
+ platform, err := NewPlatform(&record.SIPUAOptions, SipStack)
// 都入库了不允许失败, 程序有BUG, 及时修复
utils.Assert(err == nil)
- utils.Assert(PlatformManager.Add(platform))
+ utils.Assert(PlatformManager.Add(platform.ServerAddr, platform))
- if err := PlatformDao.UpdatePlatformStatus(record.ServerAddr, OFF); err != nil {
+ if err := PlatformDao.UpdateOnlineStatus(OFF, record.ServerAddr); err != nil {
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.SeverID)
}
- // 恢复级联会话
- // 不删会话能正常通信
- //for _, stream := range streams {
- // sinks := stream.GetForwardStreamSinks()
- // for _, sink := range sinks {
- // if sink.DeviceID != record.SeverID {
- // continue
- // }
- //
- // callId, _ := sink.Dialog.CallID()
- // channelCallId, _ := stream.Dialog.CallID()
- // platform.addSink(callId.Value(), channelCallId.Value())
- // }
- //}
-
platform.Start()
}
}
+// 启动1078设备
+func startJTDevices() {
+ devices, err := JTDeviceDao.LoadDevices()
+ if err != nil {
+ Sugar.Errorf("查询1078设备失败 err: %s", err.Error())
+ return
+ }
+
+ for _, record := range devices {
+ // 都入库了不允许失败, 程序有BUG, 及时修复
+ device, err := NewJTDevice(record, SipStack)
+ utils.Assert(err == nil)
+ utils.Assert(JTDeviceManager.Add(device.Username, device))
+
+ if err := JTDeviceDao.UpdateOnlineStatus(OFF, device.Username); err != nil {
+ Sugar.Infof("更新1078设备状态失败 err: %s device: %s", err.Error(), record.SeverID)
+ }
+ device.Start()
+ }
+}
+
// 返回需要关闭的推流源和转流Sink
func recoverStreams() (map[string]*Stream, map[string]*Sink) {
// 比较数据库和流媒体服务器中的流会话, 以流媒体服务器中的为准, 释放过期的会话
@@ -55,10 +61,10 @@ func recoverStreams() (map[string]*Stream, map[string]*Sink) {
dbSinks, _ := SinkDao.LoadForwardSinks()
// 查询流媒体服务器中的推流源列表
- msSources, err := QuerySourceList()
+ msSources, err := MSQuerySourceList()
if err != nil {
// 流媒体服务器崩了, 存在的所有记录都无效, 全部删除
- Sugar.Warnf("恢复推流失败, 查询推流源列表发生错误, 删除数据库中的所有记录. err: %s", err.Error())
+ Sugar.Warnf("恢复推流失败, 查询推流源列表发生错误, 删除所有推流记录. err: %s", err.Error())
}
// 查询推流源下所有的转发sink列表
@@ -70,7 +76,7 @@ func recoverStreams() (map[string]*Stream, map[string]*Sink) {
}
// 查询转发sink
- sinks, err := QuerySinkList(source.ID)
+ sinks, err := MSQuerySinkList(source.ID)
if err != nil {
Sugar.Warnf("查询拉流列表发生 err: %s", err.Error())
continue
diff --git a/sink.go b/sink.go
index bc02c73..7fa3566 100644
--- a/sink.go
+++ b/sink.go
@@ -6,18 +6,18 @@ import (
"github.com/ghettovoice/gosip/sip/parser"
)
-// Sink 国标级联转发流
+// Sink 级联/对讲/网关转发流Sink
type Sink struct {
GBModel
SinkID string `json:"sink_id"` // 流媒体服务器中的sink id
StreamID StreamID `json:"stream_id"` // 推流ID
SinkStreamID StreamID `json:"sink_stream_id"` // 广播使用, 每个广播设备的唯一ID
- Protocol string `json:"protocol,omitempty"` // 转发流协议, gb_cascaded_forward/gb_talk_forward
+ Protocol string `json:"protocol,omitempty"` // 转发流协议, gb_cascaded/gb_talk/gb_gateway
Dialog *RequestWrapper `json:"dialog,omitempty"`
CallID string `json:"call_id,omitempty"`
ServerAddr string `json:"server_addr,omitempty"` // 级联上级地址
CreateTime int64 `json:"create_time"`
- SetupType SetupType // 转发类型
+ SetupType SetupType // 流转发类型
}
// Close 关闭级联会话. 是否向上级发送bye请求, 是否通知流媒体服务器发送删除sink
@@ -28,7 +28,7 @@ func (s *Sink) Close(bye, ms bool) {
}
if ms {
- go CloseSink(string(s.StreamID), s.SinkID)
+ go MSCloseSink(string(s.StreamID), s.SinkID)
}
}
@@ -51,7 +51,7 @@ func (s *Sink) MarshalJSON() ([]byte, error) {
func (s *Sink) Bye() {
if s.Dialog != nil && s.Dialog.Request != nil {
byeRequest := CreateRequestFromDialog(s.Dialog.Request, sip.BYE)
- go SipUA.SendRequest(byeRequest)
+ go SipStack.SendRequest(byeRequest)
}
}
diff --git a/sink_manager.go b/sink_manager.go
index 56c5f00..af77906 100644
--- a/sink_manager.go
+++ b/sink_manager.go
@@ -1,12 +1,50 @@
package main
-func AddForwardSink(StreamID StreamID, sink *Sink) bool {
- if err := SinkDao.SaveForwardSink(StreamID, sink); err != nil {
- Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", StreamID, sink.SinkID, err.Error())
- return false
+import (
+ "github.com/ghettovoice/gosip/sip"
+ "net/http"
+ "net/url"
+)
+
+func AddForwardSink(forwardType int, request sip.Request, user string, sink *Sink, streamId StreamID, gbSdp *GBSDP, inviteType InviteType, attrs ...string) (sip.Response, error) {
+ urlParams := make(url.Values)
+ if TransStreamGBTalk == forwardType {
+ urlParams.Add("forward_type", "broadcast")
+ } else if TransStreamGBCascaded == forwardType {
+ urlParams.Add("forward_type", "cascaded")
+ } else if TransStreamGBGateway == forwardType {
+ urlParams.Add("forward_type", "gateway_1078")
}
- return true
+ ip, port, sinkID, err := MSAddForwardSink(forwardType, string(streamId), gbSdp.connectionAddr, gbSdp.offerSetup.String(), gbSdp.answerSetup.String(), gbSdp.ssrc, string(inviteType), urlParams)
+ if err != nil {
+ Sugar.Errorf("处理上级Invite失败,向流媒体服务添加转发Sink失败 err: %s", err.Error())
+ if InviteTypePlay != inviteType {
+ CloseStream(streamId, true)
+ }
+
+ return nil, err
+ }
+
+ sink.SinkID = sinkID
+ // 创建answer
+ answer := BuildSDP(gbSdp.mediaType, user, gbSdp.sdp.Session, ip, port, gbSdp.startTime, gbSdp.stopTime, gbSdp.answerSetup.String(), gbSdp.speed, gbSdp.ssrc, attrs...)
+ response := CreateResponseWithStatusCode(request, http.StatusOK)
+
+ // answer添加contact头域
+ response.RemoveHeader("Contact")
+ response.AppendHeader(GlobalContactAddress.AsContactHeader())
+ response.AppendHeader(&SDPMessageType)
+ response.SetBody(answer, true)
+ setToTag(response)
+
+ sink.SetDialog(CreateDialogRequestFromAnswer(response, true, request.Source()))
+
+ if err = SinkDao.SaveForwardSink(streamId, sink); err != nil {
+ Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", streamId, sink.SinkID, err.Error())
+ }
+
+ return response, nil
}
func RemoveForwardSink(StreamID StreamID, sinkID string) *Sink {
diff --git a/sip_handler.go b/sip_handler.go
index 00f9bab..645735b 100644
--- a/sip_handler.go
+++ b/sip_handler.go
@@ -104,6 +104,14 @@ func (e *EventHandler) OnCatalog(device string, response *CatalogResponse) {
}
}
+func GetTypeCode(id string) string {
+ if len(id) != 20 {
+ return ""
+ }
+
+ return id[10:13]
+}
+
func (e *EventHandler) OnRecord(device string, response *QueryRecordInfoResponse) {
event := SNManager.FindEvent(response.SN)
if event == nil {
diff --git a/sip_server.go b/sip_server.go
index 6fefdc4..bec4e66 100644
--- a/sip_server.go
+++ b/sip_server.go
@@ -64,6 +64,13 @@ type sipServer struct {
handler EventHandler
}
+type SipRequestSource struct {
+ req sip.Request
+ tx sip.ServerTransaction
+ fromCascade bool
+ fromJt bool
+}
+
func (s *sipServer) Send(msg sip.Message) error {
return s.sip.Send(msg)
}
@@ -74,39 +81,39 @@ func setToTag(response sip.Message) {
to.Params = sip.NewParams().Add("tag", sip.String{Str: util.RandString(10)})
}
-func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction, parent bool) {
+func (s *sipServer) OnRegister(wrapper *SipRequestSource) {
var device GBDevice
var queryCatalog bool
- fromHeaders := req.GetHeaders("From")
+ fromHeaders := wrapper.req.GetHeaders("From")
if len(fromHeaders) == 0 {
- Sugar.Errorf("not find From header. message: %s", req.String())
+ Sugar.Errorf("not find From header. message: %s", wrapper.req.String())
return
}
- _ = req.GetHeaders("Authorization")
+ _ = wrapper.req.GetHeaders("Authorization")
fromHeader := fromHeaders[0].(*sip.FromHeader)
- expiresHeader := req.GetHeaders("Expires")
+ expiresHeader := wrapper.req.GetHeaders("Expires")
- response := sip.NewResponseFromRequest("", req, 200, "OK", "")
+ response := sip.NewResponseFromRequest("", wrapper.req, 200, "OK", "")
id := fromHeader.Address.User().String()
if len(expiresHeader) > 0 && "0" == expiresHeader[0].Value() {
Sugar.Infof("设备注销 Device: %s", id)
s.handler.OnUnregister(id)
} else /*if authorizationHeader == nil*/ {
var expires int
- expires, device, queryCatalog = s.handler.OnRegister(id, req.Transport(), req.Source())
+ expires, device, queryCatalog = s.handler.OnRegister(id, wrapper.req.Transport(), wrapper.req.Source())
if device != nil {
- Sugar.Infof("注册成功 Device: %s addr: %s", id, req.Source())
+ Sugar.Infof("注册成功 Device: %s addr: %s", id, wrapper.req.Source())
expiresHeader := sip.Expires(expires)
response.AppendHeader(&expiresHeader)
} else {
Sugar.Infof("注册失败 Device: %s", id)
- response = sip.NewResponseFromRequest("", req, 401, "Unauthorized", "")
+ response = sip.NewResponseFromRequest("", wrapper.req, 401, "Unauthorized", "")
}
}
- SendResponse(tx, response)
+ SendResponse(wrapper.tx, response)
if device != nil {
// 查询设备信息
@@ -119,9 +126,9 @@ func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction, parent
}
// OnInvite 收到上级预览/下级设备广播请求
-func (s *sipServer) OnInvite(req sip.Request, tx sip.ServerTransaction, parent bool) {
- SendResponse(tx, sip.NewResponseFromRequest("", req, 100, "Trying", ""))
- user := req.Recipient().User().String()
+func (s *sipServer) OnInvite(wrapper *SipRequestSource) {
+ SendResponse(wrapper.tx, sip.NewResponseFromRequest("", wrapper.req, 100, "Trying", ""))
+ user := wrapper.req.Recipient().User().String()
//if len(user) != 20 {
// SendResponseWithStatusCode(req, tx, http.StatusNotFound)
@@ -130,43 +137,52 @@ func (s *sipServer) OnInvite(req sip.Request, tx sip.ServerTransaction, parent b
// 查找对应的设备
var device GBDevice
- if parent {
+ if wrapper.fromCascade {
// 级联设备
- device = PlatformManager.Find(req.Source())
- } else if session := Dialogs.Find(user); session != nil {
- // 语音广播设备
- device, _ = DeviceDao.QueryDevice(session.data.(*Sink).SinkStreamID.DeviceID())
+ device = PlatformManager.Find(wrapper.req.Source())
+ } else if wrapper.fromJt {
+ // 部标设备
+ // 1. 根据通道查找到对应的设备ID
+ // 2. 根据Subject头域查找对应的设备ID
+ if channels, _ := ChannelDao.QueryChannelsByChannelID(user); len(channels) > 0 {
+ device = JTDeviceManager.Find(channels[0].RootID)
+ }
} else {
- // 根据Subject头域查找设备
- headers := req.GetHeaders("Subject")
- if len(headers) > 0 {
- subject := headers[0].(*sip.GenericHeader)
- split := strings.Split(strings.Split(subject.Value(), ",")[0], ":")
- if len(split) > 1 {
- device, _ = DeviceDao.QueryDevice(split[1])
+ if session := EarlyDialogs.Find(user); session != nil {
+ // 语音广播设备
+ device, _ = DeviceDao.QueryDevice(session.data.(*Sink).SinkStreamID.DeviceID())
+ } else {
+ // 根据Subject头域查找设备
+ headers := wrapper.req.GetHeaders("Subject")
+ if len(headers) > 0 {
+ subject := headers[0].(*sip.GenericHeader)
+ split := strings.Split(strings.Split(subject.Value(), ",")[0], ":")
+ if len(split) > 1 {
+ device, _ = DeviceDao.QueryDevice(split[1])
+ }
}
}
}
if device == nil {
- logger.Error("处理Invite失败, 找不到设备. request: %s", req.String())
+ logger.Error("处理Invite失败, 找不到设备. request: %s", wrapper.req.String())
- SendResponseWithStatusCode(req, tx, http.StatusNotFound)
+ SendResponseWithStatusCode(wrapper.req, wrapper.tx, http.StatusNotFound)
} else {
- response := device.OnInvite(req, user)
- SendResponse(tx, response)
+ response := device.OnInvite(wrapper.req, user)
+ SendResponse(wrapper.tx, response)
}
}
-func (s *sipServer) OnAck(req sip.Request, tx sip.ServerTransaction, parent bool) {
+func (s *sipServer) OnAck(wrapper *SipRequestSource) {
}
-func (s *sipServer) OnBye(req sip.Request, tx sip.ServerTransaction, parent bool) {
- response := sip.NewResponseFromRequest("", req, 200, "OK", "")
- SendResponse(tx, response)
+func (s *sipServer) OnBye(wrapper *SipRequestSource) {
+ response := sip.NewResponseFromRequest("", wrapper.req, 200, "OK", "")
+ SendResponse(wrapper.tx, response)
- id, _ := req.CallID()
+ id, _ := wrapper.req.CallID()
var deviceId string
if stream, _ := StreamDao.DeleteStreamByCallID(id.Value()); stream != nil {
@@ -177,48 +193,53 @@ func (s *sipServer) OnBye(req sip.Request, tx sip.ServerTransaction, parent bool
sink.Close(false, true)
}
- if parent {
- // 上级设备挂断
- if platform := PlatformManager.Find(req.Source()); platform != nil {
- platform.OnBye(req)
+ if wrapper.fromCascade {
+ // 级联上级挂断
+ if platform := PlatformManager.Find(wrapper.req.Source()); platform != nil {
+ platform.OnBye(wrapper.req)
+ }
+ } else if wrapper.fromJt {
+ // 部标设备挂断
+ if jtDevice := JTDeviceManager.Find(deviceId); jtDevice != nil {
+ jtDevice.OnBye(wrapper.req)
}
} else if device, _ := DeviceDao.QueryDevice(deviceId); device != nil {
- device.OnBye(req)
+ device.OnBye(wrapper.req)
}
}
-func (s *sipServer) OnNotify(req sip.Request, tx sip.ServerTransaction, parent bool) {
- response := sip.NewResponseFromRequest("", req, 200, "OK", "")
- SendResponse(tx, response)
+func (s *sipServer) OnNotify(wrapper *SipRequestSource) {
+ response := sip.NewResponseFromRequest("", wrapper.req, 200, "OK", "")
+ SendResponse(wrapper.tx, response)
mobilePosition := MobilePositionNotify{}
- if err := DecodeXML([]byte(req.Body()), &mobilePosition); err != nil {
- Sugar.Errorf("解析位置通知失败 err: %s request: %s", err.Error(), req.String())
+ if err := DecodeXML([]byte(wrapper.req.Body()), &mobilePosition); err != nil {
+ Sugar.Errorf("解析位置通知失败 err: %s request: %s", err.Error(), wrapper.req.String())
return
}
s.handler.OnNotifyPosition(&mobilePosition)
}
-func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent bool) {
+func (s *sipServer) OnMessage(wrapper *SipRequestSource) {
var ok bool
defer func() {
var response sip.Response
if ok {
- response = CreateResponseWithStatusCode(req, http.StatusOK)
+ response = CreateResponseWithStatusCode(wrapper.req, http.StatusOK)
} else {
- response = CreateResponseWithStatusCode(req, http.StatusForbidden)
+ response = CreateResponseWithStatusCode(wrapper.req, http.StatusForbidden)
}
- SendResponse(tx, response)
+ SendResponse(wrapper.tx, response)
}()
- body := req.Body()
+ body := wrapper.req.Body()
xmlName := GetRootElementName(body)
cmd := GetCmdType(body)
src, ok := s.xmlReflectTypes[xmlName+"."+cmd]
if !ok {
- Sugar.Errorf("处理XML消息失败, 找不到结构体. request: %s", req.String())
+ Sugar.Errorf("处理XML消息失败, 找不到结构体. request: %s", wrapper.req.String())
return
}
@@ -232,7 +253,7 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
deviceId := message.(BaseMessageGetter).GetDeviceID()
if CmdBroadcast == cmd {
// 广播消息
- from, _ := req.From()
+ from, _ := wrapper.req.From()
deviceId = from.Address.User().String()
}
@@ -241,9 +262,15 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
break
case XmlNameQuery:
// 被上级查询
- device := PlatformManager.Find(req.Source())
+ var device GBClient
+ if wrapper.fromCascade {
+ device = PlatformManager.Find(wrapper.req.Source())
+ } else if wrapper.fromJt {
+ device = JTDeviceManager.Find(deviceId)
+ }
+
if ok = device != nil; !ok {
- Sugar.Errorf("处理上级请求消息失败, 找不到级联设备 addr: %s request: %s", req.Source(), req.String())
+ Sugar.Errorf("处理上级请求消息失败, 找不到级联设备 addr: %s request: %s", wrapper.req.Source(), wrapper.req.String())
return
}
@@ -253,13 +280,15 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
var channels []*Channel
// 查询出所有通道
- if PlatformDao != nil {
- result, err := PlatformDao.QueryPlatformChannels(device.ServerAddr)
+ if wrapper.fromCascade {
+ result, err := PlatformDao.QueryPlatformChannels(device.GetDomain())
if err != nil {
Sugar.Errorf("查询设备通道列表失败 err: %s device: %s", err.Error(), device.GetID())
}
channels = result
+ } else if wrapper.fromJt {
+ channels, _ = ChannelDao.QueryChannelsByRootID(device.GetID())
} else {
// 从模拟多个国标客户端中查找
channels = DeviceChannelsManager.FindChannels(device.GetID())
@@ -272,7 +301,7 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
case XmlNameNotify:
if CmdKeepalive == cmd {
// 下级设备心跳通知
- ok = s.handler.OnKeepAlive(deviceId, req.Source())
+ ok = s.handler.OnKeepAlive(deviceId, wrapper.req.Source())
}
break
@@ -332,22 +361,28 @@ func (s *sipServer) ListenAddr() string {
}
// 过滤SIP消息、超找消息来源
-func filterRequest(f func(req sip.Request, tx sip.ServerTransaction, parent bool)) gosip.RequestHandler {
+func filterRequest(f func(wrapper *SipRequestSource)) gosip.RequestHandler {
return func(req sip.Request, tx sip.ServerTransaction) {
source := req.Source()
+ // 是否是级联上级下发的请求
platform := PlatformManager.Find(source)
+ // 是否是部标设备上级下发的请求
+ var fromJt bool
+ if platform == nil {
+ fromJt = JTDeviceManager.ExistClientByServerAddr(req.Source())
+ }
switch req.Method() {
case sip.SUBSCRIBE, sip.INFO:
- if platform == nil {
- // SUBSCRIBE/INFO只能上级发起
+ if platform == nil || fromJt {
+ // SUBSCRIBE/INFO只能本级域向下级发起
SendResponseWithStatusCode(req, tx, http.StatusBadRequest)
Sugar.Errorf("处理%s请求失败, %s消息只能上级发起. request: %s", req.Method(), req.Method(), req.String())
return
}
break
case sip.NOTIFY, sip.REGISTER:
- if platform != nil {
+ if platform != nil || fromJt {
// NOTIFY和REGISTER只能下级发起
SendResponseWithStatusCode(req, tx, http.StatusBadRequest)
Sugar.Errorf("处理%s请求失败, %s消息只能下级发起. request: %s", req.Method(), req.Method(), req.String())
@@ -356,13 +391,19 @@ func filterRequest(f func(req sip.Request, tx sip.ServerTransaction, parent bool
break
}
- f(req, tx, platform != nil)
+ f(&SipRequestSource{
+ req,
+ tx,
+ platform != nil,
+ fromJt,
+ })
}
}
func StartSipServer(id, listenIP, publicIP string, listenPort int) (SipServer, error) {
ua := gosip.NewServer(gosip.ServerConfig{
- Host: publicIP,
+ Host: publicIP,
+ UserAgent: "github/lkmio",
}, nil, nil, logger)
addr := net.JoinHostPort(listenIP, strconv.Itoa(listenPort))
@@ -392,11 +433,11 @@ func StartSipServer(id, listenIP, publicIP string, listenPort int) (SipServer, e
utils.Assert(ua.OnRequest(sip.NOTIFY, filterRequest(server.OnNotify)) == nil)
utils.Assert(ua.OnRequest(sip.MESSAGE, filterRequest(server.OnMessage)) == nil)
- utils.Assert(ua.OnRequest(sip.INFO, filterRequest(func(req sip.Request, tx sip.ServerTransaction, parent bool) {
+ utils.Assert(ua.OnRequest(sip.INFO, filterRequest(func(wrapper *SipRequestSource) {
})) == nil)
- utils.Assert(ua.OnRequest(sip.CANCEL, filterRequest(func(req sip.Request, tx sip.ServerTransaction, parent bool) {
+ utils.Assert(ua.OnRequest(sip.CANCEL, filterRequest(func(wrapper *SipRequestSource) {
})) == nil)
- utils.Assert(ua.OnRequest(sip.SUBSCRIBE, filterRequest(func(req sip.Request, tx sip.ServerTransaction, parent bool) {
+ utils.Assert(ua.OnRequest(sip.SUBSCRIBE, filterRequest(func(wrapper *SipRequestSource) {
})) == nil)
server.listenAddr = addr
diff --git a/sip_client.go b/sip_ua.go
similarity index 87%
rename from sip_client.go
rename to sip_ua.go
index cf8de60..579f01e 100644
--- a/sip_client.go
+++ b/sip_ua.go
@@ -25,7 +25,7 @@ var (
UnregisterExpiresHeader = sip.Expires(0)
)
-type SipClient interface {
+type SIPUA interface {
doRegister(request sip.Request) bool
doUnregister()
@@ -37,10 +37,12 @@ type SipClient interface {
Stop()
SetOnRegisterHandler(online, offline func())
+
+ GetDomain() string
}
-type SIPUAParams struct {
- GBModel
+type SIPUAOptions struct {
+ Name string `json:"name"` // display name, 国标DeviceInfo消息中的Name
Username string `json:"username"` // 用户名
SeverID string `json:"server_id"` // 上级ID, 必选. 作为主键, 不能重复.
ServerAddr string `json:"server_addr"` // 上级地址, 必选
@@ -51,17 +53,13 @@ type SIPUAParams struct {
Status OnlineStatus `json:"status"` // 在线状态
}
-func (g *SIPUAParams) TableName() string {
- return "lkm_virtual_device"
-}
-
-type sipClient struct {
- SIPUAParams
+type sipUA struct {
+ SIPUAOptions
ListenAddr string //UA的监听地址
NatAddr string //Nat地址
- ua SipServer
+ stack SipServer
exited bool
ctx context.Context
cancel context.CancelFunc
@@ -74,7 +72,7 @@ type sipClient struct {
offlineCB func()
}
-func (g *sipClient) doRegister(request sip.Request) bool {
+func (g *sipUA) doRegister(request sip.Request) bool {
hop, _ := request.ViaHop()
empty := sip.String{}
hop.Params.Add("rport", &empty)
@@ -82,7 +80,7 @@ func (g *sipClient) doRegister(request sip.Request) bool {
for i := 0; i < 2; i++ {
//发起注册, 第一次未携带授权头, 第二次携带授权头
- clientTransaction := g.ua.SendRequest(request)
+ clientTransaction := g.stack.SendRequest(request)
//等待响应
responses := clientTransaction.Responses()
@@ -118,7 +116,7 @@ func (g *sipClient) doRegister(request sip.Request) bool {
return false
}
-func (g *sipClient) startNewRegister() bool {
+func (g *sipUA) startNewRegister() bool {
builder := NewRequestBuilder(sip.REGISTER, g.Username, g.ListenAddr, g.SeverID, g.ServerAddr, g.Transport)
expires := sip.Expires(g.RegisterExpires)
builder.SetExpires(&expires)
@@ -159,30 +157,30 @@ func CopySipRequest(old sip.Request) sip.Request {
return request
}
-func (g *sipClient) refreshRegister() bool {
+func (g *sipUA) refreshRegister() bool {
request := CopySipRequest(g.registerOKRequest)
return g.doRegister(request)
}
-func (g *sipClient) doUnregister() {
+func (g *sipUA) doUnregister() {
request := CopySipRequest(g.registerOKRequest)
request.RemoveHeader("Expires")
request.AppendHeader(&UnregisterExpiresHeader)
- g.ua.SendRequest(request)
+ g.stack.SendRequest(request)
if g.offlineCB != nil {
go g.offlineCB()
}
}
-func (g *sipClient) doKeepalive() bool {
+func (g *sipUA) doKeepalive() bool {
body := fmt.Sprintf(KeepAliveBody, time.Now().UnixMilli()/1000, g.Username)
request, err := BuildMessageRequest(g.Username, g.ListenAddr, g.SeverID, g.ServerAddr, g.Transport, body)
if err != nil {
panic(err)
}
- transaction := g.ua.SendRequest(request)
+ transaction := g.stack.SendRequest(request)
responses := transaction.Responses()
var response sip.Response
@@ -197,7 +195,7 @@ func (g *sipClient) doKeepalive() bool {
}
// IsExpires 是否临近注册有效期
-func (g *sipClient) IsExpires() (bool, int) {
+func (g *sipUA) IsExpires() (bool, int) {
if !g.registerOK {
return false, 0
}
@@ -207,7 +205,7 @@ func (g *sipClient) IsExpires() (bool, int) {
}
// Refresh 处理Client的生命周期任务, 发起注册, 发送心跳,断开重连等, 并返回下次刷新任务时间
-func (g *sipClient) Refresh() time.Duration {
+func (g *sipUA) Refresh() time.Duration {
expires, _ := g.IsExpires()
if !g.registerOK || expires {
@@ -256,7 +254,7 @@ func (g *sipClient) Refresh() time.Duration {
return time.Duration(g.KeepaliveInterval) * time.Second
}
-func (g *sipClient) Start() {
+func (g *sipUA) Start() {
utils.Assert(!g.exited)
g.ctx, g.cancel = context.WithCancel(context.Background())
@@ -284,21 +282,24 @@ func (g *sipClient) Start() {
}()
}
-func (g *sipClient) Stop() {
+func (g *sipUA) Stop() {
utils.Assert(!g.exited)
+ if g.registerOK {
+ g.doUnregister()
+ }
g.exited = true
g.cancel()
g.registerOK = false
g.onlineCB = nil
g.offlineCB = nil
-
- if g.registerOK {
- g.doUnregister()
- }
}
-func (g *sipClient) SetOnRegisterHandler(online, offline func()) {
+func (g *sipUA) SetOnRegisterHandler(online, offline func()) {
g.onlineCB = online
g.offlineCB = offline
}
+
+func (g *sipUA) GetDomain() string {
+ return g.ServerAddr
+}
diff --git a/stream.go b/stream.go
index 7244418..ef047f5 100644
--- a/stream.go
+++ b/stream.go
@@ -34,6 +34,15 @@ func (s SetupType) String() string {
panic("invalid setup type")
}
+func (s SetupType) MediaProtocol() string {
+ switch s {
+ case SetupTypePassive, SetupTypeActive:
+ return "TCP/RTP/AVP"
+ default:
+ return "RTP/AVP"
+ }
+}
+
// RequestWrapper sql序列化
type RequestWrapper struct {
sip.Request
@@ -71,7 +80,7 @@ func (r *RequestWrapper) Scan(value interface{}) error {
type Stream struct {
GBModel
StreamID StreamID `json:"stream_id"` // 流ID
- Protocol string `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk
+ Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk
Dialog *RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话
SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发)
SetupType SetupType
@@ -158,7 +167,7 @@ func (s *Stream) Close(bye, ms bool) {
if ms {
// 告知媒体服务释放source
- go CloseSource(string(s.StreamID))
+ go MSCloseSource(string(s.StreamID))
}
// 关闭所转发会话
@@ -170,7 +179,7 @@ func (s *Stream) Close(bye, ms bool) {
func (s *Stream) Bye() {
if s.Dialog != nil && s.Dialog.Request != nil {
- go SipUA.SendRequest(s.CreateRequestFromDialog(sip.BYE))
+ go SipStack.SendRequest(s.CreateRequestFromDialog(sip.BYE))
s.Dialog = nil
}
}
diff --git a/xml.go b/xml.go
index 2cb6496..5c31bce 100644
--- a/xml.go
+++ b/xml.go
@@ -17,8 +17,8 @@ type Channel struct {
GBModel
// RootID 是设备的根ID, 用于查询设备的所有通道.
- RootID string `json:"-" xml:"-" gorm:"index"` // 根设备ID
- TypeCode int `json:"-" xml:"-" gorm:"index"` // 设备类型编码
+ RootID string `json:"root_id" xml:"-" gorm:"index"` // 根设备ID
+ TypeCode int `json:"-" xml:"-" gorm:"index"` // 设备类型编码
// 所在组ID. 扩展的数据库字段, 方便查询某个目录下的设备列表.
// 如果ParentID不为空, ParentID作为组ID, 如果ParentID为空, BusinessGroupID作为组ID.
@@ -49,6 +49,7 @@ type Channel struct {
Longitude string `json:"longitude" xml:"Longitude,omitempty"`
Latitude string `json:"latitude" xml:"Latitude,omitempty"`
SetupType SetupType `json:"setup_type,omitempty"`
+ ChannelNumber int `json:"channel_number" xml:"-"` // 对应1078的通道号
}
func (d *Channel) Online() bool {
diff --git a/xml_record.go b/xml_record.go
index 15ef228..ab1f84e 100644
--- a/xml_record.go
+++ b/xml_record.go
@@ -63,6 +63,6 @@ type RecordInfo struct {
func (d *Device) DoQueryRecordList(channelId, startTime, endTime string, sn int, type_ string) error {
body := fmt.Sprintf(QueryRecordFormat, sn, channelId, startTime, endTime, type_)
request := d.BuildMessageRequest(channelId, body)
- SipUA.SendRequest(request)
+ SipStack.SendRequest(request)
return nil
}