package ws import ( "sync" "github.com/wonli/aqi/logger" ) type PubSub struct { Topics *sync.Map //Topics map[string]*Topic //主题名称和Top对应map TopicMsgQueue chan *TopicMsg //主题消息队列 } func NewPubSub() *PubSub { return &PubSub{ Topics: new(sync.Map), TopicMsgQueue: make(chan *TopicMsg, 128), } } func (a *PubSub) initTopic(topicId string) *Topic { //主题不存在时先创建主题 topic, ok := a.Topics.Load(topicId) if !ok { t := &Topic{ Id: topicId, PubSub: a, SubUsers: sync.Map{}, SubHandlers: sync.Map{}, } a.Topics.Store(topicId, t) return t } return topic.(*Topic) } // Pub 发布主题 func (a *PubSub) Pub(topicId string, data any) { msg := Action{ Action: "subscriber", Data: H{ "topicId": topicId, "message": data, }, } //主题不存在时先创建主题 a.initTopic(topicId) a.TopicMsgQueue <- &TopicMsg{ Ori: data, TopicId: topicId, Msg: msg.Encode(), } } // Sub 订阅主题 func (a *PubSub) Sub(topicId string, user *User) { a.initTopic(topicId).AddSubUser(user) } // SubFunc 以函数方式订阅 func (a *PubSub) SubFunc(topicId string, f func(msg *TopicMsg)) { a.initTopic(topicId).AddSubHandle(f) } func (a *PubSub) Start() { for { select { case msg, ok := <-a.TopicMsgQueue: if !ok { logger.SugarLog.Info("从订阅主题队列取数据失败") continue } t, hasTopic := a.Topics.Load(msg.TopicId) if !hasTopic { logger.SugarLog.Info("未发布订阅主题收到消息") continue } //订阅消息的函数处理 t.(*Topic).ApplyFunc(msg) //订阅消息的用户处理 t.(*Topic).SendToSubUser(msg.Msg) } } }