Skip to content

Commit

Permalink
Speed up batching of events in evaluator
Browse files Browse the repository at this point in the history
asyncio wait_for used in evaluator is "slow" in performance when dealing with O(100K) events in the event queue.
Therefore the suggestion is to replace it with direct fetching (via get_nowait())
and instead just sleep whenever the event queue is empty.

Co-authored-by: Lars Evje <[email protected]>
  • Loading branch information
xjules and larsevj committed Jan 14, 2025
1 parent 60fc8f8 commit 09de232
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,12 @@ def set_event_handler(event_types: set[type[Event]], func: Any) -> None:
):
self._complete_batch.clear()
try:
event = await asyncio.wait_for(self._events.get(), timeout=0.1)
event = self._events.get_nowait()
function = event_handler[type(event)]
batch.append((function, event))
self._events.task_done()
except TimeoutError:
except asyncio.QueueEmpty:
await asyncio.sleep(0.1)
continue
self._complete_batch.set()
await self._batch_processing_queue.put(batch)
Expand Down Expand Up @@ -381,7 +382,7 @@ async def _monitor_and_handle_tasks(self) -> None:
raise task_exception
elif task.get_name() == "server_task":
return
elif task.get_name() == "ensemble_task" or task.get_name() in {
elif task.get_name() in {
"ensemble_task",
"listener_task",
}:
Expand Down

0 comments on commit 09de232

Please sign in to comment.