Extend sfu-ws with data-channel broadcasting

Relates to #534
This commit is contained in:
imalic3 2019-03-21 23:11:33 +07:00 committed by Sean DuBois
parent f174986c74
commit 768536cade
4 changed files with 87 additions and 2 deletions

View File

@ -100,6 +100,7 @@ Check out the **[contributing wiki](https://github.com/pions/webrtc/wiki/Contrib
* [mxmCherry](https://github.com/mxmCherry)
* [Alex Browne](https://github.com/albrow) - *JavaScript/Wasm bindings*
* [adwpc](https://github.com/adwpc) - *SFU example with websocket*
* [imalic3](https://github.com/imalic3) - *SFU websocket example with datachannel broadcast*
### License
MIT License - see [LICENSE](LICENSE) for full text

View File

@ -0,0 +1,47 @@
package main
import (
"sync"
"github.com/pions/webrtc"
)
type BroadcastHub struct {
broadcastChannel chan []byte
listenChannels map[*uint16]*webrtc.DataChannel
dataMutex *sync.RWMutex
}
func newHub() *BroadcastHub {
hub := &BroadcastHub{
broadcastChannel: make(chan []byte),
listenChannels: make(map[*uint16]*webrtc.DataChannel),
dataMutex: new(sync.RWMutex),
}
go hub.run()
return hub
}
func (h *BroadcastHub) addListener(d *webrtc.DataChannel) {
h.dataMutex.Lock()
h.listenChannels[d.ID()] = d
h.dataMutex.Unlock()
}
func (h *BroadcastHub) run() {
for {
select {
case message := <-h.broadcastChannel:
h.dataMutex.RLock()
channels := h.listenChannels
h.dataMutex.RUnlock()
for _, client := range channels {
if err := client.SendText(string(message)); err != nil {
h.dataMutex.Lock()
delete(h.listenChannels, client.ID())
h.dataMutex.Unlock()
}
}
}
}
}

View File

@ -40,6 +40,9 @@ var (
// Websocket upgrader
upgrader = websocket.Upgrader{}
// Broadcast channels
broadcastHub = newHub()
)
const (
@ -131,12 +134,25 @@ func room(w http.ResponseWriter, r *http.Request) {
// Send server sdp to publisher
checkError(c.WriteMessage(mt, []byte(answer.SDP)))
// Register incoming channel
pubReceiver.OnDataChannel(func(d *webrtc.DataChannel) {
d.OnMessage(func(msg webrtc.DataChannelMessage) {
// Broadcast the data to subSenders
broadcastHub.broadcastChannel <- msg.Data
})
})
} else {
// Create a new PeerConnection
subSender, err := api.NewPeerConnection(peerConnectionConfig)
checkError(err)
// Register data channel creation handling
subSender.OnDataChannel(func(d *webrtc.DataChannel) {
broadcastHub.addListener(d)
})
// Waiting for publisher track finish
for {
videoTrackLock.RLock()

View File

@ -16,7 +16,7 @@
<body>
<video id="video1" width="320" height="240" autoplay muted controls></video> <br />
<input id="msginput" type="text" style="width: 320px;height: 24px;" placeholder="typing here..." onkeydown="sendMessage(this)"><br />
<button class="sessbtn" onclick="window.createSession(true)">Publish</button>
<button class="sessbtn" onclick="window.createSession(false)">Subscribe</button>
@ -35,6 +35,7 @@ var log = msg => {
var sock = null;
var wsuri = "wss://" + location.host + "/ws";
var dataChannel = null;
window.onload = function() {
sock = new WebSocket(wsuri);
sock.onopen = function() {
@ -55,6 +56,20 @@ window.onload = function() {
}
};
window.sendMessage = element => {
if(event.key === 'Enter') {
let message = element.value
if (dataChannel === null){
return;
}
if (message === ''){
return alert('Message must not be empty')
}
dataChannel.send(message)
element.value = ''
}
}
window.createSession = isPublisher => {
let pc = new RTCPeerConnection({
iceServers: [
@ -75,7 +90,10 @@ window.createSession = isPublisher => {
if (isPublisher) {
navigator.mediaDevices.getUserMedia({ video: true, audio: true})
.then(stream => pc.addStream(document.getElementById('video1').srcObject = stream))
.then(stream => {
pc.addStream(document.getElementById('video1').srcObject = stream)
dataChannel = pc.createDataChannel('data')
})
.catch(log)
pc.onnegotiationneeded = e => {
<!-- console.log("Publisher createOffer") -->
@ -85,6 +103,9 @@ window.createSession = isPublisher => {
}
} else {
<!-- console.log("Subcriber createOffer") -->
document.getElementById('msginput').style = 'display: none'
dataChannel = pc.createDataChannel('data')
dataChannel.onmessage = e => log(`receive data from '${dataChannel.label}' payload '${e.data}'`)
pc.createOffer({ offerToReceiveVideo: true , offerToReceiveAudio: true})
.then(d => pc.setLocalDescription(d))
.catch(log)