Skip to content

Commit

Permalink
dcache-bulk: give directory listing a separate executor
Browse files Browse the repository at this point in the history
Motivation:

The container rewrite originally put
everything, including directory listing,
on the same unbounded thread pool.

Then a change was made to make
that pool bounded.

However, the extra semaphore
permits involved can lead to
a deadlock situation where no
further listing takes place
and tasks make no progress
(this was observed and is
reproducible).

Modification:

Return directory listing to its
own executor.

We also modify slightly the
construction of the inner tasks
so that only parent methods
which manipulate the semaphore
are necessary.

Result:

Lockup eliminated.

Target: master
Request: 9.2
Patch: https://rb.dcache.org/r/14126/
Requires-notes:  yes, if backported after initial release.
Acked-by: Tigran
  • Loading branch information
alrossi committed Oct 10, 2023
1 parent 3cbe3f4 commit 76fde73
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ abstract class ContainerTask implements Runnable {
final long seqNo;

Future taskFuture;
boolean holdingPermit;
ExecutorService taskExecutor;
Semaphore taskSemaphore;

ContainerTask() {
seqNo = taskCounter.getAndIncrement();
Expand Down Expand Up @@ -242,11 +245,17 @@ void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttribute
void submitAsync() throws InterruptedException {
checkForRequestCancellation();

if (!holdingPermit) {
taskSemaphore.acquire();
holdingPermit = true;
}

synchronized (running) {
running.put(seqNo, this);
LOGGER.debug("{} - submitAsync {}, task count is now {}.", ruid, seqNo, running.size());
}
taskFuture = executor.submit(this);

taskFuture = taskExecutor.submit(this);
}

void remove() {
Expand All @@ -255,6 +264,11 @@ void remove() {
LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo, running.size());
}

if (holdingPermit) {
taskSemaphore.release();
holdingPermit = false;
}

checkTransitionToDirs();
}

Expand All @@ -272,6 +286,8 @@ class DirListTask extends ContainerTask {
this.pid = pid;
this.path = path;
this.dirAttributes = dirAttributes;
taskExecutor = listExecutor;
taskSemaphore = dirListSemaphore;
}

void doInner() throws Throwable {
Expand Down Expand Up @@ -345,15 +361,10 @@ private void addDirTarget(Long id, PID pid, FsPath path, FileAttributes attribut

private DirectoryStream getDirectoryListing(FsPath path)
throws CacheException, InterruptedException {
dirListSemaphore.acquire();
try {
LOGGER.debug("{} - DirListTask, getDirectoryListing for path {}, calling list ...",
ruid, path);
return listHandler.list(subject, restriction, path, null,
Range.closedOpen(0, Integer.MAX_VALUE), MINIMALLY_REQUIRED_ATTRIBUTES);
} finally {
dirListSemaphore.release();
}
LOGGER.debug("{} - DirListTask, getDirectoryListing for path {}, calling list ...",
ruid, path);
return listHandler.list(subject, restriction, path, null,
Range.closedOpen(0, Integer.MAX_VALUE), MINIMALLY_REQUIRED_ATTRIBUTES);
}
}

Expand All @@ -371,11 +382,11 @@ class TargetTask extends ContainerTask {
*/
TaskState state;

boolean holdingPermit;

TargetTask(BulkRequestTarget target, TaskState initialState) {
this.target = target;
state = initialState;
taskExecutor = BulkRequestContainerJob.this.executor;
taskSemaphore = inFlightSemaphore;
}

void cancel() {
Expand Down Expand Up @@ -415,23 +426,6 @@ void doInner() throws Throwable {
}
}

@Override
void submitAsync() throws InterruptedException {
if (!holdingPermit) {
inFlightSemaphore.acquire();
holdingPermit = true;
}
super.submitAsync();
}

void remove() {
super.remove();
if (holdingPermit) {
inFlightSemaphore.release();
holdingPermit = false;
}
}

void performSync() throws InterruptedException {
performActivity(false);

Expand Down Expand Up @@ -668,6 +662,7 @@ private void storeOrUpdate(Throwable error) throws InterruptedException {
private SignalAware callback;
private Thread runThread;
private ExecutorService executor;
private ExecutorService listExecutor;
private ExecutorService callbackExecutor;
private Semaphore dirListSemaphore;
private Semaphore inFlightSemaphore;
Expand Down Expand Up @@ -847,6 +842,10 @@ public void setExecutor(ExecutorService executor) {
this.executor = executor;
}

public void setListExecutor(ExecutorService listExecutor) {
this.listExecutor = listExecutor;
}

public void setNamespaceHandler(PnfsHandler pnfsHandler) {
this.pnfsHandler = pnfsHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ private ListMultimap<String, String> userRequests() {
*/
private ExecutorService callbackExecutor;

/**
* Thread dedicated to directory listing.
*/
private ExecutorService listExecutor;

/**
* Thread dedicated to jobs.
*/
Expand Down Expand Up @@ -440,6 +445,11 @@ public void setExecutor(BoundedCachedExecutor pooledExecutor) {
this.executorService = pooledExecutor;
}

@Required
public void setListExecutor(BoundedCachedExecutor listExecutor) {
this.listExecutor = listExecutor;
}

@Required
public void setTargetStore(BulkTargetStore targetStore) {
this.targetStore = targetStore;
Expand Down Expand Up @@ -534,6 +544,7 @@ void startJob(BulkRequestContainerJob job) {
LOGGER.trace("submitting job {} to executor, target {}.", key,
job.getTarget());
job.setExecutor(executorService);
job.setListExecutor(listExecutor);
job.setCallbackExecutor(callbackExecutor);
job.setCallback(this);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@
<constructor-arg value="${bulk.db.connections.max}"/>
</bean>
</property>
<property name="listExecutor" >
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.dir-list-semaphore}"/>
</bean>
</property>
<property name="timeout" value="${bulk.limits.sweep-interval}"/>
<property name="timeoutUnit" value="${bulk.limits.sweep-interval.unit}"/>
</bean>
Expand Down

0 comments on commit 76fde73

Please sign in to comment.