From 5abafe674c6da6151f1422e24c4adbaabf78aff7 Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Mon, 19 Feb 2024 18:02:41 +0000 Subject: [PATCH] Add explanation for retrying the node Signed-off-by: Gantigmaa Selenge --- 06x-new-kafka-roller.md | 116 ++++++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 53 deletions(-) diff --git a/06x-new-kafka-roller.md b/06x-new-kafka-roller.md index 46260447..bdca9ca8 100644 --- a/06x-new-kafka-roller.md +++ b/06x-new-kafka-roller.md @@ -21,7 +21,7 @@ A pod is considered stuck if it is in one of following states: ### Known Issues The existing KafkaRoller has been suffering from the following shortcomings: -- While it is safe and simple to restart one broker at a time, it is slow in large clusters. +- While it is safe and simple to restart one broker at a time, it is slow in large clusters ([related issue](https://github.com/strimzi/strimzi-kafka-operator/issues/8547)). - It doesn’t worry about partition preferred leadership - Hard to reason about when things go wrong. The code is complex to understand and it's not easy to determine why a pod was restarted from logs that tend to be noisy. - Potential race condition between Cruise Control rebalance and KafkaRoller that could cause partitions under minimum in sync replica. This issue is described in more detail in the `Future Improvements` section. @@ -29,16 +29,12 @@ The existing KafkaRoller has been suffering from the following shortcomings: The following non-trivial fixes and changes are missing from the current KafkaRoller's KRaft implementation: -- Currently KafkaRoller has to connect to brokers successfully in order to get KRaft quorum information and determine whether a controller node can be restarted. This is because it was not possible to directly talk to KRaft controllers at the time before [KIP 919](https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration) was implemented. +- Currently KafkaRoller has to connect to brokers successfully in order to get KRaft quorum information and determine whether a controller node can be restarted. This is because it was not possible to directly talk to KRaft controllers at the time before [KIP 919](https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration) was implemented. The issue is raised [here](https://github.com/strimzi/strimzi-kafka-operator/issues/9692). - KafkaRoller takes a long time to reconcile combined nodes if they are all in `Pending` state. This is because the combined node does not become ready until the quorum is formed and KafkaRoller waits for a pod to become ready before it attempts to restart other nodes. In order for the quorum to form, at least the majority of controller nodes need to be running at the same time. This is not easy to solve in the current KafkaRoller without introducing some major changes because it processes each node individually and there is no mechanism to restart multiple nodes in parallel. More information can be found [here](https://github.com/strimzi/strimzi-kafka-operator/issues/9426). - The quorum health check is based on the `controller.quorum.fetch.timeout.ms` configuration which it reads from the desired configurations passed from the Reconciler. However, `CAReconcilor` and manual rolling update pass null value for desired configurations because in both cases, the nodes don't need reconfigurations. This results in performing the quorum healthcheck based on the hard-coded default value of `controller.quorum.fetch.timeout.ms` rather than the accurate configuration value when doing manual rolling update and rolling nodes for certificate renewal. -- The current roller's quorum health will be broken once we have scaling supported via [KIP-853](https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes). KafkaRoller relies on the response from `describeQuorum` API to get the total number of configured controllers. During scale down, the nodes could return inconsistent number of controllers in their responses until all nodes are updated with the correct configuration. This could result in quorum healthcheck not passing therefore not able to restart nodes. - -- KafkaRoller cannot transition nodes from controller only role to combined role. This is because the KRaft controller identifies the role of the node by `NodeRef` object which contains the desired role for the node rather than the currently assigned role. The current roller would have to be updated with a new mechanism to get the currently assigned role. More information can be found here [here](https://github.com/strimzi/strimzi-kafka-operator/issues/9434). - ## Motivation @@ -60,36 +56,39 @@ It will also introduce an algorithm that can restart brokers in parallel while a | Configuration | Default value | Exposed to user | Description | | :-------------| :-------------| :---------------| :---------- | -| maxRestartAttempts | 3 | No | The maximum number of times we attempt to restart a broker before failing the reconciliation. This is checked against `numRestarts` in the `ServerContext`.| -| maxReconfigAttempts | 3 | No | The maximum number of times we attempt to reconfigure a broker before restarting it. | -| postRestartTimeoutMs | 60 seconds | Yes | The maximum amount of time we will wait for brokers to transition to `SERVING` state after a restart. This will be based on the operational timeout that is already exposed to the user. | -| postReconfigureTimeoutMs | 60 seconds | Yes | The maximum amount of time we will wait for brokers to transition to `SERVING` state after a reconfiguration. +| maxRestarts | 3 | No | The maximum number of times a node can be restarted before failing the reconciliation. This is checked against `numRestarts` in the `Context`.| +| maxReconfigs | 3 | No | The maximum number of times a node can be reconfigured before restarting it. This is checked against `numReconfigs` in the `Context`.| +| maxAttempts| 10 | No | The maximum number of times a node can be retried before failing the reconciliation. This is checked against `numAttempts` in the `Context`.| +| postOperationTimeoutMs | 60 seconds | Yes | The maximum amount of time we will wait for nodes to transition to `SERVING` state after an operation in each retry. This will be based on the operational timeout that is already exposed to the user. | | maxBatchSize | 1 | Yes | The maximum number of broker nodes that can be restarted in parallel. | -1. When a new reconciliation starts up, `ServerContext` is created for each broker. +1. When a new reconciliation starts up, `Context` is created for each node. ``` - ServerContext { + Context { nodeId: int nodeRole: String state: ServerState reason: String numRestarts: int + numReconfigs: int + numAttempts: int lastTransitionTime: long } ``` - nodeId: Node ID. - - nodeRoles: Process roles of this node (e.g. controller, broker). This will be set using the pod labels `strimzi.io/controller-role` and `strimzi.io/broker-role` because these are currently assigned roles of the node. + - nodeRoles: Process roles of this node (e.g. controller, broker). This will be set using the pod labels `strimzi.io/controller-role` and `strimzi.io/broker-role` because these are the currently assigned roles of the node. - state: It contains the current state of the node based on information collected from the abstracted sources. The table below describes the possible states. - reason: It is updated based on the current predicate logic from the Reconciler. For example, an update in the Kafka CR is detected. - numRestarts: The value is incremented each time the node has been attempted to restart. - numReconfigs: The value is incremented each time the node has been attempted to reconfigure. + - numReconfigs: The value is incremented each time the node has been retried by the roller. - lastTransitionTime: System.currentTimeMillis of last observed state transition. States | State | Description | | :-------------------- | :---------- | - | UNKNOWN | The initial state when creating `ServerContext` for a node. We expect to transition from this state fairly quickly after creating the context for nodes. | + | UNKNOWN | The initial state when creating `Context` for a node. We expect to transition from this state fairly quickly after creating the context for nodes. | | NOT_RUNNING | Node is not running (Kafka process is not running) | | NOT_READY | Node is running but not ready to serve requests (broker state < 2 OR == 127) | | RESTARTED | After successful `kubectl delete pod`. | @@ -100,88 +99,99 @@ It will also introduce an algorithm that can restart brokers in parallel while a 2. The existing predicate function will be called for each of the nodes and those for which the function returns a non-empty list of reasons will be restarted. -2. Group the nodes into four categories: - - `RESTART_FIRST` - Nodes that have `NOT_READY` or `NOT_RUNNING` state in their contexts. The group will also include nodes that - we cannot connect to via Admin API. - - `RESTART` - Nodes that have non-empty list of reasons from the predicate function and have not been restarted yet (ServerContext.numRestarts == 0). - - `MAYBE_RECONFIGURE` - Nodes that have empty list of reasons and have not been reconfigured yet (ServerContext.numReconfigs == 0). - - `NOP` - Nodes that have been restarted or reconfigured at least once (ServerContext.numRestarts > 0 || ServerContext.numReconfigs > 0 ) and have either - `LEADING_ALL_PREFERRED` or `SERVING` state. Also nodes that have `RECOVERING` state. +3. Group the nodes into the following categories based on information collected from the abstracted sources: + - `RESTART_FIRST` - Nodes that have `NOT_READY` or `NOT_RUNNING` state in their contexts. The group will also include nodes that we cannot connect to via Admin API. + - `WAIT_FOR_LOG_RECOVERY` - Nodes that have `RECOVERING` state. + - `RESTART` - Nodes that have non-empty list of reasons from the predicate function and have not been restarted yet (Context.numRestarts == 0). + - `MAYBE_RECONFIGURE` - Broker nodes (including combined nodes) that have an empty list of reasons and not been reconfigured yet (Context.numReconfigs == 0). + - `NOP` - Nodes that have been restarted or reconfigured at least once (Context.numRestarts > 0 || Context.numReconfigs > 0 ) and have either + `LEADING_ALL_PREFERRED` or `SERVING` state. +4. Wait for nodes in `WAIT_FOR_LOG_RECOVERY` group to finish performing log recovery. + - Wait for each node to have `SERVING` within the `postOperationalTimeoutMs`. + - If the timeout is reached for a node and its `numAttempts` is greater than or equal to `maxAttempts`, throw `UnrestartableNodesException` with the log recovery progress (number of remaining logs and segments). Otherwise increment node's `numAttempts` and restart from step 3. -3. Restart nodes in `RESTART_FIRST` category either one by one in the following order unless all nodes are combined +5. Restart nodes in `RESTART_FIRST` category either one by one in the following order unless all nodes are combined and are in `NOT_RUNNING` state: - Pure controller nodes - - Combined nodes. + - Combined nodes - Broker only nodes If all controllers are combined and are in `NOT_RUNNING` state, restart all nodes in parallel and wait for them to have `SERVING`. Explained more in detail below. - Wait until the restarted node to have `SERVING` and then `LEADING_ALL_PREFERRED` state within `postReconfigureTimeoutMs`. + Wait until the restarted node to have `SERVING` within `postOperationalTimeoutMs`. If the timeout is reached and the node's `numAttempts` is greater than or equal to `maxAttempts`, throw `TimeoutException`. Otherwise increment node's `numAttempts` and restart from step 3. -4. Further refine the nodes in `MAYBE_RECONFIGURE` category: +6. Further refine the broker nodes in `MAYBE_RECONFIGURE` group: - Describe Kafka configurations for each node via Admin API and compare them against the desired configurations. This is essentially the same mechanism we use today for the current KafkaRoller. - If a node has configuration changes and they can be dynamically updated, add the node into another group called `RECONFIGURE`. - If a node has configuration changes but they cannot be dynamically updated, add nodes into the `RESTART` group. - If a node has no configuration changes, put the node into the `NOP` group. -5. Reconfigure each node in `RECONFIGURE` group: - - If `numReconfigs` of a node is greater than the configured `maxReconfigAttempts`, add a restart reason to its context. Otherwise continue. +7. Reconfigure each node in `RECONFIGURE` group: + - If `numReconfigs` of a node is greater than the configured `maxReconfigs`, add a restart reason to its context. Otherwise continue. - Send `incrementalAlterConfig` request with its config updates. - Transitions the node's state to `RECONFIGURED` and increment its `numReconfigs`. - - Wait for each node that got configurations updated until they have `SERVING` and then `LEADING_ALL_PREFERRED` state within `postReconfigureTimeoutMs`. + - Wait for each node that got configurations updated until they have `SERVING` within the `postOperationalTimeoutMs`. + - If the `postOperationalTimeoutMs` is reached, restart from step 3. -6. If at this point, the `RESTART` group is empty, the reconciliation will be completed successfully. +8. If at this point, the `RESTART` group is empty, the reconciliation will be completed successfully. -7. Otherwise, batch nodes in `RESTART` group and get the next batch to restart: +9. Otherwise, batch nodes in `RESTART` group and get the next batch to restart: - Further categorize nodes based on their roles so that the following restart order can be enforced: 1. `NON_ACTIVE_PURE_CONTROLLER` - Pure controller that is not the active controller - 2. `ACTIVE_PURE_CONTROLLER` - Pure controller is the active controller (the quorum leader) + 2. `ACTIVE_PURE_CONTROLLER` - Pure controller that is the active controller (the quorum leader) 3. `BROKER_AND_NOT_ACTIVE_CONTROLLER` - Node that is at least a broker but also might be a controller (combined) and is not the active controller 4. `BROKER_AND_ACTIVE_CONTROLLER` - Combined node that is the active controller (the quorum leader) + + The returned batch will contain only one node if it is not `BROKER_AND_NOT_ACTIVE_CONTROLLER` group, so that controllers are restarted one at a time. - - If `NON_ACTIVE_PURE_CONTROLLER` group is non empty, check which nodes can be restarted without impacting the quorum health (more on this later) and return a batch containing the first one. - - If `ACTIVE_PURE_CONTROLLER` group is non empty, check if the node can be restarted without impacting the quorum health and return a batch containing the active controller node. - - If `BROKER_AND_NOT_ACTIVE_CONTROLLER` group is non empty, batch the nodes: - - build a map of brokers and their replicating partitions by sending describeTopics request to Admin API. - - batch the nodes that do not have any partitions in common therefore can be restarted together. - - remove nodes that have an impact on the availability from the batches (more on this later). - - return the largest batch. - -8. Restart the nodes in the returned batch in parallel: - - If `numRestarts` of a node is larger than `maxRestarts`, return `MaxRestartsExceededException` , which will fail the reconciliation. + - If `NON_ACTIVE_PURE_CONTROLLER` group is non empty, return the first node that can be restarted without impacting the quorum health (more on this later). + - If `ACTIVE_PURE_CONTROLLER` group is non empty, return the node if it can be restarted without impacting the quorum health. Otherwise return an empty set. + - If `BROKER_AND_NOT_ACTIVE_CONTROLLER` group is non empty, batch the broker nodes: + - remove the node from the list, if it is a combined node and cannot be restarted without impacting the quorum health so that it does get included in a batch + - build a map of nodes and their replicating partitions by sending describeTopics request to Admin API + - batch the nodes that do not have any partitions in common therefore can be restarted together + - remove nodes that have an impact on the availability from the batches (more on this later) + - return the largest batch + - If an empty batch is returned, that means none of the nodes met the safety conditions such as availability and qourum health impact. In this case, check their `numAttempts` and if any of them is equal to or greater than `maxAttempts`, throw `UnrestartableNodesException`. Otherwise increment their `numAttempts` and restart from step 3. + +8. Restart the nodes from the returned batch in parallel: + - If `numRestarts` of a node is larger than `maxRestarts`, throw `MaxRestartsExceededException`. - Otherwise, restart each node and transition its state to `RESTARTED` and increment its `numRestarts`. - - After restarting all the nodes in the batch, wait for their states to become `SERVING` and then `LEADING_ALL_PREFERRED` until the configured `postRestartTimeoutMs` is reached. + - After restarting all the nodes in the batch, wait for their states to become `SERVING` until the configured `postOperationalTimeoutMs` is reached. + - If the timeout is reached, throw `TimeoutException` if a node's `numAttempts` is greater than or equal to `maxAttempts`. Otherwise increment their `numAttempts` and restart from step 3. + +9. If there are no exceptions thrown at this point, the reconciliation completes successfully. If there were `UnrestartableNodesException`, `TimeoutException`, `MaxRestartsExceededException` or any other unexpected exceptions throws, the reconciliation fails. -9. If there are no exceptions thrown at this point, the reconciliation completes successfully. +TODO: we need to figure out when to elect preferred leaders and not fail the reconciliation if did not become leaders within the timeout. This does not apply to pure controllers. #### Restarting not running combined nodes When restarting not running combined nodes, we will apply a special logic to address the issue described in https://github.com/strimzi/strimzi-kafka-operator/issues/9426. -In step 3, we restart each node in the `RESTART_FIRST` group one by one. In this specific case, we will compare the total number of not running combined nodes against the total number of controller nodes in the cluster. This is to identify whether all of controllers nodes in this cluster are running in combined mode and all of them are in `NOT_RUNNING` state. In this case, we will restart all the nodes in parallel to give the best chance of forming the quorum. We will then wait for the nodes to have `SERVING` state and then `LEADING_ALL_PREFERRED`. +In step 3, we restart each node in the `RESTART_FIRST` group one by one. In this specific case, we will compare the total number of not running combined nodes against the total number of controller nodes in the cluster. This is to identify whether all of controllers nodes in this cluster are running in combined mode and none of them are running. In this case, we will restart all the nodes in parallel to give the best chance of forming the quorum. We will then wait for the nodes to have `SERVING` state. #### Quorum health check -The quorum health logic is similar to the current KafkaRoller except for a couple of differences. The current KafkaRoller uses the `controller.quorum.fetch.timeout.ms` config value from the desired configurations passed from the reconciler or uses the hard-coded default value if the reconciler pass null for desired configurations. The new roller will use the configuration of the active controller. This will mean that the quorum health check is done from the active controller's point of view. +The quorum health logic is similar to the current KafkaRoller except for a couple of differences. The current KafkaRoller uses the `controller.quorum.fetch.timeout.ms` config value from the desired configurations passed from the reconciler or uses the hard-coded default value if the reconciler pass null for desired configurations. The new roller will use the configuration value of the active controller. This will mean that the quorum health check is done from the active controller's point of view. Also the current KafkaRoller does not connect to the controller via Admin API to get the quorum health information. By the time, we implement this proposal, Strimzi should support Kafka 3.7 which includes [KIP 919](https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration). Therefore new KafkaRoller will be able to connect to the controller directly for quorum information and active controller's configuration. -The total number of controllers used in the quorum healthcheck will based on the currently assigned roles of the nodes, rather the size of the `voters` list returned from `describeQuorum` API. That way, when we are scaling down, the quorum healthcheck does not fail due to inconsistent responses from the nodes. This will address the issue described in https://github.com/strimzi/strimzi-kafka-operator/issues/9434. +TODO: we need to figure out how to work out the total number of controller without relying on the describe API. #### Availability check -At this point, we would have already built a map of brokers and their replicating partitions by sending describeTopics requests to the Admin API. Then ISRs that the broker is part of will be checked against the configured under minimum ISR size. If `size(ISR containing the broker) - minISR` > 0, the broker can be considered safe to restart. Otherwise, restarting the broker would cause under minimum ISR partition. If it's less than 0, it means the partition is already under minimum ISR and restarting it would either not make a difference or make things worse. In both cases, the broker should not be restarted. +At this point, we would have already built a map of brokers and their replicating partitions by sending describeTopics requests to the Admin API. Then ISRs that the broker is part of will be checked against the configured under minimum ISR size. If `size(ISR containing the broker) - minISR > 0`, the broker can be considered safe to restart. If it equals to 0, restarting the broker could cause under minimum ISR partition. If it's less than 0, it means the partition is already under minimum ISR and restarting it would either not make a difference or make things worse. In both cases, the broker should not be restarted. -### Switching from the current KafkaRoller to the new KafkaRoller +However, if `size(ISR containing the broker) - minISR <= 0` but the topic partition is configured with replication size less than minISR, the check will pass to proceed with the broker restart. -The new KafkaRoller will only work with KRaft clusters therefore when running in Zookeeper mode, the current KafkaRoller +### Switching from the old KafkaRoller to the new KafkaRoller -The new KafkaRoller will be enabled by default for new KRaft clusters which means new KRaft clusters will always run with the new KafkaRoller. +The new KafkaRoller will only work with KRaft clusters therefore when running in Zookeeper mode, the current KafkaRoller will be used. -The current KafkaRoller will be used during migration of existing clusters from Zookeeper to KRaft mode and will be switched to the new roller once the migration is completed. +Kafka CR's `KafkaMetadataState` represents where the metadata is stored for the cluster. It is set to `KRaft` when a cluster is fully migrated to KRaft or was created in KRaft mode. KafkaReconciler will be updated to switch to the new roller based on this state. This means the old KafkaRoller will be used during migration of existing clusters from Zookeeper to KRaft mode and the new roller is used only after the migration is completed and for clusters created in KRaft mode. -### Future potentials +### Future improvement - We are not looking to solve the potential race condition between KafkaRoller and Cruise Control rebalance activity right away but this is something we can solve in the future. An example scenario that cause this race: Let's say we have 5 brokers cluster, minimum in sync replica for topic partition foo-0 is set to 2. The possible sequence of events that could happen: @@ -201,7 +211,7 @@ the [`strimzi-Kafka-operator` GitHub repository](https://github.com/strimzi/stri ## Compatibility -The new KafkaRoller introduced by this proposal will be feature-gated. +The new KafkaRoller introduced by this proposal will used only for KRaft based clusters. This proposal should have no impact on any existing Kafka clusters deployed with ZooKeeper. ## Rejected