feat: add crontab server and example (#101)

This commit is contained in:
Richard
2023-08-04 17:27:51 +08:00
committed by GitHub
parent bf3b452ad8
commit b4e4b80458
9 changed files with 288 additions and 1 deletions
+58
View File
@@ -0,0 +1,58 @@
package crontab
import (
"strings"
"time"
"github.com/go-eagle/eagle/pkg/log"
)
type Logger struct {
Log log.Logger
}
func (l Logger) Info(msg string, keysAndValues ...interface{}) {
keysAndValues = formatTimes(keysAndValues)
keysAndValues = append([]interface{}{
msg,
}, keysAndValues...)
l.Log.Infof(formatString(len(keysAndValues)), keysAndValues...)
}
func (l Logger) Error(err error, msg string, keysAndValues ...interface{}) {
keysAndValues = formatTimes(keysAndValues)
keysAndValues = append([]interface{}{
msg,
"error", err,
}, keysAndValues...)
l.Log.Errorf(formatString(len(keysAndValues)+2), keysAndValues...)
}
// formatString returns a logfmt-like format string for the number of
// key/values.
func formatString(numKeysAndValues int) string {
var sb strings.Builder
sb.WriteString("%s")
if numKeysAndValues > 0 {
sb.WriteString(", ")
}
for i := 0; i < numKeysAndValues/2; i++ {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString("%v=%v")
}
return sb.String()
}
// formatTimes formats any time.Time values as RFC3339.
func formatTimes(keysAndValues []interface{}) []interface{} {
var formattedArgs []interface{}
for _, arg := range keysAndValues {
if t, ok := arg.(time.Time); ok {
arg = t.Format(time.RFC3339)
}
formattedArgs = append(formattedArgs, arg)
}
return formattedArgs
}
+119
View File
@@ -0,0 +1,119 @@
package crontab
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/go-eagle/eagle/pkg/config"
)
// Config crontab config
type Config struct {
Timezone string
Tasks []Task
}
// Task crontab task
type Task struct {
Name string
Schedule string
}
// Server crontab server
type Server struct {
conf *Config
// cron schedule
schedule *cron.Cron
jobs map[string]cron.Job
logger cron.Logger
stop chan struct{}
}
// NewServer new a crontab server
func NewServer(jobs map[string]cron.Job, logger cron.Logger) *Server {
// load config
cfg, err := loadConf()
if err != nil {
panic(err)
}
if len(cfg.Tasks) == 0 {
panic("crontab config is empty")
}
if len(jobs) == 0 || jobs == nil {
panic("crontab jobs is empty")
}
if len(cfg.Tasks) != len(jobs) {
panic("crontab tasks and jobs not match")
}
loc, err := time.LoadLocation(cfg.Timezone)
if err != nil {
panic(err)
}
// new server
return &Server{
conf: cfg,
schedule: cron.New(
cron.WithLocation(loc),
cron.WithLogger(logger),
),
jobs: jobs,
stop: make(chan struct{}, 1),
}
}
// Start the crontab server
func (s *Server) Start(ctx context.Context) error {
for _, task := range s.conf.Tasks {
task := task
// get job
job, ok := s.jobs[task.Name]
if !ok {
return fmt.Errorf("[crontab] job not found: %s", task.Name)
}
_, err := s.schedule.AddJob(task.Schedule, job)
if err != nil {
return errors.Wrapf(err, "[crontab] add job [%s] error", task.Name)
}
}
s.schedule.Start()
select {
case <-s.stop:
s.schedule.Stop()
}
return nil
}
// Stop the crontab server
func (s *Server) Stop(ctx context.Context) error {
s.logger.Info("[crontab] server stopping...")
s.stop <- struct{}{}
return nil
}
// loadConf load config
func loadConf() (ret *Config, err error) {
v, err := config.LoadWithType("crontab", "yaml")
if err != nil {
return nil, err
}
c := Config{}
err = v.Unmarshal(&c)
if err != nil {
return nil, err
}
return &c, nil
}