diff --git a/ws/event_clientanswer.go b/ws/event_clientanswer.go index 3965aa8..19ea767 100644 --- a/ws/event_clientanswer.go +++ b/ws/event_clientanswer.go @@ -32,7 +32,7 @@ func (e *ClientAnswer) Execute(rooms *Rooms, current ClientInfo) error { return fmt.Errorf("permission denied for session %s", e.SID) } - room.Users[session.Host].Write <- outgoing.ClientAnswer(*e) + room.Users[session.Host].WriteTimeout(outgoing.ClientAnswer(*e)) return nil } diff --git a/ws/event_clientice.go b/ws/event_clientice.go index 4c0ac15..fc76068 100644 --- a/ws/event_clientice.go +++ b/ws/event_clientice.go @@ -32,7 +32,7 @@ func (e *ClientICE) Execute(rooms *Rooms, current ClientInfo) error { return fmt.Errorf("permission denied for session %s", e.SID) } - room.Users[session.Host].Write <- outgoing.ClientICE(*e) + room.Users[session.Host].WriteTimeout(outgoing.ClientICE(*e)) return nil } diff --git a/ws/event_create.go b/ws/event_create.go index 6f1daf2..2c10b5a 100644 --- a/ws/event_create.go +++ b/ws/event_create.go @@ -70,7 +70,7 @@ func (e *Create) Execute(rooms *Rooms, current ClientInfo) error { Streaming: false, Owner: true, Addr: current.Addr, - Write: current.Write, + _write: current.Write, }, }, } diff --git a/ws/event_disconnected.go b/ws/event_disconnected.go index d422050..67b236e 100644 --- a/ws/event_disconnected.go +++ b/ws/event_disconnected.go @@ -20,7 +20,7 @@ func (e *Disconnected) Execute(rooms *Rooms, current ClientInfo) error { func (e *Disconnected) executeNoError(rooms *Rooms, current ClientInfo) { roomID := rooms.connected[current.ID] delete(rooms.connected, current.ID) - current.Write <- outgoing.CloseWriter{Code: e.Code, Reason: e.Reason} + writeTimeout[outgoing.Message](current.Write, outgoing.CloseWriter{Code: e.Code, Reason: e.Reason}) if roomID == "" { return @@ -46,14 +46,14 @@ func (e *Disconnected) executeNoError(rooms *Rooms, current ClientInfo) { if bytes.Equal(session.Client.Bytes(), current.ID.Bytes()) { host, ok := room.Users[session.Host] if ok { - host.Write <- outgoing.EndShare(id) + host.WriteTimeout(outgoing.EndShare(id)) } room.closeSession(rooms, id) } if bytes.Equal(session.Host.Bytes(), current.ID.Bytes()) { client, ok := room.Users[session.Client] if ok { - client.Write <- outgoing.EndShare(id) + client.WriteTimeout(outgoing.EndShare(id)) } room.closeSession(rooms, id) } @@ -62,7 +62,7 @@ func (e *Disconnected) executeNoError(rooms *Rooms, current ClientInfo) { if user.Owner && room.CloseOnOwnerLeave { for _, member := range room.Users { delete(rooms.connected, member.ID) - member.Write <- outgoing.CloseWriter{Code: websocket.CloseNormalClosure, Reason: CloseOwnerLeft} + member.WriteTimeout(outgoing.CloseWriter{Code: websocket.CloseNormalClosure, Reason: CloseOwnerLeft}) } rooms.closeRoom(roomID) return diff --git a/ws/event_health.go b/ws/event_health.go index 1264fd4..bb82c24 100644 --- a/ws/event_health.go +++ b/ws/event_health.go @@ -5,6 +5,6 @@ type Health struct { } func (e *Health) Execute(rooms *Rooms, current ClientInfo) error { - e.Response <- len(rooms.connected) + writeTimeout(e.Response, len(rooms.connected)) return nil } diff --git a/ws/event_hostice.go b/ws/event_hostice.go index cf3b54d..93189be 100644 --- a/ws/event_hostice.go +++ b/ws/event_hostice.go @@ -32,7 +32,7 @@ func (e *HostICE) Execute(rooms *Rooms, current ClientInfo) error { return fmt.Errorf("permission denied for session %s", e.SID) } - room.Users[session.Client].Write <- outgoing.HostICE(*e) + room.Users[session.Client].WriteTimeout(outgoing.HostICE(*e)) return nil } diff --git a/ws/event_hostoffer.go b/ws/event_hostoffer.go index 1bff55e..36a86a4 100644 --- a/ws/event_hostoffer.go +++ b/ws/event_hostoffer.go @@ -32,7 +32,7 @@ func (e *HostOffer) Execute(rooms *Rooms, current ClientInfo) error { return fmt.Errorf("permission denied for session %s", e.SID) } - room.Users[session.Client].Write <- outgoing.HostOffer(*e) + room.Users[session.Client].WriteTimeout(outgoing.HostOffer(*e)) return nil } diff --git a/ws/event_join.go b/ws/event_join.go index ef75595..8e14f0b 100644 --- a/ws/event_join.go +++ b/ws/event_join.go @@ -38,7 +38,7 @@ func (e *Join) Execute(rooms *Rooms, current ClientInfo) error { Streaming: false, Owner: false, Addr: current.Addr, - Write: current.Write, + _write: current.Write, } rooms.connected[current.ID] = room.ID room.notifyInfoChanged() diff --git a/ws/event_stop_share.go b/ws/event_stop_share.go index 9d6504a..1760c4d 100644 --- a/ws/event_stop_share.go +++ b/ws/event_stop_share.go @@ -25,7 +25,7 @@ func (e *StopShare) Execute(rooms *Rooms, current ClientInfo) error { if bytes.Equal(session.Host.Bytes(), current.ID.Bytes()) { client, ok := room.Users[session.Client] if ok { - client.Write <- outgoing.EndShare(id) + client.WriteTimeout(outgoing.EndShare(id)) } room.closeSession(rooms, id) } diff --git a/ws/room.go b/ws/room.go index 0c8bbaf..758ea01 100644 --- a/ws/room.go +++ b/ws/room.go @@ -4,8 +4,10 @@ import ( "fmt" "net" "sort" + "time" "github.com/rs/xid" + "github.com/rs/zerolog/log" "github.com/screego/server/config" "github.com/screego/server/ws/outgoing" ) @@ -60,8 +62,8 @@ func (r *Room) newSession(host, client xid.ID, rooms *Rooms, v4, v6 net.IP) { Username: clientName, }} } - r.Users[host].Write <- outgoing.HostSession{Peer: client, ID: id, ICEServers: iceHost} - r.Users[client].Write <- outgoing.ClientSession{Peer: host, ID: id, ICEServers: iceClient} + r.Users[host].WriteTimeout(outgoing.HostSession{Peer: client, ID: id, ICEServers: iceHost}) + r.Users[client].WriteTimeout(outgoing.ClientSession{Peer: host, ID: id, ICEServers: iceClient}) } func (r *Rooms) addresses(prefix string, v4, v6 net.IP, tcp bool) (result []string) { @@ -122,10 +124,10 @@ func (r *Room) notifyInfoChanged() { return left.Name < right.Name }) - current.Write <- outgoing.Room{ + current.WriteTimeout(outgoing.Room{ ID: r.ID, Users: users, - } + }) } } @@ -135,5 +137,17 @@ type User struct { Name string Streaming bool Owner bool - Write chan<- outgoing.Message + _write chan<- outgoing.Message +} + +func (u *User) WriteTimeout(msg outgoing.Message) { + writeTimeout(u._write, msg) +} + +func writeTimeout[T any](ch chan<- T, msg T) { + select { + case <-time.After(2 * time.Second): + log.Warn().Interface("event", fmt.Sprintf("%T", msg)).Interface("payload", msg).Msg("Client write loop didn't accept the message.") + case ch <- msg: + } }