Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename internal functions and attributes on state change #6627

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __init__(self, queue_config: "QueueConfig"):
self._ee_token: Optional[str] = None
self._ee_ssl_context: Optional[Union[ssl.SSLContext, bool]] = None

self._changes_to_publish: Optional[
self._statechanges_to_publish: Optional[
asyncio.Queue[Union[Dict[int, str], object]]
] = None

Expand Down Expand Up @@ -240,7 +240,7 @@ def _translate_change_to_cloudevent(
},
)

async def _publish_changes(
async def _publish_statechanges(
self, changes: Dict[int, str], ee_connection: WebSocketClientProtocol
) -> None:
assert self._ens_id is not None
Expand All @@ -254,18 +254,18 @@ async def _publish_changes(
await ee_connection.send(to_json(events[0]))
events.popleft()

async def _jobqueue_publisher(self) -> None:
async def _realization_statechange_publisher(self) -> None:
ee_headers = Headers()
if self._ee_token is not None:
ee_headers["token"] = self._ee_token

if self._ee_uri is None:
# If no ensemble evaluator present, we will publish to the log
assert self._changes_to_publish is not None
assert self._statechanges_to_publish is not None
while (
change := await self._changes_to_publish.get()
change := await self._statechanges_to_publish.get()
) != CLOSE_PUBLISHER_SENTINEL:
logger.warning(f"State change in jobqueue.execute(): {change}")
logger.warning(f"State change in JobQueue.execute(): {change}")
return

async for ee_connection in connect(
Expand All @@ -278,14 +278,13 @@ async def _jobqueue_publisher(self) -> None:
close_timeout=60,
):
try:
assert self._changes_to_publish is not None
assert self._statechanges_to_publish is not None
while True:
change = await self._changes_to_publish.get()
change = await self._statechanges_to_publish.get()
if change == CLOSE_PUBLISHER_SENTINEL:
print("CLOSE SENTINEL")
return
assert isinstance(change, dict)
await self._publish_changes(change, ee_connection)
await self._publish_statechanges(change, ee_connection)
except ConnectionClosed:
logger.debug(
"Websocket connection from JobQueue "
Expand All @@ -300,12 +299,12 @@ async def execute(
if evaluators is None:
evaluators = []

self._changes_to_publish = asyncio.Queue()
asyncio.create_task(self._jobqueue_publisher())
self._statechanges_to_publish = asyncio.Queue()
asyncio.create_task(self._realization_statechange_publisher())
asyncio.create_task(self._realization_submitter())

try:
# await self._changes_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states?
# await self._statechanges_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states?
while True:
await asyncio.sleep(1)

Expand All @@ -329,13 +328,13 @@ async def execute(
print("WE ARE STOPPED")
logger.debug("queue cancelled, stopping jobs...")
await self.stop_jobs_async()
await self._changes_to_publish.put(CLOSE_PUBLISHER_SENTINEL)
await self._statechanges_to_publish.put(CLOSE_PUBLISHER_SENTINEL)
return EVTYPE_ENSEMBLE_CANCELLED

if not self.is_active():
print("not active, breaking out")
await asyncio.sleep(0.1) # Let changes be propagated to the queue
while not self._changes_to_publish.empty():
while not self._statechanges_to_publish.empty():
# Drain queue..
await asyncio.sleep(0.1)
break
Expand All @@ -351,7 +350,7 @@ async def execute(
logger.debug("jobs stopped, re-raising exception")
return EVTYPE_ENSEMBLE_FAILED

await self._changes_to_publish.put(CLOSE_PUBLISHER_SENTINEL)
await self._statechanges_to_publish.put(CLOSE_PUBLISHER_SENTINEL)

if not self.is_all_reals_state(RealizationState.SUCCESS):
return EVTYPE_ENSEMBLE_FAILED
Expand Down
4 changes: 2 additions & 2 deletions src/ert/job_queue/realization_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __init__(
donotgohere = UNKNOWN.to(STATUS_FAILURE)

def on_enter_state(self, target: RealizationState) -> None:
if self.jobqueue._changes_to_publish is None:
if self.jobqueue._statechanges_to_publish is None:
return
if target in (
# RealizationState.WAITING, # This happens too soon (initially)
Expand All @@ -107,7 +107,7 @@ def on_enter_state(self, target: RealizationState) -> None:
RealizationState.IS_KILLED,
):
change = {self.realization.run_arg.iens: target.id}
asyncio.create_task(self.jobqueue._changes_to_publish.put(change))
asyncio.create_task(self.jobqueue._statechanges_to_publish.put(change))

def on_enter_SUBMITTED(self) -> None:
asyncio.create_task(self.jobqueue.driver.submit(self))
Expand Down
Loading