From 013db14c6bbd424bf171c242875eeade86e471c1 Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Thu, 9 Nov 2023 08:21:28 -0600 Subject: [PATCH] dcache-bulk: fix handling of uncaught exceptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: See https://github.com/dCache/dcache/issues/7414 tape API/bulk: seem to choke on any interval other than "PND" and "PTNH" The principal issue here –– properly failing the request on an unexpected exception –– is fixed. Modification: The update of the target state is now properly handled. The container job update is fixed so that the state is stored when the error is added. > NOTE: separate patches will be necessary for 9.1/9.0 > and for 8.2. Result: Proper completion of a request beset by an uncaught or unexpected exception. Target: master Request: 9.2 Patch: https://rb.dcache.org/r/14162/ Bug: #7414 Closes: #7414 Requires-notes: yes Acked-by: Tigran --- .../bulk/job/BulkRequestContainerJob.java | 56 ++++++++++++------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java index 0a34193a25e..952e8160a8b 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java @@ -96,7 +96,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import javax.security.auth.Subject; import org.dcache.auth.attributes.Restriction; import org.dcache.cells.AbstractMessageCallback; @@ -244,7 +243,6 @@ enum TaskState { */ abstract class ContainerTask implements Runnable { - final Consumer errorHandler = e -> uncaughtException(Thread.currentThread(), e); final long seqNo = taskCounter.getAndIncrement(); final PermitHolder permitHolder = new PermitHolder(); @@ -254,15 +252,8 @@ abstract class ContainerTask implements Runnable { public void run() { try { doInner(); - } catch (InterruptedException e) { - remove(); - containerState = ContainerState.STOP; - jobTarget.setErrorObject(e); - update(CANCELLED); } catch (Throwable e) { - remove(); - errorHandler.accept(e); - Throwables.throwIfUnchecked(e); + handleException(e); } } @@ -286,6 +277,18 @@ void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttribute } } + void handleException(Throwable e) { + remove(); + if (e instanceof InterruptedException) { + containerState = ContainerState.STOP; + jobTarget.setErrorObject(e); + update(CANCELLED); + } else { + uncaughtException(Thread.currentThread(), e); + Throwables.throwIfUnchecked(e); + } + } + void submitAsync() throws InterruptedException { /* * Acquisition must be done outside the synchronized block (running), @@ -478,6 +481,15 @@ void doInner() throws Throwable { } } + void handleException(Throwable e) { + target.setErrorObject(e); + if (activityFuture == null) { + activityFuture = Futures.immediateFailedFuture(Throwables.getRootCause(e)); + } + handleCompletion(); + super.handleException(e); + } + void performSync() throws InterruptedException { performActivity(false); @@ -935,7 +947,7 @@ public void uncaughtException(Thread t, Throwable e) { */ containerState = ContainerState.STOP; jobTarget.setErrorObject(e); - update(FAILED); + update(); ThreadGroup group = t.getThreadGroup(); if (group != null) { group.uncaughtException(t, e); @@ -946,15 +958,7 @@ public void uncaughtException(Thread t, Throwable e) { public void update(State state) { if (jobTarget.setState(state)) { - try { - targetStore.update(jobTarget.getId(), jobTarget.getState(), - jobTarget.getErrorType(), - jobTarget.getErrorMessage()); - } catch (BulkStorageException e) { - LOGGER.error("{}, updateJobState: {}", ruid, e.toString()); - } - - signalStateChange(); + update(); } } @@ -1099,4 +1103,16 @@ private BulkRequestTarget toTarget(Long id, PID pid, FsPath path, .createdAt(System.currentTimeMillis()).errorType(errorType) .errorMessage(errorMessage).path(path).build(); } + + private void update() { + try { + targetStore.update(jobTarget.getId(), jobTarget.getState(), + jobTarget.getErrorType(), + jobTarget.getErrorMessage()); + } catch (BulkStorageException e) { + LOGGER.error("{}, updateJobState: {}", ruid, e.toString()); + } + + signalStateChange(); + } } \ No newline at end of file