Skip to content

Commit

Permalink
Merge pull request #42072 from manovotn/issue42034
Browse files Browse the repository at this point in the history
Quartz - allow bean based jobs to be interruptable
  • Loading branch information
mkouba authored Jul 23, 2024
2 parents 1038be4 + 176649a commit e1987aa
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package io.quarkus.quartz.test.programmatic;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.quartz.InterruptableJob;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.UnableToInterruptJobException;

import io.quarkus.test.QuarkusUnitTest;

public class InterruptableJobTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(MyJob.class)
.addAsResource(new StringAsset("quarkus.scheduler.start-mode=forced"),
"application.properties"));

@Inject
Scheduler scheduler;

static final CountDownLatch INTERRUPT_LATCH = new CountDownLatch(1);
static final CountDownLatch EXECUTE_LATCH = new CountDownLatch(1);

static final CountDownLatch NON_INTERRUPTABLE_EXECUTE_LATCH = new CountDownLatch(1);
static final CountDownLatch NON_INTERRUPTABLE_HOLD_LATCH = new CountDownLatch(1);

@Test
public void testInterruptableJob() throws InterruptedException {

String jobKey = "myJob";
JobKey key = new JobKey(jobKey);
Trigger trigger = TriggerBuilder.newTrigger()
.startNow()
.build();

JobDetail job = JobBuilder.newJob(MyJob.class)
.withIdentity(key)
.build();

try {
scheduler.scheduleJob(job, trigger);
// wait for job to start executing, then interrupt
EXECUTE_LATCH.await(2, TimeUnit.SECONDS);
scheduler.interrupt(key);
} catch (SchedulerException e) {
throw new RuntimeException(e);
}

assertTrue(INTERRUPT_LATCH.await(5, TimeUnit.SECONDS));
}

@Test
public void testNonInterruptableJob() throws InterruptedException {

String jobKey = "myNonInterruptableJob";
JobKey key = new JobKey(jobKey);
Trigger trigger = TriggerBuilder.newTrigger()
.startNow()
.build();

JobDetail job = JobBuilder.newJob(MyNonInterruptableJob.class)
.withIdentity(key)
.build();

try {
scheduler.scheduleJob(job, trigger);
} catch (SchedulerException e) {
throw new RuntimeException(e);
}

// wait for job to start executing, then interrupt
NON_INTERRUPTABLE_EXECUTE_LATCH.await(2, TimeUnit.SECONDS);
try {
scheduler.interrupt(key);
fail("Should have thrown UnableToInterruptJobException");
} catch (UnableToInterruptJobException e) {
// This is expected, release the latch holding the job
NON_INTERRUPTABLE_HOLD_LATCH.countDown();
}
}

@ApplicationScoped
static class MyJob implements InterruptableJob {

@Override
public void execute(JobExecutionContext context) {
EXECUTE_LATCH.countDown();
try {
// halt execution so that we can interrupt it
INTERRUPT_LATCH.await(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public void interrupt() {
INTERRUPT_LATCH.countDown();
}
}

@ApplicationScoped
static class MyNonInterruptableJob implements Job {

@Override
public void execute(JobExecutionContext context) {
NON_INTERRUPTABLE_EXECUTE_LATCH.countDown();
try {
// halt execution so that we can interrupt it
NON_INTERRUPTABLE_HOLD_LATCH.await(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;

import org.quartz.InterruptableJob;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.UnableToInterruptJobException;
import org.quartz.spi.TriggerFiredBundle;

/**
Expand All @@ -15,7 +17,7 @@
* trigger.
* We will therefore create a new dependent bean for every trigger and destroy it afterwards.
*/
class CdiAwareJob implements Job {
class CdiAwareJob implements InterruptableJob {

private final Instance<? extends Job> jobInstance;

Expand All @@ -34,4 +36,22 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
}
}
}

@Override
public void interrupt() throws UnableToInterruptJobException {
Instance.Handle<? extends Job> handle = jobInstance.getHandle();
// delegate if possible; throw an exception in other cases
if (InterruptableJob.class.isAssignableFrom(handle.getBean().getBeanClass())) {
try {
((InterruptableJob) handle.get()).interrupt();
} finally {
if (handle.getBean().getScope().equals(Dependent.class)) {
handle.destroy();
}
}
} else {
throw new UnableToInterruptJobException("Job " + handle.getBean().getBeanClass()
+ " can not be interrupted, since it does not implement " + InterruptableJob.class.getName());
}
}
}

0 comments on commit e1987aa

Please sign in to comment.