mirror of
https://github.com/wonli/aqi.git
synced 2024-06-28 09:57:05 +08:00
189 lines
4.9 KiB
Go
189 lines
4.9 KiB
Go
package ws
|
||
|
||
import (
|
||
"fmt"
|
||
"net"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/gobwas/ws"
|
||
"github.com/gobwas/ws/wsutil"
|
||
|
||
"github.com/wonli/aqi/logger"
|
||
)
|
||
|
||
type Client struct {
|
||
Hub *Hubc `json:"-"`
|
||
Conn net.Conn `json:"-"`
|
||
Send chan []byte `json:"-"`
|
||
Endpoint string `json:"-"` //入口地址
|
||
OnceId string `json:"-"` //临时ID,扫码登录等场景作为客户端唯一标识
|
||
Disconnecting bool `json:"-"` //已被设置为断开状态(消息发送完之后断开连接)
|
||
SyncMsg bool `json:"-"` //是否接收消息
|
||
LastMsgId int `json:"-"` //最后一条消息ID
|
||
RequiredValid bool `json:"-"` //人机验证标识
|
||
Validated bool `json:"-"` //是否已验证
|
||
ValidExpiry time.Time `json:"-"` //验证有效期
|
||
ValidCacheData any `json:"-"` //验证相关缓存数据
|
||
AuthCode string `json:"-"` //用于校验JWT中的code,如果相等识别为同一个用户的网络地址变更
|
||
ErrorCount int `json:"-"` //错误次数
|
||
Closed bool `json:"-"` //是否已经关闭
|
||
|
||
User *User `json:"user,omitempty"` //关联用户
|
||
Scope string `json:"scope"` //登录jwt scope, 用于判断用户从哪里登录的
|
||
AppId string `json:"appId"` //登录应用Id
|
||
StoreId uint `json:"storeId"` //店铺ID
|
||
MerchantId uint `json:"merchantId"` //商户ID
|
||
TenantId uint `json:"tenantId"` //租户ID
|
||
Platform string `json:"platform"` //登录平台
|
||
GroupId string `json:"groupId"` //用户分组Id
|
||
IsLogin bool `json:"isLogin"` //是否已登录
|
||
LoginAction string `json:"loginAction"` //登录动作
|
||
ForceDialogId string `json:"forceDialogId"` //打开聊天界面的会话ID
|
||
IpAddress string `json:"ipAddress"` //IP地址
|
||
IpLocation string `json:"ipLocation"` //通过IP转换获得的地理位置
|
||
IpConnAddr string `json:"IpConnAddr"` //conn连接IP地址
|
||
ConnectionTime time.Time `json:"connectionTime"`
|
||
LastRequestTime time.Time `json:"lastRequestTime"`
|
||
LastHeartbeatTime time.Time `json:"lastHeartbeatTime"`
|
||
|
||
mu sync.RWMutex
|
||
Keys map[string]any
|
||
}
|
||
|
||
// Reader 读取
|
||
func (c *Client) Reader() {
|
||
defer func() {
|
||
c.Hub.Disconnect <- c
|
||
}()
|
||
|
||
for {
|
||
request, op, err := wsutil.ReadClientData(c.Conn)
|
||
if err != nil {
|
||
c.Log("xx", "Error reading data", err.Error())
|
||
return
|
||
}
|
||
|
||
if op == ws.OpText {
|
||
req := string(request)
|
||
c.Log("<-", req)
|
||
go Dispatcher(c, req)
|
||
} else if op == ws.OpPing {
|
||
err = wsutil.WriteServerMessage(c.Conn, ws.OpPong, nil)
|
||
if err != nil {
|
||
c.Log("xx", "Reply pong", err.Error())
|
||
}
|
||
} else {
|
||
c.Log("xx", "Unrecognized action")
|
||
}
|
||
}
|
||
}
|
||
|
||
// Write 发送
|
||
func (c *Client) Write() {
|
||
timer := time.NewTicker(5 * time.Second)
|
||
defer func() {
|
||
timer.Stop()
|
||
c.Hub.Disconnect <- c
|
||
}()
|
||
|
||
for {
|
||
select {
|
||
case msg, ok := <-c.Send:
|
||
if !ok {
|
||
return
|
||
}
|
||
|
||
err := wsutil.WriteServerMessage(c.Conn, ws.OpText, msg)
|
||
if err != nil {
|
||
c.Log("xx", "Send msg error", err.Error())
|
||
return
|
||
}
|
||
|
||
//如果设置为断开状态
|
||
//在消息发送完成后将断开与服务器的连接
|
||
if c.Disconnecting {
|
||
return
|
||
}
|
||
|
||
c.Log("->", string(msg))
|
||
case <-timer.C:
|
||
err := wsutil.WriteServerMessage(c.Conn, ws.OpPing, nil)
|
||
if err != nil {
|
||
c.Log("xx", "Error actively pinging the client", err.Error())
|
||
return
|
||
}
|
||
|
||
c.LastHeartbeatTime = time.Now()
|
||
if c.User != nil {
|
||
c.User.LastHeartbeatTime = c.LastHeartbeatTime
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Log websocket日志
|
||
func (c *Client) Log(symbol string, msg ...string) {
|
||
s := strings.Join(msg, ", ")
|
||
if c.IsLogin {
|
||
s = fmt.Sprintf("%s %s [%s-%s] %s", c.Conn.RemoteAddr(), symbol, c.User.Suid, c.AppId, s)
|
||
} else {
|
||
s = fmt.Sprintf("%s %s %s", c.Conn.RemoteAddr(), symbol, s)
|
||
}
|
||
|
||
if len(s) > 300 {
|
||
logger.SugarLog.Info(s[:300] + "...")
|
||
} else {
|
||
logger.SugarLog.Info(s)
|
||
}
|
||
}
|
||
|
||
// SendMsg 把消息加入发送队列
|
||
func (c *Client) SendMsg(msg []byte) {
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
c.Hub.Disconnect <- c
|
||
logger.SugarLog.Errorf("SendMsg recover error(%s): %s", c.IpConnAddr, err)
|
||
}
|
||
}()
|
||
|
||
c.Send <- msg
|
||
}
|
||
|
||
// SendRawMsg 构造消息再发送
|
||
func (c *Client) SendRawMsg(code int, action, msg string, data any) {
|
||
a := &Action{
|
||
Action: action,
|
||
Code: code,
|
||
Msg: msg,
|
||
Data: data,
|
||
}
|
||
|
||
c.SendMsg(a.Encode())
|
||
}
|
||
|
||
// Close 关闭客户端
|
||
func (c *Client) Close() {
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
c.Log("xx", "recover!! -> ", fmt.Sprintf("%v", err))
|
||
return
|
||
}
|
||
}()
|
||
|
||
if !c.Closed {
|
||
//防止重复关闭
|
||
c.Closed = true
|
||
|
||
//关闭通道
|
||
close(c.Send)
|
||
|
||
//关闭网络连接
|
||
_ = c.Conn.Close()
|
||
|
||
//打印日志
|
||
c.Log("xx", fmt.Sprintf("Close client -> %s", c.IpConnAddr))
|
||
}
|
||
}
|