[SPARK-50947][PYTHON][SQL][CONNECT] Assign appropriate error class and SparkException for duplicated artifacts #49598

Closed · wants to merge 2 commits
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -1229,6 +1229,12 @@
     ],
     "sqlState" : "42710"
   },
+  "DUPLICATED_ARTIFACT" : {
+    "message" : [
+      "Duplicate Artifact: <normalizedRemoteRelativePath>. Artifacts cannot be overwritten."
+    ],
+    "sqlState" : "42713"
+  },
   "DUPLICATE_ASSIGNMENTS" : {
     "message" : [
       "The columns or variables <nameList> appear more than once as assignment targets."
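Note: the <normalizedRemoteRelativePath> placeholder in the new DUPLICATED_ARTIFACT template is filled from the message parameters supplied at the throw site (see the ArtifactManager change below), so the rendered text matches the previously hard-coded message. A minimal, illustrative Python sketch of that substitution, not Spark's actual error-framework code:

    # Illustrative only: how a <param> placeholder in an error template is filled
    # from message parameters. Spark does this internally via its error framework.
    import re

    template = "Duplicate Artifact: <normalizedRemoteRelativePath>. Artifacts cannot be overwritten."
    params = {"normalizedRemoteRelativePath": "pyfiles/my_pyfile.py"}

    rendered = re.sub(r"<(\w+)>", lambda m: params[m.group(1)], template)
    print(rendered)
    # Duplicate Artifact: pyfiles/my_pyfile.py. Artifacts cannot be overwritten.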
12 changes: 8 additions & 4 deletions python/pyspark/sql/tests/connect/client/test_artifact.py
@@ -29,7 +29,7 @@
 if should_test_connect:
     from pyspark.sql.connect.client.artifact import ArtifactManager
     from pyspark.sql.connect.client import DefaultChannelBuilder
-    from pyspark.errors.exceptions.connect import SparkConnectGrpcException
+    from pyspark.errors import SparkRuntimeException


 class ArtifactTestsMixin:
@@ -73,11 +73,15 @@ def test_artifacts_cannot_be_overwritten(self):
         with open(pyfile_path, "w+") as f:
             f.write("my_func = lambda: 11")

-        with self.assertRaisesRegex(
-            SparkConnectGrpcException, "\\(java.lang.RuntimeException\\) Duplicate Artifact"
-        ):
+        with self.assertRaises(SparkRuntimeException) as pe:
             self.spark.addArtifacts(pyfile_path, pyfile=True)

+        self.check_error(
+            exception=pe.exception,
+            errorClass="DUPLICATED_ARTIFACT",
+            messageParameters={"normalizedRemoteRelativePath": "pyfiles/my_pyfile.py"},
+        )
+
     def check_add_zipped_package(self, spark_session):
         with tempfile.TemporaryDirectory(prefix="check_add_zipped_package") as d:
             package_path = os.path.join(d, "my_zipfile")
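The test now asserts on the structured error class and message parameters through check_error instead of regex-matching a "(java.lang.RuntimeException)" string forwarded over gRPC. A rough sketch of what that style of check amounts to; assert_spark_error is an illustrative helper name, not PySpark's actual check_error implementation, and getErrorClass/getMessageParameters are the accessors exposed by PySpark error classes:

    # Sketch of a check_error-style assertion: compare the structured fields carried
    # by the exception rather than the rendered message text. Helper name is illustrative.
    def assert_spark_error(exc, error_class, message_parameters):
        assert exc.getErrorClass() == error_class, exc.getErrorClass()
        assert exc.getMessageParameters() == message_parameters, exc.getMessageParameters()

    # Against the exception captured above:
    # assert_spark_error(
    #     pe.exception,
    #     "DUPLICATED_ARTIFACT",
    #     {"normalizedRemoteRelativePath": "pyfiles/my_pyfile.py"},
    # )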
@@ -31,7 +31,7 @@ import scala.reflect.ClassTag
 import org.apache.commons.io.{FilenameUtils, FileUtils}
 import org.apache.hadoop.fs.{LocalFileSystem, Path => FSPath}

-import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkContext, SparkEnv, SparkException, SparkUnsupportedOperationException}
+import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkContext, SparkEnv, SparkException, SparkRuntimeException, SparkUnsupportedOperationException}
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.config.{CONNECT_SCALA_UDF_STUB_PREFIXES, EXECUTOR_USER_CLASS_PATH_FIRST}
 import org.apache.spark.sql.{Artifact, SparkSession}
@@ -216,8 +216,10 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
         return
       }

-      throw new RuntimeException(s"Duplicate Artifact: $normalizedRemoteRelativePath. " +
-        "Artifacts cannot be overwritten.")
+      throw new SparkRuntimeException(
+        "DUPLICATED_ARTIFACT",
+        Map("normalizedRemoteRelativePath" -> normalizedRemoteRelativePath.toString)
+      )
     }
     transferFile(serverLocalStagingPath, target, deleteSource = deleteStagedFile)

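End to end, a Connect client that re-uploads an artifact with the same name but different contents now receives a typed SparkRuntimeException carrying the DUPLICATED_ARTIFACT error class, instead of a generic SparkConnectGrpcException wrapping a java.lang.RuntimeException. A hedged usage sketch mirroring the test above; it assumes an active Spark Connect session named spark, and the local file path and contents are illustrative:

    # Assumes `spark` is an active Spark Connect session; the path below is illustrative.
    from pyspark.errors import SparkRuntimeException

    path = "/tmp/my_pyfile.py"
    with open(path, "w") as f:
        f.write("my_func = lambda: 10")
    spark.addArtifacts(path, pyfile=True)

    # Re-uploading the same artifact name with different contents is rejected
    # rather than silently overwriting the server-side copy.
    with open(path, "w") as f:
        f.write("my_func = lambda: 11")
    try:
        spark.addArtifacts(path, pyfile=True)
    except SparkRuntimeException as e:
        print(e.getErrorClass())         # DUPLICATED_ARTIFACT
        print(e.getMessageParameters())  # {'normalizedRemoteRelativePath': 'pyfiles/my_pyfile.py'}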