diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java index b79931854e6..15e05239ba3 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java @@ -183,7 +183,7 @@ public final class BulkServiceCommands implements CellCommandListener { /** * name | class | type | permits */ - private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s "; + private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s"; /** * name | required | description @@ -264,8 +264,9 @@ public String nextCommand() { } } - private static String formatActivity(Entry entry) { + private static String formatActivity(Entry entry, BulkActivityFactory factory) { BulkActivityProvider provider = entry.getValue(); + return String.format(FORMAT_ACTIVITY, entry.getKey(), provider.getActivityClass(), @@ -544,14 +545,14 @@ public String call() throws Exception { Sorter sorter = new Sorter(SortOrder.valueOf(sort.toUpperCase())); String activities = activityFactory.getProviders().entrySet() .stream() - .map(BulkServiceCommands::formatActivity) + .map(e -> formatActivity(e, activityFactory)) .sorted(sorter) .collect(joining("\n")); if (activities == null) { return "There are no mapped activities!"; } - return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE") + return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE", "RATE") + "\n" + activities; } } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java index 8ab79b81e58..49f82faabe0 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.services.bulk.activity; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.RateLimiter; import diskCacheV111.util.FsPath; import java.util.Collections; import java.util.EnumSet; @@ -76,11 +77,12 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.vehicles.FileAttributes; /** - * Base definition for a bulk activity. Specifies the interfaces for executing the action on a + * Base definition for a bulk activity. Specifies the interfaces for executing + * the action on a * given target and for listening (asynchronously) for a result. *

* An instance of an activity is constructed on a request-by-request basis - * by the JobFactory. It should not be shared between requests. + * by the JobFactory. It should not be shared between requests. * * @param the type of object returned with the listenable future. */ @@ -90,10 +92,10 @@ public enum TargetType { FILE, DIR, BOTH } - public static final Set MINIMALLY_REQUIRED_ATTRIBUTES - = Collections.unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE, - FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.ACCESS_LATENCY, - FileAttribute.RETENTION_POLICY)); + public static final Set MINIMALLY_REQUIRED_ATTRIBUTES = Collections + .unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE, + FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.ACCESS_LATENCY, + FileAttribute.RETENTION_POLICY)); private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy(); @@ -102,6 +104,7 @@ public enum TargetType { protected Subject subject; protected Restriction restriction; + protected RateLimiter rateLimiter; protected BulkTargetRetryPolicy retryPolicy; protected Set descriptors; @@ -127,6 +130,20 @@ public void setRetryPolicy(BulkTargetRetryPolicy retryPolicy) { this.retryPolicy = retryPolicy; } + public void throttle() { + if (rateLimiter != null) { + rateLimiter.acquire(); + } + } + + public RateLimiter getRateLimiter() { + return rateLimiter; + } + + public void setRateLimiter(RateLimiter rateLimiter) { + this.rateLimiter = rateLimiter; + } + public TargetType getTargetType() { return targetType; } @@ -154,17 +171,18 @@ public void setDescriptors(Set descriptors) { /** * Performs the activity. * - * @param rid of the request. - * @param tid of the target. - * @param path of the target on which to perform the activity. + * @param rid of the request. + * @param tid of the target. + * @param path of the target on which to perform the activity. * @return future result of the activity. * @throws BulkServiceException */ public abstract ListenableFuture perform(String rid, long tid, FsPath path, FileAttributes attributes) - throws BulkServiceException; + throws BulkServiceException; /** - * An activity instance is on a request-by-request basis, so the parameters need to be + * An activity instance is on a request-by-request basis, so the parameters need + * to be * configured by the factory. * * @param arguments parameters of the specific activity. diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java index 2904da6b8f0..2466f22b808 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.services.bulk.activity; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.RateLimiter; import diskCacheV111.poolManager.PoolManagerAware; import diskCacheV111.util.NamespaceHandlerAware; import diskCacheV111.util.PnfsHandler; @@ -70,6 +71,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; +import java.util.stream.Collectors; import javax.security.auth.Subject; import org.dcache.auth.Subjects; import org.dcache.auth.attributes.Restriction; @@ -90,7 +92,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING /** * Creates activities on the basis of activity mappings. *

- * For each activity (such as pinning, deletion, etc.), there must be an SPI provider which creates + * For each activity (such as pinning, deletion, etc.), there must be an SPI + * provider which creates * the class implementing the activity API contract. */ public final class BulkActivityFactory implements CellMessageSender, EnvironmentAware { @@ -98,9 +101,11 @@ public final class BulkActivityFactory implements CellMessageSender, Environment private static final Logger LOGGER = LoggerFactory.getLogger(BulkActivityFactory.class); private final Map providers = Collections.synchronizedMap( - new HashMap<>()); + new HashMap<>()); private Map retryPolicies; + private Map rateLimiters; + private Map rateLimiterActivityIndex; private Map environment; private CellStub pnfsManager; @@ -114,7 +119,8 @@ public final class BulkActivityFactory implements CellMessageSender, Environment private boolean initialized; /** - * Generates an instance of the plugin-specific activity to be used by the request jobs. + * Generates an instance of the plugin-specific activity to be used by the + * request jobs. * * @param request being serviced. * @param subject of user who submitted the request. @@ -137,11 +143,16 @@ public BulkActivity createActivity(BulkRequest request, Subject subject, BulkActivity bulkActivity = provider.createActivity(); bulkActivity.setSubject(subject); bulkActivity.setRestriction(restriction); + String rateLimiterType = rateLimiterActivityIndex.get(activity); + if (rateLimiterType != null) { + bulkActivity.setRateLimiter(rateLimiters.get(rateLimiterType)); + } BulkTargetRetryPolicy retryPolicy = retryPolicies.get(activity); if (retryPolicy != null) { bulkActivity.setRetryPolicy(retryPolicy); } + configureEndpoints(bulkActivity); bulkActivity.configure(request.getArguments()); @@ -207,6 +218,17 @@ public void setQoSResponseReceiver(QoSResponseReceiver qoSResponseReceiver) { this.qoSResponseReceiver = qoSResponseReceiver; } + @Required + public void setRateLimiters(Map rates) { + rateLimiters = rates.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> RateLimiter.create(e.getValue()))); + } + + @Required + public void setRateLimiterActivityIndex(Map rateLimiterActivityIndex) { + this.rateLimiterActivityIndex = rateLimiterActivityIndex; + } + @Required public void setRetryPolicies(Map retryPolicies) { this.retryPolicies = retryPolicies; diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java index 48e072fcd60..0a34193a25e 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java @@ -127,8 +127,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING /** * Container job for a list of targets which may or may not be associated with each other via a - * common parent. It handles all file targets asynchronously, recurs if directory listing is enabled, - * and processes directory targets serially last in depth-first reverse order. + * common parent. It handles all file targets asynchronously, recurs if directory listing is + * enabled, and processes directory targets serially last in depth-first reverse order. */ public final class BulkRequestContainerJob implements Runnable, NamespaceHandlerAware, Comparable, @@ -156,9 +156,10 @@ public static FsPath findAbsolutePath(String prefix, String path) { } /** - * Directories that serve as targets. These are stored in memory, sorted and processed last. + * Directories that serve as targets. These are stored in memory, sorted and processed last. */ static class DirTarget { + final FsPath path; final FileAttributes attributes; final PID pid; @@ -170,46 +171,85 @@ static class DirTarget { this.pid = pid; this.attributes = attributes; this.path = path; - depth = (int)path.toString().chars().filter(c -> c == '/').count(); + depth = (int) path.toString().chars().filter(c -> c == '/').count(); } } /** - * Depth-first (descending order). + * Depth-first (descending order). */ static class DirTargetSorter implements Comparator { + @Override public int compare(DirTarget o1, DirTarget o2) { - return Integer.compare(o2.depth, o1.depth); /* DESCENDING ORDER */ + return Integer.compare(o2.depth, o1.depth); /* DESCENDING ORDER */ } } /** - * Container delays processing directory targets until the final step. + * Encapsulates manipulation of the semaphore. + */ + class PermitHolder { + + private final AtomicBoolean holdingPermit = new AtomicBoolean(false); + private Semaphore taskSemaphore; + + void acquireIfNotHoldingPermit() throws InterruptedException { + if (taskSemaphore == null) { + return; + } + + if (holdingPermit.compareAndSet(false, true)) { + taskSemaphore.acquire(); + } + } + + void throttledRelease() { + activity.throttle(); + releaseIfHoldingPermit(); + } + + void releaseIfHoldingPermit() { + if (taskSemaphore == null) { + return; + } + + if (holdingPermit.compareAndSet(true, false)) { + taskSemaphore.release(); + } + } + + void setTaskSemaphore(Semaphore taskSemaphore) { + this.taskSemaphore = taskSemaphore; + } + } + + /** + * Container delays processing directory targets until the final step. */ enum ContainerState { START, PROCESS_FILES, WAIT, PROCESS_DIRS, STOP } /** - * Only INITIAL targets go through all three states. DISCOVERED targets - * already have their proper paths and attributes from listing. + * Only INITIAL targets go through all three states. DISCOVERED targets already have their + * proper paths and attributes from listing. */ enum TaskState { RESOLVE_PATH, FETCH_ATTRIBUTES, HANDLE_TARGET, HANDLE_DIR_TARGET } /** - * Wrapper task for directory listing and target processing. + * Wrapper task for directory listing and target processing. */ abstract class ContainerTask implements Runnable { + final Consumer errorHandler = e -> uncaughtException(Thread.currentThread(), e); final long seqNo = taskCounter.getAndIncrement(); - final AtomicBoolean holdingPermit = new AtomicBoolean(false); + final PermitHolder permitHolder = new PermitHolder(); Future taskFuture; ExecutorService taskExecutor; - Semaphore taskSemaphore; public void run() { try { @@ -237,7 +277,8 @@ void cancel() { void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttributes) throws BulkServiceException, CacheException { - LOGGER.debug("{} - expandDepthFirst, {}, {}, {}, {}", ruid, id, pid, path, dirAttributes); + LOGGER.debug("{} - expandDepthFirst, {}, {}, {}, {}", ruid, id, pid, path, + dirAttributes); try { new DirListTask(id, pid, path, dirAttributes).submitAsync(); } catch (InterruptedException e) { @@ -247,16 +288,14 @@ void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttribute void submitAsync() throws InterruptedException { /* - * Acquisition must be done outside the synchronized block (running), - * else there could be a deadlock. + * Acquisition must be done outside the synchronized block (running), + * else there could be a deadlock. */ - if (holdingPermit.compareAndSet(false, true)) { - taskSemaphore.acquire(); - } + permitHolder.acquireIfNotHoldingPermit(); synchronized (running) { if (jobTarget.isTerminated()) { - taskSemaphore.release(); + permitHolder.releaseIfHoldingPermit(); return; } @@ -269,13 +308,12 @@ void submitAsync() throws InterruptedException { } void remove() { - if (holdingPermit.compareAndSet(true, false)) { - taskSemaphore.release(); - } + permitHolder.releaseIfHoldingPermit(); synchronized (running) { running.remove(seqNo); - LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo, running.size()); + LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo, + running.size()); if (running.isEmpty()) { checkTransitionToDirs(); @@ -287,6 +325,7 @@ void remove() { } class DirListTask extends ContainerTask { + final Long id; final PID pid; final FsPath path; @@ -298,7 +337,7 @@ class DirListTask extends ContainerTask { this.path = path; this.dirAttributes = dirAttributes; taskExecutor = listExecutor; - taskSemaphore = dirListSemaphore; + permitHolder.setTaskSemaphore(dirListSemaphore); } void doInner() throws Throwable { @@ -384,12 +423,12 @@ class TargetTask extends ContainerTask { final BulkRequestTarget target; /* - * From activity.perform() + * From activity.perform() */ ListenableFuture activityFuture; /* - * Determines the doInner() switch + * Determines the doInner() switch */ TaskState state; @@ -397,7 +436,7 @@ class TargetTask extends ContainerTask { this.target = target; state = initialState; taskExecutor = BulkRequestContainerJob.this.executor; - taskSemaphore = inFlightSemaphore; + permitHolder.setTaskSemaphore(inFlightSemaphore); } void cancel() { @@ -452,7 +491,7 @@ void performSync() throws InterruptedException { } /** - * (1) symlink resolution on initial targets; bypassed for discovered targets. + * (1) symlink resolution on initial targets; bypassed for discovered targets. */ private void resolvePath() { LOGGER.debug("{} - resolvePath, resolving {}", ruid, target.getPath()); @@ -484,13 +523,14 @@ public void failure(int rc, Object error) { } /** - * (2) retrieval of required file attributes. + * (2) retrieval of required file attributes. */ private void fetchAttributes() { LOGGER.debug("{} - fetchAttributes for path {}", ruid, target.getPath()); PnfsGetFileAttributes message = new PnfsGetFileAttributes(target.getPath().toString(), MINIMALLY_REQUIRED_ATTRIBUTES); - ListenableFuture requestFuture = pnfsHandler.requestAsync(message); + ListenableFuture requestFuture = pnfsHandler.requestAsync( + message); CellStub.addCallback(requestFuture, new AbstractMessageCallback<>() { @Override public void success(PnfsGetFileAttributes message) { @@ -512,7 +552,7 @@ public void failure(int rc, Object error) { } /** - * (3b) either recurs on directory or performs activity on file. + * (3b) either recurs on directory or performs activity on file. */ private void handleTarget() throws InterruptedException { LOGGER.debug("{} - handleTarget for {}, path {}.", ruid, target.getActivity(), @@ -524,8 +564,8 @@ private void handleTarget() throws InterruptedException { storeOrUpdate(null); expandDepthFirst(target.getId(), target.getPid(), target.getPath(), attributes); /* - * Swap out for the directory listing task. - * (We must do this AFTER the directory task has been added to running.) + * Swap out for the directory listing task. + * (We must do this AFTER the directory task has been added to running.) */ remove(); } else if (type != FileType.SPECIAL) { @@ -539,7 +579,7 @@ private void handleTarget() throws InterruptedException { } /** - * (3a) Performs activity on either file or directory target. + * (3a) Performs activity on either file or directory target. */ private void performActivity() throws InterruptedException { performActivity(true); @@ -554,7 +594,8 @@ private void performActivity(boolean async) throws InterruptedException { storeOrUpdate(null); if (hasBeenSpecificallyCancelled(this)) { - LOGGER.debug("{} - performActivity hasBeenSpecificallyCancelled for {}.", ruid, path); + LOGGER.debug("{} - performActivity hasBeenSpecificallyCancelled for {}.", ruid, + path); remove(); } @@ -562,6 +603,7 @@ private void performActivity(boolean async) throws InterruptedException { activityFuture = activity.perform(ruid, id == null ? seqNo : id, path, attributes); if (async) { activityFuture.addListener(() -> handleCompletion(), callbackExecutor); + permitHolder.throttledRelease(); } } catch (BulkServiceException | UnsupportedOperationException e) { LOGGER.error("{}, perform failed for {}: {}", ruid, target, e.getMessage()); @@ -615,7 +657,8 @@ private void storeOrUpdate(Throwable error) { LOGGER.debug("{} - storeOrUpdate {}.", ruid, target); if (hasBeenSpecificallyCancelled(this)) { - LOGGER.debug("{} - storeOrUpdate, hasBeenSpecificallyCancelled {}.", ruid, target.getPath()); + LOGGER.debug("{} - storeOrUpdate, hasBeenSpecificallyCancelled {}.", ruid, + target.getPath()); return; } @@ -624,14 +667,15 @@ private void storeOrUpdate(Throwable error) { try { /* - * If this is an insert (id == null), the target id will be updated to what is - * returned from the database. + * If this is an insert (id == null), the target id will be updated to what is + * returned from the database. */ checkForRequestCancellation(); targetStore.storeOrUpdate(target); LOGGER.debug("{} - storeOrUpdate, target id {}", ruid, target.getId()); } catch (BulkStorageException e) { - LOGGER.error("{}, could not store or update target from result {}, {}, {}: {}.", ruid, + LOGGER.error("{}, could not store or update target from result {}, {}, {}: {}.", + ruid, target.getId(), target.getPath(), target.getAttributes(), e.toString()); error = e; } catch (InterruptedException e) { @@ -700,9 +744,9 @@ public void cancel() { interruptRunThread(); /* - * Thread may already have exited. + * Thread may already have exited. * - * Update terminates job target. + * Update terminates job target. */ containerState = ContainerState.STOP; update(CANCELLED); @@ -710,7 +754,7 @@ public void cancel() { LOGGER.debug("{} - cancel, running {}.", ruid, running.size()); /* - * Drain running tasks. Calling task cancel removes the task from the map. + * Drain running tasks. Calling task cancel removes the task from the map. */ while (true) { ContainerTask task; @@ -832,7 +876,7 @@ public void run() { } catch (InterruptedException e) { LOGGER.debug("{} - run: interrupted", ruid); /* - * If the state has not already been set to terminal, do so. + * If the state has not already been set to terminal, do so. */ containerState = ContainerState.STOP; update(CANCELLED); @@ -952,7 +996,8 @@ private boolean hasBeenSpecificallyCancelled(TargetTask task) { targetStore.update(target.getId(), CANCELLED, null, null); } } catch (BulkServiceException | UnsupportedOperationException e) { - LOGGER.error("hasBeenSpecificallyCancelled {}, failed for {}: {}", ruid, target.getPath(), + LOGGER.error("hasBeenSpecificallyCancelled {}, failed for {}: {}", ruid, + target.getPath(), e.toString()); } return true; @@ -985,7 +1030,7 @@ private void processDirTargets() { Arrays.sort(sorted, SORTER); /* - * Process serially in this thread + * Process serially in this thread */ for (DirTarget dirTarget : sorted) { try { @@ -994,7 +1039,7 @@ private void processDirTargets() { TaskState.HANDLE_DIR_TARGET).performSync(); } catch (InterruptedException e) { /* - * Cancel most likely called; stop processing. + * Cancel most likely called; stop processing. */ break; } @@ -1018,7 +1063,7 @@ private void processFileTargets() { new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync(); } catch (InterruptedException e) { /* - * Cancel most likely called; stop processing. + * Cancel most likely called; stop processing. */ break; } diff --git a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml index 603d7a400bb..dd9784483cb 100644 --- a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml +++ b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml @@ -205,6 +205,23 @@ + + + + + + + + + + + + + + + + + diff --git a/skel/share/defaults/bulk.properties b/skel/share/defaults/bulk.properties index 6876b56bba5..e5c077b1933 100644 --- a/skel/share/defaults/bulk.properties +++ b/skel/share/defaults/bulk.properties @@ -109,6 +109,15 @@ bulk.limits.dir-list-semaphore=20 # bulk.limits.in-flight-semaphore=2000 +# ---- Endpoint throttles +# +# These help bulk avoid flooding the respective service (PinManager, QoSEngine, PnfsManager) +# with requests at a rate it cannot handle. +# +bulk.limits.pin-manager-rate-per-second=1000 +bulk.limits.pnfs-manager-rate-per-second=2000 +bulk.limits.qos-engine-rate-per-second=500 + # ---- Interval of inactivity by the request manager consumer if not signalled # internally (as for instance when a request job completes). The consumer checks # for request readiness and completion. diff --git a/skel/share/services/bulk.batch b/skel/share/services/bulk.batch index edc406269df..43d1a75a0ce 100644 --- a/skel/share/services/bulk.batch +++ b/skel/share/services/bulk.batch @@ -26,6 +26,9 @@ check -strong bulk.limits.archiver-window check -strong bulk.limits.archiver-window.unit check -strong bulk.limits.dir-list-semaphore check -strong bulk.limits.in-flight-semaphore +check -strong bulk.limits.pin-manager-rate-per-second +check -strong bulk.limits.pnfs-manager-rate-per-second +check -strong bulk.limits.qos-engine-rate-per-second check -strong bulk.service.pnfsmanager check -strong bulk.service.pnfsmanager.timeout check -strong bulk.service.pnfsmanager.timeout.unit