From f725b6a17afc094f6e75f811490fca9e8f40c88a Mon Sep 17 00:00:00 2001 From: quite4work <58137382+quite4work@users.noreply.github.com> Date: Fri, 8 Oct 2021 12:10:44 +0300 Subject: [PATCH] Add `-replacing` flag Co-authored-by: tyranron --- README.md | 2 ++ cron/cron.go | 37 ++++++++++++++++---- cron/cron_test.go | 78 +++++++++++++++++++++++++++++++++++++------ integration/test.bats | 5 +++ main.go | 3 +- 5 files changed, 106 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 80d14d1..546e20a 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,8 @@ You can optionally disable this behavior and allow overlapping instances of your jobs by passing the `-overlapping` flag to Supercronic. Supercronic will still warn about jobs falling behind, but will run duplicate instances of them. +If you pass the `-replacing` flag and it's time for a new job iteration to run, +Supercronic will kill the previous job process if it hasn't finished yet. ## Reload crontab diff --git a/cron/cron.go b/cron/cron.go index 512dc5c..248dff2 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -63,7 +63,7 @@ func startReaderDrain(wg *sync.WaitGroup, readerLogger *logrus.Entry, reader io. }() } -func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, passthroughLogs bool) error { +func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, passthroughLogs bool, nextRun time.Time, replacing bool) error { jobLogger.Info("starting") cmd := exec.Command(cronCtx.Shell, "-c", command) @@ -101,6 +101,22 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, p return err } + if replacing { + ctx, cancel := context.WithDeadline(context.Background(), nextRun) + defer cancel() + go func(pid int) { + // Kill command and its sub-processes once the deadline is exceeded. + <-ctx.Done() + if ctx.Err() == context.DeadlineExceeded { + // Negative number tells to kill the whole process group. 
+ // By convention the PGID of a process group equals the PID of its + // group leader, so the command process is the first member of + // the process group and is the group leader. + syscall.Kill(-pid, syscall.SIGKILL) + } + }(cmd.Process.Pid) + } + var wg sync.WaitGroup if stdout != nil { @@ -122,7 +138,7 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, p return nil } -func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *logrus.Entry, overlapping bool, promMetrics *prometheus_metrics.PrometheusMetrics) { +func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *logrus.Entry, overlapping bool, replacing bool, promMetrics *prometheus_metrics.PrometheusMetrics) { t := t0 for { @@ -134,6 +150,9 @@ func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger * if overlapping { m = "overlapping jobs" } + if replacing { + m = "replacing job" + } jobLogger.Warnf("%s: job is still running since %s (%s elapsed)", m, t0, t.Sub(t0)) @@ -149,9 +168,10 @@ func startFunc( exitCtx context.Context, logger *logrus.Entry, overlapping bool, + replacing bool, expression crontab.Expression, timezone *time.Location, - fn func(time.Time, *logrus.Entry), + fn func(time.Time, *logrus.Entry, bool), ) { wg.Add(1) @@ -200,7 +220,7 @@ func startFunc( "iteration": cronIteration, }) - fn(nextRun, jobLogger) + fn(nextRun, jobLogger, replacing) } if overlapping { @@ -221,10 +241,11 @@ func StartJob( exitCtx context.Context, cronLogger *logrus.Entry, overlapping bool, + replacing bool, passthroughLogs bool, promMetrics *prometheus_metrics.PrometheusMetrics, ) { - runThisJob := func(t0 time.Time, jobLogger *logrus.Entry) { + runThisJob := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) { promMetrics.CronsCurrentlyRunningGauge.With(jobPromLabels(job)).Inc() defer func() { @@ -234,7 +255,7 @@ func StartJob( monitorCtx, cancelMonitor := context.WithCancel(context.Background()) defer 
cancelMonitor() - go monitorJob(monitorCtx, job, t0, jobLogger, overlapping, promMetrics) + go monitorJob(monitorCtx, job, t0, jobLogger, overlapping, replacing, promMetrics) timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { promMetrics.CronsExecutionTimeHistogram.With(jobPromLabels(job)).Observe(v) @@ -242,7 +263,8 @@ func StartJob( defer timer.ObserveDuration() - err := runJob(cronCtx, job.Command, jobLogger, passthroughLogs) + nextRun := job.Expression.Next(t0) + err := runJob(cronCtx, job.Command, jobLogger, passthroughLogs, nextRun, replacing) promMetrics.CronsExecCounter.With(jobPromLabels(job)).Inc() @@ -262,6 +284,7 @@ func StartJob( exitCtx, cronLogger, overlapping, + replacing, job.Expression, cronCtx.Timezone, runThisJob, diff --git a/cron/cron_test.go b/cron/cron_test.go index ca1a67c..8d0d656 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -150,7 +150,7 @@ func TestRunJob(t *testing.T) { label := fmt.Sprintf("RunJob(%q)", tt.command) logger, channel := newTestLogger() - err := runJob(tt.context, tt.command, logger, false) + err := runJob(tt.context, tt.command, logger, false, time.Now(), false) if tt.success { assert.Nil(t, err, label) } else { @@ -198,7 +198,7 @@ func TestStartJobExitsOnRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - StartJob(&wg, &basicContext, &job, ctx, logger, false, false, &PROM_METRICS) + StartJob(&wg, &basicContext, &job, ctx, logger, false, false, false, &PROM_METRICS) wg.Wait() } @@ -218,7 +218,7 @@ func TestStartJobRunsJob(t *testing.T) { logger, channel := newTestLogger() - StartJob(&wg, &basicContext, &job, ctx, logger, false, false, &PROM_METRICS) + StartJob(&wg, &basicContext, &job, ctx, logger, false, false, false, &PROM_METRICS) select { case entry := <-channel: @@ -266,6 +266,62 @@ func TestStartJobRunsJob(t *testing.T) { wg.Wait() } +func TestStartJobReplacesPreviousJobs(t *testing.T) { + job := crontab.Job{ + CrontabLine: crontab.CrontabLine{ 
+ Expression: &testExpression{2 * time.Second}, + Schedule: "always!", + Command: "sleep 100", + }, + Position: 1, + } + + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + + logger, channel := newTestLogger() + + StartJob(&wg, &basicContext, &job, ctx, logger, false, true, false, &PROM_METRICS) + + select { + case entry := <-channel: + assert.Regexp(t, regexp.MustCompile("job will run next"), entry.Message) + case <-time.After(time.Second): + t.Fatalf("timed out waiting for schedule") + } + + select { + case entry := <-channel: + assert.Regexp(t, regexp.MustCompile("starting"), entry.Message) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for start") + } + + select { + case entry := <-channel: + assert.Regexp(t, regexp.MustCompile("replacing job"), entry.Message) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for job replace warning") + } + + select { + case entry := <-channel: + assert.Regexp(t, regexp.MustCompile("killed"), entry.Message) + case <-time.After(time.Second): + t.Fatalf("timed out waiting for job kill") + } + + select { + case entry := <-channel: + assert.Regexp(t, regexp.MustCompile("job will run next"), entry.Message) + case <-time.After(time.Second): + t.Fatalf("timed out waiting for schedule of the second job iteration") + } + + cancel() + wg.Wait() +} + func TestStartFuncWaitsForCompletion(t *testing.T) { // We use startFunc to start a function, wait for it to start, then // tell the whole thing to exit, and verify that it waits for the @@ -281,12 +337,12 @@ func TestStartFuncWaitsForCompletion(t *testing.T) { ctxStep1, step1Done := context.WithCancel(context.Background()) ctxStep2, step2Done := context.WithCancel(context.Background()) - testFn := func(t0 time.Time, jobLogger *logrus.Entry) { + testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) { step1Done() <-ctxStep2.Done() } - startFunc(&wg, ctxStartFunc, logger, false, expr, time.Local, testFn) + 
startFunc(&wg, ctxStartFunc, logger, false, false, expr, time.Local, testFn) go func() { wg.Wait() allDone() @@ -329,12 +385,12 @@ func TestStartFuncDoesNotRunOverlappingJobs(t *testing.T) { ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background()) ctxAllDone, allDone := context.WithCancel(context.Background()) - testFn := func(t0 time.Time, jobLogger *logrus.Entry) { + testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) { testChan <- nil <-ctxAllDone.Done() } - startFunc(&wg, ctxStartFunc, logger, false, expr, time.Local, testFn) + startFunc(&wg, ctxStartFunc, logger, false, false, expr, time.Local, testFn) select { case <-testChan: @@ -368,12 +424,12 @@ func TestStartFuncRunsOverlappingJobs(t *testing.T) { ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background()) ctxAllDone, allDone := context.WithCancel(context.Background()) - testFn := func(t0 time.Time, jobLogger *logrus.Entry) { + testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) { testChan <- nil <-ctxAllDone.Done() } - startFunc(&wg, ctxStartFunc, logger, true, expr, time.Local, testFn) + startFunc(&wg, ctxStartFunc, logger, true, false, expr, time.Local, testFn) for i := 0; i < 5; i++ { select { @@ -406,7 +462,7 @@ func TestStartFuncUsesTz(t *testing.T) { it := 0 - testFn := func(t0 time.Time, jobLogger *logrus.Entry) { + testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) { testChan <- t0.Location() it += 1 @@ -422,7 +478,7 @@ func TestStartFuncUsesTz(t *testing.T) { } } - startFunc(&wg, ctxStartFunc, logger, false, expr, loc, testFn) + startFunc(&wg, ctxStartFunc, logger, false, false, expr, loc, testFn) for i := 0; i < 5; i++ { select { diff --git a/integration/test.bats b/integration/test.bats index 1f4d8aa..c5998fd 100644 --- a/integration/test.bats +++ b/integration/test.bats @@ -62,6 +62,11 @@ wait_for() { [[ "$n" -ge 4 ]] } +@test "it runs replacing jobs" { + n="$(SUPERCRONIC_ARGS="-replacing" run_supercronic 
"${BATS_TEST_DIRNAME}/timeout.crontab" 5s | grep -iE "killed" | wc -l)" + [[ "$n" -ge 3 ]] +} + @test "it supports debug logging " { SUPERCRONIC_ARGS="-debug" run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -iE "debug" } diff --git a/main.go b/main.go index a8f1990..7de51ce 100644 --- a/main.go +++ b/main.go @@ -42,6 +42,7 @@ func main() { sentry := flag.String("sentry-dsn", "", "enable Sentry error logging, using provided DSN") sentryAlias := flag.String("sentryDsn", "", "alias for sentry-dsn") overlapping := flag.Bool("overlapping", false, "enable tasks overlapping") + replacing := flag.Bool("replacing", false, "enable tasks replacing") flag.Parse() var sentryDsn string @@ -147,7 +148,7 @@ func main() { "job.position": job.Position, }) - cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, *passthroughLogs, &promMetrics) + cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, *replacing, *passthroughLogs, &promMetrics) } termChan := make(chan os.Signal, 1)