Files
plugin-webrtc/main.go
T
2023-05-14 16:24:31 +08:00

259 lines
7.1 KiB
Go

package webrtc
import (
"io/ioutil"
"net"
"net/http"
"regexp"
"time"
"go.uber.org/zap"
"m7s.live/engine/v4"
_ "embed"
"github.com/pion/interceptor"
. "github.com/pion/webrtc/v3"
"m7s.live/engine/v4/config"
"m7s.live/engine/v4/util"
"m7s.live/plugin/webrtc/v4/webrtc"
)
// }{[]string{
// "stun:stun.ekiga.net",
// "stun:stun.ideasip.com",
// "stun:stun.schlund.de",
// "stun:stun.stunprotocol.org:3478",
// "stun:stun.voiparound.com",
// "stun:stun.voipbuster.com",
// "stun:stun.voipstunt.com",
// "stun:stun.voxgratia.org",
// "stun:stun.services.mozilla.com",
// "stun:stun.xten.com",
// "stun:stun.softjoys.com",
// "stun:stunserver.org",
// "stun:stun.schlund.de",
// "stun:stun.rixtelecom.se",
// "stun:stun.iptel.org",
// "stun:stun.ideasip.com",
// "stun:stun.fwdnet.net",
// "stun:stun.ekiga.net",
// "stun:stun01.sipphone.com",
// }}
// type udpConn struct {
// conn *net.UDPConn
// port int
// }
// publishHTML holds the bytes of publish.html, embedded into the binary at
// build time. Test_Publish writes it to the HTTP response unchanged.
//go:embed publish.html
var publishHTML []byte
// subscribeHTML holds the bytes of subscribe.html, embedded into the binary at
// build time. Test_Subscribe writes it to the HTTP response unchanged.
//go:embed subscribe.html
var subscribeHTML []byte
// reg_level matches a "profile-level-id=" parameter and captures its value
// when that value starts with '4' and ends with 'f'. The middle part is matched
// greedily, so the capture extends to the last 'f' on the line.
// NOTE(review): presumably applied to SDP fmtp lines elsewhere in the package;
// it is not referenced in this part of the file.
var reg_level = regexp.MustCompile(`profile-level-id=(4.+f)`)
// WebRTCConfig is the WebRTC plugin's configuration together with the pion
// objects built from it. It embeds the engine's Publish and Subscribe settings;
// the unexported fields are filled in by OnEvent when engine.FirstConfig
// arrives and are then used by the HTTP handlers to create PeerConnections.
type WebRTCConfig struct {
config.Publish
config.Subscribe
ICEServers []ICEServer // passed in the Configuration of every new PeerConnection
PublicIP []string // when non-empty, handed to SetNAT1To1IPs as host candidates
Port string `default:"tcp:9000"` // listener spec parsed by util.Conf2Listener in OnEvent
PLI time.Duration `default:"2s"` // send a PLI request after packet loss on the video stream
m MediaEngine // codecs are registered on it in OnEvent
s SettingEngine // NAT mapping, mux and network types are set on it in OnEvent
api *API // built in OnEvent from m, s and the interceptor registry
}
// OnEvent reacts to events delivered by the engine. Only engine.FirstConfig is
// handled; every other event is ignored. On FirstConfig it:
//
//  1. round-trips each configured ICE server through its own JSON
//     marshal/unmarshal (errors are ignored) — presumably to normalise fields
//     after config loading; confirm against pion's ICEServer codec,
//  2. registers codecs on the media engine,
//  3. applies the optional public-IP NAT mapping,
//  4. sets up the transport according to conf.Port: a TCP mux, an ephemeral
//     UDP port range (two ports given), or a single shared UDP mux,
//  5. registers the default interceptors and builds the pion API.
//
// Listener and port-configuration failures are reported via WebRTCPlugin.Fatal;
// an interceptor registration failure panics.
func (conf *WebRTCConfig) OnEvent(event any) {
	if _, isFirstConfig := event.(engine.FirstConfig); !isFirstConfig {
		return
	}

	for idx := range conf.ICEServers {
		server := &conf.ICEServers[idx]
		raw, _ := server.MarshalJSON()
		server.UnmarshalJSON(raw)
	}

	webrtc.RegisterCodecs(&conf.m)
	registry := &interceptor.Registry{}

	if len(conf.PublicIP) > 0 {
		conf.s.SetNAT1To1IPs(conf.PublicIP, ICECandidateTypeHost)
	}

	protocol, ports := util.Conf2Listener(conf.Port)
	if len(ports) == 0 {
		WebRTCPlugin.Fatal("webrtc port config error")
	}

	switch {
	case protocol == "tcp":
		// Single TCP listener shared by all peer connections.
		listenPort := int(ports[0])
		addr := &net.TCPAddr{
			IP:   net.IP{0, 0, 0, 0},
			Port: listenPort,
		}
		listener, err := net.ListenTCP("tcp", addr)
		if err != nil {
			WebRTCPlugin.Fatal("webrtc listener tcp", zap.Error(err))
		}
		WebRTCPlugin.Info("webrtc start listen", zap.Int("port", listenPort))
		conf.s.SetICETCPMux(NewICETCPMux(nil, listener, 4096))
		conf.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})

	case len(ports) == 2:
		// Two ports describe an ephemeral UDP range.
		conf.s.SetEphemeralUDPPortRange(ports[0], ports[1])

	default:
		// Create a shared WebRTC port (default 9000).
		addr := &net.UDPAddr{
			IP:   net.IP{0, 0, 0, 0},
			Port: int(ports[0]),
		}
		listener, err := net.ListenUDP("udp", addr)
		if err != nil {
			WebRTCPlugin.Fatal("webrtc listener udp", zap.Error(err))
		}
		WebRTCPlugin.Info("webrtc start listen", zap.Uint16("port", ports[0]))
		conf.s.SetICEUDPMux(NewICEUDPMux(nil, listener))
		conf.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
	}

	if err := RegisterDefaultInterceptors(&conf.m, registry); err != nil {
		panic(err)
	}
	conf.api = NewAPI(
		WithMediaEngine(&conf.m),
		WithInterceptorRegistry(registry),
		WithSettingEngine(conf.s),
	)
}
// Play_ handles a WebRTC playback request. The stream path is whatever follows
// "/play/" in the URL path and the request body is the client's SDP offer. On
// success the SDP answer is written with Content-Type application/sdp.
//
// Responses:
//   - 400 if the body cannot be read, if subscribing to the stream fails, or if
//     the answer cannot be produced;
//   - 500 if the PeerConnection cannot be created or the offer is rejected.
//
// The PeerConnection is closed when setup fails before the subscriber has been
// accepted, so a rejected request does not leave a connection behind.
func (conf *WebRTCConfig) Play_(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/sdp")
	streamPath := r.URL.Path[len("/play/"):]

	// A failed or truncated read must not be treated as a valid (empty) offer.
	offer, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	var suber WebRTCSubscriber
	suber.SDP = string(offer)
	if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
		ICEServers: conf.ICEServers,
	}); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	suber.OnICECandidate(func(ice *ICECandidate) {
		if ice != nil {
			suber.Info(ice.ToJSON().Candidate)
		}
	})

	if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: suber.SDP}); err != nil {
		// Nothing else owns the connection yet; release it (best effort).
		_ = suber.PeerConnection.Close()
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if err = WebRTCPlugin.Subscribe(streamPath, &suber); err != nil {
		// The subscriber was not accepted; release the connection (best effort).
		_ = suber.PeerConnection.Close()
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	sdp, err := suber.GetAnswer()
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.Write([]byte(sdp))
}
// Push_ handles a WebRTC publish request. The stream path is whatever follows
// "/push/" in the URL path and the request body is the client's SDP offer. On
// success the SDP answer is written with Content-Type application/sdp.
//
// A data channel opened by the client can be used for renegotiation: each
// message consists of one command byte followed by a new SDP offer. The answer
// is sent back as text on the same channel; command '0' then stops the
// publisher, command '1' does nothing further.
//
// Responses:
//   - 400 if the body cannot be read, if publishing fails, if the offer is
//     rejected, or if the answer cannot be produced;
//   - 500 if the PeerConnection cannot be created.
func (conf *WebRTCConfig) Push_(w http.ResponseWriter, r *http.Request) {
	streamPath := r.URL.Path[len("/push/"):]
	w.Header().Set("Content-Type", "application/sdp")

	// A failed or truncated read must not be treated as a valid (empty) offer.
	offer, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	var puber WebRTCPublisher
	puber.SDP = string(offer)
	if puber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
		ICEServers: conf.ICEServers,
	}); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	puber.SetIO(puber.PeerConnection) // TODO: must be commented out when a single PC is used
	puber.OnICECandidate(func(ice *ICECandidate) {
		if ice != nil {
			puber.Info(ice.ToJSON().Candidate)
		}
	})
	puber.OnDataChannel(func(d *DataChannel) {
		puber.Info("OnDataChannel", zap.String("label", d.Label()))
		d.OnMessage(func(msg DataChannelMessage) {
			// The payload comes from the remote peer. An empty message has no
			// command byte and would make the slicing below panic.
			if len(msg.Data) == 0 {
				return
			}
			puber.SDP = string(msg.Data[1:])
			puber.Debug("dc message", zap.String("sdp", puber.SDP))
			if err := puber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: puber.SDP}); err != nil {
				return
			}
			answer, err := puber.GetAnswer()
			if err != nil {
				return
			}
			d.SendText(answer)
			switch msg.Data[0] {
			case '0':
				puber.Stop()
			case '1':
			}
		})
	})
	// if _, err = puber.AddTransceiverFromKind(RTPCodecTypeVideo); err != nil {
	// 	http.Error(w, err.Error(), http.StatusInternalServerError)
	// 	return
	// }
	// if _, err = puber.AddTransceiverFromKind(RTPCodecTypeAudio); err != nil {
	// 	http.Error(w, err.Error(), http.StatusInternalServerError)
	// 	return
	// }
	if err = WebRTCPlugin.Publish(streamPath, &puber); err != nil {
		// The publisher was not accepted; release the connection (best effort).
		_ = puber.PeerConnection.Close()
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	puber.OnConnectionStateChange(func(state PeerConnectionState) {
		switch state {
		case PeerConnectionStateConnected:
		case PeerConnectionStateDisconnected, PeerConnectionStateFailed:
			puber.Stop()
		}
	})
	if err := puber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: puber.SDP}); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	answer, err := puber.GetAnswer()
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.Write([]byte(answer))
}
// Test_Publish responds with the embedded publish.html content. The request is
// not inspected and the result of the write is discarded.
func (conf *WebRTCConfig) Test_Publish(w http.ResponseWriter, r *http.Request) {
	_, _ = w.Write(publishHTML)
}
// Test_Subscribe responds with the embedded subscribe.html content. The request
// is not inspected and the result of the write is discarded.
func (conf *WebRTCConfig) Test_Subscribe(w http.ResponseWriter, r *http.Request) {
	_, _ = w.Write(subscribeHTML)
}
var (
	// webrtcConfig is the package-wide configuration instance handed to the
	// engine when the plugin is installed.
	webrtcConfig WebRTCConfig

	// WebRTCPlugin is the value returned by engine.InstallPlugin for
	// webrtcConfig. The handlers in this file use it for logging and for
	// Subscribe/Publish calls.
	WebRTCPlugin = engine.InstallPlugin(&webrtcConfig)
)
// Batch handles a batch WebRTC request: the request body is the client's SDP
// offer, a WebRTCBatcher is created around a new PeerConnection and started,
// and the resulting SDP answer is written with Content-Type application/sdp.
//
// Responses:
//   - 400 if the body cannot be read or the answer cannot be produced;
//   - 500 if the PeerConnection cannot be created or the batcher fails to start.
func (conf *WebRTCConfig) Batch(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/sdp")

	// A failed or truncated read must not be treated as a valid (empty) offer.
	offer, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	var suber WebRTCBatcher
	suber.SDP = string(offer)
	if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
		ICEServers: conf.ICEServers,
	}); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if err = suber.Start(); err != nil {
		// Start failed, so the connection will not be used; release it
		// (best effort) instead of leaving it open.
		_ = suber.PeerConnection.Close()
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	sdp, err := suber.GetAnswer()
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.Write([]byte(sdp))
}