diff --git a/internal/response-consumer/handler.go b/internal/response-consumer/handler.go index cd198e29..509d5437 100644 --- a/internal/response-consumer/handler.go +++ b/internal/response-consumer/handler.go @@ -115,7 +115,9 @@ func (this *handler) onMessage(ctx context.Context, msg *k.Message) { Events: eventsSerialized, } - if updateResult := baseQuery.Select("status", "events").Updates(toUpdate); updateResult.Error != nil { + // Only update if the run is not marked as complete + updateResult := baseQuery.Where("status not in ?", []string{db.RunStatusSuccess, db.RunStatusFailure}).Select("status", "events").Updates(toUpdate) + if updateResult.Error != nil { utils.GetLogFromContext(ctx).Errorw("Error updating run in db", "error", updateResult.Error) return updateResult.Error } else { @@ -214,8 +216,17 @@ func satUpdateRecord(ctx context.Context, tx *gorm.DB, responseFull bool, toUpda } func createRecord(ctx context.Context, tx *gorm.DB, toCreate []db.RunHost) error { + + successOrFailure := clause.OrConditions{Exprs: []clause.Expression{ + clause.Eq{Column: "run_hosts.status", Value: db.RunStatusSuccess}, + clause.Eq{Column: "run_hosts.status", Value: db.RunStatusFailure}, + }} + + notMarkedAsComplete := clause.Where{Exprs: []clause.Expression{clause.Not(successOrFailure)}} + createResult := tx.Model(db.RunHost{}). Clauses(clause.OnConflict{ + Where: notMarkedAsComplete, Columns: []clause.Column{{Name: "run_id"}, {Name: "host"}}, DoUpdates: clause.AssignmentColumns([]string{"status", "log"}), }). diff --git a/internal/response-consumer/handler_test.go b/internal/response-consumer/handler_test.go index f6f27ee5..42c7bcd1 100644 --- a/internal/response-consumer/handler_test.go +++ b/internal/response-consumer/handler_test.go @@ -203,6 +203,74 @@ var _ = Describe("handler", func() { checkHost(data.ID, "success", nil, "", nil) }) + It("successful runner event - ignore out-of-order updates", func() { + var data = test.NewRun(orgId()) + Expect(db().Create(&data).Error).ToNot(HaveOccurred()) + + events := createRunnerEvents( + messageModel.EventExecutorOnStart, + "playbook_on_start", + "playbook_on_play_start", + "playbook_on_task_start", + "runner_on_start", + "runner_on_ok", + "playbook_on_stats", + ) + + // process the complete set of events + instance.onMessage(test.TestContext(), newRunnerResponseMessage(events, data.CorrelationID)) + + // verify that the status is marked as complete/success + run := fetchRun(data.ID) + Expect(run.Status).To(Equal("success")) + checkHost(data.ID, "success", nil, "", nil) + + // remove the last element from the events slice + incompleteListOfEvents := (*events)[:len(*events)-1] + + // process the incomplete set of events after processing the complete set of events + instance.onMessage(test.TestContext(), newRunnerResponseMessage(&incompleteListOfEvents, data.CorrelationID)) + + // verify that processing a incomplete set of events does not overwrite the complete set of events + run = fetchRun(data.ID) + Expect(run.Status).To(Equal("success")) + checkHost(data.ID, "success", nil, "", nil) + }) + + It("failed runner event - ignore out-of-order updates", func() { + var data = test.NewRun(orgId()) + Expect(db().Create(&data).Error).ToNot(HaveOccurred()) + + events := createRunnerEvents( + messageModel.EventExecutorOnStart, + "playbook_on_start", + "playbook_on_play_start", + "playbook_on_task_start", + "runner_on_start", + "runner_on_failed", + "playbook_on_stats", + ) + + // process the complete set of events + instance.onMessage(test.TestContext(), newRunnerResponseMessage(events, data.CorrelationID)) + + // verify that the status is marked as complete/failure + run := fetchRun(data.ID) + Expect(run.Status).To(Equal("failure")) + checkHost(data.ID, "failure", nil, "", nil) + + // remove the last element from the events slice + incompleteListOfEvents := (*events)[:len(*events)-1] + + // process the incomplete set of events after processing the complete set of events + instance.onMessage(test.TestContext(), newRunnerResponseMessage(&incompleteListOfEvents, data.CorrelationID)) + + // verify that processing a incomplete set of events does not overwrite the complete set of events + run = fetchRun(data.ID) + Expect(run.Status).To(Equal("failure")) + checkHost(data.ID, "failure", nil, "", nil) + }) + It("updates the run status based on failure runner events", func() { var data = test.NewRun(orgId()) Expect(db().Create(&data).Error).ToNot(HaveOccurred()) @@ -817,6 +885,39 @@ var _ = Describe("handler", func() { checkHost(data.ID, "success", utils.IntRef(2), "\\n\\u2026\\nsecond console log\nthird console log", &inventoryId) }) + It("success status not overridden by out-of-order event", func() { + var data = test.NewRun(orgId()) + data.ResponseFull = false + + Expect(db().Create(&data).Error).ToNot(HaveOccurred()) + + inventoryId := uuid.New() + var hostData = test.NewRunHost(data.ID, "running", &inventoryId) + inventoryIdString := inventoryId.String() + + Expect(db().Create(&hostData).Error).ToNot(HaveOccurred()) + + events := buildSatEvents( + data.CorrelationID, + satPlaybookRunUpdateEvent(1, inventoryIdString, "second console log"), + satPlaybookRunFinishedEvent(inventoryIdString, "success"), + satPlaybookRunCompletedEvent("success"), + ) + + instance.onMessage(test.TestContext(), newSatResponseMessage(events, data.CorrelationID)) + + events = buildSatEvents( + data.CorrelationID, + satPlaybookRunUpdateEvent(0, inventoryIdString, "first console log\n"), + ) + + instance.onMessage(test.TestContext(), newSatResponseMessage(events, data.CorrelationID)) + + run := fetchRun(data.ID) + + Expect(run.Status).To(Equal("success")) + }) + It("failed status not overridden by out-of-order event", func() { var data = test.NewRun(orgId()) data.ResponseFull = false