feat: support async message sending now (close #58)

This commit is contained in:
JustSong 2023-05-07 12:58:47 +08:00
parent 1cade7a218
commit 96e08578ec
6 changed files with 120 additions and 19 deletions

63
channel/message-queue.go Normal file
View File

@ -0,0 +1,63 @@
package channel
import (
"message-pusher/common"
"message-pusher/model"
)
var AsyncMessageQueue chan int
var AsyncMessageQueueSize = 128
var AsyncMessageSenderNum = 2
func init() {
AsyncMessageQueue = make(chan int, AsyncMessageQueueSize)
for i := 0; i < AsyncMessageSenderNum; i++ {
go asyncMessageSender()
}
}
// LoadAsyncMessages loads async pending messages from database.
// We have to wait the database connection is ready.
func LoadAsyncMessages() {
ids, err := model.GetAsyncPendingMessageIds()
if err != nil {
common.FatalLog("failed to load async pending messages: " + err.Error())
}
for _, id := range ids {
AsyncMessageQueue <- id
}
}
func asyncMessageSenderHelper(message *model.Message) error {
user, err := model.GetUserById(message.UserId, false)
if err != nil {
return err
}
channel_, err := model.GetChannelByName(message.Channel, user.Id)
if err != nil {
return err
}
return SendMessage(message, user, channel_)
}
func asyncMessageSender() {
for {
id := <-AsyncMessageQueue
message, err := model.GetMessageById(id)
if err != nil {
common.SysError("async message sender error: " + err.Error())
continue
}
err = asyncMessageSenderHelper(message)
status := common.MessageSendStatusFailed
if err != nil {
common.SysError("async message sender error: " + err.Error())
} else {
status = common.MessageSendStatusSent
}
err = message.UpdateStatus(status)
if err != nil {
common.SysError("async message sender error: " + err.Error())
}
}
}

View File

@ -101,10 +101,11 @@ const (
)
const (
MessageSendStatusUnknown = 0
MessageSendStatusPending = 1
MessageSendStatusSent = 2
MessageSendStatusFailed = 3
MessageSendStatusUnknown = 0
MessageSendStatusPending = 1
MessageSendStatusSent = 2
MessageSendStatusFailed = 3
MessageSendStatusAsyncPending = 4
)
const (

View File

@ -38,6 +38,7 @@ func GetPushMessage(c *gin.Context) {
Desp: c.Query("desp"),
Short: c.Query("short"),
OpenId: c.Query("openid"),
Async: c.Query("async") == "true",
}
keepCompatible(&message)
pushMessageHelper(c, &message)
@ -55,6 +56,7 @@ func PostPushMessage(c *gin.Context) {
Desp: c.PostForm("desp"),
Short: c.PostForm("short"),
OpenId: c.PostForm("openid"),
Async: c.PostForm("async") == "true",
}
if message == (model.Message{}) {
// Looks like the user is using JSON
@ -142,6 +144,7 @@ func pushMessageHelper(c *gin.Context, message *model.Message) {
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "",
"uuid": message.Link,
})
}
@ -149,6 +152,7 @@ func saveAndSendMessage(user *model.User, message *model.Message, channel_ *mode
if channel_.Status != common.ChannelStatusEnabled {
return errors.New("该渠道已被禁用")
}
common.MessageCount += 1 // We don't need to use atomic here because it's not a critical value
message.Link = common.GetUUID()
if message.URL == "" {
message.URL = fmt.Sprintf("%s/message/%s", common.ServerAddress, message.Link)
@ -158,25 +162,36 @@ func saveAndSendMessage(user *model.User, message *model.Message, channel_ *mode
defer func() {
// Update the status of the message
status := common.MessageSendStatusFailed
if success {
status = common.MessageSendStatusSent
if message.Async {
status = common.MessageSendStatusAsyncPending
} else {
if success {
status = common.MessageSendStatusSent
}
}
err := message.UpdateStatus(status)
if err != nil {
common.SysError("failed to update the status of the message: " + err.Error())
}
if message.Async {
channel.AsyncMessageQueue <- message.Id
}
}()
err := message.UpdateAndInsert(user.Id)
if err != nil {
return err
}
} else {
if message.Async {
return errors.New("异步发送消息需要用户具备消息持久化的权限")
}
message.Link = "unsaved" // This is for user to identify whether the message is saved
}
err := channel.SendMessage(message, user, channel_)
common.MessageCount += 1 // We don't need to use atomic here because it's not a critical value
if err != nil {
return err
if !message.Async {
err := channel.SendMessage(message, user, channel_)
if err != nil {
return err
}
}
success = true
return nil // After this line, the message status will be updated
@ -258,7 +273,7 @@ func GetUserMessages(c *gin.Context) {
func GetMessage(c *gin.Context) {
messageId, _ := strconv.Atoi(c.Param("id"))
userId := c.GetInt("id")
message, err := model.GetMessageById(messageId, userId)
message, err := model.GetMessageByIds(messageId, userId)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
@ -296,7 +311,7 @@ func ResendMessage(c *gin.Context) {
messageId, _ := strconv.Atoi(c.Param("id"))
userId := c.GetInt("id")
helper := func() error {
message, err := model.GetMessageById(messageId, userId)
message, err := model.GetMessageByIds(messageId, userId)
message.Id = 0
if err != nil {
return err

View File

@ -33,6 +33,7 @@ func main() {
if err != nil {
common.FatalLog(err)
}
go channel.LoadAsyncMessages()
defer func() {
err := model.CloseDB()
if err != nil {

View File

@ -18,14 +18,15 @@ type Message struct {
HTMLContent string `json:"html_content" gorm:"-:all"`
Timestamp int64 `json:"timestamp" gorm:"type:bigint"`
Link string `json:"link" gorm:"unique;index"`
To string `json:"to" gorm:"column:to"` // if specified, will send to this user(s)
Status int `json:"status" gorm:"default:0"` // pending, sent, failed
OpenId string `json:"openid" gorm:"-:all"` // alias for to
Desp string `json:"desp" gorm:"-:all"` // alias for content
Short string `json:"short" gorm:"-:all"` // alias for description
To string `json:"to" gorm:"column:to"` // if specified, will send to this user(s)
Status int `json:"status" gorm:"default:0;index"` // pending, sent, failed
OpenId string `json:"openid" gorm:"-:all"` // alias for to
Desp string `json:"desp" gorm:"-:all"` // alias for content
Short string `json:"short" gorm:"-:all"` // alias for description
Async bool `json:"async" gorm:"-"` // if true, will send message asynchronously
}
func GetMessageById(id int, userId int) (*Message, error) {
func GetMessageByIds(id int, userId int) (*Message, error) {
if id == 0 || userId == 0 {
return nil, errors.New("id 或 userId 为空!")
}
@ -34,6 +35,20 @@ func GetMessageById(id int, userId int) (*Message, error) {
return &message, err
}
func GetMessageById(id int) (*Message, error) {
if id == 0 {
return nil, errors.New("id 为空!")
}
message := Message{Id: id}
err := DB.Where(message).First(&message).Error
return &message, err
}
func GetAsyncPendingMessageIds() (ids []int, err error) {
err = DB.Model(&Message{}).Where("status = ?", common.MessageSendStatusAsyncPending).Pluck("id", &ids).Error
return ids, err
}
func GetMessageByLink(link string) (*Message, error) {
if link == "" {
return nil, errors.New("link 为空!")

View File

@ -17,7 +17,7 @@ function renderStatus(status) {
case 1:
return (
<Label basic color='olive'>
投递...
正在投递
</Label>
);
case 2:
@ -32,6 +32,12 @@ function renderStatus(status) {
发送失败
</Label>
);
case 4:
return (
<Label basic color='orange'>
已在队列
</Label>
);
default:
return (
<Label basic color='grey'>