From 207e4636e92fd08d911cd24fc250d6a3fcd6e725 Mon Sep 17 00:00:00 2001 From: Blaize M Kaye Date: Mon, 2 Oct 2023 09:39:55 +1300 Subject: [PATCH] Change ack logic for items processing --- internal/handler/messaging.go | 44 +++++---- internal/handler/trivyProcessing.go | 111 +---------------------- internal/handler/trivyProcessing_test.go | 2 +- 3 files changed, 33 insertions(+), 124 deletions(-) diff --git a/internal/handler/messaging.go b/internal/handler/messaging.go index 462a07d..b35b57e 100644 --- a/internal/handler/messaging.go +++ b/internal/handler/messaging.go @@ -39,7 +39,9 @@ func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts func (h *Messaging) processMessageQueue(message mq.Message) { var insights InsightsData var resource ResourceDestination + // set up defer to ack the message after we're done processing + defer func(message mq.Message) { // Ack to remove from queue err := message.Ack(false) @@ -48,6 +50,26 @@ func (h *Messaging) processMessageQueue(message mq.Message) { } }(message) + acknowledgeMessage := func(message mq.Message) func() { + return func() { + // Ack to remove from queue + err := message.Ack(false) + if err != nil { + fmt.Printf("Failed to acknowledge message: %s\n", err.Error()) + } + } + }(message) + + rejectMessage := func(message mq.Message) func(bool) { + return func(requeue bool) { + // Ack to remove from queue + err := message.Reject(requeue) + if err != nil { + fmt.Printf("Failed to requect message: %s\n", err.Error()) + } + } + }(message) + incoming := &InsightsMessage{} json.Unmarshal(message.Body(), incoming) @@ -56,6 +78,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) { if incoming.Type == "direct.facts" || incoming.Type == "direct.problems" { resp := processItemsDirectly(message, h) log.Println(resp) + acknowledgeMessage() return } @@ -108,19 +131,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) { if insights.InputType != "" { switch insights.InputType { case "sbom", "sbom-gz": - insights.InsightsType = Sbom - // We actually want to decompress the payload here so that they're all processed the same way - //decodeGzipString(incoming.BinaryPayload[0]) - //for n, d := range incoming.BinaryPayload { - // // let's try and decompress the binary payload here - // data, err := decodeGzipString(d) - // // TODO: I think there may be a potential issue here if the type isn't gzip, so should probably test - // if err != nil { - // - // } - //} - case "image", "image-gz": insights.InsightsType = Image case "direct": @@ -135,10 +146,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) { if h.EnableDebug { log.Printf("[DEBUG] no payload was found") } - err := message.Reject(false) - if err != nil { - fmt.Printf("Unable to reject payload: %s\n", err.Error()) - } + rejectMessage(false) return } if len(incoming.Payload) != 0 { @@ -160,6 +168,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) { err := h.sendToLagoonS3(incoming, insights, resource) if err != nil { log.Printf("Unable to send to S3: %s", err.Error()) + // TODO: do we reque here? Reject } } } @@ -176,7 +185,10 @@ func (h *Messaging) processMessageQueue(message mq.Message) { if err != nil { log.Printf("Unable to send to the api: %s", err.Error()) + rejectMessage(false) + return } } } + acknowledgeMessage() } diff --git a/internal/handler/trivyProcessing.go b/internal/handler/trivyProcessing.go index e02c0ce..8eaf072 100644 --- a/internal/handler/trivyProcessing.go +++ b/internal/handler/trivyProcessing.go @@ -8,53 +8,29 @@ import ( "github.com/Khan/genqlient/graphql" "github.com/aquasecurity/trivy/pkg/commands/artifact" "github.com/aquasecurity/trivy/pkg/flag" - "github.com/aquasecurity/trivy/pkg/sbom/cyclonedx" "github.com/aquasecurity/trivy/pkg/types" "github.com/uselagoon/lagoon/services/insights-handler/internal/lagoonclient" "io" "net/http" "os" - "os/exec" "strings" - "sync" "time" ) const problemSource = "insights-handler-grype" -type sbomQueueItem struct { - EnvironmentId int - Service string - SBOM cyclonedx.BOM -} - -type sbomQueue struct { - Items []sbomQueueItem - Lock sync.Mutex - GrypeLocation string - Messaging Messaging -} - -var queue = sbomQueue{ - Items: []sbomQueueItem{}, - Lock: sync.Mutex{}, -} - func SbomToProblems(apiClient graphql.Client, trivyRemoteAddress string, bomWriteDirectory string, environmentId int, service string, sbom cdx.BOM) error { - fmt.Println("AAA") rep, err := executeProcessingTrivy(trivyRemoteAddress, bomWriteDirectory, sbom) if err != nil { - return err + return fmt.Errorf("unable to execute trivy processing: %v", err.Error()) } - fmt.Println("BBB") problems, err := trivyReportToProblems(environmentId, problemSource, service, rep) if err != nil { - return err + return fmt.Errorf("unable to execute trivy processing - converting trivy report to problems: %v", err.Error()) } - fmt.Println("CCC") err = writeProblemsArrayToApi(apiClient, environmentId, problemSource, service, problems) if err != nil { - return err + return fmt.Errorf("unable to execute trivy processing- writing problems to api: %v", err.Error()) } return nil } @@ -85,7 +61,6 @@ func convertBOMToProblemsArray(environment int, source string, service string, b //here we need to ensure that there are actually vulnerabilities if v.Ratings != nil && len(*v.Ratings) > 0 { - //TODO: this is gross, fix it. p.Severity = lagoonclient.ProblemSeverityRating(strings.ToUpper(string((*v.Ratings)[0].Severity))) var sevScore float64 @@ -120,7 +95,7 @@ func writeProblemsArrayToApi(apiClient graphql.Client, environment int, source s return nil } -func testTrivyServerIsAlive(trivyRemoteAddress string) (bool, error) { +func IsTrivyServerIsAlive(trivyRemoteAddress string) (bool, error) { resp, err := http.Get(fmt.Sprintf("%v/healthz", trivyRemoteAddress)) if err != nil { return false, err @@ -253,7 +228,6 @@ func trivyReportToProblems(environment int, source string, service string, repor Data: "{}", AssociatedPackage: "", Description: v.Vulnerability.Description, - // Severity: } if len(v.Vulnerability.References) > 0 { @@ -269,80 +243,3 @@ func trivyReportToProblems(environment int, source string, service string, repor fmt.Println(ret) return ret, nil } - -func executeProcessing(grypeLocation string, bom cyclonedx.BOM) (cyclonedx.BOM, error) { - cmd := exec.Command(grypeLocation, "-o", "cyclonedx-json") - // Set up pipes for stdin, stdout, and stderr - stdin, err := cmd.StdinPipe() - if err != nil { - fmt.Println("Failed to create stdin pipe:", err) - return cyclonedx.BOM{}, err - } - - stdout, err := cmd.StdoutPipe() - if err != nil { - fmt.Println("Failed to create stdout pipe:", err) - return cyclonedx.BOM{}, err - } - defer stdout.Close() - - stderr, err := cmd.StderrPipe() - if err != nil { - fmt.Println("Failed to create stderr pipe:", err) - return cyclonedx.BOM{}, err - } - defer stderr.Close() - - sbomString, err := json.Marshal(bom) - if err != nil { - return cyclonedx.BOM{}, err - } - //let's push the sbom into the stdin - if err := cmd.Start(); err != nil { - fmt.Println("Failed to start command:", err) - return cyclonedx.BOM{}, err - } - - go func() { - defer stdin.Close() - _, err = io.WriteString(stdin, string(sbomString)) - }() - - if err != nil { - fmt.Println("Could not write to grype", err) - return cyclonedx.BOM{}, err - } - - //execute - // Read from stdout - output := make([]byte, 0) // Buffer to store the output - buf := make([]byte, 1024) // Read buffer - for { - n, err := stdout.Read(buf) - if err != nil && err != io.EOF { - fmt.Println("Failed to read from stdout:", err) - return cyclonedx.BOM{}, err - } - if n == 0 { - break - } - output = append(output, buf[:n]...) - } - - //fmt.Println("Output:", string(output)) - - // Wait for the command to finish - if err := cmd.Wait(); err != nil { - fmt.Println("Command execution failed:", err) - return cyclonedx.BOM{}, err - } - - var ret cyclonedx.BOM - err = json.Unmarshal(output, &ret) - if err != nil { - fmt.Println("Unable to unmarshal data") - return ret, err - } - - return ret, nil -} diff --git a/internal/handler/trivyProcessing_test.go b/internal/handler/trivyProcessing_test.go index 7776964..90baddd 100644 --- a/internal/handler/trivyProcessing_test.go +++ b/internal/handler/trivyProcessing_test.go @@ -135,7 +135,7 @@ func Test_executeProcessingTrivy(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // check if a server is available to run the test - serverUp, err := testTrivyServerIsAlive(tt.args.trivyRemoteAddress) + serverUp, err := IsTrivyServerIsAlive(tt.args.trivyRemoteAddress) if err != nil { t.Errorf("Unable to connect to trivy server: %v", err.Error())