Skip to content

Commit

Permalink
[SPARK-50947][PYTHON][SQL][CONNECT] Assign appropriate error class an…
Browse files Browse the repository at this point in the history
…d SparkException for duplicated artifacts

### What changes were proposed in this pull request?

This PR proposes to assign appropriate error class and SparkException for duplicated artifacts.

### Why are the changes needed?

To convert SparkConnectGrpcException into proper PySparkException so that we can ensure handling exceptions from Spark Connect Server properly.

### Does this PR introduce _any_ user-facing change?

No API changes, but the user-facing error message would be improved.

### How was this patch tested?

Updated the existing test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49598 from itholic/duplicated_artifact.

Authored-by: Haejoon Lee <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
  • Loading branch information
itholic authored and MaxGekk committed Jan 23, 2025
1 parent 620f552 commit 6e4240f
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@
],
"sqlState" : "22003"
},
"ARTIFACT_ALREADY_EXISTS" : {
"message" : [
"The artifact <normalizedRemoteRelativePath> already exists. Please choose a different name for the new artifact because it cannot be overwritten."
],
"sqlState" : "42713"
},
"ASSIGNMENT_ARITY_MISMATCH" : {
"message" : [
"The number of columns or variables assigned or aliased: <numTarget> does not match the number of source expressions: <numExpr>."
Expand Down
12 changes: 8 additions & 4 deletions python/pyspark/sql/tests/connect/client/test_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
if should_test_connect:
from pyspark.sql.connect.client.artifact import ArtifactManager
from pyspark.sql.connect.client import DefaultChannelBuilder
from pyspark.errors.exceptions.connect import SparkConnectGrpcException
from pyspark.errors import SparkRuntimeException


class ArtifactTestsMixin:
Expand Down Expand Up @@ -73,11 +73,15 @@ def test_artifacts_cannot_be_overwritten(self):
with open(pyfile_path, "w+") as f:
f.write("my_func = lambda: 11")

with self.assertRaisesRegex(
SparkConnectGrpcException, "\\(java.lang.RuntimeException\\) Duplicate Artifact"
):
with self.assertRaises(SparkRuntimeException) as pe:
self.spark.addArtifacts(pyfile_path, pyfile=True)

self.check_error(
exception=pe.exception,
errorClass="ARTIFACT_ALREADY_EXISTS",
messageParameters={"normalizedRemoteRelativePath": "pyfiles/my_pyfile.py"},
)

def check_add_zipped_package(self, spark_session):
with tempfile.TemporaryDirectory(prefix="check_add_zipped_package") as d:
package_path = os.path.join(d, "my_zipfile")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import scala.reflect.ClassTag
import org.apache.commons.io.{FilenameUtils, FileUtils}
import org.apache.hadoop.fs.{LocalFileSystem, Path => FSPath}

import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkContext, SparkEnv, SparkException, SparkUnsupportedOperationException}
import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkContext, SparkEnv, SparkException, SparkRuntimeException, SparkUnsupportedOperationException}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.config.{CONNECT_SCALA_UDF_STUB_PREFIXES, EXECUTOR_USER_CLASS_PATH_FIRST}
import org.apache.spark.sql.{Artifact, SparkSession}
Expand Down Expand Up @@ -216,8 +216,10 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
return
}

throw new RuntimeException(s"Duplicate Artifact: $normalizedRemoteRelativePath. " +
"Artifacts cannot be overwritten.")
throw new SparkRuntimeException(
"ARTIFACT_ALREADY_EXISTS",
Map("normalizedRemoteRelativePath" -> normalizedRemoteRelativePath.toString)
)
}
transferFile(serverLocalStagingPath, target, deleteSource = deleteStagedFile)

Expand Down

0 comments on commit 6e4240f

Please sign in to comment.