Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

## Description #1080

Closed
wants to merge 29 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d2b8021
[INTERNAL] Update helm for adobe builds (#52)
alungu Feb 7, 2023
a5ea111
[INTERNAL] Use local replacement for sub-modules (#54)
aguzovatii Apr 6, 2023
c1173d0
[INTERNAL] [BUILD] Publish docker images to adobe/kafka-operator and …
amuraru Apr 11, 2020
e5152f7
[INTERNAL] make manifests should be called manually, if needed (#25)
amuraru Jul 28, 2021
fe9adaf
[INTERNAL] Allow Kafka to use External DNS for inter-broker protocol …
amuraru May 17, 2021
8ca9d51
[INTERNAL] Allow external listeners to be used for inner communicatio…
amuraru May 17, 2021
4335c20
[INTERNAL] Ensure external listerners are always the first the advert…
alungu Sep 2, 2021
685da4e
[INTERNAL] Generate CRDs resources
amuraru Sep 2, 2021
84345fc
[INTERNAL] Build kafka 3.4.1 using Oracle OpenJDK
amuraru Sep 21, 2021
9b060e2
Enable envoy idleTimeout and TCP keep-alive for connections to kafka …
amuraru Oct 7, 2021
450ebe7
Enable envoy tls termination (#41)
dobrerazvan Mar 17, 2022
eea16ab
Envoy config generated by the operator is invalid in envoy 1.22
amuraru Jun 23, 2022
ecf3223
Added TaintedBrokersSelector to kafkaClusterSpec (#48)
azun Aug 11, 2022
a40d5fe
Remove disk feature (#53)
amuraru May 15, 2023
f023d28
Handle async response from CC /state endpoint (#56)
aguzovatii May 16, 2023
514fa07
Allow concurrent broker restarts from same AZ (broker rack) (#62)
amuraru Jun 10, 2023
16a9fc2
Fix flaky test by deleting nodeports explicitly (#67)
ctrlaltluc Jun 13, 2023
cdfb6b9
Allow dashes when parsing broker rack (#68)
ctrlaltluc Jun 28, 2023
18e7253
Upgrade Kafka to 3.6.0 (#69)
ctrlaltluc Oct 11, 2023
edb7ebf
Upgrade dependencies
amuraru Dec 12, 2023
5f78c06
Fix wrong port on expectEnvoyWithConfigAz2Tls test (#70)
dobrerazvan Dec 19, 2023
7383921
Upgrade Kafka to 3.6.1 (#71)
cristianpetrache Dec 22, 2023
8172c6e
Upgrade Kafka image to use Java v21 (#72)
amuraru Feb 16, 2024
5bcb31f
Added arm64 to docker build platforms (#73)
azun Feb 28, 2024
2b16b29
Upgrading Kafka to 3.7.0 (#77)
azun Mar 26, 2024
aa25e15
Update codeql-analysis.yml (#78)
amuraru Apr 8, 2024
87b08a1
Upgrade dependencies
amuraru Apr 8, 2024
bb519ee
[INTERNAL] Create uniq leader ID per operator deployment (#76)
dobrerazvan Apr 16, 2024
9fa0856
Merge branch 'master' into updeps
azun Apr 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Allow concurrent broker restarts from same AZ (broker rack) (#62)
  • Loading branch information
amuraru committed Dec 12, 2023
commit 514fa07bccf06325955be0e4e4b8ebb6274776b3
2 changes: 2 additions & 0 deletions api/go.mod
Original file line number Diff line number Diff line change
@@ -34,3 +34,5 @@ require (
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)
// remove once https://github.com/cert-manager/cert-manager/issues/5953 is fixed
replace github.com/Venafi/vcert/v4 => github.com/jetstack/vcert/v4 v4.9.6-0.20230127103832-3aa3dfd6613d
8 changes: 3 additions & 5 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
@@ -214,17 +214,15 @@ type RollingUpgradeConfig struct {
// alerts with 'rollingupgrade'
FailureThreshold int `json:"failureThreshold"`

// ConcurrentBrokerRestartCountPerRack controls how many brokers can be restarted in parallel during a rolling upgrade. If
// ConcurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If
// it is set to a value greater than 1, the operator will restart up to that amount of brokers in parallel, if the
// brokers are within the same rack (as specified by "broker.rack" in broker read-only configs). Since using Kafka broker
// racks spreads out the replicas, we know that restarting multiple brokers in the same rack will not cause more than
// 1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used.
// This is a safe way to speed up the rolling upgrade. Note that for the rack distribution explained above, Cruise Control
// requires `com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal` to be configured. Default value is 1.
// +kubebuilder:validation:Minimum=1
// +kubebuilder:default=1
// requires `com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal` to be configured.
// +optional
ConcurrentBrokerRestartCountPerRack int `json:"concurrentBrokerRestartCountPerRack,omitempty"`
ConcurrentBrokerRestartsAllowed int `json:"concurrentBrokerRestartsAllowed,omitempty"`
}

// DisruptionBudget defines the configuration for PodDisruptionBudget where the workload is managed by the kafka-operator
10 changes: 4 additions & 6 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
@@ -21556,10 +21556,9 @@ spec:
description: RollingUpgradeConfig defines the desired config of the
RollingUpgrade
properties:
concurrentBrokerRestartCountPerRack:
default: 1
description: ConcurrentBrokerRestartCountPerRack controls how
many brokers can be restarted in parallel during a rolling upgrade.
concurrentBrokerRestartsAllowed:
description: ConcurrentBrokerRestartsAllowed controls how many
brokers can be restarted in parallel during a rolling upgrade.
If it is set to a value greater than 1, the operator will restart
up to that amount of brokers in parallel, if the brokers are
within the same rack (as specified by "broker.rack" in broker
@@ -21570,8 +21569,7 @@ spec:
N is the number of racks used. This is a safe way to speed up
the rolling upgrade. Note that for the rack distribution explained
above, Cruise Control requires `com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal`
to be configured. Default value is 1.
minimum: 1
to be configured.
type: integer
failureThreshold:
description: FailureThreshold controls how many failures the cluster
10 changes: 4 additions & 6 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
@@ -21556,10 +21556,9 @@ spec:
description: RollingUpgradeConfig defines the desired config of the
RollingUpgrade
properties:
concurrentBrokerRestartCountPerRack:
default: 1
description: ConcurrentBrokerRestartCountPerRack controls how
many brokers can be restarted in parallel during a rolling upgrade.
concurrentBrokerRestartsAllowed:
description: ConcurrentBrokerRestartsAllowed controls how many
brokers can be restarted in parallel during a rolling upgrade.
If it is set to a value greater than 1, the operator will restart
up to that amount of brokers in parallel, if the brokers are
within the same rack (as specified by "broker.rack" in broker
@@ -21570,8 +21569,7 @@ spec:
N is the number of racks used. This is a safe way to speed up
the rolling upgrade. Note that for the rack distribution explained
above, Cruise Control requires `com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal`
to be configured. Default value is 1.
minimum: 1
to be configured.
type: integer
failureThreshold:
description: FailureThreshold controls how many failures the cluster
18 changes: 11 additions & 7 deletions controllers/cruisecontroltask_controller_test.go
Original file line number Diff line number Diff line change
@@ -19,10 +19,11 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/mock/gomock"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/banzaicloud/koperator/pkg/scale"

@@ -370,8 +371,11 @@ func TestCreateCCOperation(t *testing.T) {
},
}

mockCtrl := gomock.NewController(t)

for _, testCase := range testCases {
mockClient := new(mocks.Client)
mockClient := mocks.NewMockClient(mockCtrl)
mockSubResourceClient := mocks.NewMockSubResourceClient(mockCtrl)
scheme := runtime.NewScheme()
_ = v1beta1.AddToScheme(scheme)
_ = corev1.AddToScheme(scheme)
@@ -389,17 +393,17 @@ func TestCreateCCOperation(t *testing.T) {

// Mock the Create call and capture the operation
var createdOperation *banzaiv1alpha1.CruiseControlOperation
mockClient.On("Create", ctx, mock.IsType(&banzaiv1alpha1.CruiseControlOperation{})).Run(func(args mock.Arguments) {
createdOperation = args.Get(1).(*banzaiv1alpha1.CruiseControlOperation)
mockClient.EXPECT().Create(ctx, gomock.AssignableToTypeOf(&banzaiv1alpha1.CruiseControlOperation{})).Do(func(ctx context.Context, obj client.Object, opts ...client.CreateOption) {
createdOperation = obj.(*banzaiv1alpha1.CruiseControlOperation)
createdOperation.ObjectMeta.Name = "generated-name"
}).Return(nil)

// Mock the Status call
mockClient.On("Status").Return(mockClient)
mockClient.EXPECT().Status().Return(mockSubResourceClient)

// Mock the Update call
mockClient.On("Update", ctx, mock.IsType(&banzaiv1alpha1.CruiseControlOperation{})).Run(func(args mock.Arguments) {
arg := args.Get(1).(*banzaiv1alpha1.CruiseControlOperation)
mockSubResourceClient.EXPECT().Update(ctx, gomock.AssignableToTypeOf(&banzaiv1alpha1.CruiseControlOperation{})).Do(func(ctx context.Context, obj client.Object, opts ...client.UpdateOption) {
arg := obj.(*banzaiv1alpha1.CruiseControlOperation)
createdOperation.Status = arg.Status
}).Return(nil)

2 changes: 1 addition & 1 deletion controllers/tests/common_test.go
Original file line number Diff line number Diff line change
@@ -124,7 +124,7 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster {
CCJMXExporterConfig: "custom_property: custom_value",
},
ReadOnlyConfig: "cruise.control.metrics.topic.auto.create=true",
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartCountPerRack: 1},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartsAllowed: 1},
},
}
}
4 changes: 2 additions & 2 deletions controllers/tests/cruisecontroloperation_controller_test.go
Original file line number Diff line number Diff line change
@@ -307,7 +307,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("there is an errored remove_disks and a rebalance disks operation for the same broker", Serial, func() {
JustBeforeEach(func() {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock6())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock6())
// Remove_disk operation - errored
operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName())
err := k8sClient.Create(context.Background(), &operation)
@@ -361,7 +361,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
})
When("Cruise Control makes the Status operation async", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock7())
cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock7())
operation := generateCruiseControlOperation("add-broker-operation", namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
21 changes: 12 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
@@ -18,14 +18,14 @@ require (
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-logr/logr v1.2.4
github.com/imdario/mergo v0.3.13
github.com/onsi/ginkgo/v2 v2.9.2
github.com/onsi/gomega v1.27.6
github.com/onsi/ginkgo/v2 v2.9.7
github.com/onsi/gomega v1.27.8
github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1
github.com/prometheus/common v0.37.0
github.com/stretchr/testify v1.8.1
go.uber.org/mock v0.2.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
google.golang.org/protobuf v1.28.1
gopkg.in/inf.v0 v0.9.1
gotest.tools v2.2.0+incompatible
@@ -40,7 +40,7 @@ require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/tools v0.9.1 // indirect
)

require (
@@ -113,12 +113,12 @@ require (
github.com/wayneashleyberry/terminal-dimensions v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
@@ -143,3 +143,6 @@ replace (
github.com/gogo/protobuf => github.com/waynz0r/protobuf v1.3.3-0.20210811122234-64636cae0910
github.com/golang/protobuf => github.com/luciferinlove/protobuf v0.0.0-20220913214010-c63936d75066
)

// remove once https://github.com/cert-manager/cert-manager/issues/5953 is fixed
replace github.com/Venafi/vcert/v4 => github.com/jetstack/vcert/v4 v4.9.6-0.20230127103832-3aa3dfd6613d
Loading