[SPARK-46231][PYTHON] Migrate all remaining `NotImplementedError` & `TypeError` into PySpark error framework

### What changes were proposed in this pull request?

This PR proposes to migrate all remaining `NotImplementedError` and `TypeError` from `pyspark/sql/*` into the PySpark error framework as `PySparkNotImplementedError`, assigning dedicated error classes.
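Concretely, the migration follows the pattern below (a minimal sketch distilled from the diff in this commit; `Base` is a stand-in class and the `"feature"` value differs per call site):

```python
from pyspark.errors import PySparkNotImplementedError


class Base:  # hypothetical stand-in for the classes touched in this diff
    def schema(self):
        # Before: raise NotImplementedError
        # After: a structured error carrying a dedicated error class
        raise PySparkNotImplementedError(
            error_class="NOT_IMPLEMENTED",
            message_parameters={"feature": "schema"},
        )
```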

### Why are the changes needed?

To improve the error handling in PySpark.

### Does this PR introduce _any_ user-facing change?

No API changes, but the user-facing error messages will be improved.
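As a minimal sketch of what this buys callers: `PySparkNotImplementedError` extends `PySparkException`, so the error class and its parameters can be inspected programmatically instead of matching on a bare built-in exception. The exact rendered message text comes from the `NOT_IMPLEMENTED` error-class template.

```python
from pyspark.errors import PySparkNotImplementedError

# Construct the error the same way the migrated call sites do.
e = PySparkNotImplementedError(
    error_class="NOT_IMPLEMENTED",
    message_parameters={"feature": "reader"},
)
print(e.getErrorClass())         # 'NOT_IMPLEMENTED'
print(e.getMessageParameters())  # {'feature': 'reader'}
print(str(e))                    # message rendered from the error-class template
```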

### How was this patch tested?

The existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#44148 from itholic/not_impl_and_type.

Authored-by: Haejoon Lee <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
itholic authored and dongjoon-hyun committed Dec 4, 2023
1 parent 37d19b9 commit 9666bf3
Showing 3 changed files with 27 additions and 5 deletions.
21 changes: 17 additions & 4 deletions python/pyspark/sql/datasource.py
@@ -19,6 +19,7 @@
 
 from pyspark.sql import Row
 from pyspark.sql.types import StructType
+from pyspark.errors import PySparkNotImplementedError
 
 if TYPE_CHECKING:
     from pyspark.sql._typing import OptionalPrimitiveType
@@ -103,7 +104,10 @@ def schema(self) -> Union[StructType, str]:
         >>> def schema(self):
         ...     return StructType().add("a", "int").add("b", "string")
         """
-        raise NotImplementedError
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "schema"},
+        )
 
     def reader(self, schema: StructType) -> "DataSourceReader":
         """
@@ -121,7 +125,10 @@ def reader(self, schema: StructType) -> "DataSourceReader":
         reader : DataSourceReader
             A reader instance for this data source.
         """
-        raise NotImplementedError
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "reader"},
+        )
 
     def writer(self, schema: StructType, saveMode: str) -> "DataSourceWriter":
         """
@@ -142,7 +149,10 @@ def writer(self, schema: StructType, saveMode: str) -> "DataSourceWriter":
         writer : DataSourceWriter
             A writer instance for this data source.
         """
-        raise NotImplementedError
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "writer"},
+        )
 
 
 class InputPartition:
@@ -239,7 +249,10 @@ def partitions(self) -> Sequence[InputPartition]:
         >>> def partitions(self):
         ...     return [RangeInputPartition(1, 3), RangeInputPartition(5, 10)]
         """
-        raise NotImplementedError
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": "partitions"},
+        )
 
     @abstractmethod
     def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:
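A hedged usage sketch of the `datasource.py` change above: a reader subclass that implements only the abstract `read` but not `partitions` should now surface the structured error when `partitions()` is called. The `MyReader` class and its single-tuple rows are illustrative only.

```python
from pyspark.errors import PySparkNotImplementedError
from pyspark.sql.datasource import DataSourceReader, InputPartition


class MyReader(DataSourceReader):  # hypothetical reader that skips partitions()
    def read(self, partition: InputPartition):
        yield (0,)


try:
    MyReader().partitions()
except PySparkNotImplementedError as e:
    print(e.getErrorClass())         # NOT_IMPLEMENTED
    print(e.getMessageParameters())  # {'feature': 'partitions'}
```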
@@ -720,6 +720,9 @@ def biased_sum(v, w=None):
 
 
 class GroupedAggPandasUDFTests(GroupedAggPandasUDFTestsMixin, ReusedSQLTestCase):
+    def test_unsupported_types(self):
+        super().test_unsupported_types()
+
     pass
 
 
8 changes: 7 additions & 1 deletion python/pyspark/sql/udf.py
@@ -339,7 +339,13 @@ def returnType(self) -> DataType:
             try:
                 # StructType is not yet allowed as a return type, explicitly check here to fail fast
                 if isinstance(self._returnType_placeholder, StructType):
-                    raise TypeError
+                    raise PySparkNotImplementedError(
+                        error_class="NOT_IMPLEMENTED",
+                        message_parameters={
+                            "feature": f"Invalid return type with grouped aggregate Pandas UDFs: "
+                            f"{self._returnType_placeholder}"
+                        },
+                    )
                 to_arrow_type(self._returnType_placeholder)
             except TypeError:
                 raise PySparkNotImplementedError(
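A rough sketch of the user-visible effect of the `udf.py` change, assuming pandas and PyArrow are installed and that the fail-fast check runs while the grouped aggregate UDF is being constructed (this diff alone does not show where `returnType` is first evaluated):

```python
from pyspark.errors import PySparkNotImplementedError
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType

try:
    # Grouped aggregate Pandas UDFs cannot return a StructType; the property
    # above is expected to fail fast with the new error class.
    pandas_udf(
        lambda v: None,
        StructType().add("a", "int"),
        PandasUDFType.GROUPED_AGG,
    )
except PySparkNotImplementedError as e:
    print(e.getErrorClass())  # expected: NOT_IMPLEMENTED
```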
