From e26780c75821dd2ffcb5b98177f6455a8498aada Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Tue, 21 Nov 2023 14:58:23 +0100 Subject: [PATCH] Rename internal functions and attributes on state change --- src/ert/job_queue/queue.py | 36 +++++++++++++------------- src/ert/job_queue/realization_state.py | 4 +-- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/ert/job_queue/queue.py b/src/ert/job_queue/queue.py index 45801fbaa3b..74d5703c3bd 100644 --- a/src/ert/job_queue/queue.py +++ b/src/ert/job_queue/queue.py @@ -96,7 +96,7 @@ def __init__(self, queue_config: "QueueConfig"): self._ee_token: Optional[str] = None self._ee_ssl_context: Optional[Union[ssl.SSLContext, bool]] = None - self._changes_to_publish: Optional[ + self._statechanges_to_publish: Optional[ asyncio.Queue[Union[Dict[int, str], object]] ] = None @@ -162,7 +162,9 @@ def queue_size(self) -> int: def _add_realization(self, realization: QueueableRealization) -> int: self._realizations.append( - RealizationState(self, realization, retries=self._queue_config.max_submit - 1) + RealizationState( + self, realization, retries=self._queue_config.max_submit - 1 + ) ) return len(self._realizations) - 1 @@ -236,7 +238,7 @@ def _translate_change_to_cloudevent( }, ) - async def _publish_changes( + async def _publish_statechanges( self, changes: Dict[int, str], ee_connection: WebSocketClientProtocol ) -> None: assert self._ens_id is not None @@ -250,18 +252,18 @@ async def _publish_changes( await ee_connection.send(to_json(events[0])) events.popleft() - async def _jobqueue_publisher(self) -> None: + async def _realization_statechange_publisher(self) -> None: ee_headers = Headers() if self._ee_token is not None: ee_headers["token"] = self._ee_token if self._ee_uri is None: # If no ensemble evaluator present, we will publish to the log - assert self._changes_to_publish is not None + assert self._statechanges_to_publish is not None while ( - change := await self._changes_to_publish.get() + change := await self._statechanges_to_publish.get() ) != CLOSE_PUBLISHER_SENTINEL: - logger.warning(f"State change in jobqueue.execute(): {change}") + logger.warning(f"State change in JobQueue.execute(): {change}") return async for ee_connection in connect( @@ -274,14 +276,13 @@ async def _jobqueue_publisher(self) -> None: close_timeout=60, ): try: - assert self._changes_to_publish is not None + assert self._statechanges_to_publish is not None while True: - change = await self._changes_to_publish.get() + change = await self._statechanges_to_publish.get() if change == CLOSE_PUBLISHER_SENTINEL: - print("CLOSE SENTINEL") return assert isinstance(change, dict) - await self._publish_changes(change, ee_connection) + await self._publish_statechanges(change, ee_connection) except ConnectionClosed: logger.debug( "Websocket connection from JobQueue " @@ -296,12 +297,12 @@ async def execute( if evaluators is None: evaluators = [] - self._changes_to_publish = asyncio.Queue() - asyncio.create_task(self._jobqueue_publisher()) + self._statechanges_to_publish = asyncio.Queue() + asyncio.create_task(self._realization_statechange_publisher()) asyncio.create_task(self._realization_submitter()) try: - # await self._changes_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states? + # await self._statechanges_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states? while True: await asyncio.sleep(1) @@ -324,13 +325,13 @@ async def execute( print("WE ARE STOPPED") logger.debug("queue cancelled, stopping jobs...") await self.stop_jobs_async() - await self._changes_to_publish.put(CLOSE_PUBLISHER_SENTINEL) + await self._statechanges_to_publish.put(CLOSE_PUBLISHER_SENTINEL) return EVTYPE_ENSEMBLE_CANCELLED if not self.is_active(): print("not active, breaking out") await asyncio.sleep(0.1) # Let changes be propagated to the queue - while not self._changes_to_publish.empty(): + while not self._statechanges_to_publish.empty(): # Drain queue.. await asyncio.sleep(0.1) break @@ -346,14 +347,13 @@ async def execute( logger.debug("jobs stopped, re-raising exception") return EVTYPE_ENSEMBLE_FAILED - await self._changes_to_publish.put(CLOSE_PUBLISHER_SENTINEL) + await self._statechanges_to_publish.put(CLOSE_PUBLISHER_SENTINEL) if not self.is_all_reals_state(RealizationState.SUCCESS): return EVTYPE_ENSEMBLE_FAILED return EVTYPE_ENSEMBLE_STOPPED - def add_realization_from_run_arg( self, run_arg: "RunArg", diff --git a/src/ert/job_queue/realization_state.py b/src/ert/job_queue/realization_state.py index f386f97046c..6312772e76f 100644 --- a/src/ert/job_queue/realization_state.py +++ b/src/ert/job_queue/realization_state.py @@ -103,7 +103,7 @@ def __init__( donotgohere = UNKNOWN.to(STATUS_FAILURE) def on_enter_state(self, target, event): - if self.jobqueue._changes_to_publish is None: + if self.jobqueue._statechanges_to_publish is None: return if target in ( # RealizationState.WAITING, # This happens too soon (initially) @@ -114,7 +114,7 @@ def on_enter_state(self, target, event): RealizationState.IS_KILLED, ): change = {self.realization.run_arg.iens: target.id} - asyncio.create_task(self.jobqueue._changes_to_publish.put(change)) + asyncio.create_task(self.jobqueue._statechanges_to_publish.put(change)) def on_enter_SUBMITTED(self): asyncio.create_task(self.jobqueue.driver.submit(self))