Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add -replacing flag #94

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ You can optionally disable this behavior and allow overlapping instances of
your jobs by passing the `-overlapping` flag to Supercronic. Supercronic will
still warn about jobs falling behind, but will run duplicate instances of them.

If you pass `-replacing` flag and it's time for a new job iteration to run,
Supercronic will kill the previous job process if it hasn't finished yet.

## Reload crontab

Expand Down
37 changes: 30 additions & 7 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func startReaderDrain(wg *sync.WaitGroup, readerLogger *logrus.Entry, reader io.
}()
}

func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, passthroughLogs bool) error {
func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, passthroughLogs bool, nextRun time.Time, replacing bool) error {
jobLogger.Info("starting")

cmd := exec.Command(cronCtx.Shell, "-c", command)
Expand Down Expand Up @@ -101,6 +101,22 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, p
return err
}

if replacing {
ctx, cancel := context.WithDeadline(context.Background(), nextRun)
defer cancel()
go func(pid int) {
// Kill command and its sub-processes once the deadline is exceeded.
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded {
// Negative number tells to kill the whole process group.
// By convention PGID of process group equals to the PID of the
// group leader, so the command process is the first member of
// the process group and is the group leader.
syscall.Kill(-pid, syscall.SIGKILL)
}
}(cmd.Process.Pid)
}

var wg sync.WaitGroup

if stdout != nil {
Expand All @@ -122,7 +138,7 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, p
return nil
}

func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *logrus.Entry, overlapping bool, promMetrics *prometheus_metrics.PrometheusMetrics) {
func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *logrus.Entry, overlapping bool, replacing bool, promMetrics *prometheus_metrics.PrometheusMetrics) {
t := t0

for {
Expand All @@ -134,6 +150,9 @@ func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *
if overlapping {
m = "overlapping jobs"
}
if replacing {
m = "replacing job"
}

jobLogger.Warnf("%s: job is still running since %s (%s elapsed)", m, t0, t.Sub(t0))

Expand All @@ -149,9 +168,10 @@ func startFunc(
exitCtx context.Context,
logger *logrus.Entry,
overlapping bool,
replacing bool,
expression crontab.Expression,
timezone *time.Location,
fn func(time.Time, *logrus.Entry),
fn func(time.Time, *logrus.Entry, bool),
) {
wg.Add(1)

Expand Down Expand Up @@ -200,7 +220,7 @@ func startFunc(
"iteration": cronIteration,
})

fn(nextRun, jobLogger)
fn(nextRun, jobLogger, replacing)
}

if overlapping {
Expand All @@ -221,10 +241,11 @@ func StartJob(
exitCtx context.Context,
cronLogger *logrus.Entry,
overlapping bool,
replacing bool,
passthroughLogs bool,
promMetrics *prometheus_metrics.PrometheusMetrics,
) {
runThisJob := func(t0 time.Time, jobLogger *logrus.Entry) {
runThisJob := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
promMetrics.CronsCurrentlyRunningGauge.With(jobPromLabels(job)).Inc()

defer func() {
Expand All @@ -234,15 +255,16 @@ func StartJob(
monitorCtx, cancelMonitor := context.WithCancel(context.Background())
defer cancelMonitor()

go monitorJob(monitorCtx, job, t0, jobLogger, overlapping, promMetrics)
go monitorJob(monitorCtx, job, t0, jobLogger, overlapping, replacing, promMetrics)

timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
promMetrics.CronsExecutionTimeHistogram.With(jobPromLabels(job)).Observe(v)
}))

defer timer.ObserveDuration()

err := runJob(cronCtx, job.Command, jobLogger, passthroughLogs)
nextRun := job.Expression.Next(t0)
err := runJob(cronCtx, job.Command, jobLogger, passthroughLogs, nextRun, replacing)

promMetrics.CronsExecCounter.With(jobPromLabels(job)).Inc()

Expand All @@ -262,6 +284,7 @@ func StartJob(
exitCtx,
cronLogger,
overlapping,
replacing,
job.Expression,
cronCtx.Timezone,
runThisJob,
Expand Down
78 changes: 67 additions & 11 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestRunJob(t *testing.T) {
label := fmt.Sprintf("RunJob(%q)", tt.command)
logger, channel := newTestLogger()

err := runJob(tt.context, tt.command, logger, false)
err := runJob(tt.context, tt.command, logger, false, time.Now(), false)
if tt.success {
assert.Nil(t, err, label)
} else {
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestStartJobExitsOnRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

StartJob(&wg, &basicContext, &job, ctx, logger, false, false, &PROM_METRICS)
StartJob(&wg, &basicContext, &job, ctx, logger, false, false, false, &PROM_METRICS)

wg.Wait()
}
Expand All @@ -218,7 +218,7 @@ func TestStartJobRunsJob(t *testing.T) {

logger, channel := newTestLogger()

StartJob(&wg, &basicContext, &job, ctx, logger, false, false, &PROM_METRICS)
StartJob(&wg, &basicContext, &job, ctx, logger, false, false, false, &PROM_METRICS)

select {
case entry := <-channel:
Expand Down Expand Up @@ -266,6 +266,62 @@ func TestStartJobRunsJob(t *testing.T) {
wg.Wait()
}

func TestStartJobReplacesPreviousJobs(t *testing.T) {
job := crontab.Job{
CrontabLine: crontab.CrontabLine{
Expression: &testExpression{2 * time.Second},
Schedule: "always!",
Command: "sleep 100",
},
Position: 1,
}

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())

logger, channel := newTestLogger()

StartJob(&wg, &basicContext, &job, ctx, logger, false, true, false, &PROM_METRICS)

select {
case entry := <-channel:
assert.Regexp(t, regexp.MustCompile("job will run next"), entry.Message)
case <-time.After(time.Second):
t.Fatalf("timed out waiting for schedule")
}

select {
case entry := <-channel:
assert.Regexp(t, regexp.MustCompile("starting"), entry.Message)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for start")
}

select {
case entry := <-channel:
assert.Regexp(t, regexp.MustCompile("replacing job"), entry.Message)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for job replace warning")
}

select {
case entry := <-channel:
assert.Regexp(t, regexp.MustCompile("killed"), entry.Message)
case <-time.After(time.Second):
t.Fatalf("timed out waiting for job kill")
}

select {
case entry := <-channel:
assert.Regexp(t, regexp.MustCompile("job will run next"), entry.Message)
case <-time.After(time.Second):
t.Fatalf("timed out waiting for schedule of the second job iteration")
}

cancel()
wg.Wait()
}

func TestStartFuncWaitsForCompletion(t *testing.T) {
// We use startFunc to start a function, wait for it to start, then
// tell the whole thing to exit, and verify that it waits for the
Expand All @@ -281,12 +337,12 @@ func TestStartFuncWaitsForCompletion(t *testing.T) {
ctxStep1, step1Done := context.WithCancel(context.Background())
ctxStep2, step2Done := context.WithCancel(context.Background())

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
step1Done()
<-ctxStep2.Done()
}

startFunc(&wg, ctxStartFunc, logger, false, expr, time.Local, testFn)
startFunc(&wg, ctxStartFunc, logger, false, false, expr, time.Local, testFn)
go func() {
wg.Wait()
allDone()
Expand Down Expand Up @@ -329,12 +385,12 @@ func TestStartFuncDoesNotRunOverlappingJobs(t *testing.T) {
ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background())
ctxAllDone, allDone := context.WithCancel(context.Background())

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
testChan <- nil
<-ctxAllDone.Done()
}

startFunc(&wg, ctxStartFunc, logger, false, expr, time.Local, testFn)
startFunc(&wg, ctxStartFunc, logger, false, false, expr, time.Local, testFn)

select {
case <-testChan:
Expand Down Expand Up @@ -368,12 +424,12 @@ func TestStartFuncRunsOverlappingJobs(t *testing.T) {
ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background())
ctxAllDone, allDone := context.WithCancel(context.Background())

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
testChan <- nil
<-ctxAllDone.Done()
}

startFunc(&wg, ctxStartFunc, logger, true, expr, time.Local, testFn)
startFunc(&wg, ctxStartFunc, logger, true, false, expr, time.Local, testFn)

for i := 0; i < 5; i++ {
select {
Expand Down Expand Up @@ -406,7 +462,7 @@ func TestStartFuncUsesTz(t *testing.T) {

it := 0

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
testChan <- t0.Location()
it += 1

Expand All @@ -422,7 +478,7 @@ func TestStartFuncUsesTz(t *testing.T) {
}
}

startFunc(&wg, ctxStartFunc, logger, false, expr, loc, testFn)
startFunc(&wg, ctxStartFunc, logger, false, false, expr, loc, testFn)

for i := 0; i < 5; i++ {
select {
Expand Down
5 changes: 5 additions & 0 deletions integration/test.bats
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ wait_for() {
[[ "$n" -ge 4 ]]
}

@test "it runs replacing jobs" {
n="$(SUPERCRONIC_ARGS="-replacing" run_supercronic "${BATS_TEST_DIRNAME}/timeout.crontab" 5s | grep -iE "killed" | wc -l)"
[[ "$n" -ge 3 ]]
}

@test "it supports debug logging " {
SUPERCRONIC_ARGS="-debug" run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -iE "debug"
}
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func main() {
sentry := flag.String("sentry-dsn", "", "enable Sentry error logging, using provided DSN")
sentryAlias := flag.String("sentryDsn", "", "alias for sentry-dsn")
overlapping := flag.Bool("overlapping", false, "enable tasks overlapping")
replacing := flag.Bool("replacing", false, "enable tasks replacing")
flag.Parse()

var sentryDsn string
Expand Down Expand Up @@ -147,7 +148,7 @@ func main() {
"job.position": job.Position,
})

cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, *passthroughLogs, &promMetrics)
cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, *replacing, *passthroughLogs, &promMetrics)
}

termChan := make(chan os.Signal, 1)
Expand Down