From 807d9f77cb412f3ab90ea9e15408019c52941399 Mon Sep 17 00:00:00 2001
From: WangGuangxin
Date: Fri, 2 Aug 2024 11:26:48 +0800
Subject: [PATCH] [GLUTEN-6645][VL] Remove VeloxWriteQueue which may introduce deadlock (#6646)

---
 .../VeloxColumnarBatchIterator.scala          | 76 ----------------
 .../datasources/VeloxWriteQueue.scala         | 90 -------------------
 .../velox/VeloxFormatWriterInjects.scala      | 31 +++----
 cpp/core/jni/JniWrapper.cc                    | 14 +--
 .../datasource/DatasourceJniWrapper.java      |  3 +-
 .../org/apache/gluten/GlutenConfig.scala      |  7 --
 6 files changed, 18 insertions(+), 203 deletions(-)
 delete mode 100644 backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
 delete mode 100644 backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
deleted file mode 100644
index 0e6aceddfc1a..000000000000
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.gluten.exception.GlutenException
-
-import org.apache.spark.sql.execution.datasources.VeloxWriteQueue.EOS_BATCH
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.types.pojo.Schema
-
-import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-
-class VeloxColumnarBatchIterator(schema: Schema, allocator: BufferAllocator, queueSize: Int)
-  extends Iterator[ColumnarBatch]
-  with AutoCloseable {
-  private val writeQueue = new ArrayBlockingQueue[ColumnarBatch](queueSize)
-  private var currentBatch: Option[ColumnarBatch] = None
-
-  def enqueue(batch: ColumnarBatch): Unit = {
-    // Throw exception if the queue is full.
-    if (!writeQueue.offer(batch, 30L, TimeUnit.MINUTES)) {
-      throw new GlutenException("VeloxParquetWriter: Timeout waiting for adding data")
-    }
-  }
-
-  override def hasNext: Boolean = {
-    val batch =
-      try {
-        writeQueue.poll(30L, TimeUnit.MINUTES)
-      } catch {
-        case _: InterruptedException =>
-          Thread.currentThread().interrupt()
-          EOS_BATCH
-      }
-    if (batch == null) {
-      throw new GlutenException("VeloxParquetWriter: Timeout waiting for data")
-    }
-    if (batch == EOS_BATCH) {
-      return false
-    }
-    currentBatch = Some(batch)
-    true
-  }
-
-  override def next(): ColumnarBatch = {
-    try {
-      currentBatch match {
-        case Some(b) => b
-        case _ =>
-          throw new IllegalStateException("VeloxParquetWriter: Fatal: Call hasNext() first!")
-      }
-    } finally {
-      currentBatch = None
-    }
-  }
-
-  override def close(): Unit = {
-    allocator.close()
-  }
-}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
deleted file mode 100644
index 6e3e64796b32..000000000000
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.gluten.datasource.DatasourceJniWrapper
-import org.apache.gluten.utils.iterator.Iterators
-import org.apache.gluten.vectorized.ColumnarBatchInIterator
-
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.execution.datasources.VeloxWriteQueue.EOS_BATCH
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.types.pojo.Schema
-
-import java.util.UUID
-import java.util.concurrent.atomic.AtomicReference
-import java.util.regex.Pattern
-
-import scala.collection.JavaConverters._
-
-// TODO: This probably can be removed: Velox's Parquet writer already supports push-based write.
-class VeloxWriteQueue(
-    tc: TaskContext,
-    dsHandle: Long,
-    schema: Schema,
-    allocator: BufferAllocator,
-    datasourceJniWrapper: DatasourceJniWrapper,
-    outputFileURI: String,
-    queueSize: Int)
-  extends AutoCloseable {
-  private val scanner = new VeloxColumnarBatchIterator(schema, allocator, queueSize)
-  private val writeException = new AtomicReference[Throwable]
-
-  private val writeThread = new Thread(
-    () => {
-      TaskContext.setTaskContext(tc)
-      try {
-        datasourceJniWrapper.write(
-          dsHandle,
-          new ColumnarBatchInIterator(
-            Iterators.wrap(scanner).recyclePayload(_.close()).create().asJava))
-      } catch {
-        case e: Exception =>
-          writeException.set(e)
-      }
-    },
-    "VeloxWriteQueue - " + UUID.randomUUID().toString
-  )
-
-  writeThread.start()
-
-  private def checkWriteException(): Unit = {
-    // check if VeloxWriteQueue thread was failed
-    val exception = writeException.get()
-    if (exception != null) {
-      throw exception
-    }
-  }
-
-  def enqueue(batch: ColumnarBatch): Unit = {
-    scanner.enqueue(batch)
-    checkWriteException()
-  }
-
-  override def close(): Unit = {
-    scanner.enqueue(EOS_BATCH)
-    writeThread.join()
-    checkWriteException()
-  }
-}
-
-object VeloxWriteQueue {
-  val EOS_BATCH = new ColumnarBatch(null)
-  val TAILING_FILENAME_REGEX = Pattern.compile("^(.*)/([^/]+)$")
-}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index 6901bfffdf5e..7da4da5f0784 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -16,8 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.velox
 
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.{ColumnarBatches, ColumnarBatchJniWrapper}
 import org.apache.gluten.datasource.DatasourceJniWrapper
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.exec.Runtimes
@@ -31,7 +30,6 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkArrowUtil
-import org.apache.spark.util.TaskResources
 
 import com.google.common.base.Preconditions
 import org.apache.arrow.c.ArrowSchema
@@ -74,29 +72,26 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
       cSchema.close()
     }
 
-    // FIXME: remove this once we support push-based write.
-    val queueSize = context.getConfiguration.getInt(GlutenConfig.VELOX_WRITER_QUEUE_SIZE.key, 64)
-
-    val writeQueue =
-      new VeloxWriteQueue(
-        TaskResources.getLocalTaskContext(),
-        dsHandle,
-        arrowSchema,
-        allocator,
-        datasourceJniWrapper,
-        filePath,
-        queueSize)
-
     new OutputWriter {
       override def write(row: InternalRow): Unit = {
         val batch = row.asInstanceOf[FakeRow].batch
         Preconditions.checkState(ColumnarBatches.isLightBatch(batch))
         ColumnarBatches.retain(batch)
-        writeQueue.enqueue(batch)
+        val batchHandle = {
+          if (batch.numCols == 0) {
+            // the operation will find a zero column batch from a task-local pool
+            ColumnarBatchJniWrapper.create(runtime).getForEmptySchema(batch.numRows)
+          } else {
+            val offloaded =
+              ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
+            ColumnarBatches.getNativeHandle(offloaded)
+          }
+        }
+        datasourceJniWrapper.writeBatch(dsHandle, batchHandle)
+        batch.close()
       }
 
       override def close(): Unit = {
-        writeQueue.close()
         datasourceJniWrapper.close(dsHandle)
       }
     }
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index f39f9c92333e..4fa45d9cbe06 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1093,22 +1093,16 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_cl
   JNI_METHOD_END()
 }
 
-JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_write( // NOLINT
+JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_DatasourceJniWrapper_writeBatch( // NOLINT
     JNIEnv* env,
     jobject wrapper,
     jlong dsHandle,
-    jobject jIter) {
+    jlong batchHandle) {
   JNI_METHOD_START
   auto ctx = gluten::getRuntime(env, wrapper);
   auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
-  auto iter = makeJniColumnarBatchIterator(env, jIter, ctx, nullptr);
-  while (true) {
-    auto batch = iter->next();
-    if (!batch) {
-      break;
-    }
-    datasource->write(batch);
-  }
+  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+  datasource->write(batch);
   JNI_METHOD_END()
 }
 
diff --git a/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
index 5a34196c4eb7..11ed3fb7df8c 100644
--- a/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
+++ b/gluten-data/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
@@ -19,7 +19,6 @@
 import org.apache.gluten.exec.Runtime;
 import org.apache.gluten.exec.RuntimeAware;
 import org.apache.gluten.init.JniUtils;
-import org.apache.gluten.vectorized.ColumnarBatchInIterator;
 
 import org.apache.spark.sql.execution.datasources.BlockStripes;
 
@@ -53,7 +52,7 @@ public long nativeInitDatasource(String filePath, long cSchema, Map
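
Note on the removed design: VeloxWriteQueue handed batches from the Spark task
thread to a dedicated writer thread through a bounded ArrayBlockingQueue. If the
writer thread died (e.g. the native write threw) while the queue was full, the
task thread stayed blocked in offer() for up to 30 minutes before enqueue()
could surface the failure. The sketch below reproduces that hazard with plain
java.util.concurrent; the object and variable names are illustrative, not
Gluten code.

    import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

    object WriteQueueHazardSketch {
      def main(args: Array[String]): Unit = {
        // Bounded queue standing in for the removed write queue.
        val queue = new ArrayBlockingQueue[String](1)

        // Stand-in for the dedicated writer thread dying before it drains
        // anything, as it would after a failed native write.
        val writer = new Thread(() => ())
        writer.start()
        writer.join()

        queue.put("batch-1") // fills the queue
        // With no consumer left, the producer blocks in offer() until the
        // timeout expires (30 minutes in the removed code) and only then fails.
        val accepted = queue.offer("batch-2", 2, TimeUnit.SECONDS)
        println(s"offer() accepted after writer death: $accepted") // false
      }
    }

The patch closes this window entirely: each batch is pushed synchronously over
JNI via writeBatch(), so a failed native write surfaces on the task thread as
soon as it happens.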