From 2076857a4cce60dea4858d141f2cd96cffa1e89b Mon Sep 17 00:00:00 2001
From: Albert Louis Rossi
Date: Thu, 19 Oct 2023 07:28:18 -0500
Subject: [PATCH] dcache-bulk: use rate limiter to throttle semaphore release

Motivation:

master@0b4140b454b819de55c7c412539294537ff0beb8
https://rb.dcache.org/r/14118/

restructured the container job for greater throughput. To pace the execution
of calls on external services, however, two semaphores were used. For task
execution, the semaphore is not released until the future of the execution
completes. This works well, but it has the drawback of blocking further
submission to services like PinManager once the maximum number of permits is
held by tasks waiting for a response (e.g., in the case of actual staging).
On the other hand, releasing the semaphore immediately upon reception of the
future lets calls to external services pile up, provoking timeout errors of
various sorts.

Modification:

Relying on the thread pool size will not work in this case because the
execution of the activity must be asynchronous; the turnover is extremely
rapid. Instead, we adopt a rate limiter to throttle the semaphore release.
Each activity is given a limiter for the service endpoint it communicates
with. The rates for these (PinManager, PnfsManager, QoSEngine) are
configurable.

Result:

Performance and stability are sustained, and throughput continues even when
the submitted task activities are waiting for their futures to complete.

Target: master
Request: 9.2 (fixes an important issue)
Patch: https://rb.dcache.org/r/14136/
Requires-notes: yes (No longer blocks throughput of new tasks when the number
of tasks waiting for completion from an external service like PinManager
reaches the maximum number of available task permits.)
Acked-by: Tigran
---
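Note (not part of the patch): the following is a minimal, self-contained
sketch of the pattern described under "Modification", namely releasing the
submission semaphore at a rate the downstream service can absorb instead of
either waiting for the future to complete (which stalls submission) or
releasing immediately (which floods the endpoint). The class and member names
are illustrative and are not the actual dCache classes (in the patch,
PermitHolder and the tasks in BulkRequestContainerJob play these roles); the
rate of 1000 permits per second simply mirrors the new
bulk.limits.pin-manager-rate-per-second default.

    import com.google.common.util.concurrent.RateLimiter;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    public class ThrottledReleaseSketch {

        /* Caps the number of tasks submitted but not yet released. */
        private final Semaphore inFlight = new Semaphore(2000);

        /* Paces how quickly permits are handed back to submitters. */
        private final RateLimiter endpointRate = RateLimiter.create(1000.0);

        private final ExecutorService executor = Executors.newFixedThreadPool(8);

        void submit(Runnable callToEndpoint) throws InterruptedException {
            inFlight.acquire();
            CompletableFuture<Void> future =
                  CompletableFuture.runAsync(callToEndpoint, executor);
            /*
             * Do not wait for the future: block only until the limiter
             * grants a permit, then release the semaphore so new tasks can
             * be submitted while this one is still waiting for its reply.
             */
            endpointRate.acquire();
            inFlight.release();
            future.whenComplete((v, t) -> { /* handle completion or error */ });
        }
    }

In the patch itself the limiter is selected per activity according to the
endpoint it talks to, and the release is performed by the task's PermitHolder
(throttledRelease), so the semaphore still bounds the number of unreleased
tasks while the limiter bounds the call rate.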
 .../services/bulk/BulkServiceCommands.java    |   9 +-
 .../services/bulk/activity/BulkActivity.java  |  40 +++--
 .../bulk/activity/BulkActivityFactory.java    |  28 +++-
 .../bulk/job/BulkRequestContainerJob.java     | 139 ++++++++++++------
 .../org/dcache/services/bulk/bulk.xml         |  17 +++
 skel/share/defaults/bulk.properties           |   9 ++
 skel/share/services/bulk.batch                |   3 +
 7 files changed, 180 insertions(+), 65 deletions(-)

diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java
index b79931854e6..15e05239ba3 100644
--- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java
+++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java
@@ -183,7 +183,7 @@ public final class BulkServiceCommands implements CellCommandListener {
     /**
      * name | class | type | permits
      */
-    private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s ";
+    private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s";
 
     /**
      * name | required | description
@@ -264,8 +264,9 @@ public String nextCommand() {
         }
     }
 
-    private static String formatActivity(Entry<String, BulkActivityProvider> entry) {
+    private static String formatActivity(Entry<String, BulkActivityProvider> entry, BulkActivityFactory factory) {
         BulkActivityProvider provider = entry.getValue();
+
         return String.format(FORMAT_ACTIVITY,
               entry.getKey(),
               provider.getActivityClass(),
@@ -544,14 +545,14 @@ public String call() throws Exception {
             Sorter sorter = new Sorter(SortOrder.valueOf(sort.toUpperCase()));
             String activities = activityFactory.getProviders().entrySet()
                   .stream()
-                  .map(BulkServiceCommands::formatActivity)
+                  .map(e -> formatActivity(e, activityFactory))
                   .sorted(sorter)
                   .collect(joining("\n"));
 
             if (activities == null) {
                 return "There are no mapped activities!";
             }
 
-            return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE")
+            return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE", "RATE")
                   + "\n" + activities;
         }
     }

diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java
index 8ab79b81e58..49f82faabe0 100644
--- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java
+++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java
@@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 package org.dcache.services.bulk.activity;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
 import diskCacheV111.util.FsPath;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -76,11 +77,12 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 import org.dcache.vehicles.FileAttributes;
 
 /**
- * Base definition for a bulk activity. Specifies the interfaces for executing the action on a
+ * Base definition for a bulk activity. Specifies the interfaces for executing
+ * the action on a
  * given target and for listening (asynchronously) for a result.
  * <p>
  * An instance of an activity is constructed on a request-by-request basis
- * by the JobFactory. It should not be shared between requests.
+ * by the JobFactory. It should not be shared between requests.
  *
  * @param <T> the type of object returned with the listenable future.
  */
@@ -90,10 +92,10 @@ public enum TargetType {
         FILE, DIR, BOTH
     }
 
-    public static final Set<FileAttribute> MINIMALLY_REQUIRED_ATTRIBUTES
-          = Collections.unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE,
-          FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.ACCESS_LATENCY,
-          FileAttribute.RETENTION_POLICY));
+    public static final Set<FileAttribute> MINIMALLY_REQUIRED_ATTRIBUTES = Collections
+          .unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE,
+                FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.ACCESS_LATENCY,
+                FileAttribute.RETENTION_POLICY));
 
     private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy();
 
@@ -102,6 +104,7 @@ public enum TargetType {
     protected Subject subject;
     protected Restriction restriction;
+    protected RateLimiter rateLimiter;
     protected BulkTargetRetryPolicy retryPolicy;
     protected Set descriptors;
 
@@ -127,6 +130,20 @@ public void setRetryPolicy(BulkTargetRetryPolicy retryPolicy) {
         this.retryPolicy = retryPolicy;
     }
 
+    public void throttle() {
+        if (rateLimiter != null) {
+            rateLimiter.acquire();
+        }
+    }
+
+    public RateLimiter getRateLimiter() {
+        return rateLimiter;
+    }
+
+    public void setRateLimiter(RateLimiter rateLimiter) {
+        this.rateLimiter = rateLimiter;
+    }
+
     public TargetType getTargetType() {
         return targetType;
     }
@@ -154,17 +171,18 @@ public void setDescriptors(Set descriptors) {
     /**
      * Performs the activity.
      *
-     * @param rid of the request.
-     * @param tid of the target.
-     * @param path of the target on which to perform the activity.
+     * @param rid of the request.
+     * @param tid of the target.
+     * @param path of the target on which to perform the activity.
      * @return future result of the activity.
      * @throws BulkServiceException
      */
     public abstract ListenableFuture<T> perform(String rid, long tid, FsPath path,
           FileAttributes attributes)
-          throws BulkServiceException;
+          throws BulkServiceException;
 
     /**
-     * An activity instance is on a request-by-request basis, so the parameters need to be
+     * An activity instance is on a request-by-request basis, so the parameters need
+     * to be
      * configured by the factory.
      *
      * @param arguments parameters of the specific activity.
diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java
index 2904da6b8f0..2466f22b808 100644
--- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java
+++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java
@@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 package org.dcache.services.bulk.activity;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.RateLimiter;
 import diskCacheV111.poolManager.PoolManagerAware;
 import diskCacheV111.util.NamespaceHandlerAware;
 import diskCacheV111.util.PnfsHandler;
@@ -70,6 +71,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 import java.util.HashMap;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.stream.Collectors;
 import javax.security.auth.Subject;
 import org.dcache.auth.Subjects;
 import org.dcache.auth.attributes.Restriction;
@@ -90,7 +92,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 /**
  * Creates activities on the basis of activity mappings.
  * <p>
- * For each activity (such as pinning, deletion, etc.), there must be an SPI provider which creates
+ * For each activity (such as pinning, deletion, etc.), there must be an SPI
+ * provider which creates
  * the class implementing the activity API contract.
  */
 public final class BulkActivityFactory implements CellMessageSender, EnvironmentAware {
@@ -98,9 +101,11 @@ public final class BulkActivityFactory implements CellMessageSender, Environment
     private static final Logger LOGGER = LoggerFactory.getLogger(BulkActivityFactory.class);
 
     private final Map<String, BulkActivityProvider> providers = Collections.synchronizedMap(
-        new HashMap<>());
+          new HashMap<>());
 
     private Map<String, BulkTargetRetryPolicy> retryPolicies;
+    private Map<String, RateLimiter> rateLimiters;
+    private Map<String, String> rateLimiterActivityIndex;
     private Map<String, Object> environment;
 
     private CellStub pnfsManager;
@@ -114,7 +119,8 @@ public final class BulkActivityFactory implements CellMessageSender, Environment
     private boolean initialized;
 
     /**
-     * Generates an instance of the plugin-specific activity to be used by the request jobs.
+     * Generates an instance of the plugin-specific activity to be used by the
+     * request jobs.
      *
      * @param request being serviced.
      * @param subject of user who submitted the request.
@@ -137,11 +143,16 @@ public BulkActivity createActivity(BulkRequest request, Subject subject,
         BulkActivity bulkActivity = provider.createActivity();
         bulkActivity.setSubject(subject);
         bulkActivity.setRestriction(restriction);
+        String rateLimiterType = rateLimiterActivityIndex.get(activity);
+        if (rateLimiterType != null) {
+            bulkActivity.setRateLimiter(rateLimiters.get(rateLimiterType));
+        }
 
         BulkTargetRetryPolicy retryPolicy = retryPolicies.get(activity);
         if (retryPolicy != null) {
             bulkActivity.setRetryPolicy(retryPolicy);
         }
+
         configureEndpoints(bulkActivity);
 
         bulkActivity.configure(request.getArguments());
@@ -207,6 +218,17 @@ public void setQoSResponseReceiver(QoSResponseReceiver qoSResponseReceiver) {
         this.qoSResponseReceiver = qoSResponseReceiver;
     }
 
+    @Required
+    public void setRateLimiters(Map<String, Double> rates) {
+        rateLimiters = rates.entrySet().stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e -> RateLimiter.create(e.getValue())));
+    }
+
+    @Required
+    public void setRateLimiterActivityIndex(Map<String, String> rateLimiterActivityIndex) {
+        this.rateLimiterActivityIndex = rateLimiterActivityIndex;
+    }
+
     @Required
     public void setRetryPolicies(Map<String, BulkTargetRetryPolicy> retryPolicies) {
         this.retryPolicies = retryPolicies;

diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java
index 48e072fcd60..0a34193a25e 100644
--- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java
+++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java
@@ -127,8 +127,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 /**
  * Container job for a list of targets which may or may not be associated with each other via a
- * common parent. It handles all file targets asynchronously, recurs if directory listing is enabled,
- * and processes directory targets serially last in depth-first reverse order.
+ * common parent. It handles all file targets asynchronously, recurs if directory listing is
+ * enabled, and processes directory targets serially last in depth-first reverse order.
  */
 public final class BulkRequestContainerJob implements Runnable, NamespaceHandlerAware, Comparable,
@@ -156,9 +156,10 @@ public static FsPath findAbsolutePath(String prefix, String path) {
     }
 
     /**
-     * Directories that serve as targets. These are stored in memory, sorted and processed last.
+     * Directories that serve as targets. These are stored in memory, sorted and processed last.
      */
     static class DirTarget {
+
         final FsPath path;
         final FileAttributes attributes;
         final PID pid;
@@ -170,46 +171,85 @@ static class DirTarget {
             this.pid = pid;
             this.attributes = attributes;
             this.path = path;
-            depth = (int)path.toString().chars().filter(c -> c == '/').count();
+            depth = (int) path.toString().chars().filter(c -> c == '/').count();
         }
     }
 
     /**
-     * Depth-first (descending order).
+     * Depth-first (descending order).
      */
     static class DirTargetSorter implements Comparator<DirTarget> {
+
         @Override
         public int compare(DirTarget o1, DirTarget o2) {
-            return Integer.compare(o2.depth, o1.depth); /* DESCENDING ORDER */
+            return Integer.compare(o2.depth, o1.depth); /* DESCENDING ORDER */
         }
     }
 
     /**
-     * Container delays processing directory targets until the final step.
+     * Encapsulates manipulation of the semaphore.
+     */
+    class PermitHolder {
+
+        private final AtomicBoolean holdingPermit = new AtomicBoolean(false);
+        private Semaphore taskSemaphore;
+
+        void acquireIfNotHoldingPermit() throws InterruptedException {
+            if (taskSemaphore == null) {
+                return;
+            }
+
+            if (holdingPermit.compareAndSet(false, true)) {
+                taskSemaphore.acquire();
+            }
+        }
+
+        void throttledRelease() {
+            activity.throttle();
+            releaseIfHoldingPermit();
+        }
+
+        void releaseIfHoldingPermit() {
+            if (taskSemaphore == null) {
+                return;
+            }
+
+            if (holdingPermit.compareAndSet(true, false)) {
+                taskSemaphore.release();
+            }
+        }
+
+        void setTaskSemaphore(Semaphore taskSemaphore) {
+            this.taskSemaphore = taskSemaphore;
+        }
+    }
+
+    /**
+     * Container delays processing directory targets until the final step.
      */
     enum ContainerState {
         START, PROCESS_FILES, WAIT, PROCESS_DIRS, STOP
     }
 
     /**
-     * Only INITIAL targets go through all three states. DISCOVERED targets
-     * already have their proper paths and attributes from listing.
+     * Only INITIAL targets go through all three states. DISCOVERED targets already have their
+     * proper paths and attributes from listing.
      */
     enum TaskState {
         RESOLVE_PATH, FETCH_ATTRIBUTES, HANDLE_TARGET, HANDLE_DIR_TARGET
     }
 
     /**
-     * Wrapper task for directory listing and target processing.
+     * Wrapper task for directory listing and target processing.
      */
     abstract class ContainerTask implements Runnable {
+
         final Consumer<Throwable> errorHandler = e -> uncaughtException(Thread.currentThread(), e);
         final long seqNo = taskCounter.getAndIncrement();
-        final AtomicBoolean holdingPermit = new AtomicBoolean(false);
+        final PermitHolder permitHolder = new PermitHolder();
 
         Future taskFuture;
         ExecutorService taskExecutor;
-        Semaphore taskSemaphore;
 
         public void run() {
             try {
@@ -237,7 +277,8 @@ void cancel() {
 
         void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttributes)
               throws BulkServiceException, CacheException {
-            LOGGER.debug("{} - expandDepthFirst, {}, {}, {}, {}", ruid, id, pid, path, dirAttributes);
+            LOGGER.debug("{} - expandDepthFirst, {}, {}, {}, {}", ruid, id, pid, path,
+                  dirAttributes);
             try {
                 new DirListTask(id, pid, path, dirAttributes).submitAsync();
             } catch (InterruptedException e) {
@@ -247,16 +288,14 @@ void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttribute
 
         void submitAsync() throws InterruptedException {
             /*
-             * Acquisition must be done outside the synchronized block (running),
-             * else there could be a deadlock.
+             * Acquisition must be done outside the synchronized block (running),
+             * else there could be a deadlock.
              */
-            if (holdingPermit.compareAndSet(false, true)) {
-                taskSemaphore.acquire();
-            }
+            permitHolder.acquireIfNotHoldingPermit();
 
             synchronized (running) {
                 if (jobTarget.isTerminated()) {
-                    taskSemaphore.release();
+                    permitHolder.releaseIfHoldingPermit();
                     return;
                 }
 
@@ -269,13 +308,12 @@ void submitAsync() throws InterruptedException {
         }
 
         void remove() {
-            if (holdingPermit.compareAndSet(true, false)) {
-                taskSemaphore.release();
-            }
+            permitHolder.releaseIfHoldingPermit();
 
             synchronized (running) {
                 running.remove(seqNo);
-                LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo, running.size());
+                LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo,
+                      running.size());
 
                 if (running.isEmpty()) {
                     checkTransitionToDirs();
@@ -287,6 +325,7 @@ void remove() {
     }
 
     class DirListTask extends ContainerTask {
+
         final Long id;
         final PID pid;
         final FsPath path;
@@ -298,7 +337,7 @@ class DirListTask extends ContainerTask {
             this.path = path;
             this.dirAttributes = dirAttributes;
             taskExecutor = listExecutor;
-            taskSemaphore = dirListSemaphore;
+            permitHolder.setTaskSemaphore(dirListSemaphore);
         }
 
         void doInner() throws Throwable {
@@ -384,12 +423,12 @@ class TargetTask extends ContainerTask {
         final BulkRequestTarget target;
 
         /*
-         * From activity.perform()
+         * From activity.perform()
          */
         ListenableFuture activityFuture;
 
         /*
-         * Determines the doInner() switch
+         * Determines the doInner() switch
          */
         TaskState state;
 
@@ -397,7 +436,7 @@ class TargetTask extends ContainerTask {
             this.target = target;
             state = initialState;
             taskExecutor = BulkRequestContainerJob.this.executor;
-            taskSemaphore = inFlightSemaphore;
+            permitHolder.setTaskSemaphore(inFlightSemaphore);
         }
 
         void cancel() {
@@ -452,7 +491,7 @@ void performSync() throws InterruptedException {
         }
 
         /**
-         * (1) symlink resolution on initial targets; bypassed for discovered targets.
+         * (1) symlink resolution on initial targets; bypassed for discovered targets.
          */
         private void resolvePath() {
             LOGGER.debug("{} - resolvePath, resolving {}", ruid, target.getPath());
@@ -484,13 +523,14 @@ public void failure(int rc, Object error) {
         }
 
         /**
-         * (2) retrieval of required file attributes.
+         * (2) retrieval of required file attributes.
          */
         private void fetchAttributes() {
             LOGGER.debug("{} - fetchAttributes for path {}", ruid, target.getPath());
             PnfsGetFileAttributes message = new PnfsGetFileAttributes(target.getPath().toString(),
                   MINIMALLY_REQUIRED_ATTRIBUTES);
-            ListenableFuture<PnfsGetFileAttributes> requestFuture = pnfsHandler.requestAsync(message);
+            ListenableFuture<PnfsGetFileAttributes> requestFuture = pnfsHandler.requestAsync(
+                  message);
             CellStub.addCallback(requestFuture, new AbstractMessageCallback<>() {
                 @Override
                 public void success(PnfsGetFileAttributes message) {
@@ -512,7 +552,7 @@ public void failure(int rc, Object error) {
         }
 
         /**
-         * (3b) either recurs on directory or performs activity on file.
+         * (3b) either recurs on directory or performs activity on file.
          */
         private void handleTarget() throws InterruptedException {
             LOGGER.debug("{} - handleTarget for {}, path {}.", ruid, target.getActivity(),
@@ -524,8 +564,8 @@ private void handleTarget() throws InterruptedException {
                     storeOrUpdate(null);
                     expandDepthFirst(target.getId(), target.getPid(), target.getPath(), attributes);
                     /*
-                     * Swap out for the directory listing task.
-                     * (We must do this AFTER the directory task has been added to running.)
+                     * Swap out for the directory listing task.
+                     * (We must do this AFTER the directory task has been added to running.)
                      */
                     remove();
                 } else if (type != FileType.SPECIAL) {
@@ -539,7 +579,7 @@ private void handleTarget() throws InterruptedException {
         }
 
         /**
-         * (3a) Performs activity on either file or directory target.
+         * (3a) Performs activity on either file or directory target.
          */
         private void performActivity() throws InterruptedException {
             performActivity(true);
@@ -554,7 +594,8 @@ private void performActivity(boolean async) throws InterruptedException {
             storeOrUpdate(null);
 
             if (hasBeenSpecificallyCancelled(this)) {
-                LOGGER.debug("{} - performActivity hasBeenSpecificallyCancelled for {}.", ruid, path);
+                LOGGER.debug("{} - performActivity hasBeenSpecificallyCancelled for {}.", ruid,
+                      path);
                 remove();
             }
 
@@ -562,6 +603,7 @@ private void performActivity(boolean async) throws InterruptedException {
                 activityFuture = activity.perform(ruid, id == null ? seqNo : id, path, attributes);
                 if (async) {
                     activityFuture.addListener(() -> handleCompletion(), callbackExecutor);
+                    permitHolder.throttledRelease();
                 }
             } catch (BulkServiceException | UnsupportedOperationException e) {
                 LOGGER.error("{}, perform failed for {}: {}", ruid, target, e.getMessage());
@@ -615,7 +657,8 @@ private void storeOrUpdate(Throwable error) {
             LOGGER.debug("{} - storeOrUpdate {}.", ruid, target);
 
             if (hasBeenSpecificallyCancelled(this)) {
-                LOGGER.debug("{} - storeOrUpdate, hasBeenSpecificallyCancelled {}.", ruid, target.getPath());
+                LOGGER.debug("{} - storeOrUpdate, hasBeenSpecificallyCancelled {}.", ruid,
+                      target.getPath());
                 return;
             }
 
@@ -624,14 +667,15 @@ private void storeOrUpdate(Throwable error) {
 
             try {
                 /*
-                 * If this is an insert (id == null), the target id will be updated to what is
-                 * returned from the database.
+                 * If this is an insert (id == null), the target id will be updated to what is
+                 * returned from the database.
                 */
                checkForRequestCancellation();
                targetStore.storeOrUpdate(target);
                LOGGER.debug("{} - storeOrUpdate, target id {}", ruid, target.getId());
            } catch (BulkStorageException e) {
-                LOGGER.error("{}, could not store or update target from result {}, {}, {}: {}.", ruid,
+                LOGGER.error("{}, could not store or update target from result {}, {}, {}: {}.",
+                      ruid,
                      target.getId(), target.getPath(), target.getAttributes(), e.toString());
                error = e;
            } catch (InterruptedException e) {
@@ -700,9 +744,9 @@ public void cancel() {
        interruptRunThread();
 
        /*
-         * Thread may already have exited.
+         * Thread may already have exited.
         *
-         * Update terminates job target.
+         * Update terminates job target.
         */
        containerState = ContainerState.STOP;
        update(CANCELLED);
@@ -710,7 +754,7 @@
        LOGGER.debug("{} - cancel, running {}.", ruid, running.size());
 
        /*
-         * Drain running tasks. Calling task cancel removes the task from the map.
+         * Drain running tasks. Calling task cancel removes the task from the map.
         */
        while (true) {
            ContainerTask task;
@@ -832,7 +876,7 @@ public void run() {
        } catch (InterruptedException e) {
            LOGGER.debug("{} - run: interrupted", ruid);
            /*
-             * If the state has not already been set to terminal, do so.
+             * If the state has not already been set to terminal, do so.
             */
            containerState = ContainerState.STOP;
            update(CANCELLED);
@@ -952,7 +996,8 @@ private boolean hasBeenSpecificallyCancelled(TargetTask task) {
                    targetStore.update(target.getId(), CANCELLED, null, null);
                }
            } catch (BulkServiceException | UnsupportedOperationException e) {
-                LOGGER.error("hasBeenSpecificallyCancelled {}, failed for {}: {}", ruid, target.getPath(),
+                LOGGER.error("hasBeenSpecificallyCancelled {}, failed for {}: {}", ruid,
+                      target.getPath(),
                      e.toString());
            }
            return true;
@@ -985,7 +1030,7 @@ private void processDirTargets() {
        Arrays.sort(sorted, SORTER);
 
        /*
-         * Process serially in this thread
+         * Process serially in this thread
         */
        for (DirTarget dirTarget : sorted) {
            try {
@@ -994,7 +1039,7 @@
                      TaskState.HANDLE_DIR_TARGET).performSync();
            } catch (InterruptedException e) {
                /*
-                 * Cancel most likely called; stop processing.
+                 * Cancel most likely called; stop processing.
                 */
                break;
            }
@@ -1018,7 +1063,7 @@ private void processFileTargets() {
                new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync();
            } catch (InterruptedException e) {
                /*
-                 * Cancel most likely called; stop processing.
+                 * Cancel most likely called; stop processing.
                 */
                break;
            }

diff --git a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml
index 603d7a400bb..dd9784483cb 100644
--- a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml
+++ b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml
@@ -205,6 +205,23 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

diff --git a/skel/share/defaults/bulk.properties b/skel/share/defaults/bulk.properties
index 6876b56bba5..e5c077b1933 100644
--- a/skel/share/defaults/bulk.properties
+++ b/skel/share/defaults/bulk.properties
@@ -109,6 +109,15 @@ bulk.limits.dir-list-semaphore=20
 #
 bulk.limits.in-flight-semaphore=2000
 
+# ---- Endpoint throttles
+#
+#   These help bulk avoid flooding the respective service (PinManager, QoSEngine, PnfsManager)
+#   with requests at a rate it cannot handle.
+#
+bulk.limits.pin-manager-rate-per-second=1000
+bulk.limits.pnfs-manager-rate-per-second=2000
+bulk.limits.qos-engine-rate-per-second=500
+
 # ---- Interval of inactivity by the request manager consumer if not signalled
 #      internally (as for instance when a request job completes). The consumer checks
 #      for request readiness and completion.

diff --git a/skel/share/services/bulk.batch b/skel/share/services/bulk.batch
index edc406269df..43d1a75a0ce 100644
--- a/skel/share/services/bulk.batch
+++ b/skel/share/services/bulk.batch
@@ -26,6 +26,9 @@ check -strong bulk.limits.archiver-window
 check -strong bulk.limits.archiver-window.unit
 check -strong bulk.limits.dir-list-semaphore
 check -strong bulk.limits.in-flight-semaphore
+check -strong bulk.limits.pin-manager-rate-per-second
+check -strong bulk.limits.pnfs-manager-rate-per-second
+check -strong bulk.limits.qos-engine-rate-per-second
 check -strong bulk.service.pnfsmanager
 check -strong bulk.service.pnfsmanager.timeout
 check -strong bulk.service.pnfsmanager.timeout.unit
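Note (not part of the patch): the sketch below illustrates what the three new
defaults above mean operationally. A Guava RateLimiter created with N permits
per second makes acquire() block just long enough to smooth callers out to
that rate, so a burst of submissions toward PinManager, PnfsManager or the
QoSEngine is spread over time rather than delivered all at once. The property
values are the defaults from bulk.properties; the map keys and the timing
loop are illustrative only (the actual activity-to-limiter wiring lives in
bulk.xml).

    import com.google.common.util.concurrent.RateLimiter;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class EndpointRateSketch {

        public static void main(String[] args) {
            /* Default rates as defined in skel/share/defaults/bulk.properties. */
            Map<String, Double> rates = Map.of(
                  "pin-manager", 1000.0,
                  "pnfs-manager", 2000.0,
                  "qos-engine", 500.0);

            /* Same conversion the factory performs in setRateLimiters(). */
            Map<String, RateLimiter> limiters = rates.entrySet().stream()
                  .collect(Collectors.toMap(Map.Entry::getKey,
                        e -> RateLimiter.create(e.getValue())));

            /* 2000 acquisitions at 500/s should take roughly 4 seconds. */
            RateLimiter qos = limiters.get("qos-engine");
            long start = System.nanoTime();
            for (int i = 0; i < 2000; i++) {
                qos.acquire();
            }
            System.out.printf("elapsed %.1f s%n", (System.nanoTime() - start) / 1e9);
        }
    }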