aqi/ws/pubsub.go
2024-06-18 18:09:39 +08:00

91 lines
1.7 KiB
Go

package ws
import (
"sync"
"github.com/wonli/aqi/logger"
)
type PubSub struct {
Topics *sync.Map //Topics map[string]*Topic //主题名称和Top对应map
TopicMsgQueue chan *TopicMsg //主题消息队列
}
func NewPubSub() *PubSub {
return &PubSub{
Topics: new(sync.Map),
TopicMsgQueue: make(chan *TopicMsg, 128),
}
}
func (a *PubSub) initTopic(topicId string) *Topic {
//主题不存在时先创建主题
topic, ok := a.Topics.Load(topicId)
if !ok {
t := &Topic{
Id: topicId,
PubSub: a,
SubUsers: sync.Map{},
SubHandlers: sync.Map{},
}
a.Topics.Store(topicId, t)
return t
}
return topic.(*Topic)
}
// Pub 发布主题
func (a *PubSub) Pub(topicId string, data any) {
msg := Action{
Action: "subscriber",
Data: H{
"topicId": topicId,
"message": data,
},
}
//主题不存在时先创建主题
a.initTopic(topicId)
a.TopicMsgQueue <- &TopicMsg{
Ori: data,
TopicId: topicId,
Msg: msg.Encode(),
}
}
// Sub 订阅主题
func (a *PubSub) Sub(topicId string, user *User) {
a.initTopic(topicId).AddSubUser(user)
}
// SubFunc 以函数方式订阅
func (a *PubSub) SubFunc(topicId string, f func(msg *TopicMsg)) {
a.initTopic(topicId).AddSubHandle(f)
}
func (a *PubSub) Start() {
for {
select {
case msg, ok := <-a.TopicMsgQueue:
if !ok {
logger.SugarLog.Info("从订阅主题队列取数据失败")
continue
}
t, hasTopic := a.Topics.Load(msg.TopicId)
if !hasTopic {
logger.SugarLog.Info("未发布订阅主题收到消息")
continue
}
//订阅消息的函数处理
t.(*Topic).ApplyFunc(msg)
//订阅消息的用户处理
t.(*Topic).SendToSubUser(msg.Msg)
}
}
}