From 1ac05266a5cbab0c05551fc40201376cca13b540 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 23 Mar 2016 13:16:13 -0400 Subject: [PATCH] NIFI-483: Use ZooKeeper's Leader Election to determine Primary Node. This closes #301 Signed-off-by: Matt Gilman --- nifi-assembly/pom.xml | 6 + .../org/apache/nifi/util/NiFiProperties.java | 9 + .../ClusterManagerProtocolSender.java | 9 - .../ClusterManagerProtocolSenderImpl.java | 27 -- .../ClusterManagerProtocolSenderListener.java | 7 - .../protocol/jaxb/message/ObjectFactory.java | 5 - .../message/PrimaryRoleAssignmentMessage.java | 55 ---- .../protocol/message/ProtocolMessage.java | 1 - .../nifi/cluster/manager/ClusterManager.java | 18 -- .../manager/impl/WebClusterManager.java | 199 +----------- .../nifi-framework-core/pom.xml | 9 + .../nifi/controller/FlowController.java | 103 ++++--- .../nifi/controller/StandardFlowService.java | 40 --- .../CuratorLeaderElectionManager.java | 285 ++++++++++++++++++ .../election/LeaderElectionManager.java | 71 +++++ .../LeaderElectionStateChangeListener.java | 35 +++ .../src/main/resources/conf/nifi.properties | 6 + .../nifi/web/StandardNiFiServiceFacade.java | 6 - pom.xml | 13 + 19 files changed, 495 insertions(+), 409 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 0422b8efd5bb..09a8d5016b02 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -456,6 +456,12 @@ language governing permissions and limitations under the License. --> 10 0 sec + + + 3 secs + 3 secs + /nifi + diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 8c98c0bc7ba5..517b19a65a70 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -174,6 +174,12 @@ public class NiFiProperties extends Properties { public static final String CLUSTER_NODE_UNICAST_MANAGER_ADDRESS = "nifi.cluster.node.unicast.manager.address"; public static final String CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT = "nifi.cluster.node.unicast.manager.protocol.port"; + // zookeeper properties + public static final String ZOOKEEPER_CONNECT_STRING = "nifi.zookeeper.connect.string"; + public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout"; + public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout"; + public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node"; + // cluster manager properties public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager"; public static final String CLUSTER_MANAGER_ADDRESS = "nifi.cluster.manager.address"; @@ -226,6 +232,9 @@ public class NiFiProperties extends Properties { public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state"; public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins"; public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis"; + public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs"; + public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs"; + public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi"; // cluster common defaults public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java index 10653ff58faf..bdefbbf3348a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java @@ -19,7 +19,6 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; import org.apache.nifi.reporting.BulletinRepository; @@ -56,14 +55,6 @@ public interface ClusterManagerProtocolSender { */ void disconnect(DisconnectMessage msg) throws ProtocolException; - /** - * Sends an "assign primary role" message to a node. - * - * @param msg a message - * @throws ProtocolException if communication failed - */ - void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException; - /** * Sets the {@link BulletinRepository} that can be used to report bulletins * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java index 636a6d354c08..fb9292eabe46 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java @@ -31,7 +31,6 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; @@ -56,7 +55,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS private final ProtocolContext protocolContext; private final SocketConfiguration socketConfiguration; private int handshakeTimeoutSeconds; - private volatile BulletinRepository bulletinRepository; public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext protocolContext) { if (socketConfiguration == null) { @@ -71,7 +69,6 @@ public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfigur @Override public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; } /** @@ -183,30 +180,6 @@ public void disconnect(final DisconnectMessage msg) throws ProtocolException { } } - /** - * Assigns the primary role to a node. - * - * @param msg a message - * - * @throws ProtocolException if the message failed to be sent - */ - @Override - public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), true); - - try { - // marshal message to output stream - final ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch (final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException { // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java index 8eb83a4e3977..54d33a870c67 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java @@ -26,7 +26,6 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; import org.apache.nifi.reporting.BulletinRepository; @@ -108,10 +107,4 @@ public ReconnectionResponseMessage requestReconnection(final ReconnectionRequest public void disconnect(DisconnectMessage msg) throws ProtocolException { sender.disconnect(msg); } - - @Override - public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException { - sender.assignPrimaryRole(msg); - } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java index 516b67e9c327..25041ce7b8db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java @@ -27,7 +27,6 @@ import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; import org.apache.nifi.cluster.protocol.message.PingMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; @@ -92,8 +91,4 @@ public MulticastProtocolMessage createMulticastProtocolMessage() { public ControllerStartupFailureMessage createControllerStartupFailureMessage() { return new ControllerStartupFailureMessage(); } - - public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() { - return new PrimaryRoleAssignmentMessage(); - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java deleted file mode 100644 index 4b7563a0b55d..000000000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.message; - -import javax.xml.bind.annotation.XmlRootElement; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; - -/** - */ -@XmlRootElement(name = "primaryRoleAssignmentMessage") -public class PrimaryRoleAssignmentMessage extends ProtocolMessage { - - private NodeIdentifier nodeId; - - private boolean primary; - - @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) - public NodeIdentifier getNodeId() { - return nodeId; - } - - public void setNodeId(NodeIdentifier nodeId) { - this.nodeId = nodeId; - } - - public boolean isPrimary() { - return primary; - } - - public void setPrimary(boolean primary) { - this.primary = primary; - } - - @Override - public MessageType getType() { - return MessageType.PRIMARY_ROLE; - } - -} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java index f01efd8fce2c..5953e0926903 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java @@ -31,7 +31,6 @@ public static enum MessageType { FLOW_RESPONSE, HEARTBEAT, PING, - PRIMARY_ROLE, RECONNECTION_REQUEST, RECONNECTION_RESPONSE, SERVICE_BROADCAST, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java index 51de54b4133d..27ada88a6b12 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java @@ -23,9 +23,7 @@ import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException; -import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException; import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException; -import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.node.Node; import org.apache.nifi.cluster.node.Node.Status; @@ -141,22 +139,6 @@ public interface ClusterManager extends NodeInformant { */ List getNodeEvents(final String nodeId); - /** - * Revokes the primary role from the current primary node and assigns the primary role to given given node ID. - * - * If role revocation fails, then the current primary node is set to disconnected while retaining the primary role and no role assignment is performed. - * - * If role assignment fails, then the given node is set to disconnected and is given the primary role. - * - * @param nodeId the node identifier - * @param userDn the Distinguished Name of the user requesting that the Primary Node be assigned - * - * @throws UnknownNodeException if the node with the given identifier does not exist - * @throws IneligiblePrimaryNodeException if the node with the given identifier is not eligible to be the primary node - * @throws PrimaryRoleAssignmentException if the cluster was unable to change the primary role to the requested node - */ - void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException; - /** * @return the primary node of the cluster or null if no primary node exists */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 303e98e66997..6f1bc2c3bf9a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -93,12 +93,10 @@ import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException; -import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException; import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException; import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException; import org.apache.nifi.cluster.manager.exception.NodeReconnectionException; -import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException; import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.exception.UriConstructionException; @@ -118,7 +116,6 @@ import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; @@ -551,9 +548,6 @@ public void start() throws IOException { servicesBroadcaster.start(); } - // start in safe mode - executeSafeModeTask(); - // Load and start running Reporting Tasks final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks(); if (serializedReportingTasks != null && serializedReportingTasks.length > 0) { @@ -1312,88 +1306,6 @@ private void requestDisconnection(final NodeIdentifier nodeId, final boolean ign } } - /** - * Messages the node to have the primary role. If the messaging fails, then the node is marked as disconnected. - * - * @param nodeId the node ID to assign primary role - * - * @return true if primary role assigned; false otherwise - */ - private boolean assignPrimaryRole(final NodeIdentifier nodeId) { - writeLock.lock(); - try { - // create primary role message - final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage(); - msg.setNodeId(nodeId); - msg.setPrimary(true); - logger.info("Attempting to assign primary role to node: " + nodeId); - - // message - senderListener.assignPrimaryRole(msg); - - logger.info("Assigned primary role to node: " + nodeId); - addBulletin(nodeId, Severity.INFO, "Node assigned primary role"); - - // true indicates primary role assigned - return true; - - } catch (final ProtocolException ex) { - - logger.warn("Failed attempt to assign primary role to node " + nodeId + " due to " + ex); - addBulletin(nodeId, Severity.ERROR, "Failed to assign primary role to node due to: " + ex); - - // mark node as disconnected and log/record the event - final Node node = getRawNode(nodeId.getId()); - node.setStatus(Status.DISCONNECTED); - addEvent(node.getNodeId(), "Disconnected because of failed attempt to assign primary role."); - - addBulletin(nodeId, Severity.WARNING, "Node disconnected because of failed attempt to assign primary role"); - - // false indicates primary role failed to be assigned - return false; - } finally { - writeLock.unlock("assignPrimaryRole"); - } - } - - /** - * Messages the node with the given node ID to no longer have the primary role. If the messaging fails, then the node is marked as disconnected. - * - * @return true if the primary role was revoked from the node; false otherwise - */ - private boolean revokePrimaryRole(final NodeIdentifier nodeId) { - writeLock.lock(); - try { - // create primary role message - final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage(); - msg.setNodeId(nodeId); - msg.setPrimary(false); - logger.info("Attempting to revoke primary role from node: " + nodeId); - - // send message - senderListener.assignPrimaryRole(msg); - - logger.info("Revoked primary role from node: " + nodeId); - addBulletin(nodeId, Severity.INFO, "Primary Role revoked from node"); - - // true indicates primary role was revoked - return true; - } catch (final ProtocolException ex) { - - logger.warn("Failed attempt to revoke primary role from node " + nodeId + " due to " + ex); - - // mark node as disconnected and log/record the event - final Node node = getRawNode(nodeId.getId()); - node.setStatus(Status.DISCONNECTED); - addEvent(node.getNodeId(), "Disconnected because of failed attempt to revoke primary role."); - addBulletin(node, Severity.ERROR, "Node disconnected because of failed attempt to revoke primary role"); - - // false indicates primary role failed to be revoked - return false; - } finally { - writeLock.unlock("revokePrimaryRole"); - } - } private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) { return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(), @@ -1778,12 +1690,6 @@ private void processPendingHeartbeats() { // get a raw reference to the node (if it doesn't exist, node will be null) node = getRawNode(resolvedNodeIdentifier.getId()); - // if the node thinks it has the primary role, but the manager has assigned the role to a different node, then revoke the role - if (mostRecentHeartbeat.isPrimary() && !isPrimaryNode(resolvedNodeIdentifier)) { - addEvent(resolvedNodeIdentifier, "Heartbeat indicates node is running as primary node. Revoking primary role because primary role is assigned to a different node."); - revokePrimaryRole(resolvedNodeIdentifier); - } - final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected(); if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) { @@ -1871,6 +1777,10 @@ private void processPendingHeartbeats() { // record heartbeat node.setHeartbeat(mostRecentHeartbeat); + + if (mostRecentHeartbeat.isPrimary()) { + setPrimaryNodeId(node.getNodeId()); + } } } catch (final Exception e) { logger.error("Failed to process heartbeat from {}:{} due to {}", @@ -1984,47 +1894,6 @@ public Set getNodeIds(final Status... statuses) { } } - @Override - public void setPrimaryNode(final String nodeId, final String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException { - writeLock.lock(); - try { - - final Node node = getNode(nodeId); - if (node == null) { - throw new UnknownNodeException("Node does not exist."); - } else if (Status.CONNECTED != node.getStatus()) { - throw new IneligiblePrimaryNodeException("Node must be connected before it can be assigned as the primary node."); - } - - // revoke primary role - final Node primaryNode; - if ((primaryNode = getPrimaryNode()) != null) { - if (primaryNode.getStatus() == Status.DISCONNECTED) { - throw new PrimaryRoleAssignmentException("A disconnected, primary node exists. Delete the node before assigning the primary role to a different node."); - } else if (revokePrimaryRole(primaryNode.getNodeId())) { - addEvent(primaryNode.getNodeId(), "Role revoked from this node as part of primary role reassignment."); - } else { - throw new PrimaryRoleAssignmentException( - "Failed to revoke primary role from node. Primary node is now disconnected. Delete the node before assigning the primary role to a different node."); - } - } - - // change the primary node ID to the given node - setPrimaryNodeId(node.getNodeId()); - - // assign primary role - if (assignPrimaryRole(node.getNodeId())) { - addEvent(node.getNodeId(), "Role assigned to this node as part of primary role reassignment. Action performed by " + userDn); - addBulletin(node, Severity.INFO, "Primary Role assigned to node by " + userDn); - } else { - throw new PrimaryRoleAssignmentException( - "Cluster manager assigned primary role to node, but the node failed to accept the assignment. Cluster manager disconnected node."); - } - } finally { - writeLock.unlock("setPrimaryNode"); - } - } - private int getClusterProtocolHeartbeatSeconds() { return (int) FormatUtils.getTimeDuration(properties.getClusterProtocolHeartbeatInterval(), TimeUnit.SECONDS); } @@ -4508,66 +4377,6 @@ private void logNodes(final String header, final Logger logger) { } } - private void executeSafeModeTask() { - - new Thread(new Runnable() { - - private final long threadStartTime = System.currentTimeMillis(); - - @Override - public void run() { - logger.info("Entering safe mode..."); - final int safeModeSeconds = (int) FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), TimeUnit.SECONDS); - final long timeToElect = safeModeSeconds <= 0 ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS); - boolean exitSafeMode = false; - while (isRunning()) { - - writeLock.lock(); - try { - - final long currentTime = System.currentTimeMillis(); - if (timeToElect < currentTime) { - final Set connectedNodeIds = getNodeIds(Status.CONNECTED); - if (!connectedNodeIds.isEmpty()) { - // get first connected node ID - final NodeIdentifier connectedNodeId = connectedNodeIds.iterator().next(); - if (assignPrimaryRole(connectedNodeId)) { - try { - setPrimaryNodeId(connectedNodeId); - exitSafeMode = true; - } catch (final DaoException de) { - final String message = String.format("Failed to persist primary node ID '%s' in cluster dataflow.", connectedNodeId); - logger.warn(message); - addBulletin(connectedNodeId, Severity.WARNING, message); - revokePrimaryRole(connectedNodeId); - } - } - } - } - - if (!isInSafeMode()) { - // a primary node has been selected outside of this thread - exitSafeMode = true; - logger.info("Exiting safe mode because " + primaryNodeId + " has been assigned the primary role."); - break; - } - } finally { - writeLock.unlock("executeSafeModeTask"); - } - - if (!exitSafeMode) { - // sleep for a bit - try { - Thread.sleep(1000); - } catch (final InterruptedException ie) { - return; - } - } - - } - } - }).start(); - } /** * This timer task simply processes any pending heartbeats. This timer task is not strictly needed, as HeartbeatMonitoringTimerTask will do this. However, this task is scheduled much more diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index dc5b7d378cc0..ff3ecba9843d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -129,6 +129,15 @@ nifi-processor-utils + + org.apache.curator + curator-framework + + + org.apache.curator + curator-recipes + + org.apache.curator curator-test diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 632fa1a9bfd6..9f14354529ca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -83,6 +83,9 @@ import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.label.StandardLabel; +import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager; +import org.apache.nifi.controller.leader.election.LeaderElectionManager; +import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; @@ -231,6 +234,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public static final String ROOT_GROUP_ID_ALIAS = "root"; public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow"; + public static final String PRIMARY_NODE_ROLE_NAME = "primary-node"; private final AtomicInteger maxTimerDrivenThreads; private final AtomicInteger maxEventDrivenThreads; @@ -277,6 +281,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private ProcessGroup rootGroup; private final List startConnectablesAfterInitialization; private final List startRemoteGroupPortsAfterInitialization; + private final LeaderElectionManager leaderElectionManager; + /** * true if controller is configured to operate in a clustered environment @@ -327,12 +333,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private boolean clustered; private String clusterManagerDN; - // guarded by rwLock - /** - * true if controller is the primary of the cluster - */ - private boolean primary; - // guarded by rwLock /** * true if connected to a cluster @@ -527,6 +527,11 @@ public void run() { }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false)); + if (configuredForClustering) { + leaderElectionManager = new CuratorLeaderElectionManager(4); + } else { + leaderElectionManager = null; + } } private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) { @@ -1159,6 +1164,10 @@ public void shutdown(final boolean kill) { throw new IllegalStateException("Controller already stopped or still stopping..."); } + if (leaderElectionManager != null) { + leaderElectionManager.stop(); + } + if (kill) { this.timerDrivenEngineRef.get().shutdownNow(); this.eventDrivenEngineRef.get().shutdownNow(); @@ -1365,7 +1374,7 @@ void setRootGroup(final ProcessGroup group) { } // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected)); + this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected)); } finally { writeLock.unlock(); } @@ -3119,6 +3128,19 @@ public void setClustered(final boolean clustered, final String clusterInstanceId // update the bulletin repository if (isChanging) { if (clustered) { + leaderElectionManager.register(PRIMARY_NODE_ROLE_NAME, new LeaderElectionStateChangeListener() { + @Override + public void onLeaderElection() { + setPrimary(true); + } + + @Override + public void onLeaderRelinquish() { + setPrimary(false); + } + }); + + leaderElectionManager.start(); stateManagerProvider.enableClusterProvider(); if (zooKeeperStateServer != null) { @@ -3157,6 +3179,8 @@ public void run() { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L)); } } else { + leaderElectionManager.unregister(PRIMARY_NODE_ROLE_NAME); + if (zooKeeperStateServer != null) { zooKeeperStateServer.shutdown(); } @@ -3170,7 +3194,7 @@ public void run() { } // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected)); + this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected)); } finally { writeLock.unlock(); } @@ -3180,51 +3204,38 @@ public void run() { * @return true if this instance is the primary node in the cluster; false otherwise */ public boolean isPrimary() { - rwLock.readLock().lock(); - try { - return primary; - } finally { - rwLock.readLock().unlock(); - } + return leaderElectionManager != null && leaderElectionManager.isLeader(PRIMARY_NODE_ROLE_NAME); } public void setPrimary(final boolean primary) { - rwLock.writeLock().lock(); - try { - // no update, so return - if (this.primary == primary) { - return; - } - - LOG.info("Setting primary flag from '" + this.primary + "' to '" + primary + "'"); - - final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED; - final ProcessGroup rootGroup = getGroup(getRootGroupId()); - for (final ProcessorNode procNode : rootGroup.findAllProcessors()) { - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState); - } + final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED; + final ProcessGroup rootGroup = getGroup(getRootGroupId()); + for (final ProcessorNode procNode : rootGroup.findAllProcessors()) { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState); } - for (final ControllerServiceNode serviceNode : getAllControllerServices()) { - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState); - } + } + for (final ControllerServiceNode serviceNode : getAllControllerServices()) { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState); } - for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) { - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState); - } + } + for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) { + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState); } + } - // update primary - this.primary = primary; - eventDrivenWorkerQueue.setPrimary(primary); + // update primary + eventDrivenWorkerQueue.setPrimary(primary); - // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected)); - } finally { - rwLock.writeLock().unlock(); - } + // update the heartbeat bean + this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected)); + + // Emit a bulletin detailing the fact that the primary node state has changed + final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node"; + final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message); + bulletinRepository.addBulletin(bulletin); } static boolean areEqual(final String a, final String b) { @@ -3603,7 +3614,7 @@ public void setConnected(final boolean connected) { this.connected = connected; // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected)); + this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected)); } finally { rwLock.writeLock().unlock(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 6250c5a86d80..67d0338776ef 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -55,7 +55,6 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; @@ -342,7 +341,6 @@ public boolean canHandle(final ProtocolMessage msg) { case RECONNECTION_REQUEST: case DISCONNECTION_REQUEST: case FLOW_REQUEST: - case PRIMARY_ROLE: return true; default: return false; @@ -380,14 +378,6 @@ public void run() { } }, "Disconnect from Cluster").start(); - return null; - case PRIMARY_ROLE: - new Thread(new Runnable() { - @Override - public void run() { - handlePrimaryRoleAssignment((PrimaryRoleAssignmentMessage) request); - } - }, "Set Primary Role Status").start(); return null; default: throw new ProtocolException("Handler cannot handle message type: " + request.getType()); @@ -512,14 +502,6 @@ private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) } } - private void handlePrimaryRoleAssignment(final PrimaryRoleAssignmentMessage msg) { - writeLock.lock(); - try { - controller.setPrimary(msg.isPrimary()); - } finally { - writeLock.unlock(); - } - } private void handleReconnectionRequest(final ReconnectionRequestMessage request) { writeLock.lock(); @@ -747,9 +729,6 @@ private void loadFromConnectionResponse(final ConnectionResponse response) throw controller.setConnected(true); - // set primary - controller.setPrimary(response.isPrimary()); - // start the processors as indicated by the dataflow controller.onFlowInitialized(dataFlow.isAutoStartProcessors()); @@ -862,7 +841,6 @@ public void run() { } private class SaveHolder { - private final Calendar saveTime; private final boolean shouldArchive; @@ -871,22 +849,4 @@ private SaveHolder(final Calendar moment, final boolean archive) { shouldArchive = archive; } } - - public boolean isPrimary() { - readLock.lock(); - try { - return controller.isPrimary(); - } finally { - readLock.unlock(); - } - } - - public void setPrimary(boolean primary) { - writeLock.lock(); - try { - controller.setPrimary(primary); - } finally { - writeLock.unlock(); - } - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java new file mode 100644 index 000000000000..4c0cbd04daa8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.leader.election; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.leader.LeaderSelector; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.RetryForever; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.zookeeper.common.PathUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CuratorLeaderElectionManager implements LeaderElectionManager { + private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class); + + private final FlowEngine leaderElectionMonitorEngine; + private final int sessionTimeoutMs; + private final int connectionTimeoutMs; + private final String rootPath; + private final String connectString; + + private CuratorFramework curatorClient; + + private volatile boolean stopped = true; + + private final Map leaderRoles = new HashMap<>(); + private final Map registeredRoles = new HashMap<>(); + + public CuratorLeaderElectionManager(final int threadPoolSize) { + leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true); + + final NiFiProperties properties = NiFiProperties.getInstance(); + + connectString = properties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING); + if (connectString == null || connectString.trim().isEmpty()) { + throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties"); + } + + sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT); + connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT); + rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE); + + try { + PathUtils.validatePath(rootPath); + } catch (final IllegalArgumentException e) { + throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath); + } + } + + + @Override + public synchronized void start() { + if (!stopped) { + return; + } + + stopped = false; + + final RetryPolicy retryPolicy = new RetryForever(5000); + curatorClient = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); + curatorClient.start(); + + // Call #register for each already-registered role. This will + // cause us to start listening for leader elections for that + // role again + for (final Map.Entry entry : registeredRoles.entrySet()) { + register(entry.getKey(), entry.getValue()); + } + + logger.info("{} started", this); + } + + private int getTimePeriod(final NiFiProperties properties, final String propertyName, final String defaultValue) { + final String timeout = properties.getProperty(propertyName, defaultValue); + try { + return (int) FormatUtils.getTimeDuration(timeout, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + logger.warn("Value of '" + propertyName + "' property is set to '" + timeout + "', which is not a valid time period. Using default of " + defaultValue); + return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS); + } + } + + + @Override + public synchronized void register(final String roleName) { + register(roleName, null); + } + + + @Override + public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener) { + logger.debug("{} Registering new Leader Selector for role {}", this, roleName); + + if (leaderRoles.containsKey(roleName)) { + logger.warn("{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName); + return; + } + + final String leaderPath = (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName; + + try { + PathUtils.validatePath(rootPath); + } catch (final IllegalArgumentException e) { + throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name"); + } + + registeredRoles.put(roleName, listener); + + if (!isStopped()) { + final ElectionListener electionListener = new ElectionListener(roleName, listener); + final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener); + leaderSelector.autoRequeue(); + leaderSelector.start(); + + final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener); + + leaderRoles.put(roleName, leaderRole); + } + logger.info("{} Registered new Leader Selector for role {}", this, roleName); + } + + @Override + public synchronized void unregister(final String roleName) { + registeredRoles.remove(roleName); + + final LeaderRole leaderRole = leaderRoles.remove(roleName); + final LeaderSelector leaderSelector = leaderRole.getLeaderSelector(); + if (leaderSelector == null) { + logger.warn("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName); + return; + } + + leaderSelector.close(); + } + + @Override + public synchronized void stop() { + stopped = true; + + for (final LeaderRole role : leaderRoles.values()) { + final LeaderSelector selector = role.getLeaderSelector(); + selector.close(); + } + + leaderRoles.clear(); + + if (curatorClient != null) { + curatorClient.close(); + curatorClient = null; + } + + logger.info("{} stopped and closed", this); + } + + @Override + public boolean isStopped() { + return stopped; + } + + + @Override + public String toString() { + return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]"; + } + + + @Override + public synchronized boolean isLeader(final String roleName) { + final LeaderRole role = leaderRoles.get(roleName); + if (role == null) { + return false; + } + + return role.isLeader(); + } + + + private static class LeaderRole { + private final LeaderSelector leaderSelector; + private final ElectionListener electionListener; + + public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener) { + this.leaderSelector = leaderSelector; + this.electionListener = electionListener; + } + + public LeaderSelector getLeaderSelector() { + return leaderSelector; + } + + public boolean isLeader() { + return electionListener.isLeader(); + } + } + + + private class ElectionListener extends LeaderSelectorListenerAdapter implements LeaderSelectorListener { + private final String roleName; + private final LeaderElectionStateChangeListener listener; + + private volatile boolean leader; + + public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener) { + this.roleName = roleName; + this.listener = listener; + } + + public boolean isLeader() { + return leader; + } + + @Override + public void stateChanged(final CuratorFramework client, final ConnectionState newState) { + logger.info("{} Connection State changed to {}", this, newState.name()); + super.stateChanged(client, newState); + } + + @Override + public void takeLeadership(final CuratorFramework client) throws Exception { + leader = true; + logger.info("{} This node has been elected Leader for Role '{}'", this, roleName); + + if (listener != null) { + leaderElectionMonitorEngine.submit(new Runnable() { + @Override + public void run() { + listener.onLeaderElection(); + } + }); + } + + // Curator API states that we lose the leadership election when we return from this method, + // so we will block as long as we are not interrupted or closed. Then, we will set leader to false. + try { + while (!isStopped()) { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + logger.info("{} has been interrupted; no longer leader for role '{}'", this, roleName); + Thread.currentThread().interrupt(); + return; + } + } + } finally { + leader = false; + logger.info("{} This node is no longer leader for role '{}'", this, roleName); + + if (listener != null) { + leaderElectionMonitorEngine.submit(new Runnable() { + @Override + public void run() { + listener.onLeaderRelinquish(); + } + }); + } + } + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java new file mode 100644 index 000000000000..d16dbdbebd86 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.leader.election; + +public interface LeaderElectionManager { + /** + * Starts managing leader elections for all registered roles + */ + void start(); + + /** + * Adds a new role for which a leader is required + * + * @param roleName the name of the role + */ + void register(String roleName); + + /** + * Adds a new role for which a leader is required + * + * @param roleName the name of the role + * @param listener a listener that will be called when the node gains or relinquishes + * the role of leader + */ + void register(String roleName, LeaderElectionStateChangeListener listener); + + /** + * Removes the role with the given name from this manager. If this + * node is the elected leader for the given role, this node will relinquish + * the leadership role + * + * @param roleName the name of the role to unregister + */ + void unregister(String roleName); + + /** + * Returns a boolean value indicating whether or not this node + * is the elected leader for the given role + * + * @param roleName the name of the role + * @return true if the node is the elected leader, false otherwise. + */ + boolean isLeader(String roleName); + + /** + * @return true if the manager is stopped, false otherwise. + */ + boolean isStopped(); + + /** + * Stops managing leader elections and relinquishes the role as leader + * for all registered roles. If the LeaderElectionManager is later started + * again, all previously registered roles will still be registered. + */ + void stop(); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java new file mode 100644 index 000000000000..79c7a7520e6a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.leader.election; + +/** + * Callback interface that can be used to listen for state changes so that the node + * can be notified when it becomes the Elected Leader for a role or is no longer the + * Elected Leader + */ +public interface LeaderElectionStateChangeListener { + /** + * This method is invoked whenever this node is elected leader + */ + void onLeaderElection(); + + /** + * This method is invoked whenever this node no longer is the elected leader. + */ + void onLeaderRelinquish(); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 24d229508245..beb71c1439db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -168,6 +168,12 @@ nifi.cluster.node.protocol.threads=${nifi.cluster.node.protocol.threads} nifi.cluster.node.unicast.manager.address=${nifi.cluster.node.unicast.manager.address} nifi.cluster.node.unicast.manager.protocol.port=${nifi.cluster.node.unicast.manager.protocol.port} +# zookeeper properties, used for cluster management # +nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string} +nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout} +nifi.zookeeper.session.timeout=${nifi.zookeeper.session.timeout} +nifi.zookeeper.root.node=${nifi.zookeeper.root.node} + # cluster manager properties (only configure for cluster manager) # nifi.cluster.is.manager=${nifi.cluster.is.manager} nifi.cluster.manager.address=${nifi.cluster.manager.address} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index b981bdee889e..4fdda0683a6d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -760,12 +760,6 @@ public NodeDTO updateNode(NodeDTO nodeDTO) { clusterManager.requestReconnection(nodeDTO.getNodeId(), userDn); } else if (Node.Status.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) { clusterManager.requestDisconnection(nodeDTO.getNodeId(), userDn); - } else { - // handle primary - final Boolean primary = nodeDTO.isPrimary(); - if (primary != null && primary) { - clusterManager.setPrimaryNode(nodeDTO.getNodeId(), userDn); - } } final String nodeId = nodeDTO.getNodeId(); diff --git a/pom.xml b/pom.xml index dc764b082028..442acdd55085 100644 --- a/pom.xml +++ b/pom.xml @@ -765,6 +765,17 @@ language governing permissions and limitations under the License. --> zookeeper 3.4.6 + + org.apache.curator + curator-framework + 2.10.0 + + + org.apache.curator + curator-recipes + 2.10.0 + + @@ -779,6 +790,8 @@ language governing permissions and limitations under the License. --> 6.8.8 test + + org.jsoup jsoup