From 8cac6fa53e66b78a5ee8113af57af0317dff2d95 Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Tue, 10 Oct 2023 08:51:06 -0500 Subject: [PATCH] dcache-bulk: fix cancellation issues Motivation: Changes introduced by: https://rb.dcache.org/r/14115 master@8a5c358af45586383d87873952407c646f81f4c6 and https://rb.dcache.org/r/14118/ master@0b4140b454b819de55c7c412539294537ff0beb8 inadvertently introduced a regression in request cancellation. In addition, the semaphore release was not always taking place. Modification: The semaphore issue is addressed by selective modifications of synchronization on the `running` map. The problem with cancellation (hanging perpetually in the CANCELLING state) was solved by eliminating the re-entrance on the synchronization block when cancelling running tasks. We have also modified the `cancel` procedure to interrupt the main thread first and then set container state. We also changed the main target loops not to throw the interrupted exception but simply to stop processing. Result: Testing of cancellations for both recursive and non-recursive tasks show correctness of semaphore count and completion of the cancellation. Target: master Request: 9.2 (a definite bug!) Patch: https://rb.dcache.org/r/14127/ Depends-on: #14126 Requires-notes: yes (if 9.2 released before this is applied) Acked-by: Tigran --- .../bulk/job/BulkRequestContainerJob.java | 194 +++++++++++------- .../services/bulk/util/BulkRequestTarget.java | 2 +- 2 files changed, 119 insertions(+), 77 deletions(-) diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java index 9047b348bea..48e072fcd60 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java @@ -94,6 +94,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import javax.security.auth.Subject; @@ -203,17 +204,13 @@ enum TaskState { */ abstract class ContainerTask implements Runnable { final Consumer errorHandler = e -> uncaughtException(Thread.currentThread(), e); - final long seqNo; + final long seqNo = taskCounter.getAndIncrement(); + final AtomicBoolean holdingPermit = new AtomicBoolean(false); Future taskFuture; - boolean holdingPermit; ExecutorService taskExecutor; Semaphore taskSemaphore; - ContainerTask() { - seqNo = taskCounter.getAndIncrement(); - } - public void run() { try { doInner(); @@ -232,44 +229,58 @@ public void run() { void cancel() { if (taskFuture != null) { taskFuture.cancel(true); + LOGGER.debug("{} - task future cancelled {}.", ruid, seqNo); } + remove(); } void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttributes) - throws BulkServiceException, CacheException, InterruptedException { + throws BulkServiceException, CacheException { LOGGER.debug("{} - expandDepthFirst, {}, {}, {}, {}", ruid, id, pid, path, dirAttributes); - new DirListTask(id, pid, path, dirAttributes).submitAsync(); + try { + new DirListTask(id, pid, path, dirAttributes).submitAsync(); + } catch (InterruptedException e) { + LOGGER.trace("{} - expandDepthFirst {} interrupted.", ruid, id); + } } void submitAsync() throws InterruptedException { - checkForRequestCancellation(); - - if (!holdingPermit) { + /* + * Acquisition must be done outside the synchronized block (running), + * else there could be a deadlock. + */ + if (holdingPermit.compareAndSet(false, true)) { taskSemaphore.acquire(); - holdingPermit = true; } synchronized (running) { + if (jobTarget.isTerminated()) { + taskSemaphore.release(); + return; + } + running.put(seqNo, this); - LOGGER.debug("{} - submitAsync {}, task count is now {}.", ruid, seqNo, running.size()); - } + LOGGER.debug("{} - submitAsync {}, task count is now {}.", ruid, seqNo, + running.size()); - taskFuture = taskExecutor.submit(this); + taskFuture = taskExecutor.submit(this); + } } void remove() { + if (holdingPermit.compareAndSet(true, false)) { + taskSemaphore.release(); + } + synchronized (running) { running.remove(seqNo); LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo, running.size()); - } - if (holdingPermit) { - taskSemaphore.release(); - holdingPermit = false; + if (running.isEmpty()) { + checkTransitionToDirs(); + } } - - checkTransitionToDirs(); } abstract void doInner() throws Throwable; @@ -392,10 +403,12 @@ class TargetTask extends ContainerTask { void cancel() { if (activityFuture != null) { activityFuture.cancel(true); + LOGGER.debug("{} - activity future cancelled for task {}.", ruid, seqNo); } if (target != null) { activity.cancel(target); + LOGGER.debug("{} - target cancelled for task {}.", ruid, seqNo); } super.cancel(); @@ -441,7 +454,7 @@ void performSync() throws InterruptedException { /** * (1) symlink resolution on initial targets; bypassed for discovered targets. */ - private void resolvePath() throws InterruptedException { + private void resolvePath() { LOGGER.debug("{} - resolvePath, resolving {}", ruid, target.getPath()); PnfsResolveSymlinksMessage message = new PnfsResolveSymlinksMessage( target.getPath().toString(), null); @@ -464,14 +477,8 @@ public void success(PnfsResolveSymlinksMessage message) { @Override public void failure(int rc, Object error) { LOGGER.error("{} - resolvePath, callback failure for {}.", ruid, target); - try { - storeOrUpdate(CacheExceptionFactory.exceptionOf( - rc, Objects.toString(error, null))); - } catch (InterruptedException e) { - errorHandler.accept(e); - } finally { - remove(); - } + storeOrUpdate(CacheExceptionFactory.exceptionOf( + rc, Objects.toString(error, null))); } }, callbackExecutor); } @@ -479,7 +486,7 @@ public void failure(int rc, Object error) { /** * (2) retrieval of required file attributes. */ - private void fetchAttributes() throws InterruptedException { + private void fetchAttributes() { LOGGER.debug("{} - fetchAttributes for path {}", ruid, target.getPath()); PnfsGetFileAttributes message = new PnfsGetFileAttributes(target.getPath().toString(), MINIMALLY_REQUIRED_ATTRIBUTES); @@ -498,14 +505,8 @@ public void success(PnfsGetFileAttributes message) { @Override public void failure(int rc, Object error) { LOGGER.error("{} - fetchAttributes, callback failure for {}.", ruid, target); - try { - storeOrUpdate(CacheExceptionFactory.exceptionOf( - rc, Objects.toString(error, null))); - } catch (InterruptedException e) { - errorHandler.accept(e); - } finally { - remove(); - } + storeOrUpdate(CacheExceptionFactory.exceptionOf( + rc, Objects.toString(error, null))); } }, callbackExecutor); } @@ -552,8 +553,8 @@ private void performActivity(boolean async) throws InterruptedException { storeOrUpdate(null); - if (hasBeenCancelled(this)) { - LOGGER.debug("{} - performActivity hasBeenCancelled for {}.", ruid, path); + if (hasBeenSpecificallyCancelled(this)) { + LOGGER.debug("{} - performActivity hasBeenSpecificallyCancelled for {}.", ruid, path); remove(); } @@ -610,11 +611,11 @@ private void retryFailed() throws BulkStorageException { } } - private void storeOrUpdate(Throwable error) throws InterruptedException { + private void storeOrUpdate(Throwable error) { LOGGER.debug("{} - storeOrUpdate {}.", ruid, target); - if (hasBeenCancelled(this)) { - LOGGER.debug("{} - storeOrUpdate, hasBeenCancelled {}.", ruid, target.getPath()); + if (hasBeenSpecificallyCancelled(this)) { + LOGGER.debug("{} - storeOrUpdate, hasBeenSpecificallyCancelled {}.", ruid, target.getPath()); return; } @@ -626,12 +627,16 @@ private void storeOrUpdate(Throwable error) throws InterruptedException { * If this is an insert (id == null), the target id will be updated to what is * returned from the database. */ + checkForRequestCancellation(); targetStore.storeOrUpdate(target); LOGGER.debug("{} - storeOrUpdate, target id {}", ruid, target.getId()); } catch (BulkStorageException e) { LOGGER.error("{}, could not store or update target from result {}, {}, {}: {}.", ruid, target.getId(), target.getPath(), target.getAttributes(), e.toString()); error = e; + } catch (InterruptedException e) { + remove(); + return; } if (error != null) { @@ -692,22 +697,36 @@ public BulkRequestContainerJob(BulkActivity activity, BulkRequestTarget jobTarge } public void cancel() { + interruptRunThread(); + + /* + * Thread may already have exited. + * + * Update terminates job target. + */ containerState = ContainerState.STOP; + update(CANCELLED); + + LOGGER.debug("{} - cancel, running {}.", ruid, running.size()); - jobTarget.cancel(); + /* + * Drain running tasks. Calling task cancel removes the task from the map. + */ + while (true) { + ContainerTask task; + synchronized (running) { + if (running.isEmpty()) { + break; + } - LOGGER.debug("{} - cancel: target state is now {}.", ruid, jobTarget.getState()); + task = running.values().iterator().next(); + } - interruptRunThread(); + task.cancel(); - synchronized (running) { - LOGGER.debug("{} - cancel: running {}.", ruid, running.size()); - running.values().forEach(ContainerTask::cancel); - LOGGER.debug("{} - cancel: running targets cancelled.", ruid); - running.clear(); + LOGGER.debug("{} - cancel: task {} cancelled.", ruid, task.seqNo); } - LOGGER.debug("{} - cancel: calling cancel all on target store.", ruid); targetStore.cancelAll(rid); signalStateChange(); @@ -773,7 +792,7 @@ public void initialize() { containerState = ContainerState.PROCESS_FILES; } - public synchronized boolean isReady() { + public boolean isReady() { switch (jobTarget.getState()) { case READY: case CREATED: @@ -819,7 +838,12 @@ public void run() { update(CANCELLED); } setRunThread(null); - checkTransitionToDirs(); + + synchronized (running) { + if (running.isEmpty()) { + checkTransitionToDirs(); + } + } } public void setDirListSemaphore(Semaphore dirListSemaphore) { @@ -879,39 +903,43 @@ public void uncaughtException(Thread t, Throwable e) { public void update(State state) { if (jobTarget.setState(state)) { try { - targetStore.update(jobTarget.getId(), jobTarget.getState(), jobTarget.getErrorType(), + targetStore.update(jobTarget.getId(), jobTarget.getState(), + jobTarget.getErrorType(), jobTarget.getErrorMessage()); } catch (BulkStorageException e) { LOGGER.error("{}, updateJobState: {}", ruid, e.toString()); } + signalStateChange(); } } private void checkForRequestCancellation() throws InterruptedException { - if (isRunThreadInterrupted() || containerState == ContainerState.STOP - || jobTarget.isTerminated()) { + if (containerState == ContainerState.STOP) { + throw new InterruptedException(); + } + + if (isRunThreadInterrupted()) { throw new InterruptedException(); } - } - private void checkTransitionToDirs() { synchronized (running) { - if (!running.isEmpty()) { - LOGGER.debug("{} - checkTransitionToDirs, running {}", ruid, running.size()); - return; + if (jobTarget.isTerminated()) { + throw new InterruptedException(); } } + } - synchronized (this) { - if (containerState == ContainerState.WAIT) { - containerState = ContainerState.PROCESS_DIRS; - executor.submit(this); - } + private void checkTransitionToDirs() { + LOGGER.debug("{} - checkTransitionToDirs: {}", ruid, containerState); + if (containerState == ContainerState.WAIT) { + containerState = ContainerState.PROCESS_DIRS; + LOGGER.debug("{} - checkTransitionToDirs is now {}", ruid, containerState); + executor.submit(this); } } - private boolean hasBeenCancelled(TargetTask task) { + private boolean hasBeenSpecificallyCancelled(TargetTask task) { synchronized (cancelledPaths) { BulkRequestTarget target = task.target; if (cancelledPaths.remove(target.getPath().toString())) { @@ -924,7 +952,7 @@ private boolean hasBeenCancelled(TargetTask task) { targetStore.update(target.getId(), CANCELLED, null, null); } } catch (BulkServiceException | UnsupportedOperationException e) { - LOGGER.error("hasBeenCancelled {}, failed for {}: {}", ruid, target.getPath(), + LOGGER.error("hasBeenSpecificallyCancelled {}, failed for {}: {}", ruid, target.getPath(), e.toString()); } return true; @@ -945,7 +973,7 @@ private synchronized boolean isRunThreadInterrupted() { return runThread != null && runThread.isInterrupted(); } - private void processDirTargets() throws InterruptedException { + private void processDirTargets() { if (dirs.isEmpty()) { LOGGER.debug("{} - processDirTargets, nothing to do.", ruid); return; @@ -960,13 +988,20 @@ private void processDirTargets() throws InterruptedException { * Process serially in this thread */ for (DirTarget dirTarget : sorted) { - new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path, - Optional.of(dirTarget.attributes), CREATED, null), - TaskState.HANDLE_DIR_TARGET).performSync(); + try { + new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path, + Optional.of(dirTarget.attributes), CREATED, null), + TaskState.HANDLE_DIR_TARGET).performSync(); + } catch (InterruptedException e) { + /* + * Cancel most likely called; stop processing. + */ + break; + } } } - private void processFileTargets() throws InterruptedException { + private void processFileTargets() { List requestTargets = targetStore.getInitialTargets(rid, true); LOGGER.debug("{} - processFileTargets, initial size {}.", ruid, requestTargets.size()); @@ -979,7 +1014,14 @@ private void processFileTargets() throws InterruptedException { } for (BulkRequestTarget target : requestTargets) { - new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync(); + try { + new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync(); + } catch (InterruptedException e) { + /* + * Cancel most likely called; stop processing. + */ + break; + } } } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java index 38165b3a094..15c7189785a 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java @@ -260,7 +260,7 @@ public Long getStartedAt() { return startedAt; } - public State getState() { + public synchronized State getState() { return state; }