[release v1.11] Batch of backports (#834)
* Make SecretSpec field of consumers Auth omitempty (#780)

* Expose init offset and schedule metrics for ConsumerGroup reconciler (#790) (#791)

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Fix channel finalizer logic (knative-extensions#3295) (#795)

Signed-off-by: Calum Murray <[email protected]>
Co-authored-by: Calum Murray <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>

* [release-v1.10] SRVKE-958: Cache init offsets results (#817)

* Cache init offsets results

When there is high load and multiple consumer group schedule calls,
we get many `dial tcp 10.130.4.8:9092: i/o timeout` errors when
trying to connect to Kafka.
This leads to increased "time to readiness" for consumer groups.

The downside of caching is that, if partitions
increase while the result is cached, we won't initialize
the offsets of the new partitions.
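
A minimal sketch of the caching idea, for illustration only (the type,
names, and TTL handling below are assumptions, not the backported
implementation):

```go
package offsets

import (
	"sync"
	"time"
)

// initOffsetsCache remembers, per consumer group, that offsets were recently
// initialized, so a burst of schedule calls doesn't dial Kafka every time.
type initOffsetsCache struct {
	mu   sync.Mutex
	ttl  time.Duration
	done map[string]time.Time // consumer group key -> time of cached success
}

func (c *initOffsetsCache) initOnce(key string, initOffsets func() error) error {
	c.mu.Lock()
	if t, ok := c.done[key]; ok && time.Since(t) < c.ttl {
		c.mu.Unlock()
		return nil // recent success cached: skip connecting to Kafka
	}
	c.mu.Unlock()

	if err := initOffsets(); err != nil {
		return err // failures are not cached, so the next call retries
	}

	c.mu.Lock()
	c.done[key] = time.Now()
	c.mu.Unlock()
	return nil
}
```

This also makes the stated trade-off concrete: if the partition count grows
while an entry is fresh, the new partitions' offsets stay uninitialized until
the entry expires.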

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add autoscaler leader log patch

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>

* Scheduler handle overcommitted pods (#820)

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>

* Set consumer and consumergroups finalizers when creating them (#823)

It is possible that a deleted consumer or consumergroup might
be reconciled but never finalized when it is deleted before
the finalizer is set.
This happens because the Knative-generated reconciler uses
patch (as opposed to update) for setting the finalizer,
and patch doesn't have any optimistic concurrency controls.

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
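
A minimal sketch of the race described above, assuming a hypothetical typed
client (the generated reconciler's actual code differs):

```go
package finalizers

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// patcher stands in for the generated typed client; only Patch is sketched.
type patcher interface {
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) error
}

// setFinalizer mimics what the generated reconciler does: a JSON merge patch.
// The patch body carries no resourceVersion, so there is no optimistic
// concurrency control: it can race with a concurrent delete, leaving an
// object that was reconciled but whose finalization logic never runs. An
// Update, which sends resourceVersion, would fail with a 409 Conflict instead.
func setFinalizer(ctx context.Context, c patcher, name string) error {
	patch := []byte(`{"metadata":{"finalizers":["consumers.internal.kafka.eventing.knative.dev"]}}`)
	return c.Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
}
```

Setting the finalizer on the object at creation time, as the
ConsumerFromTemplate change in the first diff below does, sidesteps the race
entirely.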

* Clean up reserved from resources that have been scheduled (#830)

In a recent testing run, we noticed a scheduled
`ConsumerGroup` [1] (see placements) being treated as having
reserved replicas in a different pod [2].

That makes the scheduler think that there is no space left, even
though the autoscaler says we have enough space to hold every
virtual replica.

[1]
```
$ k describe consumergroups -n ks-multi-ksvc-0 c9ee3490-5b4b-4d11-87af-8cb2219d9fe3
Name:         c9ee3490-5b4b-4d11-87af-8cb2219d9fe3
Namespace:    ks-multi-ksvc-0
...
Status:
  Conditions:
    Last Transition Time:  2023-09-06T19:58:27Z
    Reason:                Autoscaler is disabled
    Status:                True
    Type:                  Autoscaler
    Last Transition Time:  2023-09-06T21:41:13Z
    Status:                True
    Type:                  Consumers
    Last Transition Time:  2023-09-06T19:58:27Z
    Status:                True
    Type:                  ConsumersScheduled
    Last Transition Time:  2023-09-06T21:41:13Z
    Status:                True
    Type:                  Ready
  Observed Generation:     1
  Placements:
    Pod Name:      kafka-source-dispatcher-6
    Vreplicas:     4
    Pod Name:      kafka-source-dispatcher-7
    Vreplicas:     4
  Replicas:        8
  Subscriber Uri:  http://receiver5-2.ks-multi-ksvc-0.svc.cluster.local
Events:            <none>
```

[2]
```
    "ks-multi-ksvc-0/c9ee3490-5b4b-4d11-87af-8cb2219d9fe3": {
      "kafka-source-dispatcher-3": 8
    },
```

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>

* Ignore unknown fields in data plane contract (knative-extensions#3335) (#828)

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Signed-off-by: Calum Murray <[email protected]>
Co-authored-by: Martin Gencur <[email protected]>
Co-authored-by: Matthias Wessendorf <[email protected]>
Co-authored-by: Calum Murray <[email protected]>
Co-authored-by: OpenShift Cherrypick Robot <[email protected]>
5 people authored Sep 20, 2023
1 parent b809928 commit 90c61a5
Showing 24 changed files with 408 additions and 44 deletions.
```diff
@@ -156,12 +156,13 @@ func (c *ConsumerGroup) GetStatus() *duckv1.Status {
 func (cg *ConsumerGroup) ConsumerFromTemplate(options ...ConsumerOption) *Consumer {
 	// TODO figure out naming strategy, is generateName enough?
 	c := &Consumer{
-		ObjectMeta: cg.Spec.Template.ObjectMeta,
-		Spec:       cg.Spec.Template.Spec,
+		ObjectMeta: *cg.Spec.Template.ObjectMeta.DeepCopy(),
+		Spec:       *cg.Spec.Template.Spec.DeepCopy(),
 	}
 
 	ownerRef := metav1.NewControllerRef(cg, ConsumerGroupGroupVersionKind)
 	c.OwnerReferences = append(c.OwnerReferences, *ownerRef)
+	c.Finalizers = []string{"consumers.internal.kafka.eventing.knative.dev"}
 
 	for _, opt := range options {
 		opt(c)
```
```diff
@@ -127,7 +127,7 @@ type Auth struct {
 	NetSpec *bindings.KafkaNetSpec
 	// Deprecated, use secret spec
 	AuthSpec *eventingv1alpha1.Auth `json:"AuthSpec,omitempty"`
-	SecretSpec *SecretSpec
+	SecretSpec *SecretSpec `json:"SecretSpec,omitempty"`
 }
 
 type SecretSpec struct {
```
6 changes: 5 additions & 1 deletion control-plane/pkg/reconciler/base/reconciler.go

```diff
@@ -70,6 +70,10 @@ const (
 	Json = "json"
 )
 
+var (
+	jsonUnmarshalOptions = protojson.UnmarshalOptions{DiscardUnknown: true}
+)
+
 // Base reconciler for broker and trigger reconciler.
 // It contains common logic for both trigger and broker reconciler.
 type Reconciler struct {
@@ -213,7 +217,7 @@ func GetDataPlaneConfigMapData(logger *zap.Logger, dataPlaneConfigMap *corev1.Co
 	case Protobuf:
 		err = proto.Unmarshal(dataPlaneDataRaw, ct)
 	case Json:
-		err = protojson.Unmarshal(dataPlaneDataRaw, ct)
+		err = jsonUnmarshalOptions.Unmarshal(dataPlaneDataRaw, ct)
 	}
 	if err != nil {
 
```
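
A standalone illustration of the behavior change above, assuming the contract
package path used by this repo (protojson comes from
google.golang.org/protobuf):

```go
package main

import (
	"fmt"
	"log"

	"google.golang.org/protobuf/encoding/protojson"

	"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
)

func main() {
	// "extraKey" is not a field of the Contract message. Plain
	// protojson.Unmarshal fails on unknown fields; DiscardUnknown drops them.
	data := []byte(`{"generation": "11", "extraKey": "extraValue"}`)

	ct := &contract.Contract{}
	opts := protojson.UnmarshalOptions{DiscardUnknown: true}
	if err := opts.Unmarshal(data, ct); err != nil {
		log.Fatal(err)
	}
	fmt.Println(ct.Generation) // 11
}
```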
66 changes: 66 additions & 0 deletions control-plane/pkg/reconciler/base/reconciler_test.go

```diff
@@ -200,6 +200,25 @@ func TestGetDataPlaneConfigMapDataCorrupted(t *testing.T) {
 	require.Equal(t, uint64(0), got.Generation)
 }
 
+func TestGetDataPlaneConfigMapDataUnknownField(t *testing.T) {
+	ctx, _ := reconcilertesting.SetupFakeContext(t)
+
+	r := &base.Reconciler{
+		KubeClient:              kubeclient.Get(ctx),
+		ContractConfigMapFormat: base.Json,
+	}
+
+	cm := &corev1.ConfigMap{
+		BinaryData: map[string][]byte{
+			base.ConfigMapDataKey: []byte(dataPlaneContractExtraData),
+		},
+	}
+
+	got, err := r.GetDataPlaneConfigMapData(logging.FromContext(ctx).Desugar(), cm)
+	require.Nil(t, err)
+	require.Equal(t, uint64(11), got.Generation)
+}
+
 func TestUpdateReceiverPodAnnotation(t *testing.T) {
 	ctx, _ := reconcilertesting.SetupFakeContext(t)
 
@@ -297,3 +316,50 @@ func addRunningPod(store cache.Store, kc kubernetes.Interface, label string) {
 		panic(err)
 	}
 }
+
+const dataPlaneContractExtraData = `{
+  "generation": "11",
+  "resources": [
+    {
+      "uid": "50a30fb7-9710-45f5-9724-9a7ebb677a29",
+      "topics": [
+        "knative-messaging-kafka.eventing-e2e17.sut"
+      ],
+      "bootstrapServers": "my-cluster-kafka-bootstrap.kafka:9092",
+      "ingress": {
+        "host": "sut-kn-channel.eventing-e2e17.svc.cluster.local"
+      },
+      "egressConfig": {
+        "retry": 12,
+        "backoffDelay": "1000"
+      },
+      "egresses": [
+        {
+          "consumerGroup": "kafka.eventing-e2e17.sut.e21ad4f4-bf2d-4fe2-879a-728bb9d5626d",
+          "destination": "http://wathola-receiver.eventing-e2e17.svc.cluster.local",
+          "discardReply": {},
+          "uid": "e21ad4f4-bf2d-4fe2-879a-728bb9d5626d",
+          "egressConfig": {
+            "retry": 12,
+            "backoffDelay": "1000"
+          },
+          "deliveryOrder": "ORDERED",
+          "reference": {
+            "uuid": "e21ad4f4-bf2d-4fe2-879a-728bb9d5626d",
+            "namespace": "eventing-e2e17",
+            "name": "sut"
+          }
+        }
+      ],
+      "reference": {
+        "uuid": "50a30fb7-9710-45f5-9724-9a7ebb677a29",
+        "namespace": "eventing-e2e17",
+        "name": "sut",
+        "kind": "KafkaChannel",
+        "groupVersion": "messaging.knative.dev/v1beta1"
+      },
+      "extraKey": "extraValue"
+    }
+  ]
+}
+`
```
5 changes: 4 additions & 1 deletion control-plane/pkg/reconciler/channel/channel.go

```diff
@@ -68,6 +68,9 @@ const (
 	NewChannelIngressServiceName = "kafka-channel-ingress"
 	kafkaChannelTLSSecretName    = "kafka-channel-ingress-server-tls" //nolint:gosec // This is not a hardcoded credential
 	caCertsSecretKey             = "ca.crt"
+	// TopicPrefix is the old Kafka Channel topic prefix - we keep this constant so that deleting channels shortly after upgrading
+	// does not have issues. See https://github.com/knative-extensions/eventing-kafka-broker/issues/3289 for more info
+	TopicPrefix = "knative-messaging-kafka"
 )
 
 type Reconciler struct {
@@ -494,7 +497,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1
 
 	topicName, ok := channel.Status.Annotations[kafka.TopicAnnotation]
 	if !ok {
-		return fmt.Errorf("no topic annotated on channel")
+		topicName = kafka.ChannelTopic(TopicPrefix, channel)
 	}
 	topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName)
 	if err != nil {
```
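
kafka.ChannelTopic itself isn't shown in this diff; a hedged sketch of the
naming scheme it presumably applies, consistent with the topic seen in the
test fixture above (knative-messaging-kafka.eventing-e2e17.sut):

```go
package kafka

import "fmt"

// channelTopic is illustrative only: prefix, channel namespace, and channel
// name, dot-separated, matching the pre-upgrade topic naming convention.
func channelTopic(prefix, namespace, name string) string {
	return fmt.Sprintf("%s.%s.%s", prefix, namespace, name)
}
```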
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumergroup/auth.go

```diff
@@ -27,7 +27,7 @@ import (
 	"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
 )
 
-func (r Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) {
+func (r *Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) {
 	var secret *corev1.Secret
 
 	if hasSecretSpecConfig(cg.Spec.Template.Spec.Auth) {
```
```diff
@@ -21,10 +21,11 @@ import (
 	"fmt"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
 	"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler"
 )
 
-func (r Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) {
+func (r *Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) {
 	cm, err := r.KubeClient.CoreV1().ConfigMaps(r.SystemNamespace).Get(ctx, configMapName, metav1.GetOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("error while retrieving the %s config map in namespace %s: %+v", configMapName, r.SystemNamespace, err)
```
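
The receiver change from `Reconciler` to `*Reconciler` in the last two hunks
matters when methods depend on shared, mutable state; a minimal illustration
(not project code):

```go
package main

import "fmt"

type counter struct{ n int }

// Value receiver: the method gets a copy, so the caller's struct never changes.
func (c counter) incByValue() { c.n++ }

// Pointer receiver: the method mutates the single shared struct.
func (c *counter) incByPointer() { c.n++ }

func main() {
	c := counter{}
	c.incByValue()
	c.incByPointer()
	fmt.Println(c.n) // prints 1; only the pointer-receiver increment sticks
}
```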