From 905ef7a73bfd4a5d6d984e4dbfd61ed1acb90a74 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Fri, 15 Nov 2024 12:18:21 +0100 Subject: [PATCH] Include deleted jobs in processing time algorithm (#140) * Refactor request status handling to ensure finished_at is set for deleted requests * add logs * qa * fix tests --- cads_broker/database.py | 1 - cads_broker/dispatcher.py | 20 ++++++++++---------- tests/test_02_database.py | 10 +++++++++- tests/test_20_dispatcher.py | 2 +- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/cads_broker/database.py b/cads_broker/database.py index 9042eeee..19c6bcfa 100644 --- a/cads_broker/database.py +++ b/cads_broker/database.py @@ -467,7 +467,6 @@ def get_users_queue_from_processing_time( interval_clause = sa.sql.and_( SystemRequest.finished_at >= interval_start, SystemRequest.finished_at < interval_stop, - SystemRequest.status != "deleted", SystemRequest.started_at.is_not(None), ) where_clause = sa.sql.or_(interval_clause, SystemRequest.status == "running") diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py index 6bbeb51e..1512fe43 100644 --- a/cads_broker/dispatcher.py +++ b/cads_broker/dispatcher.py @@ -384,11 +384,8 @@ def manage_dismissed_request(self, request, session): previous_status = dismission_metadata.get("previous_status", "accepted") if dismission_metadata.get("reason", "DismissedRequest") == "PermissionError": request.status = "failed" - request.finished_at = datetime.datetime.now() else: request.status = "deleted" - if request.finished_at is None: - request.finished_at = datetime.datetime.now() if previous_status == "running": self.qos.notify_end_of_request( request, session, scheduler=self.internal_scheduler @@ -398,6 +395,9 @@ def manage_dismissed_request(self, request, session): self.qos.notify_dismission_of_request( request, session, scheduler=self.internal_scheduler ) + # set finished_at if it is not set + if request.finished_at is None: + request.finished_at = datetime.datetime.now() logger.info("job has finished", **db.logger_kwargs(request=request)) return session @@ -683,13 +683,9 @@ def processing_time_priority_algorithm( request, session=session_write, scheduler=self.internal_scheduler ) if can_run and may_run and requests_counter < number_of_requests: - logger.info( - "user priority", - user=user_uid, - request_uid=request.request_uid, - priority=user_cost, + self.submit_request( + request, priority=user_cost, session=session_write ) - self.submit_request(request, session=session_write) may_run = False requests_counter += 1 @@ -722,7 +718,10 @@ def submit_requests( requests_counter += 1 def submit_request( - self, request: db.SystemRequest, session: sa.orm.Session + self, + request: db.SystemRequest, + session: sa.orm.Session, + priority: int | None = None, ) -> None: """Submit the request to the dask scheduler and update the qos rules accordingly.""" request = db.set_request_status( @@ -749,6 +748,7 @@ def submit_request( self.futures[request.request_uid] = future logger.info( "submitted job to scheduler", + priority=priority, **db.logger_kwargs(request=request), ) diff --git a/tests/test_02_database.py b/tests/test_02_database.py index 2d1f4d00..f22e2407 100644 --- a/tests/test_02_database.py +++ b/tests/test_02_database.py @@ -747,6 +747,13 @@ def test_get_users_queue_from_processing_time(session_obj: sa.orm.sessionmaker) started_at=None, finished_at=datetime.datetime.now() - datetime.timedelta(hours=10), ) + request_8 = mock_system_request( + status="deleted", + adaptor_properties_hash=adaptor_properties.hash, + user_uid="user2", + started_at=datetime.datetime.now() - datetime.timedelta(hours=15), + finished_at=datetime.datetime.now() - datetime.timedelta(hours=10), + ) with session_obj() as session: session.add(adaptor_properties) session.add(request_1) @@ -756,6 +763,7 @@ def test_get_users_queue_from_processing_time(session_obj: sa.orm.sessionmaker) session.add(request_5) session.add(request_6) session.add(request_7) + session.add(request_8) session.commit() with session_obj() as session: users_cost = db.get_users_queue_from_processing_time( @@ -763,7 +771,7 @@ def test_get_users_queue_from_processing_time(session_obj: sa.orm.sessionmaker) ) assert users_cost["user3"] == 0 assert users_cost["user1"] == 15 * 60 * 60 - assert users_cost["user2"] == 30 * 60 * 60 + assert users_cost["user2"] == (10 + 20 + 5) * 60 * 60 def test_get_request_result(session_obj: sa.orm.sessionmaker) -> None: diff --git a/tests/test_20_dispatcher.py b/tests/test_20_dispatcher.py index c0063c17..06e157e6 100644 --- a/tests/test_20_dispatcher.py +++ b/tests/test_20_dispatcher.py @@ -147,7 +147,7 @@ def mock_get_users_queue_from_processing_time(): submitted_requests = [] - def mock_submit_request(self, request, session): + def mock_submit_request(self, request, priority, session): submitted_requests.append(request.request_uid) for candidate in candidates: if candidate.request_uid == request.request_uid: