Skip to content

Commit

Permalink
Fix: Add error status handling for ingestion and sync source
Browse files Browse the repository at this point in the history
Signed-off-by: Daishan Peng <[email protected]>
  • Loading branch information
StrongMonkey committed Oct 23, 2024
1 parent c6cc8a6 commit 85fd6b4
Show file tree
Hide file tree
Showing 21 changed files with 350 additions and 46 deletions.
10 changes: 10 additions & 0 deletions apiclient/types/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Agent struct {
Metadata
AgentManifest
AgentExternalStatus
AgentKnowledgeSetStatus
}

type AgentList List[Agent]
Expand Down Expand Up @@ -46,3 +47,12 @@ func (m AgentManifest) GetParams() *openapi3.Schema {
// AgentExternalStatus is the status section embedded directly into the
// API-facing Agent type.
type AgentExternalStatus struct {
// RefNameAssigned is omitted from the JSON output when false.
// NOTE(review): presumably reports whether a reference name has been
// assigned to the agent — confirm against the controller that sets it.
RefNameAssigned bool `json:"refNameAssigned,omitempty"`
}

// AgentKnowledgeSetStatus carries one KnowledgeSetStatus entry per knowledge
// set reported for an agent. It is embedded directly into the API-facing
// Agent type.
type AgentKnowledgeSetStatus struct {
// KnowledgeSetStatues lists the per-knowledge-set statuses; the field is
// omitted from the JSON output when empty.
// NOTE(review): "Statues" looks like a typo for "Statuses", but both the Go
// field name and the JSON key are part of the public API, so renaming would
// be a breaking change.
KnowledgeSetStatues []KnowledgeSetStatus `json:"knowledgeSetStatues,omitempty"`
}

// KnowledgeSetStatus describes the state of a single knowledge set as exposed
// through the agent API.
type KnowledgeSetStatus struct {
// KnowledgeSetName is the name of the knowledge set this entry refers to.
KnowledgeSetName string `json:"knowledgeSetName,omitempty"`
// Error holds the error message reported for the knowledge set; it is
// omitted from the JSON output when empty.
Error string `json:"error,omitempty"`
}
36 changes: 36 additions & 0 deletions apiclient/types/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 41 additions & 6 deletions pkg/api/handlers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/otto8-ai/otto8/pkg/api/server"
"github.com/otto8-ai/otto8/pkg/render"
v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.gptscript.ai/v1"
"github.com/otto8-ai/otto8/pkg/storage/selectors"
"github.com/otto8-ai/otto8/pkg/system"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -87,7 +89,7 @@ func (a *AgentHandler) Create(req api.Context) error {
return req.Write(convertAgent(agent, server.GetURLPrefix(req)))
}

func convertAgent(agent v1.Agent, prefix string) *types.Agent {
func convertAgent(agent v1.Agent, prefix string, knowledgeSets ...v1.KnowledgeSet) *types.Agent {
var links []string
if prefix != "" {
refName := agent.Name
Expand All @@ -96,10 +98,21 @@ func convertAgent(agent v1.Agent, prefix string) *types.Agent {
}
links = []string{"invoke", prefix + "/invoke/" + refName}
}

var knowledgeSetsStatus types.AgentKnowledgeSetStatus

for _, knowledge := range knowledgeSets {
knowledgeSetsStatus.KnowledgeSetStatues = append(knowledgeSetsStatus.KnowledgeSetStatues, types.KnowledgeSetStatus{
Error: knowledge.Status.IngestionError,
KnowledgeSetName: knowledge.Name,
})
}

return &types.Agent{
Metadata: MetadataFrom(&agent, links...),
AgentManifest: agent.Spec.Manifest,
AgentExternalStatus: agent.Status.External,
Metadata: MetadataFrom(&agent, links...),
AgentManifest: agent.Spec.Manifest,
AgentExternalStatus: agent.Status.External,
AgentKnowledgeSetStatus: knowledgeSetsStatus,
}
}

Expand All @@ -109,7 +122,12 @@ func (a *AgentHandler) ByID(req api.Context) error {
return err
}

return req.Write(convertAgent(agent, server.GetURLPrefix(req)))
knowledgeSets, err := getKnowledgeSetsFromAgent(req, agent)
if err != nil {
return err
}

return req.Write(convertAgent(agent, server.GetURLPrefix(req), knowledgeSets...))
}

func (a *AgentHandler) List(req api.Context) error {
Expand All @@ -120,12 +138,29 @@ func (a *AgentHandler) List(req api.Context) error {

var resp types.AgentList
for _, agent := range agentList.Items {
resp.Items = append(resp.Items, *convertAgent(agent, server.GetURLPrefix(req)))
knowledgeSets, err := getKnowledgeSetsFromAgent(req, agent)
if err != nil {
return err
}
resp.Items = append(resp.Items, *convertAgent(agent, server.GetURLPrefix(req), knowledgeSets...))
}

return req.Write(resp)
}

// getKnowledgeSetsFromAgent lists the knowledge sets in the agent's namespace
// whose "spec.agentName" field equals the agent's name.
//
// It returns the listed items, or the error produced by the storage List call.
func getKnowledgeSetsFromAgent(req api.Context, agent v1.Agent) ([]v1.KnowledgeSet, error) {
	// The selector map is passed through selectors.RemoveEmpty before being
	// turned into a field selector.
	// NOTE(review): presumably RemoveEmpty drops empty-valued entries, which
	// would make an unnamed agent match every knowledge set — confirm.
	byAgent := fields.SelectorFromSet(selectors.RemoveEmpty(map[string]string{
		"spec.agentName": agent.Name,
	}))
	opts := &kclient.ListOptions{
		FieldSelector: byAgent,
		Namespace:     agent.Namespace,
	}

	var list v1.KnowledgeSetList
	err := req.Storage.List(req.Context(), &list, opts)
	if err != nil {
		return nil, err
	}
	return list.Items, nil
}

func (a *AgentHandler) Files(req api.Context) error {
var (
id = req.PathValue("id")
Expand Down
1 change: 1 addition & 0 deletions pkg/api/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func Router(services *services.Services) (http.Handler, error) {
mux.HandleFunc("PUT /api/agents/{id}/knowledge/{file_id}/approve", agents.ApproveKnowledgeFile)
mux.HandleFunc("DELETE /api/agents/{id}/knowledge/{file...}", agents.DeleteKnowledge)

// Remote Knowledge Sources
mux.HandleFunc("POST /api/agents/{agent_id}/remote-knowledge-sources", agents.CreateRemoteKnowledgeSource)
mux.HandleFunc("GET /api/agents/{agent_id}/remote-knowledge-sources", agents.GetRemoteKnowledgeSources)
mux.HandleFunc("PATCH /api/agents/{agent_id}/remote-knowledge-sources/{id}", agents.ReSyncRemoteKnowledgeSource)
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/handlers/agents/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/acorn-io/baaah/pkg/router"
v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.gptscript.ai/v1"
"github.com/otto8-ai/otto8/pkg/system"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -27,6 +28,7 @@ func WorkspaceObjects(req router.Request, _ router.Response) error {
}

if len(agent.Status.KnowledgeSetNames) == 0 {
logrus.Info("Created default knowledge set")
ws := &v1.KnowledgeSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: req.Namespace,
Expand Down
67 changes: 63 additions & 4 deletions pkg/controller/handlers/knowledge/knowledge.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,22 @@ func (a *Handler) IngestKnowledge(req router.Request, resp router.Response) erro
return err
}

// Reset knowledge set error before ingestion
var knowledgeSet v1.KnowledgeSet
if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil {
return err
}
if knowledgeSet.Status.IngestionError != "" {
knowledgeSet.Status.IngestionError = ""
if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil {
return err
}
}

ws.Status.IngestionRunHash = hash
ws.Status.IngestionRunName = run.Run.Name
ws.Status.IngestionGeneration++
return req.Client.Status().Update(req.Ctx, ws)
return nil
}

return nil
Expand Down Expand Up @@ -248,19 +260,59 @@ func (a *Handler) UpdateFileStatus(req router.Request, _ router.Response) error
return err
}

// All good
// Fetch the ingestion error from run.Status in a goroutine so that we don't block
runName := ws.Status.IngestionRunName
go func() {
for {
stop, err := updateIngestionError(req, ws, runName)
if err != nil {
logger.Errorf("failed to update ingestion error: %s", err)
break
}

if stop {
break
}
time.Sleep(time.Second * 5)
}
}()

ws.Status.IngestionRunName = ""
ws.Status.NotFinished = NotFinished
ws.Status.IngestionLastRunTime = metav1.Now()
return nil
}

func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Workspace, progress <-chan types.Progress) (map[string]string, error) {
// updateIngestionError mirrors the error recorded on the ingestion run named
// runName onto the status of the workspace's knowledge set.
//
// The run is read with an uncached get. When the run's error differs from the
// knowledge set's current IngestionError, the knowledge set status is updated
// to match.
//
// The returned bool tells the caller whether to stop polling: it is true once
// the run has reached the Finished or Error state, or when the run no longer
// exists. A non-nil error is returned when reading the run or knowledge set,
// or updating the knowledge set status, fails.
func updateIngestionError(req router.Request, ws *v1.Workspace, runName string) (bool, error) {
	var run v1.Run
	if err := req.Get(uncached.Get(&run), ws.Namespace, runName); apierrors.IsNotFound(err) {
		// A missing run can never reach a terminal state. Previously this case
		// fell through with a zero-value run and reported "keep polling", so
		// the caller's loop never terminated. Stop instead.
		return true, nil
	} else if err != nil {
		return false, err
	}

	var knowledgeSet v1.KnowledgeSet
	if err := req.Get(&knowledgeSet, ws.Namespace, ws.Spec.KnowledgeSetName); err != nil {
		return false, err
	}

	// Only write when the value actually changes to avoid needless updates.
	if run.Status.Error != knowledgeSet.Status.IngestionError {
		knowledgeSet.Status.IngestionError = run.Status.Error
		if err := req.Client.Status().Update(req.Ctx, &knowledgeSet); err != nil {
			return false, err
		}
	}

	done := run.Status.State == gptscript.Finished || run.Status.State == gptscript.Error
	return done, nil
}

func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Workspace, progress <-chan types.Progress) (map[string]types.Item, error) {
input := toStream(progress)
defer input.Close()
scanner := bufio.NewScanner(input)

final := map[string]string{}
final := map[string]types.Item{}

var errs []error
for scanner.Scan() {
Expand All @@ -287,6 +339,7 @@ func compileFileStatuses(ctx context.Context, client kclient.Client, ws *v1.Work
} else if err != nil {
errs = append(errs, fmt.Errorf("failed to get knowledge file: %s", err))
}
final[file.Name] = struct{}{}

if ingestionStatus.Status == "finished" {
delete(final, file.Name)
Expand Down Expand Up @@ -322,6 +375,12 @@ func (a *Handler) CleanupFile(req router.Request, resp router.Response) error {
return err
}

var knowledgeSet v1.KnowledgeSet
if err := req.Get(uncached.Get(&knowledgeSet), kFile.Namespace, ws.Spec.KnowledgeSetName); apierrors.IsNotFound(err) {
// if knowledge set has been deleted already, then we don't need to run delete file
return nil
}

if _, err := a.ingester.DeleteKnowledgeFiles(req.Ctx, kFile.Namespace, filepath.Join(workspace.GetDir(ws.Status.WorkspaceID), kFile.Spec.FileName), ws.Spec.KnowledgeSetName); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/handlers/threads/threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func CreateWorkspaces(req router.Request, resp router.Response) error {

func CreateKnowledgeSet(req router.Request, _ router.Response) error {
thread := req.Object.(*v1.Thread)
if len(thread.Status.KnowledgeSetNames) == 0 {
if len(thread.Status.KnowledgeSetNames) == 0 && thread.Labels[invoke.SystemThreadLabel] != "true" {
ws := &v1.KnowledgeSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: req.Namespace,
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/handlers/uploads/remoteknowledgesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ func (u *UploadHandler) HandleUploadRun(req router.Request, resp router.Response
// Reset run name to indicate that the run is no longer running
remoteKnowledgeSource.Status.ThreadName = ""
remoteKnowledgeSource.Status.RunName = ""
if run.Status.Error != "" {
remoteKnowledgeSource.Status.Error = run.Status.Error
}

return req.Client.Status().Update(req.Ctx, remoteKnowledgeSource)
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/storage/apis/otto.gptscript.ai/v1/knowledgeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,27 @@ func (in *KnowledgeSet) DeleteRefs() []Ref {
}
}

// Has reports whether Get returns a non-empty value for the given field.
func (in *KnowledgeSet) Has(field string) bool {
	value := in.Get(field)
	return len(value) > 0
}

// Get returns the value of the named selectable field.
//
// The only recognized field is "spec.agentName", which yields Spec.AgentName.
// A nil receiver or any other field name yields the empty string.
func (in *KnowledgeSet) Get(field string) string {
	if in != nil && field == "spec.agentName" {
		return in.Spec.AgentName
	}
	return ""
}

// FieldNames returns the names of the fields that Get recognizes.
func (*KnowledgeSet) FieldNames() []string {
	names := []string{
		"spec.agentName",
	}
	return names
}

// KnowledgeSetManifest should be moved to types once we expose this API
type KnowledgeSetManifest struct {
DataDescription string `json:"dataDescription,omitempty"`
Expand All @@ -44,6 +65,7 @@ type KnowledgeSource struct {

type KnowledgeSetStatus struct {
ObservedIngestionGeneration int64 `json:"observedIngestionGeneration,omitempty"`
IngestionError string `json:"ingestionError,omitempty"`
SuggestedDataDescription string `json:"suggestedDataDescription,omitempty"`
WorkspaceName string `json:"workspaceName,omitempty"`
IsEmpty bool `json:"isEmpty,omitempty"`
Expand Down
21 changes: 11 additions & 10 deletions pkg/storage/apis/otto.gptscript.ai/v1/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1

import (
"github.com/acorn-io/baaah/pkg/fields"
"github.com/otto8-ai/otto8/apiclient/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -63,16 +64,16 @@ type WorkspaceSpec struct {
}

type WorkspaceStatus struct {
WorkspaceID string `json:"workspaceID,omitempty"`
IngestionGeneration int64 `json:"ingestionGeneration,omitempty"`
IngestionRunHash string `json:"ingestionRunHash,omitempty"`
IngestionRunName string `json:"ingestionRunName,omitempty"`
IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"`
LastNotFinished map[string]string `json:"lastNotFinished,omitempty"`
NotFinished map[string]string `json:"notFinished,omitempty"`
RetryCount int `json:"retryCount,omitempty"`
PendingApproval []string `json:"pendingApproval,omitempty"`
PendingRejections []string `json:"pendingRejections,omitempty"`
WorkspaceID string `json:"workspaceID,omitempty"`
IngestionGeneration int64 `json:"ingestionGeneration,omitempty"`
IngestionRunHash string `json:"ingestionRunHash,omitempty"`
IngestionRunName string `json:"ingestionRunName,omitempty"`
IngestionLastRunTime metav1.Time `json:"ingestionLastRunTime,omitempty"`
LastNotFinished map[string]types.Item `json:"lastNotFinished,omitempty"`
NotFinished map[string]types.Item `json:"notFinished,omitempty"`
RetryCount int `json:"retryCount,omitempty"`
PendingApproval []string `json:"pendingApproval,omitempty"`
PendingRejections []string `json:"pendingRejections,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
Loading

0 comments on commit 85fd6b4

Please sign in to comment.