Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Dec 27, 2023
1 parent f19cf3a commit 2c10863
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ class VeloxColumnarWriteFilesExec(
assert(iter.hasNext)
val cb = iter.next()
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)

-            val numRows = loadedCb.column(0).getLong(0)
+            val numWrittenRows = loadedCb.column(0).getLong(0)

var updatedPartitions = Set.empty[String]
val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
Expand Down Expand Up @@ -121,7 +120,7 @@ class VeloxColumnarWriteFilesExec(
}

// TODO: need to get the partition Internal row?
-          val stats = BasicWriteTaskStats(Seq.empty, (numRows - 1).toInt, numBytes, numRows)
+          val stats = BasicWriteTaskStats(Seq.empty, loadedCb.numRows() - 1, numBytes, numWrittenRows)
val summary =
ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit
dir =>
val write_path = dir.toURI.getPath
val data_path = getClass.getResource("/").getPath + "/data-type-validation-data/type1"
-        // Spark 3.4 native write doesn't support Timestamp type.
+        // Velox native write doesn't support Timestamp type.
val df = spark.read.format("parquet").load(data_path).drop("timestamp")
df.write.mode("append").format("parquet").save(write_path)
val parquetDf = spark.read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,10 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
"CREATE TABLE t (c int, d long, e long)" +
" STORED AS PARQUET partitioned by (c, d)")
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
-        if (SparkShimLoader.getSparkVersion.startsWith("3.4")) {
-          checkNativeStaticPartitionWrite(
-            "INSERT OVERWRITE TABLE t partition(c=1, d)" +
-              " SELECT 3 as e, 2 as e",
-            native = false)
-        } else {
-          checkNativeStaticPartitionWrite(
-            "INSERT OVERWRITE TABLE t partition(c=1, d)" +
-              " SELECT 3 as e, 2 as e",
-            native = false)
-        }
-
+        checkNativeStaticPartitionWrite(
+          "INSERT OVERWRITE TABLE t partition(c=1, d)" +
+            " SELECT 3 as e, 2 as e",
+          native = false)
}
checkAnswer(spark.table("t"), Row(3, 1, 2))
}
Expand Down

0 comments on commit 2c10863

Please sign in to comment.