From adaacd864a805b8337d0c1529bd95be777775734 Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Tue, 17 Oct 2023 07:14:33 -0500 Subject: [PATCH] dcache-bulk: fix thread executor injection Motivation: Recent changes to the job container implementation inadvertently scrambled thread pool injection such that one of the executors (intended to run the main container jobs) never gets used (the container job is executed by the same thread pool as its own tasks). Modification: Fix the injection. For the sake of consistency, all the thread pools are injected into the container by the job factory rather than the manager. Result: Container jobs run on their own thread pool, as intended. Target: master Request: 9.2 Requires-notes: no (this is largely invisible) Patch: https://rb.dcache.org/r/14135 Acked-by: Tigran --- .../bulk/job/RequestContainerJobFactory.java | 22 ++++++++++ .../manager/ConcurrentRequestManager.java | 33 +++------------ .../org/dcache/services/bulk/bulk.xml | 42 +++++++++---------- skel/share/defaults/bulk.properties | 2 +- 4 files changed, 49 insertions(+), 50 deletions(-) diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java index 28a997e5160..5e145773772 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java @@ -80,6 +80,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.util.BulkRequestTarget.PID; import org.dcache.services.bulk.util.BulkRequestTargetBuilder; import org.dcache.services.bulk.util.BulkServiceStatistics; +import org.dcache.util.BoundedCachedExecutor; import org.dcache.util.list.ListDirectoryHandler; import org.dcache.vehicles.FileAttributes; import org.slf4j.Logger; @@ -102,6 +103,9 @@ public final class RequestContainerJobFactory { private BulkServiceStatistics statistics; private Semaphore dirListSemaphore; private Semaphore inFlightSemaphore; + private BoundedCachedExecutor taskExecutor; + private BoundedCachedExecutor callbackExecutor; + private BoundedCachedExecutor listExecutor; public BulkRequestContainerJob createRequestJob(BulkRequest request) throws BulkServiceException { @@ -132,6 +136,9 @@ public BulkRequestContainerJob createRequestJob(BulkRequest request) containerJob.setListHandler(listHandler); containerJob.setDirListSemaphore(dirListSemaphore); containerJob.setInFlightSemaphore(inFlightSemaphore); + containerJob.setExecutor(taskExecutor); + containerJob.setListExecutor(listExecutor); + containerJob.setCallbackExecutor(callbackExecutor); containerJob.initialize(); return containerJob; } @@ -149,11 +156,21 @@ public void setActivityFactory(BulkActivityFactory activityFactory) { this.activityFactory = activityFactory; } + @Required + public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) { + this.callbackExecutor = callbackExecutor; + } + @Required public void setListHandler(ListDirectoryHandler listHandler) { this.listHandler = listHandler; } + @Required + public void setListExecutor(BoundedCachedExecutor listExecutor) { + this.listExecutor = listExecutor; + } + @Required public void setDirListSemaphore(int permits) { dirListSemaphore = new Semaphore(permits); @@ -184,6 +201,11 @@ public void setTargetStore(BulkTargetStore targetStore) { this.targetStore = targetStore; } + @Required + public void setTaskExecutor(BoundedCachedExecutor taskExecutor) { + this.taskExecutor = taskExecutor; + } + BulkActivity create(BulkRequest request) throws BulkServiceException { String rid = request.getUid(); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java index 1bef438619c..aa7808a04c3 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java @@ -311,19 +311,9 @@ private ListMultimap userRequests() { private ExecutorService processorExecutorService; /** - * Thread dedicated to job callbacks. + * Thread dedicated to running containerJobs. */ - private ExecutorService callbackExecutor; - - /** - * Thread dedicated to directory listing. - */ - private ExecutorService listExecutor; - - /** - * Thread dedicated to jobs. - */ - private ExecutorService executorService; + private ExecutorService containerExecutor; /** * Records number of jobs and requests processed. @@ -430,24 +420,14 @@ public int getMaxActiveRequests() { return maxActiveRequests; } - @Required - public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) { - this.callbackExecutor = callbackExecutor; - } - @Required public void setCompletionHandler(BulkRequestCompletionHandler completionHandler) { this.completionHandler = completionHandler; } @Required - public void setExecutor(BoundedCachedExecutor pooledExecutor) { - this.executorService = pooledExecutor; - } - - @Required - public void setListExecutor(BoundedCachedExecutor listExecutor) { - this.listExecutor = listExecutor; + public void setContainerExecutor(BoundedCachedExecutor containerExecutor) { + this.containerExecutor = containerExecutor; } @Required @@ -543,14 +523,11 @@ void startJob(BulkRequestContainerJob job) { String key = job.getTarget().getKey(); LOGGER.trace("submitting job {} to executor, target {}.", key, job.getTarget()); - job.setExecutor(executorService); - job.setListExecutor(listExecutor); - job.setCallbackExecutor(callbackExecutor); job.setCallback(this); try { if (isJobValid(job)) { /* possibly cancelled in flight */ job.update(State.RUNNING); - executorService.submit(new FireAndForgetTask(job)); + containerExecutor.submit(new FireAndForgetTask(job)); } } catch (RuntimeException e) { job.getTarget().setErrorObject(e); diff --git a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml index 78909bb1b51..603d7a400bb 100644 --- a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml +++ b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml @@ -67,13 +67,21 @@ - + Used to execute jobs that are executed by a batch container. - - - - - + + + + + + + + + + + + + @@ -209,6 +217,9 @@ + + + @@ -243,21 +254,7 @@ - - - - - - - - - - - - - - - + @@ -299,6 +296,9 @@ + + + diff --git a/skel/share/defaults/bulk.properties b/skel/share/defaults/bulk.properties index 8a2ef72d8c6..6876b56bba5 100644 --- a/skel/share/defaults/bulk.properties +++ b/skel/share/defaults/bulk.properties @@ -69,7 +69,7 @@ bulk.request-scheduler=org.dcache.services.bulk.manager.scheduler.LeastRecentFir # bulk.limits.container-processing-threads=100 bulk.limits.incoming-request-threads=10 -bulk.limits.cancellation-threads=10 +bulk.limits.cancellation-threads=${bulk.limits.container-processing-threads} # ---- Expiration of the cache serving to front the request storage. #