Skip to content

Commit

Permalink
Add progress status for partition rebalances
Browse files Browse the repository at this point in the history
Signed-off-by: Kyle Liberti <[email protected]>
  • Loading branch information
kyguy committed Nov 26, 2024
1 parent 315a70f commit 169723b
Showing 1 changed file with 328 additions and 0 deletions.
328 changes: 328 additions & 0 deletions 088-rebalance-progress-status
Original file line number Diff line number Diff line change
@@ -0,0 +1,328 @@
# Partition Rebalance Progress Status

This proposal introduces a new feature to monitor the progression of an ongoing partition rebalance executed by a Strimzi-managed Cruise Control instance via a `KafkaRebalance` custom resource.

## Current Situation

At this time, Strimzi users are able to execute partition rebalances via `KafkaRebalance` custom resources but can only monitor the progression of those partition rebalances in two ways outside the `KafkaRebalance` resource by:

- Manually querying the Cruise Control REST API endpoint directly.
- Inspecting the logs of the Cruise Control instance.

Unfortunately, neither of these methods are particularly user friendly.

## Motivation

Information surrounding the progress of an executing partition rebalance is useful for planning future cluster operations. Knowing things like:

- How much time an ongoing partition rebalance has left to take
- How much data an ongoing partition rebalance has left to transfer

helps users understand the cost of an ongoing partition rebalance, decide whether or not they should continue or cancel it, and know when future operations will be able to be safely executed.

Further, having this information readily available and easily accessible via `KafkaRebalance` custom resources allows users and third-party tools like the Kubernetes CLI or Strimzi Console to easily track the progression of a partition rebalance.

## Proposal

This proposal would extend the status section of the `KafkaRebalance` custom resource to include a “progress” section that displays information related to ongoing partition rebalances.

In this “progress” section, we include the following fields:

- estimatedTimeToCompletion: The minimum estimated amount time it will take in minutes until partition rebalance is complete.
- percentageComplete: The percentage of the partition rebalance that is completed e.g. values in the range [0-100]%
- rebalanceProgressConfigMap: The ConfigMap where “non-verbose” JSON payload from Executor State from CruiseControlState endpoint is stored.

### Supported KafkaRebalance States

For initial implementation we will focus on including the “progress” section only in the following KafkaRebalance states:

- “Rebalancing”
- "Stopped"

These are the states where this progress information will be able to be most accurately calculated and most useful for users.
We could provide the “progress” section for other states as well such as the “ProposalReady” and “Ready” states but it is not completely necessary, nor is it trivial.
Further explanation as to why that is and why it should be saved as a future improvement is explained in the Future Improvements section near the bottom of this proposal.

All information required for estimating the values of “estimatedTimeToCompletion” and “percentageComplete” fields can be derived from either Cruise Control server configurations or CruiseControlState endpoint.
That being said, the method of estimation for these fields depends on the state of the KafkaRebalance resource.

#### estimatedTimeToCompletion

##### Rebalancing

```
rate = (finishedDataMovement)/(<task_trigger_time> - <current_time>)

estimatedTimeToCompletion = (totalDateToMove-finishedDataMovement) / (rate)
```

##### Stopped

Once a rebalance has been stopped, it cannot be completed.
Therefore, there is no “estimationTimeToCompletion” for a stopped rebalance, so we set estimatedTimeToCompletion = null to emphasize this.
The `KafkaRebalance` resource must be refreshed and the progress section overwritten with the next state change.

```
estimatedTimeToCompletion = null
```


#### percentageComplete

##### Rebalancing

```
percentageComplete = (finishedDataMovement/totalDataToMoveMB)%
```

##### Stopped

Once a rebalance has been stopped, it cannot be completed. The `KafkaRebalance` resource must be refreshed and the progress section overwritten with that change. That being said, before the `KafkaRebalace` resource is deleted or “refreshed”, the percentageComplete information will still be of value to users.

```
percentageComplete = (finishedDataMovement/totalDataToMoveMB)
```

#### rebalanceProgressConfigMap

Will only be present in “Rebalancing” and “Stopped” states.

The enhanced `KafkaRebalance` resource would include the following in its status section

```
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
spec: {}
status:
conditions:
- lastTransitionTime: "2024-11-05T15:28:23.995129903Z"
status: "True"
type: Rebalancing | Stopped [1]
observedGeneration: 1
optimizationResult:
afterBeforeLoadConfigMap: my-rebalance
dataToMoveMB: 0
excludedBrokersForLeadership: []
excludedBrokersForReplicaMove: []
excludedTopics: []
intraBrokerDataToMoveMB: 0
monitoredPartitionsPercentage: 100
numIntraBrokerReplicaMovements: 0
numLeaderMovements: 16
numReplicaMovements: 0
onDemandBalancednessScoreAfter: 95.4347095948149
onDemandBalancednessScoreBefore: 89.4347095948149
provisionRecommendation: ""
provisionStatus: RIGHT_SIZED
recentWindows: 1
progress:
estimatedTimeToCompletion: 5m [2]
percentageComplete: 80% [3]
rebalanceProgressConfigMap: my-rebalance-progress [4]
```
[1] The “progress” section will be visible during the KafkaRebalance “Rebalancing” and “Stopped” states.
[2] The minimum estimated time it will take the rebalance to complete.
[3] The percentage complete of the ongoing rebalance in the range [0-100]%
[4] The ConfigMap where “non-verbose” JSON payload from Executor State from CruiseControlState endpoint is stored.

### Executor State

All the information needed for the `progress` section proposed above relies on the [ExecutorState](https://github.com/linkedin/cruise-control/wiki/REST-APIs#query-the-state-of-cruise-control) of the CruiseControlState endpoint.

Querying the Executor State during an interbroker balance dumps the following JSON payload:
```
{
"abortingPartitions": 0,
"averageConcurrentPartitionMovementsPerBroker": 5,
"finishedDataMovement": 0,
"maximumConcurrentPartitionMovementsPerBroker": 5,
"minimumConcurrentPartitionMovementsPerBroker": 5,
"numFinishedPartitionMovements": 0,
"numInProgressPartitionMovements": 0,
"numPendingPartitionMovements": 20,
"numTotalPartitionMovements": 20,
"state": "INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS",
"totalDataToMove": 0,
"triggeredSelfHealingTaskId": "",
"triggeredTaskReason": "No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)",
"triggeredUserTaskId": "0230d401-6a36-430e-9858-fac8f2edde93"
}
```

For determining which fields are included for different executor states e.g. NO_TASK_IN_PROGRESS, STARTING_EXECUTION, INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS whether the verbose parameter is provided or not, refer to the code [here](https://github.com/linkedin/cruise-control/blob/2.5.141/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorState.java#L427)

For an exhaustive list of all the fields that could be included, refer to Cruise Control’s OpenAPI spec, refer to the file [here](https://github.com/linkedin/cruise-control/blob/2.5.141/cruise-control/src/main/resources/yaml/responses/executorState.yaml)

The “non-verbose” JSON payload from the ExecutorState is already too verbose to include in the `KafkaRebalance` status in its entirety.
However, having the information available to users is still useful especially when debugging the state of a partition rebalance.
Therefore, we will store the JSON payload in its own ConfigMap, “rebalanceProgressConfigMap”.
For this initial feature enhancement we will only store the “non-verbose” JSON output but we will still have a good amount of space remaining in the ConfigMap should we make the verbosity configurable in the future.

The ConfigMap with ExecutorStatus included would section:
```
apiVersion: v1
kind: ConfigMap
metadata:
name: my-rebalance-progress
data:
executorState: [1]
`{"abortingPartitions":0,"averageConcurrentPartitionMovementsPerBroker":5,"finishedDataMovement":0,"maximumConcurrentPartitionMovementsPerBroker":5,"minimumConcurrentPartitionMovementsPerBroker":5,"numFinishedPartitionMovements":0,"numInProgressPartitionMovements":0,"numPendingPartitionMovements":20,"numTotalPartitionMovements":20,"state":"INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS","totalDataToMove":0,"triggeredSelfHealingTaskId":"","triggeredTaskReason":"No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)","triggeredUserTaskId":"0230d401-6a36-430e-9858-fac8f2edde93"}
`
```
[1] An example of the ExecutorState JSON of an inter-broker partition rebalance.

### Progress Update Cadence

For ease of implementation and minimizing the load on the CruiseControl REST API server, we would only query the CruiseControlState endpoint and update the “progress” section upon `KafkaRebalance` resource reconciliation.
The progress section will never be more out of date longer than the reconciliation period and even if the rebalance runs into an error or “NotReady” state, the “progress” section would still be updated on that KafkaRebalance resource reconciliation along with any error.

### Future Improvements

#### Adding “progress” section for other KafkaRebalance states

In addition to the “progress” the “Rebalancing” and “Stopped” KafkaRebalance states, we could provide the “progress” section for other states as well such as the “ProposalReady” and “Ready” states.
Firstly, this would help emphasize that a rebalance had not started or had completed by having a percentageComplete: 0% on "ProposalReady" and a percentageComplete: 100% on "Ready".
This emphasis could help clear up ambiguity surrounding what the KafkaRebalance “Ready” state or “optimizationResult” field means.
Secondly, but more importantly, it would provide an estimate for the minimum time a partition rebalance proposal would take to execute before even executing it.
This feature would be of great value to users.
However, providing an accurate estimation for this is non-trivial, namely the “estimatedTimeToCompletion” field for “ProposalReady" state, is non-trivial.

Leveraging the Cruise Control configurations and user-provided network capacity settings, we could provide a rough estimate for “estimatedTimeToCompletetion” field for inter-broker balances.
However, one challenge is coming up with a method of reliably determining a reasonable estimate for the disk read/write throughput.
It is not so much of an issue for inter-broker rebalance estimates (assuming network is the bottleneck for inter-broker balances) but is certainly an issue for intra-broker rebalance estimates.

Estimation for inter-broker partition rebalance time:
```
# The maximum number of partition movements given CC partition movement cap
max_partition_movements= min(<# of brokers> *
num.concurrent.partition.movements.per.broker)
max_partition_movements=min(max_partition_movements, max.num.cluster.partition.movements)

# The network bandwith given CC bandwith throttle
bandwidth = min(<network_capacity>, replication.throttle)


# The throughput given the max allowed number of partition movements and network bandwidth
throughput = max_partition_movements * bandwidth

estimatedTimeToCompletion = dataToMoveMB / throughput
```

However, without an estimate for disk read/write throughput, it is challenging to provide an accurate estimate for intra-broker rebalances but as mentioned above, getting disk throughput is non-trivial for Strimzi.
We would either need some estimation of the disk throughput, make it user configurable, or hardcode the value ourselves.

```
# The maximum number of partition movements given CC partition movement cap
max_partition_movements= min(<# of brokers> *
num.concurrent.intra.broker.partition.movements.per.broker)
max_partition_movements=min(max_partition_movements, max.num.cluster.movements)

estimatedDiskThroughput = ???

# The throughput given the max allowed number of partition movements and disk throughput
throughput = max_partition_movements * estimatedDiskThroughput

estimatedTimeToCompletion = intraBrokerDataToMoveMB / throughput
```

Given that its inclusion is not completely necessary and adds significant complexity to the proposal, it is out of scope for this proposal.

#### Configurable verbosity for Executor State

When querying the Executor State of the CruiseControlState endpoint directly, we have the option to add a “verbose” parameter to request additional information surrounding the state.
The additional information could be of interest to third-party UI tools for exposing more details of a rebalance or to users debugging a problematic rebalance at the partition level.
However, to reduce the complexity of this initial enhancement, we have chosen not to use the “verbose” parameter.
One concern is that some of the fields like the “pendingParitionMovements” field can cause the JSON output to grow quite large.
For small clusters this is not a problem but for larger production clusters, it is possible this field in addition to others could cause the ConfigMap 1MB limit to be reached.


Querying the Executor State with verbose parameter during an interbroker balance dumps the following JSON payload:
```
{
"abortedPartitionMovement": [],
"abortingPartitionMovement": [],
"abortingPartitions": 0,
"averageConcurrentPartitionMovementsPerBroker": 5,
"completedPartitionMovement": [],
"deadPartitionMovement": [],
"finishedDataMovement": 0,
"inProgressPartitionMovement": [],
"maximumConcurrentPartitionMovementsPerBroker": 5,
"minimumConcurrentPartitionMovementsPerBroker": 5,
"numFinishedPartitionMovements": 0,
"numInProgressPartitionMovements": 0,
"numPendingPartitionMovements": 20,
"numTotalPartitionMovements": 20,
"pendingPartitionMovement": [
{
"executionId": 0,
"proposal": {
"newReplicas": [2, 1, 0],
"oldLeader": 1,
"oldReplicas": [1, 0, 2],
"topicPartition": {
"hash": -290357414,
"partition": 29,
"topic": "strimzi.cruisecontrol.modeltrainingsamples"
}
},
"state": "IN_PROGRESS",
"type": "INTER_BROKER_REPLICA_ACTION"
},
{
"executionId": 1,
"proposal": {
"newReplicas": [0, 2, 1],
"oldLeader": 1,
"oldReplicas": [1, 2, 0],
"topicPartition": {
"hash": -290357693,
"partition": 20,
"topic": "strimzi.cruisecontrol.modeltrainingsamples"
}
},
"state": "IN_PROGRESS",
"type": "INTER_BROKER_REPLICA_ACTION"
},
{
"executionId": 19,
"proposal": {
"newReplicas": [0, 1, 2],
"oldLeader": 1,
"oldReplicas": [1, 0, 2],
"topicPartition": {
"hash": -756317387,
"partition": 11,
"topic": "strimzi.cruisecontrol.partitionmetricsamples"
}
},
"state": "PENDING",
"type": "INTER_BROKER_REPLICA_ACTION"
}
],
"state": "INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS",
"totalDataToMove": 0,
"triggeredSelfHealingTaskId": "",
"triggeredTaskReason": "No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)",
"triggeredUserTaskId": "0230d401-6a36-430e-9858-fac8f2edde93"
}
```

One way around this issue could be by extending the KafkaRebalance API to make the verbosity of the Executor State request configurable.
This way, users could enable or disable the verbosity depending on their monitoring needs.
That being said, this is left as a potential future improvement where a more thorough investigation can be done and solutions proposed.

### Rejected Alternatives

#### Including “ExecutorState” in KafkaRebalance resource status

Given that some of the information in the Executor State is not relevant to user driven partition rebalances (e.g. triggeredSelfHealingTaskId and triggeredTaskReason) and can be quite verbose (e.g. pendingPartitionMovement list), it is best if we take what we take the high level details we need from the ExecutorState and store the rest somewhere else.

#### Including “ExecutorState” in “afterBeforeLoadConfigmap”

Keeping the ExecutorState in its own ConfigMap as opposed to storing it in the existing “afterBeforeLoadConfigMap” (1) leaves more room for Executor state information should we decide to enable “verbosity” parameter in the future and (2) leaves more room for the broker load information in the “afterBeforeLoadConfigMap”.
For smaller clusters, the space is not an issue but for larger production clusters with a larger number of brokers and partitions we run the risk of hitting the 1MB storage limit of the ConfigMap.
The cost of another ConfigMap is worth avoiding the risk of hitting the limit of the other.

0 comments on commit 169723b

Please sign in to comment.