Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCXDEV-14564] Heartbeats consumer #2098

Merged
merged 7 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions consumer/heartbeat.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"version" : "1.0.0",
"idHash" : "5b43352e973d2f1ccbb274f392460430237a670c0a0aedfc15cad7d049f6d93efede450496f9aba6bd93b1011d1a4c9ac9f56ce68df5f5d1929d19eeed8b3f5f",
"updated-jars" : {
"version" : "1.0.0",
"jars" : [ {
"name" : "EMPRESP.war",
"version" : " ",
"attributes" : {
"sha1Checksum" : "d95d4c072772eca2559dca114901a9e8e4fcc6af",
"path" : "EMPRESP.war",
"sha256Checksum" : "67b6e2d08d430b9ead1fcda24f7e6dba17e9531fe7ab88973b99b6f26f3777b7",
"sha512Checksum" : "9d3d9752dcd16a21d9289dd136b6c95ffed48a61ffb28b1759ce860feb05fa9324e05a7758d963408fa4d031937ec3ffaf67d005ce9cfd7d7ca4cc5fbcb0b463",
"runtime-name" : "EMPRESP.war",
"deployment" : "EMPRESP.war"
}
} ]
},
"details": {
"type": "General Java app",
"is_ocp": "true",
"pod_name": "sample-app-799584c8bc-9b8jn",
"pod_namespace": "sample-app",
"object_uid": "24f31da9-4e40-4b92-ab15-8c5f4dd5fb8c"
}
}
1 change: 1 addition & 0 deletions consumer/heartbeat_ingress.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"account":"12345","category":"analytics","content_type":"application/vnd.redhat.runtimes-java-general.analytics+tgz","metadata":{"reporter":"","stale_timestamp":"0001-01-01T00:00:00Z"},"request_id":"ingress-service-5df7465c49-nlgfw/A0CjAuL4dj-002212","principal":"12345","org_id":"12345","service":"runtimes-java-general","size":883,"url":"myserverurl","b64_identity":"xyz","timestamp":"2024-11-19T10:02:56.725889989Z"}
313 changes: 313 additions & 0 deletions consumer/heartbeats_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
package consumer

import (
"context"
"encoding/json"
"errors"
"io"
"net/http"
"strings"
"time"

// "github.com/RedHatInsights/insights-results-aggregator/producer"
"github.com/RedHatInsights/insights-results-aggregator/storage"
"github.com/rs/zerolog/log"

// "github.com/RedHatInsights/insights-results-aggregator/types"
"github.com/Shopify/sarama"

"github.com/RedHatInsights/insights-results-aggregator/broker"
)

// Metadata taken from https://github.com/redhatinsights/insights-ingress-go/internal/validators/types.go
// It carries the host-identification fields that the ingress service attaches
// to every upload. Only Reporter and StaleTimestamp are always present; all
// other fields are optional and omitted from JSON when empty.
type Metadata struct {
	IPAddresses    []string          `json:"ip_addresses,omitempty"`
	Account        string            `json:"account,omitempty"`
	OrgID          string            `json:"org_id,omitempty"`
	InsightsID     string            `json:"insights_id,omitempty"`
	MachineID      string            `json:"machine_id,omitempty"`
	SubManID       string            `json:"subscription_manager_id,omitempty"`
	MacAddresses   []string          `json:"mac_addresses,omitempty"`
	FQDN           string            `json:"fqdn,omitempty"`
	BiosUUID       string            `json:"bios_uuid,omitempty"`
	DisplayName    string            `json:"display_name,omitempty"`
	AnsibleHost    string            `json:"ansible_host,omitempty"`
	CustomMetadata map[string]string `json:"custom_metadata,omitempty"`
	Reporter       string            `json:"reporter"`
	StaleTimestamp time.Time         `json:"stale_timestamp"`
	QueueKey       string            `json:"queue_key,omitempty"`
}

// Request taken from https://github.com/redhatinsights/insights-ingress-go/internal/validators/types.go
// It is the envelope published by the ingress service for each upload:
// ContentType identifies the producing service, URL points at the payload
// to download, and Timestamp records when the upload was received.
type Request struct {
Account string `json:"account"`
Category string `json:"category"`
ContentType string `json:"content_type"` // used to filter messages meant for this consumer
Metadata Metadata `json:"metadata"`
RequestID string `json:"request_id"`
Principal string `json:"principal"`
OrgID string `json:"org_id"`
Service string `json:"service"`
Size int64 `json:"size"`
URL string `json:"url"` // location of the uploaded archive to fetch
ID string `json:"id,omitempty"`
B64Identity string `json:"b64_identity"`
Timestamp time.Time `json:"timestamp"` // used as the heartbeat timestamp downstream
}

// KkfMessageProcessor Processor for kafka messages used by NewKafkaConsumer.
// A returned error is logged by the consumer loop but does not stop
// consumption; the message is marked as consumed either way.
type KkfMessageProcessor interface {
HandleMessage(msg *sarama.ConsumerMessage) error
}

// KfkConsumer ...
type KfkConsumer struct {
juandspy marked this conversation as resolved.
Show resolved Hide resolved
configuration broker.Configuration
client sarama.ConsumerGroup
Storage storage.Storage
MessageProcessor KkfMessageProcessor
ready chan bool
// cancel context.CancelFunc
}

// TODO
// &HearbeatMessageProcessor{Storage: storage}

// NewKfkConsumer constructs a kafka consumer with a message processor.
// It builds a sarama configuration from brokerCfg, joins the configured
// consumer group, and returns a consumer ready to be started with Serve.
func NewKfkConsumer(brokerCfg broker.Configuration, processor KkfMessageProcessor) (*KfkConsumer, error) {
	cfg, err := broker.SaramaConfigFromBrokerConfig(brokerCfg)
	if err != nil {
		log.Error().Err(err).Msg("Unable to create sarama configuration from current broker configuration")
		return nil, err
	}

	log.Info().
		Str("addresses", brokerCfg.Addresses).
		Str("group", brokerCfg.Group).
		Msg("New consumer group")

	addresses := strings.Split(brokerCfg.Addresses, ",")
	group, err := sarama.NewConsumerGroup(addresses, brokerCfg.Group, cfg)
	if err != nil {
		log.Error().Err(err).Msg("Unable to create consumer group")
		return nil, err
	}
	log.Info().Msg("Consumer group has been created")

	return &KfkConsumer{
		configuration:    brokerCfg,
		client:           group,
		MessageProcessor: processor,
		ready:            make(chan bool),
	}, nil
}

// Serve starts listening for messages and processing them. It blocks current thread.
// TODO pass ctx, cancel := context.WithCancel(context.Background())
func (consumer *KfkConsumer) Serve(ctx context.Context) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question as above: why not reusing the already existing implementation of KafkaConsumer?

ctx, cancel := context.WithCancel(ctx)

go func() {
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := consumer.client.Consume(ctx, []string{consumer.configuration.Topic}, consumer); err != nil {
log.Fatal().Err(err).Msg("unable to recreate kafka session")
}

// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}

log.Info().Msg("created new kafka session")

consumer.ready = make(chan bool)
}
}()

// Wait for the consumer to be set up
log.Info().Msg("waiting for consumer to become ready")
<-consumer.ready
log.Info().Msg("finished waiting for consumer to become ready")

// Actual processing is done in goroutine created by sarama (see ConsumeClaim below)
log.Info().Msg("started serving consumer")
<-ctx.Done()
log.Info().Msg("context cancelled, exiting")

cancel()
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
// Closing the ready channel unblocks Serve, signalling that the consumer
// group session has been established.
func (c *KfkConsumer) Setup(sarama.ConsumerGroupSession) error {
	log.Info().Msg("new session has been setup")
	// Mark the consumer as ready
	close(c.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited. It only logs; no resources need explicit release here.
func (c *KfkConsumer) Cleanup(sarama.ConsumerGroupSession) error {
	log.Info().Msg("new session has been finished")
	return nil
}

// ConsumeClaim starts a consumer loop of ConsumerGroupClaim's Messages().
// It runs until the message channel closes or the session context is
// cancelled. Every claimed message is passed to the MessageProcessor and
// then marked as consumed regardless of the processing outcome, so a
// failing message is not redelivered endlessly.
func (consumer *KfkConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	log.Info().
		Int64(offsetKey, claim.InitialOffset()).
		Msg("starting messages loop")

	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// Use structured logging consistently (the rest of the file
				// uses zerolog events, not Printf-style logging).
				log.Info().Msg("message channel was closed")
				return nil
			}
			// Full payloads are logged at debug level only, to keep the
			// default log volume bounded.
			log.Debug().
				Str(topicKey, message.Topic).
				Time("message_timestamp", message.Timestamp).
				Msgf("Message claimed: value = %s", string(message.Value))
			if err := consumer.MessageProcessor.HandleMessage(message); err != nil {
				log.Error().Err(err).Msg("Error processing message")
			}
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			return nil
		}
	}
}

// Close method closes all resources used by consumer. A close failure of the
// underlying consumer group is logged but not propagated; Close always
// returns nil.
func (c *KfkConsumer) Close() error {
	if c.client == nil {
		return nil
	}
	if err := c.client.Close(); err != nil {
		log.Error().Err(err).Msg("unable to close consumer group")
	}
	return nil
}

// HearbeatMessageProcessor implementation of KkfMessageProccessor for heartbeats.
// It downloads the heartbeat document referenced by each ingress message and
// records the updated timestamp through Storage.
// NOTE(review): "Hearbeat" is a typo in the exported name; renaming would
// break callers, so it is kept as-is.
type HearbeatMessageProcessor struct {
Storage storage.Storage
}

// HandleMessage gets a kafka message, deserializes it, parses the referenced
// heartbeat document and finally sends the result to be stored. Messages
// whose content type does not belong to the runtimes service are skipped
// without error.
func (p *HearbeatMessageProcessor) HandleMessage(msg *sarama.ConsumerMessage) error {
	log.Info().
		Int64(offsetKey, msg.Offset).
		Int32(partitionKey, msg.Partition).
		Str(topicKey, msg.Topic).
		Time("message_timestamp", msg.Timestamp).
		Msg("started processing message") // Msg, not Msgf: no format arguments

	// process message
	messageRequest, err := deserializeMessage(msg.Value)
	if err != nil {
		log.Error().Err(err).Msg("Error deserializing message from Kafka")
		return err
	}

	// skip messages not for us
	const runtimesContentType = "application/vnd.redhat.runtimes-java-general.analytics+tgz"
	if messageRequest.ContentType != runtimesContentType {
		log.Info().Msg("Content not for runtimes. Skipping")
		return nil
	}

	// Download and parse the heartbeat document referenced by the message.
	instanceID, err := parseHearbeat(messageRequest.URL)
	if err != nil {
		log.Error().Err(err).Msg("Error processing message consumed from Kafka")
		return err
	}
	log.Info().Int64(offsetKey, msg.Offset).Msg("Message consumed")

	if err := p.updateHeartbeat(instanceID, messageRequest.Timestamp); err != nil {
		log.Error().Err(err).Msg("Error updating heartbeat from Kafka message")
		return err
	}
	log.Info().Msg("Heartbeat updated")

	return nil
}

// deserializeMessage decompresses a raw Kafka message payload and unmarshals
// the resulting JSON into a Request value. On any failure the zero-valued
// Request is returned together with the error.
func deserializeMessage(messageValue []byte) (Request, error) {
	var req Request

	raw, err := DecompressMessage(messageValue)
	if err != nil {
		return req, err
	}

	if err := json.Unmarshal(raw, &req); err != nil {
		return req, err
	}

	return req, nil
}

// HeartbeatDetails details of the hearbeat. Only the object UID is needed by
// this consumer; other fields present in the document are ignored.
type HeartbeatDetails struct {
ObjectUID string `json:"object_uid"` // unique identifier of the reporting object
}

// Heartbeat data as downloaded from the URL carried by an ingress Request.
type Heartbeat struct {
Details HeartbeatDetails `json:"details"`
}

// parseHearbeat downloads the heartbeat JSON document from the given URL and
// returns the object UID found in its details section. It returns an error
// when the download fails, the server does not answer 200 OK, the body is
// not valid JSON, or the object UID is empty.
// NOTE(review): "Hearbeat" typo kept — callers in this file use this name.
func parseHearbeat(url string) (string, error) {
	// Use a client with a timeout: http.Get would block forever on a
	// stalled remote server, wedging the consumer loop.
	client := http.Client{Timeout: 30 * time.Second}
	resp, err := client.Get(url) /* #nosec G107 */
	if err != nil {
		log.Error().Err(err).Msg("Error downloading remote file")
		return "", err
	}
	defer func() { _ = resp.Body.Close() }()

	// Without this check an error page (e.g. 404) would be fed to the JSON
	// parser and produce a confusing unmarshal error.
	if resp.StatusCode != http.StatusOK {
		err := errors.New("unexpected HTTP status downloading heartbeat: " + resp.Status)
		log.Error().Err(err).Msg("Error downloading remote file")
		return "", err
	}

	// Cap the read so an unexpectedly large (or malicious) payload cannot
	// exhaust memory; heartbeat documents are small.
	const maxHeartbeatSize = 1 << 20 // 1 MiB
	bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, maxHeartbeatSize))
	if err != nil {
		log.Error().Err(err).Msg("Error reading remote file")
		return "", err
	}

	var heartbeat Heartbeat
	err = json.Unmarshal(bodyBytes, &heartbeat)
	if err != nil {
		log.Error().Err(err).Msg("Error parsing JSON")
		return "", err
	}

	if heartbeat.Details.ObjectUID == "" {
		// lowercase error string per Go convention
		return "", errors.New("empty value for Object UID")
	}

	log.Info().Msg("Read heartbeat")
	log.Debug().Msgf("Processed heartbeat for %s", heartbeat.Details.ObjectUID)
	return heartbeat.Details.ObjectUID, nil
}

// updateHeartbeat stores the heartbeat timestamp for objectUID. It succeeds
// only when the configured Storage implements DVORecommendationsStorage;
// any other storage type yields an error.
func (p *HearbeatMessageProcessor) updateHeartbeat(
	objectUID string, timestamp time.Time,
) error {
	dvoStorage, ok := p.Storage.(storage.DVORecommendationsStorage)
	if !ok {
		err := errors.New("heartbeats could not be updated")
		log.Error().Err(err).Msg(unexpectedStorageType)
		return err
	}

	if err := dvoStorage.UpdateHeartbeat(objectUID, timestamp); err != nil {
		log.Error().Err(err).Msg("Error updating heartbeat in database")
		return err
	}

	log.Debug().Msgf("Update heartbeat %s", objectUID)
	return nil
}
Loading
Loading