aqi/ws/client.go

189 lines
4.9 KiB
Go
Raw Normal View History

2024-06-18 18:08:39 +08:00
package ws
import (
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/wonli/aqi/logger"
)
// Client holds the state of one websocket connection: the network
// connection and outbound queue, verification/login flags, the identity the
// connection is bound to, and timing information. Fields tagged `json:"-"`
// are excluded from JSON encoding.
type Client struct {
Hub *Hubc `json:"-"` // hub this client belongs to; Reader, Write and SendMsg push the client onto Hub.Disconnect
Conn net.Conn `json:"-"` // underlying network connection
Send chan []byte `json:"-"` // outbound message queue drained by Write; closed by Close
Endpoint string `json:"-"` // entry address
OnceId string `json:"-"` // temporary ID used as the client's unique identifier in scenarios such as QR-code login
Disconnecting bool `json:"-"` // marked for disconnection (the connection is dropped after the message has been sent)
SyncMsg bool `json:"-"` // whether the client receives messages
LastMsgId int `json:"-"` // ID of the last message
RequiredValid bool `json:"-"` // human-verification (captcha) flag
Validated bool `json:"-"` // whether the client has been verified
ValidExpiry time.Time `json:"-"` // verification expiry time
ValidCacheData any `json:"-"` // cached data related to verification
AuthCode string `json:"-"` // checked against the code in the JWT; if equal, treated as the same user with a changed network address
ErrorCount int `json:"-"` // number of errors
Closed bool `json:"-"` // whether the client has already been closed
User *User `json:"user,omitempty"` // associated user; may be nil
Scope string `json:"scope"` // login JWT scope, used to tell where the user logged in from
AppId string `json:"appId"` // ID of the application used to log in
StoreId uint `json:"storeId"` // store ID
MerchantId uint `json:"merchantId"` // merchant ID
TenantId uint `json:"tenantId"` // tenant ID
Platform string `json:"platform"` // login platform
GroupId string `json:"groupId"` // user group ID
IsLogin bool `json:"isLogin"` // whether the client is logged in
LoginAction string `json:"loginAction"` // login action
ForceDialogId string `json:"forceDialogId"` // conversation ID of the chat window to open
IpAddress string `json:"ipAddress"` // IP address
IpLocation string `json:"ipLocation"` // geographic location derived from the IP
// NOTE(review): this tag is capitalized unlike its neighbours; changing it would alter the JSON output.
IpConnAddr string `json:"IpConnAddr"` // IP address of the conn connection
ConnectionTime time.Time `json:"connectionTime"`
LastRequestTime time.Time `json:"lastRequestTime"`
LastHeartbeatTime time.Time `json:"lastHeartbeatTime"` // updated by Write after each successful ping
// NOTE(review): mu is not used by the methods visible in this part of the
// file; presumably it guards Keys — confirm against the rest of the package.
mu sync.RWMutex
Keys map[string]any
}
// Reader pumps inbound frames from the connection until a read fails.
//
// Text frames are logged and handed to Dispatcher on a new goroutine, ping
// frames are answered with an empty pong, and any other opcode is only
// logged. When the loop exits the client is pushed onto the hub's Disconnect
// channel.
func (c *Client) Reader() {
	defer func() { c.Hub.Disconnect <- c }()

	for {
		payload, opcode, readErr := wsutil.ReadClientData(c.Conn)
		if readErr != nil {
			c.Log("xx", "Error reading data", readErr.Error())
			return
		}

		switch opcode {
		case ws.OpText:
			text := string(payload)
			c.Log("<-", text)
			// Each request is dispatched concurrently so a slow handler
			// does not stall the read loop.
			go Dispatcher(c, text)
		case ws.OpPing:
			if pongErr := wsutil.WriteServerMessage(c.Conn, ws.OpPong, nil); pongErr != nil {
				// A failed pong is logged but does not end the loop.
				c.Log("xx", "Reply pong", pongErr.Error())
			}
		default:
			c.Log("xx", "Unrecognized action")
		}
	}
}
// Write drains the Send queue onto the connection and pings the peer every
// five seconds.
//
// The loop ends when Send is closed, when any write fails, or right after a
// message has been delivered while Disconnecting is set. On exit the ticker
// is stopped and the client is pushed onto the hub's Disconnect channel.
func (c *Client) Write() {
	heartbeat := time.NewTicker(5 * time.Second)
	defer func() {
		heartbeat.Stop()
		c.Hub.Disconnect <- c
	}()

	for {
		select {
		case <-heartbeat.C:
			if err := wsutil.WriteServerMessage(c.Conn, ws.OpPing, nil); err != nil {
				c.Log("xx", "Error actively pinging the client", err.Error())
				return
			}

			// Record the heartbeat on the client and, when present, on the
			// associated user.
			now := time.Now()
			c.LastHeartbeatTime = now
			if c.User != nil {
				c.User.LastHeartbeatTime = now
			}

		case payload, open := <-c.Send:
			if !open {
				return
			}

			if err := wsutil.WriteServerMessage(c.Conn, ws.OpText, payload); err != nil {
				c.Log("xx", "Send msg error", err.Error())
				return
			}

			// A client marked as disconnecting is dropped as soon as this
			// message has gone out; the message is not logged in that case.
			if c.Disconnecting {
				return
			}

			c.Log("->", string(payload))
		}
	}
}
// Log writes one info-level line for this connection through logger.SugarLog.
//
// symbol is a short marker placed after the remote address (this file uses
// "<-", "->" and "xx"); the msg parts are joined with ", ". For a logged-in
// client with an associated user the line also carries "[Suid-AppId]".
// Lines longer than 300 bytes are cut to at most 300 bytes and suffixed with
// "...".
func (c *Client) Log(symbol string, msg ...string) {
	const maxLen = 300

	s := strings.Join(msg, ", ")
	// IsLogin and User are separate fields, so User may still be nil here;
	// fall back to the anonymous format instead of dereferencing nil.
	if c.IsLogin && c.User != nil {
		s = fmt.Sprintf("%s %s [%s-%s] %s", c.Conn.RemoteAddr(), symbol, c.User.Suid, c.AppId, s)
	} else {
		s = fmt.Sprintf("%s %s %s", c.Conn.RemoteAddr(), symbol, s)
	}

	if len(s) > maxLen {
		// Truncate on a rune boundary: ranging over a string yields the byte
		// offset of each rune start, so cut ends up as the largest rune start
		// that is <= maxLen. A plain s[:maxLen] could split a multi-byte
		// character and write invalid UTF-8 to the log.
		cut := 0
		for i := range s {
			if i > maxLen {
				break
			}
			cut = i
		}
		s = s[:cut] + "..."
	}

	logger.SugarLog.Info(s)
}
// SendMsg queues msg on the client's Send channel.
//
// If the send panics (for example because Close has already closed Send),
// the panic is recovered, the client is pushed onto the hub's Disconnect
// channel, and the failure is logged with the connection address.
func (c *Client) SendMsg(msg []byte) {
	defer func() {
		reason := recover()
		if reason == nil {
			return
		}

		c.Hub.Disconnect <- c
		logger.SugarLog.Errorf("SendMsg recover error(%s): %s", c.IpConnAddr, reason)
	}()

	c.Send <- msg
}
// SendRawMsg builds an Action from code, action, msg and data, encodes it,
// and queues the result through SendMsg.
func (c *Client) SendRawMsg(code int, action, msg string, data any) {
	encoded := (&Action{
		Action: action,
		Code:   code,
		Msg:    msg,
		Data:   data,
	}).Encode()

	c.SendMsg(encoded)
}
// Close shuts the client down: it marks the client as closed, closes the
// Send channel, closes the network connection, and logs the event.
//
// Calls made after Closed has been set do nothing. Any panic raised while
// closing is recovered and logged rather than propagated.
func (c *Client) Close() {
	defer func() {
		if reason := recover(); reason != nil {
			c.Log("xx", "recover!! -> ", fmt.Sprintf("%v", reason))
		}
	}()

	// Already closed: nothing left to do.
	if c.Closed {
		return
	}

	// Set the flag first so a repeated call does not close Send again.
	c.Closed = true

	close(c.Send)

	// The connection is being discarded, so its close error is ignored.
	_ = c.Conn.Close()

	c.Log("xx", fmt.Sprintf("Close client -> %s", c.IpConnAddr))
}