Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Add error status handling for ingestion and sync source #283

Merged
merged 5 commits into from
Oct 24, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 21 additions & 40 deletions pkg/controller/handlers/knowledge/knowledge.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,63 +249,44 @@ func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error
return err
}

if run.Status.State.IsTerminal() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thedadams There is a chance that when run is not in terminal state and we are not immediately setting ws.Status.IngestionRunName = '', the controller will reprocess the logs from the run events, which could be a lot of computes.

So I think the best way to address this without a goroutine is to add a new field lastIngestionRunName to keep track it the run and update error based on that.

if err := updateIngestionError(req, ws, &run); err != nil {
return err
}

ws.Status.IngestionRunName = ""
return nil
}

_, progress, err := a.events.Watch(req.Ctx, ws.Namespace, events.WatchOptions{
Run: &run,
})
if err != nil {
return err
}

NotFinished, err := compileFileStatuses(req.Ctx, req.Client, ws, progress)
notFinished, err := compileFileStatuses(req.Ctx, req.Client, ws, progress)
if err != nil {
return err
}

// Fetch ingestion error from run.status in go routine so that we don't block
runName := ws.Status.IngestionRunName
go func() {
for {
stop, err := updateIngestionError(req, ws, runName)
if err != nil {
logger.Errorf("failed to update ingestion error: %s", err)
break
}

if stop {
break
}
time.Sleep(time.Second * 5)
}
}()

ws.Status.IngestionRunName = ""
ws.Status.NotFinished = NotFinished
ws.Status.NotFinished = notFinished
ws.Status.IngestionLastRunTime = metav1.Now()
return nil
}

func updateIngestionError(req router.Request, ws *v1.Workspace, runName string) (bool, error) {
var run v1.Run
if err := req.Get(uncached.Get(&run), ws.Namespace, runName); err != nil && !apierrors.IsNotFound(err) {
return false, err
} else if err == nil {
var knowledgeSet v1.KnowledgeSet
if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil {
return false, err
}
if run.Status.Error != knowledgeSet.Status.IngestionError {
knowledgeSet.Status.IngestionError = run.Status.Error
if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil {
return false, err
}
}
func updateIngestionError(req router.Request, ws *v1.Workspace, run *v1.Run) error {
var knowledgeSet v1.KnowledgeSet
if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil {
return err
}

if run.Status.State.IsTerminal() {
return true, nil
if run.Status.Error != knowledgeSet.Status.IngestionError {
knowledgeSet.Status.IngestionError = run.Status.Error
if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil {
return err
}
}

return false, nil
return nil
}

func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Workspace, progress <-chan types.Progress) (map[string]string, error) {
Expand Down