
Commit

fix
exmy committed Jul 18, 2024
1 parent fe0707c commit eecc0ea
Showing 2 changed files with 20 additions and 20 deletions.
@@ -222,6 +222,16 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )
 
+    SQLExecution.checkSQLExecutionId(sparkSession)
+
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
+    // This call shouldn't be put into the `try` block below because it only initializes and
+    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+    committer.setupJob(job)
+
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
       bucketIdExpression ++ sortColumns
@@ -245,16 +255,6 @@ object FileFormatWriter extends Logging {
       }
     }
 
-    SQLExecution.checkSQLExecutionId(sparkSession)
-
-    // propagate the description UUID into the jobs, so that committers
-    // get an ID guaranteed to be unique.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
-
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-    committer.setupJob(job)
-
     def nativeWrap(plan: SparkPlan) = {
       var wrapped: SparkPlan = plan
       if (bucketIdExpression.isDefined) {
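Both changed files get the same fix: SQLExecution.checkSQLExecutionId, the spark.sql.sources.writeJobUUID propagation, and committer.setupJob(job) move up so that job-level setup runs before the required ordering is computed and before the nativeWrap helper builds the write plan. The sketch below is a minimal, self-contained Scala illustration of the ordering the comments describe: setupJob stays outside the try block so a failure during setup never triggers abortJob. Committer and WriteFlowSketch are hypothetical stand-ins, not the Spark or fork APIs.

// Hypothetical stand-in for a Hadoop-style commit protocol; not the actual Spark API.
trait Committer {
  def setupJob(): Unit
  def commitJob(): Unit
  def abortJob(): Unit
}

object WriteFlowSketch {
  def write(committer: Committer, tasks: Seq[String]): Unit = {
    // Job-level setup runs first, outside the try block: if setupJob() itself
    // throws, the job was never set up, so there is nothing to abort.
    committer.setupJob()

    // Only after setup is the work planned (in FileFormatWriter this is where
    // the required ordering and the physical write plan are derived).
    try {
      // Any failure from here on happens against a set-up job and must abort it.
      tasks.foreach(t => println(s"writing $t"))
      committer.commitJob()
    } catch {
      case e: Throwable =>
        committer.abortJob()
        throw e
    }
  }
}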
@@ -242,6 +242,16 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )
 
+    SQLExecution.checkSQLExecutionId(sparkSession)
+
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
+    // This call shouldn't be put into the `try` block below because it only initializes and
+    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+    committer.setupJob(job)
+
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
       writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
@@ -265,16 +275,6 @@ object FileFormatWriter extends Logging {
       }
     }
 
-    SQLExecution.checkSQLExecutionId(sparkSession)
-
-    // propagate the description UUID into the jobs, so that committers
-    // get an ID guaranteed to be unique.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
-
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-    committer.setupJob(job)
-
     def nativeWrap(plan: SparkPlan) = {
       var wrapped: SparkPlan = plan
      if (writerBucketSpec.isDefined) {
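The UUID written under spark.sql.sources.writeJobUUID is what lets a committer derive an identifier that is unique per write job. Below is a hedged sketch of how a committer-side class could consume it; UniqueIdCommitter and stagingDir are hypothetical names introduced only for illustration, while the configuration key and the org.apache.hadoop.conf.Configuration type (returned by job.getConfiguration) come from the code above.

import org.apache.hadoop.conf.Configuration

// Hypothetical committer-side helper; only the configuration key comes from the diff.
class UniqueIdCommitter(conf: Configuration) {
  // Fall back to a fresh UUID if the writer did not propagate one.
  val jobUUID: String =
    Option(conf.get("spark.sql.sources.writeJobUUID"))
      .getOrElse(java.util.UUID.randomUUID().toString)

  // Example use of the unique ID: a per-job staging directory that cannot
  // collide with another concurrent write to the same output path.
  def stagingDir(outputPath: String): String =
    s"$outputPath/_temporary/$jobUUID"
}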
