Commit

Update wording and formatting
Signed-off-by: Kyle Liberti <[email protected]>
kyguy committed Dec 18, 2024
1 parent d1433d8 commit bc7f1ed
Showing 1 changed file with 94 additions and 64 deletions.
158 changes: 94 additions & 64 deletions 088-rebalance-progress-status.md
@@ -27,6 +27,8 @@ The `progress` section of the `KafkaRebalance` resource would look like the foll
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
spec: {}
status:
conditions:
@@ -89,13 +91,12 @@ data:
"triggeredTaskReason":"No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)",
"triggeredUserTaskId":"0230d401-6a36-430e-9858-fac8f2edde93"
}
```
[1] The estimated time it will take the rebalance to complete based on the average rate of data transfer.

[2] The percentage complete of the ongoing rebalance in the range [0-100]%

[3] The “non-verbose” JSON payload from [/kafkacruisecontrol/state?substates=executor](#executor-state) endpoint.
[3] The “non-verbose” JSON payload from [/kafkacruisecontrol/state?substates=executor](https://github.com/linkedin/cruise-control/wiki/REST-APIs#query-the-state-of-cruise-control) endpoint.

### Field values per `KafkaRebalance` State

@@ -110,51 +111,73 @@ Further discussion on the inclusion of the progress information for these other

All the information required for estimating the values of `estimatedTimeToCompletion` and `percentageDataMovementComplete` fields can be derived from either the Cruise Control server configurations or the [/kafkacruisecontrol/state?substates=executor](https://github.com/linkedin/cruise-control/wiki/REST-APIs#query-the-state-of-cruise-control) REST API endpoint.
However, the actual formula used to produce values for these fields depends on the state of the `KafkaRebalance` resource.
Checkout the example in the [Executor State](#executorState) section to see where the fields used in the formulas below come from.
Check out the example in the [Field: executorState](#field-executorstate) section to see where the fields used in the formulas below come from.

### Field: `estimatedTimeToCompletion`

#### estimatedTimeToCompletion
The estimated time it will take a rebalance to complete based on the average rate of data transfer.

The formulas used to calculate field value per `KafkaRebalance` state:

##### Rebalancing
#### State: `Rebalancing`

$$
\text{rate} = \frac{\text{finishedDataMovement}}{\text{taskTriggerTime}^{[1]} - \text{currentTime}}
\text{rate} = \frac{\text{finishedDataMovement}\_{[1]}}{\text{currentTime} - \text{taskTriggerTime}\_{[2]}}
$$

$$
\text{estimatedTimeToCompletion} = \frac{\text{totalDataToMove} - \text{finishedDataMovement}}{\text{rate}}
\text{estimatedTimeToCompletion} = \frac{\text{totalDataToMove}\_{[3]} - \text{finishedDataMovement}\_{[1]}}{\text{rate}}
$$

[1] `taskTriggerTime` is the time when the rebalance task was started, extracted from `triggeredTaskReason` field from the [Executor State](#executorState) for that task.
Notes
- [1] `finishedDataMovement` is the number of megabytes already moved by the rebalance, provided by the [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint.
- [2] `taskTriggerTime` is the time when the rebalance task was started, extracted from the `triggeredTaskReason` field in the JSON payload of the [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint for that task.
- [3] `totalDataToMove` is the total number of megabytes planned to be moved by the rebalance, provided by the JSON payload of the [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint.
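
The `Rebalancing` formulas above can be sketched in Python as follows. This is only an illustration, not the operator's implementation: the function names are hypothetical, the regular expression assumes the `Date: ...Z` format shown in the `triggeredTaskReason` example, and returning the result in seconds (and `None` when no rate can be computed yet) is an assumption.

```python
import re
from datetime import datetime, timezone


def parse_task_trigger_time(triggered_task_reason: str) -> datetime:
    """Extract the 'Date: <timestamp>Z' value embedded in triggeredTaskReason."""
    match = re.search(
        r"Date:\s*(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z", triggered_task_reason
    )
    if match is None:
        raise ValueError("no trigger date found in triggeredTaskReason")
    parsed = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S")
    return parsed.replace(tzinfo=timezone.utc)


def estimated_seconds_to_completion(executor_state: dict, now: datetime):
    """Return the estimated seconds remaining, or None if no rate is available."""
    finished_mb = executor_state["finishedDataMovement"]
    total_mb = executor_state["totalDataToMove"]
    started = parse_task_trigger_time(executor_state["triggeredTaskReason"])

    elapsed_seconds = (now - started).total_seconds()
    if elapsed_seconds <= 0 or finished_mb <= 0:
        # Assumption: without elapsed time or moved data there is no rate yet.
        return None

    rate = finished_mb / elapsed_seconds  # megabytes per second
    return (total_mb - finished_mb) / rate


# Illustrative values only; they are not taken from a real cluster.
example_state = {
    "finishedDataMovement": 50,
    "totalDataToMove": 200,
    "triggeredTaskReason": "No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)",
}
example_now = datetime(2024, 11, 15, 19, 51, 27, tzinfo=timezone.utc)
example_eta = estimated_seconds_to_completion(example_state, example_now)
```

With these illustrative numbers, 50 MB moved in 600 seconds leaves 150 MB outstanding, so the estimate comes out at roughly 1800 seconds.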

##### Stopped

Once a rebalance has been stopped, it cannot be resumed.
Therefore, there is no `estimatedTimeToCompletion` for a stopped rebalance, so we set the field to `N/A` to emphasize this.
To move from the `Stopped` state, a user must refresh the `KafkaRebalance` resource, the `rebalanceProgressConfigMap` will then be updated with the next state change.
#### State: `Stopped`

$$
\text{estimatedTimeToCompletion} = \text{N/A}
$$

#### percentageDataMovementComplete
Once a rebalance has been stopped, it cannot be resumed.
Therefore, there is no `estimatedTimeToCompletion` for a stopped rebalance. We set the field to `N/A` to emphasize this.
To move from the `Stopped` state, a user must refresh the `KafkaRebalance` resource; the `rebalanceProgressConfigMap` will then be updated by the operator upon the next state change.

##### Rebalancing

### Field: `percentageDataMovementComplete`

The percentage of the data movement of the partition rebalance that is completed.

The formulas used to calculate field value per `KafkaRebalance` state:

#### State: `Rebalancing`

$$
\text{percentageDataMovementComplete} = (\frac{\text{finishedDataMovement}}{\text{totalDataToMoveMB}} \times 100)\text{%}
\text{percentageDataMovementComplete} = (\frac{\text{finishedDataMovement}\_{[1]}}{\text{totalDataToMove}\_{[2]}} \times 100)\%
$$

##### Stopped
Notes
- [1] `finishedDataMovement` is the number of megabytes already moved by the rebalance, provided by the [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint.
- [2] `totalDataToMove` is the total number of megabytes planned to be moved by the rebalance, provided by the JSON payload of the [/kafkacruisecontrol/state?substates=executor](#field-executorstate) REST API endpoint.
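
As a minimal sketch of the percentage formula, the following Python function computes the value from the two executor state fields. The function name is hypothetical, and rounding down to a whole number, clamping to the range [0-100], and returning `None` when there is nothing to move are assumptions rather than behavior defined by this proposal.

```python
def percentage_data_movement_complete(finished_mb: int, total_mb: int):
    """Return the completed share of the data movement as a whole percentage."""
    if total_mb <= 0:
        # Assumption: with no data planned to move, no percentage is reported.
        return None
    # Integer arithmetic avoids floating point rounding surprises.
    percent = finished_mb * 100 // total_mb
    # Clamp to the documented [0-100] range.
    return max(0, min(100, percent))
```

For example, `percentage_data_movement_complete(50, 200)` yields 25.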

#### State: `Stopped`

$$
\text{percentageDataMovementComplete} = \langle \text{previous percentageDataMovementComplete} \rangle
$$

Once a rebalance has been stopped, it cannot be resumed.
However, the `percentageDataMovementComplete` information from before the rebalance was stopped may still be of value to users.
Therefore, we reuse the same value of `percentageDataMovementComplete` from the latest update.
Therefore, we reuse the same value of `percentageDataMovementComplete` from the previous update.

#### executorState
### Field: `executorState`

##### Rebalancing
The "non-verbose" JSON payload of the [/kafkacruisecontrol/state?substates=executor](https://github.com/linkedin/cruise-control/wiki/REST-APIs#query-the-state-of-cruise-control) endpoint of the Cruise Control REST API.

The `executorState` field will be populated with the "non-verbose" JSON payload of the [/kafkacruisecontrol/state?substates=executor](https://github.com/linkedin/cruise-control/wiki/REST-APIs#query-the-state-of-cruise-control) endpoint of the Cruise Control REST API.
To determine which fields are included for different executor states (`NO_TASK_IN_PROGRESS`, `STARTING_EXECUTION`, `INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS`, etc.), refer to the code [here](https://github.com/linkedin/cruise-control/blob/2.5.141/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorState.java#L427).
For an exhaustive list of all the fields that could be included, see Cruise Control’s OpenAPI spec [here](https://github.com/linkedin/cruise-control/blob/2.5.141/cruise-control/src/main/resources/yaml/responses/executorState.yaml).

Example of the Executor State JSON payload during an inter-broker balance:
```json
@@ -175,28 +198,41 @@ Example of the Executor State JSON payload during an inter-broker balance:
"triggeredUserTaskId": "0230d401-6a36-430e-9858-fac8f2edde93"
}
```
For determining which fields are included for different executor states (`NO_TASK_IN_PROGRESS`, `STARTING_EXECUTION`, `INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS` etc) and whether the verbose parameter is provided or not, refer to the code [here](https://github.com/linkedin/cruise-control/blob/2.5.141/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorState.java#L427).

For an exhaustive list of all the fields that could be included, see the Cruise Control’s OpenAPI spec [here](https://github.com/linkedin/cruise-control/blob/2.5.141/cruise-control/src/main/resources/yaml/responses/executorState.yaml)
The formulas used to calculate field value per `KafkaRebalance` state:

#### State: `Rebalancing`

##### Stopped
$$
\text{executorState} = \langle \text{JSON payload from "/kafkacruisecontrol/state?substates=executor" endpoint} \rangle
$$

#### State: `Stopped`

$$
\text{executorState} = \langle \text{Previous JSON payload from "/kafkacruisecontrol/state?substates=executor" endpoint} \rangle
$$

Once a rebalance has been stopped, it cannot be resumed.
However, the `executorState` information from before the rebalance was stopped may still be of value to users.
Therefore, we reuse the same value of `executorState` from the latest update.
Therefore, we reuse the same value of `executorState` from the previous update.
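
The per-state rules for the three progress fields can be summarized in a short Python sketch. The function name and the dictionary layout are hypothetical and chosen only for illustration; the sketch assumes the freshly computed values and the previously stored values are both available to the caller.

```python
def progress_fields(state: str, fresh: dict, previous: dict):
    """Select the progress field values for a given KafkaRebalance state.

    fresh:    values computed from the latest executor state query
    previous: values stored by the previous progress update
    """
    if state == "Rebalancing":
        # All three fields come from the latest query.
        return dict(fresh)
    if state == "Stopped":
        # A stopped rebalance cannot resume, so there is no time estimate;
        # the other two fields are carried over from the previous update.
        return {
            "estimatedTimeToCompletion": "N/A",
            "percentageDataMovementComplete": previous["percentageDataMovementComplete"],
            "executorState": previous["executorState"],
        }
    # Other states are out of scope for progress information here.
    return None
```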


### Progress Update Cadence

For ease of implementation and to minimize the load on the Cruise Control REST API server, the operator will only query the `/kafkacruisecontrol/state?substates=executor` endpoint and update the progress `ConfigMap` upon `KafkaRebalance` resource reconciliation.

In the event that Cruise Control runs into an error when rebalancing, the operator will transition the `KafkaRebalance` resource to the `NotReady` state, remove the `progress` section, and delete the progress `ConfigMap`.
In the event that the Cruise Control REST API returns an error or fails to respond to the operator when querying the `/kafkacruisecontrol/state?substates=executor` endpoint during a rebalance, the operator will add a condition entry for the `Rebalancing` type with the message "Failed to retrieve rebalance progress" but leave the progress section and referenced progress `ConfigMap` as is.
In the event that the Cruise Control REST API returns an error or fails to respond to the operator when querying the `/kafkacruisecontrol/state?substates=executor` endpoint during a rebalance, the operator will add a condition entry for the `Rebalancing` type with the message "Failed to retrieve rebalance progress" but leave the existing progress section and progress `ConfigMap` as is.

When Cruise Control state retrievel failed, the `KafkaRebalance` resource would be updated like this:
When Cruise Control state retrieval fails, the `KafkaRebalance` resource would be updated as follows:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
...
spec: {}
status:
conditions:
@@ -227,7 +263,7 @@ status:

### Future Improvements

#### Support progress information for other KafkaRebalance states
### Support progress information for other KafkaRebalance states

In addition to providing progress information for the `Rebalancing` and `Stopped` `KafkaRebalance` states, we could provide the `progress` section and `ConfigMap` for other states as well, such as the `ProposalReady` and `Ready` states.
Firstly, this would help emphasize that a rebalance had not started or had completed by having a `percentageDataMovementComplete`: 0% on `ProposalReady` and a `percentageDataMovementComplete`: 100% on `Ready`.
@@ -238,67 +274,61 @@ However, providing an accurate estimation for this is non-trivial, namely the `e

Leveraging the Cruise Control configurations and user-provided network capacity settings, we could provide a rough estimate for the `estimatedTimeToCompletion` field for inter-broker movements.
However, one challenge is coming up with a method of reliably determining a reasonable estimate for the disk read/write throughput.
It is not so much of an issue for inter-broker rebalance estimates (assuming network is the bottleneck for inter-broker balances) but is certainly an issue for intra-broker rebalance estimates.
It is not so much of an issue for inter-broker rebalance estimates (assuming network is the bottleneck for inter-broker balances) but is certainly an issue for intra-broker rebalance estimates.

Estimation for inter-broker partition rebalance time:
### Field `estimatedTimeToCompletion`

The maximum number of partition movements given CC partition movement cap
##### State: `ProposalReady`

$$
\text{maxPartitionMovements} = \min\text{numberOfBrokers} \times \text{num.concurrent.partition.movements.per.broker}),\text{max.num.cluster.partition.movements})
$$
**Estimation for inter-broker rebalance:**

The network bandwidth given CC bandwidth throttle
$$\text{maxPartitionMovements}_{[1]} = \min(\text{numberOfBrokers} \times \text{num.concurrent.partition.movements.per.broker}, \text{max.num.cluster.partition.movements})$$

$$
\text{bandwidth} = \min(\text{networkCapacity}, \text{replication.throttle})
$$
$$\text{bandwidth}_{[2]} = \min(\text{networkCapacity}, \text{replication.throttle})$$

The throughput given the max allowed number of partition movements and network bandwidth
$$\text{throughput}_{[3]} = \text{maxPartitionMovements} \times \text{bandwidth}$$

$$
\text{throughput} = \text{maxPartitionMovements} \times \text{bandwidth}
$$

$$
\text{estimatedTimeToCompletion} = \frac{\text{dataToMoveMB}}{\text{throughput}}
$$
$$\text{estimatedTimeToCompletion} = \frac{\text{dataToMoveMB}}{\text{throughput}}$$

However, without an estimate for disk read/write throughput, it is challenging to provide an accurate estimate for intra-broker rebalances but as mentioned above, getting disk throughput is non-trivial for Strimzi.
We would either need some estimation of the disk throughput, make it user configurable, or hardcode the value ourselves.
Notes:
- [1] The maximum number of partition movements given Cruise Control partition movement cap

The maximum number of partition movements given CC partition movement cap
- [2] The network bandwidth given Cruise Control bandwidth throttle

$$
\text{maxPartitionMovements} = \min\left(\text{numberOfBrokers} \times \text{num.concurrent.intra.broker.partition.movements.per.broker}),\text{max.num.cluster.movements}\right)
$$
- [3] The throughput given the max allowed number of partition movements and network bandwidth
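
As a worked example of the inter-broker estimate above, the following Python sketch applies the three formulas in order. The function and parameter names are hypothetical, the sample values are invented, and expressing both `networkCapacity` and `replication.throttle` in megabytes per second is an assumption made to keep the units consistent.

```python
def estimate_inter_broker_seconds(
    number_of_brokers: int,
    movements_per_broker: int,      # num.concurrent.partition.movements.per.broker
    max_cluster_movements: int,     # max.num.cluster.partition.movements
    network_capacity_mb_s: float,   # networkCapacity, assumed to be in MB/s
    replication_throttle_mb_s: float,  # replication.throttle, assumed to be in MB/s
    data_to_move_mb: float,
) -> float:
    """Rough time estimate in seconds for an inter-broker rebalance."""
    # [1] Concurrent partition movements allowed by the Cruise Control caps.
    max_partition_movements = min(
        number_of_brokers * movements_per_broker, max_cluster_movements
    )
    # [2] Bandwidth available to each movement under the throttle.
    bandwidth = min(network_capacity_mb_s, replication_throttle_mb_s)
    # [3] Aggregate throughput across all concurrent movements.
    throughput = max_partition_movements * bandwidth
    return data_to_move_mb / throughput


# Invented sample values: 3 brokers, 5 movements per broker, cluster cap 1250,
# 100 MB/s network capacity, 50 MB/s throttle, 15000 MB to move.
sample_estimate = estimate_inter_broker_seconds(3, 5, 1250, 100.0, 50.0, 15000.0)
```

With these sample values, 15 concurrent movements at 50 MB/s give a throughput of 750 MB/s, so moving 15000 MB is estimated at 20 seconds. Like the formulas it mirrors, the sketch treats the network as the only bottleneck and ignores disk throughput.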

$$
\text{estimatedDiskThroughput} = \text{???}
$$
**Estimation for intra-broker rebalance:**

The throughput given the max allowed number of partition movements and disk throughput
It is challenging to provide an accurate estimate for intra-broker rebalances without an estimate for disk read/write throughput, and as mentioned above, getting disk throughput is non-trivial for Strimzi.

$$
\text{throughput} = \text{maxPartitionMovements} \times \text{estimatedDiskThroughput}
$$
$$\text{maxPartitionMovements}_{[1]} = \min\left(\text{numberOfBrokers} \times \text{num.concurrent.intra.broker.partition.movements.per.broker}, \text{max.num.cluster.movements}\right)$$

$$
\text{estimatedTimeToCompletion} = \frac{\text{intraBrokerDataToMoveMB}}{\text{throughput}}
$$
$$\text{estimatedDiskThroughput} = \text{???}_{[2]}$$

$$\text{throughput}_{[3]} = \text{maxPartitionMovements} \times \text{estimatedDiskThroughput}$$

$$\text{estimatedTimeToCompletion} = \frac{\text{intraBrokerDataToMoveMB}}{\text{throughput}}$$

Notes
- [1] The maximum number of partition movements given Cruise Control partition movement cap
- [2] We don't have a method of determining disk throughput
- [3] The throughput given the max allowed number of partition movements and disk throughput

Given that its inclusion is not completely necessary and adds significant complexity to the proposal, it is out of scope for this proposal.

### Rejected Alternatives

#### Including “ExecutorState” in KafkaRebalance resource status
#### Maintaining progress fields in KafkaRebalance resource status

Given that some of the information in the Executor State is not relevant to user driven partition rebalances (e.g. triggeredSelfHealingTaskId and triggeredTaskReason) and can be quite verbose (e.g. pendingPartitionMovement list), it is best if we take what we take the high level details we need from the ExecutorState and store the rest somewhere else.
Given that the progress information included in a `KafkaRebalance` resource:
- Contains information irrelevant to user driven partition rebalances (e.g. triggeredSelfHealingTaskId, triggeredTaskReason)
- Can be quite verbose (e.g. pendingPartitionMovement list)
- Requires tracking a "last updated" timestamp and special logic in the rebalance operator to avoid triggering recursive operator reconciliations

it is best if we maintain the progress information somewhere else.

#### Including “ExecutorState” in “afterBeforeLoadConfigmap”

Keeping the ExecutorState in its own ConfigMap as opposed to storing it in the existing “afterBeforeLoadConfigMap” (1) leaves more room for Executor state information should we decide to enable “verbosity” parameter in the future and (2) leaves more room for the broker load information in the “afterBeforeLoadConfigMap”.
For smaller clusters, the space is not an issue but for larger production clusters with a larger number of brokers and partitions we run the risk of hitting the 1MB storage limit of the ConfigMap.
The cost of another ConfigMap is worth avoiding the risk of hitting the limit of the other.
