mirror of
https://github.com/lkmio/lkm.git
synced 2026-04-22 16:17:05 +08:00
731 lines
23 KiB
Go
731 lines
23 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/gorilla/mux"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/lkmio/avformat/utils"
|
|
"github.com/lkmio/lkm/flv"
|
|
"github.com/lkmio/lkm/gb28181"
|
|
"github.com/lkmio/lkm/hls"
|
|
"github.com/lkmio/lkm/log"
|
|
"github.com/lkmio/lkm/rtc"
|
|
"github.com/lkmio/lkm/stream"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
type ApiServer struct {
|
|
upgrader *websocket.Upgrader
|
|
router *mux.Router
|
|
}
|
|
|
|
var apiServer *ApiServer
|
|
|
|
func init() {
|
|
apiServer = &ApiServer{
|
|
upgrader: &websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
return true
|
|
},
|
|
},
|
|
|
|
router: mux.NewRouter(),
|
|
}
|
|
}
|
|
|
|
func filterSourceID(f func(sourceId string, w http.ResponseWriter, req *http.Request), suffix string) func(http.ResponseWriter, *http.Request) {
|
|
return func(w http.ResponseWriter, req *http.Request) {
|
|
source, err := stream.Path2SourceID(req.URL.Path, suffix)
|
|
if err != nil {
|
|
log.Sugar.Errorf("拉流失败 解析流id发生err: %s path: %s", err.Error(), req.URL.Path)
|
|
httpResponse(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
|
|
f(source, w, req)
|
|
}
|
|
}
|
|
|
|
type IDS struct {
|
|
// 内部SinkID可能是uint64或者string类型, 但外部传参均使用string类型,程序内部自行兼容ipv6.
|
|
Sink string `json:"sink"`
|
|
Source string `json:"source"`
|
|
}
|
|
|
|
func withJsonParams[T any](f func(params T, w http.ResponseWriter, req *http.Request), params interface{}) func(http.ResponseWriter, *http.Request) {
|
|
return func(w http.ResponseWriter, req *http.Request) {
|
|
newParams := new(T)
|
|
if err := HttpDecodeJSONBody(w, req, newParams); err != nil {
|
|
log.Sugar.Errorf("处理http请求失败 err: %s path: %s", err.Error(), req.URL.Path)
|
|
httpResponseError(w, err.Error())
|
|
return
|
|
}
|
|
|
|
f(*newParams, w, req)
|
|
}
|
|
}
|
|
|
|
func startApiServer(addr string) {
|
|
/**
|
|
http://host:port/xxx.flv
|
|
http://host:port/xxx.rtc
|
|
http://host:port/xxx.m3u8
|
|
http://host:port/xxx_0.ts
|
|
ws://host:port/xxx.flv
|
|
*/
|
|
|
|
apiServer.router.Use(func(handler http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
// 添加 CORS 头以解决跨域问题
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, PUT, DELETE")
|
|
w.Header().Set("Access-Control-Allow-Headers", "*")
|
|
|
|
// 如果是OPTIONS请求,直接返回
|
|
if r.Method == "OPTIONS" {
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
handler.ServeHTTP(w, r)
|
|
})
|
|
})
|
|
|
|
// 点播, 映射录制资源
|
|
// 放在最前面, 避免被后面的路由拦截
|
|
apiServer.router.PathPrefix("/record/").Handler(http.StripPrefix("/record/", http.FileServer(http.Dir(stream.AppConfig.Record.Dir))))
|
|
|
|
// {source}.flv和/{source}/{stream}.flv意味着, 推流id(路径)只能嵌套一层
|
|
apiServer.router.HandleFunc("/{source}.flv", filterSourceID(apiServer.onFlv, ".flv"))
|
|
apiServer.router.HandleFunc("/{source}/{stream}.flv", filterSourceID(apiServer.onFlv, ".flv"))
|
|
|
|
if stream.AppConfig.Hls.Enable {
|
|
apiServer.router.HandleFunc("/{source}.m3u8", filterSourceID(apiServer.onHLS, ".m3u8"))
|
|
apiServer.router.HandleFunc("/{source}/{stream}.m3u8", filterSourceID(apiServer.onHLS, ".m3u8"))
|
|
apiServer.router.HandleFunc("/{source}.ts", filterSourceID(apiServer.onTS, ".ts"))
|
|
apiServer.router.HandleFunc("/{source}/{stream}.ts", filterSourceID(apiServer.onTS, ".ts"))
|
|
}
|
|
|
|
if stream.AppConfig.WebRtc.Enable {
|
|
apiServer.router.HandleFunc("/{source}.rtc", filterSourceID(apiServer.onRtc, ".rtc"))
|
|
apiServer.router.HandleFunc("/{source}/{stream}.rtc", filterSourceID(apiServer.onRtc, ".rtc"))
|
|
}
|
|
|
|
apiServer.router.HandleFunc("/api/v1/source/list", apiServer.OnSourceList) // 查询所有推流源
|
|
apiServer.router.HandleFunc("/api/v1/source/close", withJsonParams(apiServer.OnSourceClose, &IDS{})) // 关闭推流源
|
|
apiServer.router.HandleFunc("/api/v1/sink/list", withJsonParams(apiServer.OnSinkList, &IDS{})) // 查询某个推流源下,所有的拉流端列表
|
|
apiServer.router.HandleFunc("/api/v1/sink/close", withJsonParams(apiServer.OnSinkClose, &IDS{})) // 关闭拉流端
|
|
apiServer.router.HandleFunc("/api/v1/sink/add", withJsonParams(apiServer.OnSinkAdd, &GBOffer{})) // 级联/广播/JT转GB
|
|
apiServer.router.HandleFunc("/api/v1/record/start", apiServer.OnRecordStart) // 开启录制
|
|
apiServer.router.HandleFunc("/api/v1/record/stop", apiServer.OnRecordStop) // 关闭录制
|
|
|
|
apiServer.router.HandleFunc("/api/v1/streams/statistics", nil) // 统计所有推拉流
|
|
|
|
if stream.AppConfig.GB28181.Enable {
|
|
apiServer.router.HandleFunc("/ws/v1/gb28181/talk", apiServer.OnGBTalk) // 对讲的主讲人WebSocket连接
|
|
apiServer.router.HandleFunc("/api/v1/control/ws-talk/{device}/{channel}", apiServer.OnLiveGBSTalk) // livegbs一对一对讲
|
|
apiServer.router.HandleFunc("/api/v1/gb28181/source/create", withJsonParams(apiServer.OnGBOfferCreate, &SourceSDP{})) // 创建国标源
|
|
apiServer.router.HandleFunc("/api/v1/gb28181/answer/set", withJsonParams(apiServer.OnGBSourceConnect, &SourceSDP{})) // 设置应答sdp, 如果是active模式拉流, 设置对方的地址. 下载文件设置文件大小
|
|
apiServer.router.HandleFunc("/api/v1/gb28181/speed/set", withJsonParams(apiServer.OnGBSpeedSet, &SourceSDP{}))
|
|
}
|
|
|
|
apiServer.router.HandleFunc("/api/v1/gc/force", func(writer http.ResponseWriter, request *http.Request) {
|
|
runtime.GC()
|
|
})
|
|
|
|
apiServer.router.HandleFunc("/api/v1/stream/info", apiServer.OnStreamInfo)
|
|
|
|
apiServer.router.PathPrefix("/web/").Handler(http.StripPrefix("/web/", http.FileServer(http.Dir("./web"))))
|
|
|
|
http.Handle("/", apiServer.router)
|
|
|
|
srv := &http.Server{
|
|
Handler: apiServer.router,
|
|
Addr: addr,
|
|
// Good practice: enforce timeouts for servers you create!
|
|
WriteTimeout: 30 * time.Second,
|
|
ReadTimeout: 30 * time.Second,
|
|
}
|
|
|
|
err := srv.ListenAndServe()
|
|
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) generateSinkID(_ string) stream.SinkID {
|
|
return utils.RandStringBytes(18)
|
|
}
|
|
|
|
func (api *ApiServer) onFlv(sourceId string, w http.ResponseWriter, r *http.Request) {
|
|
// 区分ws请求
|
|
ws := true
|
|
if !("upgrade" == strings.ToLower(r.Header.Get("Connection"))) {
|
|
ws = false
|
|
} else if !("websocket" == strings.ToLower(r.Header.Get("Upgrade"))) {
|
|
ws = false
|
|
} else if !("13" == r.Header.Get("Sec-Websocket-Version")) {
|
|
ws = false
|
|
}
|
|
|
|
if ws {
|
|
apiServer.onWSFlv(sourceId, w, r)
|
|
} else {
|
|
apiServer.onHttpFLV(sourceId, w, r)
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) onWSFlv(sourceId string, w http.ResponseWriter, r *http.Request) {
|
|
conn, err := api.upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Sugar.Errorf("ws拉流失败 source: %s err: %s", sourceId, err.Error())
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
sink := flv.NewFLVSink(api.generateSinkID(r.RemoteAddr), sourceId, flv.NewWSConn(conn))
|
|
ok := stream.SubscribeStream(sink, r.URL.Query())
|
|
if utils.HookStateOK != ok {
|
|
log.Sugar.Warnf("ws-flv 拉流失败 source: %s sink: %s", sourceId, sink.String())
|
|
_ = conn.Close()
|
|
} else {
|
|
log.Sugar.Infof("ws-flv 拉流成功 source: %s sink: %s", sourceId, sink.String())
|
|
}
|
|
|
|
netConn := conn.NetConn()
|
|
bytes := make([]byte, 64)
|
|
for {
|
|
if _, err := netConn.Read(bytes); err != nil {
|
|
log.Sugar.Infof("ws-flv 断开连接 source: %s sink:%s", sourceId, sink.String())
|
|
sink.Close()
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) onHttpFLV(sourceId string, w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "video/x-flv")
|
|
w.Header().Set("Connection", "Keep-Alive")
|
|
w.Header().Set("Transfer-Encoding", "chunked")
|
|
|
|
var conn net.Conn
|
|
if hj, ok := w.(http.Hijacker); !ok {
|
|
log.Sugar.Errorf("http-flv 拉流失败 不支持hijacking. source: %s remote: %s", sourceId, r.RemoteAddr)
|
|
http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError)
|
|
return
|
|
} else {
|
|
w.WriteHeader(http.StatusOK)
|
|
var err error
|
|
if conn, _, err = hj.Hijack(); err != nil {
|
|
log.Sugar.Errorf("http-flv 拉流失败 source: %s remote: %s err: %s", sourceId, r.RemoteAddr, err.Error())
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
}
|
|
|
|
sink := flv.NewFLVSink(api.generateSinkID(r.RemoteAddr), sourceId, conn)
|
|
ok := stream.SubscribeStream(sink, r.URL.Query())
|
|
if utils.HookStateOK != ok {
|
|
log.Sugar.Warnf("http-flv 拉流失败 source: %s sink: %s", sourceId, sink.String())
|
|
sink.Close()
|
|
} else {
|
|
log.Sugar.Infof("http-flv 拉流成功 source: %s sink: %s", sourceId, sink.String())
|
|
}
|
|
|
|
bytes := make([]byte, 64)
|
|
for {
|
|
if _, err := conn.Read(bytes); err != nil {
|
|
log.Sugar.Infof("http-flv 断开连接 sink:%s", sink.String())
|
|
sink.Close()
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) onTS(source string, w http.ResponseWriter, r *http.Request) {
|
|
sid := r.URL.Query().Get(hls.SessionIDKey)
|
|
var sink stream.Sink
|
|
if sid != "" {
|
|
sink = hls.SinkManager.Find(stream.SinkID(sid))
|
|
}
|
|
if sink == nil {
|
|
log.Sugar.Errorf("hls session with id '%s' has expired.", sid)
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
|
|
index := strings.LastIndex(source, "_")
|
|
if index < 0 || index == len(source)-1 {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
seq := source[index+1:]
|
|
tsPath := stream.AppConfig.Hls.TSPath(sink.GetSourceID(), seq)
|
|
if _, err := os.Stat(tsPath); err != nil {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
sink.(*hls.M3U8Sink).RefreshPlayingTime()
|
|
w.Header().Set("Content-Type", "video/MP2T")
|
|
http.ServeFile(w, r, tsPath)
|
|
}
|
|
|
|
func (api *ApiServer) onHLS(source string, w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
|
|
|
|
// 如果没有携带会话ID, 认为是首次拉流. Server将生成会话ID, 应答给拉流端, 后续拉流请求(.M3U8和.TS的HTTP请求)都将携带该会话ID.
|
|
// 会话ID的Key为"hls_sid", 为避免冲突, 播放端和hook server不要再使用, 否则会一直拉流失败.
|
|
sid := r.URL.Query().Get(hls.SessionIDKey)
|
|
if sid == "" {
|
|
sid = utils.RandStringBytes(10)
|
|
|
|
query := r.URL.Query()
|
|
query.Add(hls.SessionIDKey, sid)
|
|
path := fmt.Sprintf("/%s.m3u8?%s", source, query.Encode())
|
|
|
|
response := "#EXTM3U\r\n" +
|
|
"#EXT-X-STREAM-INF:BANDWIDTH=1,AVERAGE-BANDWIDTH=1\r\n" +
|
|
path + "\r\n"
|
|
w.Write([]byte(response))
|
|
return
|
|
}
|
|
|
|
sink := hls.SinkManager.Find(sid)
|
|
if sink == nil {
|
|
// 创建sink
|
|
sink = hls.NewM3U8Sink(sid, source, sid)
|
|
sink.(*hls.M3U8Sink).RefreshPlayingTime()
|
|
|
|
if hls.SinkManager.Add(sink) {
|
|
ok := stream.SubscribeStream(sink, r.URL.Query())
|
|
if utils.HookStateOK != ok {
|
|
log.Sugar.Warnf("m3u8拉流失败 source: %s sink: %s", source, sink.String())
|
|
_ = hls.SinkManager.Remove(sink.GetID())
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// 更新最近的M3U8文件
|
|
playlist := sink.(*hls.M3U8Sink).GetPlaylist(nil)
|
|
if playlist == "" {
|
|
if playlist = sink.(*hls.M3U8Sink).GetPlaylist(r.Context()); playlist == "" {
|
|
log.Sugar.Warnf("hls拉流失败 未能生成有效m3u8文件 sink: %s source: %s", sink.GetID(), sink.GetSourceID())
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
}
|
|
|
|
w.Write([]byte(playlist))
|
|
}
|
|
|
|
func (api *ApiServer) onRtc(sourceId string, w http.ResponseWriter, r *http.Request) {
|
|
v := struct {
|
|
Type string `json:"type"`
|
|
SDP string `json:"sdp"`
|
|
}{}
|
|
|
|
data, err := io.ReadAll(r.Body)
|
|
var liveGBSWF bool
|
|
|
|
if err != nil {
|
|
log.Sugar.Errorf("rtc拉流失败 err: %s remote: %s", err.Error(), r.RemoteAddr)
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
} else if liveGBSWF = "livegbs" == r.URL.Query().Get("wf"); liveGBSWF {
|
|
// 兼容livegbs前端播放webrtc
|
|
offer, err := base64.StdEncoding.DecodeString(string(data))
|
|
if err != nil {
|
|
log.Sugar.Errorf("rtc拉流失败 err: %s remote: %s", err.Error(), r.RemoteAddr)
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
v.Type = "offer"
|
|
v.SDP = string(offer)
|
|
} else if err := json.Unmarshal(data, &v); err != nil {
|
|
log.Sugar.Errorf("rtc拉流失败 err: %s remote: %s", err.Error(), r.RemoteAddr)
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
sink := rtc.NewSink(api.generateSinkID(r.RemoteAddr), sourceId, v.SDP, func(sdp string) {
|
|
response := struct {
|
|
Type string `json:"type"`
|
|
SDP string `json:"sdp"`
|
|
}{
|
|
Type: "answer",
|
|
SDP: sdp,
|
|
}
|
|
|
|
var body []byte
|
|
body, err = json.Marshal(response)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if liveGBSWF {
|
|
body = []byte(base64.StdEncoding.EncodeToString([]byte(sdp)))
|
|
} else {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
}
|
|
|
|
w.Write(body)
|
|
close(done)
|
|
})
|
|
|
|
log.Sugar.Infof("rtc拉流请求 source: %s sink: %s sdp:%q", sourceId, sink.String(), v.SDP)
|
|
|
|
ok := stream.SubscribeStream(sink, r.URL.Query())
|
|
if utils.HookStateOK != ok {
|
|
log.Sugar.Warnf("rtc拉流失败 source: %s sink: %s", sourceId, sink.String())
|
|
w.WriteHeader(http.StatusForbidden)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-r.Context().Done():
|
|
log.Sugar.Infof("rtc拉流请求取消 source: %s sink: %s", sourceId, stream.SinkID2String(sink.GetID()))
|
|
sink.Close()
|
|
break
|
|
case <-done:
|
|
break
|
|
}
|
|
}
|
|
|
|
func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) {
|
|
sources := stream.SourceManager.All()
|
|
|
|
type SourceDetails struct {
|
|
ID string `json:"id"`
|
|
Protocol string `json:"protocol"` // 推流协议
|
|
Time time.Time `json:"time"` // 推流时间
|
|
SinkCount int `json:"sink_count"` // 播放端计数
|
|
Bitrate string `json:"bitrate"` // 码率统计
|
|
Tracks []string `json:"tracks"` // 每路流编码器ID
|
|
Urls []string `json:"urls"` // 拉流地址
|
|
}
|
|
|
|
var details []SourceDetails
|
|
for _, source := range sources {
|
|
var codecs []string
|
|
tracks := source.OriginTracks()
|
|
for _, track := range tracks {
|
|
codecs = append(codecs, track.Stream.CodecID.String())
|
|
}
|
|
|
|
details = append(details, SourceDetails{
|
|
ID: source.GetID(),
|
|
Protocol: source.GetType().String(),
|
|
Time: source.CreateTime(),
|
|
SinkCount: source.GetTransStreamPublisher().SinkCount(),
|
|
Bitrate: strconv.Itoa(source.GetBitrateStatistics().PreviousSecond()/1024) + "KBS", // 后续开发
|
|
Tracks: codecs,
|
|
Urls: stream.GetStreamPlayUrls(source.GetID()),
|
|
})
|
|
}
|
|
|
|
httpResponseOK(w, details)
|
|
}
|
|
|
|
func (api *ApiServer) OnSinkList(v *IDS, w http.ResponseWriter, r *http.Request) {
|
|
source := stream.SourceManager.Find(v.Source)
|
|
if source == nil {
|
|
httpResponseOK(w, nil)
|
|
return
|
|
}
|
|
|
|
type SinkDetails struct {
|
|
ID string `json:"id"`
|
|
Protocol string `json:"protocol"` // 拉流协议
|
|
Time time.Time `json:"time"` // 拉流时间
|
|
Bitrate string `json:"bitrate"` // 码率统计
|
|
Tracks []string `json:"tracks"` // 每路流编码器ID
|
|
}
|
|
|
|
var details []SinkDetails
|
|
sinks := source.GetTransStreamPublisher().Sinks()
|
|
for _, sink := range sinks {
|
|
details = append(details,
|
|
SinkDetails{
|
|
ID: stream.SinkID2String(sink.GetID()),
|
|
Protocol: sink.GetProtocol().String(),
|
|
Time: sink.CreateTime(),
|
|
},
|
|
)
|
|
}
|
|
|
|
httpResponseOK(w, details)
|
|
}
|
|
|
|
func (api *ApiServer) OnSourceClose(v *IDS, w http.ResponseWriter, r *http.Request) {
|
|
log.Sugar.Infof("close source: %v", v.Source)
|
|
|
|
if source := stream.SourceManager.Find(v.Source); source != nil {
|
|
source.Close()
|
|
} else {
|
|
log.Sugar.Warnf("Source with ID %s does not exist.", v.Source)
|
|
}
|
|
|
|
httpResponseOK(w, nil)
|
|
}
|
|
|
|
func (api *ApiServer) OnSinkClose(v *IDS, w http.ResponseWriter, r *http.Request) {
|
|
log.Sugar.Infof("close sink: %v", v)
|
|
|
|
var sinkId stream.SinkID
|
|
i, err := strconv.ParseUint(v.Sink, 10, 64)
|
|
if err != nil {
|
|
sinkId = stream.SinkID(v.Sink)
|
|
} else {
|
|
sinkId = stream.SinkID(i)
|
|
}
|
|
|
|
if source := stream.SourceManager.Find(v.Source); source != nil {
|
|
if sink := source.GetTransStreamPublisher().FindSink(sinkId); sink != nil {
|
|
sink.Close()
|
|
|
|
if sink.GetProtocol() == stream.TransStreamHls {
|
|
_ = hls.SinkManager.Remove(sinkId)
|
|
}
|
|
}
|
|
} else {
|
|
log.Sugar.Warnf("Source with ID %s does not exist.", v.Source)
|
|
}
|
|
|
|
httpResponseOK(w, nil)
|
|
}
|
|
|
|
func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
|
|
id := r.URL.Query().Get("streamid")
|
|
source := stream.SourceManager.Find(id)
|
|
if source == nil || source.IsClosed() {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
httpResponseJson(w, "stream not found")
|
|
return
|
|
} else if !source.IsCompleted() {
|
|
// 在请求结束前, 每隔1秒检查track探测是否完成
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
for !source.IsClosed() && !source.IsCompleted() && r.Context().Err() == nil {
|
|
select {
|
|
case <-ticker.C:
|
|
break
|
|
case <-r.Context().Done():
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
tracks := source.OriginTracks()
|
|
if len(tracks) < 1 {
|
|
return
|
|
}
|
|
|
|
var deviceId string
|
|
var channelId string
|
|
split := strings.Split(id, "/")
|
|
if len(split) < 2 {
|
|
return
|
|
}
|
|
|
|
deviceId = split[0]
|
|
channelId = split[1]
|
|
if len(split[1]) >= 20 {
|
|
channelId = split[1][:20]
|
|
}
|
|
|
|
var transport string
|
|
if stream.SourceType28181 == source.GetType() {
|
|
if gb28181.SetupUDP != source.(gb28181.GBSource).SetupType() {
|
|
transport = "TCP"
|
|
} else {
|
|
transport = "UDP"
|
|
}
|
|
}
|
|
|
|
var token string
|
|
cookie, err := r.Cookie("token")
|
|
if err == nil {
|
|
token = cookie.Value
|
|
}
|
|
|
|
urls := stream.GetStreamPlayUrlsMap(id)
|
|
liveGBSUrls := make(map[string]string)
|
|
for streamName, url := range urls {
|
|
url += "?stream_token=" + token
|
|
|
|
// 兼容livegbs前端播放webrtc
|
|
if streamName == "rtc" {
|
|
if strings.HasPrefix(url, "http") {
|
|
url = strings.Replace(url, "http", "webrtc", 1)
|
|
} else if strings.HasPrefix(url, "https") {
|
|
url = strings.Replace(url, "https", "webrtcs", 1)
|
|
}
|
|
|
|
url += "&wf=livegbs"
|
|
}
|
|
|
|
liveGBSUrls[streamName] = url
|
|
}
|
|
|
|
var recordStartTime string
|
|
if startTime := source.GetTransStreamPublisher().RecordStartTime(); !startTime.IsZero() {
|
|
recordStartTime = startTime.Format("2006-01-02 15:04:05")
|
|
}
|
|
|
|
gbSource := Source2GBSource(source)
|
|
var downloadInfo *DownloadInfo
|
|
if gbSource != nil && InviteTypeDownload == gbSource.GetSessionName() {
|
|
progress := gbSource.GetPlaybackProgress()
|
|
gbSource.GetTransStreamPublisher()
|
|
downloadInfo = &DownloadInfo{
|
|
PlaybackDuration: gbSource.GetDuration(),
|
|
PlaybackSpeed: gbSource.GetSpeed(),
|
|
PlaybackFileSize: gbSource.GetFileSize(),
|
|
PlaybackStartTime: gbSource.GetStartTime(),
|
|
PlaybackEndTime: gbSource.GetEndTime(),
|
|
PlaybackFileURL: gbSource.GetTransStreamPublisher().GetRecordStreamPlayUrl(),
|
|
PlaybackProgress: progress,
|
|
Progress: progress,
|
|
}
|
|
|
|
}
|
|
statistics := source.GetBitrateStatistics()
|
|
response := struct {
|
|
*DownloadInfo
|
|
AudioEnable bool `json:"AudioEnable"`
|
|
CDN string `json:"CDN"`
|
|
CascadeSize int `json:"CascadeSize"`
|
|
ChannelID string `json:"ChannelID"`
|
|
ChannelName string `json:"ChannelName"`
|
|
CloudRecord bool `json:"CloudRecord"`
|
|
DecodeSize int `json:"DecodeSize"`
|
|
DeviceID string `json:"DeviceID"`
|
|
Duration int `json:"Duration"`
|
|
FLV string `json:"FLV"`
|
|
HLS string `json:"HLS"`
|
|
InBitRate int `json:"InBitRate"`
|
|
InBytes int `json:"InBytes"`
|
|
NumOutputs int `json:"NumOutputs"`
|
|
Ondemand bool `json:"Ondemand"`
|
|
OutBytes int `json:"OutBytes"`
|
|
RTMP string `json:"RTMP"`
|
|
RTPCount int `json:"RTPCount"`
|
|
RTPLostCount int `json:"RTPLostCount"`
|
|
RTPLostRate int `json:"RTPLostRate"`
|
|
RTSP string `json:"RTSP"`
|
|
RecordStartAt string `json:"RecordStartAt"` // 录制时间
|
|
RelaySize int `json:"RelaySize"`
|
|
SMSID string `json:"SMSID"`
|
|
SnapURL string `json:"SnapURL"`
|
|
SourceAudioCodecName string `json:"SourceAudioCodecName"`
|
|
SourceAudioSampleRate int `json:"SourceAudioSampleRate"`
|
|
SourceVideoCodecName string `json:"SourceVideoCodecName"`
|
|
SourceVideoFrameRate int `json:"SourceVideoFrameRate"`
|
|
SourceVideoHeight int `json:"SourceVideoHeight"`
|
|
SourceVideoWidth int `json:"SourceVideoWidth"`
|
|
StartAt string `json:"StartAt"`
|
|
StreamID string `json:"StreamID"`
|
|
Transport string `json:"Transport"`
|
|
VideoFrameCount int `json:"VideoFrameCount"`
|
|
WEBRTC string `json:"WEBRTC"`
|
|
WS_FLV string `json:"WS_FLV"`
|
|
}{
|
|
DownloadInfo: downloadInfo,
|
|
AudioEnable: true,
|
|
CDN: "",
|
|
CascadeSize: 0,
|
|
ChannelID: channelId,
|
|
ChannelName: "",
|
|
CloudRecord: false,
|
|
DecodeSize: 0,
|
|
DeviceID: deviceId,
|
|
Duration: int(time.Since(source.CreateTime()).Seconds()),
|
|
FLV: liveGBSUrls["flv"],
|
|
HLS: liveGBSUrls["hls"],
|
|
InBitRate: statistics.PreviousSecond() * 8 / 1024,
|
|
InBytes: int(statistics.Total()),
|
|
NumOutputs: source.GetTransStreamPublisher().SinkCount(),
|
|
Ondemand: true,
|
|
OutBytes: 0,
|
|
RTMP: liveGBSUrls["rtmp"],
|
|
RTPCount: 0,
|
|
RTPLostCount: 0,
|
|
RTPLostRate: 0,
|
|
RTSP: liveGBSUrls["rtsp"],
|
|
RecordStartAt: recordStartTime,
|
|
RelaySize: 0,
|
|
SMSID: "",
|
|
SnapURL: "",
|
|
SourceVideoFrameRate: 0,
|
|
StartAt: source.CreateTime().Format("2006-01-02 15:04:05"),
|
|
StreamID: id,
|
|
Transport: transport,
|
|
VideoFrameCount: 0,
|
|
WEBRTC: liveGBSUrls["rtc"],
|
|
WS_FLV: liveGBSUrls["ws_flv"],
|
|
}
|
|
|
|
for _, track := range tracks {
|
|
if utils.AVMediaTypeAudio == track.Stream.MediaType {
|
|
response.SourceAudioCodecName = track.Stream.CodecID.String()
|
|
response.SourceAudioSampleRate = track.Stream.AudioConfig.SampleRate
|
|
} else if utils.AVMediaTypeVideo == track.Stream.MediaType {
|
|
response.SourceVideoCodecName = track.Stream.CodecID.String()
|
|
// response.SourceVideoFrameRate
|
|
if track.Stream.CodecParameters != nil {
|
|
response.SourceVideoWidth = track.Stream.CodecParameters.Width()
|
|
response.SourceVideoHeight = track.Stream.CodecParameters.Height()
|
|
}
|
|
}
|
|
}
|
|
|
|
httpResponseJson(w, &response)
|
|
}
|
|
|
|
func (api *ApiServer) OnRecordStart(w http.ResponseWriter, req *http.Request) {
|
|
streamId := req.FormValue("streamid")
|
|
source := stream.SourceManager.Find(streamId)
|
|
if source == nil {
|
|
log.Sugar.Errorf("OnRecordStart stream not found streamid %s", streamId)
|
|
w.WriteHeader(http.StatusNotFound)
|
|
} else if ok := source.GetTransStreamPublisher().StartRecord(); !ok {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
} else {
|
|
// 返回拉流地址
|
|
httpResponseJson(w, &struct {
|
|
DownloadURL string `json:"DownloadURL"`
|
|
}{
|
|
DownloadURL: source.GetTransStreamPublisher().GetRecordStreamPlayUrl(),
|
|
})
|
|
}
|
|
|
|
}
|
|
|
|
func (api *ApiServer) OnRecordStop(w http.ResponseWriter, req *http.Request) {
|
|
streamId := req.FormValue("streamid")
|
|
source := stream.SourceManager.Find(streamId)
|
|
if source == nil {
|
|
log.Sugar.Errorf("OnRecordStop stream not found streamid %s", streamId)
|
|
w.WriteHeader(http.StatusNotFound)
|
|
} else if err := source.GetTransStreamPublisher().StopRecord(); err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
httpResponseJson(w, err.Error())
|
|
}
|
|
}
|