diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java
index 952e8160a8b..7317af31a8d 100644
--- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java
+++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java
@@ -252,8 +252,8 @@ abstract class ContainerTask implements Runnable {
         public void run() {
             try {
                 doInner();
-            } catch (Throwable e) {
-                handleException(e);
+            } catch (RuntimeException e) {
+                uncaughtException(Thread.currentThread(), e);
             }
         }
 
@@ -277,18 +277,6 @@ void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttribute
             }
         }
 
-        void handleException(Throwable e) {
-            remove();
-            if (e instanceof InterruptedException) {
-                containerState = ContainerState.STOP;
-                jobTarget.setErrorObject(e);
-                update(CANCELLED);
-            } else {
-                uncaughtException(Thread.currentThread(), e);
-                Throwables.throwIfUnchecked(e);
-            }
-        }
-
         void submitAsync() throws InterruptedException {
             /*
              * Acquisition must be done outside the synchronized block (running),
@@ -324,7 +312,7 @@ void remove() {
             }
         }
 
-        abstract void doInner() throws Throwable;
+        abstract void doInner();
     }
 
     class DirListTask extends ContainerTask {
@@ -343,10 +331,12 @@ class DirListTask extends ContainerTask {
             permitHolder.setTaskSemaphore(dirListSemaphore);
         }
 
-        void doInner() throws Throwable {
+        void doInner() {
             try {
+                checkForRequestCancellation();
                 DirectoryStream stream = getDirectoryListing(path);
                 for (DirectoryEntry entry : stream) {
+                    checkForRequestCancellation();
                     LOGGER.debug("{} - DirListTask, directory {}, entry {}", ruid, path,
                           entry.getName());
                     FsPath childPath = path.child(entry.getName());
@@ -384,6 +374,7 @@ void doInner() throws Throwable {
                     }
                 }
 
+                checkForRequestCancellation();
                 switch (targetType) {
                     case BOTH:
                     case DIR:
@@ -402,6 +393,18 @@ void doInner() throws Throwable {
                                   SKIPPED, null));
                         }
                 }
+            } catch (InterruptedException e) {
+                /*
+                 * Cancelled. Do nothing.
+                 */
+                permitHolder.releaseIfHoldingPermit();
+            } catch (BulkServiceException | CacheException e) {
+                /*
+                 * Fail fast
+                 */
+                containerState = ContainerState.STOP;
+                jobTarget.setErrorObject(e);
+                update();
             } finally {
                 remove();
             }
@@ -457,37 +460,43 @@ void cancel() {
         }
 
         @Override
-        void doInner() throws Throwable {
-            switch (state) {
-                case RESOLVE_PATH:
-                    resolvePath();
-                    break;
-                case FETCH_ATTRIBUTES:
-                    fetchAttributes();
-                    break;
-                case HANDLE_DIR_TARGET:
-                    performActivity();
-                    break;
-                case HANDLE_TARGET:
-                default:
-                    switch (depth) {
-                        case NONE:
-                            performActivity();
-                            break;
-                        default:
-                            handleTarget();
-                    }
-                    break;
-            }
-        }
-
-        void handleException(Throwable e) {
-            target.setErrorObject(e);
-            if (activityFuture == null) {
-                activityFuture = Futures.immediateFailedFuture(Throwables.getRootCause(e));
+        void doInner() {
+            try {
+                checkForRequestCancellation();
+                switch (state) {
+                    case RESOLVE_PATH:
+                        resolvePath();
+                        break;
+                    case FETCH_ATTRIBUTES:
+                        fetchAttributes();
+                        break;
+                    case HANDLE_DIR_TARGET:
+                        performActivity();
+                        break;
+                    case HANDLE_TARGET:
+                    default:
+                        switch (depth) {
+                            case NONE:
+                                performActivity();
+                                break;
+                            default:
+                                handleTarget();
+                        }
+                        break;
+                }
+            } catch (InterruptedException e) {
+                /*
+                 * Cancellation. Do nothing.
+                 */
+                permitHolder.releaseIfHoldingPermit();
+            } catch (RuntimeException e) {
+                target.setErrorObject(e);
+                if (activityFuture == null) {
+                    activityFuture = Futures.immediateFailedFuture(Throwables.getRootCause(e));
+                }
+                handleCompletion();
+                uncaughtException(Thread.currentThread(), e);
             }
-            handleCompletion();
-            super.handleException(e);
         }
 
         void performSync() throws InterruptedException {
@@ -631,6 +640,7 @@ private void handleCompletion() {
             State state = RUNNING;
 
             try {
+                checkForRequestCancellation();
                 activity.handleCompletion(target, activityFuture);
                 state = target.getState();
 
@@ -644,10 +654,15 @@
             } catch (BulkStorageException e) {
                 LOGGER.error("{}, could not store target from result {}, {}, {}: {}.", ruid,
                       target.getId(), target.getPath(), target.getAttributes(), e.toString());
+            } catch (InterruptedException e) {
+                /*
+                 * Cancelled. Do nothing.
+                 */
+                return;
             }
 
-            if (state == FAILED && request.isCancelOnFailure()) {
-                cancel();
+            if (state == FAILED && request.isCancelOnFailure() && !jobTarget.isTerminated()) {
+                BulkRequestContainerJob.this.cancel();
             } else {
                 remove();
             }
@@ -674,15 +689,21 @@ private void storeOrUpdate(Throwable error) {
                 return;
             }
 
-            target.setState(error == null ? RUNNING : FAILED);
-            target.setErrorObject(error);
+            if (jobTarget.isTerminated()) {
+                error = new InterruptedException();
+            }
+
+            if (error == null) {
+                target.setState(RUNNING);
+            } else {
+                target.setErrorObject(error);
+            }
 
             try {
                 /*
                  * If this is an insert (id == null), the target id will be updated to what is
                  * returned from the database.
                  */
-                checkForRequestCancellation();
                 targetStore.storeOrUpdate(target);
                 LOGGER.debug("{} - storeOrUpdate, target id {}", ruid, target.getId());
             } catch (BulkStorageException e) {
@@ -690,9 +711,6 @@
                       ruid, target.getId(), target.getPath(), target.getAttributes(),
                       e.toString());
                 error = e;
-            } catch (InterruptedException e) {
-                remove();
-                return;
             }
 
             if (error != null) {
@@ -762,11 +780,14 @@ public void cancel() {
          */
         containerState = ContainerState.STOP;
         update(CANCELLED);
+        targetStore.cancelAll(rid);
 
         LOGGER.debug("{} - cancel, running {}.", ruid, running.size());
 
+        int count = 0;
+
         /*
-         * Drain running tasks. Calling task cancel removes the task from the map.
+         * Drain running tasks.
          */
         while (true) {
             ContainerTask task;
@@ -778,12 +799,12 @@ public void cancel() {
                 task = running.values().iterator().next();
             }
 
-            task.cancel();
-
-            LOGGER.debug("{} - cancel: task {} cancelled.", ruid, task.seqNo);
+            task.cancel(); // removes the task
+            ++count;
         }
 
-        targetStore.cancelAll(rid);
+        LOGGER.trace("{} - cancel: {} tasks cancelled; running size: {}.", ruid, count,
+              running.size());
 
         signalStateChange();
     }
@@ -794,8 +815,7 @@ public void cancel(long targetId) {
             ContainerTask task = i.next();
             if (task instanceof TargetTask && targetId == ((TargetTask) task).target.getId()) {
-                task.cancel();
-                i.remove();
+                task.cancel(); // removes the task
                 break;
             }
         }
 
@@ -963,19 +983,10 @@ public void update(State state) {
     }
 
     private void checkForRequestCancellation() throws InterruptedException {
-        if (containerState == ContainerState.STOP) {
-            throw new InterruptedException();
-        }
-
-        if (isRunThreadInterrupted()) {
+        if (containerState == ContainerState.STOP || isRunThreadInterrupted()
+              || jobTarget.isTerminated()) {
             throw new InterruptedException();
         }
-
-        synchronized (running) {
-            if (jobTarget.isTerminated()) {
-                throw new InterruptedException();
-            }
-        }
     }
 
     private void checkTransitionToDirs() {
@@ -1038,6 +1049,7 @@ private void processDirTargets() {
          */
         for (DirTarget dirTarget : sorted) {
             try {
+                checkForRequestCancellation();
                 new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path,
                       Optional.of(dirTarget.attributes), CREATED, null),
                       TaskState.HANDLE_DIR_TARGET).performSync();
@@ -1064,6 +1076,7 @@ private void processFileTargets() {
 
         for (BulkRequestTarget target : requestTargets) {
             try {
+                checkForRequestCancellation();
                 new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync();
             } catch (InterruptedException e) {
                 /*
diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java
index 15c7189785a..de7d3632be0 100644
--- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java
+++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java
@@ -301,7 +301,7 @@ public void setErrorObject(Object error) {
             errorType = errorObject.getClass().getCanonicalName();
             errorMessage = errorObject.getMessage();
 
-            setState(State.FAILED);
+            setState(errorObject instanceof InterruptedException ? State.CANCELLED : State.FAILED);
         }
     }
 
diff --git a/skel/share/defaults/bulk.properties b/skel/share/defaults/bulk.properties
index e5c077b1933..bdefe91ea12 100644
--- a/skel/share/defaults/bulk.properties
+++ b/skel/share/defaults/bulk.properties
@@ -175,8 +175,10 @@ bulk.service.poolmanager.timeout=1
 bulk.service.qos=${dcache.service.qos}
 
 # ---- How long to wait for a response from qos.
+# The default timeout is slightly longer in order to support cancellations
+# of many requests simultaneously, especially if these requests each have many targets.
 #
-bulk.service.qos.timeout=1
+bulk.service.qos.timeout=5
 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)bulk.service.qos.timeout.unit=MINUTES
 
 # ---- How long to wait for a response from the HA leader.
diff --git a/skel/share/defaults/qos.properties b/skel/share/defaults/qos.properties
index d9b0cd8ffda..6c9259676aa 100644
--- a/skel/share/defaults/qos.properties
+++ b/skel/share/defaults/qos.properties
@@ -118,9 +118,11 @@ qos.service.pinmanager.timeout=1
 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.pinmanager.timeout.unit=MINUTES
 
 # ---- Endpoint (cell) settings for contacting pnfs manager.
+# Activity in the engine and verifier can be intense, so a longer default timeout
+# may at times be required.
 #
 qos.service.pnfsmanager=${dcache.service.pnfsmanager}
-qos.service.pnfsmanager.timeout=1
+qos.service.pnfsmanager.timeout=5
 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.pnfsmanager.timeout.unit=MINUTES
 
 # ---- Endpoint (cell) settings for contacting pools (destination is dynamic).
@@ -134,9 +136,11 @@ qos.service.transition.timeout=1
 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.transition.timeout.unit=MINUTES
 
 # ---- Main external entry point for qos.
+# Requirements requests from the verifier can accumulate in the engine, so a longer
+# default timeout is indicated.
 #
 qos.service.requirements=${dcache.service.qos}
-qos.service.requirements.timeout=1
+qos.service.requirements.timeout=5
 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)qos.service.requirements.timeout.unit=MINUTES
 
 # ---- Internal endpoints consumed only by other qos services.
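
For orientation, the following is a minimal, self-contained sketch (not dCache source) of the cooperative-cancellation pattern this patch adopts: each task polls a single cancellation check at every step and unwinds via InterruptedException, instead of funnelling every Throwable through a shared handleException. The names CancellableContainer, Step, runTask and jobTargetTerminated are hypothetical and used only for illustration.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CancellableContainer {

    enum ContainerState { PROCESS_FILES, STOP }

    interface Step {
        void execute() throws InterruptedException;
    }

    private volatile ContainerState containerState = ContainerState.PROCESS_FILES;
    private volatile boolean jobTargetTerminated;
    private final Map<Long, Step> running = new ConcurrentHashMap<>();

    /*
     * Analogue of checkForRequestCancellation(): one consolidated test, surfaced as
     * InterruptedException so callers can treat all cancellation causes uniformly.
     */
    private void checkForRequestCancellation() throws InterruptedException {
        if (containerState == ContainerState.STOP || Thread.currentThread().isInterrupted()
              || jobTargetTerminated) {
            throw new InterruptedException();
        }
    }

    void runTask(long id, Step step) {
        running.put(id, step);
        try {
            checkForRequestCancellation();
            step.execute();
        } catch (InterruptedException e) {
            // cancelled: release any held resources quietly; no failure is recorded
        } catch (RuntimeException e) {
            // unexpected: record the error and report it to the uncaught-exception handler
        } finally {
            running.remove(id); // the task always removes itself, as in the patched tasks
        }
    }
}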