diff --git a/docs/TheBook/src/main/markdown/config-bulk.md b/docs/TheBook/src/main/markdown/config-bulk.md index 71361983627..650503a0d9a 100644 --- a/docs/TheBook/src/main/markdown/config-bulk.md +++ b/docs/TheBook/src/main/markdown/config-bulk.md @@ -90,26 +90,9 @@ a different id. The default lifetime is five minutes (the same as for the NFS do the [QoS Engine](config-qos-engine.md). - **LOG_TARGET** : logs metadata for each target at the INFO level. -Each activity is associated with - -- a permit count (used in connection with a semaphore for throttling execution); -- two thread queues, one for the execution of the container job, -and the other for the execution of callbacks on activity futures; -- a retry policy (currently the only retry policy is a NOP, i.e., no retry). - -The permits are configurable using either the property or the admin shell -command ``request policy``. - +Each activity is associated with a retry policy (currently the only retry policy is a NOP, i.e., no retry). Should other retry policies become available, these can be set via a property. -The number and distribution of thread executors is hard-coded for the activities, but their -respective sizes can be adjusted using the properties: - - ``` - bulk.limits.container-processing-threads=110 - bulk.limits.activity-callback-threads=50 - ``` - ## Container Design Version 2 of the bulk service has introduced improvements for better scalability and recovery. 
diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java index 55a0f03a112..822debac7b0 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java @@ -59,7 +59,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */ package org.dcache.services.bulk; -import static org.dcache.services.bulk.job.AbstractRequestContainerJob.findAbsolutePath; +import static org.dcache.services.bulk.job.BulkRequestContainerJob.findAbsolutePath; import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath; import com.google.common.base.Strings; diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java index ad74f3c546c..6ec8bd1bcdc 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java @@ -93,6 +93,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.security.auth.Subject; @@ -181,7 +182,7 @@ public final class BulkServiceCommands implements CellCommandListener { /** * name | class | type | permits */ - private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s | %10s "; + private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s "; /** * name | required | description @@ -267,8 +268,7 @@ private static String formatActivity(Entry entry) return String.format(FORMAT_ACTIVITY, entry.getKey(), provider.getActivityClass(), 
- provider.getTargetType(), - provider.getMaxPermits()); + provider.getTargetType()); } private static String formatArgument(BulkActivityArgumentDescriptor descriptor) { @@ -550,7 +550,7 @@ public String call() throws Exception { return "There are no mapped activities!"; } - return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE", "PERMITS") + return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE") + "\n" + activities; } } @@ -1371,7 +1371,7 @@ public PagedTargetResult call() throws Exception { private BulkActivityFactory activityFactory; private BulkTargetStore targetStore; private BulkServiceStatistics statistics; - private ExecutorService executor; + private ExecutorService executor = Executors.newSingleThreadExecutor(); private JdbcBulkArchiveDao archiveDao; @@ -1390,11 +1390,6 @@ public void setArchiveDao(JdbcBulkArchiveDao archiveDao) { this.archiveDao = archiveDao; } - @Required - public void setExecutor(ExecutorService executor) { - this.executor = executor; - } - @Required public void setRequestManager(BulkRequestManager requestManager) { this.requestManager = requestManager; diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java index fa88a53a16b..8ab79b81e58 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java @@ -65,14 +65,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.EnumSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import javax.security.auth.Subject; import org.dcache.auth.attributes.Restriction; import org.dcache.namespace.FileAttribute; import org.dcache.services.bulk.BulkServiceException; import 
org.dcache.services.bulk.activity.retry.BulkTargetRetryPolicy; import org.dcache.services.bulk.activity.retry.NoRetryPolicy; -import org.dcache.services.bulk.util.BatchedResult; import org.dcache.services.bulk.util.BulkRequestTarget; import org.dcache.vehicles.FileAttributes; @@ -98,25 +97,17 @@ public enum TargetType { private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy(); - private static final int DEFAULT_PERMITS = 50; - protected final String name; protected final TargetType targetType; protected Subject subject; protected Restriction restriction; - protected Set requiredAttributes; - protected int maxPermits; - protected ExecutorService activityExecutor; - protected ExecutorService callbackExecutor; protected BulkTargetRetryPolicy retryPolicy; protected Set descriptors; protected BulkActivity(String name, TargetType targetType) { this.name = name; this.targetType = targetType; - requiredAttributes = MINIMALLY_REQUIRED_ATTRIBUTES; - maxPermits = DEFAULT_PERMITS; retryPolicy = DEFAULT_RETRY_POLICY; } @@ -124,10 +115,6 @@ public void cancel(BulkRequestTarget target) { target.cancel(); } - public int getMaxPermits() { - return maxPermits; - } - public String getName() { return name; } @@ -144,10 +131,6 @@ public TargetType getTargetType() { return targetType; } - public Set getRequiredAttributes() { - return requiredAttributes; - } - public Subject getSubject() { return subject; } @@ -164,39 +147,10 @@ public void setRestriction(Restriction restriction) { this.restriction = restriction; } - public ExecutorService getActivityExecutor() { - return activityExecutor; - } - - public void setActivityExecutor(ExecutorService activityExecutor) { - this.activityExecutor = activityExecutor; - } - - public ExecutorService getCallbackExecutor() { - return callbackExecutor; - } - - public void setCallbackExecutor(ExecutorService callbackExecutor) { - this.callbackExecutor = callbackExecutor; - } - public void setDescriptors(Set descriptors) { 
this.descriptors = descriptors; } - public void setMaxPermits(int maxPermits) { - this.maxPermits = maxPermits; - } - - /** - * Completion handler method. Calls the internal implementation. - * - * @param result of the targeted activity. - */ - public void handleCompletion(BatchedResult result) { - handleCompletion(result.getTarget(), result.getFuture()); - } - /** * Performs the activity. * @@ -223,5 +177,5 @@ public abstract ListenableFuture perform(String rid, long tid, FsPath path, F * @param target which has terminate. * @param future the future returned by the activity call to perform(); */ - protected abstract void handleCompletion(BulkRequestTarget target, ListenableFuture future); + public abstract void handleCompletion(BulkRequestTarget target, Future future); } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java index 27f3e34711f..2904da6b8f0 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java @@ -70,7 +70,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; -import java.util.concurrent.ExecutorService; import javax.security.auth.Subject; import org.dcache.auth.Subjects; import org.dcache.auth.attributes.Restriction; @@ -102,9 +101,6 @@ public final class BulkActivityFactory implements CellMessageSender, Environment new HashMap<>()); private Map retryPolicies; - private Map activityExecutors; - private Map callbackExecutors; - private Map maxPermits; private Map environment; private CellStub pnfsManager; @@ -142,8 +138,6 @@ public BulkActivity createActivity(BulkRequest request, Subject subject, bulkActivity.setSubject(subject); 
bulkActivity.setRestriction(restriction); - bulkActivity.setActivityExecutor(activityExecutors.get(activity)); - bulkActivity.setCallbackExecutor(callbackExecutors.get(activity)); BulkTargetRetryPolicy retryPolicy = retryPolicies.get(activity); if (retryPolicy != null) { bulkActivity.setRetryPolicy(retryPolicy); @@ -163,8 +157,6 @@ public void initialize() { ServiceLoader serviceLoader = ServiceLoader.load(BulkActivityProvider.class); for (BulkActivityProvider provider : serviceLoader) { - String activity = provider.getActivity(); - provider.setMaxPermits(maxPermits.get(activity)); provider.configure(environment); providers.put(provider.getActivity(), provider); } @@ -215,26 +207,11 @@ public void setQoSResponseReceiver(QoSResponseReceiver qoSResponseReceiver) { this.qoSResponseReceiver = qoSResponseReceiver; } - @Required - public void setMaxPermits(Map maxPermits) { - this.maxPermits = maxPermits; - } - @Required public void setRetryPolicies(Map retryPolicies) { this.retryPolicies = retryPolicies; } - @Required - public void setActivityExecutors(Map activityExecutors) { - this.activityExecutors = activityExecutors; - } - - @Required - public void setCallbackExecutors(Map callbackExecutors) { - this.callbackExecutors = callbackExecutors; - } - @Override public void setEnvironment(Map environment) { this.environment = environment; diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityProvider.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityProvider.java index 80f1fb0e184..9ea066eaac4 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityProvider.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityProvider.java @@ -73,7 +73,6 @@ public abstract class BulkActivityProvider { protected final String activity; protected final TargetType targetType; - protected int maxPermits; protected BulkActivityProvider(String 
activity, TargetType targetType) { this.activity = activity; @@ -88,14 +87,6 @@ public TargetType getTargetType() { return targetType; } - public int getMaxPermits() { - return maxPermits; - } - - public void setMaxPermits(int maxPermits) { - this.maxPermits = maxPermits; - } - /** * @return an instance of the specific activity type to be configured by factory. * @@ -103,7 +94,6 @@ public void setMaxPermits(int maxPermits) { */ public J createActivity() throws BulkServiceException { J activity = activityInstance(); - activity.setMaxPermits(maxPermits); activity.setDescriptors(getDescriptors()); return activity; } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/delete/DeleteActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/delete/DeleteActivity.java index 5a3ce9ab66d..8c3687326b6 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/delete/DeleteActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/delete/DeleteActivity.java @@ -71,6 +71,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.dcache.namespace.FileType; import org.dcache.services.bulk.activity.BulkActivity; import org.dcache.services.bulk.util.BulkRequestTarget; @@ -105,8 +106,8 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) { } @Override - protected void handleCompletion(BulkRequestTarget target, - ListenableFuture future) { + public void handleCompletion(BulkRequestTarget target, + Future future) { PnfsDeleteEntryMessage reply; try { reply = getUninterruptibly(future); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/log/LogTargetActivity.java 
b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/log/LogTargetActivity.java index 2185e7601a7..330ac0158ba 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/log/LogTargetActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/log/LogTargetActivity.java @@ -63,6 +63,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import com.google.common.util.concurrent.ListenableFuture; import diskCacheV111.util.FsPath; import java.util.Map; +import java.util.concurrent.Future; import org.dcache.services.bulk.activity.BulkActivity; import org.dcache.services.bulk.util.BulkRequestTarget; import org.dcache.services.bulk.util.BulkRequestTarget.State; @@ -110,7 +111,7 @@ public ListenableFuture perform(String ruid, long tid, FsPath } @Override - protected void handleCompletion(BulkRequestTarget target, ListenableFuture future) { + public void handleCompletion(BulkRequestTarget target, Future future) { target.setState(State.COMPLETED); } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java index e9a17a0b845..02019cfd741 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java @@ -75,6 +75,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.dcache.cells.CellStub; import org.dcache.pinmanager.PinManagerAware; import org.dcache.pinmanager.PinManagerPinMessage; @@ -104,7 +105,7 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) { } 
@Override - protected void handleCompletion(BulkRequestTarget target, ListenableFuture future) { + public void handleCompletion(BulkRequestTarget target, Future future) { Message reply; try { reply = getUninterruptibly(future); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java index 9050c4944cb..e61b169dab0 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java @@ -76,6 +76,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.dcache.cells.CellStub; import org.dcache.qos.QoSException; import org.dcache.qos.data.FileQoSRequirements; @@ -197,7 +198,7 @@ protected void configure(Map arguments) throws BulkServiceExcept @Override public void handleCompletion(BulkRequestTarget target, - ListenableFuture future) { + Future future) { QoSTransitionCompletedMessage message; try { message = getUninterruptibly(future); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/handler/BulkRequestHandler.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/handler/BulkRequestHandler.java index 2311ef5470c..9561b88c719 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/handler/BulkRequestHandler.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/handler/BulkRequestHandler.java @@ -74,7 +74,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.BulkRequestStatus; import org.dcache.services.bulk.BulkServiceException; import 
org.dcache.services.bulk.BulkStorageException; -import org.dcache.services.bulk.job.AbstractRequestContainerJob; +import org.dcache.services.bulk.job.BulkRequestContainerJob; import org.dcache.services.bulk.job.RequestContainerJobFactory; import org.dcache.services.bulk.manager.BulkRequestManager; import org.dcache.services.bulk.store.BulkRequestStore; @@ -210,7 +210,7 @@ public void setTargetStore(BulkTargetStore targetStore) { @Override public void submitRequestJob(BulkRequest request) throws BulkServiceException { - AbstractRequestContainerJob job = jobFactory.createRequestJob(request); + BulkRequestContainerJob job = jobFactory.createRequestJob(request); if (storeJobTarget(job.getTarget())) { requestManager.submit(job); } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/AbstractRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/AbstractRequestContainerJob.java deleted file mode 100644 index 9aef072df61..00000000000 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/AbstractRequestContainerJob.java +++ /dev/null @@ -1,611 +0,0 @@ -/* -COPYRIGHT STATUS: -Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and -software are sponsored by the U.S. Department of Energy under Contract No. -DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide -non-exclusive, royalty-free license to publish or reproduce these documents -and software for U.S. Government purposes. All documents and software -available from this server are protected under the U.S. and Foreign -Copyright Laws, and FNAL reserves all rights. - -Distribution of the software available from this server is free of -charge subject to the user following the terms of the Fermitools -Software Legal Information. - -Redistribution and/or modification of the software shall be accompanied -by the Fermitools Software Legal Information (including the copyright -notice). 
- -The user is asked to feed back problems, benefits, and/or suggestions -about the software to the Fermilab Software Providers. - -Neither the name of Fermilab, the URA, nor the names of the contributors -may be used to endorse or promote products derived from this software -without specific prior written permission. - -DISCLAIMER OF LIABILITY (BSD): - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB, -OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT -OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR -BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -Liabilities of the Government: - -This software is provided by URA, independent from its Prime Contract -with the U.S. Department of Energy. URA is acting independently from -the Government and in its own private capacity and is not acting on -behalf of the U.S. Government, nor as its contractor nor its agent. -Correspondingly, it is understood and agreed that the U.S. Government -has no connection to this software and in no manner whatsoever shall -be liable for nor assume any responsibility or obligation for any claim, -cost, or damages arising out of or resulting from the use of the software -available from this server. - -Export Control: - -All documents and software available from this server are subject to U.S. -export control laws. 
Anyone downloading information from this server is -obligated to secure any necessary Government licenses before exporting -documents or software obtained from this server. - */ -package org.dcache.services.bulk.job; - -import static org.dcache.services.bulk.activity.BulkActivity.MINIMALLY_REQUIRED_ATTRIBUTES; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.CANCELLED; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.COMPLETED; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.FAILED; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.READY; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED; -import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath; - -import com.google.common.base.Throwables; -import com.google.common.collect.Range; -import diskCacheV111.util.CacheException; -import diskCacheV111.util.FsPath; -import diskCacheV111.util.NamespaceHandlerAware; -import diskCacheV111.util.PnfsHandler; -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import javax.security.auth.Subject; -import org.dcache.auth.attributes.Restriction; -import org.dcache.services.bulk.BulkRequest; -import org.dcache.services.bulk.BulkRequest.Depth; -import org.dcache.services.bulk.BulkServiceException; -import org.dcache.services.bulk.BulkStorageException; -import org.dcache.services.bulk.activity.BulkActivity; -import org.dcache.services.bulk.activity.BulkActivity.TargetType; -import org.dcache.services.bulk.store.BulkTargetStore; -import org.dcache.services.bulk.util.BatchedResult; -import 
org.dcache.services.bulk.util.BulkRequestTarget; -import org.dcache.services.bulk.util.BulkRequestTarget.PID; -import org.dcache.services.bulk.util.BulkRequestTarget.State; -import org.dcache.services.bulk.util.BulkRequestTargetBuilder; -import org.dcache.services.bulk.util.BulkServiceStatistics; -import org.dcache.util.BoundedExecutor; -import org.dcache.util.SignalAware; -import org.dcache.util.list.DirectoryEntry; -import org.dcache.util.list.DirectoryStream; -import org.dcache.util.list.ListDirectoryHandler; -import org.dcache.vehicles.FileAttributes; -import org.dcache.vehicles.PnfsResolveSymlinksMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base class for the implementations. Acts as a container for a list of targets which may or may - * not be associated with each other via a common parent. It handles all targets by calling - * activity.perform() on them serially using the activity's semaphore, and then holding them in a - * map as waiting tasks with a callback listener. 
- */ -public abstract class AbstractRequestContainerJob - implements Runnable, NamespaceHandlerAware, Comparable, - UncaughtExceptionHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRequestContainerJob.class); - - public static FsPath findAbsolutePath(String prefix, String path) { - FsPath absPath = computeFsPath(null, path); - if (prefix == null) { - return absPath; - } - - FsPath pref = FsPath.create(prefix); - - if (!absPath.hasPrefix(pref)) { - absPath = computeFsPath(prefix, path); - } - - return absPath; - } - - abstract class DirListTask implements Runnable { - - protected Consumer errorHandler = e -> uncaughtException(Thread.currentThread(), e); - - public void run() { - try { - doList(); - } catch (InterruptedException e) { - containerState = ContainerState.STOP; - target.setErrorObject(e); - update(CANCELLED); - } catch (Throwable e) { - errorHandler.accept(e); - Throwables.throwIfUnchecked(e); - } - } - - protected abstract void doList() throws Throwable; - } - - enum ContainerState { - START, PROCESS_FILES, WAIT, PROCESS_DIRS, STOP - } - - protected final BulkRequest request; - protected final BulkActivity activity; - protected final Long rid; - protected final String ruid; - protected final Depth depth; - protected final String targetPrefix; - protected final Map waiting; - - /** - * A temporary placeholder for tracking purposes; it will not be the same as the actual - * autogenerated key in the database. 
- */ - protected final AtomicLong id; - protected final BulkServiceStatistics statistics; - protected final AtomicInteger dirExpansionCount; - - protected BulkTargetStore targetStore; - protected PnfsHandler pnfsHandler; - protected Semaphore semaphore; - - protected volatile ContainerState containerState; - - private final TargetType targetType; - private final BulkRequestTarget target; - private final Subject subject; - private final Restriction restriction; - private final Set cancelledPaths; - - private ListDirectoryHandler listHandler; - private SignalAware callback; - - private BoundedExecutor dirListExecutor; - private Thread runThread; - - AbstractRequestContainerJob(BulkActivity activity, BulkRequestTarget target, - BulkRequest request, BulkServiceStatistics statistics) { - id = new AtomicLong(0L); - this.request = request; - this.activity = activity; - this.target = target; - this.subject = activity.getSubject(); - this.restriction = activity.getRestriction(); - this.statistics = statistics; - waiting = new HashMap<>(); - cancelledPaths = new HashSet<>(); - rid = request.getId(); - ruid = request.getUid(); - depth = request.getExpandDirectories(); - targetPrefix = request.getTargetPrefix(); - targetType = activity.getTargetType(); - semaphore = new Semaphore(activity.getMaxPermits()); - containerState = ContainerState.START; - dirExpansionCount = new AtomicInteger(0); - } - - public void cancel() { - containerState = ContainerState.STOP; - - target.cancel(); - - LOGGER.debug("cancel {}: target state is now {}.", ruid, target.getState()); - - semaphore.drainPermits(); - - interruptRunThread(); - - synchronized (waiting) { - LOGGER.debug("cancel {}: waiting {}.", ruid, waiting.size()); - waiting.values().forEach(r -> r.cancel(activity)); - LOGGER.debug("cancel {}: waiting targets cancelled.", ruid); - waiting.clear(); - } - - LOGGER.debug("cancel {}: calling cancel all on target store.", ruid); - targetStore.cancelAll(rid); - - signalStateChange(); - } - - 
public void cancel(long id) { - synchronized (waiting) { - for (Iterator i = waiting.values().iterator(); i.hasNext(); ) { - BatchedResult result = i.next(); - if (result.getTarget().getId() == id) { - result.cancel(activity); - i.remove(); - break; - } - } - } - - try { - targetStore.update(id, CANCELLED, null, null); - } catch (BulkStorageException e) { - LOGGER.error("Failed to cancel {}::{}: {}.", ruid, id, e.toString()); - } - } - - public void cancel(String path) { - FsPath toCancel = findAbsolutePath(targetPrefix, path); - - Optional found; - - synchronized (waiting) { - found = waiting.values().stream().filter(r -> r.getTarget().getPath().equals(toCancel)) - .findAny(); - } - - if (found.isPresent()) { - cancel(found.get().getTarget().getId()); - } else { - synchronized (cancelledPaths) { - cancelledPaths.add(toCancel); - } - } - } - - @Override - public int compareTo(AbstractRequestContainerJob other) { - return target.getKey().compareTo(other.target.getKey()); - } - - public BulkActivity getActivity() { - return activity; - } - - public BulkRequestTarget getTarget() { - return target; - } - - public void initialize() { - LOGGER.trace("BulkJob {}, initialize() called ...", target.getKey()); - target.setState(READY); - containerState = ContainerState.PROCESS_FILES; - } - - public synchronized boolean isReady() { - switch (target.getState()) { - case READY: - case CREATED: - return true; - default: - return false; - } - } - - @Override - public void run() { - setRunThread(Thread.currentThread()); - try { - switch (containerState) { - case PROCESS_FILES: - preprocessTargets(); - checkForRequestCancellation(); - processFileTargets(); - checkForRequestCancellation(); - containerState = ContainerState.WAIT; - break; - case PROCESS_DIRS: - checkForRequestCancellation(); - semaphore = new Semaphore(1); /* synchronous */ - processDirTargets(); - containerState = ContainerState.STOP; - update(COMPLETED); - break; - case STOP: - LOGGER.debug("run {} was prematurely 
stopped; exiting", ruid); - update(CANCELLED); - setRunThread(null); - return; - default: - throw new RuntimeException( - "run container called with container in wrong state " + containerState - + "; this is a bug."); - } - } catch (InterruptedException e) { - LOGGER.debug("run {} interrupted", ruid); - /* - * If the state has not already been set to terminal, do so. - */ - containerState = ContainerState.STOP; - update(CANCELLED); - } - setRunThread(null); - checkTransitionToDirs(); - } - - public void setListHandler(ListDirectoryHandler listHandler) { - this.listHandler = listHandler; - } - - public void setDirListExecutor(BoundedExecutor dirListExecutor) { - this.dirListExecutor = dirListExecutor; - } - - @Override - public void setNamespaceHandler(PnfsHandler pnfsHandler) { - this.pnfsHandler = pnfsHandler; - } - - public void setTargetStore(BulkTargetStore targetStore) { - this.targetStore = targetStore; - } - - public void setCallback(SignalAware callback) { - this.callback = callback; - } - - @Override - public void uncaughtException(Thread t, Throwable e) { - /* - * Don't leave the request in non-terminal state in case of uncaught exception. - * We also try to handle uncaught exceptions here, so as not to kill the - * manager thread. 
- */ - containerState = ContainerState.STOP; - target.setErrorObject(e); - update(FAILED); - ThreadGroup group = t.getThreadGroup(); - if (group != null) { - group.uncaughtException(t, e); - } else { - LOGGER.error("Uncaught exception: please report to team@dcache.org", e); - } - } - - public void update(State state) { - if (target.setState(state)) { - try { - targetStore.update(target.getId(), target.getState(), target.getErrorType(), - target.getErrorMessage()); - } catch (BulkStorageException e) { - LOGGER.error("{}, updateJobState: {}", ruid, e.toString()); - } - signalStateChange(); - } - } - - protected void checkForRequestCancellation() throws InterruptedException { - if (isRunThreadInterrupted() || containerState == ContainerState.STOP - || target.isTerminated()) { - throw new InterruptedException(); - } - } - - protected DirectoryStream getDirectoryListing(FsPath path) - throws CacheException, InterruptedException { - LOGGER.trace("getDirectoryListing {}, path {}, calling list ...", ruid, path); - return listHandler.list(subject, restriction, path, null, - Range.closedOpen(0, Integer.MAX_VALUE), MINIMALLY_REQUIRED_ATTRIBUTES); - } - - protected void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttributes) - throws BulkServiceException, CacheException, InterruptedException { - checkForRequestCancellation(); - DirListTask task = new DirListTask() { - @Override - public void doList() throws Throwable { - try { - DirectoryStream stream = getDirectoryListing(path); - for (DirectoryEntry entry : stream) { - LOGGER.trace("expandDepthFirst {}, directory {}, entry {}", ruid, path, - entry.getName()); - FsPath childPath = path.child(entry.getName()); - FileAttributes childAttributes = entry.getFileAttributes(); - - switch (childAttributes.getFileType()) { - case DIR: - switch (depth) { - case ALL: - LOGGER.debug("expandDepthFirst {}, found directory {}, " - + "expand ALL.", ruid, childPath); - expandDepthFirst(null, PID.DISCOVERED, childPath, - 
childAttributes); - break; - case TARGETS: - switch (targetType) { - case BOTH: - case DIR: - handleDirTarget(null, PID.DISCOVERED, childPath, - childAttributes); - } - break; - } - break; - case LINK: - case REGULAR: - handleFileTarget(PID.DISCOVERED, childPath, childAttributes); - break; - case SPECIAL: - default: - LOGGER.trace("expandDepthFirst {}, cannot handle special " - + "file {}.", ruid, childPath); - break; - } - - checkForRequestCancellation(); - } - - switch (targetType) { - case BOTH: - case DIR: - handleDirTarget(id, pid, path, dirAttributes); - break; - case FILE: - /* - * Because we now store all initial targets immediately, - * we need to mark such a directory as SKIPPED; otherwise - * the request will not complete on the basis of querying for - * completed targets and finding this one unhandled. - */ - if (pid == PID.INITIAL) { - targetStore.storeOrUpdate( - toTarget(id, pid, path, Optional.of(dirAttributes), - SKIPPED, null)); - } - } - } finally { - dirExpansionCount.decrementAndGet(); - checkTransitionToDirs(); - } - } - }; - - dirExpansionCount.incrementAndGet(); - - /* - * The executor is shared among containers. To avoid total inactivity should - * the running container be starved by other containers, we allow it - * to execute on its own thread if all other threads are currently occupied. 
- */ - if (dirListExecutor.getWorkQueueSize() >= dirListExecutor.getMaximumPoolSize()) { - task.run(); - } else { - dirListExecutor.execute(task); - } - } - - protected List getInitialTargets() { - return targetStore.getInitialTargets(rid, true); - } - - protected boolean hasBeenCancelled(Long id, PID pid, FsPath path, FileAttributes attributes) { - synchronized (cancelledPaths) { - if (cancelledPaths.remove(path.toString())) { - BulkRequestTarget target = toTarget(id, pid, path, Optional.of(attributes), - CANCELLED, null); - try { - if (id == null) { - targetStore.store(target); - } else { - targetStore.update(target.getId(), CANCELLED, null, null); - } - } catch (BulkServiceException | UnsupportedOperationException e) { - LOGGER.error("hasBeenCancelled {}, failed for {}: {}", ruid, path, e.toString()); - } - return true; - } - } - - return false; - } - - protected void preprocessTargets() throws InterruptedException { - // NOP default - } - - protected void removeTarget(BulkRequestTarget target) { - synchronized (waiting) { - waiting.remove(target.getPath()); - } - - semaphore.release(); - - checkTransitionToDirs(); - } - - protected FsPath resolvePath(String targetPath) throws CacheException { - PnfsResolveSymlinksMessage message = new PnfsResolveSymlinksMessage(targetPath, null); - message = pnfsHandler.request(message); - return FsPath.create(message.getResolvedPath()); - } - - private void checkTransitionToDirs() { - if (dirExpansionCount.get() <= 0 && semaphore.availablePermits() == activity.getMaxPermits()) { - synchronized (this) { - if (containerState == ContainerState.WAIT) { - containerState = ContainerState.PROCESS_DIRS; - activity.getActivityExecutor().submit(this); - } - } - } - } - - protected BulkRequestTarget toTarget(Long id, PID pid, FsPath path, Optional attributes, - State state, Throwable throwable) { - String errorType = null; - String errorMessage = null; - Throwable root = null; - if (throwable != null) { - root = 
Throwables.getRootCause(throwable); - errorType = root.getClass().getCanonicalName(); - errorMessage = root.getMessage(); - } - - return BulkRequestTargetBuilder.builder(statistics).attributes(attributes.orElse(null)) - .activity(activity.getName()).id(id).pid(pid).rid(rid).ruid(ruid).state(state) - .createdAt(System.currentTimeMillis()).errorType(errorType) - .errorMessage(errorMessage).path(path).build(); - } - - protected abstract void handleFileTarget(PID pid, FsPath path, FileAttributes attributes) - throws InterruptedException; - - protected abstract void handleDirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) - throws InterruptedException; - - protected abstract void processFileTargets() throws InterruptedException; - - protected abstract void processDirTargets() throws InterruptedException; - - protected abstract void retryFailed(BatchedResult result, FileAttributes attributes) - throws BulkStorageException; - - private void signalStateChange() { - if (callback != null) { - callback.signal(); - } - } - - private synchronized void interruptRunThread() { - if (runThread != null) { - runThread.interrupt(); - LOGGER.debug("cancel {}: container job interrupted.", ruid); - } - } - - private synchronized boolean isRunThreadInterrupted() { - return runThread != null && runThread.isInterrupted(); - } - - private synchronized void setRunThread(Thread runThread) { - this.runThread = runThread; - if (runThread != null) { - this.runThread.setUncaughtExceptionHandler(this); - } - } -} \ No newline at end of file diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java new file mode 100644 index 00000000000..53cd8198fe8 --- /dev/null +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java @@ -0,0 +1,1008 @@ +/* +COPYRIGHT STATUS: +Dec 1st 2001, Fermi National Accelerator 
Laboratory (FNAL) documents and +software are sponsored by the U.S. Department of Energy under Contract No. +DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide +non-exclusive, royalty-free license to publish or reproduce these documents +and software for U.S. Government purposes. All documents and software +available from this server are private under the U.S. and Foreign +Copyright Laws, and FNAL reserves all rights. + +Distribution of the software available from this server is free of +charge subject to the user following the terms of the Fermitools +Software Legal Information. + +Redistribution and/or modification of the software shall be accompanied +by the Fermitools Software Legal Information (including the copyright +notice). + +The user is asked to feed back problems, benefits, and/or suggestions +about the software to the Fermilab Software Providers. + +Neither the name of Fermilab, the URA, nor the names of the contributors +may be used to endorse or promote products derived from this software +without specific prior written permission. + +DISCLAIMER OF LIABILITY (BSD): + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB, +OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +Liabilities of the Government: + +This software is provided by URA, independent from its Prime Contract +with the U.S. Department of Energy. URA is acting independently from +the Government and in its own private capacity and is not acting on +behalf of the U.S. Government, nor as its contractor nor its agent. +Correspondingly, it is understood and agreed that the U.S. Government +has no connection to this software and in no manner whatsoever shall +be liable for nor assume any responsibility or obligation for any claim, +cost, or damages arising out of or resulting from the use of the software +available from this server. + +Export Control: + +All documents and software available from this server are subject to U.S. +export control laws. Anyone downloading information from this server is +obligated to secure any necessary Government licenses before exporting +documents or software obtained from this server. + */ +package org.dcache.services.bulk.job; + +import static org.dcache.services.bulk.activity.BulkActivity.MINIMALLY_REQUIRED_ATTRIBUTES; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.CANCELLED; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.COMPLETED; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.CREATED; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.FAILED; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.READY; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.RUNNING; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED; +import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath; + +import com.google.common.base.Throwables; +import com.google.common.collect.Range; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import diskCacheV111.util.CacheException; +import 
diskCacheV111.util.FsPath; +import diskCacheV111.util.NamespaceHandlerAware; +import diskCacheV111.util.PnfsHandler; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import javax.security.auth.Subject; +import org.dcache.auth.attributes.Restriction; +import org.dcache.cells.AbstractMessageCallback; +import org.dcache.cells.CellStub; +import org.dcache.namespace.FileType; +import org.dcache.services.bulk.BulkRequest; +import org.dcache.services.bulk.BulkRequest.Depth; +import org.dcache.services.bulk.BulkServiceException; +import org.dcache.services.bulk.BulkStorageException; +import org.dcache.services.bulk.activity.BulkActivity; +import org.dcache.services.bulk.activity.BulkActivity.TargetType; +import org.dcache.services.bulk.store.BulkTargetStore; +import org.dcache.services.bulk.util.BulkRequestTarget; +import org.dcache.services.bulk.util.BulkRequestTarget.PID; +import org.dcache.services.bulk.util.BulkRequestTarget.State; +import org.dcache.services.bulk.util.BulkRequestTargetBuilder; +import org.dcache.services.bulk.util.BulkServiceStatistics; +import org.dcache.util.CacheExceptionFactory; +import org.dcache.util.SignalAware; +import org.dcache.util.list.DirectoryEntry; +import org.dcache.util.list.DirectoryStream; +import org.dcache.util.list.ListDirectoryHandler; +import org.dcache.vehicles.FileAttributes; +import org.dcache.vehicles.PnfsGetFileAttributes; +import 
org.dcache.vehicles.PnfsResolveSymlinksMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Container job for a list of targets which may or may not be associated with each other via a + * common parent. It handles all file targets asynchronously, recurses if directory listing is enabled, + * and processes directory targets serially last in depth-first reverse order. + */ +public final class BulkRequestContainerJob + implements Runnable, NamespaceHandlerAware, Comparable, + UncaughtExceptionHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(BulkRequestContainerJob.class); + + private static final DirTargetSorter SORTER = new DirTargetSorter(); + + static final AtomicLong taskCounter = new AtomicLong(0L); + + public static FsPath findAbsolutePath(String prefix, String path) { + FsPath absPath = computeFsPath(null, path); + if (prefix == null) { + return absPath; + } + + FsPath pref = FsPath.create(prefix); + + if (!absPath.hasPrefix(pref)) { + absPath = computeFsPath(prefix, path); + } + + return absPath; + } + + /** + * Directories that serve as targets. These are stored in memory, sorted and processed last. + */ + static class DirTarget { + final FsPath path; + final FileAttributes attributes; + final PID pid; + final Long id; + final int depth; + + DirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) { + this.id = id; + this.pid = pid; + this.attributes = attributes; + this.path = path; + depth = (int)path.toString().chars().filter(c -> c == '/').count(); + } + } + + /** + * Depth-first (descending order). + */ + static class DirTargetSorter implements Comparator { + @Override + public int compare(DirTarget o1, DirTarget o2) { + return Integer.compare(o2.depth, o1.depth); /* DESCENDING ORDER */ + } + } + + /** + * Container delays processing directory targets until the final step. 
+ */ + enum ContainerState { + START, PROCESS_FILES, WAIT, PROCESS_DIRS, STOP + } + + /** + * Only INITIAL targets go through all three states. DISCOVERED targets + * already have their proper paths and attributes from listing. + */ + enum TaskState { + RESOLVE_PATH, FETCH_ATTRIBUTES, HANDLE_TARGET, HANDLE_DIR_TARGET + } + + /** + * Wrapper task for directory listing and target processing. + */ + abstract class ContainerTask implements Runnable { + final Consumer errorHandler = e -> uncaughtException(Thread.currentThread(), e); + final long seqNo; + + Future taskFuture; + + ContainerTask() { + seqNo = taskCounter.getAndIncrement(); + } + + public void run() { + try { + doInner(); + } catch (InterruptedException e) { + remove(); + containerState = ContainerState.STOP; + jobTarget.setErrorObject(e); + update(CANCELLED); + } catch (Throwable e) { + remove(); + errorHandler.accept(e); + Throwables.throwIfUnchecked(e); + } + } + + void cancel() { + if (taskFuture != null) { + taskFuture.cancel(true); + } + remove(); + } + + void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttributes) + throws BulkServiceException, CacheException, InterruptedException { + LOGGER.debug("{} - expandDepthFirst, {}, {}, {}, {}", ruid, id, pid, path, dirAttributes); + new DirListTask(id, pid, path, dirAttributes).submitAsync(); + } + + void submitAsync() throws InterruptedException { + checkForRequestCancellation(); + + synchronized (running) { + running.put(seqNo, this); + LOGGER.debug("{} - submitAsync {}, task count is now {}.", ruid, seqNo, running.size()); + } + taskFuture = executor.submit(this); + } + + void remove() { + synchronized (running) { + running.remove(seqNo); + LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo, running.size()); + } + + checkTransitionToDirs(); + } + + abstract void doInner() throws Throwable; + } + + class DirListTask extends ContainerTask { + final Long id; + final PID pid; + final FsPath path; + final FileAttributes 
dirAttributes; + + DirListTask(Long id, PID pid, FsPath path, FileAttributes dirAttributes) { + this.id = id; + this.pid = pid; + this.path = path; + this.dirAttributes = dirAttributes; + } + + void doInner() throws Throwable { + try { + DirectoryStream stream = getDirectoryListing(path); + for (DirectoryEntry entry : stream) { + LOGGER.debug("{} - DirListTask, directory {}, entry {}", ruid, path, + entry.getName()); + FsPath childPath = path.child(entry.getName()); + FileAttributes childAttributes = entry.getFileAttributes(); + + switch (childAttributes.getFileType()) { + case DIR: + switch (depth) { + case ALL: + expandDepthFirst(null, PID.DISCOVERED, childPath, + childAttributes); + break; + case TARGETS: + switch (targetType) { + case BOTH: + case DIR: + addDirTarget(null, PID.DISCOVERED, childPath, + childAttributes); + } + break; + } + break; + case LINK: + case REGULAR: + new TargetTask( + toTarget(null, PID.DISCOVERED, childPath, + Optional.of(childAttributes), CREATED, null), + TaskState.HANDLE_TARGET).submitAsync(); + break; + case SPECIAL: + default: + LOGGER.trace("{} - DirListTask, cannot handle special file {}.", + ruid, childPath); + break; + } + } + + switch (targetType) { + case BOTH: + case DIR: + addDirTarget(id, pid, path, dirAttributes); + break; + case FILE: + /* + * Because we now store all initial targets immediately, + * we need to mark such a directory as SKIPPED; otherwise + * the request will not complete on the basis of querying for + * completed targets and finding this one unhandled. 
+ */ + if (pid == PID.INITIAL) { + targetStore.storeOrUpdate( + toTarget(id, pid, path, Optional.of(dirAttributes), + SKIPPED, null)); + } + } + } finally { + remove(); + } + } + + private void addDirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) { + LOGGER.debug("{} - DirListTask, addDirTarget, adding directory {} ...", ruid, path); + dirs.add(new DirTarget(id, pid, path, attributes)); + } + + private DirectoryStream getDirectoryListing(FsPath path) + throws CacheException, InterruptedException { + dirListSemaphore.acquire(); + try { + LOGGER.debug("{} - DirListTask, getDirectoryListing for path {}, calling list ...", + ruid, path); + return listHandler.list(subject, restriction, path, null, + Range.closedOpen(0, Integer.MAX_VALUE), MINIMALLY_REQUIRED_ATTRIBUTES); + } finally { + dirListSemaphore.release(); + } + } + } + + class TargetTask extends ContainerTask { + + final BulkRequestTarget target; + + /* + * From activity.perform() + */ + ListenableFuture activityFuture; + + /* + * Determines the doInner() switch + */ + TaskState state; + + boolean holdingPermit; + + TargetTask(BulkRequestTarget target, TaskState initialState) { + this.target = target; + state = initialState; + } + + void cancel() { + if (target != null) { + activity.cancel(target); + } + + super.cancel(); + } + + @Override + void doInner() throws Throwable { + switch (state) { + case RESOLVE_PATH: + resolvePath(); + break; + case FETCH_ATTRIBUTES: + fetchAttributes(); + break; + case HANDLE_DIR_TARGET: + performActivity(); + break; + case HANDLE_TARGET: + default: + switch (depth) { + case NONE: + performActivity(); + break; + default: + handleTarget(); + } + break; + } + } + + @Override + void submitAsync() throws InterruptedException { + if (!holdingPermit) { + inFlightSemaphore.acquire(); + holdingPermit = true; + } + super.submitAsync(); + } + + void remove() { + super.remove(); + if (holdingPermit) { + inFlightSemaphore.release(); + holdingPermit = false; + } + } + + void 
performSync() throws InterruptedException { + performActivity(false); + + try { + activityFuture.get(); + } catch (ExecutionException e) { + activityFuture = Futures.immediateFailedFuture(e.getCause()); + } + + handleCompletion(); + } + + /** + * (1) symlink resolution on initial targets; bypassed for discovered targets. + */ + private void resolvePath() throws InterruptedException { + LOGGER.debug("{} - resolvePath, resolving {}", ruid, target.getPath()); + PnfsResolveSymlinksMessage message = new PnfsResolveSymlinksMessage( + target.getPath().toString(), null); + ListenableFuture requestFuture = pnfsHandler.requestAsync( + message); + CellStub.addCallback(requestFuture, new AbstractMessageCallback<>() { + @Override + public void success(PnfsResolveSymlinksMessage message) { + LOGGER.debug("{} - resolvePath {}, callback success.", ruid, target.getPath()); + FsPath path = FsPath.create(message.getResolvedPath()); + if (targetPrefix != null && !path.contains(targetPrefix)) { + path = computeFsPath(targetPrefix, path.toString()); + } + LOGGER.debug("{} - resolvePath, resolved path {}", ruid, path); + target.setPath(path); + state = TaskState.FETCH_ATTRIBUTES; + taskFuture = executor.submit(TargetTask.this); + } + + @Override + public void failure(int rc, Object error) { + LOGGER.error("{} - resolvePath, callback failure for {}.", ruid, target); + try { + storeOrUpdate(CacheExceptionFactory.exceptionOf( + rc, Objects.toString(error, null))); + } catch (InterruptedException e) { + errorHandler.accept(e); + } finally { + remove(); + } + } + }, MoreExecutors.directExecutor()); + } + + /** + * (2) retrieval of required file attributes. 
+ */ + private void fetchAttributes() throws InterruptedException { + LOGGER.debug("{} - fetchAttributes for path {}", ruid, target.getPath()); + PnfsGetFileAttributes message = new PnfsGetFileAttributes(target.getPath().toString(), + MINIMALLY_REQUIRED_ATTRIBUTES); + ListenableFuture requestFuture = pnfsHandler.requestAsync(message); + CellStub.addCallback(requestFuture, new AbstractMessageCallback<>() { + @Override + public void success(PnfsGetFileAttributes message) { + LOGGER.debug("{} - fetchAttributes for path {}, callback success.", + ruid, target.getPath()); + FileAttributes attributes = message.getFileAttributes(); + target.setAttributes(attributes); + state = TaskState.HANDLE_TARGET; + taskFuture = executor.submit(TargetTask.this); + } + + @Override + public void failure(int rc, Object error) { + LOGGER.error("{} - fetchAttributes, callback failure for {}.", ruid, target); + try { + storeOrUpdate(CacheExceptionFactory.exceptionOf( + rc, Objects.toString(error, null))); + } catch (InterruptedException e) { + errorHandler.accept(e); + } finally { + remove(); + } + } + }, MoreExecutors.directExecutor()); + } + + /** + * (3b) either recurs on directory or performs activity on file. + */ + private void handleTarget() throws InterruptedException { + LOGGER.debug("{} - handleTarget for {}, path {}.", ruid, target.getActivity(), + target.getPath()); + FileAttributes attributes = target.getAttributes(); + FileType type = attributes.getFileType(); + try { + if (type == FileType.DIR) { + storeOrUpdate(null); + expandDepthFirst(target.getId(), target.getPid(), target.getPath(), attributes); + /* + * Swap out for the directory listing task. + * (We must do this AFTER the directory task has been added to running.) 
+ */ + remove(); + } else if (type != FileType.SPECIAL) { + performActivity(); + } + } catch (BulkServiceException | CacheException e) { + LOGGER.error("handleTarget {}, path {}, error {}.", ruid, target.getPath(), + e.getMessage()); + storeOrUpdate(e); + } + } + + /** + * (3a) Performs activity on either file or directory target. + */ + private void performActivity() throws InterruptedException { + performActivity(true); + } + + private void performActivity(boolean async) throws InterruptedException { + Long id = target.getId(); + FsPath path = target.getPath(); + FileAttributes attributes = target.getAttributes(); + LOGGER.debug("{} - performActivity {} on {}.", ruid, activity, path); + + storeOrUpdate(null); + + if (hasBeenCancelled(this)) { + LOGGER.debug("{} - performActivity hasBeenCancelled for {}.", ruid, path); + remove(); + } + + try { + activityFuture = activity.perform(ruid, id == null ? seqNo : id, path, attributes); + if (async) { + activityFuture.addListener(() -> handleCompletion(), executor); + } + } catch (BulkServiceException | UnsupportedOperationException e) { + LOGGER.error("{}, perform failed for {}: {}", ruid, target, e.getMessage()); + activityFuture = Futures.immediateFailedFuture(e); + if (async) { + handleCompletion(); + } + } + } + + private void handleCompletion() { + LOGGER.debug("{} - handleCompletion {}", ruid, target.getPath()); + + State state = RUNNING; + try { + activity.handleCompletion(target, activityFuture); + state = target.getState(); + + if (state == FAILED && activity.getRetryPolicy().shouldRetry(target)) { + retryFailed(); + return; + } + + targetStore.update(target.getId(), state, target.getErrorType(), + target.getErrorMessage()); + } catch (BulkStorageException e) { + LOGGER.error("{}, could not store target from result {}, {}, {}: {}.", ruid, + target.getId(), target.getPath(), target.getAttributes(), e.toString()); + } + + if (state == FAILED && request.isCancelOnFailure()) { + cancel(); + } else { + remove(); + } 
+ } + + private void retryFailed() throws BulkStorageException { + LOGGER.debug("{} - retryFailed {}.", ruid, target); + target.resetToReady(); + try { + performActivity(); + } catch (InterruptedException e) { + LOGGER.debug("{}. retryFailed {}, interrupted.", ruid, target); + activityFuture = Futures.immediateFailedFuture(e); + handleCompletion(); + } + } + + private void storeOrUpdate(Throwable error) throws InterruptedException { + LOGGER.debug("{} - storeOrUpdate {}.", ruid, target); + + if (hasBeenCancelled(this)) { + LOGGER.debug("{} - storeOrUpdate, hasBeenCancelled {}.", ruid, target.getPath()); + return; + } + + target.setState(error == null ? RUNNING : FAILED); + target.setErrorObject(error); + + try { + /* + * If this is an insert (id == null), the target id will be updated to what is + * returned from the database. + */ + targetStore.storeOrUpdate(target); + LOGGER.debug("{} - storeOrUpdate, target id {}", ruid, target.getId()); + } catch (BulkStorageException e) { + LOGGER.error("{}, could not store or update target from result {}, {}, {}: {}.", ruid, + target.getId(), target.getPath(), target.getAttributes(), e.toString()); + error = e; + } + + if (error != null) { + remove(); + } + } + } + + private final BulkRequest request; + private final BulkActivity activity; + private final Long rid; + private final String ruid; + private final Depth depth; + private final String targetPrefix; + private final BulkServiceStatistics statistics; + private final TargetType targetType; + private final BulkRequestTarget jobTarget; + private final Subject subject; + private final Restriction restriction; + + private final Map running; + private final Set cancelledPaths; + private final Queue dirs; + + private BulkTargetStore targetStore; + private PnfsHandler pnfsHandler; + private ListDirectoryHandler listHandler; + private SignalAware callback; + private Thread runThread; + private ExecutorService executor; + private Semaphore dirListSemaphore; + private Semaphore 
inFlightSemaphore; + + private volatile ContainerState containerState; + + public BulkRequestContainerJob(BulkActivity activity, BulkRequestTarget jobTarget, + BulkRequest request, BulkServiceStatistics statistics) { + this.request = request; + this.activity = activity; + this.jobTarget = jobTarget; + this.subject = activity.getSubject(); + this.restriction = activity.getRestriction(); + this.statistics = statistics; + + rid = request.getId(); + ruid = request.getUid(); + depth = request.getExpandDirectories(); + targetPrefix = request.getTargetPrefix(); + targetType = activity.getTargetType(); + + running = new HashMap<>(); + cancelledPaths = new HashSet<>(); + dirs = new ConcurrentLinkedQueue<>(); + + containerState = ContainerState.START; + } + + public void cancel() { + containerState = ContainerState.STOP; + + jobTarget.cancel(); + + LOGGER.debug("{} - cancel: target state is now {}.", ruid, jobTarget.getState()); + + interruptRunThread(); + + synchronized (running) { + LOGGER.debug("{} - cancel: running {}.", ruid, running.size()); + running.values().forEach(ContainerTask::cancel); + LOGGER.debug("{} - cancel: running targets cancelled.", ruid); + running.clear(); + } + + LOGGER.debug("{} - cancel: calling cancel all on target store.", ruid); + targetStore.cancelAll(rid); + + signalStateChange(); + } + + public void cancel(long targetId) { + synchronized (running) { + for (Iterator i = running.values().iterator(); i.hasNext(); ) { + ContainerTask task = i.next(); + if (task instanceof TargetTask + && targetId == ((TargetTask) task).target.getId()) { + task.cancel(); + i.remove(); + break; + } + } + } + + try { + targetStore.update(targetId, CANCELLED, null, null); + } catch (BulkStorageException e) { + LOGGER.error("Failed to cancel {}::{}: {}.", ruid, targetId, e.toString()); + } + } + + public void cancel(String targetPath) { + LOGGER.debug("{} - cancel path {}.", ruid, targetPath); + FsPath toCancel = findAbsolutePath(targetPrefix, targetPath); + + 
Optional found; + + synchronized (running) { + found = running.values().stream().filter(TargetTask.class::isInstance) + .map(TargetTask.class::cast).filter(t -> t.target.getPath().equals(toCancel)) + .findAny(); + } + + if (found.isPresent()) { + cancel(found.get().target.getId()); + } else { + synchronized (cancelledPaths) { + cancelledPaths.add(toCancel); + } + } + } + + @Override + public int compareTo(BulkRequestContainerJob other) { + return jobTarget.getKey().compareTo(other.jobTarget.getKey()); + } + + public BulkActivity getActivity() { + return activity; + } + + public BulkRequestTarget getTarget() { + return jobTarget; + } + + public void initialize() { + LOGGER.trace("BulkRequestContainerJob {}, initialize() called ...", jobTarget.getKey()); + jobTarget.setState(READY); + containerState = ContainerState.PROCESS_FILES; + } + + public synchronized boolean isReady() { + switch (jobTarget.getState()) { + case READY: + case CREATED: + return true; + default: + return false; + } + } + + @Override + public void run() { + setRunThread(Thread.currentThread()); + try { + checkForRequestCancellation(); + switch (containerState) { + case PROCESS_FILES: + LOGGER.debug("{} - run: PROCESS FILES", ruid); + processFileTargets(); + containerState = ContainerState.WAIT; + break; + case PROCESS_DIRS: + LOGGER.debug("{} - run: PROCESS DIRS", ruid); + processDirTargets(); + containerState = ContainerState.STOP; + update(COMPLETED); + break; + case STOP: + LOGGER.debug("{} - run: prematurely stopped; exiting", ruid); + update(CANCELLED); + setRunThread(null); + return; + default: + throw new RuntimeException( + "run container called with container in wrong state " + containerState + + "; this is a bug."); + } + } catch (InterruptedException e) { + LOGGER.debug("{} - run: interrupted", ruid); + /* + * If the state has not already been set to terminal, do so. 
+ */ + containerState = ContainerState.STOP; + update(CANCELLED); + } + setRunThread(null); + checkTransitionToDirs(); + } + + public void setDirListSemaphore(Semaphore dirListSemaphore) { + this.dirListSemaphore = dirListSemaphore; + } + + public void setInFlightSemaphore(Semaphore inFlightSemaphore) { + this.inFlightSemaphore = inFlightSemaphore; + } + + public void setListHandler(ListDirectoryHandler listHandler) { + this.listHandler = listHandler; + } + + public void setExecutor(ExecutorService executor) { + this.executor = executor; + } + + public void setNamespaceHandler(PnfsHandler pnfsHandler) { + this.pnfsHandler = pnfsHandler; + } + + public void setTargetStore(BulkTargetStore targetStore) { + this.targetStore = targetStore; + } + + public void setCallback(SignalAware callback) { + this.callback = callback; + } + + @Override + public void uncaughtException(Thread t, Throwable e) { + /* + * Won't leave the request in non-terminal state in case of uncaught exception. + * We also try to handle uncaught exceptions here, so as not to kill the + * manager thread. 
+ */ + containerState = ContainerState.STOP; + jobTarget.setErrorObject(e); + update(FAILED); + ThreadGroup group = t.getThreadGroup(); + if (group != null) { + group.uncaughtException(t, e); + } else { + LOGGER.error("Uncaught exception: please report to team@dcache.org", e); + } + } + + public void update(State state) { + if (jobTarget.setState(state)) { + try { + targetStore.update(jobTarget.getId(), jobTarget.getState(), jobTarget.getErrorType(), + jobTarget.getErrorMessage()); + } catch (BulkStorageException e) { + LOGGER.error("{}, updateJobState: {}", ruid, e.toString()); + } + signalStateChange(); + } + } + + private void checkForRequestCancellation() throws InterruptedException { + if (isRunThreadInterrupted() || containerState == ContainerState.STOP + || jobTarget.isTerminated()) { + throw new InterruptedException(); + } + } + + private void checkTransitionToDirs() { + synchronized (running) { + if (!running.isEmpty()) { + LOGGER.debug("{} - checkTransitionToDirs, running {}", ruid, running.size()); + return; + } + } + + synchronized (this) { + if (containerState == ContainerState.WAIT) { + containerState = ContainerState.PROCESS_DIRS; + executor.submit(this); + } + } + } + + private boolean hasBeenCancelled(TargetTask task) { + synchronized (cancelledPaths) { + BulkRequestTarget target = task.target; + if (cancelledPaths.remove(target.getPath().toString())) { + target = toTarget(target.getId(), target.getPid(), target.getPath(), + Optional.of(target.getAttributes()), CANCELLED, null); + try { + if (target.getId() == null) { + targetStore.store(target); + } else { + targetStore.update(target.getId(), CANCELLED, null, null); + } + } catch (BulkServiceException | UnsupportedOperationException e) { + LOGGER.error("hasBeenCancelled {}, failed for {}: {}", ruid, target.getPath(), + e.toString()); + } + return true; + } + } + + return false; + } + + private synchronized void interruptRunThread() { + if (runThread != null) { + runThread.interrupt(); + 
LOGGER.debug("{} - container job interrupted.", ruid); + } + } + + private synchronized boolean isRunThreadInterrupted() { + return runThread != null && runThread.isInterrupted(); + } + + private void processDirTargets() throws InterruptedException { + if (dirs.isEmpty()) { + LOGGER.debug("{} - processDirTargets, nothing to do.", ruid); + return; + } + + LOGGER.debug("{} - processDirTargets, size {}.", ruid, dirs.size()); + + DirTarget[] sorted = dirs.toArray(new DirTarget[0]); + Arrays.sort(sorted, SORTER); + + /* + * Process serially in this thread + */ + for (DirTarget dirTarget : sorted) { + new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path, + Optional.of(dirTarget.attributes), CREATED, null), + TaskState.HANDLE_DIR_TARGET).performSync(); + } + } + + private void processFileTargets() throws InterruptedException { + List requestTargets = targetStore.getInitialTargets(rid, true); + + LOGGER.debug("{} - processFileTargets, initial size {}.", ruid, requestTargets.size()); + + if (requestTargets.isEmpty()) { + LOGGER.error("{} - processFileTargets, no initial targets!.", ruid); + containerState = ContainerState.STOP; + update(FAILED); + return; + } + + for (BulkRequestTarget target : requestTargets) { + new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync(); + } + } + + private synchronized void setRunThread(Thread runThread) { + this.runThread = runThread; + if (runThread != null) { + this.runThread.setUncaughtExceptionHandler(this); + } + } + + private void signalStateChange() { + if (callback != null) { + callback.signal(); + } + } + + private BulkRequestTarget toTarget(Long id, PID pid, FsPath path, + Optional attributes, State state, Throwable throwable) { + String errorType = null; + String errorMessage = null; + Throwable root; + if (throwable != null) { + root = Throwables.getRootCause(throwable); + errorType = root.getClass().getCanonicalName(); + errorMessage = root.getMessage(); + } + + return 
BulkRequestTargetBuilder.builder(statistics).attributes(attributes.orElse(null)) + .activity(activity.getName()).id(id).pid(pid).rid(rid).ruid(ruid).state(state) + .createdAt(System.currentTimeMillis()).errorType(errorType) + .errorMessage(errorMessage).path(path).build(); + } +} \ No newline at end of file diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJob.java deleted file mode 100644 index 9f5909e59a8..00000000000 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJob.java +++ /dev/null @@ -1,312 +0,0 @@ -/* -COPYRIGHT STATUS: -Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and -software are sponsored by the U.S. Department of Energy under Contract No. -DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide -non-exclusive, royalty-free license to publish or reproduce these documents -and software for U.S. Government purposes. All documents and software -available from this server are protected under the U.S. and Foreign -Copyright Laws, and FNAL reserves all rights. - -Distribution of the software available from this server is free of -charge subject to the user following the terms of the Fermitools -Software Legal Information. - -Redistribution and/or modification of the software shall be accompanied -by the Fermitools Software Legal Information (including the copyright -notice). - -The user is asked to feed back problems, benefits, and/or suggestions -about the software to the Fermilab Software Providers. - -Neither the name of Fermilab, the URA, nor the names of the contributors -may be used to endorse or promote products derived from this software -without specific prior written permission. 
- -DISCLAIMER OF LIABILITY (BSD): - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB, -OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT -OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR -BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -Liabilities of the Government: - -This software is provided by URA, independent from its Prime Contract -with the U.S. Department of Energy. URA is acting independently from -the Government and in its own private capacity and is not acting on -behalf of the U.S. Government, nor as its contractor nor its agent. -Correspondingly, it is understood and agreed that the U.S. Government -has no connection to this software and in no manner whatsoever shall -be liable for nor assume any responsibility or obligation for any claim, -cost, or damages arising out of or resulting from the use of the software -available from this server. - -Export Control: - -All documents and software available from this server are subject to U.S. -export control laws. Anyone downloading information from this server is -obligated to secure any necessary Government licenses before exporting -documents or software obtained from this server. 
- */ -package org.dcache.services.bulk.job; - -import static org.dcache.services.bulk.activity.BulkActivity.MINIMALLY_REQUIRED_ATTRIBUTES; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.FAILED; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.RUNNING; -import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import diskCacheV111.util.CacheException; -import diskCacheV111.util.FsPath; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import org.dcache.namespace.FileType; -import org.dcache.services.bulk.BulkRequest; -import org.dcache.services.bulk.BulkServiceException; -import org.dcache.services.bulk.BulkStorageException; -import org.dcache.services.bulk.activity.BulkActivity; -import org.dcache.services.bulk.util.BatchedResult; -import org.dcache.services.bulk.util.BulkRequestTarget; -import org.dcache.services.bulk.util.BulkRequestTarget.PID; -import org.dcache.services.bulk.util.BulkRequestTarget.State; -import org.dcache.services.bulk.util.BulkServiceStatistics; -import org.dcache.vehicles.FileAttributes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This version of the container does no preprocessing, storing the targets as they go live. It thus - * offers a much faster pathway toward target completion with potentially greater throughput. 
- */ -public final class RequestContainerJob extends AbstractRequestContainerJob { - - private static final Logger LOGGER = LoggerFactory.getLogger(RequestContainerJob.class); - - private static final DirTargetSorter SORTER = new DirTargetSorter(); - - static class DirTarget { - final FsPath path; - final FileAttributes attributes; - final PID pid; - final Long id; - final int depth; - - DirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) { - this.id = id; - this.pid = pid; - this.attributes = attributes; - this.path = path; - depth = (int)path.toString().chars().filter(c -> c == '/').count(); - } - } - - static class DirTargetSorter implements Comparator { - @Override - public int compare(DirTarget o1, DirTarget o2) { - return Integer.compare(o2.depth, o1.depth); /* DESCENDING ORDER */ - } - } - - private final Queue dirs; - - public RequestContainerJob(BulkActivity activity, BulkRequestTarget target, - BulkRequest request, BulkServiceStatistics statistics) { - super(activity, target, request, statistics); - dirs = new ConcurrentLinkedQueue<>(); - } - - @Override - protected void processFileTargets() throws InterruptedException { - List requestTargets = getInitialTargets(); - - if (requestTargets.isEmpty()) { - containerState = ContainerState.STOP; - update(FAILED); - return; - } - - for (BulkRequestTarget tgt : requestTargets) { - checkForRequestCancellation(); - Long id = tgt.getId(); - try { - FsPath path = resolvePath(tgt.getPath().toString()); - if (targetPrefix != null && !path.contains(targetPrefix)) { - path = computeFsPath(targetPrefix, tgt.getPath().toString()); - } - - switch (depth) { - case NONE: - perform(id, PID.INITIAL, path, null); - break; - default: - handleTarget(id, PID.INITIAL, path); - } - } catch (CacheException e) { - LOGGER.error("problem handling target {}: {}.", tgt, e.toString()); - tgt.setState(FAILED); - tgt.setErrorObject(e); - try { - targetStore.storeOrUpdate(tgt); - } catch (BulkStorageException ex) { - 
LOGGER.error("processFileTargets {}, path {}, could not store, error {}.", ruid, - tgt.getPath(), - ex.getMessage()); - } - } - } - } - - @Override - protected void processDirTargets() throws InterruptedException { - DirTarget[] sorted = dirs.toArray(new DirTarget[0]); - Arrays.sort(sorted, SORTER); - for (DirTarget dirTarget : sorted) { - checkForRequestCancellation(); - perform(dirTarget.id, dirTarget.pid, dirTarget.path, dirTarget.attributes); - } - } - - @Override - protected void handleDirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) { - dirs.add(new DirTarget(id, pid, path, attributes)); - } - - @Override - protected void handleFileTarget(PID pid, FsPath path, FileAttributes attributes) - throws InterruptedException { - perform(null, pid, path, attributes); - } - - @Override - protected void retryFailed(BatchedResult result, FileAttributes attributes) - throws BulkStorageException { - BulkRequestTarget completedTarget = result.getTarget(); - Long id = completedTarget.getId(); - FsPath path = completedTarget.getPath(); - PID pid = completedTarget.getPid(); - completedTarget.resetToReady(); - try { - perform(id, pid, path, attributes); - } catch (InterruptedException e) { - LOGGER.debug("{}. 
retryFailed interrupted", ruid); - targetStore.update(result.getTarget().getId(), FAILED, - InterruptedException.class.getCanonicalName(), - "retryFailed interrupted for " + ruid); - } - } - - private void handleCompletion(BatchedResult result, FileAttributes attributes) { - activity.handleCompletion(result); - - BulkRequestTarget completedTarget = result.getTarget(); - State state = completedTarget.getState(); - - try { - if (state == FAILED && activity.getRetryPolicy().shouldRetry(completedTarget)) { - retryFailed(result, attributes); - return; - } - - targetStore.update(completedTarget.getId(), state, completedTarget.getErrorType(), - completedTarget.getErrorMessage()); - } catch (BulkStorageException e) { - LOGGER.error("{} could not store target from result: {}, {}: {}.", ruid, result, - attributes, e.toString()); - } - - removeTarget(completedTarget); /* RELEASES SEMAPHORE */ - - if (state == FAILED && request.isCancelOnFailure()) { - cancel(); - } - } - - private void handleTarget(Long id, PID pid, FsPath path) throws InterruptedException { - checkForRequestCancellation(); - FileAttributes attributes = null; - LOGGER.debug("handleTarget {}, path {}.", ruid, path); - try { - attributes = pnfsHandler.getFileAttributes(path, MINIMALLY_REQUIRED_ATTRIBUTES); - if (attributes.getFileType() == FileType.DIR) { - expandDepthFirst(id, pid, path, attributes); - } else if (attributes.getFileType() != FileType.SPECIAL) { - perform(id, pid, path, attributes); - } - } catch (BulkServiceException | CacheException e) { - LOGGER.error("handleTarget {}, path {}, error {}.", ruid, path, e.getMessage()); - register(id, pid, path, Futures.immediateFailedFuture(e), attributes, e); - } - } - - private ListenableFuture perform(Long id, PID pid, FsPath path, FileAttributes attributes) - throws InterruptedException { - checkForRequestCancellation(); - - if (hasBeenCancelled(id, pid, path, attributes)) { - return Futures.immediateCancelledFuture(); - } - - semaphore.acquire(); - - 
ListenableFuture future; - try { - future = activity.perform(ruid, id == null ? this.id.getAndIncrement() : id, path, - attributes); - } catch (BulkServiceException | UnsupportedOperationException e) { - LOGGER.error("{}, perform failed for {}: {}", ruid, path, e.getMessage()); - future = Futures.immediateFailedFuture(e); - register(id, pid, path, future, attributes, e); - return future; - } - - register(id, pid, path, future, attributes, null); - return future; - } - - private void register(Long id, PID pid, FsPath path, ListenableFuture future, FileAttributes attributes, - Throwable error) throws InterruptedException { - checkForRequestCancellation(); - - if (hasBeenCancelled(id, pid, path, attributes)) { - return; - } - - BulkRequestTarget target = toTarget(id, pid, path, Optional.ofNullable(attributes), - error == null ? RUNNING : FAILED, error); - - BatchedResult result = new BatchedResult(target, future); - - if (error == null) { - try { - /* - * If this is an insert (id == null), the target id will be updated to what is - * returned from the database. 
- */ - targetStore.storeOrUpdate(target); - } catch (BulkStorageException e) { - LOGGER.error("{}, could not store target from result {}, {}, {}: {}.", ruid, result, - attributes, e.toString()); - } - } - - synchronized (waiting) { - waiting.put(path, result); - future.addListener(() -> handleCompletion(result, attributes), - activity.getCallbackExecutor()); - } - } -} \ No newline at end of file diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java index 12b85f7609a..28a997e5160 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java @@ -64,6 +64,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.PnfsHandler; import java.util.Optional; +import java.util.concurrent.Semaphore; import javax.security.auth.Subject; import org.dcache.auth.attributes.Restriction; import org.dcache.cells.CellStub; @@ -79,7 +80,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.util.BulkRequestTarget.PID; import org.dcache.services.bulk.util.BulkRequestTargetBuilder; import org.dcache.services.bulk.util.BulkServiceStatistics; -import org.dcache.util.BoundedExecutor; import org.dcache.util.list.ListDirectoryHandler; import org.dcache.vehicles.FileAttributes; import org.slf4j.Logger; @@ -100,9 +100,10 @@ public final class RequestContainerJobFactory { private ListDirectoryHandler listHandler; private BulkTargetStore targetStore; private BulkServiceStatistics statistics; - private BoundedExecutor dirListExecutor; + private Semaphore dirListSemaphore; + private Semaphore inFlightSemaphore; - public AbstractRequestContainerJob createRequestJob(BulkRequest request) + public 
BulkRequestContainerJob createRequestJob(BulkRequest request) throws BulkServiceException { String rid = request.getUid(); LOGGER.trace("createRequestJob {}", rid); @@ -124,29 +125,43 @@ public AbstractRequestContainerJob createRequestJob(BulkRequest request) pnfsHandler.setSubject(activity.getSubject()); LOGGER.trace("createRequestJob {}, creating batch request job.", request.getUid()); - AbstractRequestContainerJob containerJob - = new RequestContainerJob(activity, target, request, statistics); + BulkRequestContainerJob containerJob + = new BulkRequestContainerJob(activity, target, request, statistics); containerJob.setNamespaceHandler(pnfsHandler); containerJob.setTargetStore(targetStore); containerJob.setListHandler(listHandler); - containerJob.setDirListExecutor(dirListExecutor); + containerJob.setDirListSemaphore(dirListSemaphore); + containerJob.setInFlightSemaphore(inFlightSemaphore); containerJob.initialize(); return containerJob; } + public int getDirListSemaphoreAvailable() { + return dirListSemaphore.availablePermits(); + } + + public int getInFlightSemaphoreAvailable() { + return inFlightSemaphore.availablePermits(); + } + @Required public void setActivityFactory(BulkActivityFactory activityFactory) { this.activityFactory = activityFactory; } @Required - public void setDirListExecutor(BoundedExecutor dirListExecutor) { - this.dirListExecutor = dirListExecutor; + public void setListHandler(ListDirectoryHandler listHandler) { + this.listHandler = listHandler; } @Required - public void setListHandler(ListDirectoryHandler listHandler) { - this.listHandler = listHandler; + public void setDirListSemaphore(int permits) { + dirListSemaphore = new Semaphore(permits); + } + + @Required + public void setInFlightSemaphore(int permits) { + inFlightSemaphore = new Semaphore(permits); } @Required diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java 
b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java index 6bdf4fa40c8..fae5b4acf59 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java @@ -60,7 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.services.bulk.manager; import java.util.List; -import org.dcache.services.bulk.job.AbstractRequestContainerJob; +import org.dcache.services.bulk.job.BulkRequestContainerJob; import org.dcache.services.bulk.util.BulkRequestTarget; import org.dcache.util.SignalAware; @@ -104,7 +104,7 @@ public interface BulkRequestManager extends SignalAware { * * @param job to be submitted to the queue. */ - void submit(AbstractRequestContainerJob job); + void submit(BulkRequestContainerJob job); /** * @param maxActiveRequests max number of requests that can be in the active state at a given diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java index 168b979d7da..7179b18d0df 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java @@ -72,6 +72,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -83,7 +84,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.BulkStorageException; import 
org.dcache.services.bulk.handler.BulkRequestCompletionHandler; import org.dcache.services.bulk.handler.BulkSubmissionHandler; -import org.dcache.services.bulk.job.AbstractRequestContainerJob; +import org.dcache.services.bulk.job.BulkRequestContainerJob; import org.dcache.services.bulk.manager.scheduler.BulkRequestScheduler; import org.dcache.services.bulk.manager.scheduler.BulkSchedulerProvider; import org.dcache.services.bulk.store.BulkRequestStore; @@ -168,15 +169,15 @@ private void await() throws InterruptedException { private void broadcastTargetCancel() { List toCancel; - synchronized (cancelledJobs) { - toCancel = cancelledJobs.stream().collect(Collectors.toList()); - cancelledJobs.clear(); + synchronized (cancelledTargets) { + toCancel = cancelledTargets.stream().collect(Collectors.toList()); + cancelledTargets.clear(); } synchronized (requestJobs) { toCancel.forEach(key -> { String[] ridId = BulkRequestTarget.parse(key); - AbstractRequestContainerJob job = requestJobs.get(ridId[0]); + BulkRequestContainerJob job = requestJobs.get(ridId[0]); if (job != null) { long id = Long.valueOf(ridId[1]); job.cancel(id); @@ -216,7 +217,7 @@ private void doRun() throws InterruptedException { @GuardedBy("requestJobs") private boolean isTerminated(String rid) { - AbstractRequestContainerJob job = requestJobs.get(rid); + BulkRequestContainerJob job = requestJobs.get(rid); if (job != null && job.getTarget().getState() == State.CANCELLED) { return completionHandler.checkTerminated(rid, true); } @@ -245,7 +246,7 @@ private void processNextRequests() { * immediately started. */ synchronized (requestJobs) { - requestJobs.values().stream().filter(AbstractRequestContainerJob::isReady) + requestJobs.values().stream().filter(BulkRequestContainerJob::isReady) .forEach(ConcurrentRequestManager.this::startJob); } } @@ -308,6 +309,11 @@ private ListMultimap userRequests() { */ private ExecutorService processorExecutorService; + /** + * Thread dedicated to jobs. 
+ */ + private ExecutorService pooledExecutorService; + /** * Records number of jobs and requests processed. */ @@ -327,12 +333,12 @@ private ListMultimap userRequests() { /** * Held for the lifetime of the container run. */ - private Map requestJobs; + private Map requestJobs; /** * Ids of jobs cancelled individually. To avoid doing cancellation on the calling thread. */ - private Collection cancelledJobs; + private Collection cancelledTargets; /** * Handles the promotion of jobs to running. @@ -347,16 +353,18 @@ private ListMultimap userRequests() { @Override public void initialize() throws Exception { requestJobs = new LinkedHashMap<>(); - cancelledJobs = new HashSet<>(); + cancelledTargets = new HashSet<>(); schedulerProvider.initialize(); processor = new ConcurrentRequestProcessor(schedulerProvider.getRequestScheduler()); + processorExecutorService = Executors.newSingleThreadScheduledExecutor(); + pooledExecutorService = Executors.newCachedThreadPool(); processorFuture = processorExecutorService.submit(processor); } @Override public void cancel(BulkRequestTarget target) { - synchronized (cancelledJobs) { - cancelledJobs.add(target.getKey()); + synchronized (cancelledTargets) { + cancelledTargets.add(target.getKey()); } processor.signal(); } @@ -364,7 +372,7 @@ public void cancel(BulkRequestTarget target) { @Override public boolean cancelRequest(String requestId) { synchronized (requestJobs) { - AbstractRequestContainerJob job = requestJobs.get(requestId); + BulkRequestContainerJob job = requestJobs.get(requestId); if (job != null) { job.cancel(); processor.signal(); @@ -380,7 +388,7 @@ public boolean cancelRequest(String requestId) { @Override public void cancelTargets(String id, List targetPaths) { synchronized (requestJobs) { - AbstractRequestContainerJob job = requestJobs.get(id); + BulkRequestContainerJob job = requestJobs.get(id); if (job != null) { targetPaths.forEach(job::cancel); LOGGER.trace("{} request targets cancelled for {}.", targetPaths.size(), 
id); @@ -398,7 +406,7 @@ public void shutdown() throws Exception { processorFuture.cancel(true); } requestJobs = null; - cancelledJobs = null; + cancelledTargets = null; requestStore.clearCache(); } @@ -427,11 +435,6 @@ public void setMaxActiveRequests(int maxActiveRequests) { this.maxActiveRequests = maxActiveRequests; } - @Required - public void setProcessorExecutorService(ExecutorService processorExecutorService) { - this.processorExecutorService = processorExecutorService; - } - @Required public void setRequestStore(BulkRequestStore requestStore) { this.requestStore = requestStore; @@ -474,7 +477,7 @@ public void signal() { } @Override - public void submit(AbstractRequestContainerJob job) { + public void submit(BulkRequestContainerJob job) { synchronized (requestJobs) { requestJobs.put(job.getTarget().getRuid(), job); } @@ -511,16 +514,16 @@ void activateRequest(BulkRequest request) { } } - void startJob(AbstractRequestContainerJob job) { + void startJob(BulkRequestContainerJob job) { String key = job.getTarget().getKey(); - long id = job.getTarget().getId(); LOGGER.trace("submitting job {} to executor, target {}.", key, job.getTarget()); + job.setExecutor(pooledExecutorService); job.setCallback(this); try { if (isJobValid(job)) { /* possibly cancelled in flight */ job.update(State.RUNNING); - job.getActivity().getActivityExecutor().submit(new FireAndForgetTask(job)); + pooledExecutorService.submit(new FireAndForgetTask(job)); } } catch (RuntimeException e) { job.getTarget().setErrorObject(e); @@ -535,7 +538,7 @@ void startJob(AbstractRequestContainerJob job) { * This is here mostly in order to catch jobs which have changed state on the fly * during cancellation. It is only called by the processor thread (when it invokes startJob). 
*/ - private boolean isJobValid(AbstractRequestContainerJob job) { + private boolean isJobValid(BulkRequestContainerJob job) { BulkRequestTarget target = job.getTarget(); if (target.isTerminated()) { diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BatchedResult.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BatchedResult.java deleted file mode 100644 index fc3cf9fb385..00000000000 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BatchedResult.java +++ /dev/null @@ -1,91 +0,0 @@ -/* -COPYRIGHT STATUS: -Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and -software are sponsored by the U.S. Department of Energy under Contract No. -DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide -non-exclusive, royalty-free license to publish or reproduce these documents -and software for U.S. Government purposes. All documents and software -available from this server are protected under the U.S. and Foreign -Copyright Laws, and FNAL reserves all rights. - -Distribution of the software available from this server is free of -charge subject to the user following the terms of the Fermitools -Software Legal Information. - -Redistribution and/or modification of the software shall be accompanied -by the Fermitools Software Legal Information (including the copyright -notice). - -The user is asked to feed back problems, benefits, and/or suggestions -about the software to the Fermilab Software Providers. - -Neither the name of Fermilab, the URA, nor the names of the contributors -may be used to endorse or promote products derived from this software -without specific prior written permission. - -DISCLAIMER OF LIABILITY (BSD): - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL FERMILAB, -OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT -OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR -BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -Liabilities of the Government: - -This software is provided by URA, independent from its Prime Contract -with the U.S. Department of Energy. URA is acting independently from -the Government and in its own private capacity and is not acting on -behalf of the U.S. Government, nor as its contractor nor its agent. -Correspondingly, it is understood and agreed that the U.S. Government -has no connection to this software and in no manner whatsoever shall -be liable for nor assume any responsibility or obligation for any claim, -cost, or damages arising out of or resulting from the use of the software -available from this server. - -Export Control: - -All documents and software available from this server are subject to U.S. -export control laws. Anyone downloading information from this server is -obligated to secure any necessary Government licenses before exporting -documents or software obtained from this server. - */ -package org.dcache.services.bulk.util; - -import com.google.common.util.concurrent.ListenableFuture; -import org.dcache.services.bulk.activity.BulkActivity; - -/** - * In-memory placeholder for results of batched operations which associates the target with the - * activity future. 
- */ -public final class BatchedResult { - - private final BulkRequestTarget target; - private final ListenableFuture future; - - public BatchedResult(BulkRequestTarget target, ListenableFuture future) { - this.future = future; - this.target = target; - } - - public void cancel(BulkActivity activity) { - future.cancel(true); - activity.cancel(target); - } - - public ListenableFuture getFuture() { - return future; - } - - public BulkRequestTarget getTarget() { - return target; - } -} diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkServiceStatistics.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkServiceStatistics.java index 09e94997b5e..461c1173777 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkServiceStatistics.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkServiceStatistics.java @@ -78,6 +78,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.dcache.services.bulk.job.RequestContainerJobFactory; +import org.springframework.beans.factory.annotation.Required; /** * Provides activity statistics via the CellInfo interface. 
@@ -105,6 +107,7 @@ public static Long getTimestamp(String datetime) throws ParseException { private static final String LAST_SWEEP = "Last job sweep at %s"; private static final String LAST_SWEEP_DURATION = "Last job sweep took %s seconds"; private static final String STATS_FORMAT = "%-20s : %10s"; + private static final String CONCURRENCY_FORMAT = "%-45s : %10s"; private final Date started = new Date(); @@ -119,6 +122,8 @@ public static Long getTimestamp(String datetime) throws ParseException { FAILED.name(), new AtomicLong(0L), SKIPPED.name(), new AtomicLong(0L)); + + private RequestContainerJobFactory factory; private long lastSweep = started.getTime(); private long lastSweepDuration = 0; @@ -172,6 +177,12 @@ public void getInfo(PrintWriter pw) { pw.println("---------------- REQUESTS (current) -----------------"); pw.println(String.format(STATS_FORMAT, "Active", activeRequests.get())); pw.println(); + + pw.println(String.format(CONCURRENCY_FORMAT, "Available permits for directory listing", + factory.getDirListSemaphoreAvailable())); + pw.println(String.format(CONCURRENCY_FORMAT, "Available permits for in-flight targets", + factory.getInFlightSemaphoreAvailable())); + pw.println(); } public String getOwnerCounts() { @@ -216,6 +227,11 @@ public void setActive(int count) { activeRequests.set(count); } + @Required + public void setRequestContainerJobFactory(RequestContainerJobFactory factory) { + this.factory = factory; + } + public void sweepFinished(long duration) { lastSweep = System.currentTimeMillis(); lastSweepDuration = duration; diff --git a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml index 08076c1e0d7..097bea036be 100644 --- a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml +++ b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml @@ -76,15 +76,6 @@ - - Used to execute the future callbacks to jobs which send 
and wait for replies. - - - - - - - Used to cancel requests. @@ -94,26 +85,6 @@ - - - - - - - - - - - - - - - - - - - - Encapsulates the bulk database connection pool and properties. @@ -215,39 +186,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -269,11 +207,13 @@ - + + Tracks request and target states (counts), sweeper state, etc. + @@ -302,7 +242,6 @@ - @@ -336,7 +275,6 @@ - @@ -346,11 +284,7 @@ - - - - diff --git a/skel/share/defaults/bulk.properties b/skel/share/defaults/bulk.properties index e1f09cd01b8..8a2ef72d8c6 100644 --- a/skel/share/defaults/bulk.properties +++ b/skel/share/defaults/bulk.properties @@ -64,16 +64,12 @@ bulk.request-scheduler=org.dcache.services.bulk.manager.scheduler.LeastRecentFir # adjusted via an admin command to oversubscribe to # this preset number, which may allow for some speed up # if high latency requests such as pinning predominate. -# - activity-callback threads: for running the completion handling on activities # - incoming-request threads: for handling requests received on the message queue # - cancellation threads: for handling cancellation requests # -bulk.limits.container-processing-threads=200 -bulk.limits.activity-callback-threads=50 +bulk.limits.container-processing-threads=100 bulk.limits.incoming-request-threads=10 -bulk.limits.cancellation-threads=20 -bulk.limits.dir-list-threads=20 -(deprecated)bulk.limits.delay-clear-threads= +bulk.limits.cancellation-threads=10 # ---- Expiration of the cache serving to front the request storage. # @@ -104,6 +100,15 @@ bulk.limits.max.targets-per-shallow-request=10 # bulk.limits.max.targets-per-recursive-request=1 +# ---- The maximum number of directory listings that can occur concurrently. +# This is necessary to avoid timing out calls to the PnfsManager. +# +bulk.limits.dir-list-semaphore=20 + +# ---- The maximum number of target tasks that can occur concurrently. 
+# +bulk.limits.in-flight-semaphore=2000 + # ---- Interval of inactivity by the request manager consumer if not signalled # internally (as for instance when a request job completes). The consumer checks # for request readiness and completion. @@ -197,22 +202,8 @@ bulk.db.url=jdbc:postgresql://${bulk.db.host}/${bulk.db.name}?targetServerType=m bulk.db.fetch-size=1000 # ---- Activity plugin properties -# -# Max permits is the number permits for the semaphore used by the request container to -# perform the activity on individual targets. This number is on a container-by-container -# basis; for instance, if the bulk.limits.container-processing-threads is set to 100, -# this means that the number of concurrent requests to pin manager, e.g., would be 100000. -# These numbers should be adjusted up or down depending on concurrency requirements -# so as not to generate DOS attacks on the other dCache services such as PnfsManager, -# PinManager or the QoSEngine. -# -bulk.plugin!delete.max-permits=100 -bulk.plugin!pin.max-permits=1000 -bulk.plugin!stage.max-permits=1000 -bulk.plugin!unpin.max-permits=1000 -bulk.plugin!release.max-permits=1000 -bulk.plugin!update-qos.max-permits=1000 -bulk.plugin!log-target.max-permits=100 + +bulk.plugin!delete.default-batch-size=100 # ---- Algorithm for determining what action to take on job failures. 
# @@ -257,3 +248,13 @@ bulk.qos-transition-topic=${dcache.qos.transition-topic} (deprecated)bulk.service.ping=no longer used (deprecated)bulk.service.ping.timeout=no longer used (deprecated)bulk.service.ping.timeout.unit=no longer used +(deprecated)bulk.limits.dir-list-threads=use bulk.limits.dir-list-semaphore +(deprecated)bulk.limits.delay-clear-threads=no longer used +(deprecated)bulk.limits.activity-callback-threads=no longer used +(deprecated)bulk.plugin!delete.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!pin.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!stage.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!unpin.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!release.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!update-qos.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!log-target.max-permits=no longer used; see bulk.limits.in-flight-semaphore \ No newline at end of file diff --git a/skel/share/services/bulk.batch b/skel/share/services/bulk.batch index 2acaa4ba60b..edc406269df 100644 --- a/skel/share/services/bulk.batch +++ b/skel/share/services/bulk.batch @@ -8,10 +8,8 @@ check -strong bulk.cell.subscribe check -strong bulk.allowed-directory-expansion check -strong bulk.request-scheduler check -strong bulk.limits.container-processing-threads -check -strong bulk.limits.activity-callback-threads check -strong bulk.limits.incoming-request-threads check -strong bulk.limits.cancellation-threads -check -strong bulk.limits.dir-list-threads check -strong bulk.limits.request-cache-expiration check -strong bulk.limits.request-cache-expiration.unit check -strong bulk.limits.max-requests-per-user @@ -26,6 +24,8 @@ check -strong bulk.limits.archiver-period check -strong bulk.limits.archiver-period.unit check -strong 
bulk.limits.archiver-window check -strong bulk.limits.archiver-window.unit +check -strong bulk.limits.dir-list-semaphore +check -strong bulk.limits.in-flight-semaphore check -strong bulk.service.pnfsmanager check -strong bulk.service.pnfsmanager.timeout check -strong bulk.service.pnfsmanager.timeout.unit @@ -51,17 +51,10 @@ check -strong bulk.db.password check -strong bulk.db.password.file check -strong bulk.db.schema.auto check -strong bulk.db.fetch-size -check -strong bulk.plugin!delete.max-permits -check -strong bulk.plugin!pin.max-permits check -strong bulk.plugin!pin.default-lifetime check -strong bulk.plugin!pin.default-lifetime.unit -check -strong bulk.plugin!stage.max-permits check -strong bulk.plugin!stage.default-lifetime check -strong bulk.plugin!stage.default-lifetime.unit -check -strong bulk.plugin!unpin.max-permits -check -strong bulk.plugin!release.max-permits -check -strong bulk.plugin!update-qos.max-permits -check -strong bulk.plugin!log-target.max-permits check -strong bulk.plugin!delete.retry-policy check -strong bulk.plugin!pin.retry-policy check -strong bulk.plugin!stage.retry-policy