Skip to content

Commit

Permalink
Add lastIngestionRunName
Browse files Browse the repository at this point in the history
Signed-off-by: Daishan Peng <[email protected]>
  • Loading branch information
StrongMonkey committed Oct 24, 2024
1 parent 9c95f09 commit c5a9016
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 27 deletions.
48 changes: 32 additions & 16 deletions pkg/controller/handlers/knowledge/knowledge.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro
}

// The status handler will clean this up
if ws.Status.IngestionRunName != "" {
if ws.Status.CurrentIngestionRunName != "" {
return nil
}

Expand Down Expand Up @@ -205,7 +205,7 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro
}

ws.Status.IngestionRunHash = hash
ws.Status.IngestionRunName = run.Run.Name
ws.Status.CurrentIngestionRunName = run.Run.Name
ws.Status.IngestionGeneration++
return req.Client.Status().Update(req.Ctx, ws)
}
Expand Down Expand Up @@ -234,30 +234,21 @@ func toStream(events <-chan types.Progress) io.ReadCloser {
func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error {
ws := req.Object.(*v1.Workspace)

if ws.Status.IngestionRunName == "" {
if ws.Status.CurrentIngestionRunName == "" {
return nil
}

var run v1.Run
if err := req.Get(&run, ws.Namespace, ws.Status.IngestionRunName); apierrors.IsNotFound(err) {
if err := req.Get(uncached.Get(&run), ws.Namespace, ws.Status.IngestionRunName); apierrors.IsNotFound(err) {
if err := req.Get(&run, ws.Namespace, ws.Status.CurrentIngestionRunName); apierrors.IsNotFound(err) {
if err := req.Get(uncached.Get(&run), ws.Namespace, ws.Status.CurrentIngestionRunName); apierrors.IsNotFound(err) {
// Orphaned? User deleted the run? Solar flare?
ws.Status.IngestionRunName = ""
ws.Status.CurrentIngestionRunName = ""
}
return nil
} else if err != nil {
return err
}

if run.Status.State.IsTerminal() {
if err := updateIngestionError(req, ws, &run); err != nil {
return err
}

ws.Status.IngestionRunName = ""
return nil
}

_, progress, err := a.events.Watch(req.Ctx, ws.Namespace, events.WatchOptions{
Run: &run,
})
Expand All @@ -270,11 +261,36 @@ func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error
return err
}

ws.Status.LastIngestionRunName = ws.Status.CurrentIngestionRunName
ws.Status.CurrentIngestionRunName = ""
ws.Status.NotFinished = notFinished
ws.Status.IngestionLastRunTime = metav1.Now()
return nil
}

func (a *Handler) UpdateIngestionError(req router.Request, _ router.Response) error {
ws := req.Object.(*v1.Workspace)

if ws.Status.LastIngestionRunName == "" {
return nil
}

var run v1.Run
if err := req.Get(&run, ws.Namespace, ws.Status.LastIngestionRunName); apierrors.IsNotFound(err) {
return nil
} else if err != nil {
return err
}

if run.Status.State.IsTerminal() {
if err := updateIngestionError(req, ws, &run); err != nil {
return err
}
}

return nil
}

func updateIngestionError(req router.Request, ws *v1.Workspace, run *v1.Run) error {
var knowledgeSet v1.KnowledgeSet
if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil {
Expand Down Expand Up @@ -323,7 +339,7 @@ func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Work
}
final[file.Name] = ingestionStatus.Status

if ingestionStatus.Status == "finished" {
if ingestionStatus.Status == "finished" || ingestionStatus.Status == "skipped" {
delete(final, file.Name)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/handlers/knowledgeset/knowledgeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (h *Handler) GenerateDataDescription(req router.Request, resp router.Respon
}

// Ignore if running
if ws.Status.IngestionRunName != "" {
if ws.Status.CurrentIngestionRunName != "" {
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (c *Controller) setupRoutes() error {
root.Type(&v1.Workspace{}).HandlerFunc(workspace.CreateWorkspace)
root.Type(&v1.Workspace{}).HandlerFunc(knowledge.IngestKnowledge)
root.Type(&v1.Workspace{}).HandlerFunc(knowledge.UpdateFileStatus)
root.Type(&v1.Workspace{}).HandlerFunc(knowledge.UpdateIngestionError)

// KnowledgeSets
root.Type(&v1.KnowledgeSet{}).HandlerFunc(cleanup.Cleanup)
Expand Down
21 changes: 11 additions & 10 deletions pkg/storage/apis/otto.gptscript.ai/v1/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,17 @@ type WorkspaceSpec struct {
}

type WorkspaceStatus struct {
WorkspaceID string `json:"workspaceID,omitempty"`
IngestionGeneration int64 `json:"ingestionGeneration,omitempty"`
IngestionRunHash string `json:"ingestionRunHash,omitempty"`
IngestionRunName string `json:"ingestionRunName,omitempty"`
IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"`
LastNotFinished map[string]string `json:"lastNotFinished,omitempty"`
NotFinished map[string]string `json:"notFinished,omitempty"`
RetryCount int `json:"retryCount,omitempty"`
PendingApproval []string `json:"pendingApproval,omitempty"`
PendingRejections []string `json:"pendingRejections,omitempty"`
WorkspaceID string `json:"workspaceID,omitempty"`
IngestionGeneration int64 `json:"ingestionGeneration,omitempty"`
IngestionRunHash string `json:"ingestionRunHash,omitempty"`
CurrentIngestionRunName string `json:"currentIngestionRunName,omitempty"`
LastIngestionRunName string `json:"lastIngestionRunName,omitempty"`
IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"`
LastNotFinished map[string]string `json:"lastNotFinished,omitempty"`
NotFinished map[string]string `json:"notFinished,omitempty"`
RetryCount int `json:"retryCount,omitempty"`
PendingApproval []string `json:"pendingApproval,omitempty"`
PendingRejections []string `json:"pendingRejections,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down

0 comments on commit c5a9016

Please sign in to comment.