diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java index 9047b348bea..48e072fcd60 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java @@ -94,6 +94,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import javax.security.auth.Subject; @@ -203,17 +204,13 @@ enum TaskState { */ abstract class ContainerTask implements Runnable { final Consumer errorHandler = e -> uncaughtException(Thread.currentThread(), e); - final long seqNo; + final long seqNo = taskCounter.getAndIncrement(); + final AtomicBoolean holdingPermit = new AtomicBoolean(false); Future taskFuture; - boolean holdingPermit; ExecutorService taskExecutor; Semaphore taskSemaphore; - ContainerTask() { - seqNo = taskCounter.getAndIncrement(); - } - public void run() { try { doInner(); @@ -232,44 +229,58 @@ public void run() { void cancel() { if (taskFuture != null) { taskFuture.cancel(true); + LOGGER.debug("{} - task future cancelled {}.", ruid, seqNo); } + remove(); } void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttributes) - throws BulkServiceException, CacheException, InterruptedException { + throws BulkServiceException, CacheException { LOGGER.debug("{} - expandDepthFirst, {}, {}, {}, {}", ruid, id, pid, path, dirAttributes); - new DirListTask(id, pid, path, dirAttributes).submitAsync(); + try { + new DirListTask(id, pid, path, dirAttributes).submitAsync(); + } catch (InterruptedException e) { + LOGGER.trace("{} - expandDepthFirst {} interrupted.", ruid, id); + } } void submitAsync() throws InterruptedException { - checkForRequestCancellation(); - - if (!holdingPermit) { + /* + * Acquisition must be done outside the synchronized block (running), + * else there could be a deadlock. + */ + if (holdingPermit.compareAndSet(false, true)) { taskSemaphore.acquire(); - holdingPermit = true; } synchronized (running) { + if (jobTarget.isTerminated()) { + taskSemaphore.release(); + return; + } + running.put(seqNo, this); - LOGGER.debug("{} - submitAsync {}, task count is now {}.", ruid, seqNo, running.size()); - } + LOGGER.debug("{} - submitAsync {}, task count is now {}.", ruid, seqNo, + running.size()); - taskFuture = taskExecutor.submit(this); + taskFuture = taskExecutor.submit(this); + } } void remove() { + if (holdingPermit.compareAndSet(true, false)) { + taskSemaphore.release(); + } + synchronized (running) { running.remove(seqNo); LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo, running.size()); - } - if (holdingPermit) { - taskSemaphore.release(); - holdingPermit = false; + if (running.isEmpty()) { + checkTransitionToDirs(); + } } - - checkTransitionToDirs(); } abstract void doInner() throws Throwable; @@ -392,10 +403,12 @@ class TargetTask extends ContainerTask { void cancel() { if (activityFuture != null) { activityFuture.cancel(true); + LOGGER.debug("{} - activity future cancelled for task {}.", ruid, seqNo); } if (target != null) { activity.cancel(target); + LOGGER.debug("{} - target cancelled for task {}.", ruid, seqNo); } super.cancel(); @@ -441,7 +454,7 @@ void performSync() throws InterruptedException { /** * (1) symlink resolution on initial targets; bypassed for discovered targets. */ - private void resolvePath() throws InterruptedException { + private void resolvePath() { LOGGER.debug("{} - resolvePath, resolving {}", ruid, target.getPath()); PnfsResolveSymlinksMessage message = new PnfsResolveSymlinksMessage( target.getPath().toString(), null); @@ -464,14 +477,8 @@ public void success(PnfsResolveSymlinksMessage message) { @Override public void failure(int rc, Object error) { LOGGER.error("{} - resolvePath, callback failure for {}.", ruid, target); - try { - storeOrUpdate(CacheExceptionFactory.exceptionOf( - rc, Objects.toString(error, null))); - } catch (InterruptedException e) { - errorHandler.accept(e); - } finally { - remove(); - } + storeOrUpdate(CacheExceptionFactory.exceptionOf( + rc, Objects.toString(error, null))); } }, callbackExecutor); } @@ -479,7 +486,7 @@ public void failure(int rc, Object error) { /** * (2) retrieval of required file attributes. */ - private void fetchAttributes() throws InterruptedException { + private void fetchAttributes() { LOGGER.debug("{} - fetchAttributes for path {}", ruid, target.getPath()); PnfsGetFileAttributes message = new PnfsGetFileAttributes(target.getPath().toString(), MINIMALLY_REQUIRED_ATTRIBUTES); @@ -498,14 +505,8 @@ public void success(PnfsGetFileAttributes message) { @Override public void failure(int rc, Object error) { LOGGER.error("{} - fetchAttributes, callback failure for {}.", ruid, target); - try { - storeOrUpdate(CacheExceptionFactory.exceptionOf( - rc, Objects.toString(error, null))); - } catch (InterruptedException e) { - errorHandler.accept(e); - } finally { - remove(); - } + storeOrUpdate(CacheExceptionFactory.exceptionOf( + rc, Objects.toString(error, null))); } }, callbackExecutor); } @@ -552,8 +553,8 @@ private void performActivity(boolean async) throws InterruptedException { storeOrUpdate(null); - if (hasBeenCancelled(this)) { - LOGGER.debug("{} - performActivity hasBeenCancelled for {}.", ruid, path); + if (hasBeenSpecificallyCancelled(this)) { + LOGGER.debug("{} - performActivity hasBeenSpecificallyCancelled for {}.", ruid, path); remove(); } @@ -610,11 +611,11 @@ private void retryFailed() throws BulkStorageException { } } - private void storeOrUpdate(Throwable error) throws InterruptedException { + private void storeOrUpdate(Throwable error) { LOGGER.debug("{} - storeOrUpdate {}.", ruid, target); - if (hasBeenCancelled(this)) { - LOGGER.debug("{} - storeOrUpdate, hasBeenCancelled {}.", ruid, target.getPath()); + if (hasBeenSpecificallyCancelled(this)) { + LOGGER.debug("{} - storeOrUpdate, hasBeenSpecificallyCancelled {}.", ruid, target.getPath()); return; } @@ -626,12 +627,16 @@ private void storeOrUpdate(Throwable error) throws InterruptedException { * If this is an insert (id == null), the target id will be updated to what is * returned from the database. */ + checkForRequestCancellation(); targetStore.storeOrUpdate(target); LOGGER.debug("{} - storeOrUpdate, target id {}", ruid, target.getId()); } catch (BulkStorageException e) { LOGGER.error("{}, could not store or update target from result {}, {}, {}: {}.", ruid, target.getId(), target.getPath(), target.getAttributes(), e.toString()); error = e; + } catch (InterruptedException e) { + remove(); + return; } if (error != null) { @@ -692,22 +697,36 @@ public BulkRequestContainerJob(BulkActivity activity, BulkRequestTarget jobTarge } public void cancel() { + interruptRunThread(); + + /* + * Thread may already have exited. + * + * Update terminates job target. + */ containerState = ContainerState.STOP; + update(CANCELLED); + + LOGGER.debug("{} - cancel, running {}.", ruid, running.size()); - jobTarget.cancel(); + /* + * Drain running tasks. Calling task cancel removes the task from the map. + */ + while (true) { + ContainerTask task; + synchronized (running) { + if (running.isEmpty()) { + break; + } - LOGGER.debug("{} - cancel: target state is now {}.", ruid, jobTarget.getState()); + task = running.values().iterator().next(); + } - interruptRunThread(); + task.cancel(); - synchronized (running) { - LOGGER.debug("{} - cancel: running {}.", ruid, running.size()); - running.values().forEach(ContainerTask::cancel); - LOGGER.debug("{} - cancel: running targets cancelled.", ruid); - running.clear(); + LOGGER.debug("{} - cancel: task {} cancelled.", ruid, task.seqNo); } - LOGGER.debug("{} - cancel: calling cancel all on target store.", ruid); targetStore.cancelAll(rid); signalStateChange(); @@ -773,7 +792,7 @@ public void initialize() { containerState = ContainerState.PROCESS_FILES; } - public synchronized boolean isReady() { + public boolean isReady() { switch (jobTarget.getState()) { case READY: case CREATED: @@ -819,7 +838,12 @@ public void run() { update(CANCELLED); } setRunThread(null); - checkTransitionToDirs(); + + synchronized (running) { + if (running.isEmpty()) { + checkTransitionToDirs(); + } + } } public void setDirListSemaphore(Semaphore dirListSemaphore) { @@ -879,39 +903,43 @@ public void uncaughtException(Thread t, Throwable e) { public void update(State state) { if (jobTarget.setState(state)) { try { - targetStore.update(jobTarget.getId(), jobTarget.getState(), jobTarget.getErrorType(), + targetStore.update(jobTarget.getId(), jobTarget.getState(), + jobTarget.getErrorType(), jobTarget.getErrorMessage()); } catch (BulkStorageException e) { LOGGER.error("{}, updateJobState: {}", ruid, e.toString()); } + signalStateChange(); } } private void checkForRequestCancellation() throws InterruptedException { - if (isRunThreadInterrupted() || containerState == ContainerState.STOP - || jobTarget.isTerminated()) { + if (containerState == ContainerState.STOP) { + throw new InterruptedException(); + } + + if (isRunThreadInterrupted()) { throw new InterruptedException(); } - } - private void checkTransitionToDirs() { synchronized (running) { - if (!running.isEmpty()) { - LOGGER.debug("{} - checkTransitionToDirs, running {}", ruid, running.size()); - return; + if (jobTarget.isTerminated()) { + throw new InterruptedException(); } } + } - synchronized (this) { - if (containerState == ContainerState.WAIT) { - containerState = ContainerState.PROCESS_DIRS; - executor.submit(this); - } + private void checkTransitionToDirs() { + LOGGER.debug("{} - checkTransitionToDirs: {}", ruid, containerState); + if (containerState == ContainerState.WAIT) { + containerState = ContainerState.PROCESS_DIRS; + LOGGER.debug("{} - checkTransitionToDirs is now {}", ruid, containerState); + executor.submit(this); } } - private boolean hasBeenCancelled(TargetTask task) { + private boolean hasBeenSpecificallyCancelled(TargetTask task) { synchronized (cancelledPaths) { BulkRequestTarget target = task.target; if (cancelledPaths.remove(target.getPath().toString())) { @@ -924,7 +952,7 @@ private boolean hasBeenCancelled(TargetTask task) { targetStore.update(target.getId(), CANCELLED, null, null); } } catch (BulkServiceException | UnsupportedOperationException e) { - LOGGER.error("hasBeenCancelled {}, failed for {}: {}", ruid, target.getPath(), + LOGGER.error("hasBeenSpecificallyCancelled {}, failed for {}: {}", ruid, target.getPath(), e.toString()); } return true; @@ -945,7 +973,7 @@ private synchronized boolean isRunThreadInterrupted() { return runThread != null && runThread.isInterrupted(); } - private void processDirTargets() throws InterruptedException { + private void processDirTargets() { if (dirs.isEmpty()) { LOGGER.debug("{} - processDirTargets, nothing to do.", ruid); return; @@ -960,13 +988,20 @@ private void processDirTargets() throws InterruptedException { * Process serially in this thread */ for (DirTarget dirTarget : sorted) { - new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path, - Optional.of(dirTarget.attributes), CREATED, null), - TaskState.HANDLE_DIR_TARGET).performSync(); + try { + new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path, + Optional.of(dirTarget.attributes), CREATED, null), + TaskState.HANDLE_DIR_TARGET).performSync(); + } catch (InterruptedException e) { + /* + * Cancel most likely called; stop processing. + */ + break; + } } } - private void processFileTargets() throws InterruptedException { + private void processFileTargets() { List requestTargets = targetStore.getInitialTargets(rid, true); LOGGER.debug("{} - processFileTargets, initial size {}.", ruid, requestTargets.size()); @@ -979,7 +1014,14 @@ private void processFileTargets() throws InterruptedException { } for (BulkRequestTarget target : requestTargets) { - new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync(); + try { + new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync(); + } catch (InterruptedException e) { + /* + * Cancel most likely called; stop processing. + */ + break; + } } } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java index 38165b3a094..15c7189785a 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkRequestTarget.java @@ -260,7 +260,7 @@ public Long getStartedAt() { return startedAt; } - public State getState() { + public synchronized State getState() { return state; }