Skip to content

Commit

Permalink
fix: Support OOMKilled with container-set. Fixes #10063. FOR 3.4.11 o…
Browse files Browse the repository at this point in the history
…nly (#11757)

Signed-off-by: Alex Collins <[email protected]>
Signed-off-by: Isitha Subasinghe <[email protected]>
Co-authored-by: Alex Collins <[email protected]>
  • Loading branch information
isubasinghe and alexec authored Sep 6, 2023
1 parent e731cc0 commit ee939bb
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 18 deletions.
47 changes: 30 additions & 17 deletions cmd/argoexec/commands/emissary.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,42 @@ func NewEmissaryCommand() *cobra.Command {
return fmt.Errorf("failed to unmarshal template: %w", err)
}

// setup signal handlers
signals := make(chan os.Signal, 1)
defer close(signals)
signal.Notify(signals)
defer signal.Reset()

for _, x := range template.ContainerSet.GetGraph() {
if x.Name == containerName {
for _, y := range x.Dependencies {
logger.Infof("waiting for dependency %q", y)
WaitForDependency:
for {
data, err := ioutil.ReadFile(filepath.Clean(varRunArgo + "/ctr/" + y + "/exitcode"))
if os.IsNotExist(err) {
time.Sleep(time.Second)
continue
}
exitCode, err := strconv.Atoi(string(data))
if err != nil {
return fmt.Errorf("failed to read exit-code of dependency %q: %w", y, err)
}
if exitCode != 0 {
return fmt.Errorf("dependency %q exited with non-zero code: %d", y, exitCode)
select {
// If we receive a terminated or killed signal, we should exit immediately.
case s := <-signals:
switch s {
case osspecific.Term:
// exit with 128 + 15 (SIGTERM)
return errors.NewExitErr(143)
case os.Kill:
// exit with 128 + 9 (SIGKILL)
return errors.NewExitErr(137)
}
default:
data, _ := os.ReadFile(filepath.Clean(varRunArgo + "/ctr/" + y + "/exitcode"))
exitCode, err := strconv.Atoi(string(data))
if err != nil {
time.Sleep(time.Second)
continue
}
if exitCode != 0 {
return fmt.Errorf("dependency %q exited with non-zero code: %d", y, exitCode)
}

break WaitForDependency
}
break
}
}
}
Expand Down Expand Up @@ -118,11 +136,6 @@ func NewEmissaryCommand() *cobra.Command {
}

cmdErr := retry.OnError(backoff, func(error) bool { return true }, func() error {
// setup signal handlers
signals := make(chan os.Signal, 1)
defer close(signals)
signal.Notify(signals)
defer signal.Reset()

command, closer, err := startCommand(name, args, template)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions test/e2e/signals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,29 @@ func (s *SignalsSuite) TestSignaled() {
})
}

func (s *SignalsSuite) TestSignaledContainerSet() {
s.Given().
Workflow("@testdata/signaled-container-set-workflow.yaml").
When().
SubmitWorkflow().
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Equal(t, "OOMKilled (exit code 137)", status.Message)
one := status.Nodes.FindByDisplayName("one")
if assert.NotNil(t, one) {
assert.Equal(t, wfv1.NodeFailed, one.Phase)
assert.Equal(t, "OOMKilled (exit code 137): ", one.Message)
}
two := status.Nodes.FindByDisplayName("two")
if assert.NotNil(t, two) {
assert.Equal(t, wfv1.NodeFailed, two.Phase)
assert.Equal(t, "Error (exit code 143): ", two.Message)
}
})
}

func TestSignalsSuite(t *testing.T) {
suite.Run(t, new(SignalsSuite))
}
50 changes: 50 additions & 0 deletions test/e2e/testdata/signaled-container-set-workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: example
namespace: argo
spec:
templates:
- name: entrypoint
metadata:
containerSet:
containers:
- name: one
resources:
requests:
memory: "50Mi"
cpu: "50m"
limits:
memory: "50Mi"
image: argoproj/argosay:v2
command:
- bash
- '-c'
args:
- |
/bin/bash <<'EOF'
echo "hello one"
apt update -y
apt install stress -y
echo 'stress --vm 1 --vm-bytes 512M --vm-hang 100' > abc.sh
bash abc.sh
EOF
- name: two
resources:
requests:
memory: "150Mi"
cpu: "50m"
limits:
memory: "250Mi"
image: argoproj/argosay:v2
command:
- bash
- '-c'
args:
- |
/bin/bash <<'EOF'
echo "hello world"
EOF
dependencies:
- one
entrypoint: entrypoint
10 changes: 9 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,8 +1421,16 @@ func getExitCode(pod *apiv1.Pod) *int32 {
}

func podHasContainerNeedingTermination(pod *apiv1.Pod, tmpl wfv1.Template) bool {
// pod needs to be terminated if any of the following are true:
// 1. any main container has exited with non-zero exit code
// 2. all main containers have exited
// pod termination will cause the wait container to finish
for _, c := range pod.Status.ContainerStatuses {
if tmpl.IsMainContainerName(c.Name) && c.State.Terminated != nil && c.State.Terminated.ExitCode != 0 {
return true
}
}
for _, c := range pod.Status.ContainerStatuses {
// Only clean up pod when all main containers are terminated
if tmpl.IsMainContainerName(c.Name) && c.State.Terminated == nil {
return false
}
Expand Down
4 changes: 4 additions & 0 deletions workflow/executor/os-specific/signal_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"github.com/argoproj/argo-workflows/v3/util/errors"
)

var (
Term = syscall.SIGTERM
)

func CanIgnoreSignal(s os.Signal) bool {
return s == syscall.SIGCHLD || s == syscall.SIGURG
}
Expand Down
4 changes: 4 additions & 0 deletions workflow/executor/os-specific/signal_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"github.com/argoproj/argo-workflows/v3/util/errors"
)

var (
Term = os.Interrupt
)

func CanIgnoreSignal(s os.Signal) bool {
return false
}
Expand Down

0 comments on commit ee939bb

Please sign in to comment.