From 3193e5ff18d48d85ec80b3dc5948330735fccb7c Mon Sep 17 00:00:00 2001 From: Dalia Cervantes Date: Mon, 20 Jan 2025 12:00:21 -0600 Subject: [PATCH 1/4] Remove not used Kafka topic platform.topological-inventory.operations-satellite --- service/availability_check.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/service/availability_check.go b/service/availability_check.go index 76f536bb..9e0fd8be 100644 --- a/service/availability_check.go +++ b/service/availability_check.go @@ -11,7 +11,6 @@ import ( "strconv" "time" - "github.com/RedHatInsights/sources-api-go/config" "github.com/RedHatInsights/sources-api-go/dao" "github.com/RedHatInsights/sources-api-go/kafka" l "github.com/RedHatInsights/sources-api-go/logger" @@ -25,7 +24,6 @@ const ( disconnectedRhc = "cloud-connector returned 'disconnected'" unavailableRhc = "cloud-connector returned a non-ok exit code for this connection" - satelliteRequestedTopic = "platform.topological-inventory.operations-satellite" ) type availabilityCheckRequester struct { @@ -50,9 +48,6 @@ type availabilityChecker interface { } var ( - // storing the satellite topic here since it doesn't change after initial - // startup. - satelliteTopic = config.Get().KafkaTopic(satelliteRequestedTopic) // cloud connector related fields cloudConnectorUrl = os.Getenv("CLOUD_CONNECTOR_AVAILABILITY_CHECK_URL") cloudConnectorPsk = os.Getenv("CLOUD_CONNECTOR_PSK") @@ -160,7 +155,6 @@ func (acr availabilityCheckRequester) EndpointAvailabilityCheck(source *m.Source // instantiate a producer for this source writer, err := kafka.GetWriter(&kafka.Options{ BrokerConfig: conf.KafkaBrokerConfig, - Topic: satelliteTopic, Logger: acr.Logger(), }) if err != nil { From 3cdb0de6ce409c3cc6a32c9e76f57d22bd19d25b Mon Sep 17 00:00:00 2001 From: Dalia Cervantes Date: Mon, 20 Jan 2025 12:32:38 -0600 Subject: [PATCH 2/4] Removing all other things associated with satellite --- service/availability_check.go | 69 ------------------------------ service/availability_check_test.go | 23 ---------- 2 files changed, 92 deletions(-) diff --git a/service/availability_check.go b/service/availability_check.go index 9e0fd8be..ef5375f2 100644 --- a/service/availability_check.go +++ b/service/availability_check.go @@ -16,7 +16,6 @@ import ( l "github.com/RedHatInsights/sources-api-go/logger" h "github.com/RedHatInsights/sources-api-go/middleware/headers" m "github.com/RedHatInsights/sources-api-go/model" - "github.com/RedHatInsights/sources-api-go/util" "github.com/labstack/echo/v4" ) @@ -34,12 +33,10 @@ type availabilityCheckRequester struct { type availabilityChecker interface { // public methods ApplicationAvailabilityCheck(source *m.Source) - EndpointAvailabilityCheck(source *m.Source) RhcConnectionAvailabilityCheck(source *m.Source, headers []kafka.Header) // private methods httpAvailabilityRequest(source *m.Source, app *m.Application, uri *url.URL) - publishSatelliteMessage(writer *kafka.Writer, source *m.Source, endpoint *m.Endpoint) pingRHC(source *m.Source, rhcConnection *m.RhcConnection, headers []kafka.Header) updateRhcStatus(source *m.Source, status string, errstr string, rhcConnection *m.RhcConnection, headers []kafka.Header) @@ -68,8 +65,6 @@ func RequestAvailabilityCheck(c echo.Context, source *m.Source, headers []kafka. // of overwriting the status set by the RHC check if len(source.SourceRhcConnections) != 0 { ac.RhcConnectionAvailabilityCheck(source, headers) - } else if len(source.Endpoints) != 0 { - ac.EndpointAvailabilityCheck(source) } ac.Logger().Infof("Finished Publishing Availability Messages for Source %v", source.ID) @@ -133,70 +128,6 @@ func (acr availabilityCheckRequester) httpAvailabilityRequest(source *m.Source, } } -// codified version of what we were sending over kafka. The satellite operations -// worker picks this message up and makes the proper requests to the -// platform-receptor-controller. -type satelliteAvailabilityMessage struct { - SourceID string `json:"source_id"` - SourceUID *string `json:"source_uid"` - SourceRef *string `json:"source_ref"` - ExternalTenant string `json:"external_tenant"` -} - -// sends off an availability check kafka message for each of the source's -// endpoints but only if the source is of type satellite - we do not support any -// other operations currently (legacy behavior) -func (acr availabilityCheckRequester) EndpointAvailabilityCheck(source *m.Source) { - if source.SourceType.Name != "satellite" { - acr.Logger().Infof("Skipping Endpoint availability check for non-satellite source type") - return - } - - // instantiate a producer for this source - writer, err := kafka.GetWriter(&kafka.Options{ - BrokerConfig: conf.KafkaBrokerConfig, - Logger: acr.Logger(), - }) - if err != nil { - acr.Logger().Errorf(`[source_id: %d] unable to create a Kafka writer for the endpoint availability check: %s`, source.ID, err) - return - } - - l.Log.Infof("Publishing message for Source [%v] topic [%v] ", source.ID, writer.Topic) - for _, endpoint := range source.Endpoints { - acr.publishSatelliteMessage(writer, source, &endpoint) - } -} - -func (acr availabilityCheckRequester) publishSatelliteMessage(writer *kafka.Writer, source *m.Source, endpoint *m.Endpoint) { - acr.Logger().Infof("[source_id: %d] Requesting Availability Check for Endpoint %v", source.ID, endpoint.ID) - defer kafka.CloseWriter(writer, "publish satellite message") - - msg := &kafka.Message{} - err := msg.AddValueAsJSON(map[string]interface{}{ - "params": satelliteAvailabilityMessage{ - SourceID: strconv.FormatInt(source.ID, 10), - SourceUID: source.Uid, - SourceRef: source.SourceRef, - ExternalTenant: source.Tenant.ExternalTenant, - }}) - if err != nil { - acr.Logger().Warnf("Failed to add struct value as json to kafka message") - return - } - - msg.AddHeaders([]kafka.Header{ - {Key: "event_type", Value: []byte("Source.availability_check")}, - {Key: "encoding", Value: []byte("json")}, - {Key: h.XRHID, Value: []byte(util.GeneratedXRhIdentity(source.Tenant.ExternalTenant, source.Tenant.OrgID))}, - {Key: h.AccountNumber, Value: []byte(endpoint.Tenant.ExternalTenant)}, - }) - - if err = kafka.Produce(writer, msg); err != nil { - acr.Logger().Warnf("Failed to produce kafka message for Source %v, error: %v", source.ID, err) - } -} - type rhcConnectionStatusResponse struct { Status string `json:"status"` } diff --git a/service/availability_check_test.go b/service/availability_check_test.go index ca3694a6..793519f3 100644 --- a/service/availability_check_test.go +++ b/service/availability_check_test.go @@ -31,13 +31,6 @@ func (c *dummyChecker) ApplicationAvailabilityCheck(source *m.Source) { } } -// send out a satellite kafka message per endpoint -func (c *dummyChecker) EndpointAvailabilityCheck(source *m.Source) { - for i := 0; i < len(source.Endpoints); i++ { - c.publishSatelliteMessage(&kafka.Writer{}, source, &source.Endpoints[i]) - } -} - // ping RHC for each RHC Connection func (c *dummyChecker) RhcConnectionAvailabilityCheck(source *m.Source, headers []kafka.Header) { for i := 0; i < len(source.SourceRhcConnections); i++ { @@ -49,9 +42,6 @@ func (c *dummyChecker) RhcConnectionAvailabilityCheck(source *m.Source, headers func (c *dummyChecker) httpAvailabilityRequest(source *m.Source, app *m.Application, uri *url.URL) { c.ApplicationCounter++ } -func (c *dummyChecker) publishSatelliteMessage(writer *kafka.Writer, source *m.Source, endpoint *m.Endpoint) { - c.EndpointCounter++ -} func (c *dummyChecker) pingRHC(source *m.Source, rhcConnection *m.RhcConnection, headers []kafka.Header) { c.RhcConnectionCounter++ } @@ -70,18 +60,6 @@ func TestApplicationAvailability(t *testing.T) { } } -func TestEndpointAvailability(t *testing.T) { - var acr = &dummyChecker{} - acr.EndpointAvailabilityCheck(&m.Source{ - // 3 endpoints on this source. - Endpoints: []m.Endpoint{{}, {}, {}}, - }) - - if acr.EndpointCounter != 3 { - t.Errorf("availability check not called for all endpoints, got %v expected %v", acr.EndpointCounter, 3) - } -} - func TestRhcConnectionAvailability(t *testing.T) { var acr = &dummyChecker{} acr.RhcConnectionAvailabilityCheck(&m.Source{ @@ -105,7 +83,6 @@ func TestAllAvailability(t *testing.T) { SourceRhcConnections: []m.SourceRhcConnection{{RhcConnection: m.RhcConnection{RhcId: "asdf"}}}, } acr.ApplicationAvailabilityCheck(src) - acr.EndpointAvailabilityCheck(src) acr.RhcConnectionAvailabilityCheck(src, []kafka.Header{}) if acr.ApplicationCounter != 3 { From 0e9822cd5a3cd00467a472ed9294e90c7445f60c Mon Sep 17 00:00:00 2001 From: Dalia Cervantes Date: Tue, 21 Jan 2025 09:46:51 -0600 Subject: [PATCH 3/4] Removing EndpointCounter --- service/availability_check_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/service/availability_check_test.go b/service/availability_check_test.go index 793519f3..a349b1ee 100644 --- a/service/availability_check_test.go +++ b/service/availability_check_test.go @@ -13,7 +13,6 @@ import ( type dummyChecker struct { ApplicationCounter int - EndpointCounter int RhcConnectionCounter int } @@ -89,10 +88,6 @@ func TestAllAvailability(t *testing.T) { t.Errorf("availability check not called for both applications, got %v expected %v", acr.ApplicationCounter, 3) } - if acr.EndpointCounter != 4 { - t.Errorf("availability check not called for all endpoints, got %v expected %v", acr.EndpointCounter, 4) - } - if acr.RhcConnectionCounter != 1 { t.Errorf("availability check not called for all rhc connections, got %v expected %v", acr.RhcConnectionCounter, 1) } From 816485312560e7f8f77cadefd0cebce96814a3b6 Mon Sep 17 00:00:00 2001 From: Dalia Cervantes Date: Tue, 21 Jan 2025 10:00:39 -0600 Subject: [PATCH 4/4] go fmt --- service/availability_check.go | 1 - 1 file changed, 1 deletion(-) diff --git a/service/availability_check.go b/service/availability_check.go index ef5375f2..92cc7e46 100644 --- a/service/availability_check.go +++ b/service/availability_check.go @@ -22,7 +22,6 @@ import ( const ( disconnectedRhc = "cloud-connector returned 'disconnected'" unavailableRhc = "cloud-connector returned a non-ok exit code for this connection" - ) type availabilityCheckRequester struct {