Skip to content

Commit

Permalink
Include deleted jobs in processing time algorithm (#140)
Browse files Browse the repository at this point in the history
* Refactor request status handling to ensure finished_at is set for deleted requests

* add logs

* qa

* fix tests
  • Loading branch information
francesconazzaro authored Nov 15, 2024
1 parent 490e98f commit 905ef7a
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 13 deletions.
1 change: 0 additions & 1 deletion cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,6 @@ def get_users_queue_from_processing_time(
interval_clause = sa.sql.and_(
SystemRequest.finished_at >= interval_start,
SystemRequest.finished_at < interval_stop,
SystemRequest.status != "deleted",
SystemRequest.started_at.is_not(None),
)
where_clause = sa.sql.or_(interval_clause, SystemRequest.status == "running")
Expand Down
20 changes: 10 additions & 10 deletions cads_broker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,8 @@ def manage_dismissed_request(self, request, session):
previous_status = dismission_metadata.get("previous_status", "accepted")
if dismission_metadata.get("reason", "DismissedRequest") == "PermissionError":
request.status = "failed"
request.finished_at = datetime.datetime.now()
else:
request.status = "deleted"
if request.finished_at is None:
request.finished_at = datetime.datetime.now()
if previous_status == "running":
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
Expand All @@ -398,6 +395,9 @@ def manage_dismissed_request(self, request, session):
self.qos.notify_dismission_of_request(
request, session, scheduler=self.internal_scheduler
)
# set finished_at if it is not set
if request.finished_at is None:
request.finished_at = datetime.datetime.now()
logger.info("job has finished", **db.logger_kwargs(request=request))
return session

Expand Down Expand Up @@ -683,13 +683,9 @@ def processing_time_priority_algorithm(
request, session=session_write, scheduler=self.internal_scheduler
)
if can_run and may_run and requests_counter < number_of_requests:
logger.info(
"user priority",
user=user_uid,
request_uid=request.request_uid,
priority=user_cost,
self.submit_request(
request, priority=user_cost, session=session_write
)
self.submit_request(request, session=session_write)
may_run = False
requests_counter += 1

Expand Down Expand Up @@ -722,7 +718,10 @@ def submit_requests(
requests_counter += 1

def submit_request(
self, request: db.SystemRequest, session: sa.orm.Session
self,
request: db.SystemRequest,
session: sa.orm.Session,
priority: int | None = None,
) -> None:
"""Submit the request to the dask scheduler and update the qos rules accordingly."""
request = db.set_request_status(
Expand All @@ -749,6 +748,7 @@ def submit_request(
self.futures[request.request_uid] = future
logger.info(
"submitted job to scheduler",
priority=priority,
**db.logger_kwargs(request=request),
)

Expand Down
10 changes: 9 additions & 1 deletion tests/test_02_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,13 @@ def test_get_users_queue_from_processing_time(session_obj: sa.orm.sessionmaker)
started_at=None,
finished_at=datetime.datetime.now() - datetime.timedelta(hours=10),
)
request_8 = mock_system_request(
status="deleted",
adaptor_properties_hash=adaptor_properties.hash,
user_uid="user2",
started_at=datetime.datetime.now() - datetime.timedelta(hours=15),
finished_at=datetime.datetime.now() - datetime.timedelta(hours=10),
)
with session_obj() as session:
session.add(adaptor_properties)
session.add(request_1)
Expand All @@ -756,14 +763,15 @@ def test_get_users_queue_from_processing_time(session_obj: sa.orm.sessionmaker)
session.add(request_5)
session.add(request_6)
session.add(request_7)
session.add(request_8)
session.commit()
with session_obj() as session:
users_cost = db.get_users_queue_from_processing_time(
session, interval_stop=datetime.datetime.now()
)
assert users_cost["user3"] == 0
assert users_cost["user1"] == 15 * 60 * 60
assert users_cost["user2"] == 30 * 60 * 60
assert users_cost["user2"] == (10 + 20 + 5) * 60 * 60


def test_get_request_result(session_obj: sa.orm.sessionmaker) -> None:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_20_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def mock_get_users_queue_from_processing_time():

submitted_requests = []

def mock_submit_request(self, request, session):
def mock_submit_request(self, request, priority, session):
submitted_requests.append(request.request_uid)
for candidate in candidates:
if candidate.request_uid == request.request_uid:
Expand Down

0 comments on commit 905ef7a

Please sign in to comment.