Skip to content

Commit

Permalink
fix: don't load entire archived workflow into memory in list APIs (#1…
Browse files Browse the repository at this point in the history
…2912)

Signed-off-by: Jiacheng Xu <[email protected]>
(cherry picked from commit f80b9e8)
  • Loading branch information
jiachengxu authored and Anton Gilgur committed Apr 19, 2024
1 parent fe5c612 commit 200f4d1
Showing 1 changed file with 52 additions and 13 deletions.
65 changes: 52 additions & 13 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
log "github.com/sirupsen/logrus"
"github.com/upper/db/v4"
"google.golang.org/grpc/codes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
sutils "github.com/argoproj/argo-workflows/v3/server/utils"
Expand All @@ -30,6 +32,9 @@ type archivedWorkflowMetadata struct {
Phase wfv1.WorkflowPhase `db:"phase"`
StartedAt time.Time `db:"startedat"`
FinishedAt time.Time `db:"finishedat"`
Labels string `db:"labels,omitempty"`
Annotations string `db:"annotations,omitempty"`
Progress string `db:"progress,omitempty"`
}

type archivedWorkflowRecord struct {
Expand Down Expand Up @@ -142,7 +147,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
}

func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (wfv1.Workflows, error) {
var archivedWfs []archivedWorkflowRecord
var archivedWfs []archivedWorkflowMetadata

// If we were passed 0 as the limit, then we should load all available archived workflows
// to match the behavior of the `List` operations in the Kubernetes API
Expand All @@ -151,8 +156,13 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
offset = -1
}

selectQuery, err := selectArchivedWorkflowQuery(r.dbType)
if err != nil {
return nil, err
}

selector := r.session.SQL().
Select("workflow").
Select(selectQuery).
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID()).
And(namespaceEqual(namespace)).
Expand All @@ -161,7 +171,7 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
And(startedAtFromClause(minStartedAt)).
And(startedAtToClause(maxStartedAt))

selector, err := labelsClause(selector, r.dbType, labelRequirements)
selector, err = labelsClause(selector, r.dbType, labelRequirements)
if err != nil {
return nil, err
}
Expand All @@ -174,16 +184,35 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
return nil, err
}

wfs := make(wfv1.Workflows, 0)
for _, archivedWf := range archivedWfs {
wf := wfv1.Workflow{}
err = json.Unmarshal([]byte(archivedWf.Workflow), &wf)
if err != nil {
log.WithFields(log.Fields{"workflowUID": archivedWf.UID, "workflowName": archivedWf.Name}).Errorln("unable to unmarshal workflow from database")
} else {
// For backward compatibility, we should label workflow retrieved from DB as Persisted.
wf.ObjectMeta.Labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"
wfs = append(wfs, wf)
wfs := make(wfv1.Workflows, len(archivedWfs))
for i, md := range archivedWfs {
labels := make(map[string]string)
if err := json.Unmarshal([]byte(md.Labels), &labels); err != nil {
return nil, err
}
// For backward compatibility, we should label workflow retrieved from DB as Persisted.
labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"

annotations := make(map[string]string)
if err := json.Unmarshal([]byte(md.Annotations), &annotations); err != nil {
return nil, err
}

wfs[i] = wfv1.Workflow{
ObjectMeta: v1.ObjectMeta{
Name: md.Name,
Namespace: md.Namespace,
UID: types.UID(md.UID),
CreationTimestamp: v1.Time{Time: md.StartedAt},
Labels: labels,
Annotations: annotations,
},
Status: wfv1.WorkflowStatus{
Phase: md.Phase,
StartedAt: v1.Time{Time: md.StartedAt},
FinishedAt: v1.Time{Time: md.FinishedAt},
Progress: wfv1.Progress(md.Progress),
},
}
}
return wfs, nil
Expand Down Expand Up @@ -347,3 +376,13 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error {
log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows")
return nil
}

func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) {
switch t {
case MySQL:
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(workflow->>'$.metadata.labels', '{}') as labels,coalesce(workflow->>'$.metadata.annotations', '{}') as annotations, coalesce(workflow->>'$.status.progress', '') as progress"), nil
case Postgres:
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce((workflow::json)->'metadata'->>'labels', '{}') as labels, coalesce((workflow::json)->'metadata'->>'annotations', '{}') as annotations, coalesce((workflow::json)->'status'->>'progress', '') as progress"), nil
}
return nil, fmt.Errorf("unsupported db type %s", t)
}

0 comments on commit 200f4d1

Please sign in to comment.