Merge branch 'feature/MaxiBoether/emptyevals' into feature/MaxiBoether/sigmod-revision-2

MaxiBoether committed Sep 24, 2024
2 parents c9cefce + 7d4c548 commit 8efde93
Showing 5 changed files with 34 additions and 41 deletions.
17 changes: 13 additions & 4 deletions modyn/selector/internal/grpc/selector_grpc_servicer.py
@@ -58,8 +58,13 @@ def get_sample_keys_and_weights( # pylint: disable-next=unused-argument
         pid = os.getpid()
 
         logger.info(
-            f"[{pid}][{tid}][Pipeline {pipeline_id}]: Fetching samples for trigger id {trigger_id}"
-            + f" and worker id {worker_id} and partition id {partition_id}"
+            "[%s][%s][Pipeline %s]: Fetching samples for trigger id %s and worker id %s and partition id %s",
+            pid,
+            tid,
+            pipeline_id,
+            trigger_id,
+            worker_id,
+            partition_id,
         )
 
         samples = self.selector_manager.get_sample_keys_and_weights(pipeline_id, trigger_id, worker_id, partition_id)
Expand Down Expand Up @@ -90,8 +95,12 @@ def inform_data_and_trigger(self, request: DataInformRequest, context: grpc.Serv
         tid = threading.get_native_id()
         pid = os.getpid()
         logger.info(
-            f"[{pid}][{tid}][Pipeline {pipeline_id}]: Selector is informed of {len(keys)} new data points"
-            + f"+ trigger at timestamp {timestamps[-1] if len(keys) > 0 else 'n/a'}"
+            "[%s][%s][Pipeline %s]: Selector is informed of %s new data points + trigger at timestamp %s",
+            pid,
+            tid,
+            pipeline_id,
+            len(keys),
+            timestamps[-1] if keys else "n/a",
         )
 
         trigger_id, log = self.selector_manager.inform_data_and_trigger(pipeline_id, keys, timestamps, labels)
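The logging changes in this commit replace eager f-string interpolation with the logging module's lazy %-style arguments: the message is only formatted when the record is actually emitted, and the values are passed separately instead of being baked into the string at the call site. A minimal, self-contained sketch of the difference (illustrative values, not repository code):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

pid, tid, pipeline_id, trigger_id = 1234, 5678, 7, 3

# Eager: the f-string is built before logger.info is even called,
# regardless of whether INFO-level records are emitted.
logger.info(f"[{pid}][{tid}][Pipeline {pipeline_id}]: Fetching samples for trigger id {trigger_id}")

# Lazy: the format string and its arguments are handed to logging,
# which interpolates them only if the record is actually handled.
logger.info("[%s][%s][Pipeline %s]: Fetching samples for trigger id %s", pid, tid, pipeline_id, trigger_id)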
2 changes: 1 addition & 1 deletion modyn/supervisor/internal/triggers/datadrifttrigger.py
@@ -299,7 +299,7 @@ def _run_detection(
                 metric_result.distance
             )
 
-        logger.info(f"[DataDriftDetector][Dataset {self.dataloader_info.dataset_id}]" + f"[Result] {drift_results}")
+        logger.info("[DataDriftDetector][Dataset %s][Result] %s", self.dataloader_info.dataset_id, drift_results)
         if is_warmup:
             return False, {}
 
@@ -486,14 +486,6 @@ def test__run_evaluation_retain_metrics_before_real_exception(test_connect_to_st
     get_result_req = EvaluationResultRequest(evaluation_id=evaluation_id)
     get_result_resp = evaluator.get_evaluation_result(get_result_req, None)
     assert not get_result_resp.valid
-    # evaluation on the last interval was not finished
-    assert len(get_result_resp.evaluation_results) == len(intervals) - 1
-    assert get_result_resp.evaluation_results[0].interval_index == 0
-    assert len(get_result_resp.evaluation_results[0].evaluation_data) == 2
-    assert get_result_resp.evaluation_results[0].evaluation_data[0].metric == "Accuracy"
-    assert get_result_resp.evaluation_results[0].evaluation_data[0].result == pytest.approx(0.5)
-    assert get_result_resp.evaluation_results[0].evaluation_data[1].metric == "F1Score"
-    assert get_result_resp.evaluation_results[0].evaluation_data[1].result == pytest.approx(0.6)
 
 
 @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub())
@@ -536,11 +528,6 @@ def test_get_evaluation_result_incomplete_metric(test_is_alive, test_connect_to_
     metric_result_queue.put((1, [("Accuracy", 0.5)]))
     response = evaluator.get_evaluation_result(EvaluationResultRequest(evaluation_id=3), None)
     assert not response.valid
-    assert len(response.evaluation_results) == 1
-    assert response.evaluation_results[0].interval_index == 1
-    assert len(response.evaluation_results[0].evaluation_data) == 1
-    assert response.evaluation_results[0].evaluation_data[0].result == pytest.approx(0.5)
-    assert response.evaluation_results[0].evaluation_data[0].metric == "Accuracy"
 
 
 @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub())
@@ -299,6 +299,11 @@ def test_single_batched_evaluation_mixed(
     eval_req2 = dummy_eval_request()
     eval_req.interval_start = 64
     eval_req.interval_end = 14
+
+    request = MagicMock()
+    request.metrics = ["Accuracy"]
+    test_prepare_evaluation_request.return_value = request
+
     results = evaluation_executor._single_batched_evaluation(
         [
             (eval_req.interval_start, eval_req.interval_end),
@@ -317,4 +322,4 @@
     )
     test_wait_for_evaluation_completion.assert_called_once()
     test_get_evaluation_results.assert_called_once()
-    test_cleanup_evaluations.assert_called_once()
+    assert test_cleanup_evaluations.call_count == 2
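The new setup lines stub the patched request-preparation helper so the executor receives a request object exposing a metrics list. A generic, self-contained sketch of this MagicMock pattern, with hypothetical names (prepare_request, run_evaluation) standing in for the repository's patched helpers:

from unittest.mock import MagicMock, patch


def prepare_request():
    # Hypothetical helper that the test patches out.
    raise NotImplementedError


def run_evaluation():
    # Hypothetical code under test: it only cares about request.metrics.
    request = prepare_request()
    return list(request.metrics)


with patch(f"{__name__}.prepare_request") as prepare_mock:
    request = MagicMock()
    request.metrics = ["Accuracy"]  # arbitrary attributes can be set on a MagicMock
    prepare_mock.return_value = request  # the patched helper now returns the stub
    assert run_evaluation() == ["Accuracy"]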
36 changes: 14 additions & 22 deletions modyn/tests/supervisor/internal/test_grpc_handler.py
@@ -368,30 +368,22 @@ def test_wait_for_evaluation_completion(*args):
     assert handler.evaluator is not None
 
     with patch.object(handler.evaluator, "get_evaluation_status") as status_method:
-        status_method.side_effect = [
-            EvaluationStatusResponse(valid=True, is_running=True),
-            EvaluationStatusResponse(valid=False),
-            EvaluationStatusResponse(valid=True, is_running=False),
-        ]
-        with pytest.raises(RuntimeError):
-            handler.wait_for_evaluation_completion(10)
-        assert status_method.call_count == 2
+        with patch.object(handler.evaluator, "cleanup_evaluations") as _:
+            status_method.side_effect = [
+                EvaluationStatusResponse(valid=True, is_running=True),
+                EvaluationStatusResponse(valid=True, is_running=False),
+            ]
+            assert handler.wait_for_evaluation_completion(10)
+            assert status_method.call_count == 2
 
-        status_method.reset_mock()
-        status_method.side_effect = [
-            EvaluationStatusResponse(valid=True, is_running=True),
-            EvaluationStatusResponse(valid=True, is_running=False),
-        ]
-        assert handler.wait_for_evaluation_completion(10)
-        assert status_method.call_count == 2
+            status_method.reset_mock()
+            status_method.side_effect = [
+                EvaluationStatusResponse(valid=True, exception="Some error"),
+                EvaluationStatusResponse(valid=True, is_running=False),
+            ]
+            assert not handler.wait_for_evaluation_completion(10)
+            assert status_method.call_count == 1
 
-        status_method.reset_mock()
-        status_method.side_effect = [
-            EvaluationStatusResponse(valid=True, exception="Some error"),
-            EvaluationStatusResponse(valid=True, is_running=False),
-        ]
-        assert not handler.wait_for_evaluation_completion(10)
-        assert status_method.call_count == 1
 
 
 @patch("modyn.supervisor.internal.grpc_handler.grpc_connection_established", return_value=True)
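Read together, the rewritten test expects wait_for_evaluation_completion to poll the evaluator's status, return True once the evaluation stops running, and return False (after cleaning up) as soon as the status carries an exception. A simplified, hypothetical polling helper that would satisfy those assertions — a sketch of the contract, not modyn's actual implementation:

import time


def wait_for_completion(get_status, cleanup, poll_interval=1.0):
    """Poll get_status() until the evaluation stops running; hypothetical sketch."""
    while True:
        status = get_status()
        if status.exception:
            cleanup()  # drop server-side state for the failed evaluation
            return False  # failure is reported on the first poll, so one status call
        if not status.is_running:
            return True  # finished without errors
        time.sleep(poll_interval)  # still running: wait and poll again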
