Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add auto deregistration of offline participants after timeout #2932

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ParticipantDeregistrationStage;
import org.apache.helix.controller.stages.PersistAssignmentStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
Expand Down Expand Up @@ -534,6 +535,7 @@ private static PipelineRegistry createDefaultRegistry(String pipelineName) {
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
rebalancePipeline.addStage(new PersistAssignmentStage());
rebalancePipeline.addStage(new TargetExteralViewCalcStage());
rebalancePipeline.addStage(new ParticipantDeregistrationStage());

// external view generation
Pipeline externalViewPipeline = new Pipeline(pipelineName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ public enum AsyncWorkerType {
ExternalViewComputeWorker,
MaintenanceRecoveryWorker,
TaskJobPurgeWorker,
CustomizedStateViewComputeWorker
CustomizedStateViewComputeWorker,
ParticipantDeregistrationWorker
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package org.apache.helix.controller.stages;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ParticipantHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.helix.util.RebalanceUtil.scheduleOnDemandPipeline;


public class ParticipantDeregistrationStage extends AbstractAsyncBaseStage {
private static final Logger LOG = LoggerFactory.getLogger(ParticipantDeregistrationStage.class);

/**
 * Identifies the async worker type associated with this stage.
 *
 * @return {@link AsyncWorkerType#ParticipantDeregistrationWorker}
 */
@Override
public AsyncWorkerType getAsyncWorkerType() {
return AsyncWorkerType.ParticipantDeregistrationWorker;
}

@Override
public void execute(ClusterEvent event) throws Exception {
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
ClusterConfig clusterConfig = manager.getConfigAccessor().getClusterConfig(manager.getClusterName());
if (clusterConfig == null || !clusterConfig.isParticipantDeregistrationEnabled()) {
LOG.info("Cluster config is null or participant deregistration is not enabled. "
+ "Skipping participant deregistration.");
return;
}

ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
Map<String, Long> offlineTimeMap = cache.getInstanceOfflineTimeMap();
long deregisterDelay = clusterConfig.getParticipantDeregistrationTimeout();
long stageStartTime = System.currentTimeMillis();
Set<String> participantsToDeregister = new HashSet<>();
long nextDeregisterTime = Long.MAX_VALUE;


for (Map.Entry<String, Long> entry : offlineTimeMap.entrySet()) {
String instanceName = entry.getKey();
Long offlineTime = entry.getValue();
long deregisterTime = offlineTime + deregisterDelay;

// Skip if instance is still online
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should rely on LiveInstances instead of history.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusted this check to be:

if (cache.getLiveInstances().containsKey(instanceName)) {

if (cache.getLiveInstances().containsKey(instanceName)) {
continue;
}

// If deregister time is in the past, deregister the instance
if (deregisterTime <= stageStartTime) {
participantsToDeregister.add(instanceName);
} else {
// Otherwise, find the next earliest deregister time
nextDeregisterTime = Math.min(nextDeregisterTime, deregisterTime);
}
}

if (!participantsToDeregister.isEmpty()) {
Set<String> successfullyDeregisteredParticipants =
deregisterParticipants(manager, cache, participantsToDeregister);
if (!successfullyDeregisteredParticipants.isEmpty()) {
LOG.info("Successfully deregistered {} participants from cluster {}",
successfullyDeregisteredParticipants.size(), cache.getClusterName());
}
}
// Schedule the next deregister task
if (nextDeregisterTime != Long.MAX_VALUE) {
long delay = Math.max(nextDeregisterTime - System.currentTimeMillis(), 0);
scheduleOnDemandPipeline(manager.getClusterName(), delay);
}
}

/**
 * Drops each requested instance from the cluster through the manager's cluster management
 * tool, re-checking the cached instance config and live-instance entry first.
 *
 * @param manager Helix manager used to reach the cluster management tool
 * @param cache controller data provider supplying instance configs, live instances and the
 *     cluster name
 * @param instancesToDeregister names of the instances to drop
 * @return names of the instances for which {@code dropInstance} completed without a
 *     {@link HelixException}; empty when the manager is null or disconnected, or when
 *     {@code cache} or {@code instancesToDeregister} is null
 */
private Set<String> deregisterParticipants(HelixManager manager, ResourceControllerDataProvider cache,
    Set<String> instancesToDeregister) {
  Set<String> dropped = new HashSet<>();

  boolean managerUsable = manager != null && manager.isConnected();
  if (!managerUsable || cache == null || instancesToDeregister == null) {
    LOG.info("ParticipantDeregistrationStage failed due to HelixManager being null or not connected!");
    return dropped;
  }

  // Safety checks run per instance immediately before the drop is attempted.
  for (String candidate : instancesToDeregister) {
    InstanceConfig config = cache.getInstanceConfigMap().get(candidate);
    LiveInstance live = cache.getLiveInstances().get(candidate);

    if (config == null) {
      LOG.debug("Instance config is null for instance {}, skip deregistering the instance", candidate);
    } else if (live != null) {
      LOG.debug("Instance {} is still alive, skip deregistering the instance", candidate);
    } else {
      try {
        manager.getClusterManagmentTool().dropInstance(cache.getClusterName(), config);
        dropped.add(candidate);
      } catch (HelixException e) {
        // A failed drop is logged and the remaining candidates are still processed.
        LOG.warn("Failed to deregister instance {} from cluster {}", candidate, cache.getClusterName(), e);
      }
    }
  }

  return dropped;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public enum ClusterConfigProperty {
LAST_ON_DEMAND_REBALANCE_TIMESTAMP,

// List of Preferred scoring keys used in evenness score computation
PREFERRED_SCORING_KEYS
PREFERRED_SCORING_KEYS,
PARTICIPANT_DEREGISTRATION_TIMEOUT
}

public enum GlobalRebalancePreferenceKey {
Expand Down Expand Up @@ -1255,4 +1256,17 @@ public void setPreferredScoringKeys(List<String> preferredScoringKeys) {
_record.setListField(ClusterConfigProperty.PREFERRED_SCORING_KEYS.name(),
preferredScoringKeys);
}

/**
 * Reads the {@code PARTICIPANT_DEREGISTRATION_TIMEOUT} field from the underlying record.
 *
 * @return the field value, with {@code -1} passed to the record lookup as the default
 */
public long getParticipantDeregistrationTimeout() {
  final String fieldName = ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name();
  return _record.getLongField(fieldName, -1);
}

/**
 * Stores the given value in the {@code PARTICIPANT_DEREGISTRATION_TIMEOUT} field of the
 * underlying record.
 *
 * @param timeout value to store; NOTE(review): presumably milliseconds, since the
 *     deregistration stage adds it to offline timestamps compared against
 *     {@code System.currentTimeMillis()} - confirm
 */
public void setParticipantDeregistrationTimeout(long timeout) {
  final String fieldName = ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name();
  _record.setLongField(fieldName, timeout);
}

/**
 * Reports whether participant deregistration is turned on for this cluster.
 *
 * @return {@code true} when the configured deregistration timeout is zero or greater
 */
public boolean isParticipantDeregistrationEnabled() {
  final long configuredTimeout = getParticipantDeregistrationTimeout();
  return configuredTimeout >= 0;
}
}
24 changes: 24 additions & 0 deletions helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
Expand Down Expand Up @@ -830,4 +831,27 @@ public String getClusterName() {
return _clusterName;
}
}

/**
 * Adds {@code instanceName} to {@code cluster} via the shared setup tool, then creates a
 * mock participant for it and starts it synchronously.
 *
 * @param cluster name of the cluster to join
 * @param instanceName name of the instance to add
 * @return the started mock participant
 */
public MockParticipantManager addParticipant(String cluster, String instanceName) {
  _gSetupTool.addInstanceToCluster(cluster, instanceName);

  MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, cluster, instanceName);
  participant.syncStart();
  return participant;
}

/**
 * Stops the given mock participant synchronously and drops its instance from the cluster.
 * A {@code null} participant is ignored; any exception raised along the way is logged as a
 * warning and not rethrown.
 *
 * @param cluster name of the cluster the participant belongs to
 * @param participant participant to stop and drop, may be {@code null}
 */
public void dropParticipant(String cluster, MockParticipantManager participant) {
  if (participant != null) {
    try {
      participant.syncStop();
      String instanceName = participant.getInstanceName();
      InstanceConfig configToDrop =
          _gSetupTool.getClusterManagementTool().getInstanceConfig(cluster, instanceName);
      _gSetupTool.getClusterManagementTool().dropInstance(cluster, configToDrop);
    } catch (Exception e) {
      LOG.warn("Error dropping participant " + participant.getInstanceName(), e);
    }
  }
}
}
Loading
Loading