From 8d2be7178cc22ebdbe5e5f9f82556e0b59fff7e5 Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Thu, 26 Sep 2024 12:08:52 -0700 Subject: [PATCH 1/2] Add auto deregistration of offline participants after timeout --- .../controller/GenericHelixController.java | 2 + .../controller/pipeline/AsyncWorkerType.java | 3 +- .../ParticipantDeregistrationStage.java | 116 ++++++++ .../org/apache/helix/model/ClusterConfig.java | 22 +- .../org/apache/helix/common/ZkTestBase.java | 24 ++ .../TestParticipantDeregistrationStage.java | 267 ++++++++++++++++++ ...eWhenRequireDelayedRebalanceOverwrite.java | 8 +- .../integration/TestForceKillInstance.java | 38 +-- 8 files changed, 450 insertions(+), 30 deletions(-) create mode 100644 helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java create mode 100644 helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index c641b4d71f..e63981d29a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -88,6 +88,7 @@ import org.apache.helix.controller.stages.MessageGenerationPhase; import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.MessageThrottleStage; +import org.apache.helix.controller.stages.ParticipantDeregistrationStage; import org.apache.helix.controller.stages.PersistAssignmentStage; import org.apache.helix.controller.stages.ReadClusterDataStage; import org.apache.helix.controller.stages.ResourceComputationStage; @@ -534,6 +535,7 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) { rebalancePipeline.addStage(new ResourceMessageDispatchStage()); 
rebalancePipeline.addStage(new PersistAssignmentStage()); rebalancePipeline.addStage(new TargetExteralViewCalcStage()); + rebalancePipeline.addStage(new ParticipantDeregistrationStage()); // external view generation Pipeline externalViewPipeline = new Pipeline(pipelineName); diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java index a1afb95f2a..ecbe7eb0c4 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java @@ -33,5 +33,6 @@ public enum AsyncWorkerType { ExternalViewComputeWorker, MaintenanceRecoveryWorker, TaskJobPurgeWorker, - CustomizedStateViewComputeWorker + CustomizedStateViewComputeWorker, + ParticipantDeregistrationWorker } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java new file mode 100644 index 0000000000..1cd7d34ec6 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java @@ -0,0 +1,116 @@ +package org.apache.helix.controller.stages; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; +import org.apache.helix.controller.pipeline.AsyncWorkerType; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.ParticipantHistory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static 
org.apache.helix.util.RebalanceUtil.scheduleOnDemandPipeline; + + +public class ParticipantDeregistrationStage extends AbstractAsyncBaseStage { + private static final Logger LOG = LoggerFactory.getLogger(ParticipantDeregistrationStage.class); + + @Override + public AsyncWorkerType getAsyncWorkerType() { + return AsyncWorkerType.ParticipantDeregistrationWorker; + } + + @Override + public void execute(ClusterEvent event) throws Exception { + HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); + ClusterConfig clusterConfig = manager.getConfigAccessor().getClusterConfig(manager.getClusterName()); + if (clusterConfig == null || !clusterConfig.isParticipantDeregistrationEnabled()) { + LOG.info("Cluster config is null or participant deregistration is not enabled. " + + "Skipping participant deregistration."); + return; + } + + ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); + Map offlineTimeMap = cache.getInstanceOfflineTimeMap(); + long deregisterDelay = clusterConfig.getParticipantDeregistrationTimeout(); + long stageStartTime = System.currentTimeMillis(); + Set participantsToDeregister = new HashSet<>(); + long earliestDeregisterTime = Long.MAX_VALUE; + + + for (Map.Entry entry : offlineTimeMap.entrySet()) { + String instanceName = entry.getKey(); + Long offlineTime = entry.getValue(); + long deregisterTime = offlineTime + deregisterDelay; + + // Skip if instance is still online + if (offlineTime == ParticipantHistory.ONLINE) { + continue; + } + + // If deregister time is in the past, deregister the instance + if (deregisterTime <= stageStartTime) { + participantsToDeregister.add(instanceName); + } else { + // Otherwise, find the next earliest deregister time + if (deregisterTime < earliestDeregisterTime) { + earliestDeregisterTime = deregisterTime; + } + } + } + + if (!participantsToDeregister.isEmpty()) { + Set successfullyDeregisteredParticipants = + deregisterParticipants(manager, 
cache, participantsToDeregister); + if (!successfullyDeregisteredParticipants.isEmpty()) { + LOG.info("Successfully deregistered {} participants from cluster {}", + successfullyDeregisteredParticipants.size(), cache.getClusterName()); + } + } + // Schedule the next deregister task + if (earliestDeregisterTime != Long.MAX_VALUE) { + long delay = earliestDeregisterTime - stageStartTime; + scheduleOnDemandPipeline(manager.getClusterName(), delay); + } + } + + private Set deregisterParticipants(HelixManager manager, ResourceControllerDataProvider cache, + Set instancesToDeregister) { + Set successfullyDeregisteredInstances = new HashSet<>(); + + if (manager == null || !manager.isConnected() || cache == null || instancesToDeregister == null) { + LOG.info("ParticipantDeregistrationStage failed due to HelixManager being null or not connected!"); + return successfullyDeregisteredInstances; + } + + // Perform safety checks before deregistering the instances + for (String instanceName : instancesToDeregister) { + InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instanceName); + LiveInstance liveInstance = cache.getLiveInstances().get(instanceName); + + if (instanceConfig == null) { + LOG.debug("Instance config is null for instance {}, skip deregistering the instance", instanceName); + continue; + } + + if (liveInstance != null) { + LOG.debug("Instance {} is still alive, skip deregistering the instance", instanceName); + continue; + } + + try { + manager.getClusterManagmentTool().dropInstance(cache.getClusterName(), instanceConfig); + successfullyDeregisteredInstances.add(instanceName); + } catch (HelixException e) { + LOG.error("Failed to deregister instance {} from cluster {}", instanceName, cache.getClusterName(), e); + } + } + + return successfullyDeregisteredInstances; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index edb7a76c6d..fc8a5b34c5 
100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -165,7 +165,9 @@ public enum ClusterConfigProperty { LAST_ON_DEMAND_REBALANCE_TIMESTAMP, // List of Preferred scoring keys used in evenness score computation - PREFERRED_SCORING_KEYS + PREFERRED_SCORING_KEYS, + PARTICIPANT_DEREGISTRATION_ENABLED, + PARTICIPANT_DEREGISTRATION_TIMEOUT } public enum GlobalRebalancePreferenceKey { @@ -1255,4 +1257,22 @@ public void setPreferredScoringKeys(List preferredScoringKeys) { _record.setListField(ClusterConfigProperty.PREFERRED_SCORING_KEYS.name(), preferredScoringKeys); } + + public boolean isParticipantDeregistrationEnabled() { + return _record.getBooleanField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_ENABLED.name(), + false); + } + + public void setParticipantDeregistrationEnabled(boolean enabled) { + _record.setBooleanField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_ENABLED.name(), enabled); + } + + public long getParticipantDeregistrationTimeout() { + return _record.getLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(), + -1); + } + + public void setParticipantDeregistrationTimeout(long timeout) { + _record.setLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(), timeout); + } } diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index a265605185..278cdfc7ac 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -55,6 +55,7 @@ import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.ClusterEvent; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixAdmin; import 
org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; @@ -830,4 +831,27 @@ public String getClusterName() { return _clusterName; } } + + public MockParticipantManager addParticipant(String cluster, String instanceName) { + _gSetupTool.addInstanceToCluster(cluster, instanceName); + MockParticipantManager toAddParticipant = + new MockParticipantManager(ZK_ADDR, cluster, instanceName); + toAddParticipant.syncStart(); + return toAddParticipant; + } + + public void dropParticipant(String cluster, MockParticipantManager participant) { + if (participant == null) { + return; + } + + try { + participant.syncStop(); + InstanceConfig instanceConfig = + _gSetupTool.getClusterManagementTool().getInstanceConfig(cluster, participant.getInstanceName()); + _gSetupTool.getClusterManagementTool().dropInstance(cluster, instanceConfig); + } catch (Exception e) { + LOG.warn("Error dropping participant " + participant.getInstanceName(), e); + } + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java new file mode 100644 index 0000000000..5512b66712 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java @@ -0,0 +1,267 @@ +package org.apache.helix.controller.stages; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.ClusterConfig; +import 
org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.ParticipantHistory; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestParticipantDeregistrationStage extends ZkTestBase { + final static long DEREGISTER_TIMEOUT = 5000; + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + private static final int NUM_NODES = 5; + private List _participants = new ArrayList<>(); + private HelixAdmin _admin; + private HelixDataAccessor _dataAccessor; + private ClusterControllerManager _controller; + private ConfigAccessor _configAccessor; + + @BeforeClass + public void beforeClass() { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODES; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + i; + addParticipant(CLUSTER_NAME, instanceName); + } + + _configAccessor = new ConfigAccessor(_gZkClient); + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + _admin = _gSetupTool.getClusterManagementTool(); + _dataAccessor = _controller.getHelixDataAccessor(); + + setAutoDeregisterConfigs(CLUSTER_NAME, true, DEREGISTER_TIMEOUT); + } + + // Asserts that a node will be removed from the cluster after it exceedsthe deregister timeout set in the cluster config + @Test + public void testParticipantAutoLeavesAfterOfflineTimeout() throws Exception { + System.out.println("START " + TestHelper.getTestClassName() + "." 
+ TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + + MockParticipantManager participantToDeregister = _participants.get(0); + participantToDeregister.syncStop(); + boolean result = TestHelper.verify(() -> !_admin.getInstancesInCluster(CLUSTER_NAME) + .contains(participantToDeregister.getInstanceName()), TestHelper.WAIT_DURATION); + Assert.assertTrue(result, "Participant should have been deregistered"); + + dropParticipant(CLUSTER_NAME, participantToDeregister); + addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName()); + System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + } + + // Asserts that will not be removed from the cluster if it comes back online before the deregister timeout + // and that the deregister timeout is reset, so the node will not be removed until time + // of last offline + deregister timeout + @Test (dependsOnMethods = "testParticipantAutoLeavesAfterOfflineTimeout") + public void testReconnectedParticipantNotDeregisteredWhenLive() throws Exception { + System.out.println("START " + TestHelper.getTestClassName() + "." 
+ TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + + MockParticipantManager participantToDeregister = _participants.get(0); + // Kill instance so deregister is scheduled + LiveInstance liveInstance = _dataAccessor.getProperty( + _dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName())); + participantToDeregister.syncStop(); + + // Sleep for half the deregister timeout + Thread.sleep(DEREGISTER_TIMEOUT * 3/5); + + // Manually recreate live instance so controller thinks it's back online + // This should prevent the node from being deregistered + _dataAccessor.setProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()), + liveInstance); + + // assert that the instance is still in the cluster + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() < startTime + DEREGISTER_TIMEOUT) { + Assert.assertTrue(_admin.getInstancesInCluster(CLUSTER_NAME) + .contains(participantToDeregister.getInstanceName()), "Participant should not have been deregistered"); + } + + // Re kill and assert that the instance is deregistered + _dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName())); + boolean result = TestHelper.verify(() -> !_admin.getInstancesInCluster(CLUSTER_NAME) + .contains(participantToDeregister.getInstanceName()), TestHelper.WAIT_DURATION); + Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: " + + participantToDeregister + " Remaining participants: in cluster " + _admin.getInstancesInCluster(CLUSTER_NAME)); + + dropParticipant(CLUSTER_NAME, participantToDeregister); + addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName()); + System.out.println("END " + TestHelper.getTestClassName() + "." 
+ TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + } + + // Same assertions as above but this time the node is re-killed immediately after being added back + @Test (dependsOnMethods = "testReconnectedParticipantNotDeregisteredWhenLive") + public void testFlappingParticipantIsNotDeregistered() throws Exception { + System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + + MockParticipantManager participantToDeregister = _participants.get(0); + // Kill instance so deregister is scheduled + LiveInstance liveInstance = _dataAccessor.getProperty( + _dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName())); + participantToDeregister.syncStop(); + + // Sleep for more than half the deregister timeout + Thread.sleep(DEREGISTER_TIMEOUT * 3/5); + + // Manually recreate live instance so controller thinks it's back online, then immediately delete + _dataAccessor.setProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()), + liveInstance); + ParticipantHistory participantHistory = _dataAccessor.getProperty(_dataAccessor.keyBuilder() + .participantHistory(participantToDeregister.getInstanceName())); + participantHistory.reportOnline("foo", "bar"); + _dataAccessor.setProperty(_dataAccessor.keyBuilder().participantHistory(participantToDeregister.getInstanceName()), + participantHistory); + + _dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName())); + + // assert that the instance is still in the cluster after original deregistration time should have passed + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() < startTime + (DEREGISTER_TIMEOUT * 3/5)) { + Assert.assertTrue(_admin.getInstancesInCluster(CLUSTER_NAME) + .contains(participantToDeregister.getInstanceName()), "Participant should not have been 
deregistered"); + } + + // Re kill and assert that the instance is deregistered + _dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName())); + boolean result = TestHelper.verify(() -> !_admin.getInstancesInCluster(CLUSTER_NAME) + .contains(participantToDeregister.getInstanceName()), TestHelper.WAIT_DURATION); + Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: " + + participantToDeregister + " Remaining participants: in cluster " + _admin.getInstancesInCluster(CLUSTER_NAME)); + + dropParticipant(CLUSTER_NAME, participantToDeregister); + addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName()); + System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + } + + // Tests enabling deregister will trigger deregister for participants that were already offline + @Test (dependsOnMethods = "testFlappingParticipantIsNotDeregistered") + public void testDeregisterAfterConfigEnabled() throws Exception { + System.out.println("START " + TestHelper.getTestClassName() + "." 
+ TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + + // Set to deregister to disabled + long testDeregisterTimeout = 1000; + setAutoDeregisterConfigs(CLUSTER_NAME, false, testDeregisterTimeout); + + // Create and immediately kill participants + List killedParticipants = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + MockParticipantManager participantToKill = addParticipant(CLUSTER_NAME, "participants_to_kill_" + i); + participantToKill.syncStop(); + killedParticipants.add(participantToKill); + } + + // Sleep so that participant offline time exceeds deregister timeout + Thread.sleep(testDeregisterTimeout); + // Trigger on disable --> enable deregister + setAutoDeregisterConfigs(CLUSTER_NAME, true, testDeregisterTimeout); + + // Assert participants have been deregistered + boolean result = TestHelper.verify(() -> { + List instances = _admin.getInstancesInCluster(CLUSTER_NAME); + return killedParticipants.stream().noneMatch(participant -> instances.contains(participant.getInstanceName())); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: " + + killedParticipants + " Remaining participants: in cluster " + _admin.getInstancesInCluster(CLUSTER_NAME)); + + // reset cluster state + killedParticipants.forEach(participant -> { + dropParticipant(CLUSTER_NAME, participant); + }); + System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + } + + // Tests shortening deregister timeout will trigger deregister and also deregister participants that now exceed + // the new (shorter) timeout + @Test (dependsOnMethods = "testDeregisterAfterConfigEnabled") + public void testDeregisterAfterConfigTimeoutShortened() throws Exception { + System.out.println("START " + TestHelper.getTestClassName() + "." 
+ TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + long longDeregisterTimeout = 1000*60*60*24; + long shortDeregisterTimeout = 1000; + setAutoDeregisterConfigs(CLUSTER_NAME, true, longDeregisterTimeout); + + // Create and immediately kill participants + List killedParticipants = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + MockParticipantManager participantToKill = addParticipant(CLUSTER_NAME, "participants_to_kill_" + i); + participantToKill.syncStop(); + killedParticipants.add(participantToKill); + } + + // Sleep so that participant offline time exceeds deregister timeout + Thread.sleep(shortDeregisterTimeout); + + // Trigger on shorten deregister timeout + setAutoDeregisterConfigs(CLUSTER_NAME, true, shortDeregisterTimeout); + + // Assert participants have been deregistered + boolean result = TestHelper.verify(() -> { + List instances = _admin.getInstancesInCluster(CLUSTER_NAME); + return killedParticipants.stream().noneMatch(participant -> instances.contains(participant.getInstanceName())); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: " + + killedParticipants + " Remaining participants: in cluster " + _admin.getInstancesInCluster(CLUSTER_NAME)); + + // reset cluster state + killedParticipants.forEach(participant -> { + dropParticipant(CLUSTER_NAME, participant); + }); + System.out.println("END " + TestHelper.getTestClassName() + "." 
+ TestHelper.getTestMethodName() + " at " + + new Date(System.currentTimeMillis())); + } + + @Override + public void dropParticipant(String clusterName, MockParticipantManager participant) { + _participants.remove(participant); + super.dropParticipant(clusterName, participant); + } + + @Override + public MockParticipantManager addParticipant(String clusterName, String instanceName) { + MockParticipantManager toAddParticipant = super.addParticipant(clusterName, instanceName); + _participants.add(toAddParticipant); + return toAddParticipant; + } + + private void setAutoDeregisterConfigs(String clusterName, boolean enabled, long timeout) { + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); + clusterConfig.setParticipantDeregistrationEnabled(enabled); + clusterConfig.setParticipantDeregistrationTimeout(timeout); + _configAccessor.setClusterConfig(clusterName, clusterConfig); + // Allow participant to ensure compatibility with nodes re-joining when they re-establish connection + HelixConfigScope scope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( + CLUSTER_NAME).build(); + _configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true"); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java index 7c93c33b69..c2b57f3904 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java @@ -110,12 +110,10 @@ public void testAddResourceWhenInstancesDisabledWithinWindow() { + new Date(System.currentTimeMillis())); } - private MockParticipantManager addParticipant(String cluster, String instanceName) { - 
_gSetupTool.addInstanceToCluster(cluster, instanceName); - MockParticipantManager toAddParticipant = - new MockParticipantManager(ZK_ADDR, cluster, instanceName); + @Override + public MockParticipantManager addParticipant(String clusterName, String instanceName) { + MockParticipantManager toAddParticipant = super.addParticipant(clusterName, instanceName); _participants.add(toAddParticipant); - toAddParticipant.syncStart(); return toAddParticipant; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java index d7dc442054..c5946a331f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java @@ -132,7 +132,7 @@ public void testForceKillDropsAssignment() { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -166,7 +166,7 @@ public void testSessionExpiration() throws Exception { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." 
+ TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -203,7 +203,7 @@ public void testDisconnectReconnect() { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -236,7 +236,7 @@ public void testRemoveUnknownOperationAfterForceKill() { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -271,7 +271,7 @@ public void testSessionExpirationWithoutUnknownOperation() throws Exception { "Instance should have assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -309,7 +309,7 @@ public void testDisconnectReconnectWithoutUnknownOperation() { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." 
+ TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -341,7 +341,7 @@ public void testLiveInstanceZNodeImmediatelyRecreated() { _dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(instanceToKillName)); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -375,7 +375,7 @@ public void testDownwardStateTransitionsBlocked() throws Exception { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); @@ -411,13 +411,14 @@ public void testForceKillWithBlockedDownwardStateTransition() throws Exception { "Instance should not have any assignments"); // Reset state of cluster - dropParticipant(CLUSTER_NAME, instanceToKillName); + dropParticipant(CLUSTER_NAME, instanceToKill); addParticipant(CLUSTER_NAME, instanceToKillName); System.out.println("END " + TestHelper.getTestClassName() + "." 
+ TestHelper.getTestMethodName() + " at " + new Date(System.currentTimeMillis())); } - private MockParticipantManager addParticipant(String cluster, String instanceName) { + @Override + public MockParticipantManager addParticipant(String cluster, String instanceName) { _gSetupTool.addInstanceToCluster(cluster, instanceName); MockParticipantManager toAddParticipant = new MockParticipantManager(ZK_ADDR, cluster, instanceName); @@ -429,19 +430,10 @@ private MockParticipantManager addParticipant(String cluster, String instanceNam return toAddParticipant; } - protected void dropParticipant(String cluster, String instanceName) { - // find mock participant manager with instanceName and remove it from _mockParticipantManagers. - MockParticipantManager toRemoveManager = _participants.stream() - .filter(manager -> manager.getInstanceName().equals(instanceName)) - .findFirst() - .orElse(null); - if (toRemoveManager != null) { - toRemoveManager.syncStop(); - _participants.remove(toRemoveManager); - } - - InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool().getInstanceConfig(cluster, instanceName); - _gSetupTool.getClusterManagementTool().dropInstance(cluster, instanceConfig); + @Override + public void dropParticipant(String cluster, MockParticipantManager participant) { + _participants.remove(participant); + super.dropParticipant(cluster, participant); } private Map getEVs() { From 45190e9d6ef6817c73fc365a2637fa1d0b13a0c0 Mon Sep 17 00:00:00 2001 From: Grant Palau Spencer Date: Wed, 20 Nov 2024 16:04:37 -0800 Subject: [PATCH 2/2] respond reviewer feedback --- .../stages/ParticipantDeregistrationStage.java | 14 ++++++-------- .../java/org/apache/helix/model/ClusterConfig.java | 14 ++++---------- .../stages/TestParticipantDeregistrationStage.java | 13 ++++++------- 3 files changed, 16 insertions(+), 25 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java index 1cd7d34ec6..a6464a42a0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java @@ -40,7 +40,7 @@ public void execute(ClusterEvent event) throws Exception { long deregisterDelay = clusterConfig.getParticipantDeregistrationTimeout(); long stageStartTime = System.currentTimeMillis(); Set participantsToDeregister = new HashSet<>(); - long earliestDeregisterTime = Long.MAX_VALUE; + long nextDeregisterTime = Long.MAX_VALUE; for (Map.Entry entry : offlineTimeMap.entrySet()) { @@ -49,7 +49,7 @@ public void execute(ClusterEvent event) throws Exception { long deregisterTime = offlineTime + deregisterDelay; // Skip if instance is still online - if (offlineTime == ParticipantHistory.ONLINE) { + if (cache.getLiveInstances().containsKey(instanceName)) { continue; } @@ -58,9 +58,7 @@ public void execute(ClusterEvent event) throws Exception { participantsToDeregister.add(instanceName); } else { // Otherwise, find the next earliest deregister time - if (deregisterTime < earliestDeregisterTime) { - earliestDeregisterTime = deregisterTime; - } + nextDeregisterTime = Math.min(nextDeregisterTime, deregisterTime); } } @@ -73,8 +71,8 @@ public void execute(ClusterEvent event) throws Exception { } } // Schedule the next deregister task - if (earliestDeregisterTime != Long.MAX_VALUE) { - long delay = earliestDeregisterTime - stageStartTime; + if (nextDeregisterTime != Long.MAX_VALUE) { + long delay = Math.max(nextDeregisterTime - System.currentTimeMillis(), 0); scheduleOnDemandPipeline(manager.getClusterName(), delay); } } @@ -107,7 +105,7 @@ private Set deregisterParticipants(HelixManager manager, ResourceControl manager.getClusterManagmentTool().dropInstance(cache.getClusterName(), instanceConfig); 
successfullyDeregisteredInstances.add(instanceName); } catch (HelixException e) { - LOG.error("Failed to deregister instance {} from cluster {}", instanceName, cache.getClusterName(), e); + LOG.warn("Failed to deregister instance {} from cluster {}", instanceName, cache.getClusterName(), e); } } diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index fc8a5b34c5..56cde31430 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -166,7 +166,6 @@ public enum ClusterConfigProperty { // List of Preferred scoring keys used in evenness score computation PREFERRED_SCORING_KEYS, - PARTICIPANT_DEREGISTRATION_ENABLED, PARTICIPANT_DEREGISTRATION_TIMEOUT } @@ -1258,15 +1257,6 @@ public void setPreferredScoringKeys(List preferredScoringKeys) { preferredScoringKeys); } - public boolean isParticipantDeregistrationEnabled() { - return _record.getBooleanField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_ENABLED.name(), - false); - } - - public void setParticipantDeregistrationEnabled(boolean enabled) { - _record.setBooleanField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_ENABLED.name(), enabled); - } - public long getParticipantDeregistrationTimeout() { return _record.getLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(), -1); @@ -1275,4 +1265,8 @@ public long getParticipantDeregistrationTimeout() { public void setParticipantDeregistrationTimeout(long timeout) { _record.setLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(), timeout); } + + public boolean isParticipantDeregistrationEnabled() { + return getParticipantDeregistrationTimeout() > -1; + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java index 5512b66712..7e9bfabca2 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java @@ -52,7 +52,7 @@ public void beforeClass() { _admin = _gSetupTool.getClusterManagementTool(); _dataAccessor = _controller.getHelixDataAccessor(); - setAutoDeregisterConfigs(CLUSTER_NAME, true, DEREGISTER_TIMEOUT); + setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT); } // Asserts that a node will be removed from the cluster after it exceedsthe deregister timeout set in the cluster config @@ -169,7 +169,7 @@ public void testDeregisterAfterConfigEnabled() throws Exception { // Set to deregister to disabled long testDeregisterTimeout = 1000; - setAutoDeregisterConfigs(CLUSTER_NAME, false, testDeregisterTimeout); + setAutoDeregisterConfigs(CLUSTER_NAME, -1); // Create and immediately kill participants List killedParticipants = new ArrayList<>(); @@ -182,7 +182,7 @@ public void testDeregisterAfterConfigEnabled() throws Exception { // Sleep so that participant offline time exceeds deregister timeout Thread.sleep(testDeregisterTimeout); // Trigger on disable --> enable deregister - setAutoDeregisterConfigs(CLUSTER_NAME, true, testDeregisterTimeout); + setAutoDeregisterConfigs(CLUSTER_NAME, testDeregisterTimeout); // Assert participants have been deregistered boolean result = TestHelper.verify(() -> { @@ -208,7 +208,7 @@ public void testDeregisterAfterConfigTimeoutShortened() throws Exception { + new Date(System.currentTimeMillis())); long longDeregisterTimeout = 1000*60*60*24; long shortDeregisterTimeout = 1000; - setAutoDeregisterConfigs(CLUSTER_NAME, true, longDeregisterTimeout); + setAutoDeregisterConfigs(CLUSTER_NAME, longDeregisterTimeout); // Create and immediately kill participants List killedParticipants = new 
ArrayList<>(); @@ -222,7 +222,7 @@ public void testDeregisterAfterConfigTimeoutShortened() throws Exception { Thread.sleep(shortDeregisterTimeout); // Trigger on shorten deregister timeout - setAutoDeregisterConfigs(CLUSTER_NAME, true, shortDeregisterTimeout); + setAutoDeregisterConfigs(CLUSTER_NAME, shortDeregisterTimeout); // Assert participants have been deregistered boolean result = TestHelper.verify(() -> { @@ -253,9 +253,8 @@ public MockParticipantManager addParticipant(String clusterName, String instance return toAddParticipant; } - private void setAutoDeregisterConfigs(String clusterName, boolean enabled, long timeout) { + private void setAutoDeregisterConfigs(String clusterName, long timeout) { ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); - clusterConfig.setParticipantDeregistrationEnabled(enabled); clusterConfig.setParticipantDeregistrationTimeout(timeout); _configAccessor.setClusterConfig(clusterName, clusterConfig); // Allow participant to ensure compatibility with nodes re-joining when they re-establish connection