From efb22cde709182e324fafb36888b487e059195e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Tue, 21 Nov 2023 14:58:23 +0100 Subject: [PATCH] Rename internal functions and attributes on state change --- src/ert/job_queue/queue.py | 31 +++++++++++++------------- src/ert/job_queue/realization_state.py | 4 ++-- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/ert/job_queue/queue.py b/src/ert/job_queue/queue.py index 74f13cd7f8a..6e8d578b9da 100644 --- a/src/ert/job_queue/queue.py +++ b/src/ert/job_queue/queue.py @@ -96,7 +96,7 @@ def __init__(self, queue_config: "QueueConfig"): self._ee_token: Optional[str] = None self._ee_ssl_context: Optional[Union[ssl.SSLContext, bool]] = None - self._changes_to_publish: Optional[ + self._statechanges_to_publish: Optional[ asyncio.Queue[Union[Dict[int, str], object]] ] = None @@ -240,7 +240,7 @@ def _translate_change_to_cloudevent( }, ) - async def _publish_changes( + async def _publish_statechanges( self, changes: Dict[int, str], ee_connection: WebSocketClientProtocol ) -> None: assert self._ens_id is not None @@ -254,18 +254,18 @@ async def _publish_changes( await ee_connection.send(to_json(events[0])) events.popleft() - async def _jobqueue_publisher(self) -> None: + async def _realization_statechange_publisher(self) -> None: ee_headers = Headers() if self._ee_token is not None: ee_headers["token"] = self._ee_token if self._ee_uri is None: # If no ensemble evaluator present, we will publish to the log - assert self._changes_to_publish is not None + assert self._statechanges_to_publish is not None while ( - change := await self._changes_to_publish.get() + change := await self._statechanges_to_publish.get() ) != CLOSE_PUBLISHER_SENTINEL: - logger.warning(f"State change in jobqueue.execute(): {change}") + logger.warning(f"State change in JobQueue.execute(): {change}") return async for ee_connection in connect( @@ -278,14 +278,13 @@ async def _jobqueue_publisher(self) -> None: close_timeout=60, ): try: - assert self._changes_to_publish is not None + assert self._statechanges_to_publish is not None while True: - change = await self._changes_to_publish.get() + change = await self._statechanges_to_publish.get() if change == CLOSE_PUBLISHER_SENTINEL: - print("CLOSE SENTINEL") return assert isinstance(change, dict) - await self._publish_changes(change, ee_connection) + await self._publish_statechanges(change, ee_connection) except ConnectionClosed: logger.debug( "Websocket connection from JobQueue " @@ -300,12 +299,12 @@ async def execute( if evaluators is None: evaluators = [] - self._changes_to_publish = asyncio.Queue() - asyncio.create_task(self._jobqueue_publisher()) + self._statechanges_to_publish = asyncio.Queue() + asyncio.create_task(self._realization_statechange_publisher()) asyncio.create_task(self._realization_submitter()) try: - # await self._changes_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states? + # await self._statechanges_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states? while True: await asyncio.sleep(1) @@ -329,13 +328,13 @@ async def execute( print("WE ARE STOPPED") logger.debug("queue cancelled, stopping jobs...") await self.stop_jobs_async() - await self._changes_to_publish.put(CLOSE_PUBLISHER_SENTINEL) + await self._statechanges_to_publish.put(CLOSE_PUBLISHER_SENTINEL) return EVTYPE_ENSEMBLE_CANCELLED if not self.is_active(): print("not active, breaking out") await asyncio.sleep(0.1) # Let changes be propagated to the queue - while not self._changes_to_publish.empty(): + while not self._statechanges_to_publish.empty(): # Drain queue.. await asyncio.sleep(0.1) break @@ -351,7 +350,7 @@ async def execute( logger.debug("jobs stopped, re-raising exception") return EVTYPE_ENSEMBLE_FAILED - await self._changes_to_publish.put(CLOSE_PUBLISHER_SENTINEL) + await self._statechanges_to_publish.put(CLOSE_PUBLISHER_SENTINEL) if not self.is_all_reals_state(RealizationState.SUCCESS): return EVTYPE_ENSEMBLE_FAILED diff --git a/src/ert/job_queue/realization_state.py b/src/ert/job_queue/realization_state.py index 697e993a699..1596be12985 100644 --- a/src/ert/job_queue/realization_state.py +++ b/src/ert/job_queue/realization_state.py @@ -96,7 +96,7 @@ def __init__( donotgohere = UNKNOWN.to(STATUS_FAILURE) def on_enter_state(self, target: RealizationState) -> None: - if self.jobqueue._changes_to_publish is None: + if self.jobqueue._statechanges_to_publish is None: return if target in ( # RealizationState.WAITING, # This happens too soon (initially) @@ -107,7 +107,7 @@ def on_enter_state(self, target: RealizationState) -> None: RealizationState.IS_KILLED, ): change = {self.realization.run_arg.iens: target.id} - asyncio.create_task(self.jobqueue._changes_to_publish.put(change)) + asyncio.create_task(self.jobqueue._statechanges_to_publish.put(change)) def on_enter_SUBMITTED(self) -> None: asyncio.create_task(self.jobqueue.driver.submit(self))