Skip to content

Commit

Permalink
Use controller framework to set ingestion error
Browse files Browse the repository at this point in the history
Signed-off-by: Donnie Adams <[email protected]>
  • Loading branch information
thedadams authored and StrongMonkey committed Oct 24, 2024
1 parent 35ab128 commit 9c95f09
Showing 1 changed file with 21 additions and 40 deletions.
61 changes: 21 additions & 40 deletions pkg/controller/handlers/knowledge/knowledge.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,63 +249,44 @@ func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error
return err
}

if run.Status.State.IsTerminal() {
if err := updateIngestionError(req, ws, &run); err != nil {
return err
}

ws.Status.IngestionRunName = ""
return nil
}

_, progress, err := a.events.Watch(req.Ctx, ws.Namespace, events.WatchOptions{
Run: &run,
})
if err != nil {
return err
}

NotFinished, err := compileFileStatuses(req.Ctx, req.Client, ws, progress)
notFinished, err := compileFileStatuses(req.Ctx, req.Client, ws, progress)
if err != nil {
return err
}

// Fetch ingestion error from run.status in go routine so that we don't block
runName := ws.Status.IngestionRunName
go func() {
for {
stop, err := updateIngestionError(req, ws, runName)
if err != nil {
logger.Errorf("failed to update ingestion error: %s", err)
break
}

if stop {
break
}
time.Sleep(time.Second * 5)
}
}()

ws.Status.IngestionRunName = ""
ws.Status.NotFinished = NotFinished
ws.Status.NotFinished = notFinished
ws.Status.IngestionLastRunTime = metav1.Now()
return nil
}

func updateIngestionError(req router.Request, ws *v1.Workspace, runName string) (bool, error) {
var run v1.Run
if err := req.Get(uncached.Get(&run), ws.Namespace, runName); err != nil && !apierrors.IsNotFound(err) {
return false, err
} else if err == nil {
var knowledgeSet v1.KnowledgeSet
if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil {
return false, err
}
if run.Status.Error != knowledgeSet.Status.IngestionError {
knowledgeSet.Status.IngestionError = run.Status.Error
if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil {
return false, err
}
}
func updateIngestionError(req router.Request, ws *v1.Workspace, run *v1.Run) error {
var knowledgeSet v1.KnowledgeSet
if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil {
return err
}

if run.Status.State.IsTerminal() {
return true, nil
if run.Status.Error != knowledgeSet.Status.IngestionError {
knowledgeSet.Status.IngestionError = run.Status.Error
if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil {
return err
}
}

return false, nil
return nil
}

func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Workspace, progress <-chan types.Progress) (map[string]string, error) {
Expand Down

0 comments on commit 9c95f09

Please sign in to comment.