Address comments from Federico
Signed-off-by: Gantigmaa Selenge <[email protected]>
tinaselenge committed Mar 12, 2024
1 parent 33ec40e commit c060c24
Showing 1 changed file with 19 additions and 20 deletions: `06x-new-kafka-roller.md`

## Current situation

The Kafka Roller is an internal Cluster Operator component responsible for coordinating the rolling restart or reconfiguration of Kafka pods when:
- a non-dynamic reconfiguration needs to be applied
- an update to the Kafka CRD is detected
- a certificate is renewed
- pods have been manually annotated by the user for controlled restarts
- a pod is stuck and out of date
- a Kafka broker is unresponsive to Kafka Admin connections

A pod is considered stuck if it is in one of the following states:
- `CrashLoopBackOff`
- `ImagePullBackOff`

## Proposal

The objective of this proposal is to introduce a new KafkaRoller with simplified logic and a structured design resembling a finite state machine. KafkaRoller decisions are informed by observations coming from different sources (e.g. Kubernetes API, KafkaAgent, Kafka Admin API). These sources will be abstracted so that KafkaRoller is not dependent on their specifics as long as it gets the information it needs. The abstractions also enable much better unit testing.

Depending on the observed states, the roller will perform specific actions. Each action should lead to a subsequent observation that triggers a state transition. This iterative process continues until each node's state aligns with the desired state.

When a new reconciliation starts up, a context object is created for each node to store the following information:
- <i>currentNodeRole</i>: Currently assigned process roles for this node (e.g. controller, broker).
- <i>state</i>: It contains the current state of the node based on information collected from the abstracted sources (Kubernetes API, KafkaAgent and Kafka Admin API). The table below describes the possible states.
- <i>reason</i>: It is updated based on the current predicate logic from the Reconciler. For example, an update in the Kafka CR is detected.
- <i>numRestartAttempts</i>: The value is incremented each time a restart of the node is attempted.
- <i>numReconfigAttempts</i>: The value is incremented each time a reconfiguration of the node is attempted.
- <i>numRetries</i>: The value is incremented each time the node cannot be restarted or reconfigured because the safety conditions are not met (more on this later).
- <i>lastTransitionTime</i>: The `System.currentTimeMillis()` of the last observed state transition.
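For illustration, the per-node context could be sketched as a simple record. This is a Python sketch mirroring the fields above, not the actual Java implementation; the `UNKNOWN` initial state and the `observe` helper are assumptions for the example:

```python
import time
from dataclasses import dataclass, field


@dataclass
class NodeContext:
    """Per-node roller context; a sketch mirroring the fields above."""
    node_ref: str                     # nodeRef: pod name / node ID
    current_node_role: str            # currentNodeRole: "controller", "broker" or "combined"
    state: str = "UNKNOWN"            # last observed state ("UNKNOWN" is an assumed initial value)
    reasons: list = field(default_factory=list)  # restart reasons from the predicate logic
    num_restart_attempts: int = 0
    num_reconfig_attempts: int = 0
    num_retries: int = 0
    last_transition_time: int = 0     # epoch millis of the last observed state transition

    def observe(self, new_state: str) -> None:
        # Record a state transition and when it was observed.
        if new_state != self.state:
            self.state = new_state
            self.last_transition_time = int(time.time() * 1000)
```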

<b>States</b>
| State | Description |
| :-----| :---------- |
| SERVING | Node is in running state and ready to serve requests (broker state >= 3 AND != 127). |
| LEADING_ALL_PREFERRED | Node is in running state and leading all preferred replicas. |

The broker states are defined [here](https://github.com/apache/kafka/blob/58ddd693e69599b177d09c2e384f31e7f5e11171/metadata/src/main/java/org/apache/kafka/metadata/BrokerState.java#L46).

### Configurability
The new KafkaRoller will have the following configuration options:

| Configuration | Default value | Exposed to user | Description |
| :-------------| :-------------| :---------------| :---------- |
| maxRestartAttempts | 3 | No | The maximum number of times a node can be restarted before failing the reconciliation. This is checked against the node's `numRestartAttempts`.|
| maxReconfigAttempts | 3 | No | The maximum number of times a node can be reconfigured before restarting it. This is checked against the node's `numReconfigAttempts`.|
| maxRetries | 10 | No | The maximum number of times a node can be retried after not meeting the safety conditions. This is checked against the node's `numRetries`.|
| postOperationTimeoutMs | 60 seconds | Yes | The maximum amount of time we will wait for nodes to transition to `SERVING` state after an operation in each retry. This will be based on the operation timeout that is already exposed to the user via environment variable `STRIMZI_OPERATION_TIMEOUT_MS`. |
| maxRestartParallelism | 1 | Yes | The maximum number of broker nodes that can be restarted in parallel. |
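For reference, the defaults above can be captured in one place. A Python sketch only; in the real operator the user-facing options come from its configuration (e.g. the `STRIMZI_OPERATION_TIMEOUT_MS` environment variable):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RollerOptions:
    """Defaults from the table above; only the last two are exposed to users."""
    max_restart_attempts: int = 3
    max_reconfig_attempts: int = 3
    max_retries: int = 10
    post_operation_timeout_ms: int = 60_000   # STRIMZI_OPERATION_TIMEOUT_MS
    max_restart_parallelism: int = 1
```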


### Algorithm
- `WAIT_FOR_LOG_RECOVERY` - Nodes that have `RECOVERING` state.
- `RESTART` - Nodes that have a non-empty list of reasons from the predicate function and have not been restarted yet (Context.numRestartAttempts == 0).
- `MAYBE_RECONFIGURE` - Broker nodes (including combined nodes) that have an empty list of reasons and have not been reconfigured yet (Context.numReconfigAttempts == 0).
- `NOP` - Nodes that have at least one restart or reconfiguration attempt (Context.numRestartAttempts > 0 || Context.numReconfigAttempts > 0) and have either `LEADING_ALL_PREFERRED` or `SERVING` state.
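The grouping in step 3 can be sketched as a pure function over the node contexts. A Python sketch; the `NOT_RUNNING` condition for `RESTART_FIRST` is an assumption based on how the group is used later in this proposal, and plain dicts stand in for the context objects:

```python
def categorize(contexts):
    """Map each node context into one of the step-3 groups (sketch)."""
    groups = {"RESTART_FIRST": [], "WAIT_FOR_LOG_RECOVERY": [],
              "RESTART": [], "MAYBE_RECONFIGURE": [], "NOP": []}
    for c in contexts:
        if c["state"] == "NOT_RUNNING":            # assumed RESTART_FIRST condition
            groups["RESTART_FIRST"].append(c)
        elif c["state"] == "RECOVERING":
            groups["WAIT_FOR_LOG_RECOVERY"].append(c)
        elif c["reasons"] and c["num_restart_attempts"] == 0:
            groups["RESTART"].append(c)
        elif (not c["reasons"] and c["num_reconfig_attempts"] == 0
              and c["role"] in ("broker", "combined")):
            groups["MAYBE_RECONFIGURE"].append(c)
        elif ((c["num_restart_attempts"] > 0 or c["num_reconfig_attempts"] > 0)
              and c["state"] in ("SERVING", "LEADING_ALL_PREFERRED")):
            groups["NOP"].append(c)
    return groups
```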

4. Wait for nodes in `WAIT_FOR_LOG_RECOVERY` group to finish performing log recovery.
- Wait for each node to have `SERVING` state within `postOperationTimeoutMs`.
- If the timeout is reached for a node and its `numRetries` is greater than or equal to `maxRetries`, throw `UnrestartableNodesException` with the log recovery progress (number of remaining logs and segments). Otherwise increment the node's `numRetries` and restart from step 3.
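The wait-then-retry pattern used here (and again in steps 5 and 8) might look like the following Python sketch. `observe_state` stands in for querying the abstracted sources, the clock and sleep are injectable for testing, and a generic `TimeoutError` stands in for the step-specific exception types:

```python
import time


def wait_until_serving(ctx, observe_state, timeout_ms, max_retries,
                       clock=time.monotonic, sleep=time.sleep):
    """Wait for one node to reach SERVING within timeout_ms (sketch).

    Returns "DONE" on success, or "RETRY" when the caller should restart
    from step 3; raises once the retry budget is exhausted.
    """
    deadline = clock() + timeout_ms / 1000.0
    while clock() < deadline:
        if observe_state(ctx) == "SERVING":
            return "DONE"
        sleep(0.1)  # poll interval; an arbitrary choice for the sketch
    # Timed out: either give up or burn one retry and signal a re-observation.
    if ctx["num_retries"] >= max_retries:
        raise TimeoutError(f"node {ctx['node_ref']} never reached SERVING")
    ctx["num_retries"] += 1
    return "RETRY"
```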

5. Restart nodes in `RESTART_FIRST` category:
- If one or more nodes have `NOT_RUNNING` state, we first need to check two special conditions:
- If a node is in `NOT_RUNNING` state, restart it only if it has the `POD_HAS_OLD_REVISION` reason. This is because, if the node is not running at all, restarting it likely won't make any difference unless the node is out of date.
> For example, if a pod is in pending state due to a misconfigured affinity rule, there is no point restarting this pod again or restarting the other pods, because that would leave them in pending state as well. If the user then fixes the misconfigured affinity rule, we should detect that the pod has an old revision and restart it so that the pod is scheduled correctly and runs.
- At this point we have either restarted the nodes or decided not to because their reason was not `POD_HAS_OLD_REVISION`. Either way, wait for the nodes to have `SERVING` state within `postOperationTimeoutMs`. If the timeout is reached and a node's `numRetries` is greater than or equal to `maxRetries`, throw `TimeoutException`. Otherwise increment the node's `numRetries` and restart from step 3.


- Otherwise, attempt to restart the nodes one by one in the following order:
- Pure controller nodes
- Combined nodes
- Broker only nodes

- Wait for the restarted node to have `SERVING` state within `postOperationTimeoutMs`. If the timeout is reached and the node's `numRetries` is greater than or equal to `maxRetries`, throw `TimeoutException`. Otherwise increment the node's `numRetries` and restart from step 3.

6. Further refine the broker nodes in `MAYBE_RECONFIGURE` group:
- Describe Kafka configurations for each node via Admin API and compare them against the desired configurations. This is essentially the same mechanism we use today for the current KafkaRoller.
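The comparison could decide between reconfiguring and restarting roughly as follows. A Python sketch; `dynamic_keys` stands in for the set of dynamically updatable broker configs reported by the Admin API, and the example config keys below are illustrative:

```python
def plan_config_update(current, desired, dynamic_keys):
    """Decide what to do about config drift on one broker (step-6 sketch).

    Returns ("NOP" | "RECONFIGURE" | "RESTART", changed_keys): NOP when the
    configs already match, RECONFIGURE when every changed key is dynamically
    updatable, RESTART otherwise.
    """
    changed = {k for k in desired if current.get(k) != desired[k]}
    if not changed:
        return "NOP", changed
    if changed <= set(dynamic_keys):
        return "RECONFIGURE", changed
    return "RESTART", changed
```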
- If `COMBINED_AND_NOT_ACTIVE_CONTROLLER` group is non-empty, return the first node that can be restarted without impacting the quorum health and availability.
- If `COMBINED_AND_ACTIVE_CONTROLLER` group is non-empty, return the node if it can be restarted without impacting the quorum health and availability. Otherwise return an empty set.
- If `BROKER` group is non-empty, batch the broker nodes:
  - remove a node from the list if it is a combined node that cannot be restarted without impacting the quorum health, so that it does not get included in a batch
  - build a map of nodes and their replicating partitions by sending a describeTopics request to the Admin API
  - batch the nodes that do not have any partitions in common and therefore can be restarted together
  - remove nodes that would impact availability from the batches (more on this later)
  - return the largest batch
- If an empty batch is returned, that means none of the nodes met the safety conditions such as availability and quorum health. In this case, check their `numRetries`: if any of them is equal to or greater than `maxRetries`, throw `UnrestartableNodesException`. Otherwise increment their `numRetries` and restart from step 3.
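Batching brokers with no partitions in common can be sketched greedily. A Python sketch under stated assumptions: the proposal does not prescribe a specific batching algorithm, and the availability filtering step is omitted here:

```python
def largest_disjoint_batch(partitions_by_node, max_batch_size):
    """Greedily group nodes whose replicated partitions do not overlap,
    then return the largest group (step-7 sketch).

    partitions_by_node maps a node ID to its set of (topic, partition) pairs.
    """
    batches = []
    for node, parts in partitions_by_node.items():
        for batch in batches:
            # A node may join a batch only if it shares no partition with
            # any member, so the whole batch can restart together safely.
            if (len(batch) < max_batch_size
                    and all(parts.isdisjoint(partitions_by_node[m]) for m in batch)):
                batch.append(node)
                break
        else:
            batches.append([node])
    return max(batches, key=len) if batches else []
```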

8. Restart the nodes from the returned batch in parallel:
- If `numRestartAttempts` of a node is larger than `maxRestartAttempts`, throw `MaxRestartsExceededException`.
- Otherwise, restart each node, transition its state to `RESTARTED` and increment its `numRestartAttempts`.
- After restarting all the nodes in the batch, wait for their states to become `SERVING` until the configured `postOperationTimeoutMs` is reached.
- If the timeout is reached, throw `TimeoutException` if a node's `numRetries` is greater than or equal to `maxRetries`. Otherwise increment their `numRetries` and restart from step 3.
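The parallel restart with the `maxRestartAttempts` guard might be sketched as below. Python only for illustration; `restart` is an injected stand-in for deleting the pod, and plain dicts stand in for the node contexts:

```python
from concurrent.futures import ThreadPoolExecutor


class MaxRestartsExceededError(Exception):
    """Stand-in for the proposal's MaxRestartsExceededException."""


def restart_batch(batch, restart, max_restart_attempts):
    """Restart a batch of node contexts in parallel (step-8 sketch)."""
    # Guard first: fail before touching any node in the batch.
    for ctx in batch:
        if ctx["num_restart_attempts"] > max_restart_attempts:
            raise MaxRestartsExceededError(ctx["node_ref"])
    # Issue the restarts in parallel; the context manager waits for completion.
    with ThreadPoolExecutor(max_workers=len(batch)) as pool:
        for ctx in batch:
            pool.submit(restart, ctx)
    for ctx in batch:
        ctx["state"] = "RESTARTED"
        ctx["num_restart_attempts"] += 1
```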

9. If no exceptions have been thrown at this point, the reconciliation completes successfully. If an `UnrestartableNodesException`, `TimeoutException`, `MaxRestartsExceededException` or any other unexpected exception was thrown, the reconciliation fails.

