Skip to content

Commit

Permalink
Reorganize workflow tests
Browse files Browse the repository at this point in the history
  • Loading branch information
eivindjahren committed Jan 10, 2025
1 parent 21323bf commit d833851
Show file tree
Hide file tree
Showing 5 changed files with 354 additions and 388 deletions.
192 changes: 190 additions & 2 deletions tests/ert/unit_tests/config/test_workflow.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,202 @@
import os
from contextlib import ExitStack as does_not_raise

import pytest
from hypothesis import given, strategies

from ert.config import ConfigValidationError, Workflow
from ert.config import ConfigValidationError, Workflow, WorkflowJob
from ert.substitutions import Substitutions


@pytest.mark.usefixtures("use_tmpdir")
def test_reading_non_existent_workflow_raises_config_error():
with pytest.raises(ConfigValidationError, match="No such file or directory"):
Workflow.from_file("/tmp/does_not_exist", None, {})
Workflow.from_file("does_not_exist", None, {})
os.mkdir("is_a_directory")
with pytest.raises(ConfigValidationError, match="Is a directory"):
Workflow.from_file("is_a_directory", None, {})


@pytest.mark.usefixtures("use_tmpdir")
def test_that_failure_in_parsing_workflow_gives_config_validation_error():
with open("workflow", "w", encoding="utf-8") as f:
f.write("DEFINE\n")
with pytest.raises(
ConfigValidationError, match=r"DEFINE must have .* arguments"
) as err:
_ = Workflow.from_file("workflow", None, {})
assert os.path.abspath(err.value.errors[0].filename) == os.path.abspath("workflow")


@pytest.mark.usefixtures("use_tmpdir")
def test_that_substitution_happens_in_workflow():
with open("workflow", "w", encoding="utf-8") as f:
f.write("JOB <A> <B>\n")
substlist = Substitutions()
substlist["<A>"] = "a"
substlist["<B>"] = "b"
job = WorkflowJob(
name="JOB",
internal=False,
min_args=None,
max_args=None,
arg_types=[],
executable="echo",
script=None,
)
wf = Workflow.from_file(
"workflow",
substlist,
{"JOB": job},
)
assert wf.cmd_list == [(job, ["a", "b"])]


def get_workflow_job(name):
return WorkflowJob(
name=name,
internal=False,
min_args=None,
max_args=None,
arg_types=[],
executable=None,
script=None,
)


@pytest.mark.usefixtures("use_tmpdir")
@given(
strategies.lists(
strategies.sampled_from(
[
"foo",
"bar",
"baz",
]
),
min_size=1,
max_size=20,
)
)
def test_that_multiple_workflow_jobs_are_ordered_correctly(order):
with open("workflow", "w", encoding="utf-8") as f:
f.write("\n".join(order))

foo = get_workflow_job("foo")
bar = get_workflow_job("bar")
baz = get_workflow_job("baz")

wf = Workflow.from_file(
src_file="workflow",
context=None,
job_dict={
"foo": foo,
"bar": bar,
"baz": baz,
},
)

assert [x[0].name for x in wf.cmd_list] == order


@pytest.mark.usefixtures("use_tmpdir")
def test_that_multiple_workflow_jobs_with_redefines_are_ordered_correctly():
with open("workflow", "w", encoding="utf-8") as f:
f.write(
"\n".join(
[
"DEFINE <A> 1",
"foo <A>",
"bar <A>",
"DEFINE <A> 3",
"foo <A>",
"baz <A>",
]
)
)

foo = get_workflow_job("foo")
bar = get_workflow_job("bar")
baz = get_workflow_job("baz")

wf = Workflow.from_file(
src_file="workflow",
context=None,
job_dict={
"foo": foo,
"bar": bar,
"baz": baz,
},
)

commands = [(name, args[0]) for (name, args) in wf.cmd_list]

assert commands == [(foo, "1"), (bar, "1"), (foo, "3"), (baz, "3")]


@pytest.mark.usefixtures("use_tmpdir")
def test_that_unknown_jobs_gives_error():
with open("workflow", "w", encoding="utf-8") as f:
f.write(
"\n".join(
[
"boo <A>",
"kingboo <A>",
]
)
)

with pytest.raises(
ConfigValidationError, match="Job with name: kingboo is not recognized"
):
Workflow.from_file(
src_file="workflow",
context=None,
job_dict={"boo": get_workflow_job("boo")},
)


@pytest.mark.usefixtures("use_tmpdir")
@pytest.mark.parametrize(
"config, expectation",
[
(
"WORKFLOW",
pytest.raises(ConfigValidationError, match="not have enough arguments"),
),
(
"WORKFLOW arg_1",
does_not_raise(),
),
(
"WORKFLOW arg_1 arg_2",
does_not_raise(),
),
(
"WORKFLOW arg_1 arg_2 arg_3",
pytest.raises(ConfigValidationError, match="too many arguments"),
),
],
)
@pytest.mark.parametrize("min_args, max_args", [(1, 2), (None, None)])
def test_args_validation(config, expectation, min_args, max_args):
with open("workflow", "w", encoding="utf-8") as f:
f.write(config)
if min_args is None and max_args is None:
expectation = does_not_raise()
with expectation:
Workflow.from_file(
src_file="workflow",
context=None,
job_dict={
"WORKFLOW": WorkflowJob(
name="WORKFLOW",
internal=False,
min_args=min_args,
max_args=max_args,
arg_types=[],
executable=None,
script=None,
),
},
)
16 changes: 16 additions & 0 deletions tests/ert/unit_tests/config/test_workflow_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,19 @@ def test_that_ert_warns_on_duplicate_workflow_jobs(tmp_path):
ErtPluginContext(),
):
_ = ErtConfig.from_file(test_config_file_name)


@pytest.mark.usefixtures("use_tmpdir")
def test_stop_on_fail_is_parsed_external():
with open("fail_job", "w+", encoding="utf-8") as f:
f.write("INTERNAL False\n")
f.write("EXECUTABLE echo\n")
f.write("MIN_ARG 1\n")
f.write("STOP_ON_FAIL True\n")

job_internal = WorkflowJob.from_file(
name="FAIL",
config_file="fail_job",
)

assert job_internal.stop_on_fail
Loading

0 comments on commit d833851

Please sign in to comment.