Skip to content

Commit

Permalink
NIFI-483: Use ZooKeeper's Leader Election to determine Primary Node. …
Browse files Browse the repository at this point in the history
…This closes apache#301

Signed-off-by: Matt Gilman <[email protected]>
  • Loading branch information
markap14 authored and mcgilman committed Apr 4, 2016
1 parent 0d3bd2c commit 1ac0526
Show file tree
Hide file tree
Showing 19 changed files with 495 additions and 409 deletions.
6 changes: 6 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,12 @@ language governing permissions and limitations under the License. -->
<nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads>
<nifi.cluster.manager.safemode.duration>0 sec</nifi.cluster.manager.safemode.duration>

<!-- nifi.properties: zookeeper properties -->
<nifi.zookeeper.connect.string></nifi.zookeeper.connect.string>
<nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout>
<nifi.zookeeper.session.timeout>3 secs</nifi.zookeeper.session.timeout>
<nifi.zookeeper.root.node>/nifi</nifi.zookeeper.root.node>

<!-- nifi.properties: kerberos properties -->
<nifi.kerberos.krb5.file> </nifi.kerberos.krb5.file>
<nifi.kerberos.service.principal />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ public class NiFiProperties extends Properties {
public static final String CLUSTER_NODE_UNICAST_MANAGER_ADDRESS = "nifi.cluster.node.unicast.manager.address";
public static final String CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT = "nifi.cluster.node.unicast.manager.protocol.port";

// zookeeper properties
public static final String ZOOKEEPER_CONNECT_STRING = "nifi.zookeeper.connect.string";
public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout";
public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout";
public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node";

// cluster manager properties
public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager";
public static final String CLUSTER_MANAGER_ADDRESS = "nifi.cluster.manager.address";
Expand Down Expand Up @@ -226,6 +232,9 @@ public class NiFiProperties extends Properties {
public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state";
public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins";
public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis";
public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs";
public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs";
public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi";

// cluster common defaults
public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.reporting.BulletinRepository;
Expand Down Expand Up @@ -56,14 +55,6 @@ public interface ClusterManagerProtocolSender {
*/
void disconnect(DisconnectMessage msg) throws ProtocolException;

/**
* Sends an "assign primary role" message to a node.
*
* @param msg a message
* @throws ProtocolException if communication failed
*/
void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException;

/**
* Sets the {@link BulletinRepository} that can be used to report bulletins
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
Expand All @@ -56,7 +55,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
private final ProtocolContext<ProtocolMessage> protocolContext;
private final SocketConfiguration socketConfiguration;
private int handshakeTimeoutSeconds;
private volatile BulletinRepository bulletinRepository;

public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
if (socketConfiguration == null) {
Expand All @@ -71,7 +69,6 @@ public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfigur

/**
 * Stores the supplied {@link BulletinRepository} in this sender's
 * {@code bulletinRepository} field.
 *
 * @param bulletinRepository the repository to retain; it is assigned as-is, with no null check
 */
@Override
public void setBulletinRepository(final BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
}

/**
Expand Down Expand Up @@ -183,30 +180,6 @@ public void disconnect(final DisconnectMessage msg) throws ProtocolException {
}
}

/**
 * Sends a primary-role assignment message to the node it identifies.
 *
 * A socket is created for the node returned by {@code msg.getNodeId()}, the
 * message is marshalled onto that socket's output stream, and the socket is
 * closed quietly afterwards regardless of the outcome.
 *
 * @param msg the primary role assignment message to send
 *
 * @throws ProtocolException if an I/O error occurs while marshalling the message
 */
@Override
public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException {
    Socket nodeSocket = null;
    try {
        nodeSocket = createSocket(msg.getNodeId(), true);

        try {
            // write the message to the node over the freshly created socket
            protocolContext.createMarshaller().marshal(msg, nodeSocket.getOutputStream());
        } catch (final IOException ioe) {
            final String reason = "Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe;
            throw new ProtocolException(reason, ioe);
        }
    } finally {
        // always release the socket, whether or not the send succeeded
        SocketUtils.closeQuietly(nodeSocket);
    }
}

private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException {
// update socket timeout, if handshake timeout was set; otherwise use socket's current timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.reporting.BulletinRepository;
Expand Down Expand Up @@ -108,10 +107,4 @@ public ReconnectionResponseMessage requestReconnection(final ReconnectionRequest
public void disconnect(DisconnectMessage msg) throws ProtocolException {
// Pure delegation: the message is handed unchanged to the underlying sender.
sender.disconnect(msg);
}

/**
 * Forwards the given primary-role assignment message unchanged to {@code sender}.
 *
 * @param msg the message to forward
 * @throws ProtocolException if the call to {@code sender} throws it
 */
@Override
public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException {
sender.assignPrimaryRole(msg);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
Expand Down Expand Up @@ -92,8 +91,4 @@ public MulticastProtocolMessage createMulticastProtocolMessage() {
/**
 * Creates a new {@link ControllerStartupFailureMessage} using its no-argument constructor.
 *
 * @return a newly constructed message; no fields are set by this method
 */
public ControllerStartupFailureMessage createControllerStartupFailureMessage() {
return new ControllerStartupFailureMessage();
}

/**
 * Creates a new {@link PrimaryRoleAssignmentMessage} using its no-argument constructor.
 *
 * @return a newly constructed message; no fields are set by this method
 */
public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() {
return new PrimaryRoleAssignmentMessage();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public static enum MessageType {
FLOW_RESPONSE,
HEARTBEAT,
PING,
PRIMARY_ROLE,
RECONNECTION_REQUEST,
RECONNECTION_RESPONSE,
SERVICE_BROADCAST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.node.Node.Status;
Expand Down Expand Up @@ -141,22 +139,6 @@ public interface ClusterManager extends NodeInformant {
*/
List<Event> getNodeEvents(final String nodeId);

/**
* Revokes the primary role from the current primary node and assigns the primary role to the given node ID.
*
* If role revocation fails, then the current primary node is set to disconnected while retaining the primary role and no role assignment is performed.
*
* If role assignment fails, then the given node is set to disconnected and is given the primary role.
*
* @param nodeId the node identifier
* @param userDn the Distinguished Name of the user requesting that the Primary Node be assigned
*
* @throws UnknownNodeException if the node with the given identifier does not exist
* @throws IneligiblePrimaryNodeException if the node with the given identifier is not eligible to be the primary node
* @throws PrimaryRoleAssignmentException if the cluster was unable to change the primary role to the requested node
*/
void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException;

/**
* @return the primary node of the cluster or null if no primary node exists
*/
Expand Down
Loading

0 comments on commit 1ac0526

Please sign in to comment.