From 78730356154448b1fc2c259e6354cf8dfc3832ce Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Sat, 30 Sep 2023 07:18:13 -0500 Subject: [PATCH] dcache-bulk: implement HA Motivation: Some users/experiments have expressed an interest in making the Bulk service replicable. Modification: The necessary hooks have been added for leader/latch management. Because of the stateful semantics of the database, all physical instances must share a single database instance. Because of the way containers are managed in memory, it is preferable to have all DB insert and update activity, as well as cancellation and clearing (which involves the cache as well) to be done by the leader only. Hence, `submit` (< RESTful POST), `cancel` (< RESTful PATCH) and `clear` (< RESTful DELETE) messages are forwarded to the leader (whose address is obtained by injecting the HA leadership manager into the service). RESTful GET queries (`info`, `list`) are serviced by all physical instances. The `archiver` is also run exclusively by the leader. Result: Bulk is replicable. 
Target: master Request: 9.2 Patch: https://rb.dcache.org/r/14111/ Requires-notes: yes Requires-book: yes (cookbook paragraph provided) Acked-by: Lea --- .../cookbook-ha-with-replicable-services.md | 16 ++ .../org/dcache/services/bulk/BulkService.java | 179 +++++++++++++----- .../bulk/activity/BulkActivityFactory.java | 19 +- .../bulk/manager/BulkRequestManager.java | 5 + .../manager/ConcurrentRequestManager.java | 19 +- .../services/bulk/store/BulkRequestStore.java | 5 + .../jdbc/request/JdbcBulkRequestArchiver.java | 24 ++- .../jdbc/request/JdbcBulkRequestStore.java | 12 +- .../org/dcache/services/bulk/bulk.xml | 31 ++- .../cells/HAServiceLeadershipManager.java | 4 + skel/share/defaults/bulk.properties | 18 +- skel/share/services/bulk.batch | 2 + 12 files changed, 268 insertions(+), 66 deletions(-) diff --git a/docs/TheBook/src/main/markdown/cookbook-ha-with-replicable-services.md b/docs/TheBook/src/main/markdown/cookbook-ha-with-replicable-services.md index 16348d0e7d5..7ee51858e05 100644 --- a/docs/TheBook/src/main/markdown/cookbook-ha-with-replicable-services.md +++ b/docs/TheBook/src/main/markdown/cookbook-ha-with-replicable-services.md @@ -256,3 +256,19 @@ information. Requests will be load balanced over available instances of `topo`. This service collects nightly statistics about available pools. One can have multiple instances of `statistics` and each instance will collect the same information. + +### `bulk` + +This service processes bulk requests for pinning/unpinning, staging/releasing, +file deletion and qos updating or manipulation. It does not directly affect +transfers, but is perhaps more critical than the other services in this group +because it supports the `REST` API. + +Bulk is fully replicable. All instances *_must_* share the same database instance. +The configuration should be synchronized such that all instances are configured +the same way. 
+ + Request querying is load-balanced over the physical instances, but only the leader instance is responsible for submitting, cancelling, or clearing requests, and only the leader actually processes them (i.e., runs the request container servicing the request). \ No newline at end of file diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java index 1b6a42fe579..55a0f03a112 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java @@ -63,11 +63,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath; import com.google.common.base.Strings; +import com.google.common.base.Throwables; import diskCacheV111.util.CacheException; import diskCacheV111.util.FsPath; import diskCacheV111.util.PnfsHandler; import diskCacheV111.util.TimeoutCacheException; import diskCacheV111.vehicles.Message; +import dmg.cells.nucleus.CellAddressCore; import dmg.cells.nucleus.CellLifeCycleAware; import dmg.cells.nucleus.CellMessageReceiver; import dmg.cells.nucleus.Reply; @@ -81,10 +83,12 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.security.auth.Subject; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.dcache.auth.Subjects; import org.dcache.auth.attributes.Restriction; import org.dcache.auth.attributes.Restrictions; import org.dcache.cells.CellStub; +import org.dcache.cells.HAServiceLeadershipManager; import org.dcache.cells.MessageReply; import org.dcache.namespace.FileAttribute; import org.dcache.services.bulk.BulkRequest.Depth; @@ -101,7 +105,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING /** * Bulk 
service façade. Handles incoming messages. Handles restart reloading. */ -public final class BulkService implements CellLifeCycleAware, CellMessageReceiver { +public final class BulkService implements CellLifeCycleAware, CellMessageReceiver, + LeaderLatchListener { private static final Logger LOGGER = LoggerFactory.getLogger(BulkService.class); private static final String TARGET_COUNT_ERROR_FORMAT = "The number of targets %s exceeds " @@ -120,14 +125,18 @@ public final class BulkService implements CellLifeCycleAware, CellMessageReceive private BulkTargetStore targetStore; private BulkSubmissionHandler submissionHandler; private BulkServiceStatistics statistics; + private HAServiceLeadershipManager leadershipManager; private ExecutorService incomingExecutorService; private CellStub namespace; + private CellStub endpoint; private Depth allowedDepth; private int maxRequestsPerUser; private int maxFlatTargets; private int maxShallowTargets; private int maxRecursiveTargets; + private boolean initialized; + @Override public void afterStart() { /* @@ -141,31 +150,51 @@ public void afterStart() { * is necessary for processing request targets. 
*/ waitForNamespace(); + } + @Override + public synchronized void isLeader() { + LOGGER.info("isLeader called"); incomingExecutorService.execute(() -> initialize()); } + @Override + public synchronized void notLeader() { + LOGGER.info("notLeader called"); + try { + incomingExecutorService.execute(() -> parkService()); + } catch (Exception e) { + LOGGER.error("notLeader: {}, cause: {}.", + e.getMessage(), String.valueOf(Throwables.getRootCause(e))); + } + } + public Reply messageArrived(BulkRequestMessage message) { LOGGER.trace("received BulkRequestMessage {}", message); MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { - BulkRequest request = message.getRequest(); - Subject subject = message.getSubject(); - Restriction restriction = message.getRestriction(); - checkQuota(subject); - String uid = UUID.randomUUID().toString(); - request.setUid(uid); - checkRestrictions(restriction, uid); - checkActivity(request); - checkDepthConstraints(request); - requestStore.store(subject, restriction, request); - statistics.incrementRequestsReceived(request.getActivity()); - requestManager.signal(); - message.setRequestUrl(request.getUrlPrefix() + "/" + request.getUid()); - reply.reply(message); + if (!leadershipManager.hasLeadership()) { + relayToLeader(message, reply); + } else { + BulkRequest request = message.getRequest(); + Subject subject = message.getSubject(); + Restriction restriction = message.getRestriction(); + checkQuota(subject); + String uid = UUID.randomUUID().toString(); + request.setUid(uid); + checkRestrictions(restriction, uid); + checkActivity(request); + checkDepthConstraints(request); + requestStore.store(subject, restriction, request); + statistics.incrementRequestsReceived(request.getActivity()); + requestManager.signal(); + message.setRequestUrl(request.getUrlPrefix() + "/" + request.getUid()); + reply.reply(message); + } } catch (BulkServiceException e) { - LOGGER.error("messageArrived(BulkRequestMessage) {}: {}", 
message, e.toString()); + LOGGER.error("messageArrived(BulkRequestMessage) {}: {}", message, + e.toString()); reply.fail(message, e); } catch (Exception e) { reply.fail(message, e); @@ -178,7 +207,6 @@ public Reply messageArrived(BulkRequestMessage message) { public Reply messageArrived(BulkRequestListMessage message) { LOGGER.trace("received BulkRequestListMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { @@ -203,7 +231,6 @@ public Reply messageArrived(BulkRequestListMessage message) { public Reply messageArrived(BulkRequestStatusMessage message) { LOGGER.trace("received BulkRequestStatusMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { Subject subject = message.getSubject(); @@ -234,26 +261,29 @@ public Reply messageArrived(BulkRequestStatusMessage message) { public Reply messageArrived(BulkRequestCancelMessage message) { LOGGER.trace("received BulkRequestCancelMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { - Subject subject = message.getSubject(); - String uuid = message.getRequestUuid(); - /* - * First check to see if the request corresponds to a stored one. - */ - requestStore.getKey(uuid); - checkRestrictions(message.getRestriction(), uuid); - matchActivity(message.getActivity(), uuid); - List targetPaths = message.getTargetPaths(); - if (targetPaths == null || targetPaths.isEmpty()) { - submissionHandler.cancelRequest(subject, uuid); + if (!leadershipManager.hasLeadership()) { + relayToLeader(message, reply); } else { - validateTargets(uuid, subject, targetPaths); - submissionHandler.cancelTargets(subject, uuid, targetPaths); + Subject subject = message.getSubject(); + String uuid = message.getRequestUuid(); + /* + * First check to see if the request corresponds to a stored one. 
+ */ + requestStore.getKey(uuid); + checkRestrictions(message.getRestriction(), uuid); + matchActivity(message.getActivity(), uuid); + List targetPaths = message.getTargetPaths(); + if (targetPaths == null || targetPaths.isEmpty()) { + submissionHandler.cancelRequest(subject, uuid); + } else { + validateTargets(uuid, subject, targetPaths); + submissionHandler.cancelTargets(subject, uuid, targetPaths); + } + reply.reply(message); } - reply.reply(message); } catch (BulkServiceException e) { LOGGER.error("messageArrived(BulkRequestCancelMessage) {}: {}", message, e.toString()); @@ -267,22 +297,25 @@ public Reply messageArrived(BulkRequestCancelMessage message) { return reply; } public Reply messageArrived(BulkRequestClearMessage message) { LOGGER.trace("received BulkRequestClearMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { - String uuid = message.getRequestUuid(); - Subject subject = message.getSubject(); - /* - * First check to see if the request corresponds to a stored one. - */ - requestStore.getKey(uuid); - checkRestrictions(message.getRestriction(), uuid); - matchActivity(message.getActivity(), uuid); - submissionHandler.clearRequest(subject, uuid, message.isCancelIfRunning()); - reply.reply(message); + if (!leadershipManager.hasLeadership()) { + relayToLeader(message, reply); + } else { + String uuid = message.getRequestUuid(); + Subject subject = message.getSubject(); + /* + * First check to see if the request corresponds to a stored one. 
+ */ + requestStore.getKey(uuid); + checkRestrictions(message.getRestriction(), uuid); + matchActivity(message.getActivity(), uuid); + submissionHandler.clearRequest(subject, uuid, message.isCancelIfRunning()); + reply.reply(message); + } } catch (BulkServiceException e) { LOGGER.error("messageArrived(BulkRequestClearMessage) {}: {}", message, e.toString()); @@ -298,7 +335,6 @@ public Reply messageArrived(BulkRequestClearMessage message) { public Reply messageArrived(BulkArchivedRequestInfoMessage message) { LOGGER.trace("received BulkArchivedRequestInfoMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { @@ -322,7 +358,6 @@ public Reply messageArrived(BulkArchivedRequestInfoMessage message) { public Reply messageArrived(BulkArchivedSummaryInfoMessage message) { LOGGER.trace("received BulkArchivedSummaryInfoMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { @@ -368,6 +403,16 @@ public synchronized void setAllowedDepth(Depth allowedDepth) { this.allowedDepth = allowedDepth; } + @Required + public void setEndpointStub(CellStub endpoint) { + this.endpoint = endpoint; + } + + @Required + public void setLeadershipManager(HAServiceLeadershipManager leadershipManager) { + this.leadershipManager = leadershipManager; + } + @Required public synchronized void setMaxFlatTargets(int maxFlatTargets) { this.maxFlatTargets = maxFlatTargets; @@ -435,6 +480,22 @@ private void checkActivity(BulkRequest request) throws BulkServiceException { } } + private void relayToLeader(M message, MessageReply reply) + throws Exception { + CellAddressCore cellAddressCore = leadershipManager.getLeaderAddress(); + endpoint.setDestination(cellAddressCore.toString()); + Message response = endpoint.sendAndWait(message); + if (response.getReturnCode() != 0) { + Object error = response.getErrorObject(); + if (error instanceof Exception) { + throw (Exception) error; + } + throw new 
Exception(String.valueOf(error)); + } else { + reply.reply(message); + } + } + private synchronized void checkDepthConstraints(BulkRequest request) throws BulkPermissionDeniedException { switch (request.getExpandDirectories()) { @@ -515,7 +576,12 @@ private synchronized void checkTargetCount(BulkRequest request) } } - private void initialize() { + private synchronized void initialize() { + if (initialized) { + LOGGER.info("Service already initialized."); + return; + } + /* * See store specifics for how reload is handled, but the minimal contract is * that all incomplete requests be reset to the QUEUED state. @@ -542,6 +608,25 @@ private void initialize() { requestManager.signal(); LOGGER.info("Service startup completed."); + + initialized = true; + } + + private synchronized void parkService() { + if (!initialized) { + LOGGER.info("Service already parked."); + return; + } + + LOGGER.info("Stopping the job manager."); + try { + requestManager.shutdown(); + } catch (Exception e) { + LOGGER.error("parkService error: {}, {}.", + e.getMessage(), String.valueOf(Throwables.getRootCause(e))); + } + + initialized = false; } private void matchActivity(String activity, String uuid) diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java index e37840934bb..27f3e34711f 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java @@ -115,6 +115,8 @@ public final class BulkActivityFactory implements CellMessageSender, Environment private QoSResponseReceiver qoSResponseReceiver; private CellEndpoint endpoint; + private boolean initialized; + /** * Generates an instance of the plugin-specific activity to be used by the request jobs. 
* @@ -157,13 +159,16 @@ public Map getProviders() { } public void initialize() { - ServiceLoader serviceLoader - = ServiceLoader.load(BulkActivityProvider.class); - for (BulkActivityProvider provider : serviceLoader) { - String activity = provider.getActivity(); - provider.setMaxPermits(maxPermits.get(activity)); - provider.configure(environment); - providers.put(provider.getActivity(), provider); + if (!initialized) { + ServiceLoader serviceLoader + = ServiceLoader.load(BulkActivityProvider.class); + for (BulkActivityProvider provider : serviceLoader) { + String activity = provider.getActivity(); + provider.setMaxPermits(maxPermits.get(activity)); + provider.configure(environment); + providers.put(provider.getActivity(), provider); + } + initialized = true; } } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java index d693a0ac3c1..6bdf4fa40c8 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java @@ -89,6 +89,11 @@ public interface BulkRequestManager extends SignalAware { */ void cancelTargets(String id, List targetPaths); + /** + * Should wipe out any in-memory request state. + */ + void shutdown() throws Exception; + /** * Implementation-specific. 
*/ diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java index 9672e568f82..168b979d7da 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java @@ -72,6 +72,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -129,6 +130,7 @@ class ConcurrentRequestProcessor implements Runnable { @Override public void run() { try { + signals.set(0); while (!Thread.interrupted()) { doRun(); await(); @@ -337,13 +339,18 @@ private ListMultimap userRequests() { */ private ConcurrentRequestProcessor processor; + /** + * For clearing state (=> HA). 
+ */ + private Future processorFuture; + @Override public void initialize() throws Exception { requestJobs = new LinkedHashMap<>(); cancelledJobs = new HashSet<>(); schedulerProvider.initialize(); processor = new ConcurrentRequestProcessor(schedulerProvider.getRequestScheduler()); - processorExecutorService.execute(processor); + processorFuture = processorExecutorService.submit(processor); } @Override @@ -385,6 +392,16 @@ public void cancelTargets(String id, List targetPaths) { } } + @Override + public void shutdown() throws Exception { + if (processorFuture != null) { + processorFuture.cancel(true); + } + requestJobs = null; + cancelledJobs = null; + requestStore.clearCache(); + } + @Override public int countSignals() { return processor.signals.get(); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java index 39eb96b532a..7130d466364 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java @@ -253,6 +253,11 @@ Optional getRequestStatus(String uid) */ boolean isRequestSubject(Subject subject, String uid) throws BulkStorageException; + /** + * Clear all entries from memory. (May be a NOP). + */ + void clearCache() throws BulkStorageException; + /** * Load the store into memory. (May be a NOP). 
*/ diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java index 58bc3911f4a..fd9039fdd49 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java @@ -70,6 +70,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.dcache.services.bulk.BulkRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +81,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * completed. This involves storing a serialized request info object and deleting the entry from * the main tables. 
*/ -public class JdbcBulkRequestArchiver implements Runnable, CellInfoProvider { +public class JdbcBulkRequestArchiver implements Runnable, CellInfoProvider, LeaderLatchListener { private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBulkRequestArchiver.class); @@ -95,6 +96,7 @@ public class JdbcBulkRequestArchiver implements Runnable, CellInfoProvider { private TimeUnit archiverWindowUnit; private ScheduledFuture future; private long lastRunCompleted; + private boolean leader; @Override public void getInfo(PrintWriter pw) { @@ -122,14 +124,34 @@ public synchronized void reset() { } public synchronized void runNow() { + if (!leader) { + return; + } + if (future != null) { future.cancel(true); } + archiverScheduler.submit(this); + future = archiverScheduler.scheduleAtFixedRate(this, archiverPeriod, archiverPeriod, archiverPeriodUnit); } + @Override + public synchronized void isLeader() { + leader = true; + reset(); + } + + @Override + public synchronized void notLeader() { + leader = false; + if (future != null) { + future.cancel(true); + } + } + @Override public void run() { /* diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java index 064a06361e6..ae9b3c7bbc9 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java @@ -114,7 +114,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.BulkRequestTargetInfo; import org.dcache.services.bulk.BulkStorageException; import org.dcache.services.bulk.store.BulkRequestStore; -import org.dcache.services.bulk.store.jdbc.JdbcBulkDaoUtils; import org.dcache.services.bulk.store.jdbc.rtarget.JdbcBulkTargetStore; import 
org.dcache.services.bulk.store.jdbc.rtarget.JdbcRequestTargetDao; import org.dcache.services.bulk.util.BulkRequestFilter; @@ -160,7 +159,6 @@ public Optional load(String uid) throws Exception { private JdbcBulkRequestPermissionsDao requestPermissionsDao; private JdbcBulkTargetStore targetStore; private JdbcRequestTargetDao requestTargetDao; - private JdbcBulkDaoUtils utils; private long expiry; private TimeUnit expiryUnit; private long capacity; @@ -473,6 +471,11 @@ public boolean isRequestSubject(Subject subject, String uid) .equals(BulkRequestStore.uidGidKey(requestSubject.get())); } + @Override + public void clearCache() throws BulkStorageException { + requestCache.invalidateAll(); + } + /** * With the RDBMS implementation of the store, the following applies on restart: *

@@ -552,11 +555,6 @@ public void setArchiveDao(JdbcBulkArchiveDao archiveDao) { this.archiveDao = archiveDao; } - @Required - public void setBulkUtils(JdbcBulkDaoUtils utils) { - this.utils = utils; - } - @Required public void setCapacity(long capacity) { this.capacity = capacity; diff --git a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml index 904dfa01a0e..08076c1e0d7 100644 --- a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml +++ b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml @@ -38,6 +38,12 @@ + + Endpoint communication stub + + + + Maintains runtime information about all pools @@ -125,6 +131,7 @@ + @@ -187,7 +194,6 @@ - @@ -317,6 +323,8 @@ + + @@ -351,7 +359,7 @@ + destroy-method="shutdown"> @@ -364,4 +372,23 @@ + + + Coordinates which bulk service handles the requests + + + + + + + + Propagates leadership change notifications to managed listeners + + + + + + + diff --git a/modules/dcache/src/main/java/org/dcache/cells/HAServiceLeadershipManager.java b/modules/dcache/src/main/java/org/dcache/cells/HAServiceLeadershipManager.java index a51f7d2cd9a..26eae15938e 100644 --- a/modules/dcache/src/main/java/org/dcache/cells/HAServiceLeadershipManager.java +++ b/modules/dcache/src/main/java/org/dcache/cells/HAServiceLeadershipManager.java @@ -80,6 +80,10 @@ public void setLeadershipListener(LeaderLatchListener leadershipListener) { this.leadershipListener = leadershipListener; } + public CellAddressCore getLeaderAddress() throws Exception { + return new CellAddressCore(zkLeaderLatch.getLeader().getId()); + } + public void shutdown() { if (zkLeaderLatch != null) { CloseableUtils.closeQuietly(zkLeaderLatch); diff --git a/skel/share/defaults/bulk.properties b/skel/share/defaults/bulk.properties index b6269940500..e1f09cd01b8 100644 --- a/skel/share/defaults/bulk.properties +++ b/skel/share/defaults/bulk.properties @@ 
-23,7 +23,18 @@ bulk.cell.subscribe=${bulk.pool-monitor.topic},${bulk.qos-transition-topic} # # This property indicates if this service is replicable. # -(immutable)bulk.cell.replicable = false +# Bulk physical instances should all run over the same database instance. +# HA will use leader semantics to guarantee that only one of those instances +# is actually processing, i.e., storing, running, canceling or clearing requests. +# The other instances are free to query for request information or status. +# +# It is the administrator's responsibility to ensure that all +# bulk service instances: +# +# o have a consistent dCache 'bulk' configuration, +# o share the same database. +# +(immutable)bulk.cell.replicable = true # ---- Global setting for directory expansion. # @@ -154,6 +165,11 @@ bulk.service.qos=${dcache.service.qos} bulk.service.qos.timeout=1 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)bulk.service.qos.timeout.unit=MINUTES +# ---- How long to wait for a response from the HA leader. +# +bulk.service.ha-leader.timeout=1 +(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)bulk.service.ha-leader.timeout.unit=MINUTES + # Topic on which to expect pool monitor updates # bulk.pool-monitor.topic=${dcache.pool-monitor.topic} diff --git a/skel/share/services/bulk.batch b/skel/share/services/bulk.batch index 33c7af13597..2acaa4ba60b 100644 --- a/skel/share/services/bulk.batch +++ b/skel/share/services/bulk.batch @@ -38,6 +38,8 @@ check -strong bulk.service.poolmanager.timeout.unit check -strong bulk.service.qos check -strong bulk.service.qos.timeout check -strong bulk.service.qos.timeout.unit +check -strong bulk.service.ha-leader.timeout +check -strong bulk.service.ha-leader.timeout.unit check -strong bulk.pool-monitor.topic check -strong bulk.qos-transition-topic check -strong bulk.db.connections.max