Skip to content

Commit

Permalink
bugfix/celery-chord-error (#481)
Browse files Browse the repository at this point in the history
* remove a merge conflict statement that was missed

* add celery results backend patch to stop ChordErrors

* add MERLIN_RAISE_ERROR return code

* add tests to ensure chord error isn't raised

* add RAISE_ERROR to docs

* update CHANGELOG

* fix lint issues

* up the sleep time on the chord error test

* add new steps to the chord err test spec

* add tree statement to the new test for debugging

* upping sleep time to see if that fixes github action for python 3.7

* change sleep time for new test based on python version

* run fix style

* remove specific sleep time for diff python versions
  • Loading branch information
bgunnar5 authored Jun 5, 2024
1 parent 297d9d5 commit 4b4fdee
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- New github action test to make sure target branch has been merged into the source first, so we know histories are ok
- Check in the status commands to make sure we're not pulling statuses from nested workspaces
- Added `setuptools` as a requirement for python 3.12 to recognize the `pkg_resources` library
- Patch to celery results backend to stop ChordErrors being raised and breaking workflows when a single task fails
- New step return code `$(MERLIN_RAISE_ERROR)` to force an error to be raised by a task (mainly for testing)
- Added a description of `$(MERLIN_RAISE_ERROR)` to the docs
- New test to ensure a single failed task won't break a workflow

### Changed
- `merlin info` is cleaner and gives python package info
Expand Down
1 change: 1 addition & 0 deletions docs/user_guide/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,4 @@ If necessary, users can raise their own return codes within steps. The table bel
| <pre>`$(MERLIN_SOFT_FAIL)`</pre> | Mark this step as a failure, note in the warning log but keep executing the workflow. Unknown return codes get translated to soft fails, so that they can be logged. | <pre><code><span>echo "Uh-oh, this sample didn't work"</span></br><span>exit $(MERLIN_SOFT_FAIL)</span></code></pre> |
| <pre>`$(MERLIN_HARD_FAIL)`</pre> | Something went terribly wrong and we need to stop the whole workflow. Raises a `HardFailException` and stops all workers connected to that step. Workers will stop after a 60 second delay to allow the step to be acknowledged by the server. <div class="admonition note"><p class="admonition-title">Note</p><p>Workers in isolated parts of the workflow not consuming from the bad step will continue. You can stop all workers with `$(MERLIN_STOP_WORKERS)`.</p></div> | <pre><code><span>echo "Oh no, we've created skynet! Abort!"</span></br><span>exit $(MERLIN_HARD_FAIL)</span></code></pre> |
| <pre>`$(MERLIN_STOP_WORKERS)`</pre> | Launch a task to stop all active workers. The shutdown happens after a 60 second delay so that the current task can finish and acknowledge its results to the server. | <pre><code><span># send a signal to all workers to stop</span></br><span>exit $(MERLIN_STOP_WORKERS)</span></code></pre> |
| <pre>`$(MERLIN_RAISE_ERROR)`</pre> | Purposefully raise a general exception. *This is intended to be used for testing; you'll likely want to use `$(MERLIN_SOFT_FAIL)` instead.* | <pre><code><span># send a signal to raise an exception</span></br><span>exit $(MERLIN_RAISE_ERROR)</span></code></pre> |
37 changes: 35 additions & 2 deletions merlin/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
from typing import Dict, Optional, Union

import billiard
import celery
import psutil
from celery import Celery
from celery import Celery, states
from celery.backends.redis import RedisBackend # noqa: F401 ; Needed for celery patch
from celery.signals import worker_process_init

import merlin.common.security.encrypt_backend_traffic
Expand All @@ -50,6 +52,37 @@
LOG: logging.Logger = logging.getLogger(__name__)


def patch_celery():
    """
    Monkey-patch celery's redis results backend so that a failed task
    inside a chord no longer breaks the rest of the workflow.

    Celery does provide error callbacks, but they do not work properly
    for chords that are nested within chains, so the backend's
    `_unpack_chord_result` method is swapped out here instead.

    Original idea and implementation:
    https://danidee10.github.io/2019/07/09/celery-chords.html

    :returns: The `celery` module, with the patched `RedisBackend`.
    """

    def _unpack_chord_result(
        self,
        tup,
        decode,
        EXCEPTION_STATES=states.EXCEPTION_STATES,
        PROPAGATE_STATES=states.PROPAGATE_STATES,
    ):
        # The decoded record must hold exactly four fields; only the
        # state and the return value are needed here.
        _, _tid, state, retval = decode(tup)

        # Rebuild the exception object from its serialized form
        if state in EXCEPTION_STATES:
            retval = self.exception_to_python(retval)

        # Hand the failure back as a "<ExceptionName>: <message>" string
        # so the chord receives a value rather than an exception
        if state in PROPAGATE_STATES:
            exc_name = retval.__class__.__name__
            retval = f"{exc_name}: {retval!s}"

        return retval

    redis_backend_cls = celery.backends.redis.RedisBackend
    redis_backend_cls._unpack_chord_result = _unpack_chord_result

    return celery


# This function has to have specific args/return values for celery so ignore pylint
def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=W0613,R1710
"""
Expand Down Expand Up @@ -82,7 +115,7 @@ def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: dis
RESULTS_BACKEND_URI = None

# initialize app with essential properties
app: Celery = Celery(
app: Celery = patch_celery().Celery(
"merlin",
broker=BROKER_URI,
backend=RESULTS_BACKEND_URI,
Expand Down
1 change: 1 addition & 0 deletions merlin/common/abstracts/enums/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ class ReturnCode(IntEnum):
DRY_OK = 103
RETRY = 104
STOP_WORKERS = 105
RAISE_ERROR = 106
5 changes: 5 additions & 0 deletions merlin/common/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from celery import chain, chord, group, shared_task, signature
from celery.exceptions import MaxRetriesExceededError, OperationalError, TimeoutError # pylint: disable=W0622
from filelock import FileLock, Timeout
from redis.exceptions import TimeoutError as RedisTimeoutError

from merlin.common.abstracts.enums import ReturnCode
from merlin.common.sample_index import uniform_directories
Expand All @@ -62,6 +63,7 @@
RetryException,
RestartException,
FileNotFoundError,
RedisTimeoutError,
)

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -181,6 +183,9 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq
shutdown = shutdown_workers.s(None)
shutdown.set(queue=step.get_task_queue())
shutdown.apply_async(countdown=STOP_COUNTDOWN)
elif result == ReturnCode.RAISE_ERROR:
LOG.warning("*** Raising an error ***")
raise Exception("Exception raised by request from the user")
else:
LOG.warning(f"**** Step '{step_name}' in '{step_dir}' had unhandled exit code {result}. Continuing with workflow.")

Expand Down
2 changes: 2 additions & 0 deletions merlin/spec/expansion.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"MERLIN_HARD_FAIL",
"MERLIN_RETRY",
"MERLIN_STOP_WORKERS",
"MERLIN_RAISE_ERROR",
}
MERLIN_RESERVED = STEP_AWARE | PROVENANCE_REPLACE
RESERVED = MAESTRO_RESERVED | MERLIN_RESERVED
Expand Down Expand Up @@ -215,6 +216,7 @@ def parameter_substitutions_for_cmd(glob_path, sample_paths):
substitutions.append(("$(MERLIN_HARD_FAIL)", str(int(ReturnCode.HARD_FAIL))))
substitutions.append(("$(MERLIN_RETRY)", str(int(ReturnCode.RETRY))))
substitutions.append(("$(MERLIN_STOP_WORKERS)", str(int(ReturnCode.STOP_WORKERS))))
substitutions.append(("$(MERLIN_RAISE_ERROR)", str(int(ReturnCode.RAISE_ERROR))))
return substitutions


Expand Down
2 changes: 2 additions & 0 deletions merlin/study/script_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,8 @@ def submit(self, step, path, cwd, job_map=None, env=None): # pylint: disable=R0
step.restart = False
elif retcode == ReturnCode.STOP_WORKERS:
LOG.debug("Execution returned status STOP_WORKERS")
elif retcode == ReturnCode.RAISE_ERROR:
LOG.debug("Execution returned status RAISE_ERROR")
else:
LOG.warning(f"Unrecognized Merlin Return code: {retcode}, returning SOFT_FAIL")
submission_record.add_info("retcode", retcode)
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,9 @@ class PathExists(Condition):
A condition for checking if a path to a file or directory exists
"""

def __init__(self, pathname) -> None:
def __init__(self, pathname, negate=False) -> None:
self.pathname = pathname
self.negate = negate

def path_exists(self) -> bool:
"""Check if a path exists"""
Expand All @@ -319,7 +320,7 @@ def __str__(self) -> str:

@property
def passes(self):
return self.path_exists()
return not self.path_exists() if self.negate else self.path_exists()


class FileHasRegex(Condition):
Expand Down
26 changes: 26 additions & 0 deletions tests/integration/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def define_tests(): # pylint: disable=R0914,R0915
lsf = f"{examples}/lsf/lsf_par.yaml"
mul_workers_demo = f"{dev_examples}/multiple_workers.yaml"
cli_substitution_wf = f"{test_specs}/cli_substitution_test.yaml"
chord_err_wf = f"{test_specs}/chord_err.yaml"

# Other shortcuts
black = "black --check --target-version py36"
Expand Down Expand Up @@ -827,6 +828,30 @@ def define_tests(): # pylint: disable=R0914,R0915
"run type": "distributed",
},
}
distributed_error_checks = {
"check chord error continues wf": {
"cmds": [
f"{workers} {chord_err_wf} --vars OUTPUT_PATH=./{OUTPUT_DIR}",
f"{run} {chord_err_wf} --vars OUTPUT_PATH=./{OUTPUT_DIR}; sleep 40; tree {OUTPUT_DIR}",
],
"conditions": [
HasReturnCode(),
PathExists( # Check that the sample that's supposed to raise an error actually raises an error
f"{OUTPUT_DIR}/process_samples/01/MERLIN_FINISHED",
negate=True,
),
StepFileExists( # Check that step 3 is actually started and completes
"step_3",
"MERLIN_FINISHED",
"chord_err",
OUTPUT_DIR,
),
],
"run type": "distributed",
"cleanup": KILL_WORKERS,
"num procs": 2,
}
}

# combine and return test dictionaries
all_tests = {}
Expand All @@ -849,6 +874,7 @@ def define_tests(): # pylint: disable=R0914,R0915
stop_workers_tests,
query_workers_tests,
distributed_tests,
distributed_error_checks,
]:
all_tests.update(test_dict)

Expand Down
3 changes: 3 additions & 0 deletions tests/integration/samples_files/samples.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SUCCESS_1
RAISE
SUCCESS_2
54 changes: 54 additions & 0 deletions tests/integration/test_specs/chord_err.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
description:
name: chord_err
description: test the chord err problem

env:
variables:
OUTPUT_PATH: ./studies

global.parameters:
TEST_PARAM:
values: [2, 4]
label: TEST_PARAM.%%

study:
- name: process_samples
description: Process samples. Purposefully try to trigger the chord err
run:
cmd: |
if [ $(SAMPLE) == "RAISE" ];
then
exit $(MERLIN_RAISE_ERROR)
else
echo "Success for sample $(SAMPLE)"
fi
- name: samples_and_params
description: step with samples and parameters
run:
cmd: |
echo "sample: $(SAMPLE); param: $(TEST_PARAM)"
if [ -f $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)/MERLIN_FINISHED ];
then
echo "MERLIN finished file found at $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)"
else
echo "MERLIN finished file NOT found at $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)"
fi
depends: [process_samples_*]
- name: step_3
description: funnel step
run:
cmd: |
echo "Running step_3"
depends: [samples_and_params_*]

merlin:
samples:
column_labels: [SAMPLE]
file: $(MERLIN_INFO)/samples.csv
generate:
cmd: cp $(SPECROOT)/../samples_files/samples.csv $(MERLIN_INFO)/samples.csv
resources:
workers:
merlin_test_worker:
args: -l INFO --concurrency 1 --prefetch-multiplier 1 -Ofair
steps: [process_samples, samples_and_params, step_3]

0 comments on commit 4b4fdee

Please sign in to comment.