Skip to content

Commit

Permalink
Merge pull request #738 from dscervantes/remove/topic
Browse files Browse the repository at this point in the history
Remove unused Kafka topic platform.topological-inventory.operations-satellite
  • Loading branch information
MikelAlejoBR authored Jan 21, 2025
2 parents 962121c + 8164853 commit 2906f57
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 104 deletions.
76 changes: 0 additions & 76 deletions service/availability_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,17 @@ import (
"strconv"
"time"

"github.com/RedHatInsights/sources-api-go/config"
"github.com/RedHatInsights/sources-api-go/dao"
"github.com/RedHatInsights/sources-api-go/kafka"
l "github.com/RedHatInsights/sources-api-go/logger"
h "github.com/RedHatInsights/sources-api-go/middleware/headers"
m "github.com/RedHatInsights/sources-api-go/model"
"github.com/RedHatInsights/sources-api-go/util"
"github.com/labstack/echo/v4"
)

const (
disconnectedRhc = "cloud-connector returned 'disconnected'"
unavailableRhc = "cloud-connector returned a non-ok exit code for this connection"

satelliteRequestedTopic = "platform.topological-inventory.operations-satellite"
)

type availabilityCheckRequester struct {
Expand All @@ -36,12 +32,10 @@ type availabilityCheckRequester struct {
type availabilityChecker interface {
// public methods
ApplicationAvailabilityCheck(source *m.Source)
EndpointAvailabilityCheck(source *m.Source)
RhcConnectionAvailabilityCheck(source *m.Source, headers []kafka.Header)

// private methods
httpAvailabilityRequest(source *m.Source, app *m.Application, uri *url.URL)
publishSatelliteMessage(writer *kafka.Writer, source *m.Source, endpoint *m.Endpoint)
pingRHC(source *m.Source, rhcConnection *m.RhcConnection, headers []kafka.Header)
updateRhcStatus(source *m.Source, status string, errstr string, rhcConnection *m.RhcConnection, headers []kafka.Header)

Expand All @@ -50,9 +44,6 @@ type availabilityChecker interface {
}

var (
// storing the satellite topic here since it doesn't change after initial
// startup.
satelliteTopic = config.Get().KafkaTopic(satelliteRequestedTopic)
// cloud connector related fields
cloudConnectorUrl = os.Getenv("CLOUD_CONNECTOR_AVAILABILITY_CHECK_URL")
cloudConnectorPsk = os.Getenv("CLOUD_CONNECTOR_PSK")
Expand All @@ -73,8 +64,6 @@ func RequestAvailabilityCheck(c echo.Context, source *m.Source, headers []kafka.
// of overwriting the status set by the RHC check
if len(source.SourceRhcConnections) != 0 {
ac.RhcConnectionAvailabilityCheck(source, headers)
} else if len(source.Endpoints) != 0 {
ac.EndpointAvailabilityCheck(source)
}

ac.Logger().Infof("Finished Publishing Availability Messages for Source %v", source.ID)
Expand Down Expand Up @@ -138,71 +127,6 @@ func (acr availabilityCheckRequester) httpAvailabilityRequest(source *m.Source,
}
}

// codified version of what we were sending over kafka. The satellite operations
// worker picks this message up and makes the proper requests to the
// platform-receptor-controller.
//
// Serialized as the "params" object of the outgoing kafka message (see
// publishSatelliteMessage); the json tags are the wire contract — do not rename.
type satelliteAvailabilityMessage struct {
	// SourceID is the numeric source ID rendered as a decimal string.
	SourceID string `json:"source_id"`
	// SourceUID may be nil, in which case it serializes as JSON null.
	SourceUID *string `json:"source_uid"`
	// SourceRef may be nil, in which case it serializes as JSON null.
	SourceRef *string `json:"source_ref"`
	// ExternalTenant is the tenant's external (account) identifier.
	ExternalTenant string `json:"external_tenant"`
}

// sends off a satellite availability-check kafka message for each of the
// source's endpoints. Only satellite-type sources are supported — we do not
// support any other operations currently (legacy behavior).
func (acr availabilityCheckRequester) EndpointAvailabilityCheck(source *m.Source) {
	// Satellite is the only source type with an operations worker listening;
	// every other source type is skipped outright.
	if source.SourceType.Name != "satellite" {
		acr.Logger().Infof("Skipping Endpoint availability check for non-satellite source type")
		return
	}

	// Spin up a dedicated producer for this source's messages.
	satelliteWriter, writerErr := kafka.GetWriter(&kafka.Options{
		BrokerConfig: conf.KafkaBrokerConfig,
		Topic:        satelliteTopic,
		Logger:       acr.Logger(),
	})
	if writerErr != nil {
		acr.Logger().Errorf(`[source_id: %d] unable to create a Kafka writer for the endpoint availability check: %s`, source.ID, writerErr)
		return
	}

	l.Log.Infof("Publishing message for Source [%v] topic [%v] ", source.ID, satelliteWriter.Topic)

	// One availability-check message per endpoint on the source.
	for i := range source.Endpoints {
		acr.publishSatelliteMessage(satelliteWriter, source, &source.Endpoints[i])
	}
}

// publishSatelliteMessage builds and produces a single availability-check
// kafka message for one endpoint of the given source. The payload is a
// satelliteAvailabilityMessage wrapped under the "params" key.
//
// NOTE(review): the writer is closed via defer here, yet the caller invokes
// this function once per endpoint with the same writer — for sources with more
// than one endpoint the later publishes run against an already-closed writer.
// Confirm kafka.CloseWriter is safe to call repeatedly, or move the close up
// to the caller after the loop.
func (acr availabilityCheckRequester) publishSatelliteMessage(writer *kafka.Writer, source *m.Source, endpoint *m.Endpoint) {
	acr.Logger().Infof("[source_id: %d] Requesting Availability Check for Endpoint %v", source.ID, endpoint.ID)
	defer kafka.CloseWriter(writer, "publish satellite message")

	// Encode the payload the satellite operations worker expects.
	msg := &kafka.Message{}
	err := msg.AddValueAsJSON(map[string]interface{}{
		"params": satelliteAvailabilityMessage{
			SourceID:       strconv.FormatInt(source.ID, 10),
			SourceUID:      source.Uid,
			SourceRef:      source.SourceRef,
			ExternalTenant: source.Tenant.ExternalTenant,
		}})
	if err != nil {
		acr.Logger().Warnf("Failed to add struct value as json to kafka message")
		return
	}

	// Routing/identity headers: event type and encoding for the consumer,
	// plus a generated x-rh-identity and the endpoint tenant's account number.
	msg.AddHeaders([]kafka.Header{
		{Key: "event_type", Value: []byte("Source.availability_check")},
		{Key: "encoding", Value: []byte("json")},
		{Key: h.XRHID, Value: []byte(util.GeneratedXRhIdentity(source.Tenant.ExternalTenant, source.Tenant.OrgID))},
		{Key: h.AccountNumber, Value: []byte(endpoint.Tenant.ExternalTenant)},
	})

	// Best-effort produce: failures are logged, not propagated.
	if err = kafka.Produce(writer, msg); err != nil {
		acr.Logger().Warnf("Failed to produce kafka message for Source %v, error: %v", source.ID, err)
	}
}

// rhcConnectionStatusResponse models the JSON body returned when checking an
// RHC connection's status — presumably from the cloud-connector service (see
// cloudConnectorUrl above); only the "status" field is consumed.
type rhcConnectionStatusResponse struct {
	Status string `json:"status"`
}
Expand Down
28 changes: 0 additions & 28 deletions service/availability_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

// dummyChecker is a test double for the availability checker: instead of
// performing real HTTP or kafka work it counts how many times each kind of
// check was requested.
type dummyChecker struct {
	ApplicationCounter   int // bumped once per application availability request
	EndpointCounter      int // bumped once per satellite endpoint message
	RhcConnectionCounter int // bumped once per RHC connection ping
}

Expand All @@ -31,13 +30,6 @@ func (c *dummyChecker) ApplicationAvailabilityCheck(source *m.Source) {
}
}

// send out a satellite kafka message per endpoint
func (c *dummyChecker) EndpointAvailabilityCheck(source *m.Source) {
	for idx := range source.Endpoints {
		c.publishSatelliteMessage(&kafka.Writer{}, source, &source.Endpoints[idx])
	}
}

// ping RHC for each RHC Connection
func (c *dummyChecker) RhcConnectionAvailabilityCheck(source *m.Source, headers []kafka.Header) {
for i := 0; i < len(source.SourceRhcConnections); i++ {
Expand All @@ -49,9 +41,6 @@ func (c *dummyChecker) RhcConnectionAvailabilityCheck(source *m.Source, headers
// httpAvailabilityRequest records that an application availability request
// would have been made; no HTTP traffic happens in tests.
func (c *dummyChecker) httpAvailabilityRequest(source *m.Source, app *m.Application, uri *url.URL) {
	c.ApplicationCounter++
}
// publishSatelliteMessage records that an endpoint message would have been
// produced; no kafka traffic happens in tests.
func (c *dummyChecker) publishSatelliteMessage(writer *kafka.Writer, source *m.Source, endpoint *m.Endpoint) {
	c.EndpointCounter++
}
// pingRHC records that an RHC connection would have been pinged; no network
// traffic happens in tests.
func (c *dummyChecker) pingRHC(source *m.Source, rhcConnection *m.RhcConnection, headers []kafka.Header) {
	c.RhcConnectionCounter++
}
Expand All @@ -70,18 +59,6 @@ func TestApplicationAvailability(t *testing.T) {
}
}

// TestEndpointAvailability verifies that exactly one satellite message is
// published per endpoint on the source.
func TestEndpointAvailability(t *testing.T) {
	checker := &dummyChecker{}

	// 3 endpoints on this source.
	checker.EndpointAvailabilityCheck(&m.Source{
		Endpoints: []m.Endpoint{{}, {}, {}},
	})

	if got, want := checker.EndpointCounter, 3; got != want {
		t.Errorf("availability check not called for all endpoints, got %v expected %v", got, want)
	}
}

func TestRhcConnectionAvailability(t *testing.T) {
var acr = &dummyChecker{}
acr.RhcConnectionAvailabilityCheck(&m.Source{
Expand All @@ -105,17 +82,12 @@ func TestAllAvailability(t *testing.T) {
SourceRhcConnections: []m.SourceRhcConnection{{RhcConnection: m.RhcConnection{RhcId: "asdf"}}},
}
acr.ApplicationAvailabilityCheck(src)
acr.EndpointAvailabilityCheck(src)
acr.RhcConnectionAvailabilityCheck(src, []kafka.Header{})

if acr.ApplicationCounter != 3 {
t.Errorf("availability check not called for both applications, got %v expected %v", acr.ApplicationCounter, 3)
}

if acr.EndpointCounter != 4 {
t.Errorf("availability check not called for all endpoints, got %v expected %v", acr.EndpointCounter, 4)
}

if acr.RhcConnectionCounter != 1 {
t.Errorf("availability check not called for all rhc connections, got %v expected %v", acr.RhcConnectionCounter, 1)
}
Expand Down

0 comments on commit 2906f57

Please sign in to comment.