From 71e38f20fdec8d89f4c63348ec2ec766fd279119 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?=
Date: Wed, 8 Jan 2025 15:34:15 +0100
Subject: [PATCH] Make realization number span attribute

---
 src/ert/scheduler/job.py | 54 +++++++++++++++++++++-------------------
 1 file changed, 28 insertions(+), 26 deletions(-)

diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py
index 53b33b4e61b..83c8d79d29a 100644
--- a/src/ert/scheduler/job.py
+++ b/src/ert/scheduler/job.py
@@ -17,7 +17,7 @@
 from ert.constant_filenames import ERROR_file
 from ert.load_status import LoadStatus
 from ert.storage.realization_storage_state import RealizationStorageState
-from ert.trace import tracer
+from ert.trace import trace, tracer
 
 from .driver import Driver, FailedSubmit
 
@@ -143,6 +143,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
                 timeout_task.cancel()
             sem.release()
 
+    @tracer.start_as_current_span(f"{__name__}.run")
     async def run(
         self,
         sem: asyncio.BoundedSemaphore,
@@ -150,31 +151,32 @@ async def run(
         checksum_lock: asyncio.Lock,
         max_submit: int = 1,
     ) -> None:
-        with tracer.start_as_current_span(f"{__name__}.run.realization_{self.iens}"):
-            self._requested_max_submit = max_submit
-            for attempt in range(max_submit):
-                await self._submit_and_run_once(sem)
-
-                if self.returncode.cancelled() or self._scheduler._cancelled:
-                    break
-
-                if self.returncode.result() == 0:
-                    if self._scheduler._manifest_queue is not None:
-                        await self._verify_checksum(checksum_lock)
-                    async with forward_model_ok_lock:
-                        await self._handle_finished_forward_model()
-                    break
-
-                if attempt < max_submit - 1:
-                    message = (
-                        f"Realization {self.iens} failed, "
-                        f"resubmitting for attempt {attempt+2} of {max_submit}"
-                    )
-                    logger.warning(message)
-                    self.returncode = asyncio.Future()
-                    self.started.clear()
-                else:
-                    await self._send(JobState.FAILED)
+        current_span = trace.get_current_span()
+        current_span.set_attribute("ert.realization_number", self.iens)
+        self._requested_max_submit = max_submit
+        for attempt in range(max_submit):
+            await self._submit_and_run_once(sem)
+
+            if self.returncode.cancelled() or self._scheduler._cancelled:
+                break
+
+            if self.returncode.result() == 0:
+                if self._scheduler._manifest_queue is not None:
+                    await self._verify_checksum(checksum_lock)
+                async with forward_model_ok_lock:
+                    await self._handle_finished_forward_model()
+                break
+
+            if attempt < max_submit - 1:
+                message = (
+                    f"Realization {self.iens} failed, "
+                    f"resubmitting for attempt {attempt+2} of {max_submit}"
+                )
+                logger.warning(message)
+                self.returncode = asyncio.Future()
+                self.started.clear()
+            else:
+                await self._send(JobState.FAILED)
 
     async def _max_runtime_task(self) -> None:
         assert self.real.max_runtime is not None