[GLUTEN-6645][VL] Remove VeloxWriteQueue which may introduce deadlock
WangGuangxin authored Aug 2, 2024
1 parent 1dad0e6 commit 807d9f7
Showing 6 changed files with 18 additions and 203 deletions.
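
The net effect: the write path no longer stages batches in a bounded VeloxWriteQueue drained asynchronously; each batch is now written to the native Velox datasource synchronously through a new writeBatch JNI call, and the queue class plus its size config are removed. The commit title only states that the queue "may introduce deadlock"; the following is a generic, hypothetical sketch (not Gluten's code) of why a bounded hand-off queue is deadlock-prone once its consumer stops draining:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Toy producer/consumer hand-off; all names are illustrative.
object BoundedQueueHazard {
  def main(args: Array[String]): Unit = {
    val queue = new ArrayBlockingQueue[String](2) // bounded, like a fixed-size write queue

    val writerThread = new Thread(() => {
      // Consumer: suppose the native write fails before the queue is drained.
      throw new RuntimeException("simulated write failure")
    })
    writerThread.setDaemon(true)
    writerThread.start()
    writerThread.join() // let the failure happen before we enqueue

    queue.put("batch-1")
    queue.put("batch-2")
    // With the consumer gone, a plain put() of a third batch would block forever;
    // offer() with a timeout is used here only so the demo itself does not hang.
    val accepted = queue.offer("batch-3", 1, TimeUnit.SECONDS)
    println(s"third batch accepted: $accepted") // false: the producer is wedged
  }
}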

This file was deleted.

This file was deleted.

@@ -16,8 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.velox

-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.{ColumnarBatches, ColumnarBatchJniWrapper}
 import org.apache.gluten.datasource.DatasourceJniWrapper
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.exec.Runtimes
@@ -31,7 +30,6 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkArrowUtil
-import org.apache.spark.util.TaskResources

 import com.google.common.base.Preconditions
 import org.apache.arrow.c.ArrowSchema
@@ -74,29 +72,26 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
       cSchema.close()
     }

-    // FIXME: remove this once we support push-based write.
-    val queueSize = context.getConfiguration.getInt(GlutenConfig.VELOX_WRITER_QUEUE_SIZE.key, 64)
-
-    val writeQueue =
-      new VeloxWriteQueue(
-        TaskResources.getLocalTaskContext(),
-        dsHandle,
-        arrowSchema,
-        allocator,
-        datasourceJniWrapper,
-        filePath,
-        queueSize)
-
     new OutputWriter {
       override def write(row: InternalRow): Unit = {
         val batch = row.asInstanceOf[FakeRow].batch
         Preconditions.checkState(ColumnarBatches.isLightBatch(batch))
-        ColumnarBatches.retain(batch)
-        writeQueue.enqueue(batch)
+        val batchHandle = {
+          if (batch.numCols == 0) {
+            // the operation will find a zero column batch from a task-local pool
+            ColumnarBatchJniWrapper.create(runtime).getForEmptySchema(batch.numRows)
+          } else {
+            val offloaded =
+              ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
+            ColumnarBatches.getNativeHandle(offloaded)
+          }
+        }
+        datasourceJniWrapper.writeBatch(dsHandle, batchHandle)
+        batch.close()
       }

       override def close(): Unit = {
-        writeQueue.close()
         datasourceJniWrapper.close(dsHandle)
       }
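
After this change the OutputWriter above is fully synchronous: the task thread resolves a native handle for each batch (a pooled zero-column placeholder when the batch has no columns, otherwise the handle of the batch offloaded to native memory), calls writeBatch, and closes the batch right away, so batch ownership never crosses threads. Below is a minimal sketch of driving such a writer; FakeRow and OutputWriter appear in the diff above, while the driver itself, the FakeRow import location, and its constructor are assumptions:

import org.apache.spark.sql.execution.datasources.{FakeRow, OutputWriter} // FakeRow location assumed
import org.apache.spark.sql.vectorized.ColumnarBatch

object SyncWriteDriver {
  // Illustrative driver (not part of the commit): each columnar batch is
  // smuggled through the row-based OutputWriter API inside a FakeRow, and
  // write() now blocks until the native Velox writer has consumed it.
  def writeAll(batches: Iterator[ColumnarBatch], writer: OutputWriter): Unit = {
    try {
      batches.foreach(batch => writer.write(new FakeRow(batch)))
    } finally {
      writer.close() // closes the native datasource handle; no queue left to drain
    }
  }
}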
cpp/core/jni/JniWrapper.cc (14 changes: 4 additions & 10 deletions)

@@ -1093,22 +1093,16 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_close
   JNI_METHOD_END()
 }

-JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_write( // NOLINT
+JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_writeBatch( // NOLINT
     JNIEnv* env,
     jobject wrapper,
     jlong dsHandle,
-    jobject jIter) {
+    jlong batchHandle) {
   JNI_METHOD_START
   auto ctx = gluten::getRuntime(env, wrapper);
   auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
-  auto iter = makeJniColumnarBatchIterator(env, jIter, ctx, nullptr);
-  while (true) {
-    auto batch = iter->next();
-    if (!batch) {
-      break;
-    }
-    datasource->write(batch);
-  }
+  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+  datasource->write(batch);
   JNI_METHOD_END()
 }

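With the iterator gone, the JNI boundary exchanges nothing but opaque jlong handles: the JVM side passes the datasource handle and a batch handle, and the C++ side resolves both through ObjectStore::retrieve, so no object references or callback iterators are marshalled during the write loop. A toy Scala model of that registry idiom, assuming nothing about Gluten's actual ObjectStore beyond retrieve-by-id (all names here are illustrative):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Native objects stay in a registry on their own side of the boundary;
// only the Long id ever crosses JNI.
class HandleRegistry[T] {
  private val nextId = new AtomicLong(1L)
  private val objects = new ConcurrentHashMap[Long, T]()

  def register(obj: T): Long = {
    val handle = nextId.getAndIncrement()
    objects.put(handle, obj)
    handle // this Long is what crosses the JNI boundary
  }

  def retrieve(handle: Long): T = objects.get(handle) // null once released

  def release(handle: Long): Unit = objects.remove(handle)
}

This is why, in the rewritten Scala writer, ColumnarBatches.getNativeHandle(offloaded) is enough to hand a batch to datasourceJniWrapper.writeBatch: the batch already lives on the native side, registered under that handle.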
@@ -19,7 +19,6 @@
 import org.apache.gluten.exec.Runtime;
 import org.apache.gluten.exec.RuntimeAware;
 import org.apache.gluten.init.JniUtils;
-import org.apache.gluten.vectorized.ColumnarBatchInIterator;

 import org.apache.spark.sql.execution.datasources.BlockStripes;

@@ -53,7 +52,7 @@ public long nativeInitDatasource(String filePath, long cSchema, Map<String, Stri
   public native void close(long dsHandle);

-  public native void write(long dsHandle, ColumnarBatchInIterator iterator);
+  public native void writeBatch(long dsHandle, long batchHandle);

   public native BlockStripes splitBlockByPartitionAndBucket(
       long blockAddress, int[] partitionColIndice, boolean hasBucket);
@@ -1565,13 +1565,6 @@ object GlutenConfig {
       .booleanConf
       .createOptional

-  val VELOX_WRITER_QUEUE_SIZE =
-    buildConf("spark.gluten.sql.velox.writer.queue.size")
-      .internal()
-      .doc("This is config to specify the velox writer queue size")
-      .intConf
-      .createWithDefault(64)
-
   val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED =
     buildConf("spark.gluten.sql.native.hive.writer.enabled")
       .internal()
