diff --git a/internal/handler/main.go b/internal/handler/main.go index db7bf2a..c7b2e8c 100644 --- a/internal/handler/main.go +++ b/internal/handler/main.go @@ -62,11 +62,12 @@ type S3 struct { } type InsightsMessage struct { - Payload []PayloadInput `json:"payload"` - BinaryPayload map[string]string `json:"binaryPayload"` - Annotations map[string]string `json:"annotations"` - Labels map[string]string `json:"labels"` - Type string `json:"type,omitempty"` + Payload []PayloadInput `json:"payload"` + BinaryPayload map[string]string `json:"binaryPayload"` + Annotations map[string]string `json:"annotations"` + Labels map[string]string `json:"labels"` + Type string `json:"type,omitempty"` + RequeueAttempts int `json:"requeueAttempts,omitempty"` } type PayloadInput struct { diff --git a/internal/handler/messaging.go b/internal/handler/messaging.go index e86b097..b3091bf 100644 --- a/internal/handler/messaging.go +++ b/internal/handler/messaging.go @@ -19,11 +19,11 @@ type Messaging struct { EnableDebug bool ProblemsFromSBOM bool TrivyServerEndpoint string - RequeueAttempts int + MessageQWriter func(data []byte) error } // NewMessaging returns a messaging with config -func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool, problemsFromSBOM bool, trivyServerEndpoint string) *Messaging { +func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool, problemsFromSBOM bool, trivyServerEndpoint string, MessageQWriter func(data []byte) error) *Messaging { return &Messaging{ Config: config, LagoonAPI: lagoonAPI, @@ -33,7 +33,7 @@ func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts EnableDebug: enableDebug, ProblemsFromSBOM: problemsFromSBOM, TrivyServerEndpoint: trivyServerEndpoint, - RequeueAttempts: 0, + MessageQWriter: MessageQWriter, } } @@ -175,16 +175,21 @@ func (h *Messaging) processMessageQueue(message mq.Message) { if insights.InsightsType != Direct { err := h.sendToLagoonS3(incoming, insights, resource) if err != nil { - // parse error to determine if retry is valid - if err.Error() == "Could not connect to the endpoint URL" || err.Error() == "Connect timeout on endpoint URL" { - slog.Error("Unable to send to S3", "Error", err.Error()) - } else { - h.RequeueAttempts++ - if h.RequeueAttempts <= 3 { - rejectMessage(true) - } else { - slog.Error("Retries failed, unable to send to S3", "Error", err.Error()) + incoming.RequeueAttempts++ + updatedMessage, err := json.Marshal(incoming) + if err != nil { + fmt.Printf(err.Error()) + } + if incoming.RequeueAttempts <= 3 { + rejectMessage(false) + if err := h.MessageQWriter(updatedMessage); err != nil { + slog.Error("Error re-queueing message", "Error", err.Error()) } + return + } else { + slog.Error("Retries failed, unable to send to S3", "Error", err.Error()) + rejectMessage(false) + return } } } @@ -201,9 +206,17 @@ func (h *Messaging) processMessageQueue(message mq.Message) { err := h.sendToLagoonAPI(incoming, resource, insights) if err != nil { - h.RequeueAttempts++ - if h.RequeueAttempts <= 3 { - rejectMessage(true) + incoming.RequeueAttempts++ + updatedMessage, err := json.Marshal(incoming) + if err != nil { + fmt.Printf(err.Error()) + } + if incoming.RequeueAttempts <= 3 { + rejectMessage(false) + if err := h.MessageQWriter(updatedMessage); err != nil { + slog.Error("Error re-queueing message", "Error", err.Error()) + } + return } else { slog.Error("Retries failed, unable to send to the API", "Error", err.Error()) rejectMessage(false) diff --git a/main.go b/main.go index 9c23bca..69d4f6e 100644 --- a/main.go +++ b/main.go @@ -41,8 +41,30 @@ var ( enableDebug bool problemsFromSBOM bool trivyServerEndpoint string + config mq.Config ) +func mqWriteObject(data []byte) error { + messageQ, err := mq.New(config) + if err != nil { + return err + } + defer messageQ.Close() + + producer, err := messageQ.SyncProducer("lagoon-handler") + if err != nil { + return err + } + + err = producer.Produce(data) + + if err != nil { + return err + } + + return nil +} + func main() { flag.StringVar(&lagoonAppID, "lagoon-app-id", "insights-handler", "The appID to use that will be sent with messages.") flag.StringVar(&mqUser, "rabbitmq-username", "guest", "The username of the rabbitmq user.") @@ -156,7 +178,7 @@ func main() { slog.Error("Unable to register filters from disk", "Error", err) } - config := mq.Config{ + config = mq.Config{ ReconnectDelay: time.Duration(rabbitReconnectRetryInterval) * time.Second, Exchanges: mq.Exchanges{ { @@ -195,9 +217,20 @@ func main() { }, }, }, + Producers: mq.Producers{ + { + Name: "lagoon-handler", + Exchange: "lagoon-insights", + Sync: true, + Options: mq.Options{ + "delivery_mode": "2", + "headers": "", + "content_type": "", + }, + }, + }, DSN: fmt.Sprintf("amqp://%s:%s@%s/", broker.Username, broker.Password, broker.Hostname), } - messaging := handler.NewMessaging(config, graphQLConfig, s3Config, @@ -206,6 +239,7 @@ func main() { enableDebug, problemsFromSBOM, trivyServerEndpoint, + mqWriteObject, ) // start the consumer