diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java index 5d3bc72935..8be03d2280 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java @@ -518,14 +518,14 @@ public NodeState brokerState(int brokerId, long shutdownTimeoutNs) { return NodeState.UNKNOWN; } if (broker.shuttingDown()) { - return NodeState.SHUTTING_DOWN; + return NodeState.CONTROLLED_SHUTDOWN; } if (broker.fenced()) { if (broker.lastControlledShutdownNs() + shutdownTimeoutNs > time.nanoseconds()) { // The broker is still in controlled shutdown. - return NodeState.SHUTTING_DOWN; + return NodeState.CONTROLLED_SHUTDOWN; } - return NodeState.SHUTDOWN; + return NodeState.FENCED; } return NodeState.ACTIVE; } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java index 97eb8fc497..c21fcd8114 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java @@ -11,29 +11,23 @@ package org.apache.kafka.controller.stream; +import org.apache.kafka.controller.BrokerControlState; + public enum NodeState { /** * The node is active and can handle requests. */ ACTIVE, - /** - * The node is shutting down in a controlled manner. - */ - SHUTTING_DOWN, /** * The node is shut down and cannot handle requests. */ - SHUTDOWN, + FENCED, /** - * Use @{@link #SHUTTING_DOWN} instead. + * The node is shutting down in a controlled manner. + * Note: In AutoMQ, this state is different from {@link BrokerControlState#CONTROLLED_SHUTDOWN}. In some cases, + * a node in {@link BrokerControlState#FENCED} state may still be shutting down in a controlled manner. */ - @Deprecated CONTROLLED_SHUTDOWN, - /** - * Use @{@link #SHUTDOWN} instead. - */ - @Deprecated - FENCED, /** * The state of the node is unknown, possibly because it has not yet registered. */ diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java index 294909475d..9c96de0b8b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java @@ -374,7 +374,7 @@ public void testBrokerState() { manager.register(0, true); // FENCED Broker - assertEquals(NodeState.SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs)); // UNFENCED Broker manager.touch(0, false, 100); @@ -382,16 +382,16 @@ public void testBrokerState() { // CONTROLLED_SHUTDOWN Broker manager.maybeUpdateControlledShutdownOffset(0, 100); - assertEquals(NodeState.SHUTTING_DOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); // SHUTDOWN_NOW Broker within shutdownTimeoutNs manager.touch(0, true, 100); manager.time().sleep(5); - assertEquals(NodeState.SHUTTING_DOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); // SHUTDOWN_NOW Broker after shutdownTimeoutNs manager.time().sleep(6); - assertEquals(NodeState.SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs)); // UNFENCED Broker after SHUTDOWN manager.touch(0, false, 100); diff --git a/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java index 1b0266837e..b725d9e69a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java @@ -95,7 +95,7 @@ public void testRegister() { assertTrue(nodeControlManager.nodeMetadataMap.containsKey(0)); when(nodeRuntimeInfoGetter.hasOpeningStreams(eq(0))).thenReturn(true); - when(nodeRuntimeInfoGetter.state(eq(0))).thenReturn(NodeState.SHUTDOWN); + when(nodeRuntimeInfoGetter.state(eq(0))).thenReturn(NodeState.FENCED); ControllerResult getRst = nodeControlManager.getMetadata( new AutomqGetNodesRequest(new AutomqGetNodesRequestData().setNodeIds(List.of(0, 1)), @@ -107,7 +107,7 @@ public void testRegister() { assertEquals(0, nodes.get(0).nodeId()); assertEquals(2L, nodes.get(0).nodeEpoch()); assertEquals("wal2", nodes.get(0).walConfig()); - assertEquals("SHUTDOWN", nodes.get(0).state()); + assertEquals(NodeState.FENCED.name(), nodes.get(0).state()); } AutomqRegisterNodeRequestData.TagCollection tags(Map tags) {