From 7c68eb6d9709774bdec4e688b6778d0bcac0d647 Mon Sep 17 00:00:00 2001 From: Daishan Peng Date: Wed, 23 Oct 2024 02:27:27 -0700 Subject: [PATCH 1/5] Fix: Add error status handling for ingestion and sync source Signed-off-by: Daishan Peng --- apiclient/types/agent.go | 10 +++ apiclient/types/zz_generated.deepcopy.go | 36 +++++++++ pkg/api/handlers/agent.go | 47 +++++++++-- pkg/api/router/router.go | 1 + pkg/controller/handlers/agents/agents.go | 2 + .../handlers/knowledge/knowledge.go | 67 ++++++++++++++- pkg/controller/handlers/threads/threads.go | 2 +- .../handlers/uploads/remoteknowledgesource.go | 4 + .../apis/otto.gptscript.ai/v1/knowledgeset.go | 22 +++++ .../apis/otto.gptscript.ai/v1/workspace.go | 21 ++--- .../v1/zz_generated.deepcopy.go | 4 +- .../openapi/generated/openapi_generated.go | 81 ++++++++++++++++--- .../knowledge/AgentKnowledgePanel.tsx | 28 +++++++ .../components/knowledge/IngestionStatus.tsx | 16 +++- .../knowledge/RemoteKnowledgeSourceStatus.tsx | 15 ++-- .../components/knowledge/file/FileModal.tsx | 7 +- .../knowledge/notion/NotionModal.tsx | 7 +- .../knowledge/onedrive/AddLinkModal.tsx | 1 + .../knowledge/onedrive/OneDriveModal.tsx | 9 ++- .../knowledge/website/WebsiteModal.tsx | 7 +- ui/admin/app/lib/model/agents.ts | 6 ++ 21 files changed, 350 insertions(+), 43 deletions(-) diff --git a/apiclient/types/agent.go b/apiclient/types/agent.go index 110f5ba2..d765e46f 100644 --- a/apiclient/types/agent.go +++ b/apiclient/types/agent.go @@ -12,6 +12,7 @@ type Agent struct { Metadata AgentManifest AgentExternalStatus + AgentKnowledgeSetStatus } type AgentList List[Agent] @@ -46,3 +47,12 @@ func (m AgentManifest) GetParams() *openapi3.Schema { type AgentExternalStatus struct { RefNameAssigned bool `json:"refNameAssigned,omitempty"` } + +type AgentKnowledgeSetStatus struct { + KnowledgeSetStatues []KnowledgeSetStatus `json:"knowledgeSetStatues,omitempty"` +} + +type KnowledgeSetStatus struct { + KnowledgeSetName string `json:"knowledgeSetName,omitempty"` + Error string `json:"error,omitempty"` +} diff --git a/apiclient/types/zz_generated.deepcopy.go b/apiclient/types/zz_generated.deepcopy.go index 686e4b22..7cc01e69 100644 --- a/apiclient/types/zz_generated.deepcopy.go +++ b/apiclient/types/zz_generated.deepcopy.go @@ -12,6 +12,7 @@ func (in *Agent) DeepCopyInto(out *Agent) { in.Metadata.DeepCopyInto(&out.Metadata) in.AgentManifest.DeepCopyInto(&out.AgentManifest) out.AgentExternalStatus = in.AgentExternalStatus + in.AgentKnowledgeSetStatus.DeepCopyInto(&out.AgentKnowledgeSetStatus) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Agent. @@ -39,6 +40,26 @@ func (in *AgentExternalStatus) DeepCopy() *AgentExternalStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AgentKnowledgeSetStatus) DeepCopyInto(out *AgentKnowledgeSetStatus) { + *out = *in + if in.KnowledgeSetStatues != nil { + in, out := &in.KnowledgeSetStatues, &out.KnowledgeSetStatues + *out = make([]KnowledgeSetStatus, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AgentKnowledgeSetStatus. +func (in *AgentKnowledgeSetStatus) DeepCopy() *AgentKnowledgeSetStatus { + if in == nil { + return nil + } + out := new(AgentKnowledgeSetStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AgentList) DeepCopyInto(out *AgentList) { *out = *in @@ -440,6 +461,21 @@ func (in *KnowledgeFileList) DeepCopy() *KnowledgeFileList { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KnowledgeSetStatus) DeepCopyInto(out *KnowledgeSetStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KnowledgeSetStatus. +func (in *KnowledgeSetStatus) DeepCopy() *KnowledgeSetStatus { + if in == nil { + return nil + } + out := new(KnowledgeSetStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LinkState) DeepCopyInto(out *LinkState) { *out = *in diff --git a/pkg/api/handlers/agent.go b/pkg/api/handlers/agent.go index 950fa9be..8666ed0d 100644 --- a/pkg/api/handlers/agent.go +++ b/pkg/api/handlers/agent.go @@ -11,8 +11,10 @@ import ( "github.com/otto8-ai/otto8/pkg/api/server" "github.com/otto8-ai/otto8/pkg/render" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.gptscript.ai/v1" + "github.com/otto8-ai/otto8/pkg/storage/selectors" "github.com/otto8-ai/otto8/pkg/system" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" kclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -87,7 +89,7 @@ func (a *AgentHandler) Create(req api.Context) error { return req.Write(convertAgent(agent, server.GetURLPrefix(req))) } -func convertAgent(agent v1.Agent, prefix string) *types.Agent { +func convertAgent(agent v1.Agent, prefix string, knowledgeSets ...v1.KnowledgeSet) *types.Agent { var links []string if prefix != "" { refName := agent.Name @@ -96,10 +98,21 @@ func convertAgent(agent v1.Agent, prefix string) *types.Agent { } links = []string{"invoke", prefix + "/invoke/" + refName} } + + var knowledgeSetsStatus types.AgentKnowledgeSetStatus + + for _, knowledge := range knowledgeSets { + knowledgeSetsStatus.KnowledgeSetStatues = append(knowledgeSetsStatus.KnowledgeSetStatues, types.KnowledgeSetStatus{ + Error: knowledge.Status.IngestionError, + KnowledgeSetName: knowledge.Name, + }) + } + return &types.Agent{ - Metadata: MetadataFrom(&agent, links...), - AgentManifest: agent.Spec.Manifest, - AgentExternalStatus: agent.Status.External, + Metadata: MetadataFrom(&agent, links...), + AgentManifest: agent.Spec.Manifest, + AgentExternalStatus: agent.Status.External, + AgentKnowledgeSetStatus: knowledgeSetsStatus, } } @@ -109,7 +122,12 @@ func (a *AgentHandler) ByID(req api.Context) error { return err } - return req.Write(convertAgent(agent, server.GetURLPrefix(req))) + knowledgeSets, err := getKnowledgeSetsFromAgent(req, agent) + if err != nil { + return err + } + + return req.Write(convertAgent(agent, server.GetURLPrefix(req), knowledgeSets...)) } func (a *AgentHandler) List(req api.Context) error { @@ -120,12 +138,29 @@ func (a *AgentHandler) List(req api.Context) error { var resp types.AgentList for _, agent := range agentList.Items { - resp.Items = append(resp.Items, *convertAgent(agent, server.GetURLPrefix(req))) + knowledgeSets, err := getKnowledgeSetsFromAgent(req, agent) + if err != nil { + return err + } + resp.Items = append(resp.Items, *convertAgent(agent, server.GetURLPrefix(req), knowledgeSets...)) } return req.Write(resp) } +func getKnowledgeSetsFromAgent(req api.Context, agent v1.Agent) ([]v1.KnowledgeSet, error) { + var knowledgeSets v1.KnowledgeSetList + if err := req.Storage.List(req.Context(), &knowledgeSets, &kclient.ListOptions{ + FieldSelector: fields.SelectorFromSet(selectors.RemoveEmpty(map[string]string{ + "spec.agentName": agent.Name, + })), + Namespace: agent.Namespace, + }); err != nil { + return nil, err + } + return knowledgeSets.Items, nil +} + func (a *AgentHandler) Files(req api.Context) error { var ( id = req.PathValue("id") diff --git a/pkg/api/router/router.go b/pkg/api/router/router.go index f43d4c2c..73a33541 100644 --- a/pkg/api/router/router.go +++ b/pkg/api/router/router.go @@ -41,6 +41,7 @@ func Router(services *services.Services) (http.Handler, error) { mux.HandleFunc("PUT /api/agents/{id}/knowledge/{file_id}/approve", agents.ApproveKnowledgeFile) mux.HandleFunc("DELETE /api/agents/{id}/knowledge/{file...}", agents.DeleteKnowledge) + // Remote Knowledge Sources mux.HandleFunc("POST /api/agents/{agent_id}/remote-knowledge-sources", agents.CreateRemoteKnowledgeSource) mux.HandleFunc("GET /api/agents/{agent_id}/remote-knowledge-sources", agents.GetRemoteKnowledgeSources) mux.HandleFunc("PATCH /api/agents/{agent_id}/remote-knowledge-sources/{id}", agents.ReSyncRemoteKnowledgeSource) diff --git a/pkg/controller/handlers/agents/agents.go b/pkg/controller/handlers/agents/agents.go index 160b756c..fa8b374b 100644 --- a/pkg/controller/handlers/agents/agents.go +++ b/pkg/controller/handlers/agents/agents.go @@ -4,6 +4,7 @@ import ( "github.com/acorn-io/baaah/pkg/router" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.gptscript.ai/v1" "github.com/otto8-ai/otto8/pkg/system" + "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -27,6 +28,7 @@ func WorkspaceObjects(req router.Request, _ router.Response) error { } if len(agent.Status.KnowledgeSetNames) == 0 { + logrus.Info("Created default knowledge set") ws := &v1.KnowledgeSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: req.Namespace, diff --git a/pkg/controller/handlers/knowledge/knowledge.go b/pkg/controller/handlers/knowledge/knowledge.go index cf3e5b9b..b1597380 100644 --- a/pkg/controller/handlers/knowledge/knowledge.go +++ b/pkg/controller/handlers/knowledge/knowledge.go @@ -192,10 +192,22 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro return err } + // Reset knowledge set error before ingestion + var knowledgeSet v1.KnowledgeSet + if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil { + return err + } + if knowledgeSet.Status.IngestionError != "" { + knowledgeSet.Status.IngestionError = "" + if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil { + return err + } + } + ws.Status.IngestionRunHash = hash ws.Status.IngestionRunName = run.Run.Name ws.Status.IngestionGeneration++ - return req.Client.Status().Update(req.Ctx, ws) + return nil } return nil @@ -249,19 +261,59 @@ func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error return err } - // All good + // Fetch ingestion error from run.status in go routine so that we don't block + runName := ws.Status.IngestionRunName + go func() { + for { + stop, err := updateIngestionError(req, ws, runName) + if err != nil { + logger.Errorf("failed to update ingestion error: %s", err) + break + } + + if stop { + break + } + time.Sleep(time.Second * 5) + } + }() + ws.Status.IngestionRunName = "" ws.Status.NotFinished = NotFinished ws.Status.IngestionLastRunTime = metav1.Now() return nil } -func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Workspace, progress <-chan types.Progress) (map[string]string, error) { +func updateIngestionError(req router.Request, ws *v1.Workspace, runName string) (bool, error) { + var run v1.Run + if err := req.Get(uncached.Get(&run), ws.Namespace, runName); err != nil && !apierrors.IsNotFound(err) { + return false, err + } else if err == nil { + var knowledgeSet v1.KnowledgeSet + if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil { + return false, err + } + if run.Status.Error != knowledgeSet.Status.IngestionError { + knowledgeSet.Status.IngestionError = run.Status.Error + if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil { + return false, err + } + } + } + + if run.Status.State == gptscript.Finished || run.Status.State == gptscript.Error { + return true, nil + } + + return false, nil +} + +func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Workspace, progress <-chan types.Progress) (map[string]types.Item, error) { input := toStream(progress) defer input.Close() scanner := bufio.NewScanner(input) - final := map[string]string{} + final := map[string]types.Item{} var errs []error for scanner.Scan() { @@ -288,6 +340,7 @@ func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Work } else if err != nil { errs = append(errs, fmt.Errorf("failed to get knowledge file: %s", err)) } + final[file.Name] = struct{}{} if ingestionStatus.Status == "finished" { delete(final, file.Name) @@ -324,6 +377,12 @@ func (a *Handler) CleanupFile(req router.Request, resp router.Response) error { return err } + var knowledgeSet v1.KnowledgeSet + if err := req.Get(uncached.Get(&knowledgeSet), kFile.Namespace, ws.Spec.KnowledgeSetName); apierrors.IsNotFound(err) { + // if knowledge set has been deleted already, then we don't need to run delete file + return nil + } + if _, err := a.ingester.DeleteKnowledgeFiles(req.Ctx, kFile.Namespace, filepath.Join(workspace.GetDir(ws.Status.WorkspaceID), kFile.Spec.FileName), ws.Spec.KnowledgeSetName); err != nil { return err } diff --git a/pkg/controller/handlers/threads/threads.go b/pkg/controller/handlers/threads/threads.go index 08376812..c213fee7 100644 --- a/pkg/controller/handlers/threads/threads.go +++ b/pkg/controller/handlers/threads/threads.go @@ -60,7 +60,7 @@ func CreateWorkspaces(req router.Request, resp router.Response) error { func CreateKnowledgeSet(req router.Request, _ router.Response) error { thread := req.Object.(*v1.Thread) - if len(thread.Status.KnowledgeSetNames) == 0 { + if len(thread.Status.KnowledgeSetNames) == 0 && thread.Labels[invoke.SystemThreadLabel] != "true" { ws := &v1.KnowledgeSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: req.Namespace, diff --git a/pkg/controller/handlers/uploads/remoteknowledgesource.go b/pkg/controller/handlers/uploads/remoteknowledgesource.go index 287c2a56..abba5299 100644 --- a/pkg/controller/handlers/uploads/remoteknowledgesource.go +++ b/pkg/controller/handlers/uploads/remoteknowledgesource.go @@ -239,6 +239,10 @@ func (u *UploadHandler) HandleUploadRun(req router.Request, resp router.Response // Reset run name to indicate that the run is no longer running remoteKnowledgeSource.Status.ThreadName = "" remoteKnowledgeSource.Status.RunName = "" + if run.Status.Error != "" { + remoteKnowledgeSource.Status.Error = run.Status.Error + } + return req.Client.Status().Update(req.Ctx, remoteKnowledgeSource) } diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/knowledgeset.go b/pkg/storage/apis/otto.gptscript.ai/v1/knowledgeset.go index bc7ffc12..5088212d 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/knowledgeset.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/knowledgeset.go @@ -30,6 +30,27 @@ func (in *KnowledgeSet) DeleteRefs() []Ref { } } +func (in *KnowledgeSet) Has(field string) bool { + return in.Get(field) != "" +} + +func (in *KnowledgeSet) Get(field string) string { + if in == nil { + return "" + } + + switch field { + case "spec.agentName": + return in.Spec.AgentName + } + + return "" +} + +func (*KnowledgeSet) FieldNames() []string { + return []string{"spec.agentName"} +} + // KnowledgeSetManifest should be moved to types once we expose this API type KnowledgeSetManifest struct { DataDescription string `json:"dataDescription,omitempty"` @@ -44,6 +65,7 @@ type KnowledgeSource struct { type KnowledgeSetStatus struct { ObservedIngestionGeneration int64 `json:"observedIngestionGeneration,omitempty"` + IngestionError string `json:"ingestionError,omitempty"` SuggestedDataDescription string `json:"suggestedDataDescription,omitempty"` WorkspaceName string `json:"workspaceName,omitempty"` IsEmpty bool `json:"isEmpty,omitempty"` diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go b/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go index d55f5a98..f8b416a2 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go @@ -2,6 +2,7 @@ package v1 import ( "github.com/acorn-io/baaah/pkg/fields" + "github.com/otto8-ai/otto8/apiclient/types" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -63,16 +64,16 @@ type WorkspaceSpec struct { } type WorkspaceStatus struct { - WorkspaceID string `json:"workspaceID,omitempty"` - IngestionGeneration int64 `json:"ingestionGeneration,omitempty"` - IngestionRunHash string `json:"ingestionRunHash,omitempty"` - IngestionRunName string `json:"ingestionRunName,omitempty"` - IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"` - LastNotFinished map[string]string `json:"lastNotFinished,omitempty"` - NotFinished map[string]string `json:"notFinished,omitempty"` - RetryCount int `json:"retryCount,omitempty"` - PendingApproval []string `json:"pendingApproval,omitempty"` - PendingRejections []string `json:"pendingRejections,omitempty"` + WorkspaceID string `json:"workspaceID,omitempty"` + IngestionGeneration int64 `json:"ingestionGeneration,omitempty"` + IngestionRunHash string `json:"ingestionRunHash,omitempty"` + IngestionRunName string `json:"ingestionRunName,omitempty"` + IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"` + LastNotFinished map[string]types.Item `json:"lastNotFinished,omitempty"` + NotFinished map[string]types.Item `json:"notFinished,omitempty"` + RetryCount int `json:"retryCount,omitempty"` + PendingApproval []string `json:"pendingApproval,omitempty"` + PendingRejections []string `json:"pendingRejections,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go b/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go index 53717dec..a456a057 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go @@ -2029,14 +2029,14 @@ func (in *WorkspaceStatus) DeepCopyInto(out *WorkspaceStatus) { in.IngestionLastRunTime.DeepCopyInto(&out.IngestionLastRunTime) if in.LastNotFinished != nil { in, out := &in.LastNotFinished, &out.LastNotFinished - *out = make(map[string]string, len(*in)) + *out = make(map[string]types.Item, len(*in)) for key, val := range *in { (*out)[key] = val } } if in.NotFinished != nil { in, out := &in.NotFinished, &out.NotFinished - *out = make(map[string]string, len(*in)) + *out = make(map[string]types.Item, len(*in)) for key, val := range *in { (*out)[key] = val } diff --git a/pkg/storage/openapi/generated/openapi_generated.go b/pkg/storage/openapi/generated/openapi_generated.go index b76984b2..42ad44d2 100644 --- a/pkg/storage/openapi/generated/openapi_generated.go +++ b/pkg/storage/openapi/generated/openapi_generated.go @@ -18,6 +18,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA return map[string]common.OpenAPIDefinition{ "github.com/otto8-ai/otto8/apiclient/types.Agent": schema_otto8_ai_otto8_apiclient_types_Agent(ref), "github.com/otto8-ai/otto8/apiclient/types.AgentExternalStatus": schema_otto8_ai_otto8_apiclient_types_AgentExternalStatus(ref), + "github.com/otto8-ai/otto8/apiclient/types.AgentKnowledgeSetStatus": schema_otto8_ai_otto8_apiclient_types_AgentKnowledgeSetStatus(ref), "github.com/otto8-ai/otto8/apiclient/types.AgentList": schema_otto8_ai_otto8_apiclient_types_AgentList(ref), "github.com/otto8-ai/otto8/apiclient/types.AgentManifest": schema_otto8_ai_otto8_apiclient_types_AgentManifest(ref), "github.com/otto8-ai/otto8/apiclient/types.Credential": schema_otto8_ai_otto8_apiclient_types_Credential(ref), @@ -35,6 +36,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/otto8-ai/otto8/apiclient/types.Item": schema_otto8_ai_otto8_apiclient_types_Item(ref), "github.com/otto8-ai/otto8/apiclient/types.KnowledgeFile": schema_otto8_ai_otto8_apiclient_types_KnowledgeFile(ref), "github.com/otto8-ai/otto8/apiclient/types.KnowledgeFileList": schema_otto8_ai_otto8_apiclient_types_KnowledgeFileList(ref), + "github.com/otto8-ai/otto8/apiclient/types.KnowledgeSetStatus": schema_otto8_ai_otto8_apiclient_types_KnowledgeSetStatus(ref), "github.com/otto8-ai/otto8/apiclient/types.LinkState": schema_otto8_ai_otto8_apiclient_types_LinkState(ref), "github.com/otto8-ai/otto8/apiclient/types.Metadata": schema_otto8_ai_otto8_apiclient_types_Metadata(ref), "github.com/otto8-ai/otto8/apiclient/types.NotionConfig": schema_otto8_ai_otto8_apiclient_types_NotionConfig(ref), @@ -251,12 +253,18 @@ func schema_otto8_ai_otto8_apiclient_types_Agent(ref common.ReferenceCallback) c Ref: ref("github.com/otto8-ai/otto8/apiclient/types.AgentExternalStatus"), }, }, + "AgentKnowledgeSetStatus": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.AgentKnowledgeSetStatus"), + }, + }, }, - Required: []string{"Metadata", "AgentManifest", "AgentExternalStatus"}, + Required: []string{"Metadata", "AgentManifest", "AgentExternalStatus", "AgentKnowledgeSetStatus"}, }, }, Dependencies: []string{ - "github.com/otto8-ai/otto8/apiclient/types.AgentExternalStatus", "github.com/otto8-ai/otto8/apiclient/types.AgentManifest", "github.com/otto8-ai/otto8/apiclient/types.Metadata"}, + "github.com/otto8-ai/otto8/apiclient/types.AgentExternalStatus", "github.com/otto8-ai/otto8/apiclient/types.AgentKnowledgeSetStatus", "github.com/otto8-ai/otto8/apiclient/types.AgentManifest", "github.com/otto8-ai/otto8/apiclient/types.Metadata"}, } } @@ -278,6 +286,33 @@ func schema_otto8_ai_otto8_apiclient_types_AgentExternalStatus(ref common.Refere } } +func schema_otto8_ai_otto8_apiclient_types_AgentKnowledgeSetStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "knowledgeSetStatues": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.KnowledgeSetStatus"), + }, + }, + }, + }, + }, + }, + }, + }, + Dependencies: []string{ + "github.com/otto8-ai/otto8/apiclient/types.KnowledgeSetStatus"}, + } +} + func schema_otto8_ai_otto8_apiclient_types_AgentList(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -1069,6 +1104,30 @@ func schema_otto8_ai_otto8_apiclient_types_KnowledgeFileList(ref common.Referenc } } +func schema_otto8_ai_otto8_apiclient_types_KnowledgeSetStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "knowledgeSetName": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "error": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + } +} + func schema_otto8_ai_otto8_apiclient_types_LinkState(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -3956,6 +4015,12 @@ func schema_storage_apis_ottogptscriptai_v1_KnowledgeSetStatus(ref common.Refere Format: "int64", }, }, + "ingestionError": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, "suggestedDataDescription": { SchemaProps: spec.SchemaProps{ Type: []string{"string"}, @@ -6936,9 +7001,8 @@ func schema_storage_apis_ottogptscriptai_v1_WorkspaceStatus(ref common.Reference Allows: true, Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ - Default: "", - Type: []string{"string"}, - Format: "", + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Item"), }, }, }, @@ -6951,9 +7015,8 @@ func schema_storage_apis_ottogptscriptai_v1_WorkspaceStatus(ref common.Reference Allows: true, Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ - Default: "", - Type: []string{"string"}, - Format: "", + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Item"), }, }, }, @@ -6997,7 +7060,7 @@ func schema_storage_apis_ottogptscriptai_v1_WorkspaceStatus(ref common.Reference }, }, Dependencies: []string{ - "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, + "github.com/otto8-ai/otto8/apiclient/types.Item", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, } } diff --git a/ui/admin/app/components/knowledge/AgentKnowledgePanel.tsx b/ui/admin/app/components/knowledge/AgentKnowledgePanel.tsx index 26750831..38417e5e 100644 --- a/ui/admin/app/components/knowledge/AgentKnowledgePanel.tsx +++ b/ui/admin/app/components/knowledge/AgentKnowledgePanel.tsx @@ -10,6 +10,7 @@ import { getIngestionStatus, } from "~/lib/model/knowledge"; import { ApiRoutes } from "~/lib/routers/apiRoutes"; +import { AgentService } from "~/lib/service/api/agentService"; import { KnowledgeService } from "~/lib/service/api/knowledgeService"; import { assetUrl } from "~/lib/utils"; @@ -71,6 +72,29 @@ export function AgentKnowledgePanel({ agentId }: { agentId: string }) { [getRemoteKnowledgeSources.data] ); + const fetchAgentKnowledgeSetStatus = useSWR( + AgentService.getAgentById.key(agentId), + ({ agentId }) => + AgentService.getAgentById(agentId).then((agent) => { + if ( + agent?.knowledgeSetStatues && + agent.knowledgeSetStatues.length > 0 + ) { + return agent.knowledgeSetStatues[0]; + } + return null; + }), + { + revalidateOnFocus: false, + refreshInterval: blockPolling ? undefined : 5000, + } + ); + + const knowledgeSetStatus = useMemo( + () => fetchAgentKnowledgeSetStatus.data, + [fetchAgentKnowledgeSetStatus.data] + ); + useEffect(() => { if (knowledge.length > 0) { setBlockPolling( @@ -337,6 +361,7 @@ export function AgentKnowledgePanel({ agentId }: { agentId: string }) { startPolling={startPolling} knowledge={localFiles} getKnowledgeFiles={getKnowledgeFiles} + ingestionError={knowledgeSetStatus?.error} /> ); diff --git a/ui/admin/app/components/knowledge/IngestionStatus.tsx b/ui/admin/app/components/knowledge/IngestionStatus.tsx index b196a968..ce72d8d2 100644 --- a/ui/admin/app/components/knowledge/IngestionStatus.tsx +++ b/ui/admin/app/components/knowledge/IngestionStatus.tsx @@ -13,9 +13,13 @@ import { LoadingSpinner } from "../ui/LoadingSpinner"; interface IngestionStatusProps { knowledge: KnowledgeFile[]; + ingestionError?: string; } -const IngestionStatusComponent = ({ knowledge }: IngestionStatusProps) => { +const IngestionStatusComponent = ({ + knowledge, + ingestionError, +}: IngestionStatusProps) => { const approvedKnowledge = knowledge.filter( (item) => item.approved === true ); @@ -24,6 +28,16 @@ const IngestionStatusComponent = ({ knowledge }: IngestionStatusProps) => {
{(() => { + if (ingestionError) { + return ( +
+ + {ingestionError} + +
+ ); + } + const ingestingCount = approvedKnowledge.filter( (item) => item.ingestionStatus?.status === diff --git a/ui/admin/app/components/knowledge/RemoteKnowledgeSourceStatus.tsx b/ui/admin/app/components/knowledge/RemoteKnowledgeSourceStatus.tsx index 870cec36..6c8ec4a0 100644 --- a/ui/admin/app/components/knowledge/RemoteKnowledgeSourceStatus.tsx +++ b/ui/admin/app/components/knowledge/RemoteKnowledgeSourceStatus.tsx @@ -7,17 +7,20 @@ import { LoadingSpinner } from "~/components/ui/LoadingSpinner"; import RemoteFileAvatar from "./RemoteFileAvatar"; interface RemoteKnowledgeSourceStatusProps { - source: RemoteKnowledgeSource; + source: RemoteKnowledgeSource | undefined; includeAvatar?: boolean; } const RemoteKnowledgeSourceStatus: React.FC< RemoteKnowledgeSourceStatusProps > = ({ source, includeAvatar = true }) => { - if (!source || !source.runID) return null; + if (!source || (!source.runID && !source.error)) return null; if (source.sourceType === "onedrive" && !source.onedriveConfig) return null; + if (source.sourceType === "website" && !source.websiteCrawlingConfig) + return null; + return (
@@ -26,10 +29,12 @@ const RemoteKnowledgeSourceStatus: React.FC< remoteKnowledgeSourceType={source.sourceType!} /> )} - - {source?.status || "Syncing Files..."} + + {source?.error || source?.status || "Syncing Files..."} - + {!source.error && }
); diff --git a/ui/admin/app/components/knowledge/file/FileModal.tsx b/ui/admin/app/components/knowledge/file/FileModal.tsx index bffc71a4..3383f712 100644 --- a/ui/admin/app/components/knowledge/file/FileModal.tsx +++ b/ui/admin/app/components/knowledge/file/FileModal.tsx @@ -35,6 +35,7 @@ interface FileModalProps { onOpenChange: (open: boolean) => void; startPolling: () => void; knowledge: KnowledgeFile[]; + ingestionError?: string; } function FileModal({ @@ -44,6 +45,7 @@ function FileModal({ knowledge, isOpen, onOpenChange, + ingestionError, }: FileModalProps) { const fileInputRef = useRef(null); @@ -165,7 +167,10 @@ function FileModal({
{knowledge.some((item) => item.approved) && ( - + )} void; + ingestionError?: string; }; export const NotionModal: FC = ({ @@ -51,6 +52,7 @@ export const NotionModal: FC = ({ knowledgeFiles, startPolling, handleRemoteKnowledgeSourceSync, + ingestionError, }) => { const [loading, setLoading] = useState(false); const [isSettingModalOpen, setIsSettingModalOpen] = useState(false); @@ -154,7 +156,10 @@ export const NotionModal: FC = ({
{knowledgeFiles?.some((item) => item.approved) && ( - + )} {notionSource?.runID && ( diff --git a/ui/admin/app/components/knowledge/onedrive/AddLinkModal.tsx b/ui/admin/app/components/knowledge/onedrive/AddLinkModal.tsx index e83db93f..898085b0 100644 --- a/ui/admin/app/components/knowledge/onedrive/AddLinkModal.tsx +++ b/ui/admin/app/components/knowledge/onedrive/AddLinkModal.tsx @@ -47,6 +47,7 @@ const AddLinkModal: FC = ({ }, } ); + setNewLink(""); startPolling(); onOpenChange(false); }; diff --git a/ui/admin/app/components/knowledge/onedrive/OneDriveModal.tsx b/ui/admin/app/components/knowledge/onedrive/OneDriveModal.tsx index 894f499a..97a315a2 100644 --- a/ui/admin/app/components/knowledge/onedrive/OneDriveModal.tsx +++ b/ui/admin/app/components/knowledge/onedrive/OneDriveModal.tsx @@ -46,6 +46,7 @@ interface OnedriveModalProps { handleRemoteKnowledgeSourceSync: ( sourceType: RemoteKnowledgeSourceType ) => void; + ingestionError?: string; } export const OnedriveModal: FC = ({ @@ -56,6 +57,7 @@ export const OnedriveModal: FC = ({ startPolling, knowledgeFiles, handleRemoteKnowledgeSourceSync, + ingestionError, }) => { const [isSettingModalOpen, setIsSettingModalOpen] = useState(false); const [isAddLinkModalOpen, setIsAddLinkModalOpen] = useState(false); @@ -351,11 +353,14 @@ export const OnedriveModal: FC = ({ {knowledgeFiles?.some((item) => item.approved) && ( - + )} {onedriveSource?.state?.onedriveState?.links && onedriveSource?.runID && ( - + )}
diff --git a/ui/admin/app/components/knowledge/website/WebsiteModal.tsx b/ui/admin/app/components/knowledge/website/WebsiteModal.tsx index e62b16b9..f13fc3e9 100644 --- a/ui/admin/app/components/knowledge/website/WebsiteModal.tsx +++ b/ui/admin/app/components/knowledge/website/WebsiteModal.tsx @@ -44,6 +44,7 @@ interface WebsiteModalProps { handleRemoteKnowledgeSourceSync: ( sourceType: RemoteKnowledgeSourceType ) => void; + ingestionError?: string; } export const WebsiteModal: FC = ({ @@ -54,6 +55,7 @@ export const WebsiteModal: FC = ({ startPolling, knowledgeFiles, handleRemoteKnowledgeSourceSync, + ingestionError, }) => { const [isSettingModalOpen, setIsSettingModalOpen] = useState(false); const [isAddWebsiteModalOpen, setIsAddWebsiteModalOpen] = useState(false); @@ -264,7 +266,10 @@ export const WebsiteModal: FC = ({ {knowledgeFiles?.some((item) => item.approved) && ( - + )} {websiteSource?.runID && ( diff --git a/ui/admin/app/lib/model/agents.ts b/ui/admin/app/lib/model/agents.ts index 35eeafce..86bd3d43 100644 --- a/ui/admin/app/lib/model/agents.ts +++ b/ui/admin/app/lib/model/agents.ts @@ -13,6 +13,12 @@ export type AgentBase = { workflows?: string[]; tools?: string[]; params?: Record; + knowledgeSetStatues: KnowledgeSetStatus[]; +}; + +export type KnowledgeSetStatus = { + knowledgeSetName: string; + error?: string; }; export type Agent = EntityMeta & From 2873972ce2fd880307c1a32bc3706617e521650e Mon Sep 17 00:00:00 2001 From: Daishan Peng Date: Wed, 23 Oct 2024 09:28:17 -0700 Subject: [PATCH 2/5] Address comments Signed-off-by: Daishan Peng --- pkg/api/handlers/agent.go | 1 - pkg/controller/handlers/agents/agents.go | 2 -- pkg/controller/handlers/knowledge/knowledge.go | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/api/handlers/agent.go b/pkg/api/handlers/agent.go index 8666ed0d..d13560e2 100644 --- a/pkg/api/handlers/agent.go +++ b/pkg/api/handlers/agent.go @@ -100,7 +100,6 @@ func convertAgent(agent v1.Agent, prefix string, knowledgeSets ...v1.KnowledgeSe } var knowledgeSetsStatus types.AgentKnowledgeSetStatus - for _, knowledge := range knowledgeSets { knowledgeSetsStatus.KnowledgeSetStatues = append(knowledgeSetsStatus.KnowledgeSetStatues, types.KnowledgeSetStatus{ Error: knowledge.Status.IngestionError, diff --git a/pkg/controller/handlers/agents/agents.go b/pkg/controller/handlers/agents/agents.go index fa8b374b..160b756c 100644 --- a/pkg/controller/handlers/agents/agents.go +++ b/pkg/controller/handlers/agents/agents.go @@ -4,7 +4,6 @@ import ( "github.com/acorn-io/baaah/pkg/router" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.gptscript.ai/v1" "github.com/otto8-ai/otto8/pkg/system" - "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -28,7 +27,6 @@ func WorkspaceObjects(req router.Request, _ router.Response) error { } if len(agent.Status.KnowledgeSetNames) == 0 { - logrus.Info("Created default knowledge set") ws := &v1.KnowledgeSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: req.Namespace, diff --git a/pkg/controller/handlers/knowledge/knowledge.go b/pkg/controller/handlers/knowledge/knowledge.go index b1597380..9c998e9b 100644 --- a/pkg/controller/handlers/knowledge/knowledge.go +++ b/pkg/controller/handlers/knowledge/knowledge.go @@ -207,7 +207,7 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro ws.Status.IngestionRunHash = hash ws.Status.IngestionRunName = run.Run.Name ws.Status.IngestionGeneration++ - return nil + return req.Client.Status().Update(req.Ctx, ws) } return nil From 35ab1288e042f0814cb4f19312d3d91d63ce288a Mon Sep 17 00:00:00 2001 From: Daishan Peng Date: Wed, 23 Oct 2024 14:40:02 -0700 Subject: [PATCH 3/5] Address comments Signed-off-by: Daishan Peng --- .../handlers/knowledge/knowledge.go | 8 +++---- .../apis/otto.gptscript.ai/v1/workspace.go | 21 +++++++++---------- .../v1/zz_generated.deepcopy.go | 4 ++-- .../openapi/generated/openapi_generated.go | 12 ++++++----- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/pkg/controller/handlers/knowledge/knowledge.go b/pkg/controller/handlers/knowledge/knowledge.go index 9c998e9b..1764e77b 100644 --- a/pkg/controller/handlers/knowledge/knowledge.go +++ b/pkg/controller/handlers/knowledge/knowledge.go @@ -301,19 +301,19 @@ func updateIngestionError(req router.Request, ws *v1.Workspace, runName string) } } - if run.Status.State == gptscript.Finished || run.Status.State == gptscript.Error { + if run.Status.State.IsTerminal() { return true, nil } return false, nil } -func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Workspace, progress <-chan types.Progress) (map[string]types.Item, error) { +func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Workspace, progress <-chan types.Progress) (map[string]string, error) { input := toStream(progress) defer input.Close() scanner := bufio.NewScanner(input) - final := map[string]types.Item{} + final := map[string]string{} var errs []error for scanner.Scan() { @@ -340,7 +340,7 @@ func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Work } else if err != nil { errs = append(errs, fmt.Errorf("failed to get knowledge file: %s", err)) } - final[file.Name] = struct{}{} + final[file.Name] = ingestionStatus.Status if ingestionStatus.Status == "finished" { delete(final, file.Name) diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go b/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go index f8b416a2..d55f5a98 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go @@ -2,7 +2,6 @@ package v1 import ( "github.com/acorn-io/baaah/pkg/fields" - "github.com/otto8-ai/otto8/apiclient/types" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -64,16 +63,16 @@ type WorkspaceSpec struct { } type WorkspaceStatus struct { - WorkspaceID string `json:"workspaceID,omitempty"` - IngestionGeneration int64 `json:"ingestionGeneration,omitempty"` - IngestionRunHash string `json:"ingestionRunHash,omitempty"` - IngestionRunName string `json:"ingestionRunName,omitempty"` - IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"` - LastNotFinished map[string]types.Item `json:"lastNotFinished,omitempty"` - NotFinished map[string]types.Item `json:"notFinished,omitempty"` - RetryCount int `json:"retryCount,omitempty"` - PendingApproval []string `json:"pendingApproval,omitempty"` - PendingRejections []string `json:"pendingRejections,omitempty"` + WorkspaceID string `json:"workspaceID,omitempty"` + IngestionGeneration int64 `json:"ingestionGeneration,omitempty"` + IngestionRunHash string `json:"ingestionRunHash,omitempty"` + IngestionRunName string `json:"ingestionRunName,omitempty"` + IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"` + LastNotFinished map[string]string `json:"lastNotFinished,omitempty"` + NotFinished map[string]string `json:"notFinished,omitempty"` + RetryCount int `json:"retryCount,omitempty"` + PendingApproval []string `json:"pendingApproval,omitempty"` + PendingRejections []string `json:"pendingRejections,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go b/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go index a456a057..53717dec 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go @@ -2029,14 +2029,14 @@ func (in *WorkspaceStatus) DeepCopyInto(out *WorkspaceStatus) { in.IngestionLastRunTime.DeepCopyInto(&out.IngestionLastRunTime) if in.LastNotFinished != nil { in, out := &in.LastNotFinished, &out.LastNotFinished - *out = make(map[string]types.Item, len(*in)) + *out = make(map[string]string, len(*in)) for key, val := range *in { (*out)[key] = val } } if in.NotFinished != nil { in, out := &in.NotFinished, &out.NotFinished - *out = make(map[string]types.Item, len(*in)) + *out = make(map[string]string, len(*in)) for key, val := range *in { (*out)[key] = val } diff --git a/pkg/storage/openapi/generated/openapi_generated.go b/pkg/storage/openapi/generated/openapi_generated.go index 42ad44d2..d55d4b3b 100644 --- a/pkg/storage/openapi/generated/openapi_generated.go +++ b/pkg/storage/openapi/generated/openapi_generated.go @@ -7001,8 +7001,9 @@ func schema_storage_apis_ottogptscriptai_v1_WorkspaceStatus(ref common.Reference Allows: true, Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ - Default: map[string]interface{}{}, - Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Item"), + Default: "", + Type: []string{"string"}, + Format: "", }, }, }, @@ -7015,8 +7016,9 @@ func schema_storage_apis_ottogptscriptai_v1_WorkspaceStatus(ref common.Reference Allows: true, Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ - Default: map[string]interface{}{}, - Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Item"), + Default: "", + Type: []string{"string"}, + Format: "", }, }, }, @@ -7060,7 +7062,7 @@ func schema_storage_apis_ottogptscriptai_v1_WorkspaceStatus(ref common.Reference }, }, Dependencies: []string{ - "github.com/otto8-ai/otto8/apiclient/types.Item", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, + "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, } } From 9c95f090b60427b3f1cbaa9d6886fd3420794188 Mon Sep 17 00:00:00 2001 From: Donnie Adams Date: Thu, 24 Oct 2024 08:20:39 -0400 Subject: [PATCH 4/5] Use controller framework to set ingestion error Signed-off-by: Donnie Adams --- .../handlers/knowledge/knowledge.go | 61 +++++++------------ 1 file changed, 21 insertions(+), 40 deletions(-) diff --git a/pkg/controller/handlers/knowledge/knowledge.go b/pkg/controller/handlers/knowledge/knowledge.go index 1764e77b..a5067ef6 100644 --- a/pkg/controller/handlers/knowledge/knowledge.go +++ b/pkg/controller/handlers/knowledge/knowledge.go @@ -249,6 +249,15 @@ func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error return err } + if run.Status.State.IsTerminal() { + if err := updateIngestionError(req, ws, &run); err != nil { + return err + } + + ws.Status.IngestionRunName = "" + return nil + } + _, progress, err := a.events.Watch(req.Ctx, ws.Namespace, events.WatchOptions{ Run: &run, }) @@ -256,56 +265,28 @@ func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error return err } - NotFinished, err := compileFileStatuses(req.Ctx, req.Client, ws, progress) + notFinished, err := compileFileStatuses(req.Ctx, req.Client, ws, progress) if err != nil { return err } - // Fetch ingestion error from run.status in go routine so that we don't block - runName := ws.Status.IngestionRunName - go func() { - for { - stop, err := updateIngestionError(req, ws, runName) - if err != nil { - logger.Errorf("failed to update ingestion error: %s", err) - break - } - - if stop { - break - } - time.Sleep(time.Second * 5) - } - }() - - ws.Status.IngestionRunName = "" - ws.Status.NotFinished = NotFinished + ws.Status.NotFinished = notFinished ws.Status.IngestionLastRunTime = metav1.Now() return nil } -func updateIngestionError(req router.Request, ws *v1.Workspace, runName string) (bool, error) { - var run v1.Run - if err := req.Get(uncached.Get(&run), ws.Namespace, runName); err != nil && !apierrors.IsNotFound(err) { - return false, err - } else if err == nil { - var knowledgeSet v1.KnowledgeSet - if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil { - return false, err - } - if run.Status.Error != knowledgeSet.Status.IngestionError { - knowledgeSet.Status.IngestionError = run.Status.Error - if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil { - return false, err - } - } +func updateIngestionError(req router.Request, ws *v1.Workspace, run *v1.Run) error { + var knowledgeSet v1.KnowledgeSet + if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil { + return err } - - if run.Status.State.IsTerminal() { - return true, nil + if run.Status.Error != knowledgeSet.Status.IngestionError { + knowledgeSet.Status.IngestionError = run.Status.Error + if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil { + return err + } } - - return false, nil + return nil } func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Workspace, progress <-chan types.Progress) (map[string]string, error) { From c5a9016e023979c9eb9db866e195a02fd38241e6 Mon Sep 17 00:00:00 2001 From: Daishan Peng Date: Thu, 24 Oct 2024 09:39:23 -0700 Subject: [PATCH 5/5] Add lastIngestionRunName Signed-off-by: Daishan Peng --- .../handlers/knowledge/knowledge.go | 48 ++++++++++++------- .../handlers/knowledgeset/knowledgeset.go | 2 +- pkg/controller/routes.go | 1 + .../apis/otto.gptscript.ai/v1/workspace.go | 21 ++++---- 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/pkg/controller/handlers/knowledge/knowledge.go b/pkg/controller/handlers/knowledge/knowledge.go index a5067ef6..9e5be329 100644 --- a/pkg/controller/handlers/knowledge/knowledge.go +++ b/pkg/controller/handlers/knowledge/knowledge.go @@ -79,7 +79,7 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro } // The status handler will clean this up - if ws.Status.IngestionRunName != "" { + if ws.Status.CurrentIngestionRunName != "" { return nil } @@ -205,7 +205,7 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro } ws.Status.IngestionRunHash = hash - ws.Status.IngestionRunName = run.Run.Name + ws.Status.CurrentIngestionRunName = run.Run.Name ws.Status.IngestionGeneration++ return req.Client.Status().Update(req.Ctx, ws) } @@ -234,30 +234,21 @@ func toStream(events <-chan types.Progress) io.ReadCloser { func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error { ws := req.Object.(*v1.Workspace) - if ws.Status.IngestionRunName == "" { + if ws.Status.CurrentIngestionRunName == "" { return nil } var run v1.Run - if err := req.Get(&run, ws.Namespace, ws.Status.IngestionRunName); apierrors.IsNotFound(err) { - if err := req.Get(uncached.Get(&run), ws.Namespace, ws.Status.IngestionRunName); apierrors.IsNotFound(err) { + if err := req.Get(&run, ws.Namespace, ws.Status.CurrentIngestionRunName); apierrors.IsNotFound(err) { + if err := req.Get(uncached.Get(&run), ws.Namespace, ws.Status.CurrentIngestionRunName); apierrors.IsNotFound(err) { // Orphaned? User deleted the run? Solar flare? - ws.Status.IngestionRunName = "" + ws.Status.CurrentIngestionRunName = "" } return nil } else if err != nil { return err } - if run.Status.State.IsTerminal() { - if err := updateIngestionError(req, ws, &run); err != nil { - return err - } - - ws.Status.IngestionRunName = "" - return nil - } - _, progress, err := a.events.Watch(req.Ctx, ws.Namespace, events.WatchOptions{ Run: &run, }) @@ -270,11 +261,36 @@ func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error return err } + ws.Status.LastIngestionRunName = ws.Status.CurrentIngestionRunName + ws.Status.CurrentIngestionRunName = "" ws.Status.NotFinished = notFinished ws.Status.IngestionLastRunTime = metav1.Now() return nil } +func (a *Handler) UpdateIngestionError(req router.Request, _ router.Response) error { + ws := req.Object.(*v1.Workspace) + + if ws.Status.LastIngestionRunName == "" { + return nil + } + + var run v1.Run + if err := req.Get(&run, ws.Namespace, ws.Status.LastIngestionRunName); apierrors.IsNotFound(err) { + return nil + } else if err != nil { + return err + } + + if run.Status.State.IsTerminal() { + if err := updateIngestionError(req, ws, &run); err != nil { + return err + } + } + + return nil +} + func updateIngestionError(req router.Request, ws *v1.Workspace, run *v1.Run) error { var knowledgeSet v1.KnowledgeSet if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil { @@ -323,7 +339,7 @@ func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Work } final[file.Name] = ingestionStatus.Status - if ingestionStatus.Status == "finished" { + if ingestionStatus.Status == "finished" || ingestionStatus.Status == "skipped" { delete(final, file.Name) } diff --git a/pkg/controller/handlers/knowledgeset/knowledgeset.go b/pkg/controller/handlers/knowledgeset/knowledgeset.go index 04e19276..092931c3 100644 --- a/pkg/controller/handlers/knowledgeset/knowledgeset.go +++ b/pkg/controller/handlers/knowledgeset/knowledgeset.go @@ -42,7 +42,7 @@ func (h *Handler) GenerateDataDescription(req router.Request, resp router.Respon } // Ignore if running - if ws.Status.IngestionRunName != "" { + if ws.Status.CurrentIngestionRunName != "" { return nil } diff --git a/pkg/controller/routes.go b/pkg/controller/routes.go index 7aa5ca64..e8f80e8d 100644 --- a/pkg/controller/routes.go +++ b/pkg/controller/routes.go @@ -88,6 +88,7 @@ func (c *Controller) setupRoutes() error { root.Type(&v1.Workspace{}).HandlerFunc(workspace.CreateWorkspace) root.Type(&v1.Workspace{}).HandlerFunc(knowledge.IngestKnowledge) root.Type(&v1.Workspace{}).HandlerFunc(knowledge.UpdateFileStatus) + root.Type(&v1.Workspace{}).HandlerFunc(knowledge.UpdateIngestionError) // KnowledgeSets root.Type(&v1.KnowledgeSet{}).HandlerFunc(cleanup.Cleanup) diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go b/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go index d55f5a98..5f1f655d 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/workspace.go @@ -63,16 +63,17 @@ type WorkspaceSpec struct { } type WorkspaceStatus struct { - WorkspaceID string `json:"workspaceID,omitempty"` - IngestionGeneration int64 `json:"ingestionGeneration,omitempty"` - IngestionRunHash string `json:"ingestionRunHash,omitempty"` - IngestionRunName string `json:"ingestionRunName,omitempty"` - IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"` - LastNotFinished map[string]string `json:"lastNotFinished,omitempty"` - NotFinished map[string]string `json:"notFinished,omitempty"` - RetryCount int `json:"retryCount,omitempty"` - PendingApproval []string `json:"pendingApproval,omitempty"` - PendingRejections []string `json:"pendingRejections,omitempty"` + WorkspaceID string `json:"workspaceID,omitempty"` + IngestionGeneration int64 `json:"ingestionGeneration,omitempty"` + IngestionRunHash string `json:"ingestionRunHash,omitempty"` + CurrentIngestionRunName string `json:"currentIngestionRunName,omitempty"` + LastIngestionRunName string `json:"lastIngestionRunName,omitempty"` + IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"` + LastNotFinished map[string]string `json:"lastNotFinished,omitempty"` + NotFinished map[string]string `json:"notFinished,omitempty"` + RetryCount int `json:"retryCount,omitempty"` + PendingApproval []string `json:"pendingApproval,omitempty"` + PendingRejections []string `json:"pendingRejections,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object