From 6c0081e75dad8dd1c25c80e4aac2619fbd90570b Mon Sep 17 00:00:00 2001 From: Knative Prow Robot Date: Tue, 3 Dec 2024 13:27:44 +0000 Subject: [PATCH] [release-1.16] MT-Broker: return retriable status code based on the state to leverage retries (#8367) * MT-Broker: return appropriate status code based on the state to leverage retries The ingress or filter deployments were returning 400 even in the case where a given resource (like trigger, broker, subscription) wasn't found, however, this is a common case where the lister cache hasn't caught up with the latest state. Signed-off-by: Pierangelo Di Pilato * Fix unit tests Signed-off-by: Pierangelo Di Pilato --------- Signed-off-by: Pierangelo Di Pilato Co-authored-by: Pierangelo Di Pilato --- pkg/broker/filter/filter_handler.go | 39 ++++++++++++++++++---- pkg/broker/filter/filter_handler_test.go | 12 ++++--- pkg/broker/ingress/ingress_handler.go | 8 ++++- pkg/broker/ingress/ingress_handler_test.go | 12 ++++--- 4 files changed, 53 insertions(+), 18 deletions(-) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 08ddf1c1a4f..67640faadcc 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -26,6 +26,8 @@ import ( "net/http" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" messaginginformers "knative.dev/eventing/pkg/client/informers/externalversions/messaging/v1" "knative.dev/eventing/pkg/reconciler/broker/resources" @@ -178,16 +180,14 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } trigger, err := h.getTrigger(triggerRef) - if err != nil { - h.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef)) - writer.WriteHeader(http.StatusBadRequest) + if apierrors.IsNotFound(err) { + h.logger.Info("Unable to find the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef)) + writer.WriteHeader(http.StatusNotFound) return } - - subscription, err := h.getSubscription(features, trigger) if err != nil { - h.logger.Info("Unable to get the Subscription of the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef)) - writer.WriteHeader(http.StatusInternalServerError) + h.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef)) + writer.WriteHeader(http.StatusBadRequest) return } @@ -216,6 +216,18 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { if features.IsOIDCAuthentication() { h.logger.Debug("OIDC authentication is enabled") + subscription, err := h.getSubscription(features, trigger) + if apierrors.IsNotFound(err) { + h.logger.Info("Unable to find the Subscription for trigger", zap.Error(err), zap.Any("triggerRef", triggerRef)) + writer.WriteHeader(http.StatusNotFound) + return + } + if err != nil { + h.logger.Info("Unable to get the Subscription of the Trigger", zap.Error(err), zap.Any("triggerRef", triggerRef)) + writer.WriteHeader(http.StatusInternalServerError) + return + } + audience := FilterAudience if subscription.Status.Auth == nil || subscription.Status.Auth.ServiceAccountName == nil { @@ -266,6 +278,11 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve } broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName) + if apierrors.IsNotFound(err) { + h.logger.Info("Unable to get the Broker", zap.Error(err)) + writer.WriteHeader(http.StatusNotFound) + return + } if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -311,6 +328,11 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event brokerNamespace = trigger.Namespace } broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerName) + if apierrors.IsNotFound(err) { + h.logger.Info("Unable to get the Broker", zap.Error(err)) + writer.WriteHeader(http.StatusNotFound) + return + } if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -331,6 +353,9 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event Audience: broker.Status.DeadLetterSinkAudience, } } + if target == nil { + return + } reportArgs := &ReportArgs{ ns: trigger.Namespace, diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index b15f2fde3df..a1a290524ca 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -27,11 +27,12 @@ import ( "testing" "time" - "knative.dev/eventing/pkg/eventingtls" filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" "knative.dev/pkg/configmap" "knative.dev/pkg/system" + "knative.dev/eventing/pkg/eventingtls" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/reconciler/broker/resources" @@ -64,10 +65,11 @@ import ( eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" subscriptioninformerfake "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" - // Fake injection client - _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" + + // Fake injection client + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" ) const ( @@ -121,7 +123,7 @@ func TestReceiver(t *testing.T) { expectedStatus: http.StatusBadRequest, }, "Path too long": { - request: httptest.NewRequest(http.MethodPost, "/triggers/test-namespace/test-trigger/extra", nil), + request: httptest.NewRequest(http.MethodPost, "/triggers/test-namespace/test-trigger/uuid/extra/extra", nil), expectedStatus: http.StatusBadRequest, }, "Path without prefix": { @@ -130,7 +132,7 @@ func TestReceiver(t *testing.T) { }, "Trigger.Get fails": { // No trigger exists, so the Get will fail. - expectedStatus: http.StatusBadRequest, + expectedStatus: http.StatusNotFound, }, "Trigger doesn't have SubscriberURI": { triggers: []*eventingv1.Trigger{ diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index bdb817e6796..9888412ebce 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -23,6 +23,7 @@ import ( "strings" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/utils/ptr" opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client" @@ -226,6 +227,11 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } broker, err := h.getBroker(brokerName, brokerNamespace) + if apierrors.IsNotFound(err) { + h.Logger.Warn("Failed to retrieve broker", zap.Error(err)) + writer.WriteHeader(http.StatusNotFound) + return + } if err != nil { h.Logger.Warn("Failed to retrieve broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -315,7 +321,7 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud channelAddress, err := h.getChannelAddress(brokerObj) if err != nil { h.Logger.Warn("could not get channel address from broker", zap.Error(err)) - return http.StatusBadRequest, kncloudevents.NoDuration + return http.StatusInternalServerError, kncloudevents.NoDuration } opts := []kncloudevents.SendOption{ diff --git a/pkg/broker/ingress/ingress_handler_test.go b/pkg/broker/ingress/ingress_handler_test.go index f30174c1476..0950302c807 100644 --- a/pkg/broker/ingress/ingress_handler_test.go +++ b/pkg/broker/ingress/ingress_handler_test.go @@ -26,11 +26,12 @@ import ( "testing" "time" - "knative.dev/eventing/pkg/eventingtls" filteredconfigmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered/fake" filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" "knative.dev/pkg/system" + "knative.dev/eventing/pkg/eventingtls" + "github.com/cloudevents/sdk-go/v2/client" "github.com/cloudevents/sdk-go/v2/event" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" @@ -54,10 +55,11 @@ import ( brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" - // Fake injection client - _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" + + // Fake injection client + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" ) const ( @@ -223,9 +225,9 @@ func TestHandler_ServeHTTP(t *testing.T) { method: nethttp.MethodPost, uri: "/ns/name", body: getValidEvent(), - statusCode: nethttp.StatusBadRequest, + statusCode: nethttp.StatusInternalServerError, handler: handler(), - reporter: &mockReporter{StatusCode: nethttp.StatusBadRequest, EventDispatchTimeReported: false}, + reporter: &mockReporter{StatusCode: nethttp.StatusInternalServerError, EventDispatchTimeReported: false}, defaulter: broker.TTLDefaulter(logger, 100), brokers: []*eventingv1.Broker{ withUninitializedAnnotations(makeBroker("name", "ns")),