fix: further optimize archive workflow listing. Fixes #13601 (#13819)
Signed-off-by: Mason Malone <[email protected]>
MasonM authored Dec 19, 2024
1 parent 7d7ebb1 commit abd14b7
Showing 3 changed files with 65 additions and 45 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -39,6 +39,7 @@ E2E_PARALLEL ?= 20
E2E_SUITE_TIMEOUT ?= 15m
GOTEST ?= go test -v -p 20
ALL_BUILD_TAGS ?= api,cli,cron,executor,examples,corefunctional,functional,plugins
BENCHMARK_COUNT ?= 6

# should we build the static files?
ifneq (,$(filter $(MAKECMDGOALS),codegen lint test docs start))
@@ -619,7 +620,7 @@ Test%:
E2E_WAIT_TIMEOUT=$(E2E_WAIT_TIMEOUT) go test -failfast -v -timeout $(E2E_SUITE_TIMEOUT) -count 1 --tags $(ALL_BUILD_TAGS) -parallel $(E2E_PARALLEL) ./test/e2e -run='.*/$*'

Benchmark%:
go test --tags $(ALL_BUILD_TAGS) ./test/e2e -run='$@' -benchmem -bench '$@' .
go test --tags $(ALL_BUILD_TAGS) ./test/e2e -run='$@' -benchmem -count=$(BENCHMARK_COUNT) -bench .

# clean

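The new BENCHMARK_COUNT variable feeds go test's -count flag, so the benchmarks are now repeated (6 times by default) rather than run once, which makes the timings suitable for comparison with a tool such as benchstat. As a usage sketch, a run like make BenchmarkWorkflowArchive BENCHMARK_COUNT=10 (the benchmark target name here is illustrative) would repeat the benchmarks in test/e2e ten times.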
6 changes: 6 additions & 0 deletions persist/sqldb/migrate.go
@@ -269,6 +269,12 @@ func (m migrate) Exec(ctx context.Context) (err error) {
noop{},
ansiSQLChange(`alter table argo_archived_workflows alter column workflow set data type jsonb using workflow::jsonb`),
),
// change argo_archived_workflows_i4 index to include clustername so MySQL uses it for listing archived workflows. #13601
ternary(dbType == MySQL,
ansiSQLChange(`drop index argo_archived_workflows_i4 on argo_archived_workflows`),
ansiSQLChange(`drop index argo_archived_workflows_i4`),
),
ansiSQLChange(`create index argo_archived_workflows_i4 on argo_archived_workflows (clustername, startedat)`),
} {
err := m.applyChange(changeSchemaVersion, change)
if err != nil {
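The point of rebuilding argo_archived_workflows_i4 with clustername as the leading column is that the archived-workflow listing always filters on clustername (via clusterManagedNamespaceAndInstanceID) and sorts on startedat. With a composite (clustername, startedat) index, MySQL can satisfy both the equality filter and the ordering from the same index, whereas the previous definition without clustername was not being used for these listing queries, per the comment above. A minimal sketch of the query shape the new index is meant to serve, with an abbreviated column list and placeholder values (the real statement is assembled by the query builder in workflow_archive.go):

SELECT name, namespace, uid, phase, startedat, finishedat
FROM argo_archived_workflows
WHERE clustername = 'default'
ORDER BY startedat DESC
LIMIT 50;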
101 changes: 57 additions & 44 deletions persist/sqldb/workflow_archive.go
@@ -157,46 +157,69 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {

func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) {
var archivedWfs []archivedWorkflowMetadata
var baseSelector = r.session.SQL().Select("name", "namespace", "uid", "phase", "startedat", "finishedat")

selectQuery, err := selectArchivedWorkflowQuery(r.dbType)
if err != nil {
return nil, err
}
switch r.dbType {
case MySQL:
selectQuery := baseSelector.
Columns(
db.Raw("coalesce(workflow->'$.metadata.labels', '{}') as labels"),
db.Raw("coalesce(workflow->'$.metadata.annotations', '{}') as annotations"),
db.Raw("coalesce(workflow->>'$.status.progress', '') as progress"),
db.Raw("coalesce(workflow->>'$.metadata.creationTimestamp', '') as creationtimestamp"),
db.Raw("workflow->>'$.spec.suspend'"),
db.Raw("coalesce(workflow->>'$.status.message', '') as message"),
db.Raw("coalesce(workflow->>'$.status.estimatedDuration', '0') as estimatedduration"),
db.Raw("coalesce(workflow->'$.status.resourcesDuration', '{}') as resourcesduration"),
).
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID())

subSelector := r.session.SQL().
Select(db.Raw("uid")).
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID())
selectQuery, err := BuildArchivedWorkflowSelector(selectQuery, archiveTableName, archiveLabelsTableName, r.dbType, options, false)
if err != nil {
return nil, err
}

subSelector, err = BuildArchivedWorkflowSelector(subSelector, archiveTableName, archiveLabelsTableName, r.dbType, options, false)
if err != nil {
return nil, err
}
err = selectQuery.All(&archivedWfs)
if err != nil {
return nil, err
}
case Postgres:
// Use a common table expression to reduce detoast overhead for the "workflow" column:
// https://github.com/argoproj/argo-workflows/issues/13601#issuecomment-2420499551
cteSelector := baseSelector.
Columns(
db.Raw("coalesce(workflow->'metadata', '{}') as metadata"),
db.Raw("coalesce(workflow->'status', '{}') as status"),
db.Raw("workflow->'spec'->>'suspend' as suspend"),
).
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID())

if r.dbType == MySQL {
// workaround for mysql 42000 error (Unsupported subquery syntax):
//
// Error 1235 (42000): This version of MySQL doesn't yet support 'LIMIT & IN/ALL/ANY/SOME subquery'
//
// more context:
// * https://dev.mysql.com/doc/refman/8.0/en/subquery-errors.html
// * https://dev.to/gkoniaris/limit-mysql-subquery-results-inside-a-where-in-clause-using-laravel-s-eloquent-orm-26en
subSelector = r.session.SQL().Select(db.Raw("*")).From(subSelector).As("x")
}
cteSelector, err := BuildArchivedWorkflowSelector(cteSelector, archiveTableName, archiveLabelsTableName, r.dbType, options, false)
if err != nil {
return nil, err
}

// why a subquery? the json unmarshal triggers for every row in the filter
// query. by filtering on uid first, we delay json parsing until a single
// row, speeding up the query(e.g. up to 257 times faster for some
// deployments).
//
// more context: https://github.com/argoproj/argo-workflows/pull/13566
selector := r.session.SQL().Select(selectQuery).From(archiveTableName).Where(
r.clusterManagedNamespaceAndInstanceID().And(db.Cond{"uid IN": subSelector}),
)
selectQuery := baseSelector.Columns(
db.Raw("coalesce(metadata->>'labels', '{}') as labels"),
db.Raw("coalesce(metadata->>'annotations', '{}') as annotations"),
db.Raw("coalesce(status->>'progress', '') as progress"),
db.Raw("coalesce(metadata->>'creationTimestamp', '') as creationtimestamp"),
"suspend",
db.Raw("coalesce(status->>'message', '') as message"),
db.Raw("coalesce(status->>'estimatedDuration', '0') as estimatedduration"),
db.Raw("coalesce(status->>'resourcesDuration', '{}') as resourcesduration"),
)

err = selector.All(&archivedWfs)
if err != nil {
return nil, err
err = r.session.SQL().
Iterator("WITH workflows AS ? ?", cteSelector, selectQuery.From("workflows")).
All(&archivedWfs)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported db type %s", r.dbType)
}

wfs := make(wfv1.Workflows, len(archivedWfs))
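After this change, the MySQL branch no longer routes the listing through the uid IN (...) subquery used previously (see #13566): the filters and pagination built by BuildArchivedWorkflowSelector are applied directly to the query that extracts the JSON fields with MySQL's -> and ->> path operators. A rough sketch of the resulting statement, with an abbreviated column list and placeholder values (the exact SQL comes from the query builder):

SELECT name, namespace, uid, phase, startedat, finishedat,
       coalesce(workflow->'$.metadata.labels', '{}') AS labels,
       coalesce(workflow->>'$.status.progress', '') AS progress
FROM argo_archived_workflows
WHERE clustername = 'default' AND namespace = 'argo'
ORDER BY startedat DESC
LIMIT 50;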
@@ -446,13 +469,3 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error {
log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows")
return nil
}

func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) {
switch t {
case MySQL:
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(JSON_EXTRACT(workflow,'$.metadata.labels'), '{}') as labels,coalesce(JSON_EXTRACT(workflow,'$.metadata.annotations'), '{}') as annotations, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.progress')), '') as progress, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.metadata.creationTimestamp')), '') as creationtimestamp, JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.spec.suspend')) as suspend, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.message')), '') as message, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.estimatedDuration')), '0') as estimatedduration, coalesce(JSON_EXTRACT(workflow,'$.status.resourcesDuration'), '{}') as resourcesduration"), nil
case Postgres:
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(workflow->'metadata'->>'labels', '{}') as labels, coalesce(workflow->'metadata'->>'annotations', '{}') as annotations, coalesce(workflow->'status'->>'progress', '') as progress, coalesce(workflow->'metadata'->>'creationTimestamp', '') as creationtimestamp, workflow->'spec'->>'suspend' as suspend, coalesce(workflow->'status'->>'message', '') as message, coalesce(workflow->'status'->>'estimatedDuration', '0') as estimatedduration, coalesce(workflow->'status'->>'resourcesDuration', '{}') as resourcesduration"), nil
}
return nil, fmt.Errorf("unsupported db type %s", t)
}
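For PostgreSQL, the listing now runs as a common table expression, matching the "WITH workflows AS ? ?" statement built above. The CTE applies the filters and pagination and is intended to reduce detoast overhead on the jsonb workflow column (see the linked comment on #13601) by projecting only the metadata and status sub-objects plus the suspend flag; the outer query then extracts the remaining scalar fields from those already-materialized values. A rough sketch of the shape, with an abbreviated column list and placeholder filter values:

WITH workflows AS (
    SELECT name, namespace, uid, phase, startedat, finishedat,
           coalesce(workflow->'metadata', '{}') AS metadata,
           coalesce(workflow->'status', '{}') AS status,
           workflow->'spec'->>'suspend' AS suspend
    FROM argo_archived_workflows
    WHERE clustername = 'default' AND namespace = 'argo'
    ORDER BY startedat DESC
    LIMIT 50
)
SELECT name, namespace, uid, phase, startedat, finishedat,
       coalesce(metadata->>'labels', '{}') AS labels,
       coalesce(status->>'progress', '') AS progress,
       suspend
FROM workflows;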
