Skip to content

Commit

Permalink
Fix: Update CloudEventType to correct format (#5277)
Browse files Browse the repository at this point in the history
Signed-off-by: SpiritZhou <[email protected]>
  • Loading branch information
SpiritZhou authored Jan 4, 2024
1 parent 9be8ee6 commit 35b96df
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 13 deletions.
8 changes: 4 additions & 4 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
if !scaledObject.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil {
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectUpdateFailed, err.Error())
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventemitter.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return ctrl.Result{}, err
}
}
Expand All @@ -192,18 +192,18 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledObject check failed")
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, msg)
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventemitter.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventreason.ScaledObjectReady, message.ScalerReadyMsg)
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventemitter.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg)
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySuccessReason, msg)
}

if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectUpdateFailed, err.Error())
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventemitter.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return ctrl.Result{}, err
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/eventemitter/cloudevent_http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ import (
"github.com/kedacore/keda/v2/pkg/util"
)

const (
cloudEventSourceType = "com.cloudeventsource.keda"
)

var (
kedaNamespace, _ = util.GetClusterObjectNamespace()
)
Expand Down Expand Up @@ -97,7 +93,7 @@ func (c *CloudEventHTTPHandler) EmitEvent(eventData eventdata.EventData, failure
event := cloudevents.NewEvent()
event.SetSource(source)
event.SetSubject(subject)
event.SetType(cloudEventSourceType)
event.SetType(eventData.EventType)

if err := event.SetData(cloudevents.ApplicationJSON, EmitData{Reason: eventData.Reason, Message: eventData.Message}); err != nil {
c.logger.Error(err, "Failed to set data to CloudEvents receiver")
Expand Down
6 changes: 3 additions & 3 deletions pkg/eventemitter/eventemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type EventEmitter struct {
type EventHandler interface {
DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error
HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error
Emit(object runtime.Object, namesapce types.NamespacedName, eventType, reason, message string)
Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType string, reason string, message string)
}

// EventDataHandler defines the behavior for different event handlers
Expand Down Expand Up @@ -274,7 +274,7 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource
}

// Emit is emitting event to both local kubernetes and custom CloudEventSource handler. After emit event to local kubernetes, event will inqueue and waitng for handler's consuming.
func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType, reason, message string) {
func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType, cloudeventType, reason, message string) {
e.recorder.Event(object, eventType, reason, message)

e.eventHandlersCacheLock.RLock()
Expand All @@ -287,9 +287,9 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam
objectType, _ := meta.NewAccessor().Kind(object)
eventData := eventdata.EventData{
Namespace: namesapce.Namespace,
EventType: cloudeventType,
ObjectName: strings.ToLower(objectName),
ObjectType: strings.ToLower(objectType),
EventType: eventType,
Reason: reason,
Message: message,
Time: time.Now().UTC(),
Expand Down
25 changes: 25 additions & 0 deletions pkg/eventemitter/eventtypes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright 2023 The KEDA Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eventemitter

const (
// ScaledObjectReadyType is for event when a new ScaledObject is ready
ScaledObjectReadyType = "keda.scaledobject.ready.v1"

// ScaledObjectFailedType is for event when creating ScaledObject failed
ScaledObjectFailedType = "keda.scaledobject.failed.v1"
)
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem
err := cloudEvent.DataAs(&data)
assert.NoError(t, err)
assert.Equal(t, data["message"], "ScaledObject doesn't have correct scaleTargetRef specification")
assert.Equal(t, cloudEvent.Type(), "com.cloudeventsource.keda")
assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.failed.v1")
assert.Equal(t, cloudEvent.Source(), expectedSource)
assert.Equal(t, cloudEvent.DataContentType(), "application/json")
}
Expand Down

0 comments on commit 35b96df

Please sign in to comment.