From 768536cade750f50534c7e456a69db7f642a579d Mon Sep 17 00:00:00 2001 From: imalic3 Date: Thu, 21 Mar 2019 23:11:33 +0700 Subject: [PATCH] Extend sfu-ws with data-channel broadcasting Relates to #534 --- README.md | 1 + examples/sfu-ws/broadcast_data.go | 47 +++++++++++++++++++++++++++++++ examples/sfu-ws/room.go | 16 +++++++++++ examples/sfu-ws/sfu.html | 25 ++++++++++++++-- 4 files changed, 87 insertions(+), 2 deletions(-) create mode 100644 examples/sfu-ws/broadcast_data.go diff --git a/README.md b/README.md index 8c200100..b775544b 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ Check out the **[contributing wiki](https://github.com/pions/webrtc/wiki/Contrib * [mxmCherry](https://github.com/mxmCherry) * [Alex Browne](https://github.com/albrow) - *JavaScript/Wasm bindings* * [adwpc](https://github.com/adwpc) - *SFU example with websocket* +* [imalic3](https://github.com/imalic3) - *SFU websocket example with datachannel broadcast* ### License MIT License - see [LICENSE](LICENSE) for full text diff --git a/examples/sfu-ws/broadcast_data.go b/examples/sfu-ws/broadcast_data.go new file mode 100644 index 00000000..00e0d5f3 --- /dev/null +++ b/examples/sfu-ws/broadcast_data.go @@ -0,0 +1,47 @@ +package main + +import ( + "sync" + + "github.com/pions/webrtc" +) + +type BroadcastHub struct { + broadcastChannel chan []byte + listenChannels map[*uint16]*webrtc.DataChannel + dataMutex *sync.RWMutex +} + +func newHub() *BroadcastHub { + hub := &BroadcastHub{ + broadcastChannel: make(chan []byte), + listenChannels: make(map[*uint16]*webrtc.DataChannel), + dataMutex: new(sync.RWMutex), + } + go hub.run() + return hub +} + +func (h *BroadcastHub) addListener(d *webrtc.DataChannel) { + h.dataMutex.Lock() + h.listenChannels[d.ID()] = d + h.dataMutex.Unlock() +} + +func (h *BroadcastHub) run() { + for { + select { + case message := <-h.broadcastChannel: + h.dataMutex.RLock() + channels := h.listenChannels + h.dataMutex.RUnlock() + for _, client := range channels { + if err := client.SendText(string(message)); err != nil { + h.dataMutex.Lock() + delete(h.listenChannels, client.ID()) + h.dataMutex.Unlock() + } + } + } + } +} diff --git a/examples/sfu-ws/room.go b/examples/sfu-ws/room.go index b20af8ef..edf5971c 100644 --- a/examples/sfu-ws/room.go +++ b/examples/sfu-ws/room.go @@ -40,6 +40,9 @@ var ( // Websocket upgrader upgrader = websocket.Upgrader{} + + // Broadcast channels + broadcastHub = newHub() ) const ( @@ -131,12 +134,25 @@ func room(w http.ResponseWriter, r *http.Request) { // Send server sdp to publisher checkError(c.WriteMessage(mt, []byte(answer.SDP))) + + // Register incoming channel + pubReceiver.OnDataChannel(func(d *webrtc.DataChannel) { + d.OnMessage(func(msg webrtc.DataChannelMessage) { + // Broadcast the data to subSenders + broadcastHub.broadcastChannel <- msg.Data + }) + }) } else { // Create a new PeerConnection subSender, err := api.NewPeerConnection(peerConnectionConfig) checkError(err) + // Register data channel creation handling + subSender.OnDataChannel(func(d *webrtc.DataChannel) { + broadcastHub.addListener(d) + }) + // Waiting for publisher track finish for { videoTrackLock.RLock() diff --git a/examples/sfu-ws/sfu.html b/examples/sfu-ws/sfu.html index ac584f82..19a7eb66 100644 --- a/examples/sfu-ws/sfu.html +++ b/examples/sfu-ws/sfu.html @@ -16,7 +16,7 @@
- +
@@ -35,6 +35,7 @@ var log = msg => { var sock = null; var wsuri = "wss://" + location.host + "/ws"; +var dataChannel = null; window.onload = function() { sock = new WebSocket(wsuri); sock.onopen = function() { @@ -55,6 +56,20 @@ window.onload = function() { } }; +window.sendMessage = element => { + if(event.key === 'Enter') { + let message = element.value + if (dataChannel === null){ + return; + } + if (message === ''){ + return alert('Message must not be empty') + } + dataChannel.send(message) + element.value = '' + } +} + window.createSession = isPublisher => { let pc = new RTCPeerConnection({ iceServers: [ @@ -75,7 +90,10 @@ window.createSession = isPublisher => { if (isPublisher) { navigator.mediaDevices.getUserMedia({ video: true, audio: true}) - .then(stream => pc.addStream(document.getElementById('video1').srcObject = stream)) + .then(stream => { + pc.addStream(document.getElementById('video1').srcObject = stream) + dataChannel = pc.createDataChannel('data') + }) .catch(log) pc.onnegotiationneeded = e => { @@ -85,6 +103,9 @@ window.createSession = isPublisher => { } } else { + document.getElementById('msginput').style = 'display: none' + dataChannel = pc.createDataChannel('data') + dataChannel.onmessage = e => log(`receive data from '${dataChannel.label}' payload '${e.data}'`) pc.createOffer({ offerToReceiveVideo: true , offerToReceiveAudio: true}) .then(d => pc.setLocalDescription(d)) .catch(log)