From 4920ab25b3062c04222823f3c47b8d4d8be7bd97 Mon Sep 17 00:00:00 2001 From: "V.Shkaberda" Date: Thu, 25 Apr 2024 10:16:22 +0300 Subject: [PATCH] Remove unnecessary validation from cncf provider. (#39238) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Шкаберда Вадим Миколайович --- .../operators/custom_object_launcher.py | 3 +- .../operators/test_custom_object_launcher.py | 28 ++++++++++++------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py b/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py index 439c51e3cb3d3..77d99a0fba01e 100644 --- a/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py +++ b/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py @@ -60,12 +60,11 @@ def validate(self): if self.spec.get("dynamicAllocation", {}).get("enabled"): if not all( [ - self.spec["dynamicAllocation"].get("initialExecutors"), self.spec["dynamicAllocation"].get("minExecutors"), self.spec["dynamicAllocation"].get("maxExecutors"), ] ): - raise AirflowException("Make sure initial/min/max value for dynamic allocation is passed") + raise AirflowException("Make sure min/max value for dynamic allocation is passed") def update_resources(self): if self.spec["driver"].get("container_resources"): diff --git a/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py b/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py index d33fdd6048f43..3a57fdefdbd8e 100644 --- a/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py +++ b/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py @@ -64,6 +64,22 @@ def test_spark_job_spec_dynamicAllocation_enabled(self): assert spark_job_spec.spec["dynamicAllocation"]["enabled"] + def test_spark_job_spec_dynamicAllocation_enabled_with_default_initial_executors(self): + entries = { + "spec": { + "dynamicAllocation": { + "enabled": True, + "minExecutors": 1, + "maxExecutors": 2, + }, + "driver": {}, + "executor": {}, + } + } + spark_job_spec = SparkJobSpec(**entries) + + assert spark_job_spec.spec["dynamicAllocation"]["enabled"] + def test_spark_job_spec_dynamicAllocation_enabled_with_invalid_config(self): entries = { "spec": { @@ -78,19 +94,11 @@ def test_spark_job_spec_dynamicAllocation_enabled_with_invalid_config(self): } } - cloned_entries = entries.copy() - cloned_entries["spec"]["dynamicAllocation"]["initialExecutors"] = None - with pytest.raises( - AirflowException, - match="Make sure initial/min/max value for dynamic allocation is passed", - ): - SparkJobSpec(**cloned_entries) - cloned_entries = entries.copy() cloned_entries["spec"]["dynamicAllocation"]["minExecutors"] = None with pytest.raises( AirflowException, - match="Make sure initial/min/max value for dynamic allocation is passed", + match="Make sure min/max value for dynamic allocation is passed", ): SparkJobSpec(**cloned_entries) @@ -98,7 +106,7 @@ def test_spark_job_spec_dynamicAllocation_enabled_with_invalid_config(self): cloned_entries["spec"]["dynamicAllocation"]["maxExecutors"] = None with pytest.raises( AirflowException, - match="Make sure initial/min/max value for dynamic allocation is passed", + match="Make sure min/max value for dynamic allocation is passed", ): SparkJobSpec(**cloned_entries)