Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Add error status handling for ingestion and sync source #283

Merged
merged 5 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apiclient/types/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Agent struct {
Metadata
AgentManifest
AgentExternalStatus
AgentKnowledgeSetStatus
}

type AgentList List[Agent]
Expand Down Expand Up @@ -46,3 +47,12 @@ func (m AgentManifest) GetParams() *openapi3.Schema {
type AgentExternalStatus struct {
RefNameAssigned bool `json:"refNameAssigned,omitempty"`
}

type AgentKnowledgeSetStatus struct {
KnowledgeSetStatues []KnowledgeSetStatus `json:"knowledgeSetStatues,omitempty"`
}

type KnowledgeSetStatus struct {
KnowledgeSetName string `json:"knowledgeSetName,omitempty"`
Error string `json:"error,omitempty"`
}
36 changes: 36 additions & 0 deletions apiclient/types/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 40 additions & 6 deletions pkg/api/handlers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/otto8-ai/otto8/pkg/api/server"
"github.com/otto8-ai/otto8/pkg/render"
v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.gptscript.ai/v1"
"github.com/otto8-ai/otto8/pkg/storage/selectors"
"github.com/otto8-ai/otto8/pkg/system"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -87,7 +89,7 @@ func (a *AgentHandler) Create(req api.Context) error {
return req.Write(convertAgent(agent, server.GetURLPrefix(req)))
}

func convertAgent(agent v1.Agent, prefix string) *types.Agent {
func convertAgent(agent v1.Agent, prefix string, knowledgeSets ...v1.KnowledgeSet) *types.Agent {
var links []string
if prefix != "" {
refName := agent.Name
Expand All @@ -96,10 +98,20 @@ func convertAgent(agent v1.Agent, prefix string) *types.Agent {
}
links = []string{"invoke", prefix + "/invoke/" + refName}
}

var knowledgeSetsStatus types.AgentKnowledgeSetStatus
for _, knowledge := range knowledgeSets {
knowledgeSetsStatus.KnowledgeSetStatues = append(knowledgeSetsStatus.KnowledgeSetStatues, types.KnowledgeSetStatus{
Error: knowledge.Status.IngestionError,
KnowledgeSetName: knowledge.Name,
})
}

return &types.Agent{
Metadata: MetadataFrom(&agent, links...),
AgentManifest: agent.Spec.Manifest,
AgentExternalStatus: agent.Status.External,
Metadata: MetadataFrom(&agent, links...),
AgentManifest: agent.Spec.Manifest,
AgentExternalStatus: agent.Status.External,
AgentKnowledgeSetStatus: knowledgeSetsStatus,
}
}

Expand All @@ -109,7 +121,12 @@ func (a *AgentHandler) ByID(req api.Context) error {
return err
}

return req.Write(convertAgent(agent, server.GetURLPrefix(req)))
knowledgeSets, err := getKnowledgeSetsFromAgent(req, agent)
if err != nil {
return err
}

return req.Write(convertAgent(agent, server.GetURLPrefix(req), knowledgeSets...))
}

func (a *AgentHandler) List(req api.Context) error {
Expand All @@ -120,12 +137,29 @@ func (a *AgentHandler) List(req api.Context) error {

var resp types.AgentList
for _, agent := range agentList.Items {
resp.Items = append(resp.Items, *convertAgent(agent, server.GetURLPrefix(req)))
knowledgeSets, err := getKnowledgeSetsFromAgent(req, agent)
if err != nil {
return err
}
resp.Items = append(resp.Items, *convertAgent(agent, server.GetURLPrefix(req), knowledgeSets...))
}

return req.Write(resp)
}

func getKnowledgeSetsFromAgent(req api.Context, agent v1.Agent) ([]v1.KnowledgeSet, error) {
var knowledgeSets v1.KnowledgeSetList
if err := req.Storage.List(req.Context(), &knowledgeSets, &kclient.ListOptions{
FieldSelector: fields.SelectorFromSet(selectors.RemoveEmpty(map[string]string{
"spec.agentName": agent.Name,
})),
Namespace: agent.Namespace,
}); err != nil {
return nil, err
}
return knowledgeSets.Items, nil
}

func (a *AgentHandler) Files(req api.Context) error {
var (
id = req.PathValue("id")
Expand Down
1 change: 1 addition & 0 deletions pkg/api/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func Router(services *services.Services) (http.Handler, error) {
mux.HandleFunc("PUT /api/agents/{id}/knowledge/{file_id}/approve", agents.ApproveKnowledgeFile)
mux.HandleFunc("DELETE /api/agents/{id}/knowledge/{file...}", agents.DeleteKnowledge)

// Remote Knowledge Sources
mux.HandleFunc("POST /api/agents/{agent_id}/remote-knowledge-sources", agents.CreateRemoteKnowledgeSource)
mux.HandleFunc("GET /api/agents/{agent_id}/remote-knowledge-sources", agents.GetRemoteKnowledgeSources)
mux.HandleFunc("PATCH /api/agents/{agent_id}/remote-knowledge-sources/{id}", agents.ReSyncRemoteKnowledgeSource)
Expand Down
78 changes: 67 additions & 11 deletions pkg/controller/handlers/knowledge/knowledge.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro
}

// The status handler will clean this up
if ws.Status.IngestionRunName != "" {
if ws.Status.CurrentIngestionRunName != "" {
return nil
}

Expand Down Expand Up @@ -192,8 +192,20 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro
return err
}

// Reset knowledge set error before ingestion
var knowledgeSet v1.KnowledgeSet
if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil {
return err
}
if knowledgeSet.Status.IngestionError != "" {
knowledgeSet.Status.IngestionError = ""
if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil {
return err
}
}

ws.Status.IngestionRunHash = hash
ws.Status.IngestionRunName = run.Run.Name
ws.Status.CurrentIngestionRunName = run.Run.Name
ws.Status.IngestionGeneration++
return req.Client.Status().Update(req.Ctx, ws)
}
Expand Down Expand Up @@ -222,15 +234,15 @@ func toStream(events <-chan types.Progress) io.ReadCloser {
func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error {
ws := req.Object.(*v1.Workspace)

if ws.Status.IngestionRunName == "" {
if ws.Status.CurrentIngestionRunName == "" {
return nil
}

var run v1.Run
if err := req.Get(&run, ws.Namespace, ws.Status.IngestionRunName); apierrors.IsNotFound(err) {
if err := req.Get(uncached.Get(&run), ws.Namespace, ws.Status.IngestionRunName); apierrors.IsNotFound(err) {
if err := req.Get(&run, ws.Namespace, ws.Status.CurrentIngestionRunName); apierrors.IsNotFound(err) {
if err := req.Get(uncached.Get(&run), ws.Namespace, ws.Status.CurrentIngestionRunName); apierrors.IsNotFound(err) {
// Orphaned? User deleted the run? Solar flare?
ws.Status.IngestionRunName = ""
ws.Status.CurrentIngestionRunName = ""
}
return nil
} else if err != nil {
Expand All @@ -244,18 +256,55 @@ func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error
return err
}

NotFinished, err := compileFileStatuses(req.Ctx, req.Client, ws, progress)
notFinished, err := compileFileStatuses(req.Ctx, req.Client, ws, progress)
if err != nil {
return err
}

// All good
ws.Status.IngestionRunName = ""
ws.Status.NotFinished = NotFinished
ws.Status.LastIngestionRunName = ws.Status.CurrentIngestionRunName
ws.Status.CurrentIngestionRunName = ""
ws.Status.NotFinished = notFinished
ws.Status.IngestionLastRunTime = metav1.Now()
return nil
}

func (a *Handler) UpdateIngestionError(req router.Request, _ router.Response) error {
ws := req.Object.(*v1.Workspace)

if ws.Status.LastIngestionRunName == "" {
return nil
}

var run v1.Run
if err := req.Get(&run, ws.Namespace, ws.Status.LastIngestionRunName); apierrors.IsNotFound(err) {
return nil
} else if err != nil {
return err
}

if run.Status.State.IsTerminal() {
if err := updateIngestionError(req, ws, &run); err != nil {
return err
}
}

return nil
}

func updateIngestionError(req router.Request, ws *v1.Workspace, run *v1.Run) error {
var knowledgeSet v1.KnowledgeSet
if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil {
return err
}
if run.Status.Error != knowledgeSet.Status.IngestionError {
knowledgeSet.Status.IngestionError = run.Status.Error
if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil {
return err
}
}
return nil
}

func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Workspace, progress <-chan types.Progress) (map[string]string, error) {
input := toStream(progress)
defer input.Close()
Expand Down Expand Up @@ -288,8 +337,9 @@ func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Work
} else if err != nil {
errs = append(errs, fmt.Errorf("failed to get knowledge file: %s", err))
}
final[file.Name] = ingestionStatus.Status

if ingestionStatus.Status == "finished" {
if ingestionStatus.Status == "finished" || ingestionStatus.Status == "skipped" {
delete(final, file.Name)
}

Expand Down Expand Up @@ -324,6 +374,12 @@ func (a *Handler) CleanupFile(req router.Request, resp router.Response) error {
return err
}

var knowledgeSet v1.KnowledgeSet
if err := req.Get(uncached.Get(&knowledgeSet), kFile.Namespace, ws.Spec.KnowledgeSetName); apierrors.IsNotFound(err) {
// if knowledge set has been deleted already, then we don't need to run delete file
return nil
}

if _, err := a.ingester.DeleteKnowledgeFiles(req.Ctx, kFile.Namespace, filepath.Join(workspace.GetDir(ws.Status.WorkspaceID), kFile.Spec.FileName), ws.Spec.KnowledgeSetName); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/handlers/knowledgeset/knowledgeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (h *Handler) GenerateDataDescription(req router.Request, resp router.Respon
}

// Ignore if running
if ws.Status.IngestionRunName != "" {
if ws.Status.CurrentIngestionRunName != "" {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/handlers/threads/threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func CreateWorkspaces(req router.Request, resp router.Response) error {

func CreateKnowledgeSet(req router.Request, _ router.Response) error {
thread := req.Object.(*v1.Thread)
if len(thread.Status.KnowledgeSetNames) == 0 {
if len(thread.Status.KnowledgeSetNames) == 0 && thread.Labels[invoke.SystemThreadLabel] != "true" {
thedadams marked this conversation as resolved.
Show resolved Hide resolved
ws := &v1.KnowledgeSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: req.Namespace,
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/handlers/uploads/remoteknowledgesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ func (u *UploadHandler) HandleUploadRun(req router.Request, resp router.Response
// Reset run name to indicate that the run is no longer running
remoteKnowledgeSource.Status.ThreadName = ""
remoteKnowledgeSource.Status.RunName = ""
if run.Status.Error != "" {
remoteKnowledgeSource.Status.Error = run.Status.Error
}

return req.Client.Status().Update(req.Ctx, remoteKnowledgeSource)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (c *Controller) setupRoutes() error {
root.Type(&v1.Workspace{}).HandlerFunc(workspace.CreateWorkspace)
root.Type(&v1.Workspace{}).HandlerFunc(knowledge.IngestKnowledge)
root.Type(&v1.Workspace{}).HandlerFunc(knowledge.UpdateFileStatus)
root.Type(&v1.Workspace{}).HandlerFunc(knowledge.UpdateIngestionError)

// KnowledgeSets
root.Type(&v1.KnowledgeSet{}).HandlerFunc(cleanup.Cleanup)
Expand Down
22 changes: 22 additions & 0 deletions pkg/storage/apis/otto.gptscript.ai/v1/knowledgeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,27 @@ func (in *KnowledgeSet) DeleteRefs() []Ref {
}
}

func (in *KnowledgeSet) Has(field string) bool {
return in.Get(field) != ""
}

func (in *KnowledgeSet) Get(field string) string {
if in == nil {
return ""
}

switch field {
case "spec.agentName":
return in.Spec.AgentName
}

return ""
}

func (*KnowledgeSet) FieldNames() []string {
return []string{"spec.agentName"}
}

// KnowledgeSetManifest should be moved to types once we expose this API
type KnowledgeSetManifest struct {
DataDescription string `json:"dataDescription,omitempty"`
Expand All @@ -44,6 +65,7 @@ type KnowledgeSource struct {

type KnowledgeSetStatus struct {
ObservedIngestionGeneration int64 `json:"observedIngestionGeneration,omitempty"`
IngestionError string `json:"ingestionError,omitempty"`
SuggestedDataDescription string `json:"suggestedDataDescription,omitempty"`
WorkspaceName string `json:"workspaceName,omitempty"`
IsEmpty bool `json:"isEmpty,omitempty"`
Expand Down
Loading