Skip to content

Commit

Permalink
Moves logic into function
Browse files Browse the repository at this point in the history
  • Loading branch information
CGoodwin90 committed Dec 19, 2023
1 parent c243281 commit 10b5e31
Showing 1 changed file with 21 additions and 32 deletions.
53 changes: 21 additions & 32 deletions internal/handler/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
}
}(message)

rejectRequeue := func(message mq.Message) func(func(bool), *InsightsMessage, int, string, error) {
return func(rejectMessage func(bool), incoming *InsightsMessage, retryAttemptLimit int, target string, err error) {
incoming.RequeueAttempts++
updatedMessage, jsonErr := json.Marshal(incoming)
if jsonErr != nil {
slog.Error(jsonErr.Error())
}
if incoming.RequeueAttempts <= retryAttemptLimit {
rejectMessage(false)
if qErr := h.MessageQWriter(updatedMessage); qErr != nil {
slog.Error("Error re-queueing message", "Error", qErr.Error())
}
} else {
slog.Error(fmt.Sprintf("Retries failed, unable to send to %s", target), "Error", err.Error())
rejectMessage(false)
}
}
}(message)

incoming := &InsightsMessage{}
err := json.Unmarshal(message.Body(), incoming)

Expand Down Expand Up @@ -175,22 +194,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
if insights.InsightsType != Direct {
err := h.sendToLagoonS3(incoming, insights, resource)
if err != nil {
incoming.RequeueAttempts++
updatedMessage, err := json.Marshal(incoming)
if err != nil {
fmt.Printf(err.Error())
}
if incoming.RequeueAttempts <= 3 {
rejectMessage(false)
if err := h.MessageQWriter(updatedMessage); err != nil {
slog.Error("Error re-queueing message", "Error", err.Error())
}
return
} else {
slog.Error("Retries failed, unable to send to S3", "Error", err.Error())
rejectMessage(false)
return
}
rejectRequeue(rejectMessage, incoming, 3, "S3", err)
}
}
}
Expand All @@ -206,22 +210,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
err := h.sendToLagoonAPI(incoming, resource, insights)

if err != nil {
incoming.RequeueAttempts++
updatedMessage, err := json.Marshal(incoming)
if err != nil {
fmt.Printf(err.Error())
}
if incoming.RequeueAttempts <= 3 {
rejectMessage(false)
if err := h.MessageQWriter(updatedMessage); err != nil {
slog.Error("Error re-queueing message", "Error", err.Error())
}
return
} else {
slog.Error("Retries failed, unable to send to the API", "Error", err.Error())
rejectMessage(false)
return
}
rejectRequeue(rejectMessage, incoming, 3, "Lagoon API", err)
}
}
}
Expand Down

0 comments on commit 10b5e31

Please sign in to comment.