Skip to content

Commit

Permalink
Rename internal functions and attributes on state change
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Nov 21, 2023
1 parent 01d351f commit e26780c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
36 changes: 18 additions & 18 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __init__(self, queue_config: "QueueConfig"):
self._ee_token: Optional[str] = None
self._ee_ssl_context: Optional[Union[ssl.SSLContext, bool]] = None

self._changes_to_publish: Optional[
self._statechanges_to_publish: Optional[
asyncio.Queue[Union[Dict[int, str], object]]
] = None

Expand Down Expand Up @@ -162,7 +162,9 @@ def queue_size(self) -> int:

def _add_realization(self, realization: QueueableRealization) -> int:
self._realizations.append(
RealizationState(self, realization, retries=self._queue_config.max_submit - 1)
RealizationState(
self, realization, retries=self._queue_config.max_submit - 1
)
)
return len(self._realizations) - 1

Expand Down Expand Up @@ -236,7 +238,7 @@ def _translate_change_to_cloudevent(
},
)

async def _publish_changes(
async def _publish_statechanges(
self, changes: Dict[int, str], ee_connection: WebSocketClientProtocol
) -> None:
assert self._ens_id is not None
Expand All @@ -250,18 +252,18 @@ async def _publish_changes(
await ee_connection.send(to_json(events[0]))
events.popleft()

async def _jobqueue_publisher(self) -> None:
async def _realization_statechange_publisher(self) -> None:
ee_headers = Headers()
if self._ee_token is not None:
ee_headers["token"] = self._ee_token

if self._ee_uri is None:
# If no ensemble evaluator present, we will publish to the log
assert self._changes_to_publish is not None
assert self._statechanges_to_publish is not None
while (
change := await self._changes_to_publish.get()
change := await self._statechanges_to_publish.get()
) != CLOSE_PUBLISHER_SENTINEL:
logger.warning(f"State change in jobqueue.execute(): {change}")
logger.warning(f"State change in JobQueue.execute(): {change}")
return

async for ee_connection in connect(
Expand All @@ -274,14 +276,13 @@ async def _jobqueue_publisher(self) -> None:
close_timeout=60,
):
try:
assert self._changes_to_publish is not None
assert self._statechanges_to_publish is not None
while True:
change = await self._changes_to_publish.get()
change = await self._statechanges_to_publish.get()
if change == CLOSE_PUBLISHER_SENTINEL:
print("CLOSE SENTINEL")
return
assert isinstance(change, dict)
await self._publish_changes(change, ee_connection)
await self._publish_statechanges(change, ee_connection)
except ConnectionClosed:
logger.debug(
"Websocket connection from JobQueue "
Expand All @@ -296,12 +297,12 @@ async def execute(
if evaluators is None:
evaluators = []

self._changes_to_publish = asyncio.Queue()
asyncio.create_task(self._jobqueue_publisher())
self._statechanges_to_publish = asyncio.Queue()
asyncio.create_task(self._realization_statechange_publisher())
asyncio.create_task(self._realization_submitter())

try:
# await self._changes_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states?
# await self._statechanges_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states?
while True:
await asyncio.sleep(1)

Expand All @@ -324,13 +325,13 @@ async def execute(
print("WE ARE STOPPED")
logger.debug("queue cancelled, stopping jobs...")
await self.stop_jobs_async()
await self._changes_to_publish.put(CLOSE_PUBLISHER_SENTINEL)
await self._statechanges_to_publish.put(CLOSE_PUBLISHER_SENTINEL)
return EVTYPE_ENSEMBLE_CANCELLED

if not self.is_active():
print("not active, breaking out")
await asyncio.sleep(0.1) # Let changes be propagated to the queue
while not self._changes_to_publish.empty():
while not self._statechanges_to_publish.empty():
# Drain queue..
await asyncio.sleep(0.1)
break
Expand All @@ -346,14 +347,13 @@ async def execute(
logger.debug("jobs stopped, re-raising exception")
return EVTYPE_ENSEMBLE_FAILED

await self._changes_to_publish.put(CLOSE_PUBLISHER_SENTINEL)
await self._statechanges_to_publish.put(CLOSE_PUBLISHER_SENTINEL)

if not self.is_all_reals_state(RealizationState.SUCCESS):
return EVTYPE_ENSEMBLE_FAILED

return EVTYPE_ENSEMBLE_STOPPED


def add_realization_from_run_arg(
self,
run_arg: "RunArg",
Expand Down
4 changes: 2 additions & 2 deletions src/ert/job_queue/realization_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def __init__(
donotgohere = UNKNOWN.to(STATUS_FAILURE)

def on_enter_state(self, target, event):

Check failure on line 105 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a type annotation
if self.jobqueue._changes_to_publish is None:
if self.jobqueue._statechanges_to_publish is None:
return
if target in (
# RealizationState.WAITING, # This happens too soon (initially)
Expand All @@ -114,7 +114,7 @@ def on_enter_state(self, target, event):
RealizationState.IS_KILLED,
):
change = {self.realization.run_arg.iens: target.id}
asyncio.create_task(self.jobqueue._changes_to_publish.put(change))
asyncio.create_task(self.jobqueue._statechanges_to_publish.put(change))

def on_enter_SUBMITTED(self):

Check failure on line 119 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a return type annotation
asyncio.create_task(self.jobqueue.driver.submit(self))
Expand Down

0 comments on commit e26780c

Please sign in to comment.