diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index ece87ebfe32..9e79f624acc 100755 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -180,7 +180,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request if !scaledObject.Status.Conditions.AreInitialized() { conditions := kedav1alpha1.GetInitializedConditions() if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil { - r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectUpdateFailed, err.Error()) + r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventemitter.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) return ctrl.Result{}, err } } @@ -192,18 +192,18 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request reqLogger.Error(err, msg) conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg) conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledObject check failed") - r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, msg) + r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventemitter.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg) } else { wasReady := conditions.GetReadyCondition() if wasReady.IsFalse() || wasReady.IsUnknown() { - r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventreason.ScaledObjectReady, message.ScalerReadyMsg) + r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventemitter.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg) } reqLogger.V(1).Info(msg) conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySuccessReason, msg) } if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil { - r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectUpdateFailed, err.Error()) + r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventemitter.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) return ctrl.Result{}, err } diff --git a/pkg/eventemitter/cloudevent_http_handler.go b/pkg/eventemitter/cloudevent_http_handler.go index 7eedb7ce123..e02e64861db 100644 --- a/pkg/eventemitter/cloudevent_http_handler.go +++ b/pkg/eventemitter/cloudevent_http_handler.go @@ -35,10 +35,6 @@ import ( "github.com/kedacore/keda/v2/pkg/util" ) -const ( - cloudEventSourceType = "com.cloudeventsource.keda" -) - var ( kedaNamespace, _ = util.GetClusterObjectNamespace() ) @@ -97,7 +93,7 @@ func (c *CloudEventHTTPHandler) EmitEvent(eventData eventdata.EventData, failure event := cloudevents.NewEvent() event.SetSource(source) event.SetSubject(subject) - event.SetType(cloudEventSourceType) + event.SetType(eventData.EventType) if err := event.SetData(cloudevents.ApplicationJSON, EmitData{Reason: eventData.Reason, Message: eventData.Message}); err != nil { c.logger.Error(err, "Failed to set data to CloudEvents receiver") diff --git a/pkg/eventemitter/eventemitter.go b/pkg/eventemitter/eventemitter.go index 9c7f21693a5..1d1cbb4b55c 100644 --- a/pkg/eventemitter/eventemitter.go +++ b/pkg/eventemitter/eventemitter.go @@ -68,7 +68,7 @@ type EventEmitter struct { type EventHandler interface { DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error - Emit(object runtime.Object, namesapce types.NamespacedName, eventType, reason, message string) + Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType string, reason string, message string) } // EventDataHandler defines the behavior for different event handlers @@ -274,7 +274,7 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource } // Emit is emitting event to both local kubernetes and custom CloudEventSource handler. After emit event to local kubernetes, event will inqueue and waitng for handler's consuming. -func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType, reason, message string) { +func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType, cloudeventType, reason, message string) { e.recorder.Event(object, eventType, reason, message) e.eventHandlersCacheLock.RLock() @@ -287,9 +287,9 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam objectType, _ := meta.NewAccessor().Kind(object) eventData := eventdata.EventData{ Namespace: namesapce.Namespace, + EventType: cloudeventType, ObjectName: strings.ToLower(objectName), ObjectType: strings.ToLower(objectType), - EventType: eventType, Reason: reason, Message: message, Time: time.Now().UTC(), diff --git a/pkg/eventemitter/eventtypes.go b/pkg/eventemitter/eventtypes.go new file mode 100644 index 00000000000..3d00a96f758 --- /dev/null +++ b/pkg/eventemitter/eventtypes.go @@ -0,0 +1,25 @@ +/* +Copyright 2023 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventemitter + +const ( + // ScaledObjectReadyType is for event when a new ScaledObject is ready + ScaledObjectReadyType = "keda.scaledobject.ready.v1" + + // ScaledObjectFailedType is for event when creating ScaledObject failed + ScaledObjectFailedType = "keda.scaledobject.failed.v1" +) diff --git a/tests/internals/cloudevent_source/cloudevent_source_test.go b/tests/internals/cloudevent_source/cloudevent_source_test.go index e0d16255469..01080cc4cd0 100644 --- a/tests/internals/cloudevent_source/cloudevent_source_test.go +++ b/tests/internals/cloudevent_source/cloudevent_source_test.go @@ -185,7 +185,7 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem err := cloudEvent.DataAs(&data) assert.NoError(t, err) assert.Equal(t, data["message"], "ScaledObject doesn't have correct scaleTargetRef specification") - assert.Equal(t, cloudEvent.Type(), "com.cloudeventsource.keda") + assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.failed.v1") assert.Equal(t, cloudEvent.Source(), expectedSource) assert.Equal(t, cloudEvent.DataContentType(), "application/json") }