Skip to content

Commit

Permalink
Unify Jobqueue.execute() and refactor its websocket connection
Browse files Browse the repository at this point in the history
The websocket connection from the JobQueue is refactored
to use a single asyncio task for fetching changes to be published,
publish them, and also keep the connection open.

The queue execute function had two variants, one that
included websocket communication (for "legacy" ert, and one
for Everest, the "simulation context"). Now there is only one,
which is async.

The changes that are published to the websocket channel is in the
simulation context (Everest) logged to the debug log. Previously that
information was not propagated.

The connection to the ensemble evaluator is now configured once,
and from now on it is not possible to have the job_runner use a
different connection than the JobQueue uses for publishing its
realization state changes.
  • Loading branch information
berland committed Nov 15, 2023
1 parent 4241702 commit 0569a3d
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 185 deletions.
56 changes: 20 additions & 36 deletions src/ert/ensemble_evaluator/_builder/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
# Set up the timeout-mechanism
timeout_queue = asyncio.Queue() # type: ignore
# Based on the experiment id the generator will
# give a function returning cloud event or protobuf
# give a function returning cloud event
event_creator = self.generate_event_creator(experiment_id=experiment_id)
on_timeout, send_timeout_future = self.setup_timeout_callback(
timeout_queue, cloudevent_unary_send, event_creator
Expand All @@ -169,16 +169,11 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
if not self._config:
raise ValueError("no config") # mypy

# event for normal evaluation, will be overwritten later in case of failure
# or cancellation
result = event_creator(identifiers.EVTYPE_ENSEMBLE_STOPPED, None)

try:
# Dispatch STARTED-event
out_cloudevent = event_creator(identifiers.EVTYPE_ENSEMBLE_STARTED, None)
await cloudevent_unary_send(out_cloudevent)
await cloudevent_unary_send(
event_creator(identifiers.EVTYPE_ENSEMBLE_STARTED, None)
)

# Submit all jobs to queue and inform queue when done
for real in self.active_reals:
self._job_queue.add_realization(real, callback_timeout=on_timeout)

Expand All @@ -199,46 +194,35 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
)
]

# Tell queue to pass info to the jobs-file
# NOTE: This touches files on disk...
sema = threading.BoundedSemaphore(value=CONCURRENT_INTERNALIZATION)
self._job_queue.add_dispatch_information_to_jobs_file(
self._job_queue.set_ee_info(
ee_uri=self._config.dispatch_uri,
ens_id=self.id_,
dispatch_url=self._config.dispatch_uri,
cert=self._config.cert,
token=self._config.token,
)
# Finally, run the queue-loop until it finishes or raises
await self._job_queue.execute_queue_via_websockets(
self._config.dispatch_uri,
self.id_,
sema,
queue_evaluators, # type: ignore
ee_cert=self._config.cert,
ee_token=self._config.token,
)

except asyncio.CancelledError:
logger.debug("ensemble was cancelled")
result = event_creator(identifiers.EVTYPE_ENSEMBLE_CANCELLED, None)
# Tell queue to pass info to the jobs-file
# NOTE: This touches files on disk...
self._job_queue.add_dispatch_information_to_jobs_file()

sema = threading.BoundedSemaphore(value=CONCURRENT_INTERNALIZATION)
result: str = await self._job_queue.execute(
sema,
queue_evaluators,
)

except Exception:
logger.exception(
"unexpected exception in ensemble",
exc_info=True,
)
result = event_creator(identifiers.EVTYPE_ENSEMBLE_FAILED, None)
result = identifiers.EVTYPE_ENSEMBLE_FAILED

else:
logger.debug("ensemble finished normally")

finally:
await timeout_queue.put(None) # signal to exit timer
await send_timeout_future
await timeout_queue.put(None) # signal to exit timer
await send_timeout_future

# Dispatch final result from evaluator - FAILED, CANCEL or STOPPED
assert self._config # mypy
await cloudevent_unary_send(result)
# Dispatch final result from evaluator - FAILED, CANCEL or STOPPED
await cloudevent_unary_send(event_creator(result, None))

@property
def cancellable(self) -> bool:
Expand Down
Loading

0 comments on commit 0569a3d

Please sign in to comment.