From c5a9016e023979c9eb9db866e195a02fd38241e6 Mon Sep 17 00:00:00 2001 From: Daishan Peng Date: Thu, 24 Oct 2024 09:39:23 -0700 Subject: [PATCH] Add lastIngestionRunName Signed-off-by: Daishan Peng --- .../handlers/knowledge/knowledge.go | 48 ++++++++++++------- .../handlers/knowledgeset/knowledgeset.go | 2 +- pkg/controller/routes.go | 1 + .../apis/otto.gptscript.ai/v1/workspace.go | 21 ++++---- 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/pkg/controller/handlers/knowledge/knowledge.go b/pkg/controller/handlers/knowledge/knowledge.go index a5067ef6..9e5be329 100644 --- a/pkg/controller/handlers/knowledge/knowledge.go +++ b/pkg/controller/handlers/knowledge/knowledge.go @@ -79,7 +79,7 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro } // The status handler will clean this up - if ws.Status.IngestionRunName != "" { + if ws.Status.CurrentIngestionRunName != "" { return nil } @@ -205,7 +205,7 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro } ws.Status.IngestionRunHash = hash - ws.Status.IngestionRunName = run.Run.Name + ws.Status.CurrentIngestionRunName = run.Run.Name ws.Status.IngestionGeneration++ return req.Client.Status().Update(req.Ctx, ws) } @@ -234,30 +234,21 @@ func toStream(events <-chan types.Progress) io.ReadCloser { func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error { ws := req.Object.(*v1.Workspace) - if ws.Status.IngestionRunName == "" { + if ws.Status.CurrentIngestionRunName == "" { return nil } var run v1.Run - if err := req.Get(&run, ws.Namespace, ws.Status.IngestionRunName); apierrors.IsNotFound(err) { - if err := req.Get(uncached.Get(&run), ws.Namespace, ws.Status.IngestionRunName); apierrors.IsNotFound(err) { + if err := req.Get(&run, ws.Namespace, ws.Status.CurrentIngestionRunName); apierrors.IsNotFound(err) { + if err := req.Get(uncached.Get(&run), ws.Namespace, ws.Status.CurrentIngestionRunName); apierrors.IsNotFound(err) { // Orphaned? User deleted the run? Solar flare? - ws.Status.IngestionRunName = "" + ws.Status.CurrentIngestionRunName = "" } return nil } else if err != nil { return err } - if run.Status.State.IsTerminal() { - if err := updateIngestionError(req, ws, &run); err != nil { - return err - } - - ws.Status.IngestionRunName = "" - return nil - } - _, progress, err := a.events.Watch(req.Ctx, ws.Namespace, events.WatchOptions{ Run: &run, }) @@ -270,11 +261,36 @@ func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error return err } + ws.Status.LastIngestionRunName = ws.Status.CurrentIngestionRunName + ws.Status.CurrentIngestionRunName = "" ws.Status.NotFinished = notFinished ws.Status.IngestionLastRunTime = metav1.Now() return nil } +func (a *Handler) UpdateIngestionError(req router.Request, _ router.Response) error { + ws := req.Object.(*v1.Workspace) + + if ws.Status.LastIngestionRunName == "" { + return nil + } + + var run v1.Run + if err := req.Get(&run, ws.Namespace, ws.Status.LastIngestionRunName); apierrors.IsNotFound(err) { + return nil + } else if err != nil { + return err + } + + if run.Status.State.IsTerminal() { + if err := updateIngestionError(req, ws, &run); err != nil { + return err + } + } + + return nil +} + func updateIngestionError(req router.Request, ws *v1.Workspace, run *v1.Run) error { var knowledgeSet v1.KnowledgeSet if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil { @@ -323,7 +339,7 @@ func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Work } final[file.Name] = ingestionStatus.Status - if ingestionStatus.Status == "finished" { + if ingestionStatus.Status == "finished" || ingestionStatus.Status == "skipped" { delete(final, file.Name) } diff --git a/pkg/controller/handlers/knowledgeset/knowledgeset.go b/pkg/controller/handlers/knowledgeset/knowledgeset.go index 04e19276..092931c3 100644 --- a/pkg/controller/handlers/knowledgeset/knowledgeset.go +++ b/pkg/controller/handlers/knowledgeset/knowledgeset.go @@ -42,7 +42,7 @@ func (h *Handler) GenerateDataDescription(req router.Request, resp router.Respon } // Ignore if running - if ws.Status.IngestionRunName != "" { + if ws.Status.CurrentIngestionRunName != "" { return nil } diff --git a/pkg/controller/routes.go b/pkg/controller/routes.go index 7aa5ca64..e8f80e8d 100644 --- a/pkg/controller/routes.go +++ b/pkg/controller/routes.go @@ -88,6 +88,7 @@ func (c *Controller) setupRoutes() error { root.Type(&v1.Workspace{}).HandlerFunc(workspace.CreateWorkspace) root.Type(&v1.Workspace{}).HandlerFunc(knowledge.IngestKnowledge) root.Type(&v1.Workspace{}).HandlerFunc(knowledge.UpdateFileStatus) + root.Type(&v1.Workspace{}).HandlerFunc(knowledge.UpdateIngestionError) // KnowledgeSets root.Type(&v1.KnowledgeSet{}).HandlerFunc(cleanup.Cleanup) diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go b/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go index d55f5a98..5f1f655d 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go @@ -63,16 +63,17 @@ type WorkspaceSpec struct { } type WorkspaceStatus struct { - WorkspaceID string `json:"workspaceID,omitempty"` - IngestionGeneration int64 `json:"ingestionGeneration,omitempty"` - IngestionRunHash string `json:"ingestionRunHash,omitempty"` - IngestionRunName string `json:"ingestionRunName,omitempty"` - IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"` - LastNotFinished map[string]string `json:"lastNotFinished,omitempty"` - NotFinished map[string]string `json:"notFinished,omitempty"` - RetryCount int `json:"retryCount,omitempty"` - PendingApproval []string `json:"pendingApproval,omitempty"` - PendingRejections []string `json:"pendingRejections,omitempty"` + WorkspaceID string `json:"workspaceID,omitempty"` + IngestionGeneration int64 `json:"ingestionGeneration,omitempty"` + IngestionRunHash string `json:"ingestionRunHash,omitempty"` + CurrentIngestionRunName string `json:"currentIngestionRunName,omitempty"` + LastIngestionRunName string `json:"lastIngestionRunName,omitempty"` + IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"` + LastNotFinished map[string]string `json:"lastNotFinished,omitempty"` + NotFinished map[string]string `json:"notFinished,omitempty"` + RetryCount int `json:"retryCount,omitempty"` + PendingApproval []string `json:"pendingApproval,omitempty"` + PendingRejections []string `json:"pendingRejections,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object