mirror of
https://github.com/lxzan/gws
synced 2026-04-22 15:47:14 +08:00
194 lines
5.7 KiB
Go
194 lines
5.7 KiB
Go
package gws
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"unsafe"
|
|
|
|
"github.com/lxzan/gws/internal"
|
|
)
|
|
|
|
// 检查掩码设置是否符合 RFC6455 协议。
|
|
// Checks if the mask setting complies with the RFC6455 protocol.
|
|
func (c *Conn) checkMask(enabled bool) error {
|
|
// RFC6455: 所有从客户端发送到服务器的帧都必须设置掩码位为 1。
|
|
// RFC6455: All frames sent from client to server must have the mask bit set to 1.
|
|
if (c.isServer && !enabled) || (!c.isServer && enabled) {
|
|
return internal.CloseProtocolError
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// 读取控制帧
|
|
// Reads a control frame
|
|
func (c *Conn) readControl() error {
|
|
// RFC6455: 控制帧本身不能被分片。
|
|
// RFC6455: Control frames themselves MUST NOT be fragmented.
|
|
if !c.fh.GetFIN() {
|
|
return internal.CloseProtocolError
|
|
}
|
|
|
|
// RFC6455: 所有控制帧的有效载荷长度必须为 125 字节或更少,并且不能被分片。
|
|
// RFC6455: All control frames MUST have a payload length of 125 bytes or fewer and MUST NOT be fragmented.
|
|
var n = c.fh.GetLengthCode()
|
|
if n > internal.ThresholdV1 {
|
|
return internal.CloseProtocolError
|
|
}
|
|
|
|
// 不回收小块 buffer,控制帧一般 payload 长度为 0
|
|
// Do not recycle small buffers, control frames generally have a payload length of 0
|
|
var payload []byte
|
|
if n > 0 {
|
|
payload = make([]byte, n)
|
|
if err := internal.ReadN(c.br, payload); err != nil {
|
|
return err
|
|
}
|
|
if maskEnabled := c.fh.GetMask(); maskEnabled {
|
|
internal.MaskXOR(payload, c.fh.GetMaskKey())
|
|
}
|
|
}
|
|
|
|
var opcode = c.fh.GetOpcode()
|
|
switch opcode {
|
|
case OpcodePing:
|
|
return c.dispatchControl(OpcodePing, payload, nil)
|
|
case OpcodePong:
|
|
return c.dispatchControl(OpcodePong, payload, nil)
|
|
case OpcodeCloseConnection:
|
|
return c.emitClose(bytes.NewBuffer(payload))
|
|
default:
|
|
var err = fmt.Errorf("gws: unexpected opcode %d", opcode)
|
|
return internal.NewError(internal.CloseProtocolError, err)
|
|
}
|
|
}
|
|
|
|
// 读取消息
|
|
// Reads a message
|
|
func (c *Conn) readMessage() error {
|
|
// 解析帧头并获取内容长度
|
|
// Parse the frame header and get the content length
|
|
contentLength, err := c.fh.Parse(c.br)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if contentLength > c.config.ReadMaxPayloadSize {
|
|
return internal.CloseMessageTooLarge
|
|
}
|
|
|
|
// RSV1, RSV2, RSV3: 每个占 1 位
|
|
// 必须为 0,除非协商的扩展定义了非零值的含义。
|
|
// 如果接收到非零值且没有协商的扩展定义该非零值的含义,接收端点必须关闭 WebSocket 连接。
|
|
// RSV1, RSV2, RSV3: 1 bit each
|
|
// MUST be 0 unless an extension is negotiated that defines meanings for non-zero values.
|
|
// If a nonzero value is received and none of the negotiated extensions defines the meaning of such a nonzero value,
|
|
// the receiving endpoint MUST _Fail the WebSocket Connection_.
|
|
if !c.pd.Enabled && (c.fh.GetRSV1() || c.fh.GetRSV2() || c.fh.GetRSV3()) {
|
|
return internal.CloseProtocolError
|
|
}
|
|
|
|
maskEnabled := c.fh.GetMask()
|
|
if err := c.checkMask(maskEnabled); err != nil {
|
|
return err
|
|
}
|
|
|
|
var opcode = c.fh.GetOpcode()
|
|
var compressed = c.pd.Enabled && c.fh.GetRSV1()
|
|
if !opcode.isDataFrame() {
|
|
return c.readControl()
|
|
}
|
|
|
|
var fin = c.fh.GetFIN()
|
|
var buf = binaryPool.Get(contentLength + len(flateTail))
|
|
var p = buf.Bytes()[:contentLength]
|
|
var closer = Message{Data: buf}
|
|
defer closer.Close()
|
|
|
|
if err := internal.ReadN(c.br, p); err != nil {
|
|
return err
|
|
}
|
|
if maskEnabled {
|
|
internal.MaskXOR(p, c.fh.GetMaskKey())
|
|
}
|
|
|
|
if opcode != OpcodeContinuation && c.continuationFrame.initialized {
|
|
return internal.CloseProtocolError
|
|
}
|
|
|
|
if fin && opcode != OpcodeContinuation {
|
|
*(*[]byte)(unsafe.Pointer(buf)) = p
|
|
if !compressed {
|
|
closer.Data = nil
|
|
}
|
|
return c.emitMessage(&Message{Opcode: opcode, Data: buf, compressed: compressed})
|
|
}
|
|
|
|
// 处理分片消息
|
|
// processing segmented messages
|
|
if !fin && opcode != OpcodeContinuation {
|
|
c.continuationFrame.initialized = true
|
|
c.continuationFrame.compressed = compressed
|
|
c.continuationFrame.opcode = opcode
|
|
c.continuationFrame.buffer = bytes.NewBuffer(make([]byte, 0, contentLength))
|
|
}
|
|
if !c.continuationFrame.initialized {
|
|
return internal.CloseProtocolError
|
|
}
|
|
|
|
c.continuationFrame.buffer.Write(p)
|
|
if c.continuationFrame.buffer.Len() > c.config.ReadMaxPayloadSize {
|
|
return internal.CloseMessageTooLarge
|
|
}
|
|
if !fin {
|
|
return nil
|
|
}
|
|
|
|
msg := &Message{Opcode: c.continuationFrame.opcode, Data: c.continuationFrame.buffer, compressed: c.continuationFrame.compressed}
|
|
c.continuationFrame.reset()
|
|
return c.emitMessage(msg)
|
|
}
|
|
|
|
// 分发消息和异常恢复
|
|
// Dispatch message & Recovery
|
|
func (c *Conn) dispatchMessage(msg *Message) error {
|
|
defer c.config.Recovery(c.config.Logger)
|
|
c.handler.OnMessage(c, msg)
|
|
return nil
|
|
}
|
|
|
|
// 分发控制帧事件并进行异常恢复
|
|
// Dispatch control-frame events with recovery
|
|
//
|
|
// 控制帧(Ping/Pong/Close)的回调如果发生 panic,不应直接导致 ReadLoop 崩溃;
|
|
// 因此这里统一通过 Config.Recovery 进行兜底。
|
|
func (c *Conn) dispatchControl(opcode Opcode, payload []byte, err error) error {
|
|
defer c.config.Recovery(c.config.Logger)
|
|
switch opcode {
|
|
case OpcodePing:
|
|
c.handler.OnPing(c, payload)
|
|
case OpcodePong:
|
|
c.handler.OnPong(c, payload)
|
|
case OpcodeCloseConnection:
|
|
c.handler.OnClose(c, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// 发射消息事件
|
|
// Emit onmessage event
|
|
func (c *Conn) emitMessage(msg *Message) (err error) {
|
|
if msg.compressed {
|
|
msg.Data, err = c.deflater.Decompress(msg.Data, c.dpsWindow.dict)
|
|
if err != nil {
|
|
return internal.NewError(internal.CloseInternalErr, err)
|
|
}
|
|
_, _ = c.dpsWindow.Write(msg.Bytes())
|
|
}
|
|
if !internal.CheckEncoding(c.config.CheckUtf8Enabled, uint8(msg.Opcode), msg.Bytes()) {
|
|
return internal.NewError(internal.CloseUnsupportedData, ErrTextEncoding)
|
|
}
|
|
if c.config.ParallelEnabled {
|
|
return c.readQueue.Go(msg, c.dispatchMessage)
|
|
}
|
|
return c.dispatchMessage(msg)
|
|
}
|