Skip to content

Commit

Permalink
fix: don't necessarily include all artifacts from templates in node o…
Browse files Browse the repository at this point in the history
…utputs (#13066)

Signed-off-by: Julie Vogelman <[email protected]>
Co-authored-by: Anton Gilgur <[email protected]>
(cherry picked from commit 2ca4841)
  • Loading branch information
juliev0 authored and Anton Gilgur committed Jun 17, 2024
1 parent c2204ae commit 10488d6
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 40 deletions.
5 changes: 3 additions & 2 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,17 @@ func waitContainer(ctx context.Context) error {
}

// Saving output artifacts
err = wfExecutor.SaveArtifacts(bgCtx)
artifacts, err := wfExecutor.SaveArtifacts(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Save log artifacts
logArtifacts := wfExecutor.SaveLogs(bgCtx)
artifacts = append(artifacts, logArtifacts...)

// Try to upsert TaskResult. If it fails, we will try to update the Pod's Annotations
err = wfExecutor.ReportOutputs(bgCtx, logArtifacts)
err = wfExecutor.ReportOutputs(bgCtx, artifacts)
if err != nil {
wfExecutor.AddError(err)
}
Expand Down
126 changes: 98 additions & 28 deletions test/e2e/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,52 @@ func (s *ArtifactsSuite) TestGlobalArtifactPassing() {
}

type artifactState struct {
key string
bucketName string
artifactLocation s3Location

deletedAtWFCompletion bool
deletedAtWFDeletion bool
}

type s3Location struct {
bucketName string
// specify one of these two:
specifiedKey string // exact key is known
derivedKey *artifactDerivedKey // exact key needs to be derived
}

type artifactDerivedKey struct {
templateName string
artifactName string
}

func (al *s3Location) getS3Key(wf *wfv1.Workflow) (string, error) {
if al.specifiedKey == "" && al.derivedKey == nil {
panic(fmt.Sprintf("invalid artifactLocation: %+v, must have specifiedKey or derivedKey set", al))
}

if al.specifiedKey != "" {
return al.specifiedKey, nil
}

// get key by finding the node in the Workflow's NodeStatus and looking at its Artifacts

// get node name using template
n := wf.Status.Nodes.Find(func(nodeStatus wfv1.NodeStatus) bool { return nodeStatus.TemplateName == al.derivedKey.templateName })
if n == nil {
return "", fmt.Errorf("no node with template name=%q found in workflow %+v", al.derivedKey.templateName, wf)
}
for _, a := range n.Outputs.Artifacts {
if a.Name == al.derivedKey.artifactName {
if a.S3 == nil {
return "", fmt.Errorf("didn't find expected S3 field in artifact %q: %+v", al.derivedKey.artifactName, a)
}
return a.S3.Key, nil
}
}

return "", fmt.Errorf("artifact named %q not found", al.derivedKey.artifactName)
}

func (s *ArtifactsSuite) TestStoppedWorkflow() {

for _, tt := range []struct {
Expand Down Expand Up @@ -257,79 +297,97 @@ func (s *ArtifactsSuite) TestArtifactGC() {
for _, tt := range []struct {
workflowFile string
hasGC bool
workflowShouldSucceed bool
expectedArtifacts []artifactState
expectedGCPodsOnWFCompletion int
}{
{
workflowFile: "@testdata/artifactgc/artgc-multi-strategy-multi-anno.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 2,
expectedArtifacts: []artifactState{
artifactState{"first-on-completion-1", "my-bucket-2", true, false},
artifactState{"first-on-completion-2", "my-bucket-3", true, false},
artifactState{"first-no-deletion", "my-bucket-3", false, false},
artifactState{"second-on-deletion", "my-bucket-3", false, true},
artifactState{"second-on-completion", "my-bucket-2", true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "first-on-completion-1"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "first-on-completion-2"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "first-no-deletion"}, false, false},
artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "second-on-deletion"}, false, true},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "second-on-completion"}, true, false},
},
},
// entire Workflow based on a WorkflowTemplate
{
workflowFile: "@testdata/artifactgc/artgc-from-template.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
artifactState{"on-deletion", "my-bucket-2", false, true},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
},
},
// entire Workflow based on a WorkflowTemplate
{
workflowFile: "@testdata/artifactgc/artgc-from-template-2.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
artifactState{"on-deletion", "my-bucket-2", false, true},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
},
},
// Step in Workflow references a WorkflowTemplate's template
{
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
artifactState{"on-deletion", "my-bucket-2", false, true},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
},
},
// Step in Workflow references a WorkflowTemplate's template
{
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl-2.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
artifactState{"on-deletion", "my-bucket-2", false, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, false},
},
},
// entire Workflow based on a WorkflowTemplate which has a Step that references another WorkflowTemplate's template
{
workflowFile: "@testdata/artifactgc/artgc-from-ref-template.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
artifactState{"on-deletion", "my-bucket-2", false, true},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
},
},
// Step in Workflow references a WorkflowTemplate's template
// Workflow defines ArtifactGC but all artifacts override with "Never" so Artifact GC should not be done
{
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl-no-gc.yaml",
hasGC: false,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 0,
expectedArtifacts: []artifactState{},
},
// Workflow fails to write an artifact that's been defined as an Output
{
workflowFile: "@testdata/artifactgc/artgc-artifact-not-written.yaml",
hasGC: true,
workflowShouldSucceed: false, // artifact not being present causes Workflow to fail
expectedGCPodsOnWFCompletion: 0,
expectedArtifacts: []artifactState{
artifactState{s3Location{bucketName: "my-bucket", derivedKey: &artifactDerivedKey{templateName: "artifact-written", artifactName: "present"}}, false, true},
},
},
} {
// for each test make sure that:
// 1. the finalizer gets added
Expand All @@ -352,7 +410,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
}
})

if when.WorkflowCondition(func(wf *wfv1.Workflow) bool {
if tt.workflowShouldSucceed && when.WorkflowCondition(func(wf *wfv1.Workflow) bool {
return wf.Status.Phase == wfv1.WorkflowFailed || wf.Status.Phase == wfv1.WorkflowError
}) {
fmt.Println("can't reliably verify Artifact GC since workflow failed")
Expand All @@ -365,22 +423,27 @@ func (s *ArtifactsSuite) TestArtifactGC() {
WaitForWorkflow(
fixtures.WorkflowCompletionOkay(true),
fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) {
return len(wf.Status.ArtifactGCStatus.PodsRecouped) >= tt.expectedGCPodsOnWFCompletion,
return (len(wf.Status.ArtifactGCStatus.PodsRecouped) >= tt.expectedGCPodsOnWFCompletion) || (tt.expectedGCPodsOnWFCompletion == 0),
fmt.Sprintf("for all %d pods to have been recouped", tt.expectedGCPodsOnWFCompletion)
}))

then := when.Then()

// verify that the artifacts that should have been deleted at completion time were
for _, expectedArtifact := range tt.expectedArtifacts {
artifactKey, err := expectedArtifact.artifactLocation.getS3Key(when.GetWorkflow())
fmt.Printf("artifact key: %q\n", artifactKey)
if err != nil {
panic(err)
}
if expectedArtifact.deletedAtWFCompletion {
fmt.Printf("verifying artifact %s is deleted at completion time\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
fmt.Printf("verifying artifact %s is deleted at completion time\n", artifactKey)
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})
} else {
fmt.Printf("verifying artifact %s is not deleted at completion time\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
fmt.Printf("verifying artifact %s is not deleted at completion time\n", artifactKey)
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NoError(t, err)
})
}
Expand All @@ -390,25 +453,32 @@ func (s *ArtifactsSuite) TestArtifactGC() {

when.
DeleteWorkflow().
WaitForWorkflowDeletion()
WaitForWorkflowDeletion().
Then().
ExpectWorkflowDeleted()

when = when.RemoveFinalizers(false) // just in case - if the above test failed we need to forcibly remove the finalizer for Artifact GC

then = when.Then()

for _, expectedArtifact := range tt.expectedArtifacts {
artifactKey, err := expectedArtifact.artifactLocation.getS3Key(when.GetWorkflow())
fmt.Printf("artifact key: %q\n", artifactKey)
if err != nil {
panic(err)
}

if expectedArtifact.deletedAtWFCompletion { // already checked this
continue
}
if expectedArtifact.deletedAtWFDeletion {
fmt.Printf("verifying artifact %s is deleted\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
fmt.Printf("verifying artifact %s is deleted\n", artifactKey)
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})
} else {
fmt.Printf("verifying artifact %s is not deleted\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
fmt.Printf("verifying artifact %s is not deleted\n", artifactKey)
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NoError(t, err)
})
}
Expand Down
4 changes: 4 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (w *When) SubmitWorkflow() *When {
return w
}

func (w *When) GetWorkflow() *wfv1.Workflow {
return w.wf
}

func label(obj metav1.Object) {
labels := obj.GetLabels()
if labels == nil {
Expand Down
43 changes: 43 additions & 0 deletions test/e2e/testdata/artifactgc/artgc-artifact-not-written.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: artgc-artifact-not-written-
spec:
entrypoint: entrypoint
artifactGC:
strategy: OnWorkflowDeletion
podGC:
strategy: ""
templates:
- name: entrypoint
steps:
- - name: artifact-written
template: artifact-written
- - name: artifact-not-written
template: artifact-not-written
- name: artifact-written
container:
image: argoproj/argosay:v2
command:
- sh
- -c
args:
- |
echo "something" > /tmp/present
outputs:
artifacts:
- name: present
path: /tmp/present
- name: artifact-not-written
container:
image: argoproj/argosay:v2
command:
- sh
- -c
args:
- |
echo "intentionally not writing anything to disk"
outputs:
artifacts:
- name: notpresent
path: /tmp/notpresent
19 changes: 10 additions & 9 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,26 +291,27 @@ func (we *WorkflowExecutor) StageFiles() error {
}

// SaveArtifacts uploads artifacts to the archive location
func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) error {
func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) (wfv1.Artifacts, error) {
artifacts := wfv1.Artifacts{}
if len(we.Template.Outputs.Artifacts) == 0 {
log.Infof("No output artifacts")
return nil
return artifacts, nil
}

log.Infof("Saving output artifacts")
err := os.MkdirAll(tempOutArtDir, os.ModePerm)
if err != nil {
return argoerrs.InternalWrapError(err)
return artifacts, argoerrs.InternalWrapError(err)
}

for i, art := range we.Template.Outputs.Artifacts {
for _, art := range we.Template.Outputs.Artifacts {
err := we.saveArtifact(ctx, common.MainContainerName, &art)
if err != nil {
return err
return artifacts, err
}
we.Template.Outputs.Artifacts[i] = art
artifacts = append(artifacts, art)
}
return nil
return artifacts, nil
}

func (we *WorkflowExecutor) saveArtifact(ctx context.Context, containerName string, art *wfv1.Artifact) error {
Expand Down Expand Up @@ -832,9 +833,9 @@ func (we *WorkflowExecutor) InitializeOutput(ctx context.Context) {
}

// ReportOutputs updates the WorkflowTaskResult (or falls back to annotate the Pod)
func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, logArtifacts []wfv1.Artifact) error {
func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, artifacts []wfv1.Artifact) error {
outputs := we.Template.Outputs.DeepCopy()
outputs.Artifacts = append(outputs.Artifacts, logArtifacts...)
outputs.Artifacts = artifacts
return we.reportResult(ctx, wfv1.NodeResult{Outputs: outputs})
}

Expand Down
2 changes: 1 addition & 1 deletion workflow/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestSaveArtifacts(t *testing.T) {

for _, tt := range tests {
ctx := context.Background()
err := tt.workflowExecutor.SaveArtifacts(ctx)
_, err := tt.workflowExecutor.SaveArtifacts(ctx)
if err != nil {
assert.Equal(t, tt.expectError, true)
continue
Expand Down

0 comments on commit 10488d6

Please sign in to comment.