mirror of
https://github.com/aptible/supercronic.git
synced 2026-04-22 23:17:11 +08:00
293 lines
7.0 KiB
Go
293 lines
7.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"os/signal"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/aptible/supercronic/cron"
|
|
"github.com/aptible/supercronic/crontab"
|
|
"github.com/aptible/supercronic/log/hook"
|
|
"github.com/aptible/supercronic/prometheus_metrics"
|
|
"github.com/evalphobia/logrus_sentry"
|
|
"github.com/fsnotify/fsnotify"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
var (
|
|
Version = "<unset>"
|
|
)
|
|
|
|
var Usage = func() {
|
|
fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS] CRONTAB\n\nAvailable options:\n", os.Args[0])
|
|
flag.PrintDefaults()
|
|
}
|
|
|
|
func main() {
|
|
debug := flag.Bool("debug", false, "enable debug logging")
|
|
quiet := flag.Bool("quiet", false, "do not log informational messages (takes precedence over debug)")
|
|
json := flag.Bool("json", false, "enable JSON logging")
|
|
printVersion := flag.Bool("version", false, "print version and exit")
|
|
test := flag.Bool("test", false, "test crontab (does not run jobs)")
|
|
inotify := flag.Bool("inotify", false, "use inotify to detect crontab file changes")
|
|
// If this flag changes, update forkExec to disable reaping in the child process
|
|
disableReap := flag.Bool("no-reap", false, "disable reaping of dead processes, note: reaping requires pid 1")
|
|
prometheusListen := flag.String(
|
|
"prometheus-listen-address",
|
|
"",
|
|
fmt.Sprintf(
|
|
"give a valid ip[:port] address to expose Prometheus metrics at /metrics (port defaults to %s), "+
|
|
"use 0.0.0.0 for all network interfaces.",
|
|
prometheus_metrics.DefaultPort,
|
|
),
|
|
)
|
|
splitLogs := flag.Bool("split-logs", false, "split log output into stdout/stderr")
|
|
passthroughLogs := flag.Bool("passthrough-logs", false, "passthrough logs from commands, do not wrap them in Supercronic logging")
|
|
sentry := flag.String("sentry-dsn", "", "enable Sentry error logging, using provided DSN")
|
|
sentryEnvironmentFlag := flag.String("sentry-environment", "", "specify the application's environment for Sentry error reporting")
|
|
sentryReleaseFlag := flag.String("sentry-release", "", "specify the application's release version for Sentry error reporting")
|
|
sentryAlias := flag.String("sentryDsn", "", "alias for sentry-dsn")
|
|
overlapping := flag.Bool("overlapping", false, "enable tasks overlapping")
|
|
flag.Parse()
|
|
|
|
var (
|
|
sentryDsn string
|
|
sentryEnvironment string
|
|
sentryRelease string
|
|
)
|
|
|
|
sentryDsn = os.Getenv("SENTRY_DSN")
|
|
sentryEnvironment = os.Getenv("SENTRY_ENVIRONMENT")
|
|
sentryRelease = os.Getenv("SENTRY_RELEASE")
|
|
|
|
if *sentryAlias != "" {
|
|
sentryDsn = *sentryAlias
|
|
}
|
|
|
|
if *sentry != "" {
|
|
sentryDsn = *sentry
|
|
}
|
|
|
|
if *sentryEnvironmentFlag != "" {
|
|
sentryEnvironment = *sentryEnvironmentFlag
|
|
}
|
|
|
|
if *sentryReleaseFlag != "" {
|
|
sentryRelease = *sentryReleaseFlag
|
|
}
|
|
|
|
if *debug {
|
|
logrus.SetLevel(logrus.DebugLevel)
|
|
}
|
|
|
|
if *quiet {
|
|
logrus.SetLevel(logrus.WarnLevel)
|
|
}
|
|
|
|
if *json {
|
|
logrus.SetFormatter(&logrus.JSONFormatter{})
|
|
} else {
|
|
logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true})
|
|
}
|
|
if *splitLogs {
|
|
hook.RegisterSplitLogger(
|
|
logrus.StandardLogger(),
|
|
os.Stdout,
|
|
os.Stderr,
|
|
)
|
|
}
|
|
|
|
if *printVersion {
|
|
fmt.Println(Version)
|
|
os.Exit(0)
|
|
return
|
|
}
|
|
|
|
if flag.NArg() != 1 {
|
|
Usage()
|
|
os.Exit(2)
|
|
return
|
|
}
|
|
if !*disableReap {
|
|
if os.Getpid() == 1 {
|
|
// Clean up zombie processes caused by incorrect crontab commands
|
|
// Use forkExec to avoid random waitid errors
|
|
// https://github.com/aptible/supercronic/issues/88
|
|
// https://github.com/aptible/supercronic/issues/171
|
|
logrus.Info("reaping dead processes")
|
|
forkExec()
|
|
return
|
|
}
|
|
|
|
logrus.Warn("process reaping disabled, not pid 1")
|
|
}
|
|
crontabFileName := flag.Args()[0]
|
|
|
|
var watcher *fsnotify.Watcher
|
|
if *inotify {
|
|
logrus.Info("using inotify to detect crontab file changes")
|
|
var err error
|
|
watcher, err = fsnotify.NewWatcher()
|
|
if err != nil {
|
|
logrus.Fatal(err)
|
|
return
|
|
}
|
|
defer watcher.Close()
|
|
|
|
logrus.Infof("adding file watch for '%s'", crontabFileName)
|
|
if err := watcher.Add(crontabFileName); err != nil {
|
|
logrus.Fatal(err)
|
|
return
|
|
}
|
|
}
|
|
|
|
var sentryHook *logrus_sentry.SentryHook
|
|
if sentryDsn != "" {
|
|
sentryLevels := []logrus.Level{
|
|
logrus.PanicLevel,
|
|
logrus.FatalLevel,
|
|
logrus.ErrorLevel,
|
|
}
|
|
sh, err := logrus_sentry.NewSentryHook(sentryDsn, sentryLevels)
|
|
if err != nil {
|
|
logrus.Fatalf("Could not init sentry logger: %s", err)
|
|
} else {
|
|
sh.Timeout = 5 * time.Second
|
|
sentryHook = sh
|
|
}
|
|
|
|
if sentryEnvironment != "" {
|
|
sh.SetEnvironment(sentryEnvironment)
|
|
}
|
|
|
|
if sentryRelease != "" {
|
|
sh.SetRelease(sentryRelease)
|
|
}
|
|
|
|
if sentryHook != nil {
|
|
logrus.StandardLogger().AddHook(sentryHook)
|
|
}
|
|
}
|
|
|
|
promMetrics := prometheus_metrics.NewPrometheusMetrics()
|
|
|
|
if *prometheusListen != "" {
|
|
promServerShutdownClosure, err := prometheus_metrics.InitHTTPServer(*prometheusListen, context.Background())
|
|
if err != nil {
|
|
logrus.Fatalf("prometheus http startup failed: %s", err.Error())
|
|
}
|
|
|
|
defer func() {
|
|
if err := promServerShutdownClosure(); err != nil {
|
|
logrus.Fatalf("prometheus http shutdown failed: %s", err.Error())
|
|
}
|
|
}()
|
|
}
|
|
|
|
termChan := make(chan os.Signal, 1)
|
|
signal.Notify(termChan, signalList...)
|
|
|
|
if *inotify {
|
|
go func() {
|
|
for {
|
|
select {
|
|
case event, ok := <-watcher.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
logrus.Debugf("event: %v, watch-list: %v", event, watcher.WatchList())
|
|
|
|
switch event.Op {
|
|
case event.Op & fsnotify.Write:
|
|
logrus.Debug("watched file changed")
|
|
termChan <- syscall.SIGUSR2
|
|
|
|
// workaround for k8s configmap and secret mounts
|
|
case event.Op & fsnotify.Remove:
|
|
logrus.Debug("watched file changed")
|
|
if err := watcher.Add(crontabFileName); err != nil {
|
|
logrus.Fatal(err)
|
|
return
|
|
}
|
|
termChan <- syscall.SIGUSR2
|
|
}
|
|
|
|
case err, ok := <-watcher.Errors:
|
|
if !ok {
|
|
return
|
|
}
|
|
logrus.Error("error:", err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
for {
|
|
promMetrics.Reset()
|
|
|
|
logrus.Infof("read crontab: %s", crontabFileName)
|
|
tab, err := readCrontabAtPath(crontabFileName)
|
|
|
|
if err != nil {
|
|
logrus.Fatal(err)
|
|
break
|
|
}
|
|
|
|
if *test {
|
|
logrus.Info("crontab is valid")
|
|
os.Exit(0)
|
|
break
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
exitCtx, notifyExit := context.WithCancel(context.Background())
|
|
|
|
for _, job := range tab.Jobs {
|
|
cronLogger := logrus.WithFields(logrus.Fields{
|
|
"job.schedule": job.Schedule,
|
|
"job.command": job.Command,
|
|
"job.position": job.Position,
|
|
})
|
|
|
|
cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, *passthroughLogs, &promMetrics)
|
|
}
|
|
|
|
termSig := <-termChan
|
|
|
|
if termSig == syscall.SIGUSR2 {
|
|
logrus.Infof("received %s, reloading crontab", termSig)
|
|
} else {
|
|
logrus.Infof("received %s, shutting down", termSig)
|
|
}
|
|
notifyExit()
|
|
|
|
logrus.Info("waiting for jobs to finish")
|
|
wg.Wait()
|
|
|
|
if termSig != syscall.SIGUSR2 {
|
|
logrus.Info("exiting")
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func readCrontabAtPath(path string) (*crontab.Crontab, error) {
|
|
file, err := os.Open(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer file.Close()
|
|
|
|
return crontab.ParseCrontab(file)
|
|
}
|
|
|
|
var signalList = []os.Signal{
|
|
syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2,
|
|
}
|