Skip to content

Commit

Permalink
Rename ensemble_evaluator identifiers, FM_JOB to FORWARD_MODEL
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Dec 22, 2023
1 parent 28d8a2a commit 0140ccc
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 59 deletions.
18 changes: 9 additions & 9 deletions src/_ert_job_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
)
from _ert_job_runner.reporting.statemachine import StateMachine

_FM_JOB_START = "com.equinor.ert.forward_model_job.start"
_FM_JOB_RUNNING = "com.equinor.ert.forward_model_job.running"
_FM_JOB_SUCCESS = "com.equinor.ert.forward_model_job.success"
_FM_JOB_FAILURE = "com.equinor.ert.forward_model_job.failure"
_FORWARD_MODEL_START = "com.equinor.ert.forward_model_job.start"
_FORWARD_MODEL_RUNNING = "com.equinor.ert.forward_model_job.running"
_FORWARD_MODEL_SUCCESS = "com.equinor.ert.forward_model_job.success"
_FORWARD_MODEL_FAILURE = "com.equinor.ert.forward_model_job.failure"

_CONTENT_TYPE = "datacontenttype"
_JOB_MSG_TYPE = "type"
Expand Down Expand Up @@ -140,7 +140,7 @@ def _job_handler(self, msg: Message):
if isinstance(msg, Start):
logger.debug(f"Job {job_name} was successfully started")
self._dump_event(
attributes={_JOB_MSG_TYPE: _FM_JOB_START, **job_msg_attrs},
attributes={_JOB_MSG_TYPE: _FORWARD_MODEL_START, **job_msg_attrs},
data={
"stdout": str(Path(msg.job.std_out).resolve()),
"stderr": str(Path(msg.job.std_err).resolve()),
Expand All @@ -149,7 +149,7 @@ def _job_handler(self, msg: Message):
if not msg.success():
logger.error(f"Job {job_name} FAILED to start")
self._dump_event(
attributes={_JOB_MSG_TYPE: _FM_JOB_FAILURE, **job_msg_attrs},
attributes={_JOB_MSG_TYPE: _FORWARD_MODEL_FAILURE, **job_msg_attrs},
data={
"error_msg": msg.error_message,
},
Expand All @@ -159,7 +159,7 @@ def _job_handler(self, msg: Message):
data = None
if msg.success():
logger.debug(f"Job {job_name} exited successfully")
attributes = {_JOB_MSG_TYPE: _FM_JOB_SUCCESS, **job_msg_attrs}
attributes = {_JOB_MSG_TYPE: _FORWARD_MODEL_SUCCESS, **job_msg_attrs}
else:
logger.error(
_JOB_EXIT_FAILED_STRING.format(
Expand All @@ -168,7 +168,7 @@ def _job_handler(self, msg: Message):
error_message=msg.error_message,
)
)
attributes = {_JOB_MSG_TYPE: _FM_JOB_FAILURE, **job_msg_attrs}
attributes = {_JOB_MSG_TYPE: _FORWARD_MODEL_FAILURE, **job_msg_attrs}
data = {
"exit_code": msg.exit_code,
"error_msg": msg.error_message,
Expand All @@ -178,7 +178,7 @@ def _job_handler(self, msg: Message):
elif isinstance(msg, Running):
logger.debug(f"{job_name} job is running")
self._dump_event(
attributes={_JOB_MSG_TYPE: _FM_JOB_RUNNING, **job_msg_attrs},
attributes={_JOB_MSG_TYPE: _FORWARD_MODEL_RUNNING, **job_msg_attrs},
data={
"max_memory_usage": msg.max_memory_usage,
"current_memory_usage": msg.current_memory_usage,
Expand Down
20 changes: 10 additions & 10 deletions src/ert/ensemble_evaluator/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
EVTYPE_REALIZATION_WAITING = "com.equinor.ert.realization.waiting"
EVTYPE_REALIZATION_TIMEOUT = "com.equinor.ert.realization.timeout"

EVTYPE_FM_JOB_START = "com.equinor.ert.forward_model_job.start"
EVTYPE_FM_JOB_RUNNING = "com.equinor.ert.forward_model_job.running"
EVTYPE_FM_JOB_SUCCESS = "com.equinor.ert.forward_model_job.success"
EVTYPE_FM_JOB_FAILURE = "com.equinor.ert.forward_model_job.failure"
EVTYPE_FORWARD_MODEL_START = "com.equinor.ert.forward_model_job.start"
EVTYPE_FORWARD_MODEL_RUNNING = "com.equinor.ert.forward_model_job.running"
EVTYPE_FORWARD_MODEL_SUCCESS = "com.equinor.ert.forward_model_job.success"
EVTYPE_FORWARD_MODEL_FAILURE = "com.equinor.ert.forward_model_job.failure"


EVGROUP_REALIZATION = {
Expand All @@ -41,14 +41,14 @@
EVTYPE_REALIZATION_TIMEOUT,
}

EVGROUP_FM_JOB = {
EVTYPE_FM_JOB_START,
EVTYPE_FM_JOB_RUNNING,
EVTYPE_FM_JOB_SUCCESS,
EVTYPE_FM_JOB_FAILURE,
EVGROUP_FORWARD_MODEL = {
EVTYPE_FORWARD_MODEL_START,
EVTYPE_FORWARD_MODEL_RUNNING,
EVTYPE_FORWARD_MODEL_SUCCESS,
EVTYPE_FORWARD_MODEL_FAILURE,
}

EVGROUP_FM_ALL = EVGROUP_REALIZATION | EVGROUP_FM_JOB
EVGROUP_FM_ALL = EVGROUP_REALIZATION | EVGROUP_FORWARD_MODEL

EVTYPE_EE_SNAPSHOT = "com.equinor.ert.ee.snapshot"
EVTYPE_EE_SNAPSHOT_UPDATE = "com.equinor.ert.ee.snapshot_update"
Expand Down
23 changes: 13 additions & 10 deletions src/ert/ensemble_evaluator/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ class UnsupportedOperationException(ValueError):
ids.EVTYPE_REALIZATION_SUCCESS: state.REALIZATION_STATE_FINISHED,
ids.EVTYPE_REALIZATION_UNKNOWN: state.REALIZATION_STATE_UNKNOWN,
ids.EVTYPE_REALIZATION_TIMEOUT: state.REALIZATION_STATE_FAILED,
ids.EVTYPE_FM_JOB_START: state.FORWARD_MODEL_STATE_START,
ids.EVTYPE_FM_JOB_RUNNING: state.FORWARD_MODEL_STATE_RUNNING,
ids.EVTYPE_FM_JOB_SUCCESS: state.FORWARD_MODEL_STATE_FINISHED,
ids.EVTYPE_FM_JOB_FAILURE: state.FORWARD_MODEL_STATE_FAILURE,
ids.EVTYPE_FORWARD_MODEL_START: state.FORWARD_MODEL_STATE_START,
ids.EVTYPE_FORWARD_MODEL_RUNNING: state.FORWARD_MODEL_STATE_RUNNING,
ids.EVTYPE_FORWARD_MODEL_SUCCESS: state.FORWARD_MODEL_STATE_FINISHED,
ids.EVTYPE_FORWARD_MODEL_FAILURE: state.FORWARD_MODEL_STATE_FAILURE,
}

_ENSEMBLE_TYPE_EVENT_TO_STATUS = {
Expand Down Expand Up @@ -261,13 +261,16 @@ def from_cloudevent(self, event: CloudEvent) -> "PartialSnapshot":
}
)

elif e_type in ids.EVGROUP_FM_JOB:
elif e_type in ids.EVGROUP_FORWARD_MODEL:
status = _FM_TYPE_EVENT_TO_STATUS[e_type]
start_time = None
end_time = None
if e_type == ids.EVTYPE_FM_JOB_START:
if e_type == ids.EVTYPE_FORWARD_MODEL_START:
start_time = convert_iso8601_to_datetime(timestamp)
elif e_type in {ids.EVTYPE_FM_JOB_SUCCESS, ids.EVTYPE_FM_JOB_FAILURE}:
elif e_type in {
ids.EVTYPE_FORWARD_MODEL_SUCCESS,
ids.EVTYPE_FORWARD_MODEL_FAILURE,
}:
end_time = convert_iso8601_to_datetime(timestamp)

fm_dict = {
Expand All @@ -276,15 +279,15 @@ def from_cloudevent(self, event: CloudEvent) -> "PartialSnapshot":
"end_time": end_time,
"index": _get_forward_model_index(e_source),
}
if e_type == ids.EVTYPE_FM_JOB_RUNNING:
if e_type == ids.EVTYPE_FORWARD_MODEL_RUNNING:
fm_dict[ids.CURRENT_MEMORY_USAGE] = event.data.get(
ids.CURRENT_MEMORY_USAGE
)
fm_dict[ids.MAX_MEMORY_USAGE] = event.data.get(ids.MAX_MEMORY_USAGE)
if e_type == ids.EVTYPE_FM_JOB_START:
if e_type == ids.EVTYPE_FORWARD_MODEL_START:
fm_dict["stdout"] = event.data.get(ids.STDOUT)
fm_dict["stderr"] = event.data.get(ids.STDERR)
if e_type == ids.EVTYPE_FM_JOB_FAILURE:
if e_type == ids.EVTYPE_FORWARD_MODEL_FAILURE:
fm_dict["error"] = event.data.get(ids.ERROR_MSG)
self.update_forward_model(
_get_real_id(e_source),
Expand Down
6 changes: 3 additions & 3 deletions tests/performance_tests/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def simulate_forward_model_event_handling(
attributes={
"source": f"/ert/ensemble/{ens_id}/"
f"real/{real}/forward_model/{fm_idx}",
"type": ids.EVTYPE_FM_JOB_START,
"type": ids.EVTYPE_FORWARD_MODEL_START,
"id": str(uuid.uuid1()),
},
data={"stderr": "foo", "stdout": "bar"},
Expand All @@ -126,7 +126,7 @@ def simulate_forward_model_event_handling(
attributes={
"source": f"/ert/ensemble/{ens_id}/"
f"real/{real}/forward_model/{fm_idx}",
"type": ids.EVTYPE_FM_JOB_RUNNING,
"type": ids.EVTYPE_FORWARD_MODEL_RUNNING,
"id": str(uuid.uuid1()),
},
data={
Expand All @@ -141,7 +141,7 @@ def simulate_forward_model_event_handling(
attributes={
"source": f"/ert/ensemble/{ens_id}/"
f"real/{real}/forward_model/{fm_idx}",
"type": ids.EVTYPE_FM_JOB_SUCCESS,
"type": ids.EVTYPE_FORWARD_MODEL_SUCCESS,
"id": str(uuid.uuid1()),
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _evaluate(self, url):
for job in range(0, self.jobs):
send_dispatch_event(
dispatch,
identifiers.EVTYPE_FM_JOB_RUNNING,
identifiers.EVTYPE_FORWARD_MODEL_RUNNING,
f"/ert/ensemble/{self.id_}/real/{real}/forward_model/{job}",
f"event-{event_id}",
{"current_memory_usage": 1000},
Expand All @@ -112,7 +112,7 @@ def _evaluate(self, url):
if self._shouldFailJob(real, job):
send_dispatch_event(
dispatch,
identifiers.EVTYPE_FM_JOB_FAILURE,
identifiers.EVTYPE_FORWARD_MODEL_FAILURE,
f"/ert/ensemble/{self.id_}/real/{real}/forward_model/{job}",
f"event-{event_id}",
{},
Expand All @@ -122,7 +122,7 @@ def _evaluate(self, url):
break
send_dispatch_event(
dispatch,
identifiers.EVTYPE_FM_JOB_SUCCESS,
identifiers.EVTYPE_FORWARD_MODEL_SUCCESS,
f"/ert/ensemble/{self.id_}/real/{real}/forward_model/{job}",
f"event-{event_id}",
{"current_memory_usage": 1000},
Expand Down
2 changes: 1 addition & 1 deletion tests/unit_tests/ensemble_evaluator/test_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def _create_dummy_event(event_type):
async def test_that_dispatcher_uses_right_handle_function_for_one_event():
event_handler = DummyEventHandler()

event = _create_dummy_event(ids.EVTYPE_FM_JOB_SUCCESS)
event = _create_dummy_event(ids.EVTYPE_FORWARD_MODEL_SUCCESS)
await event_handler.dispatcher.handle_event(event)
await event_handler.join()

Expand Down
22 changes: 11 additions & 11 deletions tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator):
# first dispatch endpoint client informs that forward model 0 is running
send_dispatch_event(
dispatch1,
identifiers.EVTYPE_FM_JOB_RUNNING,
identifiers.EVTYPE_FORWARD_MODEL_RUNNING,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/forward_model/0",
"event1",
{"current_memory_usage": 1000},
Expand All @@ -55,15 +55,15 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator):
# second dispatch endpoint client informs that forward model 0 is running
send_dispatch_event(
dispatch2,
identifiers.EVTYPE_FM_JOB_RUNNING,
identifiers.EVTYPE_FORWARD_MODEL_RUNNING,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/forward_model/0",
"event1",
{"current_memory_usage": 1000},
)
# second dispatch endpoint client informs that forward model 1 is running
send_dispatch_event(
dispatch2,
identifiers.EVTYPE_FM_JOB_RUNNING,
identifiers.EVTYPE_FORWARD_MODEL_RUNNING,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/forward_model/1",
"event1",
{"current_memory_usage": 1000},
Expand All @@ -86,7 +86,7 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator):
# second dispatch endpoint client informs that job 0 is done
send_dispatch_event(
dispatch2,
identifiers.EVTYPE_FM_JOB_SUCCESS,
identifiers.EVTYPE_FORWARD_MODEL_SUCCESS,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/forward_model/0",
"event1",
{"current_memory_usage": 1000},
Expand All @@ -95,7 +95,7 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator):
# second dispatch endpoint client informs that job 1 is failed
send_dispatch_event(
dispatch2,
identifiers.EVTYPE_FM_JOB_FAILURE,
identifiers.EVTYPE_FORWARD_MODEL_FAILURE,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/forward_model/1",
"event_job_1_fail",
{identifiers.ERROR_MSG: "error"},
Expand Down Expand Up @@ -150,7 +150,7 @@ def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluat
# first dispatch endpoint client informs that job 0 is running
send_dispatch_event(
dispatch1,
identifiers.EVTYPE_FM_JOB_RUNNING,
identifiers.EVTYPE_FORWARD_MODEL_RUNNING,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/forward_model/0",
"event1",
{"current_memory_usage": 1000},
Expand All @@ -159,7 +159,7 @@ def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluat
# second dispatch endpoint client informs that job 0 is running
send_dispatch_event(
dispatch2,
identifiers.EVTYPE_FM_JOB_RUNNING,
identifiers.EVTYPE_FORWARD_MODEL_RUNNING,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/forward_model/0",
"event1",
{"current_memory_usage": 1000},
Expand All @@ -168,7 +168,7 @@ def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluat
# second dispatch endpoint client informs that job 0 is done
send_dispatch_event(
dispatch2,
identifiers.EVTYPE_FM_JOB_SUCCESS,
identifiers.EVTYPE_FORWARD_MODEL_SUCCESS,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/forward_model/0",
"event1",
{"current_memory_usage": 1000},
Expand All @@ -177,7 +177,7 @@ def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluat
# second dispatch endpoint client informs that job 1 is failed
send_dispatch_event(
dispatch2,
identifiers.EVTYPE_FM_JOB_FAILURE,
identifiers.EVTYPE_FORWARD_MODEL_FAILURE,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/forward_model/1",
"event_job_1_fail",
{identifiers.ERROR_MSG: "error"},
Expand Down Expand Up @@ -295,14 +295,14 @@ def exploding_handler(events):
)
send_dispatch_event(
dispatch,
identifiers.EVTYPE_FM_JOB_RUNNING,
identifiers.EVTYPE_FORWARD_MODEL_RUNNING,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/forward_model/0",
"event1",
{"current_memory_usage": 1000},
)
send_dispatch_event(
dispatch,
identifiers.EVTYPE_FM_JOB_RUNNING,
identifiers.EVTYPE_FORWARD_MODEL_RUNNING,
f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/forward_model/0",
"event2",
{},
Expand Down
6 changes: 3 additions & 3 deletions tests/unit_tests/ensemble_evaluator/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def test_update_forward_models_in_partial_from_multiple_cloudevents(snapshot):
CloudEvent(
attributes={
"id": "0",
"type": ids.EVTYPE_FM_JOB_RUNNING,
"type": ids.EVTYPE_FORWARD_MODEL_RUNNING,
"source": "/real/0/forward_model/0",
},
data={
Expand All @@ -123,7 +123,7 @@ def test_update_forward_models_in_partial_from_multiple_cloudevents(snapshot):
CloudEvent(
{
"id": "0",
"type": ids.EVTYPE_FM_JOB_FAILURE,
"type": ids.EVTYPE_FORWARD_MODEL_FAILURE,
"source": "/real/0/forward_model/0",
},
{ids.ERROR_MSG: "failed"},
Expand All @@ -133,7 +133,7 @@ def test_update_forward_models_in_partial_from_multiple_cloudevents(snapshot):
CloudEvent(
{
"id": "1",
"type": ids.EVTYPE_FM_JOB_SUCCESS,
"type": ids.EVTYPE_FORWARD_MODEL_SUCCESS,
"source": "/real/0/forward_model/1",
}
)
Expand Down
18 changes: 9 additions & 9 deletions tests/unit_tests/job_runner/test_event_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
from _ert_job_runner.job import Job
from _ert_job_runner.reporting import Event
from _ert_job_runner.reporting.event import (
_FM_JOB_FAILURE,
_FM_JOB_RUNNING,
_FM_JOB_START,
_FM_JOB_SUCCESS,
_FORWARD_MODEL_FAILURE,
_FORWARD_MODEL_RUNNING,
_FORWARD_MODEL_START,
_FORWARD_MODEL_SUCCESS,
)
from _ert_job_runner.reporting.message import Exited, Finish, Init, Running, Start
from _ert_job_runner.reporting.statemachine import TransitionError
Expand All @@ -41,7 +41,7 @@ def test_report_with_successful_start_message_argument(unused_tcp_port):

assert len(lines) == 1
event = json.loads(lines[0])
assert event["type"] == _FM_JOB_START
assert event["type"] == _FORWARD_MODEL_START
assert event["source"] == "/ert/ensemble/ens_id/real/0/forward_model/0/index/0"
assert os.path.basename(event["data"]["stdout"]) == "stdout"
assert os.path.basename(event["data"]["stderr"]) == "stderr"
Expand All @@ -65,7 +65,7 @@ def test_report_with_failed_start_message_argument(unused_tcp_port):

assert len(lines) == 2
event = json.loads(lines[1])
assert event["type"] == _FM_JOB_FAILURE
assert event["type"] == _FORWARD_MODEL_FAILURE
assert event["data"]["error_msg"] == "massive_failure"


Expand All @@ -83,7 +83,7 @@ def test_report_with_successful_exit_message_argument(unused_tcp_port):

assert len(lines) == 1
event = json.loads(lines[0])
assert event["type"] == _FM_JOB_SUCCESS
assert event["type"] == _FORWARD_MODEL_SUCCESS


def test_report_with_failed_exit_message_argument(unused_tcp_port):
Expand All @@ -100,7 +100,7 @@ def test_report_with_failed_exit_message_argument(unused_tcp_port):

assert len(lines) == 1
event = json.loads(lines[0])
assert event["type"] == _FM_JOB_FAILURE
assert event["type"] == _FORWARD_MODEL_FAILURE
assert event["data"]["error_msg"] == "massive_failure"


Expand All @@ -118,7 +118,7 @@ def test_report_with_running_message_argument(unused_tcp_port):

assert len(lines) == 1
event = json.loads(lines[0])
assert event["type"] == _FM_JOB_RUNNING
assert event["type"] == _FORWARD_MODEL_RUNNING
assert event["data"]["max_memory_usage"] == 100
assert event["data"]["current_memory_usage"] == 10

Expand Down

0 comments on commit 0140ccc

Please sign in to comment.