update servers

This commit is contained in:
sujit
2023-08-29 22:43:44 +05:45
parent c9c655e75a
commit 3d2564af60
16 changed files with 321 additions and 240 deletions
+1 -1
View File
@@ -4,7 +4,7 @@ import (
"net/http"
"sync"
"time"
eiot "github.com/oarkflow/socketio/engineio/transport"
)
+33 -33
View File
@@ -17,7 +17,7 @@ import (
"sort"
"strings"
"time"
eiop "github.com/oarkflow/socketio/engineio/protocol"
eios "github.com/oarkflow/socketio/engineio/session"
eiot "github.com/oarkflow/socketio/engineio/transport"
@@ -36,12 +36,12 @@ type upgradeable struct {
type serverV2 struct {
path *string
allowUpgrades bool
pingTimeout time.Duration
upgradeTimeout time.Duration
maxHttpBufferSize int
// https://socket.io/how-to/deal-with-cookies
cookie struct {
name string
@@ -49,20 +49,20 @@ type serverV2 struct {
httpOnly bool
sameSite http.SameSite
}
transportChanBuf int
initialPackets func(eiot.Transporter, *http.Request)
generateID func() SessionID
codec eiot.Codec
eto []eiot.Option
servers map[EIOVersionStr]server
sessions transportSessions
transports map[TransportName]func(SessionID, eiot.Codec) eiot.Transporter
transportRunError chan error
}
@@ -80,7 +80,7 @@ func (v2 *serverV2) new(opts ...Option) *serverV2 {
v2.maxHttpBufferSize = 10e7
v2.transportChanBuf = 1000
v2.transportRunError = make(chan error, 1)
v2.generateID = eios.GenerateID
v2.codec = eiot.Codec{
PacketEncoder: eiop.NewPacketEncoderV2,
@@ -88,19 +88,19 @@ func (v2 *serverV2) new(opts ...Option) *serverV2 {
PayloadEncoder: eiop.NewPayloadEncoderV2,
PayloadDecoder: eiop.NewPayloadDecoderV2,
}
if v2.servers == nil {
v2.servers = make(map[EIOVersionStr]server)
}
v2.servers[Version2] = v2
v2.sessions = NewSessions()
v2.transports = make(map[TransportName]func(SessionID, eiot.Codec) eiot.Transporter)
WithTransport("polling", eiot.NewPollingTransport(v2.transportChanBuf))(v2)
WithTransport("websocket", eiot.NewWebsocketTransport(v2.transportChanBuf))(v2)
v2.eto = []eiot.Option{eiot.WithGovernor(1500*time.Microsecond, 500*time.Microsecond)}
return v2
}
@@ -131,9 +131,9 @@ func (v2 *serverV2) ServeTransport(w http.ResponseWriter, r *http.Request) (eiot
if v2.path == nil || !strings.HasPrefix(r.URL.Path, *v2.path) {
return nil, ErrInvalidURIPath
}
sessionID := sessionIDFrom(r)
switch r.Method {
case http.MethodGet, http.MethodOptions:
break
@@ -145,28 +145,28 @@ func (v2 *serverV2) ServeTransport(w http.ResponseWriter, r *http.Request) (eiot
default:
return nil, ErrInvalidRequestHTTPMethod
}
eioVersion := eioVersionFrom(r)
server, ok := v2.servers[eioVersion]
if !ok || eioVersion == "" {
return nil, ErrUnknownEIOVersion
}
transportName := transportNameFrom(r)
if _, ok := v2.transports[transportName]; !ok || transportName == "" {
return nil, ErrUnknownTransport
}
ctx := r.Context()
ctx = context.WithValue(ctx, ctxSessionID, sessionID)
ctx = context.WithValue(ctx, ctxTransportName, transportName)
ctx = context.WithValue(ctx, ctxEIOVersion, eioVersion)
transport, err := server.serveTransport(w, r.WithContext(ctx))
if err != nil {
return nil, err
}
return transport, err
}
@@ -179,7 +179,7 @@ func ToEOH(err error) error {
func (v2 *serverV2) serveTransport(w http.ResponseWriter, r *http.Request) (transport eiot.Transporter, err error) {
ctx := r.Context()
if origin := r.Header.Get("Origin"); origin != "" {
if strings.EqualFold(origin, r.URL.Hostname()) {
w.Header().Set("Access-Control-Allow-Origin", r.URL.Host)
@@ -188,39 +188,39 @@ func (v2 *serverV2) serveTransport(w http.ResponseWriter, r *http.Request) (tran
if strings.ToUpper(r.Method) == "OPTIONS" {
return nil, IOR
}
sessionID, _ := ctx.Value(ctxSessionID).(SessionID)
if sessionID == "" {
sessionID = v2.generateID()
transportName := transportNameFrom(r)
transport = v2.transports[transportName](sessionID, v2.codec)
if err := v2.sessions.Set(transport); err != nil {
return nil, err
}
transport.Send(v2.handshakePacket(sessionID, transportName))
if v2.initialPackets != nil {
v2.initialPackets(transport, r)
}
if t, ok := transport.(interface {
Write(http.ResponseWriter, *http.Request) error
}); ok {
transport.Send(eiop.Packet{T: eiop.NoopPacket, D: eiot.WriteClose{}})
ctx = v2.sessions.WithTimeout(ctx, v2.pingTimeout)
ctx = v2.sessions.WithInterval(ctx, v2.pingTimeout-10*time.Millisecond)
return transport, ToEOH(t.Write(w, r.WithContext(ctx)))
}
}
upgrade := v2.doUpgrade(v2.sessions.Get(sessionID))(w, r)
if upgrade.err != nil {
return nil, upgrade.err
}
var opts []eiot.Option
if upgrade.isProbeOnInit {
opts = []eiot.Option{eiot.OnInitProbe(upgrade.isProbeOnInit)}
@@ -228,15 +228,15 @@ func (v2 *serverV2) serveTransport(w http.ResponseWriter, r *http.Request) (tran
if upgrade.upgradeFn != nil {
opts = []eiot.Option{eiot.OnUpgrade(upgrade.upgradeFn)}
}
ctx = v2.sessions.WithTimeout(ctx, v2.pingTimeout*4)
ctx = v2.sessions.WithInterval(ctx, v2.pingTimeout)
opts = append(opts, eiot.WithNoPing())
go func() {
v2.transportRunError <- upgrade.transport.Run(w, r.WithContext(ctx), append(v2.eto, opts...)...)
}()
return upgrade.transport, nil
}
+20 -20
View File
@@ -7,7 +7,7 @@ import (
"net/http"
"strings"
"time"
eiop "github.com/oarkflow/socketio/engineio/protocol"
eiot "github.com/oarkflow/socketio/engineio/transport"
)
@@ -23,7 +23,7 @@ func init() { registry[Version3.Int()] = NewServerV3 }
type serverV3 struct {
*serverV2
pingInterval time.Duration
cors struct { // the options that will be forwarded to the cors module. Defaults to no CORS allowed.
enable bool
@@ -40,30 +40,30 @@ type serverV3 struct {
func NewServerV3(opts ...Option) Server {
v3 := (&serverV3{}).new(opts...)
v3.With(opts...)
return v3
}
func (v3 *serverV3) new(opts ...Option) *serverV3 {
v3.serverV2 = (&serverV2{}).new(opts...)
v3.pingTimeout = 5000 * time.Millisecond
v3.pingInterval = 25000 * time.Millisecond
v3.codec = eiot.Codec{
PacketEncoder: eiop.NewPacketEncoderV3,
PacketDecoder: eiop.NewPacketDecoderV3,
PayloadEncoder: eiop.NewPayloadEncoderV3,
PayloadDecoder: eiop.NewPayloadDecoderV3,
}
v3.cors.enable = true
if v3.servers == nil {
v3.servers = make(map[EIOVersionStr]server)
}
v3.servers[Version3] = v3
return v3
}
@@ -76,7 +76,7 @@ func (v3 *serverV3) With(opts ...Option) {
func (v3 *serverV3) serveTransport(w http.ResponseWriter, r *http.Request) (transport eiot.Transporter, err error) {
ctx := r.Context()
if origin := r.Header.Get("Origin"); origin != "" {
if strings.EqualFold(origin, r.URL.Hostname()) {
w.Header().Set("Access-Control-Allow-Origin", r.URL.Host)
@@ -85,39 +85,39 @@ func (v3 *serverV3) serveTransport(w http.ResponseWriter, r *http.Request) (tran
if strings.ToUpper(r.Method) == "OPTIONS" {
return nil, IOR
}
sessionID := sessionIDFrom(r)
if sessionID == "" {
sessionID = v3.generateID()
transportName := transportNameFrom(r)
transport = v3.transports[transportName](sessionID, v3.codec)
if err := v3.sessions.Set(transport); err != nil {
return nil, err
}
transport.Send(v3.handshakePacket(sessionID, transportName))
if v3.initialPackets != nil {
v3.initialPackets(transport, r)
}
if t, ok := transport.(interface {
Write(http.ResponseWriter, *http.Request) error
}); ok {
ctx = v3.sessions.WithInterval(ctx, v3.pingInterval)
ctx = v3.sessions.WithTimeout(ctx, v3.pingTimeout)
transport.Send(eiop.Packet{T: eiop.NoopPacket, D: eiot.WriteClose{}})
return transport, ToEOH(t.Write(w, r.WithContext(ctx)))
}
}
upgrade := v3.doUpgrade(v3.sessions.Get(sessionID))(w, r)
if upgrade.err != nil {
return nil, upgrade.err
}
var opts []eiot.Option
if upgrade.isProbeOnInit {
opts = []eiot.Option{eiot.OnInitProbe(upgrade.isProbeOnInit)}
@@ -125,14 +125,14 @@ func (v3 *serverV3) serveTransport(w http.ResponseWriter, r *http.Request) (tran
if upgrade.upgradeFn != nil {
opts = []eiot.Option{eiot.OnUpgrade(upgrade.upgradeFn)}
}
ctx = v3.sessions.WithInterval(ctx, v3.pingInterval)
ctx = v3.sessions.WithTimeout(ctx, v3.pingTimeout)
go func() {
v3.transportRunError <- upgrade.transport.Run(w, r.WithContext(ctx), append(v3.eto, opts...)...)
}()
return upgrade.transport, nil
}
+20 -19
View File
@@ -7,7 +7,7 @@ import (
"context"
"net/http"
"strings"
eiop "github.com/oarkflow/socketio/engineio/protocol"
eiot "github.com/oarkflow/socketio/engineio/transport"
)
@@ -21,7 +21,7 @@ func init() { registry[Version4.Int()] = NewServerV4 }
type serverV4 struct {
*serverV3
maxPayload int
UseEIO3 bool
}
@@ -29,25 +29,25 @@ type serverV4 struct {
func NewServerV4(opts ...Option) Server {
v4 := (&serverV4{}).new(opts...)
v4.With(opts...)
return v4
}
func (v4 *serverV4) new(opts ...Option) *serverV4 {
v4.serverV3 = (&serverV3{}).new(opts...)
v4.maxPayload = 100000
v4.maxHttpBufferSize = 1e7
v4.codec = eiot.Codec{
PacketEncoder: eiop.NewPacketEncoderV4,
PacketDecoder: eiop.NewPacketDecoderV4,
PayloadEncoder: eiop.NewPayloadEncoderV4,
PayloadDecoder: eiop.NewPayloadDecoderV4,
}
v4.servers[Version4] = v4
return v4
}
@@ -60,7 +60,7 @@ func (v4 *serverV4) With(opts ...Option) {
func (v4 *serverV4) serveTransport(w http.ResponseWriter, r *http.Request) (transport eiot.Transporter, err error) {
ctx := r.Context()
if origin := r.Header.Get("Origin"); origin != "" {
if strings.EqualFold(origin, r.URL.Hostname()) {
w.Header().Set("Access-Control-Allow-Origin", r.URL.Host)
@@ -69,53 +69,54 @@ func (v4 *serverV4) serveTransport(w http.ResponseWriter, r *http.Request) (tran
if strings.ToUpper(r.Method) == "OPTIONS" {
return nil, IOR
}
sessionID, _ := ctx.Value(ctxSessionID).(SessionID)
if sessionID == "" {
sessionID = v4.generateID()
ctx = context.WithValue(ctx, ctxSessionID, sessionID)
transportName := transportNameFrom(r)
transport = v4.transports[transportName](sessionID, v4.codec)
if err := v4.sessions.Set(transport); err != nil {
return nil, err
}
transport.Send(v4.handshakePacket(sessionID, transportName))
if v4.initialPackets != nil {
v4.initialPackets(transport, r)
}
if t, ok := transport.(interface {
Write(http.ResponseWriter, *http.Request) error
}); ok {
transport.Send(eiop.Packet{T: eiop.NoopPacket, D: eiot.WriteClose{}})
ctx = v4.sessions.WithInterval(ctx, v4.pingInterval)
ctx = v4.sessions.WithTimeout(ctx, v4.pingTimeout)
return transport, ToEOH(t.Write(w, r.WithContext(ctx)))
}
}
upgrade := v4.doUpgrade(v4.sessions.Get(sessionID))(w, r)
if err != nil {
return nil, err
}
var opts []eiot.Option
if upgrade.upgradeFn != nil {
opts = []eiot.Option{eiot.OnUpgrade(upgrade.upgradeFn)}
}
ctx = v4.sessions.WithCancel(ctx)
ctx = v4.sessions.WithInterval(ctx, v4.pingInterval)
ctx = v4.sessions.WithTimeout(ctx, v4.pingTimeout)
ctx = v4.sessions.WithDisconnectOnClose(ctx)
go func() {
v4.transportRunError <- upgrade.transport.Run(w, r.WithContext(ctx), append(v4.eto, opts...)...)
}()
return upgrade.transport, err
}
+3 -2
View File
@@ -9,8 +9,9 @@ const (
SessionIntervalKey sessionCtxKey = "interval"
SessionExtendTimeoutKey sessionCtxKey = "timeout-extend"
SessionCloseChannelKey sessionCtxKey = "cancel-channel"
SessionCloseFunctionKey sessionCtxKey = "cancel-function"
SessionCloseChannelKey sessionCtxKey = "cancel-channel"
SessionCloseFunctionKey sessionCtxKey = "cancel-function"
SessionDisconnectFunctionKey sessionCtxKey = "cancel-disconnect"
)
type (
+43 -22
View File
@@ -5,7 +5,7 @@ import (
"sync"
"sync/atomic"
"time"
eios "github.com/oarkflow/socketio/engineio/session"
eiot "github.com/oarkflow/socketio/engineio/transport"
)
@@ -21,10 +21,11 @@ func storeDuration(addr *time.Duration, val time.Duration) {
type transportSessions interface {
Set(eiot.Transporter) error
Get(SessionID) (eiot.Transporter, error)
WithCancel(ctx context.Context) context.Context
WithTimeout(ctx context.Context, d time.Duration) context.Context
WithInterval(ctx context.Context, d time.Duration) context.Context
WithDisconnectOnClose(ctx context.Context) context.Context
}
type sessions struct {
@@ -44,6 +45,9 @@ func NewSessions() *sessions {
shave: 10 * time.Millisecond,
removeTransport: func(sessionID SessionID) {
tr.ʘ.Lock()
if t, ok := tr.s[sessionID]; ok {
t.SendTimeout()
}
delete(tr.s, sessionID)
tr.ʘ.Unlock()
},
@@ -60,48 +64,65 @@ func (t *transport) Set(tr eiot.Transporter) error {
t.ʘ.Lock()
t.s[tr.ID()] = tr
t.ʘ.Unlock()
return nil
}
func (t *transport) Get(sessionID SessionID) (eiot.Transporter, error) {
t.ʘ.RLock()
defer t.ʘ.RUnlock()
if tr, ok := t.s[sessionID]; ok {
return tr, nil
}
return nil, ErrUnknownSessionID
}
type lifecycle struct {
id, td, shave time.Duration
t *sync.Map
i *sync.Map
cancel *sync.Map
removeTransport func(SessionID)
}
func (c *lifecycle) WithDisconnectOnClose(ctx context.Context) context.Context {
sessionID, ok := ctx.Value(ctxSessionID).(SessionID)
if !ok {
// there is no session to attach the context to
return ctx
}
ctx = context.WithValue(ctx, eios.SessionDisconnectFunctionKey, func() func() {
return func() {
c.removeSession(sessionID)
if c.removeTransport != nil {
c.removeTransport(sessionID)
}
}
})
return ctx
}
func (c *lifecycle) WithCancel(ctx context.Context) context.Context {
sessionID, ok := ctx.Value(ctxSessionID).(SessionID)
if !ok {
// there is no session to attach the timer to
return ctx
}
var chanPrefix = "chan:done:"
c.cancel.LoadOrStore(sessionID.PrefixID(chanPrefix), make(chan func(), 1))
ctx = context.WithValue(ctx, eios.SessionCloseChannelKey, func() <-chan func() {
if ch, ok := c.cancel.Load(sessionID.PrefixID(chanPrefix)); ok {
return ch.(chan func())
}
return nil
})
// Cancel will wait for another connections to close before closing this connection.
// As of now this requires all of the sessions to be on a single server, by using
// sticky sessions, otherwise this may not work as expected.
@@ -129,16 +150,16 @@ func (c *lifecycle) WithInterval(ctx context.Context, d time.Duration) context.C
if d <= 0 {
return ctx
}
sessionID, ok := ctx.Value(ctxSessionID).(SessionID)
if !ok {
// there is no session to attach the timer to
return ctx
}
storeDuration(&c.id, d)
c.i.LoadOrStore(sessionID, time.NewTicker(c.id))
var interval eios.IntervalChannel = func() <-chan time.Time {
if val, ok := c.i.Load(sessionID); ok {
val.(*time.Ticker).Reset(loadDuration(&c.id))
@@ -146,7 +167,7 @@ func (c *lifecycle) WithInterval(ctx context.Context, d time.Duration) context.C
}
return nil
}
return context.WithValue(ctx, eios.SessionIntervalKey, interval)
}
@@ -154,13 +175,13 @@ func (c *lifecycle) WithTimeout(ctx context.Context, d time.Duration) context.Co
if d <= 0 {
return ctx
}
sessionID, ok := ctx.Value(ctxSessionID).(SessionID)
if !ok {
// there is no session to attach the timer to
return ctx
}
storeDuration(&c.td, d)
if val, ok := c.t.Load(sessionID); ok {
reset := (loadDuration(&c.td) + loadDuration(&c.id)) - loadDuration(&c.shave)
@@ -171,14 +192,14 @@ func (c *lifecycle) WithTimeout(ctx context.Context, d time.Duration) context.Co
c.t.Store(sessionID, time.NewTimer(reset))
c.setTimeout(sessionID, time.Now())
}
x, cancel := context.WithCancel(ctx)
c.cancel.Store(sessionID, func() { cancel() })
var timeout eios.TimeoutChannel = func() <-chan struct{} {
return x.Done()
}
x = context.WithValue(x, eios.SessionExtendTimeoutKey, eios.ExtendTimeoutFunc(func() {
if val, ok := c.t.Load(sessionID); ok {
reset := (loadDuration(&c.td) + loadDuration(&c.id)) - loadDuration(&c.shave)
@@ -186,7 +207,7 @@ func (c *lifecycle) WithTimeout(ctx context.Context, d time.Duration) context.Co
val.(*time.Timer).Reset(reset)
}
}))
return context.WithValue(x, eios.SessionTimeoutKey, timeout)
}
@@ -194,10 +215,10 @@ func (c *lifecycle) setTimeout(sessionID SessionID, start time.Time) {
go func() {
val, _ := c.t.Load(sessionID)
<-val.(*time.Timer).C
cancel, _ := c.cancel.Load(sessionID)
cancel.(func())()
c.removeSession(sessionID)
if c.removeTransport != nil {
c.removeTransport(sessionID)
+10 -7
View File
@@ -2,7 +2,7 @@ package transport
import (
"net/http"
eiop "github.com/oarkflow/socketio/engineio/protocol"
eios "github.com/oarkflow/socketio/engineio/session"
)
@@ -31,9 +31,10 @@ type Transporter interface {
Name() Name
Send(eiop.Packet)
Receive() <-chan eiop.Packet
SendTimeout()
ReceiveTimeout() <-chan SessionID
Run(http.ResponseWriter, *http.Request, ...Option) error
Shutdown()
}
@@ -45,12 +46,12 @@ type Transport struct {
id SessionID
name Name
codec Codec
sendPing bool
send, receive chan eiop.Packet
shutdown func()
expireId chan SessionID
shutdown func()
}
func (t *Transport) ID() SessionID { return t.id }
@@ -63,3 +64,5 @@ func (t *Transport) Shutdown() {
t.shutdown()
}
}
func (t *Transport) SendTimeout() { t.expireId <- t.id }
func (t *Transport) ReceiveTimeout() <-chan SessionID { return t.expireId }
+25 -24
View File
@@ -8,9 +8,9 @@ import (
"strconv"
"strings"
"time"
"golang.org/x/text/transform"
eiop "github.com/oarkflow/socketio/engineio/protocol"
eios "github.com/oarkflow/socketio/engineio/session"
)
@@ -23,9 +23,9 @@ type handlerWithError func(http.ResponseWriter, *http.Request) error
type PollingTransport struct {
*Transport
sleep time.Duration
compress func(handlerWithError) handlerWithError
}
@@ -39,6 +39,7 @@ func NewPollingTransport(chanBuf int) func(SessionID, Codec) Transporter {
send: make(chan eiop.Packet, chanBuf),
receive: make(chan eiop.Packet, chanBuf),
sendPing: true,
expireId: make(chan eios.ID),
},
compress: func(fn handlerWithError) handlerWithError {
return func(w http.ResponseWriter, r *http.Request) error {
@@ -47,7 +48,7 @@ func NewPollingTransport(chanBuf int) func(SessionID, Codec) Transporter {
},
sleep: 25 * time.Millisecond,
}
return t
}
}
@@ -62,7 +63,7 @@ func (t *PollingTransport) InnerTransport() *Transport { return t.Transport }
func (t *PollingTransport) Run(w http.ResponseWriter, r *http.Request, opts ...Option) (err error) {
t.With(opts...)
switch r.Method {
case http.MethodGet:
return t.compress(jsonp(t.poll))(w, r)
@@ -71,7 +72,7 @@ func (t *PollingTransport) Run(w http.ResponseWriter, r *http.Request, opts ...O
return t.emit(w, r)
}
return nil
}
func (t *PollingTransport) Write(w http.ResponseWriter, r *http.Request) error {
@@ -101,22 +102,22 @@ Write:
}
packets = append(packets, packet)
}
if buffer.use {
for _, packet := range packets[buffer.idx:] {
t.receive <- packet
}
}
return t.codec.PayloadEncoder.To(w).WritePayload(packets[:buffer.idx])
}
// longPoll allows a connection for a specified amout of time... then releases a payload
func (t *PollingTransport) poll(w http.ResponseWriter, r *http.Request) (err error) {
var ctx = r.Context()
var interval, timeout, cancel = make(<-chan time.Time), make(<-chan struct{}), make(<-chan func())
if fn, ok := ctx.Value(eios.SessionIntervalKey).(eios.IntervalChannel); ok {
interval = fn()
}
@@ -126,7 +127,7 @@ func (t *PollingTransport) poll(w http.ResponseWriter, r *http.Request) (err err
if fn, ok := ctx.Value(eios.SessionCloseChannelKey).(func() <-chan func()); ok {
cancel = fn()
}
var done func()
var packets eiop.Payload
@@ -158,7 +159,7 @@ Write:
}
}
}
select {
case stop := <-cancel:
if stop != nil {
@@ -176,14 +177,14 @@ Write:
}
}
}
t.send <- eiop.Packet{T: eiop.NoopPacket, D: socketClose{ErrCloseSocket}} // shutdown the HTTP connection
return err
}
// gather pulls in all of the posts
func (t *PollingTransport) emit(w http.ResponseWriter, r *http.Request) error {
var payload eiop.Payload
if err := t.codec.PayloadDecoder.From(r.Body).ReadPayload(&payload); err != nil {
t.send <- eiop.Packet{T: eiop.NoopPacket, D: socketClose{err}}
@@ -206,9 +207,9 @@ Read:
}
t.send <- packet
}
t.send <- eiop.Packet{T: eiop.NoopPacket, D: socketClose{}} // shutdown the HTTP connection
return nil
}
@@ -238,10 +239,10 @@ func WithHTTPCompression(kind HTTPCompressionKind) Option {
return fn(w, r)
}
w.Header().Set("Content-Encoding", "gzip")
gz := gzip.NewWriter(w)
defer gz.Close()
gzr := compressResponseWriter{Writer: gz, ResponseWriter: w}
return fn(gzr, r)
}
@@ -276,17 +277,17 @@ func jsonp(next handlerWithError) handlerWithError {
if j = r.URL.Query().Get("j"); j == "" {
return next(w, r)
}
tw := transform.NewWriter(w, quoteTransform{})
w.Header().Set("Content-type", "application/json")
fmt.Fprintf(w, `___eio[%s]("`, j)
next(quoteWriter{Writer: tw, ResponseWriter: w}, r)
tw.Close()
fmt.Fprint(w, `");`)
return nil
}
}
+55 -44
View File
@@ -9,10 +9,10 @@ import (
"net/http"
"sync"
"time"
ws "github.com/oarkflow/websocket"
errg "golang.org/x/sync/errgroup"
eiop "github.com/oarkflow/socketio/engineio/protocol"
eios "github.com/oarkflow/socketio/engineio/session"
)
@@ -26,7 +26,7 @@ const defaultPingMsg = "probe"
type WebsocketTransport struct {
*Transport
conn *ws.Conn
origin []string
PingMsg string
buffered bool // default: false
@@ -43,16 +43,17 @@ func NewWebsocketTransport(chanBuf int) func(SessionID, Codec) Transporter {
{
t := &WebsocketTransport{
Transport: &Transport{
id: id,
name: Websocket,
codec: codec,
send: make(chan eiop.Packet, chanBuf),
receive: make(chan eiop.Packet, chanBuf),
id: id,
name: Websocket,
codec: codec,
send: make(chan eiop.Packet, chanBuf),
receive: make(chan eiop.Packet, chanBuf),
expireId: make(chan eios.ID),
},
origin: []string{"*"},
PingMsg: defaultPingMsg,
}
return t
}
}
@@ -66,16 +67,26 @@ func (t *WebsocketTransport) With(opts ...Option) {
func (t *WebsocketTransport) InnerTransport() *Transport { return t.Transport }
func (t *WebsocketTransport) ConnClose(code ws.StatusCode, reason string, ctx context.Context) error {
err := t.conn.Close(code, reason)
if done, ok := ctx.Value(eios.SessionDisconnectFunctionKey).(func() func()); ok {
if cleanup := done(); cleanup != nil {
cleanup()
}
}
return err
}
func (t *WebsocketTransport) Run(w http.ResponseWriter, r *http.Request, opts ...Option) (err error) {
t.With(opts...)
t.conn, err = ws.Accept(w, r, &ws.AcceptOptions{
OriginPatterns: t.origin,
})
if err != nil {
return err
}
ctx := r.Context()
// A context value can be passed in to allow the a server to be setup before the
// probe is attempted, this is good for testing. If the context key is not here
@@ -83,62 +94,62 @@ func (t *WebsocketTransport) Run(w http.ResponseWriter, r *http.Request, opts ..
if complete, ok := ctx.Value(serverSetupComplete).(*sync.WaitGroup); ok && complete != nil {
complete.Wait()
}
if t.isInitProbe {
if err := t.probe(w, r); err != nil {
return err
}
}
grp, ctx := errg.WithContext(ctx)
grp.Go(func() error { return t.incoming(ctx) })
grp.Go(func() error { return t.outgoing(r.WithContext(ctx)) })
err = grp.Wait()
t.conn.Close(ws.StatusNormalClosure, "done")
t.ConnClose(ws.StatusNormalClosure, "done", ctx)
return err
}
func (t *WebsocketTransport) probe(w http.ResponseWriter, r *http.Request) error {
type Packet = eiop.Packet
ctx := r.Context()
enc := t.codec.PacketEncoder
dec := t.codec.PacketDecoder
// Send the Ping...
wsw, err := t.conn.Writer(ctx, ws.MessageText)
if err != nil {
return err
}
if err := enc.To(wsw).WritePacket(Packet{T: eiop.PingPacket, D: t.PingMsg}); err != nil {
return err
}
wsw.Close() // done with the connection, must always close.
// Receive the Pong
_, wsr, err := t.conn.Reader(ctx)
if err != nil {
return err
}
var packet Packet
if err = dec.From(wsr).ReadPacket(&packet); err != nil {
return err
}
if packet.T != eiop.PongPacket {
return fmt.Errorf("expected pong packet")
}
if pingMsg, ok := packet.D.(string); !ok {
return fmt.Errorf("expected pong word to be a string")
} else if pingMsg != t.PingMsg {
return fmt.Errorf("expected pong word is invalid")
}
// then we are successful!
return nil
}
@@ -158,14 +169,14 @@ func (t *WebsocketTransport) incoming(ctx context.Context) (err error) {
if !ok {
extendTimeout = func() {}
}
var done func()
var reason string
defer func() { t.conn.Close(ws.StatusNormalClosure, reason) }()
defer func() { t.ConnClose(ws.StatusNormalClosure, reason, ctx) }()
var start = time.Now()
Write:
for {
select {
case stop := <-cancel:
@@ -186,7 +197,7 @@ Write:
}
return err
}
if err = t.codec.PacketEncoder.To(cw).WritePacket(eiop.Packet{T: eiop.PingPacket, D: nil}); err != nil {
cw.Close()
return err
@@ -200,16 +211,16 @@ Write:
if err != nil {
return err
}
io.Copy(cw, packet.D.(io.Reader))
cw.Close()
} else {
cw, err := t.conn.Writer(ctx, ws.MessageText)
if err != nil {
return err
}
if t.governor.minTime > 0 {
// we need to slow things down sometimes...
if time.Since(start) < t.governor.minTime {
@@ -217,13 +228,13 @@ Write:
}
start = time.Now()
}
err = t.codec.PacketEncoder.To(cw).WritePacket(packet)
cw.Close()
}
}
}
select {
case stop := <-cancel:
if stop != nil {
@@ -234,7 +245,7 @@ Write:
}
default:
}
return nil
}
@@ -257,22 +268,22 @@ func (t *WebsocketTransport) outgoing(r *http.Request) (err error) {
if !ok {
extendTimeout = func() {}
}
var unbuffered = new(sync.WaitGroup)
defer t.conn.Close(ws.StatusNormalClosure, "read")
defer t.ConnClose(ws.StatusNormalClosure, "read", ctx)
for {
if !t.buffered {
unbuffered.Wait()
}
// - /* blocking */ -//
// read a packet off the wire...
msgType, cr, err := t.conn.Reader(ctx) // this will close when shutdown() is called.
if err != nil {
return err
}
extendTimeout()
if msgType != ws.MessageText {
// this is binary data
@@ -295,12 +306,12 @@ func (t *WebsocketTransport) outgoing(r *http.Request) (err error) {
}
continue
}
var packet eiop.Packet
if err = dec.From(cr).ReadPacket(&packet); err != nil {
return err
}
switch packet.T {
case eiop.ClosePacket:
if done, ok := r.Context().Value(eios.SessionCloseFunctionKey).(func() func()); ok {
@@ -309,7 +320,7 @@ func (t *WebsocketTransport) outgoing(r *http.Request) (err error) {
}
}
t.conn.CloseRead(ctx)
t.conn.Close(ws.StatusNormalClosure, "cross origin WebSocket accepted")
t.ConnClose(ws.StatusNormalClosure, "cross origin WebSocket accepted", ctx)
return nil
case eiop.PingPacket:
cw, err := t.conn.Writer(ctx, ws.MessageText)
@@ -334,7 +345,7 @@ func (t *WebsocketTransport) outgoing(r *http.Request) (err error) {
}
}
}
}
}
}
+2 -2
View File
@@ -22,13 +22,13 @@
<script src="https://code.jquery.com/jquery-1.11.1.js"></script>
<script>
var socket = io("http://127.0.0.1:8001", {transports: ["websocket"]});
var s2 = io("http://127.0.0.1:8001/chat", {transports: ["websocket"]})
socket.on('reply', function(msg){
$('#messages').append($('<li>').text(msg));
});
$('form').submit(function(){
socket.emit('msg', $('#m').val(), function(data){
s2.emit('msg', $('#m').val(), function(data){
$('#messages').append($('<li>').text('ACK CALLBACK: ' + data));
});
+23 -13
View File
@@ -5,7 +5,7 @@ import (
"log"
"net/http"
"time"
sio "github.com/oarkflow/socketio"
eio "github.com/oarkflow/socketio/engineio"
eiot "github.com/oarkflow/socketio/engineio/transport"
@@ -13,50 +13,60 @@ import (
)
func main() {
port := ":8001"
port := "localhost:8001"
server := sio.NewServer(
eio.WithPingInterval(300*1*time.Millisecond),
eio.WithPingTimeout(200*1*time.Millisecond),
eio.WithMaxPayload(1000000),
eio.WithTransportOption(eiot.WithGovernor(1500*time.Microsecond, 500*time.Microsecond)),
)
server.OnConnect(func(s *sio.SocketV4) error {
server.Of("/").OnConnect(func(s *sio.SocketV4) error {
log.Println("connected:", s.ID())
s.Of("/").On("notice", CustomWrap(func(a string) error {
s.On("notice", CustomWrap(func(a string) error {
return s.Emit("reply", seri.String("have "+a))
}))
s.Of("/").On("bye", CustomWrap(func(a string) error {
s.On("bye", CustomWrap(func(a string) error {
return s.Emit("bye", seri.String(a))
return nil
}))
s.Of("/").On("msg", CustomWrap(func(a string) error {
s.Of("/chat").On("msg", CustomWrap(func(a string) error {
fmt.Println("Msg", a)
return s.Emit("bye", seri.String(a))
return nil
}))
return nil
})
server.Of("/chat").OnConnect(func(s *sio.SocketV4) error {
log.Println("connected:", s.ID())
s.On("msg", CustomWrap(func(a string) error {
server.In()
return s.Emit("msg", seri.String(a))
}))
return nil
})
server.OnDisconnect(func(reason string) {
log.Println("closed", reason)
})
http.Handle("/socket.io/", server)
http.Handle("/", http.FileServer(http.Dir("asset")))
log.Printf("serving port %s...\n", port)
log.Printf("serving port http://%s...\n", port)
log.Fatal(http.ListenAndServe(port, nil))
}
// Define a custom wrapper
// Define a custom wrapper
type CustomWrap func(string) error
// Define your callback
func (cc CustomWrap) Callback(data ...any) error {
a, aOK := data[0].(string)
if !aOK {
return fmt.Errorf("bad parameters")
}
return cc(a)
}
+5 -5
View File
@@ -5,7 +5,7 @@ import (
"math/rand"
"net/http"
"time"
eios "github.com/oarkflow/socketio/engineio/session"
seri "github.com/oarkflow/socketio/serialize"
sios "github.com/oarkflow/socketio/session"
@@ -28,7 +28,7 @@ type (
// reference sessions through the package
SessionID = eios.ID
SocketID = sios.ID
Namespace = string
Room = string
Event = string
@@ -72,21 +72,21 @@ var _socketIDQuickPrefix = func(now time.Time) func() string {
return func() string {
src := rand.NewSource(now.UnixNano())
rnd := rand.New(src)
cards := [][]rune{
{127137, 127150}, // spades
{127153, 127166}, // hearts
{127169, 127182}, // diamonds
{127185, 127198}, // clubs
}
prefix := make([]rune, 5)
for i := range prefix {
suit := rnd.Intn(4)
card := int32(rnd.Intn(int(cards[suit][1]-cards[suit][0]-1))) + cards[suit][0]
prefix[i] = card
}
enc := hex.EncodeToString([]byte(string(prefix)))
return enc + "::"
}
+36 -17
View File
@@ -3,10 +3,11 @@ package socketio
import (
"context"
"errors"
eiot "github.com/oarkflow/socketio/engineio/transport"
"net/http"
"strings"
"sync"
nmem "github.com/oarkflow/socketio/adaptor/transport/memory"
eio "github.com/oarkflow/socketio/engineio"
erro "github.com/oarkflow/socketio/internal/errors"
@@ -23,18 +24,18 @@ import (
// ServerV1 is the same as the javascript SocketIO v1.0 server.
type ServerV1 struct {
inSocketV1
run func(socketID SocketID, req *Request) error
doConnectPacket func(socketID SocketID, socket siot.Socket, req *Request) error
doDisconnectPacket func(socketID SocketID, socket siot.Socket, req *Request) error
doEventPacket func(socketID SocketID, socket siot.Socket) error
doAckPacket func(socketID SocketID, socket siot.Socket) error
path *string
ctx context.Context
eio eio.EIOServer
transport siot.Transporter
}
@@ -42,13 +43,13 @@ type ServerV1 struct {
func NewServerV1(opts ...Option) *ServerV1 {
v1 := &ServerV1{inSocketV1: inSocketV1{ʟ: new(sync.RWMutex), x: new(sync.Mutex)}}
v1.new(opts...)
v1.eio = eio.NewServerV2(
eio.WithPath(*v1.path),
eio.WithInitialPackets(autoConnect(v1)),
).(eio.EIOServer)
v1.eio.With(opts...)
v1.With(opts...)
return v1
}
@@ -63,21 +64,21 @@ func (v1 *ServerV1) new(opts ...Option) Server {
v1.doDisconnectPacket = doDisconnectPacket(v1)
v1.doEventPacket = doEventPacket(v1)
v1.doAckPacket = doAckPacket(v1)
v1.ns = "/"
v1.path = ampersand("/socket.io/")
v1.events = make(map[Namespace]map[Event]map[SocketID]eventCallback)
v1.onConnect = make(map[Namespace]onConnectCallbackVersion1)
v1.protectedEventName = v1ProtectedEventName
v1.transport = nmem.NewInMemoryTransport(siop.NewPacketV2) // set the default transport
v1.inSocketV1.binary = true // for the v1 implementation this always is set to true
v1.inSocketV1.compress = true // for the v1 implementation this always is set to true
v1.inSocketV1.setTransporter(v1.transport)
return v1
}
@@ -111,12 +112,12 @@ func (v1 *ServerV1) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if v1.path != nil && !strings.HasPrefix(r.URL.Path, *v1.path) { // lock to the default socketio path if present
return
}
ctx := r.Context()
if v1.ctx != nil {
ctx = v1.ctx
}
var eState erro.State
if err := v1.serveHTTP(w, r.WithContext(ctx)); err != nil {
switch {
@@ -138,12 +139,30 @@ func (v1 *ServerV1) serveHTTP(w http.ResponseWriter, r *http.Request) (err error
if err != nil {
return err
}
go func(e eiot.Transporter) {
for {
sessionId, ok := <-e.ReceiveTimeout()
if ok {
socketId := v1.transport.GetSocketID(sessionId)
if socketId != nil && !v1.tr().IsDisconnected(*socketId) {
for namespaceId, namespaces := range v1.events {
if events, ok := namespaces[OnDisconnectEvent]; ok {
if fn, ok := events[*socketId]; ok {
v1.tr().Leave(namespaceId, *socketId, socketIDPrefix+socketId.String())
fn.Callback("client namespace disconnect.")
}
}
}
}
return
}
}
}(eioTransport)
sid, err := v1.transport.Add(eioTransport)
if err != nil {
return err
}
v1.setSocketID(sid)
return v1.run(sid, sioRequest(r))
}
+11 -9
View File
@@ -4,7 +4,7 @@ import (
"errors"
"fmt"
"net/http"
eiot "github.com/oarkflow/socketio/engineio/transport"
siop "github.com/oarkflow/socketio/protocol"
siot "github.com/oarkflow/socketio/transport"
@@ -17,12 +17,12 @@ func autoConnect(v1 *ServerV1) func(transport eiot.Transporter, r *http.Request)
v1.tr().Send(socketID, serviceError(err), siop.WithType(siop.ErrorPacket.Byte()))
return
}
socket := siot.Socket{
Type: siop.ConnectPacket.Byte(),
Namespace: "/",
}
if err := v1.doConnectPacket(socketID, socket, sioRequest(r)); err != nil {
v1.tr().Send(socketID, serviceError(err), siop.WithType(siop.ErrorPacket.Byte()))
return
@@ -82,13 +82,13 @@ func doConnectPacket(v1 *ServerV1) func(SocketID, siot.Socket, *Request) error {
unlock := v1.r()
tr := v1.tr()
unlock()
tr.Join(socket.Namespace, socketID, socketID.Room(socketIDPrefix))
v1.setPrefix()
v1.setSocketID(socketID)
v1.setNsp(socket.Namespace)
if fn, ok := v1.onConnect[socket.Namespace]; ok {
tr.Send(socketID, nil, siop.WithType(byte(siop.ConnectPacket)), siop.WithNamespace(socket.Namespace))
return fn(&SocketV1{inSocketV1: v1.inSocketV1.clone(), req: req, Connected: true})
@@ -101,11 +101,13 @@ func doDisconnectPacket(v1 *ServerV1) func(SocketID, siot.Socket, *Request) erro
return func(socketID SocketID, socket siot.Socket, req *Request) (err error) {
if fn, ok := v1.events[socket.Namespace][OnDisconnectEvent][socketID]; ok {
v1.tr().Leave(socket.Namespace, socketID, socketIDPrefix+socketID.String())
v1.tr().Disconnect(socketID)
return fn.Callback("client namespace disconnect")
}
// for any socket id at the io. (server) level...
if fn, ok := v1.events[socket.Namespace][OnDisconnectEvent][serverEvent]; ok {
v1.tr().Leave(socket.Namespace, socketID, socketIDPrefix+socketID.String())
v1.tr().Disconnect(socketID)
return fn.Callback("client namespace disconnect")
}
return ErrOnDisconnectSocket
@@ -117,7 +119,7 @@ func doEventPacket(v1 *ServerV1) func(SocketID, siot.Socket) error {
type callbackAck interface {
CallbackAck(...interface{}) []interface{}
}
switch data := socket.Data.(type) {
case []interface{}:
event, ok := data[0].(string)
@@ -127,7 +129,7 @@ func doEventPacket(v1 *ServerV1) func(SocketID, siot.Socket) error {
if socket.Type != siop.DisconnectPacket.Byte() {
data = data[1:]
}
if fn, ok := v1.events[socket.Namespace][event][socketID]; ok {
if socket.AckID > 0 {
if fn, ok := fn.(callbackAck); ok {
@@ -151,7 +153,7 @@ func doEventPacket(v1 *ServerV1) func(SocketID, siot.Socket) error {
if socket.Type != siop.DisconnectPacket.Byte() {
data = data[1:]
}
if fn, ok := v1.events[socket.Namespace][event][socketID]; ok {
return fn.Callback(stoi(data)...)
}
+5 -2
View File
@@ -13,8 +13,11 @@ type Transporter interface {
AddSetter
JoinLeaver
SendReceiver
AckID() uint64
GetSocketID(eiot.SessionID) *SocketID
Disconnect(SocketID)
IsDisconnected(SocketID) bool
}
type Sender interface {
@@ -38,7 +41,7 @@ type AddSetter interface {
type Emitter interface {
Sender
Sockets(ns Namespace) SocketArray
Rooms(ns Namespace, id SocketID) RoomArray
}
+29 -20
View File
@@ -3,7 +3,7 @@ package transport
import (
"io"
"strings"
eiop "github.com/oarkflow/socketio/engineio/protocol"
eios "github.com/oarkflow/socketio/engineio/session"
eiot "github.com/oarkflow/socketio/engineio/transport"
@@ -14,18 +14,18 @@ import (
type (
// The EngineIO session ID
SessionID = eios.ID
// The SocketID session ID
SocketID = sios.ID
// The functional option that can be used with Packets
Option = siop.Option
Namespace = string
Room = string
Data interface{} // The Data packet type
// Socket is a generic socket that is passed to the emit function during execution
Socket struct {
Type byte
@@ -57,12 +57,13 @@ func (buf *buffer) StopBuffer() { buf.active = false }
// HTTP long polling, Websockets or Server-Side events
type Transport struct {
id SocketID
*buffer
receive chan Socket
newPacket siop.NewPacket
eioTransport eiot.Transporter
receive chan Socket
newPacket siop.NewPacket
eioTransport eiot.Transporter
isDisconnected bool
}
func NewTransport(id SocketID, eioTransport eiot.Transporter, fn siop.NewPacket) *Transport {
@@ -75,6 +76,14 @@ func NewTransport(id SocketID, eioTransport eiot.Transporter, fn siop.NewPacket)
}
}
func (t *Transport) Disconnect() {
t.isDisconnected = true
}
func (t *Transport) IsDisconnected() bool {
return t.isDisconnected
}
func (t *Transport) SendBuffer() {
for _, packet := range t.buffer.packets {
t.eioTransport.Send(packet)
@@ -90,7 +99,7 @@ func (t *Transport) Send(data Data, opts ...Option) {
t.buffer.packets = append(t.buffer.packets, eioPacket)
return
}
t.eioTransport.Send(eioPacket)
t.sendBinary(eioPacket)
}
@@ -116,11 +125,11 @@ func (t *Transport) Receive() <-chan Socket {
if _, err := pac.(io.ReaderFrom).ReadFrom(strings.NewReader(data)); err != nil {
t.eioTransport.Send(eiop.Packet{T: eiop.NoopPacket, D: err})
}
switch pac.GetType() {
case siop.BinaryEventPacket.Byte(), siop.BinaryAckPacket.Byte():
if in, ok := pac.(interface{ ReadBinary() func(io.Reader) error }); ok {
t.receive <- packetToSocket(pac)
var cntPlaceholders int
EIOPacketData:
@@ -129,7 +138,7 @@ func (t *Transport) Receive() <-chan Socket {
if r, ok := eioPacket.D.(io.Reader); ok && bin != nil {
bin(r)
}
cntPlaceholders++
if cntPlaceholders >= len(pac.GetData().([]interface{}))-1 || // TODO(njones): base this off of the binary index...
eioPacket.T != eiop.BinaryPacket {
@@ -139,26 +148,26 @@ func (t *Transport) Receive() <-chan Socket {
continue
}
}
t.receive <- packetToSocket(pac)
}
switch eioPacket.T {
case eiop.NoopPacket:
if done, ok := eioPacket.D.(interface{ SocketCloseChannel() error }); ok {
if err := done.SocketCloseChannel(); err != nil {
sioPacket := t.newPacket().
WithType(siop.ErrorPacket.Byte()).
WithData(err)
t.receive <- packetToSocket(sioPacket.(packet))
}
close(t.receive)
return
}
}
}
}()
return t.receive