mirror of
https://github.com/wonli/aqi.git
synced 2024-06-28 09:57:05 +08:00
83 lines
1.3 KiB
Go
83 lines
1.3 KiB
Go
package worker
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
|
|
"github.com/hibiken/asynq"
|
|
|
|
"github.com/wonli/aqi/logger"
|
|
)
|
|
|
|
var Engine *EngineClient
|
|
|
|
type EngineClient struct {
|
|
Running bool
|
|
Router map[string]Task
|
|
|
|
Opt *asynq.RedisClientOpt
|
|
Server *asynq.Server
|
|
}
|
|
|
|
func InitEngine(rds *asynq.RedisClientOpt, config asynq.Config) *EngineClient {
|
|
server := asynq.NewServer(rds, config)
|
|
Engine = &EngineClient{
|
|
Opt: rds,
|
|
Server: server,
|
|
Router: map[string]Task{},
|
|
}
|
|
|
|
return Engine
|
|
}
|
|
|
|
func (e *EngineClient) Register(t Task) {
|
|
if e.Running {
|
|
logger.SugarLog.Errorf("please register in router")
|
|
return
|
|
}
|
|
|
|
name := t.GetName()
|
|
if name == "" {
|
|
logger.SugarLog.Errorf("failed to register, name is empty")
|
|
return
|
|
}
|
|
|
|
e.Router[name] = t
|
|
}
|
|
|
|
func (e *EngineClient) Add(task *asynq.Task) error {
|
|
t := task.Type()
|
|
if t == "" {
|
|
return fmt.Errorf("task type is undefined")
|
|
}
|
|
|
|
_, ok := e.Router[t]
|
|
if !ok {
|
|
return fmt.Errorf("task not registered")
|
|
}
|
|
|
|
client := asynq.NewClient(e.Opt)
|
|
defer client.Close()
|
|
|
|
_, err := client.Enqueue(task)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *EngineClient) Run() {
|
|
s := asynq.NewServeMux()
|
|
for name, handler := range e.Router {
|
|
s.Handle(name, handler)
|
|
}
|
|
|
|
e.Running = true
|
|
err := e.Server.Run(s)
|
|
if err != nil {
|
|
logger.SugarLog.Errorf("failed to start asynq service: :%s", err.Error())
|
|
os.Exit(0)
|
|
}
|
|
}
|