mirror of
https://github.com/wonli/aqi.git
synced 2024-06-28 09:57:05 +08:00
83 lines
1.3 KiB
Go
83 lines
1.3 KiB
Go
|
package worker
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"os"
|
||
|
|
||
|
"github.com/hibiken/asynq"
|
||
|
|
||
|
"github.com/wonli/aqi/logger"
|
||
|
)
|
||
|
|
||
|
// Engine is the package-level engine instance. It is nil until InitEngine
// assigns it.
var Engine *EngineClient
|
||
|
|
||
|
type EngineClient struct {
|
||
|
Running bool
|
||
|
Router map[string]Task
|
||
|
|
||
|
Opt *asynq.RedisClientOpt
|
||
|
Server *asynq.Server
|
||
|
}
|
||
|
|
||
|
func InitEngine(rds *asynq.RedisClientOpt, config asynq.Config) *EngineClient {
|
||
|
server := asynq.NewServer(rds, config)
|
||
|
Engine = &EngineClient{
|
||
|
Opt: rds,
|
||
|
Server: server,
|
||
|
Router: map[string]Task{},
|
||
|
}
|
||
|
|
||
|
return Engine
|
||
|
}
|
||
|
|
||
|
func (e *EngineClient) Register(t Task) {
|
||
|
if e.Running {
|
||
|
logger.SugarLog.Errorf("please register in router")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
name := t.GetName()
|
||
|
if name == "" {
|
||
|
logger.SugarLog.Errorf("failed to register, name is empty")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
e.Router[name] = t
|
||
|
}
|
||
|
|
||
|
func (e *EngineClient) Add(task *asynq.Task) error {
|
||
|
t := task.Type()
|
||
|
if t == "" {
|
||
|
return fmt.Errorf("task type is undefined")
|
||
|
}
|
||
|
|
||
|
_, ok := e.Router[t]
|
||
|
if !ok {
|
||
|
return fmt.Errorf("task not registered")
|
||
|
}
|
||
|
|
||
|
client := asynq.NewClient(e.Opt)
|
||
|
defer client.Close()
|
||
|
|
||
|
_, err := client.Enqueue(task)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (e *EngineClient) Run() {
|
||
|
s := asynq.NewServeMux()
|
||
|
for name, handler := range e.Router {
|
||
|
s.Handle(name, handler)
|
||
|
}
|
||
|
|
||
|
e.Running = true
|
||
|
err := e.Server.Run(s)
|
||
|
if err != nil {
|
||
|
logger.SugarLog.Errorf("failed to start asynq service: :%s", err.Error())
|
||
|
os.Exit(0)
|
||
|
}
|
||
|
}
|