Skip to content

Commit

Permalink
Merge pull request #6318 from flowlogix/CP_SUBSYSTEM_FAILURE_FIXES
Browse files Browse the repository at this point in the history
FISH-7640 Improved Hazelcast functionality as it relates to CP subsystem
  • Loading branch information
Pandrex247 authored Sep 1, 2023
2 parents 05eb91a + 5fd7566 commit 66d15a4
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.IAtomicLong;
import com.hazelcast.cp.exception.CPSubsystemException;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;
import com.sun.enterprise.container.common.spi.ClusteredSingletonLookup;
import fish.payara.nucleus.hazelcast.HazelcastCore;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.glassfish.internal.api.Globals;
import org.glassfish.internal.api.JavaEEContextUtil;
Expand Down Expand Up @@ -91,8 +93,8 @@ public final String getSessionHzKey() {

@Override
public FencedLock getDistributedLock() {
return lock.updateAndGet(v -> v != null ? v : getHazelcastInstance().getCPSubsystem()
.getLock(makeLockKey()));
return retryCpOperation(() -> lock.updateAndGet(v -> v != null ?
v : getHazelcastInstance().getCPSubsystem().getLock(makeLockKey())));
}

@Override
Expand All @@ -104,8 +106,8 @@ public IMap<String, Object> getClusteredSingletonMap() {

@Override
public IAtomicLong getClusteredUsageCount() {
return count.updateAndGet(v -> v != null ? v : getHazelcastInstance().getCPSubsystem()
.getAtomicLong(makeCountKey()));
return retryCpOperation(() -> count.updateAndGet(v -> v != null ?
v : getHazelcastInstance().getCPSubsystem().getAtomicLong(makeCountKey())));
}

private HazelcastInstance getHazelcastInstance() {
Expand Down Expand Up @@ -143,6 +145,18 @@ public HazelcastCore getHazelcastCore() {
return hzCore;
}

private <TT> TT retryCpOperation(Supplier<TT> operation) {
CPSubsystemException exception = null;
for (int ii = 0; ii < 3; ++ii) {
try {
return operation.get();
} catch (CPSubsystemException e) {
exception = e;
}
}
throw exception;
}

private String makeKeyPrefix() {
return String.format("Payara/%s/singleton/", singletonType.name().toLowerCase());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@

package com.sun.ejb.containers;

import com.hazelcast.cp.exception.CPSubsystemException;
import com.hazelcast.cp.lock.exception.LockOwnershipLostException;
import com.sun.ejb.ComponentContext;
import com.sun.ejb.EjbInvocation;
import com.sun.ejb.InvocationInfo;
import com.sun.ejb.MethodLockInfo;
import com.sun.enterprise.security.SecurityManager;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import jakarta.ejb.ConcurrentAccessException;
import jakarta.ejb.ConcurrentAccessTimeoutException;
import jakarta.ejb.IllegalLoopbackException;
Expand Down Expand Up @@ -209,7 +212,11 @@ public void releaseContext(EjbInvocation inv) {

Lock theLock = inv.getCMCLock();
if (theLock != null) {
theLock.unlock();
try {
theLock.unlock();
} catch (CPSubsystemException | LockOwnershipLostException e) {
_logger.log(Level.WARNING, "Distributed unlock failed", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@

import java.lang.annotation.Annotation;

import com.hazelcast.cp.exception.CPSubsystemException;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.cp.lock.exception.LockOwnershipLostException;
import fish.payara.cluster.DistributedLockType;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.spi.Context;
Expand All @@ -60,13 +62,16 @@
import fish.payara.cluster.Clustered;
import fish.payara.micro.cdi.extension.cluster.annotations.ClusterScoped;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* @Clustered singleton CDI context implementation
*
* @author lprimak
*/
class ClusterScopeContext implements Context {
private static final Logger log = Logger.getLogger(ClusterScopeContext.class.getName());
private final BeanManager beanManager;
private final ClusteredSingletonLookupImpl clusteredLookup;

Expand Down Expand Up @@ -198,8 +203,12 @@ private static boolean lock(Clustered clusteredAnnotation, FencedLock lock) {

protected static void unlock(Clustered clusteredAnnotation, FencedLock lock) {
if (clusteredAnnotation.lock() == DistributedLockType.LOCK) {
if (lock.isLockedByCurrentThread()) {
lock.unlock();
try {
if (lock.isLockedByCurrentThread()) {
lock.unlock();
}
} catch (CPSubsystemException | LockOwnershipLostException e) {
log.log(Level.WARNING, "Distributed unlock failed", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
package fish.payara.nucleus.hazelcast;

import com.hazelcast.cache.impl.HazelcastServerCachingProvider;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.Member;
import com.hazelcast.collection.ISet;
import com.hazelcast.config.Config;
import com.hazelcast.config.DiscoveryStrategyConfig;
import com.hazelcast.config.ExecutorConfig;
Expand All @@ -56,11 +60,18 @@
import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.cp.event.CPGroupAvailabilityEvent;
import com.hazelcast.cp.event.CPGroupAvailabilityListener;
import com.hazelcast.cp.event.CPMembershipEvent;
import com.hazelcast.cp.event.CPMembershipListener;
import com.hazelcast.cp.exception.CPGroupDestroyedException;
import com.hazelcast.internal.config.ConfigLoader;
import com.hazelcast.kubernetes.KubernetesProperties;
import com.hazelcast.map.IMap;
import com.hazelcast.nio.serialization.Serializer;
import com.hazelcast.nio.serialization.StreamSerializer;
import com.hazelcast.spi.properties.ClusterProperty;
import com.sun.enterprise.util.Utility;
import fish.payara.nucleus.events.HazelcastEvents;
import jakarta.annotation.PostConstruct;
Expand Down Expand Up @@ -92,7 +103,9 @@
import java.beans.PropertyChangeEvent;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -101,11 +114,19 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import static com.hazelcast.spi.properties.ClusterProperty.WAIT_SECONDS_BEFORE_JOIN;
import static com.hazelcast.cp.CPGroup.METADATA_CP_GROUP_NAME;
import static fish.payara.nucleus.hazelcast.PayaraHazelcastDiscoveryFactory.HOST_AWARE_PARTITIONING;
import static java.lang.String.valueOf;

Expand Down Expand Up @@ -160,6 +181,9 @@ public class HazelcastCore implements EventListener, ConfigListener {
@Inject
Transactions transactions;

final Lock cpResetLock = new ReentrantLock();
final AtomicReference<Instant> lastResetTime = new AtomicReference<>(Instant.EPOCH);

/**
* Returns the version of the object that has been instantiated.
* @return null if an instance of {@link HazelcastCore} has not been created
Expand Down Expand Up @@ -383,18 +407,8 @@ public Object run(final HazelcastConfigSpecificConfiguration hazelcastRuntimeCon
}
}
config.setClassLoader(clh.getCommonClassLoader());

// The below are to test split-brain scenario,
// see https://github.com/hazelcast/hazelcast/issues/17586
// and https://github.com/hazelcast/hazelcast/issues/17260
// config.setProperty(MAX_NO_HEARTBEAT_SECONDS.getName(), "5");
// config.setProperty(HEARTBEAT_INTERVAL_SECONDS.getName(), "1");
// config.setProperty(MERGE_FIRST_RUN_DELAY_SECONDS.getName(), "5");
// config.setProperty(MERGE_NEXT_RUN_DELAY_SECONDS.getName(), "5");

// can't quite set it to zero yet because of:
// https://github.com/hazelcast/hazelcast/issues/17586
config.setProperty(WAIT_SECONDS_BEFORE_JOIN.getName(), "1");
// can set to zero as of Hazelcast 5.4 or greater
config.setProperty(ClusterProperty.WAIT_SECONDS_BEFORE_JOIN.getName(), "1");

if(ctxUtil != null) {
SerializationConfig serializationConfig = new SerializationConfig();
Expand Down Expand Up @@ -513,13 +527,15 @@ private void buildNetworkConfiguration(Config config, boolean hostAwarePartition
} else {
//build the domain discovery config
config.setProperty("hazelcast.discovery.enabled", "true");
PartitionGroupConfig partitionGroupConfig = config.getPartitionGroupConfig();
partitionGroupConfig.setEnabled(true);
partitionGroupConfig.setGroupType(PartitionGroupConfig.MemberGroupType.SPI);
config.getNetworkConfig().getJoin().getDiscoveryConfig().addDiscoveryStrategyConfig(
new DiscoveryStrategyConfig(DomainDiscoveryStrategy.class.getName())
.addProperty(HOST_AWARE_PARTITIONING.key(), hostAwarePartitioning));
config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
if (Boolean.parseBoolean(System.getProperty("hazelcast.auto-partition-group", "true"))) {
PartitionGroupConfig partitionGroupConfig = config.getPartitionGroupConfig();
partitionGroupConfig.setEnabled(true);
partitionGroupConfig.setGroupType(PartitionGroupConfig.MemberGroupType.SPI);
}
}

if (env.isDas() && !env.isMicro()) {
Expand Down Expand Up @@ -559,6 +575,7 @@ private synchronized void bootstrapHazelcast() {
cpSubsystemLogger.setLevel(Level.SEVERE);
config.getMemberAttributeConfig().setAttribute(INSTANCE_GROUP_ATTRIBUTE, memberGroup);
theInstance = Hazelcast.newHazelcastInstance(config);
autoPromoteCPMembers(config);
} finally {
cpSubsystemLogger.setLevel(cpSubsystemLevel);
}
Expand Down Expand Up @@ -770,4 +787,128 @@ public Object run(final HazelcastConfigSpecificConfiguration hazelcastConfigSpec
}
}, nodeConfig);
}

private void autoPromoteCPMembers(Config config) {
final String availabilityStructureName = "Payara/cluster/cp/availability";
String waitBeforeJoinStr = config.getProperty(ClusterProperty.WAIT_SECONDS_BEFORE_JOIN.getName());
if (waitBeforeJoinStr == null) {
waitBeforeJoinStr = ClusterProperty.WAIT_SECONDS_BEFORE_JOIN.getDefaultValue();
}
final int waitBeforeJoin = Math.max(5, Integer.parseInt(waitBeforeJoinStr));

String maxWaitBeforeJoinStr = config.getProperty(ClusterProperty.MAX_WAIT_SECONDS_BEFORE_JOIN.getName());
if (maxWaitBeforeJoinStr == null) {
maxWaitBeforeJoinStr = ClusterProperty.MAX_WAIT_SECONDS_BEFORE_JOIN.getDefaultValue();
}
final int maxWaitBeforeJoin = Integer.parseInt(maxWaitBeforeJoinStr) * 10 * 2;

if (!config.isLiteMember() && config.getCPSubsystemConfig().getCPMemberCount() > 0 && Boolean.parseBoolean(
System.getProperty("hazelcast.cp-subsystem.auto-promote", "true"))) {
theInstance.getCPSubsystem().addMembershipListener(new CPMembershipListener() {
@Override
public void memberAdded(CPMembershipEvent cpMembershipEvent) {
theInstance.getMap(availabilityStructureName).remove(cpMembershipEvent.getMember().getAddress());
}

@Override
public void memberRemoved(CPMembershipEvent cpMembershipEvent) {
try {
if (!cpMembershipEvent.getMember().equals(theInstance.getCPSubsystem()
.getCPSubsystemManagementService().getLocalCPMember())) {
theInstance.getCPSubsystem().getCPSubsystemManagementService()
.getCPGroup(METADATA_CP_GROUP_NAME).toCompletableFuture()
.get(waitBeforeJoin, TimeUnit.SECONDS);
}
} catch (CompletionException | InterruptedException | ExecutionException | TimeoutException e) {
if (e.getCause() instanceof IllegalStateException) {
theInstance.getSet(availabilityStructureName).add(theInstance.getCluster().getLocalMember());
}
}
}
});
theInstance.getCPSubsystem().addGroupAvailabilityListener(new CPGroupAvailabilityListener() {
@Override
public void availabilityDecreased(CPGroupAvailabilityEvent cpGroupAvailabilityEvent) {
if (cpGroupAvailabilityEvent.isMetadataGroup()) {
var map = theInstance.getMap(availabilityStructureName);
cpGroupAvailabilityEvent.getUnavailableMembers().forEach(member -> {
map.put(member.getAddress(), member.getUuid());
});
}
}

@Override
public void majorityLost(CPGroupAvailabilityEvent cpGroupAvailabilityEvent) {
if (cpGroupAvailabilityEvent.isMetadataGroup()) {
theInstance.getSet(availabilityStructureName).add(theInstance.getCluster().getLocalMember());
}
}
});

var cpManagementService = theInstance.getCPSubsystem().getCPSubsystemManagementService();
if (cpManagementService.isDiscoveryCompleted()) {
Executors.newSingleThreadExecutor().submit(() -> {
try {
for (int ii = 0; ii < maxWaitBeforeJoin; ++ii) {
if (theInstance.getCluster().getClusterState() == ClusterState.ACTIVE) {
break;
}
TimeUnit.MILLISECONDS.sleep(100);
}
sendCPResetToMaster(availabilityStructureName, waitBeforeJoin);

var localMember = theInstance.getCluster().getLocalMember();
IMap<Address, UUID> map = theInstance.getMap(availabilityStructureName);
UUID uuid = map.get(localMember.getAddress());
if (uuid != null || cpManagementService.getCPMembers().toCompletableFuture().join()
.size() < config.getCPSubsystemConfig().getCPMemberCount()) {
if (uuid != null) {
try {
cpManagementService.removeCPMember(uuid).toCompletableFuture().join();
} catch (CompletionException e) {
}
map.remove(localMember.getAddress());
}
cpManagementService.promoteToCPMember();
Logger.getLogger(HazelcastCore.class.getName()).log(Level.INFO, "Instance Promoted into CP Subsystem");
}
} catch (HazelcastInstanceNotActiveException e) {
} catch (Exception exc) {
if (exc.getCause() instanceof CPGroupDestroyedException) { }
else {
Logger.getLogger(HazelcastCore.class.getName()).log(Level.WARNING, "Auto CP Promotion Failure", exc);
}
}
});
}
}
}

private void sendCPResetToMaster(String availabilityStructureName, int waitBeforeJoin) {
ISet<Member> cpMembersToReset = theInstance.getSet(availabilityStructureName);
if (!cpMembersToReset.isEmpty()) {
var fn = (Serializable & Runnable) () -> {
theCore.cpResetLock.lock();
try {
if (theCore.lastResetTime.get().plusSeconds(waitBeforeJoin).isAfter(Instant.now())) {
return;
}
try {
theCore.theInstance.getCPSubsystem().getCPSubsystemManagementService()
.getCPGroup(METADATA_CP_GROUP_NAME).toCompletableFuture().get(waitBeforeJoin, TimeUnit.SECONDS);
} catch (CompletionException | InterruptedException | ExecutionException | TimeoutException e) {
theCore.theInstance.getCPSubsystem().getCPSubsystemManagementService().reset().toCompletableFuture().join();
theCore.lastResetTime.set(Instant.now());
}
theCore.theInstance.getSet(availabilityStructureName).clear();
} catch (Exception exc) {
Logger.getLogger(HazelcastCore.class.getName()).log(Level.FINE, "Auto CP Reset Failure", exc);
}
finally {
theCore.cpResetLock.unlock();
}
};
theInstance.getExecutorService(availabilityStructureName).executeOnMembers(fn, cpMembersToReset);
}
}
}

0 comments on commit 66d15a4

Please sign in to comment.