Skip to content

Commit

Permalink
A failing reduction step now fails the job (hapifhir#5831)
Browse files Browse the repository at this point in the history
* adding a catch for failing reduction step

* spotless

* added changelog

---------

Co-authored-by: leif stawnyczy <[email protected]>
  • Loading branch information
TipzCM and leif stawnyczy authored Apr 8, 2024
1 parent eafa2ab commit 107de2c
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
type: fix
issue: 5828
title: "Previously, when a batch 2 job with a reduction step failed during the final part
of the reduction step, the job would often be left
stuck in the FINALIZE state.
This has been fixed; the job will now FAIL.
"
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,61 @@ public void testFirstStepToSecondStep_singleChunkFasttracks() throws Interrupted
assertEquals(1.0, jobInstance.getProgress());
}

@Test
public void reductionStepFailing_willFailJob() throws InterruptedException {
	// setup: derive a unique job definition id from the current test method name
	String jobId = new Exception().getStackTrace()[0].getMethodName();
	int expectedChunkCount = 3;
	AtomicInteger consumedChunkCount = new AtomicInteger();
	String failureMessage = "this is an error";

	IReductionStepHandler handler = new IReductionStepHandler() {

		@Override
		public void firstStep(StepExecutionDetails<TestJobParameters, VoidModel> theStep, IJobDataSink<FirstStepOutput> theDataSink) {
			// emit one output per expected chunk so the reduction step receives several chunks
			int emitted = 0;
			while (emitted < expectedChunkCount) {
				theDataSink.accept(new FirstStepOutput());
				emitted++;
			}
		}

		@Override
		public void secondStep(StepExecutionDetails<TestJobParameters, FirstStepOutput> theStep, IJobDataSink<SecondStepOutput> theDataSink) {
			theDataSink.accept(new SecondStepOutput());
		}

		@Override
		public void reductionStepConsume(ChunkExecutionDetails<TestJobParameters, SecondStepOutput> theChunkDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
			// count every chunk that reaches the reduction step's consume phase
			consumedChunkCount.incrementAndGet();
		}

		@Override
		public void reductionStepRun(StepExecutionDetails<TestJobParameters, SecondStepOutput> theStepExecutionDetails, IJobDataSink<ReductionStepOutput> theDataSink) {
			// always throw so the final reduction pass fails
			throw new RuntimeException(failureMessage);
		}
	};
	buildAndDefine3StepReductionJob(jobId, handler);

	// test
	myFirstStepLatch.setExpectedCount(1);
	JobInstanceStartRequest request = buildRequest(jobId);
	Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
	String instanceId = startResponse.getInstanceId();
	assertNotNull(instanceId);

	// wait for the job to reach any terminal status; the specific status is asserted below
	StatusEnum[] endedStatuses = StatusEnum.getEndedStatuses().toArray(new StatusEnum[0]);
	myBatch2JobHelper.awaitJobHasStatus(instanceId, endedStatuses);

	// verify: all chunks were consumed and the job ended in FAILED (not stuck in FINALIZE)
	Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
	assertTrue(instanceOp.isPresent());
	JobInstance jobInstance = instanceOp.get();

	assertEquals(expectedChunkCount, consumedChunkCount.get());

	assertEquals(StatusEnum.FAILED, jobInstance.getStatus());
}

@Test
public void testJobWithReductionStepFiresCompletionHandler() throws InterruptedException {
// setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
Expand Down Expand Up @@ -137,7 +138,6 @@ public void triggerReductionStep(String theInstanceId, JobWorkCursor<?, ?, ?> th
public void reducerPass() {
if (myCurrentlyExecuting.tryAcquire()) {
try {

String[] instanceIds = myInstanceIdToJobWorkCursor.keySet().toArray(new String[0]);
if (instanceIds.length > 0) {
String instanceId = instanceIds[0];
Expand Down Expand Up @@ -214,6 +214,36 @@ ReductionStepChunkProcessingResponse executeReductionStep(
boolean defaultSuccessValue = true;
ReductionStepChunkProcessingResponse response = new ReductionStepChunkProcessingResponse(defaultSuccessValue);

try {
processChunksAndCompleteJob(theJobWorkCursor, step, instance, parameters, reductionStepWorker, response);
} catch (Exception ex) {
ourLog.error("Job completion failed for Job {}", instance.getInstanceId());

executeInTransactionWithSynchronization(() -> {
myJobPersistence.updateInstance(instance.getInstanceId(), theInstance -> {
theInstance.setStatus(StatusEnum.FAILED);
return true;
});
return null;
});
response.setSuccessful(false);
}

// if no successful chunks, return false
if (!response.hasSuccessfulChunksIds()) {
response.setSuccessful(false);
}

return response;
}

private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> void processChunksAndCompleteJob(
JobWorkCursor<PT, IT, OT> theJobWorkCursor,
JobDefinitionStep<PT, IT, OT> step,
JobInstance instance,
PT parameters,
IReductionStepWorker<PT, IT, OT> reductionStepWorker,
ReductionStepChunkProcessingResponse response) {
try {
executeInTransactionWithSynchronization(() -> {
try (Stream<WorkChunk> chunkIterator =
Expand Down Expand Up @@ -277,13 +307,6 @@ ReductionStepChunkProcessingResponse executeReductionStep(
return null;
});
}

// if no successful chunks, return false
if (!response.hasSuccessfulChunksIds()) {
response.setSuccessful(false);
}

return response;
}

private <T> T executeInTransactionWithSynchronization(Callable<T> runnable) {
Expand Down

0 comments on commit 107de2c

Please sign in to comment.