From 3a255bb92c80bf7072d6fc60dbf88fff4855eb69 Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Wed, 15 Nov 2023 07:39:11 -0600 Subject: [PATCH] dcache-bulk,dcache-qos: repair mass cancellation issues Motivation: The latest modifications (9.2 and master) created a regression in how bulk request cancellation is handled. The symptoms present themselves under two guises: - cancellation stalls and requires a second cancellation to complete; this is not always successful in cancelling all the requests in question. - cancellation en masse provokes a large number of timeout failures from QoS because of message congestion and response time. Modifications: The second problem seems to be solved fairly easily by increasing a few default message timeouts from 1 to 5 minutes (both bulk and qos). These defaults are reset by this patch. The first problem has been addressed as follows: 1. `checkForRequestCancellation()` has been added in a number of positions (some accidentally eliminated from earlier versions), allowing ongoing operations to be short-circuited. This includes any writing of state to the database. 2. The update of the database entries to CANCELLED is now done before (rather than after) the attempt to cancel in-flight activities, since we now short-circuit writing of state after completion on cancelled targets. Discovered targets which have not yet been stored should be ignored. In addition to these changes, this patch also includes: - A refactoring of the handling of RuntimeExceptions which makes for clearer code and which does not throw or catch Throwable. - Correction of an error in `handleCompletion` where `cancel` was called on the ContainerTask instead of the container itself. - A minor refactoring of the `checkForRequestCancellation` method. - Elimination of a redundant iterator remove() in one cancellation method. Result: Mass cancellation of all running requests now finishes correctly. 
(Note that there may be a tail of operations still being cancelled by the `QoSEngine` thereafter, but everything eventually settles out.) Both non-recursive and recursive requests were tested. Target: master Request: 9.2 Patch: https://rb.dcache.org/r/14169 Requires-notes: yes (Fixes cancellation bugs in Bulk) Acked-by: Tigran --- .../bulk/job/BulkRequestContainerJob.java | 137 +++++++++++------- .../services/bulk/util/BulkRequestTarget.java | 2 +- skel/share/defaults/bulk.properties | 4 +- skel/share/defaults/qos.properties | 8 +- 4 files changed, 96 insertions(+), 55 deletions(-) diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java index 952e8160a8b..15afa59ee02 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java @@ -97,6 +97,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import javax.ws.rs.HEAD; import org.dcache.auth.attributes.Restriction; import org.dcache.cells.AbstractMessageCallback; import org.dcache.cells.CellStub; @@ -252,8 +253,8 @@ abstract class ContainerTask implements Runnable { public void run() { try { doInner(); - } catch (Throwable e) { - handleException(e); + } catch (RuntimeException e) { + uncaughtException(Thread.currentThread(), e); } } @@ -324,7 +325,7 @@ void remove() { } } - abstract void doInner() throws Throwable; + abstract void doInner(); } class DirListTask extends ContainerTask { @@ -343,10 +344,12 @@ class DirListTask extends ContainerTask { permitHolder.setTaskSemaphore(dirListSemaphore); } - void doInner() throws Throwable { + void doInner() { try { + 
checkForRequestCancellation(); DirectoryStream stream = getDirectoryListing(path); for (DirectoryEntry entry : stream) { + checkForRequestCancellation(); LOGGER.debug("{} - DirListTask, directory {}, entry {}", ruid, path, entry.getName()); FsPath childPath = path.child(entry.getName()); @@ -384,6 +387,7 @@ void doInner() throws Throwable { } } + checkForRequestCancellation(); switch (targetType) { case BOTH: case DIR: @@ -402,6 +406,18 @@ void doInner() throws Throwable { SKIPPED, null)); } } + } catch (InterruptedException e) { + /* + * Cancelled. Do nothing. + */ + permitHolder.releaseIfHoldingPermit(); + } catch (BulkServiceException | CacheException e) { + /* + * Fail fast + */ + containerState = ContainerState.STOP; + jobTarget.setErrorObject(e); + update(); } finally { remove(); } @@ -457,27 +473,42 @@ void cancel() { } @Override - void doInner() throws Throwable { - switch (state) { - case RESOLVE_PATH: - resolvePath(); - break; - case FETCH_ATTRIBUTES: - fetchAttributes(); - break; - case HANDLE_DIR_TARGET: - performActivity(); - break; - case HANDLE_TARGET: - default: - switch (depth) { - case NONE: - performActivity(); - break; - default: - handleTarget(); - } - break; + void doInner() { + try { + checkForRequestCancellation(); + switch (state) { + case RESOLVE_PATH: + resolvePath(); + break; + case FETCH_ATTRIBUTES: + fetchAttributes(); + break; + case HANDLE_DIR_TARGET: + performActivity(); + break; + case HANDLE_TARGET: + default: + switch (depth) { + case NONE: + performActivity(); + break; + default: + handleTarget(); + } + break; + } + } catch (InterruptedException e) { + /* + * Cancellation. Do nothing. 
+ */ + permitHolder.releaseIfHoldingPermit(); + } catch (RuntimeException e) { + target.setErrorObject(e); + if (activityFuture == null) { + activityFuture = Futures.immediateFailedFuture(Throwables.getRootCause(e)); + } + handleCompletion(); + uncaughtException(Thread.currentThread(), e); } } @@ -631,6 +662,7 @@ private void handleCompletion() { State state = RUNNING; try { + checkForRequestCancellation(); activity.handleCompletion(target, activityFuture); state = target.getState(); @@ -644,10 +676,15 @@ private void handleCompletion() { } catch (BulkStorageException e) { LOGGER.error("{}, could not store target from result {}, {}, {}: {}.", ruid, target.getId(), target.getPath(), target.getAttributes(), e.toString()); + } catch (InterruptedException e) { + /* + * Cancelled. Do nothing. + */ + return; } - if (state == FAILED && request.isCancelOnFailure()) { - cancel(); + if (state == FAILED && request.isCancelOnFailure() && !jobTarget.isTerminated()) { + BulkRequestContainerJob.this.cancel(); } else { remove(); } @@ -674,15 +711,21 @@ private void storeOrUpdate(Throwable error) { return; } - target.setState(error == null ? RUNNING : FAILED); - target.setErrorObject(error); + if (jobTarget.isTerminated()) { + error = new InterruptedException(); + } + + if (error == null) { + target.setState(RUNNING); + } else { + target.setErrorObject(error); + } try { /* * If this is an insert (id == null), the target id will be updated to what is * returned from the database. 
*/ - checkForRequestCancellation(); targetStore.storeOrUpdate(target); LOGGER.debug("{} - storeOrUpdate, target id {}", ruid, target.getId()); } catch (BulkStorageException e) { @@ -690,9 +733,6 @@ private void storeOrUpdate(Throwable error) { ruid, target.getId(), target.getPath(), target.getAttributes(), e.toString()); error = e; - } catch (InterruptedException e) { - remove(); - return; } if (error != null) { @@ -762,11 +802,14 @@ public void cancel() { */ containerState = ContainerState.STOP; update(CANCELLED); + targetStore.cancelAll(rid); LOGGER.debug("{} - cancel, running {}.", ruid, running.size()); + int count = 0; + /* - * Drain running tasks. Calling task cancel removes the task from the map. + * Drain running tasks. */ while (true) { ContainerTask task; @@ -778,12 +821,12 @@ public void cancel() { task = running.values().iterator().next(); } - task.cancel(); - - LOGGER.debug("{} - cancel: task {} cancelled.", ruid, task.seqNo); + task.cancel(); // removes the task + ++count; } - targetStore.cancelAll(rid); + LOGGER.trace("{} - cancel: {} tasks cancelled; running size: {}.", ruid, count, + running.size()); signalStateChange(); } @@ -794,8 +837,7 @@ public void cancel(long targetId) { ContainerTask task = i.next(); if (task instanceof TargetTask && targetId == ((TargetTask) task).target.getId()) { - task.cancel(); - i.remove(); + task.cancel(); // removes the task break; } } @@ -963,19 +1005,10 @@ public void update(State state) { } private void checkForRequestCancellation() throws InterruptedException { - if (containerState == ContainerState.STOP) { - throw new InterruptedException(); - } - - if (isRunThreadInterrupted()) { + if (containerState == ContainerState.STOP || isRunThreadInterrupted() + || jobTarget.isTerminated()) { throw new InterruptedException(); } - - synchronized (running) { - if (jobTarget.isTerminated()) { - throw new InterruptedException(); - } - } } private void checkTransitionToDirs() { @@ -1038,6 +1071,7 @@ private void 
processDirTargets() { */ for (DirTarget dirTarget : sorted) { try { + checkForRequestCancellation(); new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path, Optional.of(dirTarget.attributes), CREATED, null), TaskState.HANDLE_DIR_TARGET).performSync(); @@ -1064,6 +1098,7 @@ private void processFileTargets() { for (BulkRequestTarget target : requestTargets) { try { + checkForRequestCancellation(); new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync(); } catch (InterruptedException e) { /* diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java index 15c7189785a..de7d3632be0 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java @@ -301,7 +301,7 @@ public void setErrorObject(Object error) { errorType = errorObject.getClass().getCanonicalName(); errorMessage = errorObject.getMessage(); - setState(State.FAILED); + setState(errorObject instanceof InterruptedException ? State.CANCELLED : State.FAILED); } } diff --git a/skel/share/defaults/bulk.properties b/skel/share/defaults/bulk.properties index e5c077b1933..bdefe91ea12 100644 --- a/skel/share/defaults/bulk.properties +++ b/skel/share/defaults/bulk.properties @@ -175,8 +175,10 @@ bulk.service.poolmanager.timeout=1 bulk.service.qos=${dcache.service.qos} # ---- How long to wait for a response from qos. +# The default timeout is slightly longer in order to support cancellations +# of many requests simultaneously, especially if these requests each have many targets. # -bulk.service.qos.timeout=1 +bulk.service.qos.timeout=5 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)bulk.service.qos.timeout.unit=MINUTES # ---- How long to wait for a response from the HA leader. 
diff --git a/skel/share/defaults/qos.properties b/skel/share/defaults/qos.properties index d9b0cd8ffda..6c9259676aa 100644 --- a/skel/share/defaults/qos.properties +++ b/skel/share/defaults/qos.properties @@ -118,9 +118,11 @@ qos.service.pinmanager.timeout=1 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.pinmanager.timeout.unit=MINUTES # ---- Endpoint (cell) settings for contacting pnfs manager. +# Activity in the engine and verifier can be intense so a longer default timeout +# may at times be required. # qos.service.pnfsmanager=${dcache.service.pnfsmanager} -qos.service.pnfsmanager.timeout=1 +qos.service.pnfsmanager.timeout=5 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.pnfsmanager.timeout.unit=MINUTES # ---- Endpoint (cell) settings for contacting pools (destination is dynamic). @@ -134,9 +136,11 @@ qos.service.transition.timeout=1 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.transition.timeout.unit=MINUTES # ---- Main external entry point for qos. +# The verifier requirements requests can accumulate against the engine so a longer +# default timeout is indicated. # qos.service.requirements=${dcache.service.qos} -qos.service.requirements.timeout=1 +qos.service.requirements.timeout=5 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.requirements.timeout.unit=MINUTES # ---- Internal endpoints consumed only by other qos services.