Skip to content

Commit

Permalink
Merge pull request #7394 from alrossi/fix/9.2/bulk-fix-executor-injec…
Browse files Browse the repository at this point in the history
…tion

dcache-bulk: fix thread executor injection
  • Loading branch information
svemeyer authored Oct 17, 2023
2 parents 04a6cac + ea7e261 commit 1c1a08d
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.util.BoundedCachedExecutor;
import org.dcache.util.list.ListDirectoryHandler;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
Expand All @@ -102,6 +103,9 @@ public final class RequestContainerJobFactory {
private BulkServiceStatistics statistics;
private Semaphore dirListSemaphore;
private Semaphore inFlightSemaphore;
private BoundedCachedExecutor taskExecutor;
private BoundedCachedExecutor callbackExecutor;
private BoundedCachedExecutor listExecutor;

public BulkRequestContainerJob createRequestJob(BulkRequest request)
throws BulkServiceException {
Expand Down Expand Up @@ -132,6 +136,9 @@ public BulkRequestContainerJob createRequestJob(BulkRequest request)
containerJob.setListHandler(listHandler);
containerJob.setDirListSemaphore(dirListSemaphore);
containerJob.setInFlightSemaphore(inFlightSemaphore);
containerJob.setExecutor(taskExecutor);
containerJob.setListExecutor(listExecutor);
containerJob.setCallbackExecutor(callbackExecutor);
containerJob.initialize();
return containerJob;
}
Expand All @@ -149,11 +156,21 @@ public void setActivityFactory(BulkActivityFactory activityFactory) {
this.activityFactory = activityFactory;
}

@Required
public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

@Required
public void setListHandler(ListDirectoryHandler listHandler) {
this.listHandler = listHandler;
}

@Required
public void setListExecutor(BoundedCachedExecutor listExecutor) {
this.listExecutor = listExecutor;
}

@Required
public void setDirListSemaphore(int permits) {
dirListSemaphore = new Semaphore(permits);
Expand Down Expand Up @@ -184,6 +201,11 @@ public void setTargetStore(BulkTargetStore targetStore) {
this.targetStore = targetStore;
}

@Required
public void setTaskExecutor(BoundedCachedExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}

BulkActivity create(BulkRequest request) throws BulkServiceException {
String rid = request.getUid();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,19 +311,9 @@ private ListMultimap<String, String> userRequests() {
private ExecutorService processorExecutorService;

/**
* Thread dedicated to job callbacks.
* Thread dedicated to running containerJobs.
*/
private ExecutorService callbackExecutor;

/**
* Thread dedicated to directory listing.
*/
private ExecutorService listExecutor;

/**
* Thread dedicated to jobs.
*/
private ExecutorService executorService;
private ExecutorService containerExecutor;

/**
* Records number of jobs and requests processed.
Expand Down Expand Up @@ -430,24 +420,14 @@ public int getMaxActiveRequests() {
return maxActiveRequests;
}

@Required
public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

@Required
public void setCompletionHandler(BulkRequestCompletionHandler completionHandler) {
this.completionHandler = completionHandler;
}

@Required
public void setExecutor(BoundedCachedExecutor pooledExecutor) {
this.executorService = pooledExecutor;
}

@Required
public void setListExecutor(BoundedCachedExecutor listExecutor) {
this.listExecutor = listExecutor;
public void setContainerExecutor(BoundedCachedExecutor containerExecutor) {
this.containerExecutor = containerExecutor;
}

@Required
Expand Down Expand Up @@ -543,14 +523,11 @@ void startJob(BulkRequestContainerJob job) {
String key = job.getTarget().getKey();
LOGGER.trace("submitting job {} to executor, target {}.", key,
job.getTarget());
job.setExecutor(executorService);
job.setListExecutor(listExecutor);
job.setCallbackExecutor(callbackExecutor);
job.setCallback(this);
try {
if (isJobValid(job)) { /* possibly cancelled in flight */
job.update(State.RUNNING);
executorService.submit(new FireAndForgetTask(job));
containerExecutor.submit(new FireAndForgetTask(job));
}
} catch (RuntimeException e) {
job.getTarget().setErrorObject(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,21 @@
</constructor-arg>
</bean>

<bean id="container-job-executor" class="org.dcache.util.CDCExecutorServiceDecorator">
<bean id="container-job-executor" class="org.dcache.util.BoundedCachedExecutor">
<description>Used to execute jobs that are executed by a batch container.</description>
<constructor-arg>
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.container-processing-threads}"/>
</bean>
</constructor-arg>
<constructor-arg value="${bulk.limits.container-processing-threads}"/>
</bean>

<bean id="task-executor" class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.in-flight-semaphore}"/>
</bean>

<bean id="callback-executor" class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.db.connections.max}"/>
</bean>

<bean id="dir-list-executor" class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.dir-list-semaphore}"/>
</bean>

<bean id="cancellation-executor" class="org.dcache.util.CDCScheduledExecutorServiceDecorator">
Expand Down Expand Up @@ -209,6 +217,9 @@
<property name="statistics" ref="statistics"/>
<property name="dirListSemaphore" value="${bulk.limits.dir-list-semaphore}"/>
<property name="inFlightSemaphore" value="${bulk.limits.in-flight-semaphore}"/>
<property name="taskExecutor" ref="task-executor"/>
<property name="callbackExecutor" ref="callback-executor"/>
<property name="listExecutor" ref="dir-list-executor"/>
</bean>

<bean id="statistics" class="org.dcache.services.bulk.util.BulkServiceStatistics">
Expand Down Expand Up @@ -243,21 +254,7 @@
<property name="submissionHandler" ref="request-handler"/>
<property name="schedulerProvider" ref="scheduler-provider"/>
<property name="maxActiveRequests" value="${bulk.limits.container-processing-threads}"/>
<property name="executor">
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.in-flight-semaphore}"/>
</bean>
</property>
<property name="callbackExecutor">
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.db.connections.max}"/>
</bean>
</property>
<property name="listExecutor" >
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.dir-list-semaphore}"/>
</bean>
</property>
<property name="containerExecutor" ref="container-job-executor"/>
<property name="timeout" value="${bulk.limits.sweep-interval}"/>
<property name="timeoutUnit" value="${bulk.limits.sweep-interval.unit}"/>
</bean>
Expand Down Expand Up @@ -299,6 +296,9 @@
<list>
<ref bean="incoming-thread-executor"/>
<ref bean="container-job-executor"/>
<ref bean="task-executor"/>
<ref bean="callback-executor"/>
<ref bean="dir-list-executor"/>
<ref bean="cancellation-executor"/>
</list>
</property>
Expand Down
2 changes: 1 addition & 1 deletion skel/share/defaults/bulk.properties
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ bulk.request-scheduler=org.dcache.services.bulk.manager.scheduler.LeastRecentFir
#
bulk.limits.container-processing-threads=100
bulk.limits.incoming-request-threads=10
bulk.limits.cancellation-threads=10
bulk.limits.cancellation-threads=${bulk.limits.container-processing-threads}

# ---- Expiration of the cache serving to front the request storage.
#
Expand Down

0 comments on commit 1c1a08d

Please sign in to comment.