diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d5f7c87d..df41d7ca7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - New github action test to make sure target branch has been merged into the source first, so we know histories are ok - Check in the status commands to make sure we're not pulling statuses from nested workspaces - Added `setuptools` as a requirement for python 3.12 to recognize the `pkg_resources` library +- Patch to celery results backend to stop ChordErrors being raised and breaking workflows when a single task fails +- New step return code `$(MERLIN_RAISE_ERROR)` to force an error to be raised by a task (mainly for testing) + - Added description of this to docs +- New test to ensure a single failed task won't break a workflow ### Changed - `merlin info` is cleaner and gives python package info diff --git a/docs/user_guide/variables.md b/docs/user_guide/variables.md index 270d907e3..35478b138 100644 --- a/docs/user_guide/variables.md +++ b/docs/user_guide/variables.md @@ -252,3 +252,4 @@ If necessary, users can raise their own return codes within steps. The table bel |
`$(MERLIN_SOFT_FAIL)`| Mark this step as a failure, note in the warning log but keep executing the workflow. Unknown return codes get translated to soft fails, so that they can be logged. |
echo "Uh-oh, this sample didn't work"
exit $(MERLIN_SOFT_FAIL)
|
| `$(MERLIN_HARD_FAIL)`| Something went terribly wrong and we need to stop the whole workflow. Raises a `HardFailException` and stops all workers connected to that step. Workers will stop after a 60 second delay to allow the step to be acknowledged by the server.
Note
Workers in isolated parts of the workflow not consuming from the bad step will continue. You can stop all workers with `$(MERLIN_STOP_WORKERS)`.
echo "Oh no, we've created skynet! Abort!"
exit $(MERLIN_HARD_FAIL)
|
| `$(MERLIN_STOP_WORKERS)`| Launch a task to stop all active workers. To allow the current task to finish and acknowledge the results to the server, will happen in 60 seconds. |
# send a signal to all workers to stop
exit $(MERLIN_STOP_WORKERS)
|
+| `$(MERLIN_RAISE_ERROR)`| Purposefully raise a general exception. *This is intended to be used for testing; in most cases you'll want to use `$(MERLIN_SOFT_FAIL)` instead.* |
# send a signal to raise an exception
exit $(MERLIN_RAISE_ERROR)
|
diff --git a/merlin/celery.py b/merlin/celery.py
index d35b0dccd..a1f24bd67 100644
--- a/merlin/celery.py
+++ b/merlin/celery.py
@@ -36,8 +36,10 @@
from typing import Dict, Optional, Union
import billiard
+import celery
import psutil
-from celery import Celery
+from celery import Celery, states
+from celery.backends.redis import RedisBackend # noqa: F401 ; Needed for celery patch
from celery.signals import worker_process_init
import merlin.common.security.encrypt_backend_traffic
@@ -50,6 +52,37 @@
LOG: logging.Logger = logging.getLogger(__name__)
+def patch_celery():
+ """
+ Patch the redis backend so that errors in chords don't break workflows: a failed task's
+ result is unpacked as an "ExceptionClass: message" string. Celery has error callbacks, but
+ they do not work properly on chords nested within chains. Returns the `celery` module.
+
+ Credit to this function goes to: https://danidee10.github.io/2019/07/09/celery-chords.html
+ """
+
+ def _unpack_chord_result(
+ self,
+ tup,
+ decode,
+ EXCEPTION_STATES=states.EXCEPTION_STATES,
+ PROPAGATE_STATES=states.PROPAGATE_STATES,
+ ):
+ _, tid, state, retval = decode(tup)
+
+ if state in EXCEPTION_STATES:
+ retval = self.exception_to_python(retval)
+ if state in PROPAGATE_STATES:
+ # retval is an Exception here; hand it back as a readable string (nothing is raised)
+ retval = f"{retval.__class__.__name__}: {str(retval)}"
+
+ return retval
+
+ celery.backends.redis.RedisBackend._unpack_chord_result = _unpack_chord_result
+
+ return celery
+
+
# This function has to have specific args/return values for celery so ignore pylint
def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=W0613,R1710
"""
@@ -82,7 +115,7 @@ def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: dis
RESULTS_BACKEND_URI = None
# initialize app with essential properties
-app: Celery = Celery(
+app: Celery = patch_celery().Celery(
"merlin",
broker=BROKER_URI,
backend=RESULTS_BACKEND_URI,
diff --git a/merlin/common/abstracts/enums/__init__.py b/merlin/common/abstracts/enums/__init__.py
index 61fecf7a8..fe70c74c3 100644
--- a/merlin/common/abstracts/enums/__init__.py
+++ b/merlin/common/abstracts/enums/__init__.py
@@ -48,3 +48,4 @@ class ReturnCode(IntEnum):
DRY_OK = 103
RETRY = 104
STOP_WORKERS = 105
+ RAISE_ERROR = 106
diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py
index 980493ae8..4339ac7b0 100644
--- a/merlin/common/tasks.py
+++ b/merlin/common/tasks.py
@@ -41,6 +41,7 @@
from celery import chain, chord, group, shared_task, signature
from celery.exceptions import MaxRetriesExceededError, OperationalError, TimeoutError # pylint: disable=W0622
from filelock import FileLock, Timeout
+from redis.exceptions import TimeoutError as RedisTimeoutError
from merlin.common.abstracts.enums import ReturnCode
from merlin.common.sample_index import uniform_directories
@@ -62,6 +63,7 @@
RetryException,
RestartException,
FileNotFoundError,
+ RedisTimeoutError,
)
LOG = logging.getLogger(__name__)
@@ -181,6 +183,9 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq
shutdown = shutdown_workers.s(None)
shutdown.set(queue=step.get_task_queue())
shutdown.apply_async(countdown=STOP_COUNTDOWN)
+ elif result == ReturnCode.RAISE_ERROR:
+ LOG.warning("*** Raising an error ***")
+ raise Exception("Exception raised by request from the user")
else:
LOG.warning(f"**** Step '{step_name}' in '{step_dir}' had unhandled exit code {result}. Continuing with workflow.")
diff --git a/merlin/spec/expansion.py b/merlin/spec/expansion.py
index a8bf3ac43..03c46893f 100644
--- a/merlin/spec/expansion.py
+++ b/merlin/spec/expansion.py
@@ -57,6 +57,7 @@
"MERLIN_HARD_FAIL",
"MERLIN_RETRY",
"MERLIN_STOP_WORKERS",
+ "MERLIN_RAISE_ERROR",
}
MERLIN_RESERVED = STEP_AWARE | PROVENANCE_REPLACE
RESERVED = MAESTRO_RESERVED | MERLIN_RESERVED
@@ -215,6 +216,7 @@ def parameter_substitutions_for_cmd(glob_path, sample_paths):
substitutions.append(("$(MERLIN_HARD_FAIL)", str(int(ReturnCode.HARD_FAIL))))
substitutions.append(("$(MERLIN_RETRY)", str(int(ReturnCode.RETRY))))
substitutions.append(("$(MERLIN_STOP_WORKERS)", str(int(ReturnCode.STOP_WORKERS))))
+ substitutions.append(("$(MERLIN_RAISE_ERROR)", str(int(ReturnCode.RAISE_ERROR))))
return substitutions
diff --git a/merlin/study/script_adapter.py b/merlin/study/script_adapter.py
index 69dea2b63..5995e7992 100644
--- a/merlin/study/script_adapter.py
+++ b/merlin/study/script_adapter.py
@@ -470,6 +470,8 @@ def submit(self, step, path, cwd, job_map=None, env=None): # pylint: disable=R0
step.restart = False
elif retcode == ReturnCode.STOP_WORKERS:
LOG.debug("Execution returned status STOP_WORKERS")
+ elif retcode == ReturnCode.RAISE_ERROR:
+ LOG.debug("Execution returned status RAISE_ERROR")
else:
LOG.warning(f"Unrecognized Merlin Return code: {retcode}, returning SOFT_FAIL")
submission_record.add_info("retcode", retcode)
diff --git a/tests/integration/conditions.py b/tests/integration/conditions.py
index 535e350de..f85f5ec4a 100644
--- a/tests/integration/conditions.py
+++ b/tests/integration/conditions.py
@@ -307,8 +307,9 @@ class PathExists(Condition):
A condition for checking if a path to a file or directory exists
"""
- def __init__(self, pathname) -> None:
+ def __init__(self, pathname, negate=False) -> None:
self.pathname = pathname
+ self.negate = negate
def path_exists(self) -> bool:
"""Check if a path exists"""
@@ -319,7 +320,7 @@ def __str__(self) -> str:
@property
def passes(self):
- return self.path_exists()
+ return not self.path_exists() if self.negate else self.path_exists()
class FileHasRegex(Condition):
diff --git a/tests/integration/definitions.py b/tests/integration/definitions.py
index 28eac2d0e..829ae66cc 100644
--- a/tests/integration/definitions.py
+++ b/tests/integration/definitions.py
@@ -128,6 +128,7 @@ def define_tests(): # pylint: disable=R0914,R0915
lsf = f"{examples}/lsf/lsf_par.yaml"
mul_workers_demo = f"{dev_examples}/multiple_workers.yaml"
cli_substitution_wf = f"{test_specs}/cli_substitution_test.yaml"
+ chord_err_wf = f"{test_specs}/chord_err.yaml"
# Other shortcuts
black = "black --check --target-version py36"
@@ -827,6 +828,30 @@ def define_tests(): # pylint: disable=R0914,R0915
"run type": "distributed",
},
}
+ distributed_error_checks = {
+ "check chord error continues wf": {
+ "cmds": [
+ f"{workers} {chord_err_wf} --vars OUTPUT_PATH=./{OUTPUT_DIR}",
+ f"{run} {chord_err_wf} --vars OUTPUT_PATH=./{OUTPUT_DIR}; sleep 40; tree {OUTPUT_DIR}",
+ ],
+ "conditions": [
+ HasReturnCode(),
+ PathExists( # Check that the sample that's supposed to raise an error actually raises an error
+ f"{OUTPUT_DIR}/process_samples/01/MERLIN_FINISHED",
+ negate=True,
+ ),
+ StepFileExists( # Check that step 3 is actually started and completes
+ "step_3",
+ "MERLIN_FINISHED",
+ "chord_err",
+ OUTPUT_DIR,
+ ),
+ ],
+ "run type": "distributed",
+ "cleanup": KILL_WORKERS,
+ "num procs": 2,
+ }
+ }
# combine and return test dictionaries
all_tests = {}
@@ -849,6 +874,7 @@ def define_tests(): # pylint: disable=R0914,R0915
stop_workers_tests,
query_workers_tests,
distributed_tests,
+ distributed_error_checks,
]:
all_tests.update(test_dict)
diff --git a/tests/integration/samples_files/samples.csv b/tests/integration/samples_files/samples.csv
new file mode 100644
index 000000000..38ef6a076
--- /dev/null
+++ b/tests/integration/samples_files/samples.csv
@@ -0,0 +1,3 @@
+SUCCESS_1
+RAISE
+SUCCESS_2
diff --git a/tests/integration/test_specs/chord_err.yaml b/tests/integration/test_specs/chord_err.yaml
new file mode 100644
index 000000000..3da99ae03
--- /dev/null
+++ b/tests/integration/test_specs/chord_err.yaml
@@ -0,0 +1,54 @@
+description:
+ name: chord_err
+ description: test the chord err problem
+
+env:
+ variables:
+ OUTPUT_PATH: ./studies
+
+global.parameters:
+ TEST_PARAM:
+ values: [2, 4]
+ label: TEST_PARAM.%%
+
+study:
+- name: process_samples
+ description: Process samples. Purposefully try to trigger the chord err
+ run:
+ cmd: |
+ if [ $(SAMPLE) == "RAISE" ];
+ then
+ exit $(MERLIN_RAISE_ERROR)
+ else
+ echo "Success for sample $(SAMPLE)"
+ fi
+- name: samples_and_params
+ description: step with samples and parameters
+ run:
+ cmd: |
+ echo "sample: $(SAMPLE); param: $(TEST_PARAM)"
+ if [ -f $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)/MERLIN_FINISHED ];
+ then
+ echo "MERLIN finished file found at $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)"
+ else
+ echo "MERLIN finished file NOT found at $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)"
+ fi
+ depends: [process_samples_*]
+- name: step_3
+ description: funnel step
+ run:
+ cmd: |
+ echo "Running step_3"
+ depends: [samples_and_params_*]
+
+merlin:
+ samples:
+ column_labels: [SAMPLE]
+ file: $(MERLIN_INFO)/samples.csv
+ generate:
+ cmd: cp $(SPECROOT)/../samples_files/samples.csv $(MERLIN_INFO)/samples.csv
+ resources:
+ workers:
+ merlin_test_worker:
+ args: -l INFO --concurrency 1 --prefetch-multiplier 1 -Ofair
+ steps: [process_samples, samples_and_params, step_3]