Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove not used Kafka topic platform.topological-inventory.operations… #738

Merged
merged 4 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 0 additions & 76 deletions service/availability_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,17 @@ import (
"strconv"
"time"

"github.com/RedHatInsights/sources-api-go/config"
"github.com/RedHatInsights/sources-api-go/dao"
"github.com/RedHatInsights/sources-api-go/kafka"
l "github.com/RedHatInsights/sources-api-go/logger"
h "github.com/RedHatInsights/sources-api-go/middleware/headers"
m "github.com/RedHatInsights/sources-api-go/model"
"github.com/RedHatInsights/sources-api-go/util"
"github.com/labstack/echo/v4"
)

const (
disconnectedRhc = "cloud-connector returned 'disconnected'"
unavailableRhc = "cloud-connector returned a non-ok exit code for this connection"

satelliteRequestedTopic = "platform.topological-inventory.operations-satellite"
)

type availabilityCheckRequester struct {
Expand All @@ -36,12 +32,10 @@ type availabilityCheckRequester struct {
type availabilityChecker interface {
// public methods
ApplicationAvailabilityCheck(source *m.Source)
EndpointAvailabilityCheck(source *m.Source)
RhcConnectionAvailabilityCheck(source *m.Source, headers []kafka.Header)

// private methods
httpAvailabilityRequest(source *m.Source, app *m.Application, uri *url.URL)
publishSatelliteMessage(writer *kafka.Writer, source *m.Source, endpoint *m.Endpoint)
pingRHC(source *m.Source, rhcConnection *m.RhcConnection, headers []kafka.Header)
updateRhcStatus(source *m.Source, status string, errstr string, rhcConnection *m.RhcConnection, headers []kafka.Header)

Expand All @@ -50,9 +44,6 @@ type availabilityChecker interface {
}

var (
// storing the satellite topic here since it doesn't change after initial
// startup.
satelliteTopic = config.Get().KafkaTopic(satelliteRequestedTopic)
// cloud connector related fields
cloudConnectorUrl = os.Getenv("CLOUD_CONNECTOR_AVAILABILITY_CHECK_URL")
cloudConnectorPsk = os.Getenv("CLOUD_CONNECTOR_PSK")
Expand All @@ -73,8 +64,6 @@ func RequestAvailabilityCheck(c echo.Context, source *m.Source, headers []kafka.
// of overwriting the status set by the RHC check
if len(source.SourceRhcConnections) != 0 {
ac.RhcConnectionAvailabilityCheck(source, headers)
} else if len(source.Endpoints) != 0 {
ac.EndpointAvailabilityCheck(source)
}

ac.Logger().Infof("Finished Publishing Availability Messages for Source %v", source.ID)
Expand Down Expand Up @@ -138,71 +127,6 @@ func (acr availabilityCheckRequester) httpAvailabilityRequest(source *m.Source,
}
}

// codified version of what we were sending over kafka. The satellite operations
// worker picks this message up and makes the proper requests to the
// platform-receptor-controller.
type satelliteAvailabilityMessage struct {
SourceID string `json:"source_id"`
SourceUID *string `json:"source_uid"`
SourceRef *string `json:"source_ref"`
ExternalTenant string `json:"external_tenant"`
}

// sends off an availability check kafka message for each of the source's
// endpoints but only if the source is of type satellite - we do not support any
// other operations currently (legacy behavior)
func (acr availabilityCheckRequester) EndpointAvailabilityCheck(source *m.Source) {
if source.SourceType.Name != "satellite" {
acr.Logger().Infof("Skipping Endpoint availability check for non-satellite source type")
return
}

// instantiate a producer for this source
writer, err := kafka.GetWriter(&kafka.Options{
BrokerConfig: conf.KafkaBrokerConfig,
Topic: satelliteTopic,
Logger: acr.Logger(),
})
if err != nil {
acr.Logger().Errorf(`[source_id: %d] unable to create a Kafka writer for the endpoint availability check: %s`, source.ID, err)
return
}

l.Log.Infof("Publishing message for Source [%v] topic [%v] ", source.ID, writer.Topic)
for _, endpoint := range source.Endpoints {
acr.publishSatelliteMessage(writer, source, &endpoint)
}
}

func (acr availabilityCheckRequester) publishSatelliteMessage(writer *kafka.Writer, source *m.Source, endpoint *m.Endpoint) {
acr.Logger().Infof("[source_id: %d] Requesting Availability Check for Endpoint %v", source.ID, endpoint.ID)
defer kafka.CloseWriter(writer, "publish satellite message")

msg := &kafka.Message{}
err := msg.AddValueAsJSON(map[string]interface{}{
"params": satelliteAvailabilityMessage{
SourceID: strconv.FormatInt(source.ID, 10),
SourceUID: source.Uid,
SourceRef: source.SourceRef,
ExternalTenant: source.Tenant.ExternalTenant,
}})
if err != nil {
acr.Logger().Warnf("Failed to add struct value as json to kafka message")
return
}

msg.AddHeaders([]kafka.Header{
{Key: "event_type", Value: []byte("Source.availability_check")},
{Key: "encoding", Value: []byte("json")},
{Key: h.XRHID, Value: []byte(util.GeneratedXRhIdentity(source.Tenant.ExternalTenant, source.Tenant.OrgID))},
{Key: h.AccountNumber, Value: []byte(endpoint.Tenant.ExternalTenant)},
})

if err = kafka.Produce(writer, msg); err != nil {
acr.Logger().Warnf("Failed to produce kafka message for Source %v, error: %v", source.ID, err)
}
}

type rhcConnectionStatusResponse struct {
Status string `json:"status"`
}
Expand Down
28 changes: 0 additions & 28 deletions service/availability_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

type dummyChecker struct {
ApplicationCounter int
EndpointCounter int
RhcConnectionCounter int
}

Expand All @@ -31,13 +30,6 @@ func (c *dummyChecker) ApplicationAvailabilityCheck(source *m.Source) {
}
}

// send out a satellite kafka message per endpoint
func (c *dummyChecker) EndpointAvailabilityCheck(source *m.Source) {
for i := 0; i < len(source.Endpoints); i++ {
c.publishSatelliteMessage(&kafka.Writer{}, source, &source.Endpoints[i])
}
}

// ping RHC for each RHC Connection
func (c *dummyChecker) RhcConnectionAvailabilityCheck(source *m.Source, headers []kafka.Header) {
for i := 0; i < len(source.SourceRhcConnections); i++ {
Expand All @@ -49,9 +41,6 @@ func (c *dummyChecker) RhcConnectionAvailabilityCheck(source *m.Source, headers
func (c *dummyChecker) httpAvailabilityRequest(source *m.Source, app *m.Application, uri *url.URL) {
c.ApplicationCounter++
}
func (c *dummyChecker) publishSatelliteMessage(writer *kafka.Writer, source *m.Source, endpoint *m.Endpoint) {
c.EndpointCounter++
}
func (c *dummyChecker) pingRHC(source *m.Source, rhcConnection *m.RhcConnection, headers []kafka.Header) {
c.RhcConnectionCounter++
}
Expand All @@ -70,18 +59,6 @@ func TestApplicationAvailability(t *testing.T) {
}
}

func TestEndpointAvailability(t *testing.T) {
var acr = &dummyChecker{}
acr.EndpointAvailabilityCheck(&m.Source{
// 3 endpoints on this source.
Endpoints: []m.Endpoint{{}, {}, {}},
})

if acr.EndpointCounter != 3 {
t.Errorf("availability check not called for all endpoints, got %v expected %v", acr.EndpointCounter, 3)
}
}

func TestRhcConnectionAvailability(t *testing.T) {
var acr = &dummyChecker{}
acr.RhcConnectionAvailabilityCheck(&m.Source{
Expand All @@ -105,17 +82,12 @@ func TestAllAvailability(t *testing.T) {
SourceRhcConnections: []m.SourceRhcConnection{{RhcConnection: m.RhcConnection{RhcId: "asdf"}}},
}
acr.ApplicationAvailabilityCheck(src)
acr.EndpointAvailabilityCheck(src)
acr.RhcConnectionAvailabilityCheck(src, []kafka.Header{})

if acr.ApplicationCounter != 3 {
t.Errorf("availability check not called for both applications, got %v expected %v", acr.ApplicationCounter, 3)
}

if acr.EndpointCounter != 4 {
t.Errorf("availability check not called for all endpoints, got %v expected %v", acr.EndpointCounter, 4)
}

if acr.RhcConnectionCounter != 1 {
t.Errorf("availability check not called for all rhc connections, got %v expected %v", acr.RhcConnectionCounter, 1)
}
Expand Down
Loading