Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add progress status for partition rebalances #140

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
374 changes: 374 additions & 0 deletions 088-rebalance-progress-status.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,374 @@
# Partition Rebalance Progress Status
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be more "Cluster rebalance progress status" (or rebalancing) ... not sure if "partition" (even using the singular) sounds really fine because it's about a cluster rebalancing and about moving one or more partitions across the cluster.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to the "Adding progress updates for Cruise Control rebalances"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Partition Rebalance Progress Status
# Adding progress updates for Cruise Control rebalances


This proposal introduces a new feature to monitor the progression of an ongoing partition rebalance executed by a Strimzi-managed Cruise Control instance via a `KafkaRebalance` custom resource.

## Current Situation

At this time, Strimzi users are able to execute partition rebalances via `KafkaRebalance` custom resources but can only monitor the progression of those partition rebalances in two ways:

- Manually querying the Cruise Control REST API endpoint directly.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we lock these down with a Network Policy? Would the user have to alter the default set up to get access?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes there are a couple of things that would need special configuration to enable a user to access the CC REST API directly

- Inspecting the logs of the Cruise Control instance.

Unfortunately, neither of these methods are particularly user-friendly.

## Motivation

Information surrounding the progress of an executing partition rebalance is useful for planning future cluster operations.
Knowing things like how much time an ongoing partition rebalance has left to take and how much data an ongoing partition rebalance has left to transfer helps users understand the cost of an ongoing partition rebalance.
This information helps users decide whether they should continue or cancel an ongoing rebalance, and know when future operations will be able to be safely executed.

Further, having this information readily available and easily accessible via Kubernetes primitives, allows users and third-party tools like the Kubernetes CLI or StreamsHub Console to easily track the progression of a partition rebalance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many people in the Strimzi community would know about the "StreamsHub Console"? Maybe it deserves a link to the repo or not mentioning it at all?


## Proposal

This proposal extends the status section of the `KafkaRebalance` custom resource to include a `progress` section with a nested `rebalanceProgressConfigMap` field.
This field will reference the `KafkaRebalance`'s existing `ConfigMap`, which will be enhanced to contain information related to an ongoing partition rebalance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the KafkaRebalance's existing ConfigMap" ... maybe we should mention this ConfigMap beforehand to explain what it contains currently and from where it is already referenced (see afterBeforeLoadConfigMap field).


The `progress` section of the `KafkaRebalance` resource will look like the following:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
spec: {}
status:
conditions:
- lastTransitionTime: "2024-11-05T15:28:23.995129903Z"
status: "True"
type: ProposalReady | Rebalancing | Stopped | Ready [1]
observedGeneration: 1
optimizationResult:
afterBeforeLoadConfigMap: my-rebalance
dataToMoveMB: 0
excludedBrokersForLeadership: []
excludedBrokersForReplicaMove: []
excludedTopics: []
intraBrokerDataToMoveMB: 0
monitoredPartitionsPercentage: 100
numIntraBrokerReplicaMovements: 0
numLeaderMovements: 16
numReplicaMovements: 0
onDemandBalancednessScoreAfter: 95.4347095948149
onDemandBalancednessScoreBefore: 89.4347095948149
provisionRecommendation: ""
provisionStatus: RIGHT_SIZED
recentWindows: 1
progress: [1]
rebalanceProgressConfigMap: my-rebalance [2]
```
[1] The `progress` section will be visible during the `Ready`, `Rebalancing`, `Stopped` and `Ready` states.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Stopped but not PausedReconciliation or Not Ready? should we explain here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PausedReconciliation is not a valid rebalancing state.

Copy link
Member Author

@kyguy kyguy Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Stopped but not PausedReconciliation or Not Ready? should we explain here?

The PausedReconciliation and NotReady states are not related to the rebalance operation but more to the proposal genration. Therefore, these states don't have any rebalance progress information associated with them

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PausedReconciliation and NotReady states are not related to the rebalance operation but more to the proposal genration.

Well, actually even during a rebalancing you can get errors from CC and the KafkaRebalance ends in the NotReady state, right? But PausedReconciliation is not a valid rebalancing state at all.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, actually even during a rebalancing you can get errors from CC and the KafkaRebalance ends in the NotReady state, right?

Ah, yes you are right, any configuration or rebalance errors will put KafkaRebalance resource in NotReady state. Sorry @PaulRMellor, I was incorrect, NotReady should be supported as well for the same reason the Stopped state is supported, to show how far the rebalance got before it failed. That was a nice spot!

But PausedReconciliation is not a valid rebalancing state at all.

Is that because it is related to the resource and not the rebalance itself? What determines whether it is a valid rebalancing state? I am confused because there is an enum for PausedReconciliation listed in the KafkaRebalanceState class [1]

[1] https://github.com/strimzi/strimzi-kafka-operator/blob/0.45.0/api/src/main/java/io/strimzi/api/kafka/model/rebalance/KafkaRebalanceState.java#L77

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that because it is related to the resource and not the rebalance itself? What determines whether it is a valid rebalancing state? I am confused because there is an enum for PausedReconciliation listed in the KafkaRebalanceState class [1]

But it's ReconciliationPaused not PausedReconciliation! :-P
Joking apart ... I was trying to defend myself because I totally missed this state in the rebalance FSM :-D

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about it either until Paul mentioned it!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[1] The `progress` section will be visible during the `Ready`, `Rebalancing`, `Stopped` and `Ready` states.
[1] The `progress` section will be visible during the `ProposalReady`, `Rebalancing`, `Stopped` and `Ready` states.


[2] The `ConfigMap` containing information related to the ongoing partition rebalance
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[2] The `ConfigMap` containing information related to the ongoing partition rebalance
[2] The `ConfigMap` containing information related to the ongoing partition rebalance.


In the `ConfigMap`, we will add the following fields:
- **estimatedTimeToCompletionInMinutes**: The estimated amount time it will take in minutes until partition rebalance is complete.
- **completedDataMovementPercentage**: The percentage of the data movement of the partition rebalance that is completed e.g. values in the range [0-100]%
- **executorState**: The “non-verbose” JSON payload from the `/kafkacruisecontrol/state?substates=executor` endpoint.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should be more specific about the contents of the executorState field?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not provide a detailed list of everything, maybe just a link to the OpenAPI definition of it on the Cruise Control repo?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree you should give a brief summary of what this is and link to the OpenAPI def upstream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the link and text suggested by Paul which should be fair compromise

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- **executorState**: The “non-verbose” JSON payload from the `/kafkacruisecontrol/state?substates=executor` endpoint.
- **executorState**: The “non-verbose” JSON payload from the` /kafkacruisecontrol/state?substates=executor` endpoint, providing details about the executor's current status, including partition movement progress, concurrency limits, and total data to move.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added


Since the progress information is constant, we can safely add it to the existing `ConfigMap` maintained for and tied to the `KafkadRebalance` resource.
This keeps `KafkaRebalance` information organized in one place, simplifies the proposal implementation, and has insignificant impact on the storage of the `ConfigMap`.

The enhanced `ConfigMap` of an inter-broker partition rebalance will look like the following:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: my-rebalance
data:
estimatedTimeToCompletionInMinutes: 5m [1]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the estimation counted? Is it reliable? How much is it affected by the issues with unknown real network capacity?

Copy link
Member Author

@kyguy kyguy Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the estimation counted?

Depends on the KafkaRebalance state, the specific details per state are in the "Field: estimatedTimeToCompletionInMinutes" section of the proposal.

Is it reliable?

In general, yes. The value for the Stopped and Ready states are hardcoded and the value for the Rebalancing state is based on the average rate of data transfer and easily calculated without the need of any user or capacity settings. The only state that could be potentially problematic is the estimation for the ProposalReady state which relies on accurate network capacity configuration from the user.

How much is it affected by the issues with unknown real network capacity?

If the default or user-configured network capacity is largely different from the real network capacity, the estimation for the ProposalReady state could be inaccurate. If the real network capacity is underestimated, the rebalance could take much less time than than estimatedTimeToCompletionInMinutes to complete. If the real network capacity is overestimated, the rebalance could take much more time than the estimatedTimeToCompletionInMinutes to complete. The latter case wouldn't be as much of an issue as we advertise estimatedTimeToCompletionInMinutes to be a theoretical minimum estimation in the ProposalReady state. However, the former case would be an issue since the estimatedTimeToCompletionInMinutes value wouldn't be a theoretical minimum.

To avoid issues like this, the current plan is to document the users must provide accurate network capacity settings to have accurate estimatedTimeToCompletionInMinutes values in the ProposalReady state. We already documented that users must provide accurate network capacity settings to have accurate rebalances based on network capacity and distribution anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid issues like this, the current plan is to document the users must provide accurate network capacity settings to have accurate estimatedTimeToCompletionInMinutes values in the ProposalReady state. We already documented that users must provide accurate network capacity settings to have accurate rebalances based on network capacity and distribution anyway.

I'm not sure this is a real solution. Do you really believe they configure the accurate network capacity? Do we even know how would they find out the accurate network capacity? Or will we solve it on paper but 99% or users will have it miscofigured and these numbers will be useless?

Copy link
Member Author

@kyguy kyguy Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really believe they configure the accurate network capacity?

Users that are serious about their network resource usage and distribution do

Do we even know how would they find out the accurate network capacity?

I imagine they would use K8s CNI plugins or network performance benchmark tools

Or will we solve it on paper but 99% or users will have it configured and these numbers will be useless?
I'm not sure this is a real solution.

For users that have network capacity properly configured, this feature is still useful. I admit that I don't know how many Strimzi users configure their network capacity settings but I would like to believe that those that are doing it are doing it accurately. In addition to the network capacity documentation, what if we were to only include this estimation in the ProposalReady state for users that explicitly configured their network capacity settings? Would that be a more reasonable solution?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to the network capacity documentation, what if we were to only include this estimation in the ProposalReady state for users that explicitly configured their network capacity settings? Would that be a more reasonable solution?

I guess it could be a viable solution. If the user is setting the network capacity I would assume they know what to put there, if not and they put a wrong/bad value, they should know that it's going to screw up the estimation. The documentation should state that. If we think it's not a viable solution then we should remove the estimatedTimeToCompletionInMinutes from the overall proposal. But I am for taking it and documenting it properly.

@tomncooper wdyt about the above discussion?

Copy link
Contributor

@fvaleri fvaleri Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO PropsalReady estimation is useful, because what really matters to users is to know if it would take minutes, hours, or days (see Windows file copy). If we could compute the average bandwidth from Kafka metrics, then we could use this value to provide a more accurate estimation independently from the user configuration.

completedDataMovementPercentage: 80% [2]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should either have the unit in the value and have users parse it or have it in the name and use an integer / double only. I'm fine with both ways, but you should pick one.

Copy link
Member

@ppatierno ppatierno Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My view is ...
For the completedDataMovementPercentage field it doesn't make much sense to have the % symbol in the value so I would be to just remove it and having completedDataMovementPercentage: 80.
Regarding the estimatedTimeToCompletionInMinutes, it could depends if we want the flexibility of showing the value in a different unit, but I don't see any value in it. I mean we could have estimatedTimeToCompletion: 300000ms or estimatedTimeToCompletion: 5m to say the same but does it make really sense? for this reason I would be more for just estimatedTimeToCompletionInMinutes: 5. The rebalancing is a long process and showing 1 minute or just 0 minute for a remaining time which is less than a minute could make sense (instead of something like estimatedTimeToCompletion: 36s).

executorState: [3]
{
"abortingPartitions":0,
"averageConcurrentPartitionMovementsPerBroker":5,
"finishedDataMovement":0,
"maximumConcurrentPartitionMovementsPerBroker":5,
"minimumConcurrentPartitionMovementsPerBroker":5,
"numFinishedPartitionMovements":0,
"numInProgressPartitionMovements":0,
"numPendingPartitionMovements":20,
"numTotalPartitionMovements":20,
"state":"INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS",
"totalDataToMove":0,
"triggeredSelfHealingTaskId":"",
"triggeredTaskReason":"No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)",
"triggeredUserTaskId":"0230d401-6a36-430e-9858-fac8f2edde93"
}
brokerLoad.json: {...} [4]
```
[1] The estimated time it will take in minutes for the rebalance to complete based on the average rate of data transfer.

[2] The percentage complete of the ongoing rebalance in the range [0-100]%
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[2] The percentage complete of the ongoing rebalance in the range [0-100]%
[2] The percentage complete of the ongoing rebalance in the range [0-100]%.


[3] The “non-verbose” JSON payload from [/kafkacruisecontrol/state?substates=executor](https://github.com/linkedin/cruise-control/wiki/REST-APIs#query-the-state-of-cruise-control) endpoint.

[4] The broker load from the optimization proposal as a JSON string that already maintained in the `ConfigMap`.

### Field values per `KafkaRebalance` State

We will provide the progress information for the `KafkaRebalance` states where it is relevant:

- `ProposalReady`
- `Rebalancing`
- `Stopped`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be the actual values in these states?

  • Proposal ready -> 0% completion and the estimated time from once it would be approved?
  • Rebalancing -> an up to date information?
  • Stopped -> the last infor before it was stopped?
  • Ready -> 100% and 0 minutes remaining?

Maybe you can describe it here in bullet points in a human readable form and leave the formulas below for experts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a summary like this would be useful

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

- `Ready`

All the information required for estimating the values of `estimatedTimeToCompletionInMinutes` and `completedDataMovementPercentage` fields can be derived from the Cruise Control server configurations and the [/kafkacruisecontrol/state?substates=executor](https://github.com/linkedin/cruise-control/wiki/REST-APIs#query-the-state-of-cruise-control) REST API endpoint.
However, the actual formulas used to produce values for these fields depend on the state of the `KafkaRebalance` resource.
Checkout the example in the [Field: executorState](#field-executorstate) section to see where the fields used in the formulas below come from.

### Field: `estimatedTimeToCompletionInMinutes`

The estimated time it will take in minutes for a rebalance to complete based on the average rate of data transfer.

The formulas used to calculate field value per `KafkaRebalance` state:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the formulas here ... will the CO calculate these? Or does CC calculate this and we just show the numbers?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the CO calculating these.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a sentence in the section above this sentence to make this more clear

"All the information required for the Cluster Operator to estimate the values of estimatedTimeToCompletionInMinutes and completedDataMovementPercentage fields"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The formulas used to calculate field value per `KafkaRebalance` state:
The formulas used to calculate the field value differ for each applicable `KafkaRebalance` state:


#### State: `ProposalReady`

**Estimation for inter-broker rebalance:**

This estimate will be a theoretical minimum derived from Cruise Control capacity and throttle configurations.
This means that the cluster rebalance would take at least the estimated amount of time to complete.

$$\text{maxPartitionMovements}_{[1]} = \min(\text{numberOfBrokers} \times \text{num.concurrent.partition.movements.per.broker}),\text{max.num.cluster.partition.movements})$$
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't follow this calculation, is it calculating the maximum number of non-concurrent partition movements? I can't work out why the number of brokers is needed. Isn't the worst case scenario that all movements have to happen from a single broker, so should it be max.num.cluster.partition.movements/num.concurrent.partition.movements.per.broker? Also it looks like you are missing a bracket somewhere, possible at the beginning of numberOfBrokers x num.concurrent.partition.movements.per.broker?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it calculating the maximum number of non-concurrent partition movements?

The maximum number of concurrent partition movements. Just updated the variable name to make this more clear.

I can't work out why the number of brokers is needed. Isn't the worst case scenario that all movements have to happen from a single broker, so should it be

This calculation is meant to be a theoretical minimum, the best case scenario, the least amount of time a rebalance would take to complete given ideal conditions (if the maximum allowed number of concurrent partitions movements per broker were moved concurrently and the available bandwidth was perfectly utilized). In reality, the rebalance will take longer than the theoretical minimum but it is still useful to know that the rebalance will take at least this estimated amount of time.

In the best case scenario, we are moving as many partitions concurrently as the brokers will allow. To calculate how many partitions can be move concurrently cluster-wide, we need the number of brokers.

Does that make sense? Would it help if I added annotations/descriptions for the CC configurations that are used in the formulas>

Also it looks like you are missing a bracket somewhere, possible at the beginning of numberOfBrokers x num.concurrent.partition.movements.per.broker?

Yes, that is a typo! Thanks for spotting!


$$\text{bandwidth}_{[2]} = \min(\text{networkCapacity}, \text{replication.throttle})$$

$$\text{throughput}_{[3]} = \text{maxPartitionMovements} \times \text{bandwidth}$$

$$\text{estimatedTimeToCompletionInMinutes} = \frac{\text{dataToMoveMB}}{\text{throughput}}$$

Notes:
- [1] The maximum number of partition movements given Cruise Control partition movement capacity.

- [2] The network bandwidth given Cruise Control bandwidth throttle.

- [3] The throughput given the max allowed number of partition movements and network bandwidth.

**Estimation for intra-broker rebalance:**

It is challenging to provide an accurate estimate for intra-broker rebalances without an estimate for disk read/write throughput and getting disk throughput is non-trivial for Strimzi.
However, by using the network bandwidth in place of the disk throughput, we can provide a rough estimate of how long the rebalance would take.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"by using the network bandwidth in place of the disk throughput" why do you think that we can do this "replacement" to get a rough estimation? I am not sure about that. I was wondering if we should just avoid this estimation if we don't have a good way for that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming the disk throughput is always greater than the network bandwidth, an estimate using the network bandwidth could serve as an upperbound (theoretical maximum) of how long an intra-broker rebalance would take. e.g the rebalance won't take longer than this. However, this contradicts the definition provided by the inter-broker balance and may cause confusion. We could simply set the value to N/A for now to avoid confusion/inaccuracy.

Would you mind if we left this estimate out for intra-broker balances @tomncooper?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that is what we were going to do? Basically we don't include the intra estimate as we can't reliably calculate it. So the time to completion is always a theoretical minimum, it WILL take longer than this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I would leave this estimation out imho.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated proposal to explain this and suggest that we set it to "N/A"


$$\text{maxPartitionMovements}_{[1]} = \min\left(\text{numberOfBrokers} \times \text{num.concurrent.intra.broker.partition.movements.per.broker}),\text{max.num.cluster.movements}\right)$$
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar here to previous comments, you're missing a bracket in the formula, but I'm also not clear what maxPartitionMovements is really representing


$$\text{estimatedDiskThroughput} = \text{???}_{[2]}$$

$$\text{bandwidth}_{[3]} = \min(\text{networkCapacity}, \text{replication.throttle})$$

$$\text{throughput}_{[4]} = \text{maxPartitionMovements} \times
\begin{cases}
\text{estimatedDiskThroughput}, & \text{if available} \\
\text{bandwidth}, & \text{otherwise}
\end{cases}
$$

$$\text{estimatedTimeToCompletionInMinutes} = \frac{\text{intraBrokerDataToMoveMB}}{\text{throughput}}$$

Notes
- [1] The maximum number of partition movements given Cruise Control partition movement capacity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- [1] The maximum number of partition movements given Cruise Control partition movement capacity
- [1] The maximum number of partition movements given Cruise Control partition movement capacity.

- [2] We don't have a method of determining disk throughput.
- [3] The network bandwidth given Cruise Control bandwidth throttle.
- [4] The throughput given the max allowed number of partition movements and disk throughput.

#### State: `Rebalancing`

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This formula might benefit from an introduction like the others.

$$
\text{rate} = \frac{\text{finishedDataMovement}\_{[1]}}{\text{taskTriggerTime}\_{[2]} - \text{currentTime}}
$$

$$
\text{estimatedTimeToCompletionInMinutes} = \frac{\text{totalDataToMove}\_{[3]} - \text{finishedDataMovement}\_{[1]}}{\text{rate}}
$$

Notes
- [1] The number of megabytes already moved by rebalance, provided by [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put the specific field we are using from the returned JSON?

- [2] The time when the rebalance task was started, extracted from `triggeredTaskReason` field from the [/kafkacruisecontrol/state?substates=executor](#field-executorstate) for that task.
- [3] The total number of megabytes planned to be moved for rebalance, provided from json payload of the [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- [3] The total number of megabytes planned to be moved for rebalance, provided from json payload of the [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint.
- [3] The total number of megabytes planned to be moved for rebalance, provided from the JSON payload of the [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put the specific field we are using from the returned JSON?


#### State: `Stopped`

$$
\text{estimatedTimeToCompletionInMinutes} = \text{N/A}
$$

Once a rebalance has been stopped, it cannot be resumed.
Therefore, there is no `estimatedTimeToCompletionInMinutes` for a stopped rebalance. We set the field to `N/A` to emphasize this.
To move from the `Stopped` state, a user must refresh the `KafkaRebalance` resource, the `rebalanceProgressConfigMap` will then be updated by the operator upon the next state change.

#### State: `Ready`

$$
\text{estimatedTimeToCompletionInMinutes} = 0
$$

The rebalance is complete so we hardcode the value to `0`
This emphasizes that the rebalance is complete and helps clear up ambiguity surrounding what the `Ready` state means in the `KafkaRebalance` resource.

### Field: `completedDataMovementPercentage`

The percentage of the data movement of the partition rebalance that is completed.

The formulas used to calculate field value per `KafkaRebalance` state:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The formulas used to calculate field value per `KafkaRebalance` state:
The formulas used to calculate the field value differ for each applicable `KafkaRebalance` state:


#### State: `ProposalReady`

$$
\text{completedDataMovementPercentage} = 0\%
$$

The rebalance has not started yet so no data has been moved, so we hardcode the value to `0`.
This emphasizes that the rebalance has not started and helps clear up ambiguity surrounding what the `optimizationResult` field means in the `KafkaRebalance` resource.

#### State: `Rebalancing`

$$
\text{completedDataMovementPercentage} = (\frac{\text{finishedDataMovement}\_{[1]}}{\text{totalDataToMove}\_{[2]}} \times 100)\%
$$

Notes
- [1] The number of megabytes already moved by rebalance, provided by [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint.
- [2] The total number of megabytes planned to be moved for rebalance, provided by [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put the specific fields we are using from the returned JSON for both the above?


#### State: `Stopped`

$$
\text{completedDataMovementPercentage} = \langle \text{previous completedDataMovementPercentage} \rangle
$$

Once a rebalance has been stopped, it cannot be resumed.
However, the `completedDataMovementPercentage` information from when before the rebalance was stopped may still be of value to users.
Therefore, we reuse the same value of `completedDataMovementPercentage` from the previous update.

#### State: `Ready`

$$
\text{completedDataMovementPercentage} = 100\%
$$

The rebalance is complete so we hardcode the value to `100`%.
This emphasizes that the rebalance is complete and helps clear up ambiguity surrounding what the `Ready` state means in the `KafkaRebalance` resource.

### Field: `executorState`

The "non-verbose" JSON payload of the [/kafkacruisecontrol/state?substates=executor](https://github.com/linkedin/cruise-control/wiki/REST-APIs#query-the-state-of-cruise-control) endpoint of the Cruise Control REST API.

For determining which fields are included for different executor states (`NO_TASK_IN_PROGRESS`, `STARTING_EXECUTION`, `INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS` etc), refer to the code [here](https://github.com/linkedin/cruise-control/blob/2.5.141/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorState.java#L427).
For an exhaustive list of all the fields that could be included, see the Cruise Control’s OpenAPI spec [here](https://github.com/linkedin/cruise-control/blob/2.5.141/cruise-control/src/main/resources/yaml/responses/executorState.yaml)

Example of the Executor State JSON payload during an inter-broker balance:
```json
{
"abortingPartitions": 0,
"averageConcurrentPartitionMovementsPerBroker": 5,
"finishedDataMovement": 0,
"maximumConcurrentPartitionMovementsPerBroker": 5,
"minimumConcurrentPartitionMovementsPerBroker": 5,
"numFinishedPartitionMovements": 0,
"numInProgressPartitionMovements": 0,
"numPendingPartitionMovements": 20,
"numTotalPartitionMovements": 20,
"state": "INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS",
"totalDataToMove": 0,
"triggeredSelfHealingTaskId": "",
"triggeredTaskReason": "No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)",
"triggeredUserTaskId": "0230d401-6a36-430e-9858-fac8f2edde93"
}
```

The formulas used to calculate field value per `KafkaRebalance` state:

#### State: `Ready`

The rebalance has not started yet so no data surrounding partition movement is available yet, so we set the field to an empty JSON object.

#### State: `Rebalancing`

The field will be assigned the "non-verbose" JSON payload of the [/kafkacruisecontrol/state?substates=executor](https://github.com/linkedin/cruise-control/wiki/REST-APIs#query-the-state-of-cruise-control) endpoint of the Cruise Control REST API.

#### State: `Stopped`

Once a rebalance has been stopped, it cannot be resumed.
However, the `executorState` information from when before the rebalance was stopped may still be of value to users.
Therefore, we reuse the same value of `executorState` from the previous update.

#### State: `Ready`

The rebalance is complete in this state so we hardcode the value to an empty JSON object.

### Progress Update Cadence

For ease of implementation and minimizing the load on the CruiseControl REST API server, the operator will only query the `/kafkacruisecontrol/state?substates=executor` endpoint and update the `ConfigMap` upon `KafkaRebalance` resource reconciliation.

In the event that Cruise Control runs into an error when rebalancing, the operator will transition the `KafkaRebalance` resource to the `NotReady` state, remove the `progress` section, and delete the progress `ConfigMap`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"delete the progress ConfigMap" or "delete the progress section of the ConfigMap"?
Remember we are using the same ConfigMap for two purposes (also storing the before/after load data).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this consistent with what currently happens if we fail to get a response from CC in a single reconciliation? Does the CC client retry? Not sure setting the KR CR to NotReady for what could be a simple network blip is good UX.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete the progress ConfigMap" or "delete the progress section of the ConfigMap"?
Remember we are using the same ConfigMap for two purposes (also storing the before/after load data).

I am going to update the behavior described here to retain the progress information and ConfigMap when the KafkaRebalance resource moves to the NotReady state. As discussed in a previous thread with Paul, the progress information in the NotReady state may just be as useful for debugging as it is in the Stopped.

Is this consistent with what currently happens if we fail to get a response from CC in a single reconciliation? Does the CC client retry? Not sure setting the KR CR to NotReady for what could be a simple network blip is good UX.

This line was intended to describe how the progress information will be updated when CC server returns "CompletedWithError" status for a task. From what I understood when writing this, this was the only situation where the KR resource was moved to the NotReady state. But looking closer at the code, it appears I am wrong.

It appears the CO will move the KR resource to the NotReady state when it fails to get a response from the CC server, it also looks like the CO CC client code does not retry when failing to get a response (unless I am missing some retry logic in the code). This means that if the CO fails to get a response from CC server it will set the KR CR is set to Not Ready. I thought this only happened when there was an "CompleteWithError" response returned by the CC server, not when there was a failed HTTP request.

The proposal suggests attempting to retrieve the executor status but whether the retrieval succeeds or fails has no affect on the state of the KafkaRebalance resource.

Of course, the CC client, wherever it is used, should and will be implemented to retry

In the event that the Cruise Control REST API returns an error or fails to respond to the operator when querying the `/kafkacruisecontrol/state?substates=executor` endpoint during a rebalance, the operator will add a "Warning" condition with error message from the failed REST API call but leave the existing progress section and progress `ConfigMap` as is.

When Cruise Control state retrieval fails, the `KafkaRebalance` resource will be updated as follows:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
...
spec: {}
status:
conditions:
- lastTransitionTime: "2024-11-05T15:28:23.995129903Z"
status: "True"
type: Warning
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call out this property to highlight and distinguish between NotReady and (new?) Warning?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type of condition in the status could be any one of the KafkaRebalance states, it could also be Warning. Why would the NotReady type be tied/associated to the Warning type?

reason: CruiseControlRestException
message: <error_message> [1]
observedGeneration: 1
optimizationResult:
afterBeforeLoadConfigMap: my-rebalance
dataToMoveMB: 0
excludedBrokersForLeadership: []
excludedBrokersForReplicaMove: []
excludedTopics: []
intraBrokerDataToMoveMB: 0
monitoredPartitionsPercentage: 100
numIntraBrokerReplicaMovements: 0
numLeaderMovements: 16
numReplicaMovements: 0
onDemandBalancednessScoreAfter: 95.4347095948149
onDemandBalancednessScoreBefore: 89.4347095948149
provisionRecommendation: ""
provisionStatus: RIGHT_SIZED
recentWindows: 1
progress:
rebalanceProgressConfigMap: my-rebalance-progress
```
[1] Error message from failed Cruise Control REST API call
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[1] Error message from failed Cruise Control REST API call
[1] Error message from failed Cruise Control REST API call.


### Accessing progress fields using Kubernetes CLI

The progress information will be stored in a `ConfigMap` with the same name as the `KafkaRebalance` resource.
Using the name of the ConfigMap we can view its data from the command line using the Kubernetes CLI.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Using the name of the ConfigMap we can view its data from the command line using the Kubernetes CLI.
Using the name of the `ConfigMap`, we can view its data from the command line using the Kubernetes CLI.


Example accessing `estimatedTimeToCompletionInMinutes` field.
```shell
kubectl get configmaps <my_rebalance_configmap_name> -o json | jq '.["data"]["estimatedTimeToCompletionInMinutes"]'
```

Example accessing `completedDataMovementPercentage` field.
```shell
kubectl get configmaps <my_rebalance_configmap_name> -o json | jq '.["data"]["completedDataMovementPercentage"]'
```

Example accessing `executorState` field.
```shell
kubectl get configmaps <my_rebalance_configmap_name> -o json | jq '.["data"]["executorState"] | fromjson | .'
```

### Rejected Alternatives

#### Maintaining progress fields in KafkaRebalance resource status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#### Maintaining progress fields in KafkaRebalance resource status
#### Maintaining progress fields in `KafkaRebalance` resource status


Given that the progress information included in a `KafkaRebalance` resource:
- Contains information irrelevant to user driven partition rebalances (e.g. triggeredSelfHealingTaskId, triggeredTaskReason)
- Can be quite verbose (e.g. pendingPartitionMovement list)
- Requires tracking a "last updated" timestamp and special logic in the rebalance operator to avoid triggering recursive operator reconciliations

it is best if we maintain the progress information somewhere else.