Skip to content

Commit

Permalink
dcache-bulk: fix cancellation issues
Browse files Browse the repository at this point in the history
Motivation:

Changes introduced by:

https://rb.dcache.org/r/14115
master@8a5c358af45586383d87873952407c646f81f4c6

and

https://rb.dcache.org/r/14118/
master@0b4140b454b819de55c7c412539294537ff0beb8

inadvertently introduced a regression in
request cancellation.   In addition, the
semaphore release was not always taking
place.

Modification:

The semaphore issue is addressed by
selective modifications of synchronization
on the `running` map.

The problem with cancellation (hanging
perpetually in the CANCELLING state)
was solved by eliminating the re-entrance
on the synchronization block when
cancelling running tasks.  We have
also modified the `cancel` procedure
to interrupt the main thread first and
then set container state.  We also
changed the main target loops not
to throw the interrupted exception
but simply to stop processing.

Result:

Testing of cancellations for
both recursive and non-recursive
tasks show correctness of semaphore
count and completion of the cancellation.

Target: master
Request: 9.2 (a definite bug!)
Patch: https://rb.dcache.org/r/14127/
Depends-on: #14126
Requires-notes: yes (if 9.2 released before this is applied)
Acked-by: Tigran
  • Loading branch information
alrossi authored and lemora committed Oct 12, 2023
1 parent 66b67ea commit 8cac6fa
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.security.auth.Subject;
Expand Down Expand Up @@ -203,17 +204,13 @@ enum TaskState {
*/
abstract class ContainerTask implements Runnable {
final Consumer<Throwable> errorHandler = e -> uncaughtException(Thread.currentThread(), e);
final long seqNo;
final long seqNo = taskCounter.getAndIncrement();
final AtomicBoolean holdingPermit = new AtomicBoolean(false);

Future taskFuture;
boolean holdingPermit;
ExecutorService taskExecutor;
Semaphore taskSemaphore;

ContainerTask() {
seqNo = taskCounter.getAndIncrement();
}

public void run() {
try {
doInner();
Expand All @@ -232,44 +229,58 @@ public void run() {
void cancel() {
if (taskFuture != null) {
taskFuture.cancel(true);
LOGGER.debug("{} - task future cancelled {}.", ruid, seqNo);
}

remove();
}

void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttributes)
throws BulkServiceException, CacheException, InterruptedException {
throws BulkServiceException, CacheException {
LOGGER.debug("{} - expandDepthFirst, {}, {}, {}, {}", ruid, id, pid, path, dirAttributes);
new DirListTask(id, pid, path, dirAttributes).submitAsync();
try {
new DirListTask(id, pid, path, dirAttributes).submitAsync();
} catch (InterruptedException e) {
LOGGER.trace("{} - expandDepthFirst {} interrupted.", ruid, id);
}
}

void submitAsync() throws InterruptedException {
checkForRequestCancellation();

if (!holdingPermit) {
/*
* Acquisition must be done outside the synchronized block (running),
* else there could be a deadlock.
*/
if (holdingPermit.compareAndSet(false, true)) {
taskSemaphore.acquire();
holdingPermit = true;
}

synchronized (running) {
if (jobTarget.isTerminated()) {
taskSemaphore.release();
return;
}

running.put(seqNo, this);
LOGGER.debug("{} - submitAsync {}, task count is now {}.", ruid, seqNo, running.size());
}
LOGGER.debug("{} - submitAsync {}, task count is now {}.", ruid, seqNo,
running.size());

taskFuture = taskExecutor.submit(this);
taskFuture = taskExecutor.submit(this);
}
}

void remove() {
if (holdingPermit.compareAndSet(true, false)) {
taskSemaphore.release();
}

synchronized (running) {
running.remove(seqNo);
LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo, running.size());
}

if (holdingPermit) {
taskSemaphore.release();
holdingPermit = false;
if (running.isEmpty()) {
checkTransitionToDirs();
}
}

checkTransitionToDirs();
}

abstract void doInner() throws Throwable;
Expand Down Expand Up @@ -392,10 +403,12 @@ class TargetTask extends ContainerTask {
void cancel() {
if (activityFuture != null) {
activityFuture.cancel(true);
LOGGER.debug("{} - activity future cancelled for task {}.", ruid, seqNo);
}

if (target != null) {
activity.cancel(target);
LOGGER.debug("{} - target cancelled for task {}.", ruid, seqNo);
}

super.cancel();
Expand Down Expand Up @@ -441,7 +454,7 @@ void performSync() throws InterruptedException {
/**
* (1) symlink resolution on initial targets; bypassed for discovered targets.
*/
private void resolvePath() throws InterruptedException {
private void resolvePath() {
LOGGER.debug("{} - resolvePath, resolving {}", ruid, target.getPath());
PnfsResolveSymlinksMessage message = new PnfsResolveSymlinksMessage(
target.getPath().toString(), null);
Expand All @@ -464,22 +477,16 @@ public void success(PnfsResolveSymlinksMessage message) {
@Override
public void failure(int rc, Object error) {
LOGGER.error("{} - resolvePath, callback failure for {}.", ruid, target);
try {
storeOrUpdate(CacheExceptionFactory.exceptionOf(
rc, Objects.toString(error, null)));
} catch (InterruptedException e) {
errorHandler.accept(e);
} finally {
remove();
}
storeOrUpdate(CacheExceptionFactory.exceptionOf(
rc, Objects.toString(error, null)));
}
}, callbackExecutor);
}

/**
* (2) retrieval of required file attributes.
*/
private void fetchAttributes() throws InterruptedException {
private void fetchAttributes() {
LOGGER.debug("{} - fetchAttributes for path {}", ruid, target.getPath());
PnfsGetFileAttributes message = new PnfsGetFileAttributes(target.getPath().toString(),
MINIMALLY_REQUIRED_ATTRIBUTES);
Expand All @@ -498,14 +505,8 @@ public void success(PnfsGetFileAttributes message) {
@Override
public void failure(int rc, Object error) {
LOGGER.error("{} - fetchAttributes, callback failure for {}.", ruid, target);
try {
storeOrUpdate(CacheExceptionFactory.exceptionOf(
rc, Objects.toString(error, null)));
} catch (InterruptedException e) {
errorHandler.accept(e);
} finally {
remove();
}
storeOrUpdate(CacheExceptionFactory.exceptionOf(
rc, Objects.toString(error, null)));
}
}, callbackExecutor);
}
Expand Down Expand Up @@ -552,8 +553,8 @@ private void performActivity(boolean async) throws InterruptedException {

storeOrUpdate(null);

if (hasBeenCancelled(this)) {
LOGGER.debug("{} - performActivity hasBeenCancelled for {}.", ruid, path);
if (hasBeenSpecificallyCancelled(this)) {
LOGGER.debug("{} - performActivity hasBeenSpecificallyCancelled for {}.", ruid, path);
remove();
}

Expand Down Expand Up @@ -610,11 +611,11 @@ private void retryFailed() throws BulkStorageException {
}
}

private void storeOrUpdate(Throwable error) throws InterruptedException {
private void storeOrUpdate(Throwable error) {
LOGGER.debug("{} - storeOrUpdate {}.", ruid, target);

if (hasBeenCancelled(this)) {
LOGGER.debug("{} - storeOrUpdate, hasBeenCancelled {}.", ruid, target.getPath());
if (hasBeenSpecificallyCancelled(this)) {
LOGGER.debug("{} - storeOrUpdate, hasBeenSpecificallyCancelled {}.", ruid, target.getPath());
return;
}

Expand All @@ -626,12 +627,16 @@ private void storeOrUpdate(Throwable error) throws InterruptedException {
* If this is an insert (id == null), the target id will be updated to what is
* returned from the database.
*/
checkForRequestCancellation();
targetStore.storeOrUpdate(target);
LOGGER.debug("{} - storeOrUpdate, target id {}", ruid, target.getId());
} catch (BulkStorageException e) {
LOGGER.error("{}, could not store or update target from result {}, {}, {}: {}.", ruid,
target.getId(), target.getPath(), target.getAttributes(), e.toString());
error = e;
} catch (InterruptedException e) {
remove();
return;
}

if (error != null) {
Expand Down Expand Up @@ -692,22 +697,36 @@ public BulkRequestContainerJob(BulkActivity activity, BulkRequestTarget jobTarge
}

public void cancel() {
interruptRunThread();

/*
* Thread may already have exited.
*
* Update terminates job target.
*/
containerState = ContainerState.STOP;
update(CANCELLED);

LOGGER.debug("{} - cancel, running {}.", ruid, running.size());

jobTarget.cancel();
/*
* Drain running tasks. Calling task cancel removes the task from the map.
*/
while (true) {
ContainerTask task;
synchronized (running) {
if (running.isEmpty()) {
break;
}

LOGGER.debug("{} - cancel: target state is now {}.", ruid, jobTarget.getState());
task = running.values().iterator().next();
}

interruptRunThread();
task.cancel();

synchronized (running) {
LOGGER.debug("{} - cancel: running {}.", ruid, running.size());
running.values().forEach(ContainerTask::cancel);
LOGGER.debug("{} - cancel: running targets cancelled.", ruid);
running.clear();
LOGGER.debug("{} - cancel: task {} cancelled.", ruid, task.seqNo);
}

LOGGER.debug("{} - cancel: calling cancel all on target store.", ruid);
targetStore.cancelAll(rid);

signalStateChange();
Expand Down Expand Up @@ -773,7 +792,7 @@ public void initialize() {
containerState = ContainerState.PROCESS_FILES;
}

public synchronized boolean isReady() {
public boolean isReady() {
switch (jobTarget.getState()) {
case READY:
case CREATED:
Expand Down Expand Up @@ -819,7 +838,12 @@ public void run() {
update(CANCELLED);
}
setRunThread(null);
checkTransitionToDirs();

synchronized (running) {
if (running.isEmpty()) {
checkTransitionToDirs();
}
}
}

public void setDirListSemaphore(Semaphore dirListSemaphore) {
Expand Down Expand Up @@ -879,39 +903,43 @@ public void uncaughtException(Thread t, Throwable e) {
public void update(State state) {
if (jobTarget.setState(state)) {
try {
targetStore.update(jobTarget.getId(), jobTarget.getState(), jobTarget.getErrorType(),
targetStore.update(jobTarget.getId(), jobTarget.getState(),
jobTarget.getErrorType(),
jobTarget.getErrorMessage());
} catch (BulkStorageException e) {
LOGGER.error("{}, updateJobState: {}", ruid, e.toString());
}

signalStateChange();
}
}

private void checkForRequestCancellation() throws InterruptedException {
if (isRunThreadInterrupted() || containerState == ContainerState.STOP
|| jobTarget.isTerminated()) {
if (containerState == ContainerState.STOP) {
throw new InterruptedException();
}

if (isRunThreadInterrupted()) {
throw new InterruptedException();
}
}

private void checkTransitionToDirs() {
synchronized (running) {
if (!running.isEmpty()) {
LOGGER.debug("{} - checkTransitionToDirs, running {}", ruid, running.size());
return;
if (jobTarget.isTerminated()) {
throw new InterruptedException();
}
}
}

synchronized (this) {
if (containerState == ContainerState.WAIT) {
containerState = ContainerState.PROCESS_DIRS;
executor.submit(this);
}
private void checkTransitionToDirs() {
LOGGER.debug("{} - checkTransitionToDirs: {}", ruid, containerState);
if (containerState == ContainerState.WAIT) {
containerState = ContainerState.PROCESS_DIRS;
LOGGER.debug("{} - checkTransitionToDirs is now {}", ruid, containerState);
executor.submit(this);
}
}

private boolean hasBeenCancelled(TargetTask task) {
private boolean hasBeenSpecificallyCancelled(TargetTask task) {
synchronized (cancelledPaths) {
BulkRequestTarget target = task.target;
if (cancelledPaths.remove(target.getPath().toString())) {
Expand All @@ -924,7 +952,7 @@ private boolean hasBeenCancelled(TargetTask task) {
targetStore.update(target.getId(), CANCELLED, null, null);
}
} catch (BulkServiceException | UnsupportedOperationException e) {
LOGGER.error("hasBeenCancelled {}, failed for {}: {}", ruid, target.getPath(),
LOGGER.error("hasBeenSpecificallyCancelled {}, failed for {}: {}", ruid, target.getPath(),
e.toString());
}
return true;
Expand All @@ -945,7 +973,7 @@ private synchronized boolean isRunThreadInterrupted() {
return runThread != null && runThread.isInterrupted();
}

private void processDirTargets() throws InterruptedException {
private void processDirTargets() {
if (dirs.isEmpty()) {
LOGGER.debug("{} - processDirTargets, nothing to do.", ruid);
return;
Expand All @@ -960,13 +988,20 @@ private void processDirTargets() throws InterruptedException {
* Process serially in this thread
*/
for (DirTarget dirTarget : sorted) {
new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path,
Optional.of(dirTarget.attributes), CREATED, null),
TaskState.HANDLE_DIR_TARGET).performSync();
try {
new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path,
Optional.of(dirTarget.attributes), CREATED, null),
TaskState.HANDLE_DIR_TARGET).performSync();
} catch (InterruptedException e) {
/*
* Cancel most likely called; stop processing.
*/
break;
}
}
}

private void processFileTargets() throws InterruptedException {
private void processFileTargets() {
List<BulkRequestTarget> requestTargets = targetStore.getInitialTargets(rid, true);

LOGGER.debug("{} - processFileTargets, initial size {}.", ruid, requestTargets.size());
Expand All @@ -979,7 +1014,14 @@ private void processFileTargets() throws InterruptedException {
}

for (BulkRequestTarget target : requestTargets) {
new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync();
try {
new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync();
} catch (InterruptedException e) {
/*
* Cancel most likely called; stop processing.
*/
break;
}
}
}

Expand Down
Loading

0 comments on commit 8cac6fa

Please sign in to comment.