From d8338514a2b3c9cbe6c13073631ac6a605115af0 Mon Sep 17 00:00:00 2001 From: Eivind Jahren Date: Thu, 9 Jan 2025 09:34:29 +0100 Subject: [PATCH] Reorganize workflow tests --- tests/ert/unit_tests/config/test_workflow.py | 192 +++++++++++++- .../unit_tests/config/test_workflow_jobs.py | 16 ++ .../workflow_runner/test_workflow.py | 245 ------------------ .../workflow_runner/test_workflow_job.py | 140 ---------- .../workflow_runner/test_workflow_runner.py | 149 ++++++++++- 5 files changed, 354 insertions(+), 388 deletions(-) delete mode 100644 tests/ert/unit_tests/workflow_runner/test_workflow.py delete mode 100644 tests/ert/unit_tests/workflow_runner/test_workflow_job.py diff --git a/tests/ert/unit_tests/config/test_workflow.py b/tests/ert/unit_tests/config/test_workflow.py index af94213bd8f..c0155b3eefe 100644 --- a/tests/ert/unit_tests/config/test_workflow.py +++ b/tests/ert/unit_tests/config/test_workflow.py @@ -1,14 +1,202 @@ import os +from contextlib import ExitStack as does_not_raise import pytest +from hypothesis import given, strategies -from ert.config import ConfigValidationError, Workflow +from ert.config import ConfigValidationError, Workflow, WorkflowJob +from ert.substitutions import Substitutions @pytest.mark.usefixtures("use_tmpdir") def test_reading_non_existent_workflow_raises_config_error(): with pytest.raises(ConfigValidationError, match="No such file or directory"): - Workflow.from_file("/tmp/does_not_exist", None, {}) + Workflow.from_file("does_not_exist", None, {}) os.mkdir("is_a_directory") with pytest.raises(ConfigValidationError, match="Is a directory"): Workflow.from_file("is_a_directory", None, {}) + + +@pytest.mark.usefixtures("use_tmpdir") +def test_that_failure_in_parsing_workflow_gives_config_validation_error(): + with open("workflow", "w", encoding="utf-8") as f: + f.write("DEFINE\n") + with pytest.raises( + ConfigValidationError, match=r"DEFINE must have .* arguments" + ) as err: + _ = Workflow.from_file("workflow", None, {}) + assert os.path.abspath(err.value.errors[0].filename) == os.path.abspath("workflow") + + +@pytest.mark.usefixtures("use_tmpdir") +def test_that_substitution_happens_in_workflow(): + with open("workflow", "w", encoding="utf-8") as f: + f.write("JOB \n") + substlist = Substitutions() + substlist[""] = "a" + substlist[""] = "b" + job = WorkflowJob( + name="JOB", + internal=False, + min_args=None, + max_args=None, + arg_types=[], + executable="echo", + script=None, + ) + wf = Workflow.from_file( + "workflow", + substlist, + {"JOB": job}, + ) + assert wf.cmd_list == [(job, ["a", "b"])] + + +def get_workflow_job(name): + return WorkflowJob( + name=name, + internal=False, + min_args=None, + max_args=None, + arg_types=[], + executable=None, + script=None, + ) + + +@pytest.mark.usefixtures("use_tmpdir") +@given( + strategies.lists( + strategies.sampled_from( + [ + "foo", + "bar", + "baz", + ] + ), + min_size=1, + max_size=20, + ) +) +def test_that_multiple_workflow_jobs_are_ordered_correctly(order): + with open("workflow", "w", encoding="utf-8") as f: + f.write("\n".join(order)) + + foo = get_workflow_job("foo") + bar = get_workflow_job("bar") + baz = get_workflow_job("baz") + + wf = Workflow.from_file( + src_file="workflow", + context=None, + job_dict={ + "foo": foo, + "bar": bar, + "baz": baz, + }, + ) + + assert [x[0].name for x in wf.cmd_list] == order + + +@pytest.mark.usefixtures("use_tmpdir") +def test_that_multiple_workflow_jobs_with_redefines_are_ordered_correctly(): + with open("workflow", "w", encoding="utf-8") as f: + f.write( + "\n".join( + [ + "DEFINE 1", + "foo ", + "bar ", + "DEFINE 3", + "foo ", + "baz ", + ] + ) + ) + + foo = get_workflow_job("foo") + bar = get_workflow_job("bar") + baz = get_workflow_job("baz") + + wf = Workflow.from_file( + src_file="workflow", + context=None, + job_dict={ + "foo": foo, + "bar": bar, + "baz": baz, + }, + ) + + commands = [(name, args[0]) for (name, args) in wf.cmd_list] + + assert commands == [(foo, "1"), (bar, "1"), (foo, "3"), (baz, "3")] + + +@pytest.mark.usefixtures("use_tmpdir") +def test_that_unknown_jobs_gives_error(): + with open("workflow", "w", encoding="utf-8") as f: + f.write( + "\n".join( + [ + "boo ", + "kingboo ", + ] + ) + ) + + with pytest.raises( + ConfigValidationError, match="Job with name: kingboo is not recognized" + ): + Workflow.from_file( + src_file="workflow", + context=None, + job_dict={"boo": get_workflow_job("boo")}, + ) + + +@pytest.mark.usefixtures("use_tmpdir") +@pytest.mark.parametrize( + "config, expectation", + [ + ( + "WORKFLOW", + pytest.raises(ConfigValidationError, match="not have enough arguments"), + ), + ( + "WORKFLOW arg_1", + does_not_raise(), + ), + ( + "WORKFLOW arg_1 arg_2", + does_not_raise(), + ), + ( + "WORKFLOW arg_1 arg_2 arg_3", + pytest.raises(ConfigValidationError, match="too many arguments"), + ), + ], +) +@pytest.mark.parametrize("min_args, max_args", [(1, 2), (None, None)]) +def test_args_validation(config, expectation, min_args, max_args): + with open("workflow", "w", encoding="utf-8") as f: + f.write(config) + if min_args is None and max_args is None: + expectation = does_not_raise() + with expectation: + Workflow.from_file( + src_file="workflow", + context=None, + job_dict={ + "WORKFLOW": WorkflowJob( + name="WORKFLOW", + internal=False, + min_args=min_args, + max_args=max_args, + arg_types=[], + executable=None, + script=None, + ), + }, + ) diff --git a/tests/ert/unit_tests/config/test_workflow_jobs.py b/tests/ert/unit_tests/config/test_workflow_jobs.py index ec455449d2f..02f5ff41e01 100644 --- a/tests/ert/unit_tests/config/test_workflow_jobs.py +++ b/tests/ert/unit_tests/config/test_workflow_jobs.py @@ -40,3 +40,19 @@ def test_that_ert_warns_on_duplicate_workflow_jobs(tmp_path): ErtPluginContext(), ): _ = ErtConfig.from_file(test_config_file_name) + + +@pytest.mark.usefixtures("use_tmpdir") +def test_stop_on_fail_is_parsed_external(): + with open("fail_job", "w+", encoding="utf-8") as f: + f.write("INTERNAL False\n") + f.write("EXECUTABLE echo\n") + f.write("MIN_ARG 1\n") + f.write("STOP_ON_FAIL True\n") + + job_internal = WorkflowJob.from_file( + name="FAIL", + config_file="fail_job", + ) + + assert job_internal.stop_on_fail diff --git a/tests/ert/unit_tests/workflow_runner/test_workflow.py b/tests/ert/unit_tests/workflow_runner/test_workflow.py deleted file mode 100644 index 2602ca62545..00000000000 --- a/tests/ert/unit_tests/workflow_runner/test_workflow.py +++ /dev/null @@ -1,245 +0,0 @@ -import os -from contextlib import ExitStack as does_not_raise - -import pytest -from hypothesis import given, strategies - -from ert.config import ConfigValidationError, Workflow, WorkflowJob -from ert.substitutions import Substitutions -from ert.workflow_runner import WorkflowRunner - -from .workflow_common import WorkflowCommon - - -def get_workflow_job(name): - return WorkflowJob( - name=name, - internal=False, - min_args=None, - max_args=None, - arg_types=[], - executable=None, - script=None, - ) - - -@pytest.mark.usefixtures("use_tmpdir") -def test_workflow(): - WorkflowCommon.createExternalDumpJob() - - dump_job = WorkflowJob.from_file("dump_job", name="DUMP") - - with pytest.raises(ConfigValidationError, match="No such file or directory"): - _ = WorkflowJob.from_file("knock_job", name="KNOCK") - - workflow = Workflow.from_file("dump_workflow", None, {"DUMP": dump_job}) - - assert len(workflow) == 2 - - job, args = workflow[0] - assert args[0] == "dump1" - assert args[1] == "dump_text_1" - - job, args = workflow[1] - assert job.name == "DUMP" - - -@pytest.mark.usefixtures("use_tmpdir") -def test_workflow_run(): - WorkflowCommon.createExternalDumpJob() - - dump_job = WorkflowJob.from_file("dump_job", name="DUMP") - - context = Substitutions() - context[""] = "text" - - workflow = Workflow.from_file("dump_workflow", context, {"DUMP": dump_job}) - - assert len(workflow) == 2 - - WorkflowRunner(workflow).run_blocking() - - with open("dump1", encoding="utf-8") as f: - assert f.read() == "dump_text_1" - - with open("dump2", encoding="utf-8") as f: - assert f.read() == "dump_text_2" - - -@pytest.mark.usefixtures("use_tmpdir") -def test_failing_workflow_run(): - with pytest.raises(ConfigValidationError, match="No such file or directory"): - _ = Workflow.from_file("the_file_name.ert", None, {}) - - -@pytest.mark.usefixtures("use_tmpdir") -def test_that_failure_in_parsing_workflow_gives_config_validation_error(): - with open("workflow", "w", encoding="utf-8") as f: - f.write("DEFINE\n") - with pytest.raises( - ConfigValidationError, match=r"DEFINE must have .* arguments" - ) as err: - _ = Workflow.from_file("workflow", None, {}) - assert os.path.abspath(err.value.errors[0].filename) == os.path.abspath("workflow") - - -@pytest.mark.usefixtures("use_tmpdir") -def test_that_substitution_happens_in_workflow(): - with open("workflow", "w", encoding="utf-8") as f: - f.write("JOB \n") - substlist = Substitutions() - substlist[""] = "a" - substlist[""] = "b" - job = WorkflowJob( - name="JOB", - internal=False, - min_args=None, - max_args=None, - arg_types=[], - executable="echo", - script=None, - ) - wf = Workflow.from_file( - "workflow", - substlist, - {"JOB": job}, - ) - assert wf.cmd_list == [(job, ["a", "b"])] - - -@pytest.mark.usefixtures("use_tmpdir") -@given( - strategies.lists( - strategies.sampled_from( - [ - "foo", - "bar", - "baz", - ] - ), - min_size=1, - max_size=20, - ) -) -def test_that_multiple_workflow_jobs_are_ordered_correctly(order): - with open("workflow", "w", encoding="utf-8") as f: - f.write("\n".join(order)) - - foo = get_workflow_job("foo") - bar = get_workflow_job("bar") - baz = get_workflow_job("baz") - - wf = Workflow.from_file( - src_file="workflow", - context=None, - job_dict={ - "foo": foo, - "bar": bar, - "baz": baz, - }, - ) - - assert [x[0].name for x in wf.cmd_list] == order - - -@pytest.mark.usefixtures("use_tmpdir") -def test_that_multiple_workflow_jobs_with_redefines_are_ordered_correctly(): - with open("workflow", "w", encoding="utf-8") as f: - f.write( - "\n".join( - [ - "DEFINE 1", - "foo ", - "bar ", - "DEFINE 3", - "foo ", - "baz ", - ] - ) - ) - - foo = get_workflow_job("foo") - bar = get_workflow_job("bar") - baz = get_workflow_job("baz") - - wf = Workflow.from_file( - src_file="workflow", - context=None, - job_dict={ - "foo": foo, - "bar": bar, - "baz": baz, - }, - ) - - commands = [(name, args[0]) for (name, args) in wf.cmd_list] - - assert commands == [(foo, "1"), (bar, "1"), (foo, "3"), (baz, "3")] - - -@pytest.mark.usefixtures("use_tmpdir") -def test_that_unknown_jobs_gives_error(): - with open("workflow", "w", encoding="utf-8") as f: - f.write( - "\n".join( - [ - "boo ", - "kingboo ", - ] - ) - ) - - with pytest.raises( - ConfigValidationError, match="Job with name: kingboo is not recognized" - ): - Workflow.from_file( - src_file="workflow", - context=None, - job_dict={"boo": get_workflow_job("boo")}, - ) - - -@pytest.mark.usefixtures("use_tmpdir") -@pytest.mark.parametrize( - "config, expectation", - [ - ( - "WORKFLOW", - pytest.raises(ConfigValidationError, match="not have enough arguments"), - ), - ( - "WORKFLOW arg_1", - does_not_raise(), - ), - ( - "WORKFLOW arg_1 arg_2", - does_not_raise(), - ), - ( - "WORKFLOW arg_1 arg_2 arg_3", - pytest.raises(ConfigValidationError, match="too many arguments"), - ), - ], -) -@pytest.mark.parametrize("min_args, max_args", [(1, 2), (None, None)]) -def test_args_validation(config, expectation, min_args, max_args): - with open("workflow", "w", encoding="utf-8") as f: - f.write(config) - if min_args is None and max_args is None: - expectation = does_not_raise() - with expectation: - Workflow.from_file( - src_file="workflow", - context=None, - job_dict={ - "WORKFLOW": WorkflowJob( - name="WORKFLOW", - internal=False, - min_args=min_args, - max_args=max_args, - arg_types=[], - executable=None, - script=None, - ), - }, - ) diff --git a/tests/ert/unit_tests/workflow_runner/test_workflow_job.py b/tests/ert/unit_tests/workflow_runner/test_workflow_job.py deleted file mode 100644 index 958cbe5a583..00000000000 --- a/tests/ert/unit_tests/workflow_runner/test_workflow_job.py +++ /dev/null @@ -1,140 +0,0 @@ -import pytest - -from ert.config import WorkflowJob -from ert.workflow_runner import WorkflowJobRunner - -from .workflow_common import WorkflowCommon - - -@pytest.mark.usefixtures("use_tmpdir") -def test_read_internal_function(): - WorkflowCommon.createErtScriptsJob() - - workflow_job = WorkflowJob.from_file( - name="SUBTRACT", - config_file="subtract_script_job", - ) - assert workflow_job.name == "SUBTRACT" - assert workflow_job.internal - - assert workflow_job.script.endswith("subtract_script.py") - - -@pytest.mark.usefixtures("use_tmpdir") -def test_arguments(): - WorkflowCommon.createErtScriptsJob() - - job = WorkflowJob.from_file( - name="SUBTRACT", - config_file="subtract_script_job", - ) - - assert job.min_args == 2 - assert job.max_args == 2 - assert job.argument_types() == [float, float] - - assert WorkflowJobRunner(job).run([1, 2.5]) - - with pytest.raises(ValueError, match="requires at least 2 arguments"): - WorkflowJobRunner(job).run([1]) - - with pytest.raises(ValueError, match="can only have 2 arguments"): - WorkflowJobRunner(job).run(["x %d %f %d %s", 1, 2.5, True, "y", "nada"]) - - -@pytest.mark.usefixtures("use_tmpdir") -def test_run_external_job(): - WorkflowCommon.createExternalDumpJob() - - job = WorkflowJob.from_file( - name="DUMP", - config_file="dump_job", - ) - - assert not job.internal - argTypes = job.argument_types() - assert argTypes == [str, str] - runner = WorkflowJobRunner(job) - assert runner.run(["test", "text"]) is None - assert runner.stdoutdata() == "Hello World\n" - - with open("test", encoding="utf-8") as f: - assert f.read() == "text" - - -@pytest.mark.usefixtures("use_tmpdir") -def test_error_handling_external_job(): - WorkflowCommon.createExternalDumpJob() - - job = WorkflowJob.from_file( - name="DUMP", - config_file="dump_failing_job", - ) - - assert not job.internal - job.argument_types() - runner = WorkflowJobRunner(job) - assert runner.run([]) is None - assert runner.stderrdata().startswith("Traceback") - - -@pytest.mark.usefixtures("use_tmpdir") -def test_run_internal_script(): - WorkflowCommon.createErtScriptsJob() - - job = WorkflowJob.from_file( - name="SUBTRACT", - config_file="subtract_script_job", - ) - - result = WorkflowJobRunner(job).run(["1", "2"]) - - assert result == -1 - - -@pytest.mark.usefixtures("use_tmpdir") -def test_stop_on_fail_is_parsed_internal(): - with open("fail_job", "w+", encoding="utf-8") as f: - f.write("INTERNAL True\n") - f.write("SCRIPT fail_script.py\n") - f.write("MIN_ARG 1\n") - f.write("MAX_ARG 1\n") - f.write("ARG_TYPE 0 STRING\n") - f.write("STOP_ON_FAIL True\n") - - with open("fail_script.py", "w+", encoding="utf-8") as f: - f.write( - """ -from ert import ErtScript - -class SevereErtFailureScript(ErtScript): - def __init__(self, ert, storage, ensemble=None): - assert False, "Severe ert failure" - - def run(self, *args): - pass - """ - ) - - job_internal = WorkflowJob.from_file( - name="FAIL", - config_file="fail_job", - ) - - assert job_internal.stop_on_fail - - -@pytest.mark.usefixtures("use_tmpdir") -def test_stop_on_fail_is_parsed_external(): - with open("fail_job", "w+", encoding="utf-8") as f: - f.write("INTERNAL False\n") - f.write("EXECUTABLE echo\n") - f.write("MIN_ARG 1\n") - f.write("STOP_ON_FAIL True\n") - - job_internal = WorkflowJob.from_file( - name="FAIL", - config_file="fail_job", - ) - - assert job_internal.stop_on_fail diff --git a/tests/ert/unit_tests/workflow_runner/test_workflow_runner.py b/tests/ert/unit_tests/workflow_runner/test_workflow_runner.py index 6bdcefd496b..95e68c73936 100644 --- a/tests/ert/unit_tests/workflow_runner/test_workflow_runner.py +++ b/tests/ert/unit_tests/workflow_runner/test_workflow_runner.py @@ -3,14 +3,161 @@ import pytest -from ert import WorkflowRunner from ert.config import Workflow, WorkflowJob from ert.substitutions import Substitutions +from ert.workflow_runner import WorkflowJobRunner, WorkflowRunner from tests.ert.utils import wait_until from .workflow_common import WorkflowCommon +@pytest.mark.usefixtures("use_tmpdir") +def test_read_internal_function(): + WorkflowCommon.createErtScriptsJob() + + workflow_job = WorkflowJob.from_file( + name="SUBTRACT", + config_file="subtract_script_job", + ) + assert workflow_job.name == "SUBTRACT" + assert workflow_job.internal + + assert workflow_job.script.endswith("subtract_script.py") + + +@pytest.mark.usefixtures("use_tmpdir") +def test_arguments(): + WorkflowCommon.createErtScriptsJob() + + job = WorkflowJob.from_file( + name="SUBTRACT", + config_file="subtract_script_job", + ) + + assert job.min_args == 2 + assert job.max_args == 2 + assert job.argument_types() == [float, float] + + assert WorkflowJobRunner(job).run([1, 2.5]) + + with pytest.raises(ValueError, match="requires at least 2 arguments"): + WorkflowJobRunner(job).run([1]) + + with pytest.raises(ValueError, match="can only have 2 arguments"): + WorkflowJobRunner(job).run(["x %d %f %d %s", 1, 2.5, True, "y", "nada"]) + + +@pytest.mark.usefixtures("use_tmpdir") +def test_run_external_job(): + WorkflowCommon.createExternalDumpJob() + + job = WorkflowJob.from_file( + name="DUMP", + config_file="dump_job", + ) + + assert not job.internal + argTypes = job.argument_types() + assert argTypes == [str, str] + runner = WorkflowJobRunner(job) + assert runner.run(["test", "text"]) is None + assert runner.stdoutdata() == "Hello World\n" + + with open("test", encoding="utf-8") as f: + assert f.read() == "text" + + +@pytest.mark.usefixtures("use_tmpdir") +def test_error_handling_external_job(): + WorkflowCommon.createExternalDumpJob() + + job = WorkflowJob.from_file( + name="DUMP", + config_file="dump_failing_job", + ) + + assert not job.internal + job.argument_types() + runner = WorkflowJobRunner(job) + assert runner.run([]) is None + assert runner.stderrdata().startswith("Traceback") + + +@pytest.mark.usefixtures("use_tmpdir") +def test_run_internal_script(): + WorkflowCommon.createErtScriptsJob() + + job = WorkflowJob.from_file( + name="SUBTRACT", + config_file="subtract_script_job", + ) + + result = WorkflowJobRunner(job).run(["1", "2"]) + + assert result == -1 + + +@pytest.mark.usefixtures("use_tmpdir") +def test_stop_on_fail_is_parsed_internal(): + with open("fail_job", "w+", encoding="utf-8") as f: + f.write("INTERNAL True\n") + f.write("SCRIPT fail_script.py\n") + f.write("MIN_ARG 1\n") + f.write("MAX_ARG 1\n") + f.write("ARG_TYPE 0 STRING\n") + f.write("STOP_ON_FAIL True\n") + + with open("fail_script.py", "w+", encoding="utf-8") as f: + f.write( + """ +from ert import ErtScript + +class SevereErtFailureScript(ErtScript): + def __init__(self, ert, storage, ensemble=None): + assert False, "Severe ert failure" + + def run(self, *args): + pass + """ + ) + + job_internal = WorkflowJob.from_file( + name="FAIL", + config_file="fail_job", + ) + + assert job_internal.stop_on_fail + + +@pytest.mark.usefixtures("use_tmpdir") +def test_workflow_run(): + WorkflowCommon.createExternalDumpJob() + + dump_job = WorkflowJob.from_file("dump_job", name="DUMP") + + context = Substitutions() + context[""] = "text" + + workflow = Workflow.from_file("dump_workflow", context, {"DUMP": dump_job}) + + assert len(workflow) == 2 + + job, args = workflow[0] + assert args[0] == "dump1" + assert args[1] == "dump_text_1" + + job, args = workflow[1] + assert job.name == "DUMP" + + WorkflowRunner(workflow).run_blocking() + + with open("dump1", encoding="utf-8") as f: + assert f.read() == "dump_text_1" + + with open("dump2", encoding="utf-8") as f: + assert f.read() == "dump_text_2" + + @pytest.mark.integration_test @pytest.mark.usefixtures("use_tmpdir") def test_workflow_thread_cancel_ert_script():