From 0103fd809e434fc4f2291a6e1ffff3996c3eb643 Mon Sep 17 00:00:00 2001 From: dehort Date: Thu, 5 Sep 2024 13:09:06 -0500 Subject: [PATCH] Filter events in the validator in the case of large uploads (#384) --- deploy/clowdapp.yaml | 9 ++- internal/validator/handler.go | 121 ++++++++++++++++++++++++++-------- 2 files changed, 98 insertions(+), 32 deletions(-) diff --git a/deploy/clowdapp.yaml b/deploy/clowdapp.yaml index 4b300f3e..e83b155b 100644 --- a/deploy/clowdapp.yaml +++ b/deploy/clowdapp.yaml @@ -150,9 +150,6 @@ objects: - name: RBAC_HOST value: ${RBAC_HOST} - - name: STORAGE_MAX_CONCURRENCY - value: ${STORAGE_MAX_CONCURRENCY} - - name: TENANT_TRANSLATOR_IMPL value: ${TENANT_TRANSLATOR_IMPL} - name: TENANT_TRANSLATOR_HOST @@ -265,6 +262,10 @@ objects: value: ${LOG_LEVEL} - name: DB_SSLMODE value: ${DB_SSLMODE} + - name: STORAGE_MAX_CONCURRENCY + value: ${STORAGE_MAX_CONCURRENCY} + - name: ARTIFACT_MAX_SIZE + value: ${ARTIFACT_MAX_SIZE} resources: limits: cpu: ${CPU_LIMIT} @@ -341,6 +342,8 @@ parameters: - name: STORAGE_MAX_CONCURRENCY value: "5" +- name: ARTIFACT_MAX_SIZE + value: '3145728' - name: RETURN_URL value: TBD diff --git a/internal/validator/handler.go b/internal/validator/handler.go index 8fee79ac..63d7798d 100644 --- a/internal/validator/handler.go +++ b/internal/validator/handler.go @@ -163,23 +163,86 @@ func (this *handler) validateContent(ctx context.Context, requestType string, da events = &messageModel.ValidatedMessages{} events.PlaybookType = requestType + log := utils.GetLogFromContext(ctx) + + // FIXME: make this configurable + truncateData := len(data) >= 1*1024*1024 + if truncateData { + log.Debugw("Payload too big. Truncating payload.") + } + + eventTypeCount := make(map[string]int) + hostRunningPlaybook := make(map[string]int) + lines := strings.Split(string(data), "\n") for _, line := range lines { if len(strings.TrimSpace(line)) == 0 { continue } + if requestType == playbookSatPayloadHeaderValue { - err = validateWithSchema(ctx, this.schemas[1], true, line, events) + validatedEvent, err := validateSatRunResponseWithSchema(ctx, this.schemas[1], line) + if err == nil { - err = validateSatHostUUID(line) + err = validateSatHostUUID(validatedEvent) + } + + if err != nil { + return nil, err + } + + var storeEvent bool = true + + if truncateData { + + storeEvent = false + + if validatedEvent.Type == "playbook_run_completed" || validatedEvent.Type == "playbook_run_finished" { + storeEvent = true + log.Debugw("storing run complete/finished for host " /*, *validatedEvent.Host*/) + } + + if validatedEvent.Type == "playbook_run_update" { + + if validatedEvent.Host != nil { + if _, ok := hostRunningPlaybook[*validatedEvent.Host]; !ok { + storeEvent = true + log.Debugw("storing run update for host ", *validatedEvent.Host) + } + + hostRunningPlaybook[*validatedEvent.Host]++ + } + } } + + if storeEvent { + log.Debugw("storing event ", validatedEvent.Type) + events.PlaybookSat = append(events.PlaybookSat, *validatedEvent) + } + } else { - err = validateWithSchema(ctx, this.schemas[0], false, line, events) - } + validatedEvent, err := validateRunResponseWithSchema(ctx, this.schemas[0], line) + if err != nil { + return nil, err + } - if err != nil { - return nil, err + var storeEvent bool = true + + if truncateData { + storeEvent = false + + // FIXME: hardcoded :( + if validatedEvent.Event == "executor_on_start" || validatedEvent.Event == "playbook_on_stats" || validatedEvent.Event == "runner_on_failed" || validatedEvent.Event == "executor_on_failed" { + eventTypeCount[validatedEvent.Event]++ + storeEvent = eventTypeCount[validatedEvent.Event] <= 1 + } + } + + if storeEvent { + log.Debugw("storing event ", validatedEvent.Event) + events.Playbook = append(events.Playbook, *validatedEvent) + } } } @@ -191,14 +254,7 @@ func (this *handler) validateContent(ctx context.Context, requestType string, da return events, nil } -func validateSatHostUUID(line string) (err error) { - event := &messageModel.PlaybookSatRunResponseMessageYamlEventsElem{} - err = json.Unmarshal([]byte(line), &event) - - if err != nil { - return err - } - +func validateSatHostUUID(event *messageModel.PlaybookSatRunResponseMessageYamlEventsElem) (err error) { if event.Host != nil { _, err = uuid.Parse(*event.Host) if err != nil { @@ -208,33 +264,40 @@ func validateSatHostUUID(line string) (err error) { return nil } -func validateWithSchema(ctx context.Context, schema *jsonschema.Schema, rhcsatRequest bool, line string, events *messageModel.ValidatedMessages) (err error) { +func validateRunResponseWithSchema(ctx context.Context, schema *jsonschema.Schema, line string) (validatedEvent *messageModel.PlaybookRunResponseMessageYamlEventsElem, err error) { + errors, parserError := schema.ValidateBytes(ctx, []byte(line)) if parserError != nil { - return parserError + return nil, parserError } else if len(errors) > 0 { - return errors[0] + return nil, errors[0] } - if rhcsatRequest { - event := &messageModel.PlaybookSatRunResponseMessageYamlEventsElem{} - err = json.Unmarshal([]byte(line), &event) - if err != nil { - return err - } + event := &messageModel.PlaybookRunResponseMessageYamlEventsElem{} + err = json.Unmarshal([]byte(line), &event) + if err != nil { + return nil, err + } - events.PlaybookSat = append(events.PlaybookSat, *event) - return + return event, nil +} + +func validateSatRunResponseWithSchema(ctx context.Context, schema *jsonschema.Schema, line string) (validatedEvent *messageModel.PlaybookSatRunResponseMessageYamlEventsElem, err error) { + + errors, parserError := schema.ValidateBytes(ctx, []byte(line)) + if parserError != nil { + return nil, parserError + } else if len(errors) > 0 { + return nil, errors[0] } - event := &messageModel.PlaybookRunResponseMessageYamlEventsElem{} + event := &messageModel.PlaybookSatRunResponseMessageYamlEventsElem{} err = json.Unmarshal([]byte(line), &event) if err != nil { - return err + return nil, err } - events.Playbook = append(events.Playbook, *event) - return + return event, nil } func (this *handler) validationFailed(ctx context.Context, err error, requestType string, request *messageModel.IngressValidationRequest) {