Skip to content

Commit

Permalink
Use custom loop to get additional exceptions
Browse files Browse the repository at this point in the history
 - Monkeypatch forward_modek_ok for scheduler
  • Loading branch information
xjules committed Jan 4, 2024
1 parent a762b9b commit a79a20b
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from websockets.exceptions import ConnectionClosedError
from websockets.legacy.server import WebSocketServerProtocol

from ert.async_utils import new_event_loop
from ert.serialization import evaluator_marshaller, evaluator_unmarshaller

from ._builder import Ensemble
Expand Down Expand Up @@ -65,7 +66,7 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig, iter_: int
self._config: EvaluatorServerConfig = config
self._ensemble: Ensemble = ensemble

self._loop = asyncio.new_event_loop()
self._loop = new_event_loop()
self._done = self._loop.create_future()

self._clients: Set[WebSocketServerProtocol] = set()
Expand Down
4 changes: 2 additions & 2 deletions src/ert/ensemble_evaluator/evaluator_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from aiohttp import ClientError
from websockets.exceptions import ConnectionClosedError

from ert.async_utils import get_event_loop
from ert.async_utils import get_event_loop, new_event_loop
from ert.ensemble_evaluator.identifiers import (
EVTYPE_EE_SNAPSHOT,
EVTYPE_EE_SNAPSHOT_UPDATE,
Expand Down Expand Up @@ -62,7 +62,7 @@ def __init__(
self._iter_snapshot: Dict[int, Snapshot] = {}

def _drain_monitor(self) -> None:
asyncio.set_event_loop(asyncio.new_event_loop())
asyncio.set_event_loop(new_event_loop())
drainer_logger = logging.getLogger("ert.ensemble_evaluator.drainer")
while not self._model.isFinished():
try:
Expand Down
4 changes: 3 additions & 1 deletion src/ert/ensemble_evaluator/sync_ws_duplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from websockets.datastructures import Headers
from websockets.typing import Data

from ert.async_utils import new_event_loop

from ._wait_for_evaluator import wait_for_evaluator


Expand Down Expand Up @@ -50,7 +52,7 @@ def __init__(
ssl_context = True if self._uri.startswith("wss") else None
self._ssl_context: Optional[Union[bool, ssl.SSLContext]] = ssl_context

self._loop = asyncio.new_event_loop()
self._loop = new_event_loop()
self._connection: asyncio.Task[None] = self._loop.create_task(self._connect())
self._ws: Optional[WebSocketClientProtocol] = None
self._loop_thread = threading.Thread(target=self._loop.run_forever)
Expand Down
6 changes: 2 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from qtpy.QtCore import QDir

from ert.__main__ import ert_parser
from ert.async_utils import get_event_loop, new_event_loop
from ert.cli import ENSEMBLE_EXPERIMENT_MODE
from ert.cli.main import run_cli
from ert.config import ErtConfig
Expand Down Expand Up @@ -261,10 +262,7 @@ def try_queue_and_scheduler(request, monkeypatch):
if should_enable_scheduler:
# Flaky - the new scheduler needs an event loop, which might not be initialized yet.
# This might be a bug in python 3.8, but it does not occur locally.
try:
asyncio.get_running_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
_ = get_event_loop()

monkeypatch.setattr(
FeatureToggling._conf["scheduler"], "is_enabled", should_enable_scheduler
Expand Down
5 changes: 5 additions & 0 deletions tests/unit_tests/ensemble_evaluator/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ def _make_ensemble_builder(monkeypatch, tmpdir, num_reals, num_jobs, job_sleep=0
"forward_model_ok",
lambda _: (LoadStatus.LOAD_SUCCESSFUL, ""),
)
monkeypatch.setattr(
ert.scheduler.job,
"forward_model_ok",
lambda _: (LoadStatus.LOAD_SUCCESSFUL, ""),
)
builder = ert.ensemble_evaluator.EnsembleBuilder()
with tmpdir.as_cwd():
forward_model_list = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
from cloudevents.http import CloudEvent

from _ert_job_runner.client import Client
from ert.async_utils import new_event_loop
from ert.ensemble_evaluator import Ensemble, identifiers
from ert.ensemble_evaluator._builder._realization import ForwardModel, Realization


def _mock_ws(host, port, messages, delay_startup=0):
loop = asyncio.new_event_loop()
loop = new_event_loop()
done = loop.create_future()

async def _handler(websocket, path):
Expand Down

0 comments on commit a79a20b

Please sign in to comment.