Skip to content

Commit

Permalink
Remove unnecessary validation from cncf provider. (apache#39238)
Browse files Browse the repository at this point in the history
Co-authored-by: Шкаберда Вадим Миколайович <[email protected]>
  • Loading branch information
VShkaberda and Шкаберда Вадим Миколайович authored Apr 25, 2024
1 parent 6cb9def commit 4920ab2
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ def validate(self):
if self.spec.get("dynamicAllocation", {}).get("enabled"):
if not all(
[
self.spec["dynamicAllocation"].get("initialExecutors"),
self.spec["dynamicAllocation"].get("minExecutors"),
self.spec["dynamicAllocation"].get("maxExecutors"),
]
):
raise AirflowException("Make sure initial/min/max value for dynamic allocation is passed")
raise AirflowException("Make sure min/max value for dynamic allocation is passed")

def update_resources(self):
if self.spec["driver"].get("container_resources"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ def test_spark_job_spec_dynamicAllocation_enabled(self):

assert spark_job_spec.spec["dynamicAllocation"]["enabled"]

def test_spark_job_spec_dynamicAllocation_enabled_with_default_initial_executors(self):
entries = {
"spec": {
"dynamicAllocation": {
"enabled": True,
"minExecutors": 1,
"maxExecutors": 2,
},
"driver": {},
"executor": {},
}
}
spark_job_spec = SparkJobSpec(**entries)

assert spark_job_spec.spec["dynamicAllocation"]["enabled"]

def test_spark_job_spec_dynamicAllocation_enabled_with_invalid_config(self):
entries = {
"spec": {
Expand All @@ -78,27 +94,19 @@ def test_spark_job_spec_dynamicAllocation_enabled_with_invalid_config(self):
}
}

cloned_entries = entries.copy()
cloned_entries["spec"]["dynamicAllocation"]["initialExecutors"] = None
with pytest.raises(
AirflowException,
match="Make sure initial/min/max value for dynamic allocation is passed",
):
SparkJobSpec(**cloned_entries)

cloned_entries = entries.copy()
cloned_entries["spec"]["dynamicAllocation"]["minExecutors"] = None
with pytest.raises(
AirflowException,
match="Make sure initial/min/max value for dynamic allocation is passed",
match="Make sure min/max value for dynamic allocation is passed",
):
SparkJobSpec(**cloned_entries)

cloned_entries = entries.copy()
cloned_entries["spec"]["dynamicAllocation"]["maxExecutors"] = None
with pytest.raises(
AirflowException,
match="Make sure initial/min/max value for dynamic allocation is passed",
match="Make sure min/max value for dynamic allocation is passed",
):
SparkJobSpec(**cloned_entries)

Expand Down

0 comments on commit 4920ab2

Please sign in to comment.