Skip to content

Commit

Permalink
dcache-bulk: fix thread executor injection
Browse files Browse the repository at this point in the history
Motivation:

Recent changes to the job container implementation
inadvertently scrambled thread pool injection
such that one of the executors (intended to run
the main container jobs) never gets used (the
container job is executed by the same thread
pool as its own tasks).

Modification:

Fix the injection.  For the sake of consistency,
all the thread pools are injected into the
container by the job factory rather than
the manager.

Result:

Container jobs run on their own thread pool,
as intended.

Target: master
Request: 9.2
Requires-notes:  no (this is largely invisible)
Patch: https://rb.dcache.org/r/14135
Acked-by: Tigran
alrossi committed Oct 17, 2023
1 parent 32c2127 commit adaacd8
Showing 4 changed files with 49 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -80,6 +80,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.util.BoundedCachedExecutor;
import org.dcache.util.list.ListDirectoryHandler;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
@@ -102,6 +103,9 @@ public final class RequestContainerJobFactory {
private BulkServiceStatistics statistics;
private Semaphore dirListSemaphore;
private Semaphore inFlightSemaphore;
private BoundedCachedExecutor taskExecutor;
private BoundedCachedExecutor callbackExecutor;
private BoundedCachedExecutor listExecutor;

public BulkRequestContainerJob createRequestJob(BulkRequest request)
throws BulkServiceException {
@@ -132,6 +136,9 @@ public BulkRequestContainerJob createRequestJob(BulkRequest request)
containerJob.setListHandler(listHandler);
containerJob.setDirListSemaphore(dirListSemaphore);
containerJob.setInFlightSemaphore(inFlightSemaphore);
containerJob.setExecutor(taskExecutor);
containerJob.setListExecutor(listExecutor);
containerJob.setCallbackExecutor(callbackExecutor);
containerJob.initialize();
return containerJob;
}
@@ -149,11 +156,21 @@ public void setActivityFactory(BulkActivityFactory activityFactory) {
this.activityFactory = activityFactory;
}

@Required
public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

@Required
public void setListHandler(ListDirectoryHandler listHandler) {
this.listHandler = listHandler;
}

@Required
public void setListExecutor(BoundedCachedExecutor listExecutor) {
this.listExecutor = listExecutor;
}

@Required
public void setDirListSemaphore(int permits) {
dirListSemaphore = new Semaphore(permits);
@@ -184,6 +201,11 @@ public void setTargetStore(BulkTargetStore targetStore) {
this.targetStore = targetStore;
}

@Required
public void setTaskExecutor(BoundedCachedExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}

BulkActivity create(BulkRequest request) throws BulkServiceException {
String rid = request.getUid();

Original file line number Diff line number Diff line change
@@ -311,19 +311,9 @@ private ListMultimap<String, String> userRequests() {
private ExecutorService processorExecutorService;

/**
* Thread dedicated to job callbacks.
* Thread dedicated to running containerJobs.
*/
private ExecutorService callbackExecutor;

/**
* Thread dedicated to directory listing.
*/
private ExecutorService listExecutor;

/**
* Thread dedicated to jobs.
*/
private ExecutorService executorService;
private ExecutorService containerExecutor;

/**
* Records number of jobs and requests processed.
@@ -430,24 +420,14 @@ public int getMaxActiveRequests() {
return maxActiveRequests;
}

@Required
public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

@Required
public void setCompletionHandler(BulkRequestCompletionHandler completionHandler) {
this.completionHandler = completionHandler;
}

@Required
public void setExecutor(BoundedCachedExecutor pooledExecutor) {
this.executorService = pooledExecutor;
}

@Required
public void setListExecutor(BoundedCachedExecutor listExecutor) {
this.listExecutor = listExecutor;
public void setContainerExecutor(BoundedCachedExecutor containerExecutor) {
this.containerExecutor = containerExecutor;
}

@Required
@@ -543,14 +523,11 @@ void startJob(BulkRequestContainerJob job) {
String key = job.getTarget().getKey();
LOGGER.trace("submitting job {} to executor, target {}.", key,
job.getTarget());
job.setExecutor(executorService);
job.setListExecutor(listExecutor);
job.setCallbackExecutor(callbackExecutor);
job.setCallback(this);
try {
if (isJobValid(job)) { /* possibly cancelled in flight */
job.update(State.RUNNING);
executorService.submit(new FireAndForgetTask(job));
containerExecutor.submit(new FireAndForgetTask(job));
}
} catch (RuntimeException e) {
job.getTarget().setErrorObject(e);
Original file line number Diff line number Diff line change
@@ -67,13 +67,21 @@
</constructor-arg>
</bean>

<bean id="container-job-executor" class="org.dcache.util.CDCExecutorServiceDecorator">
<bean id="container-job-executor" class="org.dcache.util.BoundedCachedExecutor">
<description>Used to execute jobs that are executed by a batch container.</description>
<constructor-arg>
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.container-processing-threads}"/>
</bean>
</constructor-arg>
<constructor-arg value="${bulk.limits.container-processing-threads}"/>
</bean>

<bean id="task-executor" class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.in-flight-semaphore}"/>
</bean>

<bean id="callback-executor" class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.db.connections.max}"/>
</bean>

<bean id="dir-list-executor" class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.dir-list-semaphore}"/>
</bean>

<bean id="cancellation-executor" class="org.dcache.util.CDCScheduledExecutorServiceDecorator">
@@ -209,6 +217,9 @@
<property name="statistics" ref="statistics"/>
<property name="dirListSemaphore" value="${bulk.limits.dir-list-semaphore}"/>
<property name="inFlightSemaphore" value="${bulk.limits.in-flight-semaphore}"/>
<property name="taskExecutor" ref="task-executor"/>
<property name="callbackExecutor" ref="callback-executor"/>
<property name="listExecutor" ref="dir-list-executor"/>
</bean>

<bean id="statistics" class="org.dcache.services.bulk.util.BulkServiceStatistics">
@@ -243,21 +254,7 @@
<property name="submissionHandler" ref="request-handler"/>
<property name="schedulerProvider" ref="scheduler-provider"/>
<property name="maxActiveRequests" value="${bulk.limits.container-processing-threads}"/>
<property name="executor">
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.in-flight-semaphore}"/>
</bean>
</property>
<property name="callbackExecutor">
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.db.connections.max}"/>
</bean>
</property>
<property name="listExecutor" >
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.dir-list-semaphore}"/>
</bean>
</property>
<property name="containerExecutor" ref="container-job-executor"/>
<property name="timeout" value="${bulk.limits.sweep-interval}"/>
<property name="timeoutUnit" value="${bulk.limits.sweep-interval.unit}"/>
</bean>
@@ -299,6 +296,9 @@
<list>
<ref bean="incoming-thread-executor"/>
<ref bean="container-job-executor"/>
<ref bean="task-executor"/>
<ref bean="callback-executor"/>
<ref bean="dir-list-executor"/>
<ref bean="cancellation-executor"/>
</list>
</property>
2 changes: 1 addition & 1 deletion skel/share/defaults/bulk.properties
Original file line number Diff line number Diff line change
@@ -69,7 +69,7 @@ bulk.request-scheduler=org.dcache.services.bulk.manager.scheduler.LeastRecentFir
#
bulk.limits.container-processing-threads=100
bulk.limits.incoming-request-threads=10
bulk.limits.cancellation-threads=10
bulk.limits.cancellation-threads=${bulk.limits.container-processing-threads}

# ---- Expiration of the cache serving to front the request storage.
#

0 comments on commit adaacd8

Please sign in to comment.