diff --git a/backends-velox/src/main/java/io/glutenproject/spark/sql/execution/datasources/velox/DatasourceJniWrapper.java b/backends-velox/src/main/java/io/glutenproject/spark/sql/execution/datasources/velox/DatasourceJniWrapper.java index 30f65ed11c7f..6a6c8d9c47e3 100644 --- a/backends-velox/src/main/java/io/glutenproject/spark/sql/execution/datasources/velox/DatasourceJniWrapper.java +++ b/backends-velox/src/main/java/io/glutenproject/spark/sql/execution/datasources/velox/DatasourceJniWrapper.java @@ -30,16 +30,26 @@ public class DatasourceJniWrapper extends JniInitialized { public DatasourceJniWrapper() throws IOException {} public long nativeInitDatasource( - String filePath, long cSchema, long memoryManagerId, Map options) { - return nativeInitDatasource(filePath, cSchema, memoryManagerId, JniUtils.toNativeConf(options)); + String filePath, + long cSchema, + long executionCtxHandle, + long memoryManagerHandle, + Map options) { + return nativeInitDatasource( + filePath, cSchema, executionCtxHandle, memoryManagerHandle, JniUtils.toNativeConf(options)); } public native long nativeInitDatasource( - String filePath, long cSchema, long memoryManagerId, byte[] options); + String filePath, + long cSchema, + long executionCtxHandle, + long memoryManagerHandle, + byte[] options); - public native void inspectSchema(long instanceId, long cSchemaAddress); + public native void inspectSchema(long executionCtxHandle, long dsHandle, long cSchemaAddress); - public native void close(long instanceId); + public native void close(long executionCtxHandle, long dsHandle); - public native void write(long instanceId, VeloxColumnarBatchIterator iterator); + public native void write( + long executionCtxHandle, long dsHandle, VeloxColumnarBatchIterator iterator); } diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecHandler.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecHandler.scala index dc5f51687637..afd530029b6a 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecHandler.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecHandler.scala @@ -19,6 +19,7 @@ package io.glutenproject.backendsapi.velox import io.glutenproject.GlutenConfig import io.glutenproject.backendsapi.SparkPlanExecApi import io.glutenproject.columnarbatch.ColumnarBatches +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.execution._ import io.glutenproject.expression._ import io.glutenproject.expression.ConverterUtils.FunctionConfig @@ -288,10 +289,11 @@ class SparkPlanExecHandler extends SparkPlanExecApi { } else { val handleArray = input.map(ColumnarBatches.getNativeHandle).toArray val serializeResult = ColumnarBatchSerializerJniWrapper.INSTANCE.serialize( + ExecutionCtxs.contextInstance().getHandle, handleArray, NativeMemoryManagers .contextInstance("BroadcastRelation") - .getNativeInstanceId) + .getNativeInstanceHandle) input.foreach(ColumnarBatches.release) Iterator((serializeResult.getNumRows, serializeResult.getSerialized)) } diff --git a/backends-velox/src/main/scala/io/glutenproject/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/io/glutenproject/execution/RowToVeloxColumnarExec.scala index 2f629c1bc4e1..bd3de96b5942 100644 --- a/backends-velox/src/main/scala/io/glutenproject/execution/RowToVeloxColumnarExec.scala +++ b/backends-velox/src/main/scala/io/glutenproject/execution/RowToVeloxColumnarExec.scala @@ -18,6 +18,7 @@ package io.glutenproject.execution import io.glutenproject.backendsapi.velox.Validator import io.glutenproject.columnarbatch.ColumnarBatches +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators import io.glutenproject.memory.nmm.NativeMemoryManagers import io.glutenproject.utils.ArrowAbiUtil @@ -63,25 +64,27 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas } else { val arrowSchema = SparkArrowUtil.toArrowSchema(localSchema, SQLConf.get.sessionLocalTimeZone) + val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle val jniWrapper = new NativeRowToColumnarJniWrapper() val allocator = ArrowBufferAllocators.contextInstance() val cSchema = ArrowSchema.allocateNew(allocator) var closed = false - val r2cId = + val r2cHandle = try { ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema) jniWrapper.init( cSchema.memoryAddress(), + executionCtxHandle, NativeMemoryManagers .contextInstance("RowToColumnar") - .getNativeInstanceId) + .getNativeInstanceHandle) } finally { cSchema.close() } - TaskResources.addRecycler(s"RowToColumnar_$r2cId", 100) { + TaskResources.addRecycler(s"RowToColumnar_$r2cHandle", 100) { if (!closed) { - jniWrapper.close(r2cId) + jniWrapper.close(executionCtxHandle, r2cHandle) closed = true } } @@ -91,7 +94,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas override def hasNext: Boolean = { val itHasNext = rowIterator.hasNext if (!itHasNext && !closed) { - jniWrapper.close(r2cId) + jniWrapper.close(executionCtxHandle, r2cHandle) closed = true } itHasNext @@ -150,8 +153,12 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas numInputRows += rowCount try { val handle = jniWrapper - .nativeConvertRowToColumnar(r2cId, rowLength.toArray, arrowBuf.memoryAddress()) - ColumnarBatches.create(handle) + .nativeConvertRowToColumnar( + executionCtxHandle, + r2cHandle, + rowLength.toArray, + arrowBuf.memoryAddress()) + ColumnarBatches.create(executionCtxHandle, handle) } finally { arrowBuf.close() arrowBuf = null diff --git a/backends-velox/src/main/scala/io/glutenproject/utils/DatasourceUtil.scala b/backends-velox/src/main/scala/io/glutenproject/utils/DatasourceUtil.scala index 1e57fb372c6f..f37d4f7a564f 100644 --- a/backends-velox/src/main/scala/io/glutenproject/utils/DatasourceUtil.scala +++ b/backends-velox/src/main/scala/io/glutenproject/utils/DatasourceUtil.scala @@ -16,6 +16,7 @@ */ package io.glutenproject.utils +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators import io.glutenproject.memory.nmm.NativeMemoryManagers import io.glutenproject.spark.sql.execution.datasources.velox.DatasourceJniWrapper @@ -37,20 +38,23 @@ object DatasourceUtil { } def readSchema(file: FileStatus): Option[StructType] = { + val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle val allocator = ArrowBufferAllocators.contextInstance() val datasourceJniWrapper = new DatasourceJniWrapper() - val instanceId = datasourceJniWrapper.nativeInitDatasource( + val dsHandle = datasourceJniWrapper.nativeInitDatasource( file.getPath.toString, -1, - NativeMemoryManagers.contextInstance("VeloxWriter").getNativeInstanceId, - new util.HashMap[String, String]()) + executionCtxHandle, + NativeMemoryManagers.contextInstance("VeloxWriter").getNativeInstanceHandle, + new util.HashMap[String, String]() + ) val cSchema = ArrowSchema.allocateNew(allocator) - datasourceJniWrapper.inspectSchema(instanceId, cSchema.memoryAddress()) + datasourceJniWrapper.inspectSchema(executionCtxHandle, dsHandle, cSchema.memoryAddress()) try { Option(SparkSchemaUtil.fromArrowSchema(ArrowAbiUtil.importToSchema(allocator, cSchema))) } finally { cSchema.close() - datasourceJniWrapper.close(instanceId) + datasourceJniWrapper.close(executionCtxHandle, dsHandle) } } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarToRowExec.scala index 6bec402aefc9..07f6dfd93ce3 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarToRowExec.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarToRowExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution import io.glutenproject.columnarbatch.ColumnarBatches +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.execution.ColumnarToRowExecBase import io.glutenproject.extension.ValidationResult import io.glutenproject.memory.nmm.NativeMemoryManagers @@ -104,15 +105,17 @@ class ColumnarToRowRDD( if (batches.isEmpty) { Iterator.empty } else { + val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle // TODO:: pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast val jniWrapper = new NativeColumnarToRowJniWrapper() var closed = false val c2rId = jniWrapper.nativeColumnarToRowInit( - NativeMemoryManagers.contextInstance("ColumnarToRow").getNativeInstanceId) + executionCtxHandle, + NativeMemoryManagers.contextInstance("ColumnarToRow").getNativeInstanceHandle) TaskResources.addRecycler(s"ColumnarToRow_$c2rId", 100) { if (!closed) { - jniWrapper.nativeClose(c2rId) + jniWrapper.nativeClose(executionCtxHandle, c2rId) closed = true } } @@ -122,7 +125,7 @@ class ColumnarToRowRDD( override def hasNext: Boolean = { val hasNext = batches.hasNext if (!hasNext && !closed) { - jniWrapper.nativeClose(c2rId) + jniWrapper.nativeClose(executionCtxHandle, c2rId) closed = true } hasNext @@ -156,7 +159,8 @@ class ColumnarToRowRDD( val rows = batch.numRows() val beforeConvert = System.currentTimeMillis() val batchHandle = ColumnarBatches.getNativeHandle(batch) - val info = jniWrapper.nativeColumnarToRowConvert(batchHandle, c2rId) + val info = + jniWrapper.nativeColumnarToRowConvert(executionCtxHandle, batchHandle, c2rId) convertTime += (System.currentTimeMillis() - beforeConvert) // batch.close() diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala index 7ec387641342..035e510c7bbf 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/VeloxWriteQueue.scala @@ -29,7 +29,8 @@ import java.util.concurrent.atomic.AtomicReference import java.util.regex.Pattern class VeloxWriteQueue( - instanceId: Long, + executionCtxHandle: Long, + dsHandle: Long, schema: Schema, allocator: BufferAllocator, datasourceJniWrapper: DatasourceJniWrapper, @@ -41,7 +42,7 @@ class VeloxWriteQueue( private val writeThread = new Thread( () => { try { - datasourceJniWrapper.write(instanceId, scanner) + datasourceJniWrapper.write(executionCtxHandle, dsHandle, scanner) } catch { case e: Throwable => writeException.set(e) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala index 5a4e03a50600..04542f064b06 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.velox import io.glutenproject.columnarbatch.ColumnarBatches import io.glutenproject.exception.GlutenException +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.execution.datasource.GlutenRowSplitter import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators import io.glutenproject.memory.nmm.NativeMemoryManagers @@ -50,15 +51,17 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase { val arrowSchema = SparkArrowUtil.toArrowSchema(dataSchema, SQLConf.get.sessionLocalTimeZone) val cSchema = ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance()) - var instanceId = -1L + var dsHandle = -1L val datasourceJniWrapper = new DatasourceJniWrapper() val allocator = ArrowBufferAllocators.contextInstance() + val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle try { ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema) - instanceId = datasourceJniWrapper.nativeInitDatasource( + dsHandle = datasourceJniWrapper.nativeInitDatasource( originPath, cSchema.memoryAddress(), - NativeMemoryManagers.contextInstance("VeloxWriter").getNativeInstanceId, + executionCtxHandle, + NativeMemoryManagers.contextInstance("VeloxWriter").getNativeInstanceHandle, nativeConf) } catch { case e: IOException => @@ -68,7 +71,13 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase { } val writeQueue = - new VeloxWriteQueue(instanceId, arrowSchema, allocator, datasourceJniWrapper, originPath) + new VeloxWriteQueue( + executionCtxHandle, + dsHandle, + arrowSchema, + allocator, + datasourceJniWrapper, + originPath) new OutputWriter { override def write(row: InternalRow): Unit = { @@ -80,7 +89,7 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase { override def close(): Unit = { writeQueue.close() - datasourceJniWrapper.close(instanceId) + datasourceJniWrapper.close(executionCtxHandle, dsHandle) } // Do NOT add override keyword for compatibility on spark 3.1. diff --git a/cpp/core/benchmarks/CompressionBenchmark.cc b/cpp/core/benchmarks/CompressionBenchmark.cc index 8e69d610876f..83e24a68436e 100644 --- a/cpp/core/benchmarks/CompressionBenchmark.cc +++ b/cpp/core/benchmarks/CompressionBenchmark.cc @@ -23,13 +23,11 @@ #include #include #include -#include #include #include #include #include #include -#include #include #include diff --git a/cpp/core/compute/ExecutionCtx.cc b/cpp/core/compute/ExecutionCtx.cc index fd5e79af09f5..b6add7b6fea7 100644 --- a/cpp/core/compute/ExecutionCtx.cc +++ b/cpp/core/compute/ExecutionCtx.cc @@ -40,8 +40,12 @@ void setExecutionCtxFactory(ExecutionCtxFactory factory) { #endif } -std::shared_ptr createExecutionCtx() { +ExecutionCtx* createExecutionCtx() { return getExecutionCtxFactoryContext()->create(); } +void releaseExecutionCtx(ExecutionCtx* executionCtx) { + delete executionCtx; +} + } // namespace gluten diff --git a/cpp/core/compute/ExecutionCtx.h b/cpp/core/compute/ExecutionCtx.h index 24d811768e2a..9aee84e48cc8 100644 --- a/cpp/core/compute/ExecutionCtx.h +++ b/cpp/core/compute/ExecutionCtx.h @@ -29,6 +29,7 @@ #include "shuffle/ShuffleReader.h" #include "shuffle/ShuffleWriter.h" #include "substrait/plan.pb.h" +#include "utils/ConcurrentMap.h" namespace gluten { @@ -47,11 +48,18 @@ class ExecutionCtx : public std::enable_shared_from_this { ExecutionCtx(const std::unordered_map& confMap) : confMap_(confMap) {} virtual ~ExecutionCtx() = default; - virtual std::shared_ptr getResultIterator( + virtual ResourceHandle createResultIterator( MemoryManager* memoryManager, const std::string& spillDir, const std::vector>& inputs, const std::unordered_map& sessionConf) = 0; + virtual ResourceHandle addResultIterator(std::shared_ptr) = 0; + virtual std::shared_ptr getResultIterator(ResourceHandle) = 0; + virtual void releaseResultIterator(ResourceHandle) = 0; + + virtual ResourceHandle addBatch(std::shared_ptr) = 0; + virtual std::shared_ptr getBatch(ResourceHandle) = 0; + virtual void releaseBatch(ResourceHandle) = 0; void parsePlan(const uint8_t* data, int32_t size) { parsePlan(data, size, {-1, -1, -1}); @@ -86,33 +94,50 @@ class ExecutionCtx : public std::enable_shared_from_this { /// This function is used to create certain converter from the format used by /// the backend to Spark unsafe row. - virtual std::shared_ptr getColumnar2RowConverter(MemoryManager* memoryManager) = 0; + virtual ResourceHandle createColumnar2RowConverter(MemoryManager* memoryManager) = 0; + virtual std::shared_ptr getColumnar2RowConverter(ResourceHandle) = 0; + virtual void releaseColumnar2RowConverter(ResourceHandle) = 0; - virtual std::shared_ptr getRowToColumnarConverter( - MemoryManager* memoryManager, - struct ArrowSchema* cSchema) = 0; + virtual ResourceHandle createRow2ColumnarConverter(MemoryManager* memoryManager, struct ArrowSchema* cSchema) = 0; + virtual std::shared_ptr getRow2ColumnarConverter(ResourceHandle) = 0; + virtual void releaseRow2ColumnarConverter(ResourceHandle) = 0; - virtual std::shared_ptr createShuffleWriter( + virtual ResourceHandle createShuffleWriter( int numPartitions, std::shared_ptr partitionWriterCreator, const ShuffleWriterOptions& options, MemoryManager* memoryManager) = 0; + virtual std::shared_ptr getShuffleWriter(ResourceHandle) = 0; + virtual void releaseShuffleWriter(ResourceHandle) = 0; virtual std::shared_ptr getMetrics(ColumnarBatchIterator* rawIter, int64_t exportNanos) = 0; - virtual std::shared_ptr - getDatasource(const std::string& filePath, MemoryManager* memoryManager, std::shared_ptr schema) = 0; + virtual ResourceHandle createDatasource( + const std::string& filePath, + MemoryManager* memoryManager, + std::shared_ptr schema) = 0; + virtual std::shared_ptr getDatasource(ResourceHandle) = 0; + virtual void releaseDatasource(ResourceHandle) = 0; - virtual std::shared_ptr createShuffleReader( + virtual ResourceHandle createShuffleReader( std::shared_ptr schema, ReaderOptions options, std::shared_ptr pool, MemoryManager* memoryManager) = 0; + virtual std::shared_ptr getShuffleReader(ResourceHandle) = 0; + virtual void releaseShuffleReader(ResourceHandle) = 0; - virtual std::shared_ptr getColumnarBatchSerializer( + virtual ResourceHandle createColumnarBatchSerializer( MemoryManager* memoryManager, std::shared_ptr arrowPool, struct ArrowSchema* cSchema) = 0; + // TODO: separate serializer and deserializer then remove this method. + virtual std::unique_ptr createTempColumnarBatchSerializer( + MemoryManager* memoryManager, + std::shared_ptr arrowPool, + struct ArrowSchema* cSchema) = 0; + virtual std::shared_ptr getColumnarBatchSerializer(ResourceHandle) = 0; + virtual void releaseColumnarBatchSerializer(ResourceHandle) = 0; std::unordered_map getConfMap() { return confMap_; @@ -129,9 +154,8 @@ class ExecutionCtx : public std::enable_shared_from_this { std::unordered_map confMap_; }; -using ExecutionCtxFactoryWithConf = - std::shared_ptr (*)(const std::unordered_map&); -using ExecutionCtxFactory = std::shared_ptr (*)(); +using ExecutionCtxFactoryWithConf = ExecutionCtx* (*)(const std::unordered_map&); +using ExecutionCtxFactory = ExecutionCtx* (*)(); struct ExecutionCtxFactoryContext { std::mutex mutex; @@ -178,7 +202,7 @@ struct ExecutionCtxFactoryContext { backendFactory = factory; } - std::shared_ptr create() { + ExecutionCtx* create() { std::lock_guard lockGuard(mutex); if (type == kExecutionCtxFactoryInvalid) { assert(false); @@ -198,6 +222,8 @@ void setExecutionCtxFactory( void setExecutionCtxFactory(ExecutionCtxFactory factory); -std::shared_ptr createExecutionCtx(); +ExecutionCtx* createExecutionCtx(); + +void releaseExecutionCtx(ExecutionCtx*); } // namespace gluten diff --git a/cpp/core/compute/ResultIterator.h b/cpp/core/compute/ResultIterator.h index 3f41293e0e99..8b3e9eab6ebd 100644 --- a/cpp/core/compute/ResultIterator.h +++ b/cpp/core/compute/ResultIterator.h @@ -29,10 +29,8 @@ class ExecutionCtx; // other places. class ResultIterator { public: - explicit ResultIterator( - std::unique_ptr iter, - std::shared_ptr executionCtx = nullptr) - : iter_(std::move(iter)), next_(nullptr), executionCtx_(std::move(executionCtx)) {} + explicit ResultIterator(std::unique_ptr iter, ExecutionCtx* executionCtx = nullptr) + : iter_(std::move(iter)), next_(nullptr), executionCtx_(executionCtx) {} // copy constructor and copy assignment (deleted) ResultIterator(const ResultIterator& in) = delete; @@ -88,7 +86,7 @@ class ResultIterator { std::unique_ptr iter_; std::shared_ptr next_; - std::shared_ptr executionCtx_; + ExecutionCtx* executionCtx_; int64_t exportNanos_; }; diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 98b0a3e5c8ca..464e590a8159 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -22,7 +22,6 @@ #include "compute/ExecutionCtx.h" #include "compute/ProtobufUtils.h" #include "config/GlutenConfig.h" -#include "jni/ConcurrentMap.h" #include "jni/JniCommon.h" #include "jni/JniErrors.h" @@ -71,38 +70,13 @@ static jmethodID serializedColumnarBatchIteratorNext; static jclass nativeColumnarToRowInfoClass; static jmethodID nativeColumnarToRowInfoConstructor; -static jclass veloxColumnarbatchScannerClass; -static jmethodID veloxColumnarbatchScannerHasNext; -static jmethodID veloxColumnarbatchScannerNext; +static jclass veloxColumnarBatchScannerClass; +static jmethodID veloxColumnarBatchScannerHasNext; +static jmethodID veloxColumnarBatchScannerNext; static jclass shuffleReaderMetricsClass; static jmethodID shuffleReaderMetricsSetDecompressTime; -static ConcurrentMap> columnarToRowConverterHolder; - -static ConcurrentMap> rowToColumnarConverterHolder; - -static ConcurrentMap> resultIteratorHolder; - -static ConcurrentMap> shuffleWriterHolder; - -static ConcurrentMap> shuffleReaderHolder; - -static ConcurrentMap> columnarBatchHolder; - -static ConcurrentMap> glutenDatasourceHolder; - -static ConcurrentMap> columnarBatchSerializerHolder; - -std::shared_ptr getArrayIterator(JNIEnv*, jlong id) { - auto handler = resultIteratorHolder.lookup(id); - if (!handler) { - std::string errorMessage = "invalid handler id " + std::to_string(id); - throw gluten::GlutenException(errorMessage); - } - return handler; -} - class JavaInputStreamAdaptor final : public arrow::io::InputStream { public: JavaInputStreamAdaptor(JNIEnv* env, std::shared_ptr pool, jobject jniIn) : pool_(pool) { @@ -182,8 +156,12 @@ class JavaInputStreamAdaptor final : public arrow::io::InputStream { class JniColumnarBatchIterator : public ColumnarBatchIterator { public: - explicit JniColumnarBatchIterator(JNIEnv* env, jobject jColumnarBatchItr, std::shared_ptr writer) - : writer_(writer) { + explicit JniColumnarBatchIterator( + JNIEnv* env, + jobject jColumnarBatchItr, + ExecutionCtx* executionCtx, + std::shared_ptr writer) + : executionCtx_(executionCtx), writer_(writer) { // IMPORTANT: DO NOT USE LOCAL REF IN DIFFERENT THREAD if (env->GetJavaVM(&vm_) != JNI_OK) { std::string errorMessage = "Unable to get JavaVM instance"; @@ -216,7 +194,7 @@ class JniColumnarBatchIterator : public ColumnarBatchIterator { checkException(env); jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext); checkException(env); - auto batch = columnarBatchHolder.lookup(handle); + auto batch = executionCtx_->getBatch(handle); if (writer_ != nullptr) { // save snapshot of the batch to file std::shared_ptr schema = batch->exportArrowSchema(); @@ -231,12 +209,16 @@ class JniColumnarBatchIterator : public ColumnarBatchIterator { private: JavaVM* vm_; jobject jColumnarBatchItr_; + ExecutionCtx* executionCtx_; std::shared_ptr writer_; }; -std::unique_ptr -makeJniColumnarBatchIterator(JNIEnv* env, jobject jColumnarBatchItr, std::shared_ptr writer) { - return std::make_unique(env, jColumnarBatchItr, writer); +std::unique_ptr makeJniColumnarBatchIterator( + JNIEnv* env, + jobject jColumnarBatchItr, + ExecutionCtx* executionCtx, + std::shared_ptr writer) { + return std::make_unique(env, jColumnarBatchItr, executionCtx, writer); } #ifdef __cplusplus @@ -293,12 +275,12 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { reserveMemoryMethod = getMethodIdOrError(env, javaReservationListenerClass, "reserve", "(J)J"); unreserveMemoryMethod = getMethodIdOrError(env, javaReservationListenerClass, "unreserve", "(J)J"); - veloxColumnarbatchScannerClass = + veloxColumnarBatchScannerClass = createGlobalClassReference(env, "Lorg/apache/spark/sql/execution/datasources/VeloxColumnarBatchIterator;"); - veloxColumnarbatchScannerHasNext = getMethodId(env, veloxColumnarbatchScannerClass, "hasNext", "()Z"); + veloxColumnarBatchScannerHasNext = getMethodId(env, veloxColumnarBatchScannerClass, "hasNext", "()Z"); - veloxColumnarbatchScannerNext = getMethodId(env, veloxColumnarbatchScannerClass, "next", "()J"); + veloxColumnarBatchScannerNext = getMethodId(env, veloxColumnarBatchScannerClass, "next", "()J"); shuffleReaderMetricsClass = createGlobalClassReferenceOrError(env, "Lio/glutenproject/vectorized/ShuffleReaderMetrics;"); @@ -309,12 +291,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { } void JNI_OnUnload(JavaVM* vm, void* reserved) { - resultIteratorHolder.clear(); - columnarToRowConverterHolder.clear(); - shuffleWriterHolder.clear(); - shuffleReaderHolder.clear(); - glutenDatasourceHolder.clear(); - JNIEnv* env; vm->GetEnv(reinterpret_cast(&env), jniVersion); env->DeleteGlobalRef(serializableObjBuilderClass); @@ -324,15 +300,16 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { env->DeleteGlobalRef(serializedColumnarBatchIteratorClass); env->DeleteGlobalRef(nativeColumnarToRowInfoClass); env->DeleteGlobalRef(byteArrayClass); - env->DeleteGlobalRef(veloxColumnarbatchScannerClass); + env->DeleteGlobalRef(veloxColumnarBatchScannerClass); env->DeleteGlobalRef(shuffleReaderMetricsClass); } JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithIterator( // NOLINT JNIEnv* env, - jobject obj, - jlong memoryManagerId, + jobject, + jlong ctxHandle, + jlong memoryManagerHandle, jbyteArray planArr, jobjectArray iterArr, jint stageId, @@ -342,14 +319,16 @@ Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithI jstring spillDir, jbyteArray confArr) { JNI_METHOD_START - arrow::Status msg; + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + MemoryManager* memoryManager = reinterpret_cast(memoryManagerHandle); + GLUTEN_CHECK(memoryManager != nullptr, "MemoryManager should not be null."); auto spillDirStr = jStringToCString(env, spillDir); auto planData = reinterpret_cast(env->GetByteArrayElements(planArr, nullptr)); auto planSize = env->GetArrayLength(planArr); - auto executionCtx = gluten::createExecutionCtx(); executionCtx->parsePlan(planData, planSize, {stageId, partitionId, taskId}); auto confs = getConfMap(env, confArr); @@ -370,22 +349,25 @@ Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWithI writer = std::make_shared(file); } jobject iter = env->GetObjectArrayElement(iterArr, idx); - auto arrayIter = makeJniColumnarBatchIterator(env, iter, writer); + auto arrayIter = makeJniColumnarBatchIterator(env, iter, executionCtx, writer); auto resultIter = std::make_shared(std::move(arrayIter)); inputIters.push_back(std::move(resultIter)); } - MemoryManager* memoryManager = reinterpret_cast(memoryManagerId); - GLUTEN_CHECK(memoryManager != nullptr, "MemoryManager should not be null."); - auto resIter = executionCtx->getResultIterator(memoryManager, spillDirStr, inputIters, confs); - return resultIteratorHolder.insert(std::move(resIter)); - JNI_METHOD_END(-1) + return executionCtx->createResultIterator(memoryManager, spillDirStr, inputIters, confs); + JNI_METHOD_END(kInvalidResourceHandle) } -JNIEXPORT jboolean JNICALL -Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeHasNext(JNIEnv* env, jobject obj, jlong id) { // NOLINT +JNIEXPORT jboolean JNICALL Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeHasNext( + JNIEnv* env, + jobject, + jlong ctxHandle, + jlong iterHandle) { // NOLINT JNI_METHOD_START - auto iter = getArrayIterator(env, id); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto iter = executionCtx->getResultIterator(iterHandle); if (iter == nullptr) { std::string errorMessage = "faked to get batch iterator"; throw gluten::GlutenException(errorMessage); @@ -394,28 +376,38 @@ Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeHasNext(JNIEnv* JNI_METHOD_END(false) } -JNIEXPORT jlong JNICALL -Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeNext(JNIEnv* env, jobject obj, jlong id) { // NOLINT +JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeNext( + JNIEnv* env, + jobject, + jlong ctxHandle, + jlong iterHandle) { // NOLINT JNI_METHOD_START - auto iter = getArrayIterator(env, id); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto iter = executionCtx->getResultIterator(iterHandle); if (!iter->hasNext()) { - return -1L; + return kInvalidResourceHandle; } std::shared_ptr batch = iter->next(); - jlong batchHandle = columnarBatchHolder.insert(batch); + auto batchHandle = executionCtx->addBatch(batch); iter->setExportNanos(batch->getExportNanos()); return batchHandle; - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jobject JNICALL Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeFetchMetrics( // NOLINT JNIEnv* env, - jobject obj, - jlong id) { + jobject, + jlong ctxHandle, + jlong iterHandle) { JNI_METHOD_START - auto iter = getArrayIterator(env, id); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto iter = executionCtx->getResultIterator(iterHandle); std::shared_ptr metrics = iter->getMetrics(); int numMetrics = 0; @@ -510,27 +502,29 @@ JNIEXPORT jobject JNICALL Java_io_glutenproject_vectorized_ColumnarBatchOutItera JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeSpill( // NOLINT JNIEnv* env, - jobject thisObj, - jlong id, + jobject, + jlong ctxHandle, + jlong iterHandle, jlong size) { JNI_METHOD_START - auto it = resultIteratorHolder.lookup(id); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto it = executionCtx->getResultIterator(iterHandle); return it->spillFixedSize(size); - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ColumnarBatchOutIterator_nativeClose( // NOLINT JNIEnv* env, - jobject thisObj, - jlong id) { + jobject, + jlong ctxHandle, + jlong iterHandle) { JNI_METHOD_START -#ifdef GLUTEN_PRINT_DEBUG - auto it = resultIteratorHolder.lookup(id); - if (it.use_count() > 2) { - std::cout << "ArrowArrayResultIterator Id " << id << " use count is " << it.use_count() << std::endl; - } -#endif - resultIteratorHolder.erase(id); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + executionCtx->releaseResultIterator(iterHandle); JNI_METHOD_END() } @@ -538,27 +532,31 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowInit( // NOLINT JNIEnv* env, jobject, - jlong memoryManagerId) { + jlong ctxHandle, + jlong memoryManagerHandle) { JNI_METHOD_START - // Convert the native batch to Spark unsafe row. - auto executionCtx = gluten::createExecutionCtx(); - MemoryManager* memoryManager = reinterpret_cast(memoryManagerId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + MemoryManager* memoryManager = reinterpret_cast(memoryManagerHandle); GLUTEN_CHECK(memoryManager != nullptr, "MemoryManager should not be null."); - auto columnarToRowConverter = executionCtx->getColumnar2RowConverter(memoryManager); - int64_t instanceID = columnarToRowConverterHolder.insert(columnarToRowConverter); - return instanceID; - JNI_METHOD_END(-1) + + // Convert the native batch to Spark unsafe row. + return executionCtx->createColumnar2RowConverter(memoryManager); + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jobject JNICALL Java_io_glutenproject_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowConvert( // NOLINT JNIEnv* env, jobject, + jlong ctxHandle, jlong batchHandle, - jlong instanceId) { + jlong c2rHandle) { JNI_METHOD_START - auto columnarToRowConverter = columnarToRowConverterHolder.lookup(instanceId); - std::shared_ptr cb = columnarBatchHolder.lookup(batchHandle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + auto columnarToRowConverter = executionCtx->getColumnar2RowConverter(c2rHandle); + auto cb = executionCtx->getBatch(batchHandle); columnarToRowConverter->convert(cb); const auto& offsets = columnarToRowConverter->getOffsets(); @@ -583,9 +581,13 @@ Java_io_glutenproject_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToR JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_NativeColumnarToRowJniWrapper_nativeClose( // NOLINT JNIEnv* env, jobject, - jlong instanceId) { + jlong ctxHandle, + jlong c2rHandle) { JNI_METHOD_START - columnarToRowConverterHolder.erase(instanceId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + executionCtx->releaseColumnar2RowConverter(c2rHandle); JNI_METHOD_END() } @@ -593,25 +595,30 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_NativeRowToColumnarJniW JNIEnv* env, jobject, jlong cSchema, - jlong memoryManagerId) { + jlong ctxHandle, + jlong memoryManagerHandle) { JNI_METHOD_START - auto executionCtx = gluten::createExecutionCtx(); - MemoryManager* memoryManager = reinterpret_cast(memoryManagerId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + MemoryManager* memoryManager = reinterpret_cast(memoryManagerHandle); GLUTEN_CHECK(memoryManager != nullptr, "MemoryManager should not be null."); - auto converter = - executionCtx->getRowToColumnarConverter(memoryManager, reinterpret_cast(cSchema)); - return rowToColumnarConverterHolder.insert(converter); - JNI_METHOD_END(-1) + + return executionCtx->createRow2ColumnarConverter(memoryManager, reinterpret_cast(cSchema)); + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_NativeRowToColumnarJniWrapper_nativeConvertRowToColumnar( // NOLINT JNIEnv* env, jobject, - jlong r2cId, + jlong ctxHandle, + jlong r2cHandle, jlongArray rowLength, jlong memoryAddress) { JNI_METHOD_START + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + if (rowLength == nullptr) { throw gluten::GlutenException("Native convert row to columnar: buf_addrs can't be null"); } @@ -619,82 +626,119 @@ Java_io_glutenproject_vectorized_NativeRowToColumnarJniWrapper_nativeConvertRowT jlong* inRowLength = env->GetLongArrayElements(rowLength, nullptr); uint8_t* address = reinterpret_cast(memoryAddress); - auto converter = rowToColumnarConverterHolder.lookup(r2cId); + auto converter = executionCtx->getRow2ColumnarConverter(r2cHandle); auto cb = converter->convert(numRows, reinterpret_cast(inRowLength), address); env->ReleaseLongArrayElements(rowLength, inRowLength, JNI_ABORT); - return columnarBatchHolder.insert(cb); - JNI_METHOD_END(-1) + return executionCtx->addBatch(cb); + JNI_METHOD_END(kInvalidResourceHandle) } -JNIEXPORT void JNICALL -Java_io_glutenproject_vectorized_NativeRowToColumnarJniWrapper_close(JNIEnv* env, jobject, jlong r2cId) { // NOLINT +JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_NativeRowToColumnarJniWrapper_close( + JNIEnv* env, + jobject, + jlong ctxHandle, + jlong r2cHandle) { // NOLINT JNI_METHOD_START - rowToColumnarConverterHolder.erase(r2cId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + executionCtx->releaseRow2ColumnarConverter(r2cHandle); JNI_METHOD_END() } -JNIEXPORT jstring JNICALL -Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_getType(JNIEnv* env, jobject, jlong handle) { // NOLINT +JNIEXPORT jstring JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_getType( + JNIEnv* env, + jobject, + jlong ctxHandle, + jlong batchHandle) { // NOLINT JNI_METHOD_START - std::shared_ptr batch = columnarBatchHolder.lookup(handle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto batch = executionCtx->getBatch(batchHandle); return env->NewStringUTF(batch->getType().c_str()); JNI_METHOD_END(nullptr) } -JNIEXPORT jlong JNICALL -Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_numBytes(JNIEnv* env, jobject, jlong handle) { // NOLINT +JNIEXPORT jlong JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_numBytes( + JNIEnv* env, + jobject, + jlong ctxHandle, + jlong batchHandle) { // NOLINT JNI_METHOD_START - std::shared_ptr batch = columnarBatchHolder.lookup(handle); + + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto batch = executionCtx->getBatch(batchHandle); return batch->numBytes(); - JNI_METHOD_END(-1) + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jlong JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_numColumns( // NOLINT JNIEnv* env, jobject, - jlong handle) { + jlong ctxHandle, + jlong batchHandle) { JNI_METHOD_START - std::shared_ptr batch = columnarBatchHolder.lookup(handle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto batch = executionCtx->getBatch(batchHandle); return batch->numColumns(); - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } -JNIEXPORT jlong JNICALL -Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_numRows(JNIEnv* env, jobject, jlong handle) { // NOLINT +JNIEXPORT jlong JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_numRows( + JNIEnv* env, + jobject, + jlong ctxHandle, + jlong batchHandle) { // NOLINT JNI_METHOD_START - std::shared_ptr batch = columnarBatchHolder.lookup(handle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto batch = executionCtx->getBatch(batchHandle); return batch->numRows(); - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jlong JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_compose( // NOLINT JNIEnv* env, jobject, + jlong ctxHandle, jlongArray handles) { JNI_METHOD_START + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + int handleCount = env->GetArrayLength(handles); jlong* handleArray = env->GetLongArrayElements(handles, nullptr); std::vector> batches; for (int i = 0; i < handleCount; ++i) { jlong handle = handleArray[i]; - std::shared_ptr batch = columnarBatchHolder.lookup(handle); + auto batch = executionCtx->getBatch(handle); batches.push_back(batch); } auto newBatch = CompositeColumnarBatch::create(std::move(batches)); env->ReleaseLongArrayElements(handles, handleArray, JNI_ABORT); - return columnarBatchHolder.insert(newBatch); - JNI_METHOD_END(-1L) + return executionCtx->addBatch(newBatch); + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT void JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_exportToArrow( // NOLINT JNIEnv* env, jobject, - jlong handle, + jlong ctxHandle, + jlong batchHandle, jlong cSchema, jlong cArray) { JNI_METHOD_START - std::shared_ptr batch = columnarBatchHolder.lookup(handle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto batch = executionCtx->getBatch(batchHandle); std::shared_ptr exportedSchema = batch->exportArrowSchema(); std::shared_ptr exportedArray = batch->exportArrowArray(); ArrowSchemaMove(exportedSchema.get(), reinterpret_cast(cSchema)); @@ -705,9 +749,13 @@ JNIEXPORT void JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapp JNIEXPORT jlong JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_createWithArrowArray( // NOLINT JNIEnv* env, jobject, + jlong ctxHandle, jlong cSchema, jlong cArray) { JNI_METHOD_START + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + std::unique_ptr targetSchema = std::make_unique(); std::unique_ptr targetArray = std::make_unique(); auto* arrowSchema = reinterpret_cast(cSchema); @@ -716,15 +764,20 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrap ArrowSchemaMove(arrowSchema, targetSchema.get()); std::shared_ptr batch = std::make_shared(std::move(targetSchema), std::move(targetArray)); - return columnarBatchHolder.insert(batch); - JNI_METHOD_END(-1L) + return executionCtx->addBatch(batch); + JNI_METHOD_END(kInvalidResourceHandle) } -JNIEXPORT void JNICALL -Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_close(JNIEnv* env, jobject, jlong handle) { // NOLINT +JNIEXPORT void JNICALL Java_io_glutenproject_columnarbatch_ColumnarBatchJniWrapper_close( + JNIEnv* env, + jobject, + jlong ctxHandle, + jlong batchHandle) { // NOLINT JNI_METHOD_START - std::shared_ptr batch = columnarBatchHolder.lookup(handle); - columnarBatchHolder.erase(handle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + executionCtx->releaseBatch(batchHandle); JNI_METHOD_END() } @@ -744,7 +797,8 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper jint numSubDirs, jstring localDirsJstr, jboolean preferEvict, - jlong memoryManagerId, + jlong ctxHandle, + jlong memoryManagerHandle, jboolean writeEOS, jlong firstBatchHandle, jlong taskAttemptId, @@ -752,9 +806,13 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper jobject partitionPusher, jstring partitionWriterTypeJstr) { JNI_METHOD_START + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + MemoryManager* memoryManager = reinterpret_cast(memoryManagerHandle); + GLUTEN_CHECK(memoryManager != nullptr, "MemoryManager should not be null."); if (partitioningNameJstr == nullptr) { throw gluten::GlutenException(std::string("Short partitioning name can't be null")); - return 0; + return kInvalidResourceHandle; } auto partitioningName = jStringToCString(env, partitioningNameJstr); @@ -773,8 +831,6 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper shuffleWriterOptions.compression_mode = getCompressionMode(env, compressionModeJstr); } - MemoryManager* memoryManager = reinterpret_cast(memoryManagerId); - GLUTEN_CHECK(memoryManager != nullptr, "MemoryManager should not be null."); shuffleWriterOptions.memory_pool = memoryManager->getArrowMemoryPool(); shuffleWriterOptions.ipc_memory_pool = shuffleWriterOptions.memory_pool; @@ -844,59 +900,71 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper throw gluten::GlutenException("Unrecognizable partition writer type: " + partitionWriterType); } - auto executionCtx = gluten::createExecutionCtx(); - auto shuffleWriter = executionCtx->createShuffleWriter( + return executionCtx->createShuffleWriter( numPartitions, std::move(partitionWriterCreator), std::move(shuffleWriterOptions), memoryManager); - return shuffleWriterHolder.insert(shuffleWriter); - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper_nativeEvict( // NOLINT JNIEnv* env, jobject, - jlong shuffleWriterId, + jlong ctxHandle, + jlong shuffleWriterHandle, jlong size, jboolean callBySelf) { JNI_METHOD_START - auto shuffleWriter = shuffleWriterHolder.lookup(shuffleWriterId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto shuffleWriter = executionCtx->getShuffleWriter(shuffleWriterHandle); if (!shuffleWriter) { - std::string errorMessage = "Invalid shuffle writer id " + std::to_string(shuffleWriterId); + std::string errorMessage = "Invalid shuffle writer handle " + std::to_string(shuffleWriterHandle); throw gluten::GlutenException(errorMessage); } int64_t evictedSize; gluten::arrowAssertOkOrThrow( shuffleWriter->evictFixedSize(size, &evictedSize), "(shuffle) nativeEvict: evict failed"); return (jlong)evictedSize; - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper_split( // NOLINT JNIEnv* env, jobject, - jlong shuffleWriterId, + jlong ctxHandle, + jlong shuffleWriterHandle, jint numRows, - jlong handle) { + jlong batchHandle) { JNI_METHOD_START - auto shuffleWriter = shuffleWriterHolder.lookup(shuffleWriterId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto shuffleWriter = executionCtx->getShuffleWriter(shuffleWriterHandle); if (!shuffleWriter) { - std::string errorMessage = "Invalid shuffle writer id " + std::to_string(shuffleWriterId); + std::string errorMessage = "Invalid shuffle writer handle " + std::to_string(shuffleWriterHandle); throw gluten::GlutenException(errorMessage); } // The column batch maybe VeloxColumnBatch or ArrowCStructColumnarBatch(FallbackRangeShuffleWriter) - std::shared_ptr batch = columnarBatchHolder.lookup(handle); + auto batch = executionCtx->getBatch(batchHandle); auto numBytes = batch->numBytes(); gluten::arrowAssertOkOrThrow(shuffleWriter->split(batch), "Native split: shuffle writer split failed"); return numBytes; - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } -JNIEXPORT jobject JNICALL -Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper_stop(JNIEnv* env, jobject, jlong shuffleWriterId) { // NOLINT +JNIEXPORT jobject JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper_stop( + JNIEnv* env, + jobject, + jlong ctxHandle, + jlong shuffleWriterHandle) { // NOLINT JNI_METHOD_START - auto shuffleWriter = shuffleWriterHolder.lookup(shuffleWriterId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto shuffleWriter = executionCtx->getShuffleWriter(shuffleWriterHandle); if (!shuffleWriter) { - std::string errorMessage = "Invalid shuffle writer id " + std::to_string(shuffleWriterId); + std::string errorMessage = "Invalid shuffle writer handle " + std::to_string(shuffleWriterHandle); throw gluten::GlutenException(errorMessage); } @@ -929,10 +997,16 @@ Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper_stop(JNIEnv* env, jobje JNI_METHOD_END(nullptr) } -JNIEXPORT void JNICALL -Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper_close(JNIEnv* env, jobject, jlong shuffleWriterId) { // NOLINT +JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper_close( + JNIEnv* env, + jobject, + jlong ctxHandle, + jlong shuffleWriterHandle) { // NOLINT JNI_METHOD_START - shuffleWriterHolder.erase(shuffleWriterId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + executionCtx->releaseShuffleWriter(shuffleWriterHandle); JNI_METHOD_END() } @@ -953,13 +1027,17 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleReaderJniWrapper JNIEnv* env, jobject, jlong cSchema, - jlong memoryManagerId, + jlong ctxHandle, + jlong memoryManagerHandle, jstring compressionType, jstring compressionBackend, jstring compressionMode) { JNI_METHOD_START - MemoryManager* memoryManager = reinterpret_cast(memoryManagerId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + MemoryManager* memoryManager = reinterpret_cast(memoryManagerHandle); GLUTEN_CHECK(memoryManager != nullptr, "MemoryManager should not be null."); + auto pool = memoryManager->getArrowMemoryPool(); ReaderOptions options = ReaderOptions::defaults(); options.ipc_read_options.memory_pool = pool.get(); @@ -972,32 +1050,38 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleReaderJniWrapper std::shared_ptr schema = gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast(cSchema))); - auto executionCtx = gluten::createExecutionCtx(); - auto reader = executionCtx->createShuffleReader(schema, options, pool, memoryManager); - return shuffleReaderHolder.insert(reader); - JNI_METHOD_END(-1L) + return executionCtx->createShuffleReader(schema, options, pool, memoryManager); + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleReaderJniWrapper_readStream( // NOLINT JNIEnv* env, jobject, - jlong handle, + jlong ctxHandle, + jlong shuffleReaderHandle, jobject jniIn) { JNI_METHOD_START - auto reader = shuffleReaderHolder.lookup(handle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto reader = executionCtx->getShuffleReader(shuffleReaderHandle); std::shared_ptr in = std::make_shared(env, reader->getPool(), jniIn); auto outItr = reader->readStream(in); - return resultIteratorHolder.insert(outItr); - JNI_METHOD_END(-1L) + return executionCtx->addResultIterator(outItr); + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ShuffleReaderJniWrapper_populateMetrics( // NOLINT JNIEnv* env, jobject, - jlong handle, + jlong ctxHandle, + jlong shuffleReaderHandle, jobject metrics) { JNI_METHOD_START - auto reader = shuffleReaderHolder.lookup(handle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto reader = executionCtx->getShuffleReader(shuffleReaderHandle); env->CallVoidMethod(metrics, shuffleReaderMetricsSetDecompressTime, reader->getDecompressTime()); checkException(env); JNI_METHOD_END() @@ -1006,83 +1090,103 @@ JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ShuffleReaderJniWrapper_ JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ShuffleReaderJniWrapper_close( // NOLINT JNIEnv* env, jobject, - jlong handle) { + jlong ctxHandle, + jlong shuffleReaderHandle) { JNI_METHOD_START - auto reader = shuffleReaderHolder.lookup(handle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto reader = executionCtx->getShuffleReader(shuffleReaderHandle); GLUTEN_THROW_NOT_OK(reader->close()); - shuffleReaderHolder.erase(handle); + executionCtx->releaseShuffleReader(shuffleReaderHandle); JNI_METHOD_END() } JNIEXPORT jlong JNICALL Java_io_glutenproject_spark_sql_execution_datasources_velox_DatasourceJniWrapper_nativeInitDatasource( // NOLINT JNIEnv* env, - jobject obj, + jobject, jstring filePath, jlong cSchema, - jlong memoryManagerId, + jlong ctxHandle, + jlong memoryManagerHandle, jbyteArray options) { - auto executionCtx = gluten::createExecutionCtx(); - MemoryManager* memoryManager = reinterpret_cast(memoryManagerId); + JNI_METHOD_START + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + MemoryManager* memoryManager = reinterpret_cast(memoryManagerHandle); GLUTEN_CHECK(memoryManager != nullptr, "MemoryManager should not be null."); - std::shared_ptr datasource = nullptr; + ResourceHandle handle = kInvalidResourceHandle; if (cSchema == -1) { // Only inspect the schema and not write - datasource = executionCtx->getDatasource(jStringToCString(env, filePath), memoryManager, nullptr); + handle = executionCtx->createDatasource(jStringToCString(env, filePath), memoryManager, nullptr); } else { auto sparkOptions = gluten::getConfMap(env, options); auto sparkConf = executionCtx->getConfMap(); sparkOptions.insert(sparkConf.begin(), sparkConf.end()); auto schema = gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast(cSchema))); - datasource = executionCtx->getDatasource(jStringToCString(env, filePath), memoryManager, schema); + handle = executionCtx->createDatasource(jStringToCString(env, filePath), memoryManager, schema); + auto datasource = executionCtx->getDatasource(handle); datasource->init(sparkOptions); } - int64_t instanceID = glutenDatasourceHolder.insert(datasource); - return instanceID; + return handle; + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT void JNICALL Java_io_glutenproject_spark_sql_execution_datasources_velox_DatasourceJniWrapper_inspectSchema( // NOLINT JNIEnv* env, - jobject obj, - jlong instanceId, + jobject, + jlong ctxHandle, + jlong dsHandle, jlong cSchema) { JNI_METHOD_START - auto datasource = glutenDatasourceHolder.lookup(instanceId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto datasource = executionCtx->getDatasource(dsHandle); datasource->inspectSchema(reinterpret_cast(cSchema)); JNI_METHOD_END() } JNIEXPORT void JNICALL Java_io_glutenproject_spark_sql_execution_datasources_velox_DatasourceJniWrapper_close( // NOLINT JNIEnv* env, - jobject obj, - jlong instanceId) { + jobject, + jlong ctxHandle, + jlong dsHandle) { JNI_METHOD_START - auto datasource = glutenDatasourceHolder.lookup(instanceId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto datasource = executionCtx->getDatasource(dsHandle); datasource->close(); - glutenDatasourceHolder.erase(instanceId); + executionCtx->releaseDatasource(dsHandle); JNI_METHOD_END() } JNIEXPORT void JNICALL Java_io_glutenproject_spark_sql_execution_datasources_velox_DatasourceJniWrapper_write( // NOLINT JNIEnv* env, - jobject obj, - jlong instanceId, + jobject, + jlong ctxHandle, + jlong dsHandle, jobject iter) { JNI_METHOD_START - auto datasource = glutenDatasourceHolder.lookup(instanceId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto datasource = executionCtx->getDatasource(dsHandle); - while (env->CallBooleanMethod(iter, veloxColumnarbatchScannerHasNext)) { + while (env->CallBooleanMethod(iter, veloxColumnarBatchScannerHasNext)) { checkException(env); - jlong handler = env->CallLongMethod(iter, veloxColumnarbatchScannerNext); + jlong batchHandle = env->CallLongMethod(iter, veloxColumnarBatchScannerNext); checkException(env); - auto batch = columnarBatchHolder.lookup(handler); + auto batch = executionCtx->getBatch(batchHandle); datasource->write(batch); // fixme this skips the general Java side batch-closing routine - columnarBatchHolder.erase(handler); + executionCtx->releaseBatch(batchHandle); } checkException(env); JNI_METHOD_END() @@ -1103,7 +1207,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_memory_alloc_NativeMemoryAllocator throw GlutenException("Unexpected allocator type name: " + typeName); } return reinterpret_cast(allocator); - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT void JNICALL Java_io_glutenproject_memory_alloc_NativeMemoryAllocator_releaseAllocator( // NOLINT @@ -1125,7 +1229,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_memory_alloc_NativeMemoryAllocator throw gluten::GlutenException("Memory allocator instance not found. It may not exist nor has been closed"); } return (*alloc)->getBytes(); - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jlong JNICALL Java_io_glutenproject_memory_nmm_NativeMemoryManager_create( // NOLINT @@ -1149,21 +1253,24 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_memory_nmm_NativeMemoryManager_cre vm, jlistener, reserveMemoryMethod, unreserveMemoryMethod, reservationBlockSize); if (gluten::backtrace_allocation) { - listener = std::move(std::make_unique(std::move(listener))); + listener = std::make_unique(std::move(listener)); } auto name = jStringToCString(env, jname); + // TODO: move memory manager into ExecutionCtx then we can use more general ExecutionCtx. auto executionCtx = gluten::createExecutionCtx(); auto manager = executionCtx->createMemoryManager(name, *allocator, std::move(listener)); + gluten::releaseExecutionCtx(executionCtx); return reinterpret_cast(manager); - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jbyteArray JNICALL Java_io_glutenproject_memory_nmm_NativeMemoryManager_collectMemoryUsage( // NOLINT JNIEnv* env, jclass, - jlong memoryManagerId) { - MemoryManager* memoryManager = reinterpret_cast(memoryManagerId); + jlong memoryManagerHandle) { + JNI_METHOD_START + MemoryManager* memoryManager = reinterpret_cast(memoryManagerHandle); const MemoryUsageStats& stats = memoryManager->collectMemoryUsageStats(); auto size = stats.ByteSizeLong(); jbyteArray out = env->NewByteArray(size); @@ -1173,50 +1280,56 @@ JNIEXPORT jbyteArray JNICALL Java_io_glutenproject_memory_nmm_NativeMemoryManage "Serialization failed when collecting memory usage stats"); env->SetByteArrayRegion(out, 0, size, reinterpret_cast(buffer)); return out; + JNI_METHOD_END(nullptr) } JNIEXPORT jlong JNICALL Java_io_glutenproject_memory_nmm_NativeMemoryManager_shrink( // NOLINT JNIEnv* env, jclass, - jlong memoryManagerId, + jlong memoryManagerHandle, jlong size) { - MemoryManager* memoryManager = reinterpret_cast(memoryManagerId); + JNI_METHOD_START + MemoryManager* memoryManager = reinterpret_cast(memoryManagerHandle); return memoryManager->shrink(static_cast(size)); + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT void JNICALL Java_io_glutenproject_memory_nmm_NativeMemoryManager_release( // NOLINT JNIEnv* env, jclass, - jlong memoryManagerId) { + jlong memoryManagerHandle) { JNI_METHOD_START - delete reinterpret_cast(memoryManagerId); + delete reinterpret_cast(memoryManagerHandle); JNI_METHOD_END() } JNIEXPORT jobject JNICALL Java_io_glutenproject_vectorized_ColumnarBatchSerializerJniWrapper_serialize( // NOLINT JNIEnv* env, jobject, + jlong ctxHandle, jlongArray handles, - jlong memoryManagerId) { + jlong memoryManagerHandle) { JNI_METHOD_START + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + MemoryManager* memoryManager = reinterpret_cast(memoryManagerHandle); + GLUTEN_CHECK(memoryManager != nullptr, "MemoryManager should not be null."); + int32_t numBatches = env->GetArrayLength(handles); - jlong* batchhandles = env->GetLongArrayElements(handles, nullptr); + jlong* batchHandles = env->GetLongArrayElements(handles, nullptr); - MemoryManager* memoryManager = reinterpret_cast(memoryManagerId); - GLUTEN_CHECK(memoryManager != nullptr, "MemoryManager should not be null."); std::vector> batches; int64_t numRows = 0L; for (int32_t i = 0; i < numBatches; i++) { - auto batch = columnarBatchHolder.lookup(batchhandles[i]); - GLUTEN_DCHECK(batch != nullptr, "Cannot find the ColumnarBatch with handle " + std::to_string(batchhandles[i])); + auto batch = executionCtx->getBatch(batchHandles[i]); + GLUTEN_DCHECK(batch != nullptr, "Cannot find the ColumnarBatch with handle " + std::to_string(batchHandles[i])); numRows += batch->numRows(); batches.emplace_back(batch); } - env->ReleaseLongArrayElements(handles, batchhandles, JNI_ABORT); + env->ReleaseLongArrayElements(handles, batchHandles, JNI_ABORT); - auto executionCtx = gluten::createExecutionCtx(); auto arrowPool = memoryManager->getArrowMemoryPool(); - auto serializer = executionCtx->getColumnarBatchSerializer(memoryManager, arrowPool, nullptr); + auto serializer = executionCtx->createTempColumnarBatchSerializer(memoryManager, arrowPool, nullptr); auto buffer = serializer->serializeColumnarBatches(batches); auto bufferArr = env->NewByteArray(buffer->size()); env->SetByteArrayRegion(bufferArr, 0, buffer->size(), reinterpret_cast(buffer->data())); @@ -1232,38 +1345,50 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ColumnarBatchSerializer JNIEnv* env, jobject, jlong cSchema, - jlong memoryManagerId) { + jlong ctxHandle, + jlong memoryManagerHandle) { JNI_METHOD_START - MemoryManager* memoryManager = reinterpret_cast(memoryManagerId); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + MemoryManager* memoryManager = reinterpret_cast(memoryManagerHandle); GLUTEN_DCHECK(memoryManager != nullptr, "Memory manager does not exist or has been closed"); + auto arrowPool = memoryManager->getArrowMemoryPool(); - auto executionCtx = gluten::createExecutionCtx(); - auto serializer = executionCtx->getColumnarBatchSerializer( + return executionCtx->createColumnarBatchSerializer( memoryManager, arrowPool, reinterpret_cast(cSchema)); - return columnarBatchSerializerHolder.insert(serializer); - JNI_METHOD_END(-1L) + JNI_METHOD_END(kInvalidResourceHandle) } JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ColumnarBatchSerializerJniWrapper_deserialize( // NOLINT JNIEnv* env, jobject, - jlong handle, + jlong ctxHandle, + jlong serializerHandle, jbyteArray data) { JNI_METHOD_START - std::shared_ptr serializer = columnarBatchSerializerHolder.lookup(handle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + auto serializer = executionCtx->getColumnarBatchSerializer(serializerHandle); GLUTEN_DCHECK(serializer != nullptr, "ColumnarBatchSerializer cannot be null"); int32_t size = env->GetArrayLength(data); jbyte* serialized = env->GetByteArrayElements(data, nullptr); auto batch = serializer->deserialize(reinterpret_cast(serialized), size); env->ReleaseByteArrayElements(data, serialized, JNI_ABORT); - return columnarBatchHolder.insert(batch); - JNI_METHOD_END(-1L) + return executionCtx->addBatch(batch); + JNI_METHOD_END(kInvalidResourceHandle) } -JNIEXPORT void JNICALL -Java_io_glutenproject_vectorized_ColumnarBatchSerializerJniWrapper_close(JNIEnv* env, jobject, jlong handle) { // NOLINT +JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ColumnarBatchSerializerJniWrapper_close( + JNIEnv* env, + jobject, + jlong ctxHandle, + jlong serializerHandle) { // NOLINT JNI_METHOD_START - columnarBatchSerializerHolder.erase(handle); + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + executionCtx->releaseColumnarBatchSerializer(serializerHandle); JNI_METHOD_END() } diff --git a/cpp/core/tests/ExecutionCtxTest.cc b/cpp/core/tests/ExecutionCtxTest.cc index d449e9fd8d7d..fdca24dba649 100644 --- a/cpp/core/tests/ExecutionCtxTest.cc +++ b/cpp/core/tests/ExecutionCtxTest.cc @@ -23,13 +23,14 @@ namespace gluten { class DummyExecutionCtx final : public ExecutionCtx { public: - std::shared_ptr getResultIterator( + ResourceHandle createResultIterator( MemoryManager* memoryManager, const std::string& spillDir, const std::vector>& inputs, const std::unordered_map& sessionConf) override { auto resIter = std::make_unique(); - return std::make_shared(std::move(resIter)); + auto iter = std::make_shared(std::move(resIter)); + return resultIteratorHolder_.insert(iter); } MemoryManager* createMemoryManager( const std::string& name, @@ -37,45 +38,89 @@ class DummyExecutionCtx final : public ExecutionCtx { std::unique_ptr uniquePtr) override { return nullptr; } - std::shared_ptr getColumnar2RowConverter(MemoryManager* memoryManager) override { + ResourceHandle addResultIterator(std::shared_ptr ptr) override { + return kInvalidResourceHandle; + } + std::shared_ptr getResultIterator(ResourceHandle handle) override { + return resultIteratorHolder_.lookup(handle); + } + void releaseResultIterator(ResourceHandle handle) override {} + ResourceHandle addBatch(std::shared_ptr ptr) override { + return kInvalidResourceHandle; + } + std::shared_ptr getBatch(ResourceHandle handle) override { + return std::shared_ptr(); + } + void releaseBatch(ResourceHandle handle) override {} + ResourceHandle createColumnar2RowConverter(MemoryManager* memoryManager) override { + return kInvalidResourceHandle; + } + std::shared_ptr getColumnar2RowConverter(ResourceHandle handle) override { return std::shared_ptr(); } - std::shared_ptr getRowToColumnarConverter( - MemoryManager* memoryManager, - struct ArrowSchema* cSchema) override { + void releaseColumnar2RowConverter(ResourceHandle handle) override {} + ResourceHandle createRow2ColumnarConverter(MemoryManager* memoryManager, struct ArrowSchema* cSchema) override { + return kInvalidResourceHandle; + } + std::shared_ptr getRow2ColumnarConverter(ResourceHandle handle) override { return std::shared_ptr(); } - std::shared_ptr createShuffleWriter( + void releaseRow2ColumnarConverter(ResourceHandle handle) override {} + ResourceHandle createShuffleWriter( int numPartitions, std::shared_ptr partitionWriterCreator, const ShuffleWriterOptions& options, MemoryManager* memoryManager) override { + return kInvalidResourceHandle; + } + std::shared_ptr getShuffleWriter(ResourceHandle handle) override { return std::shared_ptr(); } + void releaseShuffleWriter(ResourceHandle handle) override {} std::shared_ptr getMetrics(ColumnarBatchIterator* rawIter, int64_t exportNanos) override { return std::shared_ptr(); } - std::shared_ptr getDatasource( + ResourceHandle createDatasource( const std::string& filePath, MemoryManager* memoryManager, std::shared_ptr schema) override { + return kInvalidResourceHandle; + } + std::shared_ptr getDatasource(ResourceHandle handle) override { return std::shared_ptr(); } - std::shared_ptr createShuffleReader( + void releaseDatasource(ResourceHandle handle) override {} + ResourceHandle createShuffleReader( std::shared_ptr schema, ReaderOptions options, std::shared_ptr pool, MemoryManager* memoryManager) override { + return kInvalidResourceHandle; + } + std::shared_ptr getShuffleReader(ResourceHandle handle) override { return std::shared_ptr(); } - std::shared_ptr getColumnarBatchSerializer( + void releaseShuffleReader(ResourceHandle handle) override {} + ResourceHandle createColumnarBatchSerializer( + MemoryManager* memoryManager, + std::shared_ptr arrowPool, + struct ArrowSchema* cSchema) override { + return kInvalidResourceHandle; + } + std::unique_ptr createTempColumnarBatchSerializer( MemoryManager* memoryManager, std::shared_ptr arrowPool, struct ArrowSchema* cSchema) override { + return std::unique_ptr(); + } + std::shared_ptr getColumnarBatchSerializer(ResourceHandle handle) override { return std::shared_ptr(); } + void releaseColumnarBatchSerializer(ResourceHandle handle) override {} private: + ConcurrentMap> resultIteratorHolder_; + class DummyResultIterator : public ColumnarBatchIterator { public: std::shared_ptr next() override { @@ -95,20 +140,21 @@ class DummyExecutionCtx final : public ExecutionCtx { }; }; -static std::shared_ptr DummyExecutionCtxFactory() { - return std::make_shared(); +static ExecutionCtx* DummyExecutionCtxFactory() { + return new DummyExecutionCtx(); } TEST(TestExecutionCtx, CreateExecutionCtx) { setExecutionCtxFactory(DummyExecutionCtxFactory); - auto executionCtxP = createExecutionCtx(); - auto& executionCtx = *executionCtxP.get(); - ASSERT_EQ(typeid(executionCtx), typeid(DummyExecutionCtx)); + auto executionCtx = createExecutionCtx(); + ASSERT_EQ(typeid(*executionCtx), typeid(DummyExecutionCtx)); + releaseExecutionCtx(executionCtx); } TEST(TestExecutionCtx, GetResultIterator) { auto executionCtx = std::make_shared(); - auto iter = executionCtx->getResultIterator(nullptr, "/tmp/test-spill", {}, {}); + auto handle = executionCtx->createResultIterator(nullptr, "/tmp/test-spill", {}, {}); + auto iter = executionCtx->getResultIterator(handle); ASSERT_TRUE(iter->hasNext()); auto next = iter->next(); ASSERT_NE(next, nullptr); diff --git a/cpp/core/jni/ConcurrentMap.h b/cpp/core/utils/ConcurrentMap.h similarity index 82% rename from cpp/core/jni/ConcurrentMap.h rename to cpp/core/utils/ConcurrentMap.h index e2e12dd3bbc8..2a619039a5db 100644 --- a/cpp/core/jni/ConcurrentMap.h +++ b/cpp/core/utils/ConcurrentMap.h @@ -17,7 +17,6 @@ #pragma once -#include #include #include #include @@ -25,6 +24,9 @@ namespace gluten { +using ResourceHandle = int64_t; +constexpr static ResourceHandle kInvalidResourceHandle = -1; + /** * An utility class that map module id to module pointers. * @tparam Holder class of the object to hold. @@ -34,19 +36,19 @@ class ConcurrentMap { public: ConcurrentMap() : moduleId_(kInitModuleId) {} - jlong insert(Holder holder) { + ResourceHandle insert(Holder holder) { std::lock_guard lock(mtx_); - jlong result = moduleId_++; - map_.insert(std::pair(result, holder)); + ResourceHandle result = moduleId_++; + map_.insert(std::pair(result, holder)); return result; } - void erase(jlong moduleId) { + void erase(ResourceHandle moduleId) { std::lock_guard lock(mtx_); map_.erase(moduleId); } - Holder lookup(jlong moduleId) { + Holder lookup(ResourceHandle moduleId) { std::lock_guard lock(mtx_); auto it = map_.find(moduleId); if (it != map_.end()) { @@ -70,11 +72,11 @@ class ConcurrentMap { // to allow for easier debugging of uninitialized java variables. static constexpr int kInitModuleId = 4; - int64_t moduleId_; + ResourceHandle moduleId_; std::mutex mtx_; // map from module ids returned to Java and module pointers - std::unordered_map map_; + std::unordered_map map_; }; } // namespace gluten diff --git a/cpp/velox/benchmarks/BenchmarkUtils.cc b/cpp/velox/benchmarks/BenchmarkUtils.cc index d53d91ff339c..29fdfb0a3eb8 100644 --- a/cpp/velox/benchmarks/BenchmarkUtils.cc +++ b/cpp/velox/benchmarks/BenchmarkUtils.cc @@ -35,9 +35,8 @@ namespace { std::unordered_map bmConfMap = {{gluten::kSparkBatchSize, FLAGS_batch_size}}; -std::shared_ptr veloxExecutionCtxFactory( - const std::unordered_map& sparkConfs) { - return std::make_shared(sparkConfs); +gluten::ExecutionCtx* veloxExecutionCtxFactory(const std::unordered_map& sparkConfs) { + return new gluten::VeloxExecutionCtx(sparkConfs); } } // anonymous namespace diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 8d5192e9f00f..62ac5298e679 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -126,6 +126,7 @@ auto BM_Generic = [](::benchmark::State& state, setCpu(state.thread_index()); } auto memoryManager = getDefaultMemoryManager(); + auto executionCtx = gluten::createExecutionCtx(); const auto& filePath = getExampleFilePath(substraitJsonFile); auto plan = getPlanFromFile(filePath); auto startTime = std::chrono::steady_clock::now(); @@ -133,7 +134,6 @@ auto BM_Generic = [](::benchmark::State& state, WriterMetrics writerMetrics{}; for (auto _ : state) { - auto executionCtx = gluten::createExecutionCtx(); std::vector> inputIters; std::vector inputItersRaw; if (!inputFiles.empty()) { @@ -148,9 +148,10 @@ auto BM_Generic = [](::benchmark::State& state, } executionCtx->parsePlan(reinterpret_cast(plan.data()), plan.size()); - auto resultIter = - executionCtx->getResultIterator(memoryManager.get(), "/tmp/test-spill", std::move(inputIters), conf); - auto veloxPlan = std::dynamic_pointer_cast(executionCtx)->getVeloxPlan(); + auto iterHandle = + executionCtx->createResultIterator(memoryManager.get(), "/tmp/test-spill", std::move(inputIters), conf); + auto resultIter = executionCtx->getResultIterator(iterHandle); + auto veloxPlan = dynamic_cast(executionCtx)->getVeloxPlan(); if (FLAGS_with_shuffle) { int64_t shuffleWriteTime; TIME_NANO_START(shuffleWriteTime); @@ -206,6 +207,7 @@ auto BM_Generic = [](::benchmark::State& state, auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true); std::cout << statsStr << std::endl; } + gluten::releaseExecutionCtx(executionCtx); auto endTime = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(endTime - startTime).count(); diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc index 11a1333ccfb5..1f12bc0234d6 100644 --- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc +++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc @@ -163,8 +163,6 @@ class GoogleBenchmarkArrowParquetWriteCacheScanBenchmark : public GoogleBenchmar // reuse the ParquetWriteConverter for batches caused system % increase a lot auto fileName = "arrow_parquet_write.parquet"; - auto executionCtx = std::dynamic_pointer_cast(gluten::createExecutionCtx()); - for (auto _ : state) { // Choose compression std::shared_ptr<::parquet::WriterProperties> props = @@ -258,7 +256,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public GoogleBenchmar // reuse the ParquetWriteConverter for batches caused system % increase a lot auto fileName = "velox_parquet_write.parquet"; - auto executionCtx = std::dynamic_pointer_cast(gluten::createExecutionCtx()); + auto executionCtx = gluten::createExecutionCtx(); auto memoryManager = getDefaultMemoryManager(); auto veloxPool = memoryManager->getAggregateMemoryPool(); @@ -294,6 +292,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public GoogleBenchmar benchmark::Counter(initTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); state.counters["write_time"] = benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + gluten::releaseExecutionCtx(executionCtx); } }; diff --git a/cpp/velox/benchmarks/QueryBenchmark.cc b/cpp/velox/benchmarks/QueryBenchmark.cc index 9ecf538c9c97..bfdd5c41d877 100644 --- a/cpp/velox/benchmarks/QueryBenchmark.cc +++ b/cpp/velox/benchmarks/QueryBenchmark.cc @@ -35,7 +35,7 @@ const std::string getFilePath(const std::string& fileName) { // Used by unit test and benchmark. std::shared_ptr getResultIterator( std::shared_ptr veloxPool, - std::shared_ptr executionCtx, + ExecutionCtx* executionCtx, const std::vector>& setScanInfos, std::shared_ptr& veloxPlan) { auto ctxPool = veloxPool->addAggregateChild( @@ -65,7 +65,9 @@ std::shared_ptr getResultIterator( "/tmp/test-spill", executionCtx->getConfMap(), executionCtx->getSparkTaskInfo()); - return std::make_shared(std::move(wholestageIter), executionCtx); + auto iter = std::make_shared(std::move(wholestageIter), executionCtx); + auto handle = executionCtx->addResultIterator(iter); + return executionCtx->getResultIterator(handle); } auto BM = [](::benchmark::State& state, @@ -76,6 +78,7 @@ auto BM = [](::benchmark::State& state, auto plan = getPlanFromFile(filePath); auto memoryManager = getDefaultMemoryManager(); + auto executionCtx = gluten::createExecutionCtx(); auto veloxPool = memoryManager->getAggregateMemoryPool(); std::vector> scanInfos; @@ -90,7 +93,6 @@ auto BM = [](::benchmark::State& state, for (auto _ : state) { state.PauseTiming(); - auto executionCtx = std::dynamic_pointer_cast(gluten::createExecutionCtx()); state.ResumeTiming(); executionCtx->parsePlan(reinterpret_cast(plan.data()), plan.size()); @@ -107,6 +109,7 @@ auto BM = [](::benchmark::State& state, std::cout << maybeBatch.ValueOrDie()->ToString() << std::endl; } } + gluten::releaseExecutionCtx(executionCtx); }; #define orc_reader_decimal 1 diff --git a/cpp/velox/compute/VeloxExecutionCtx.cc b/cpp/velox/compute/VeloxExecutionCtx.cc index af3ab650ae35..29ed15bbf42d 100644 --- a/cpp/velox/compute/VeloxExecutionCtx.cc +++ b/cpp/velox/compute/VeloxExecutionCtx.cc @@ -25,7 +25,6 @@ #include "config/GlutenConfig.h" #include "operators/serializer/VeloxRowToColumnarConverter.h" #include "shuffle/VeloxShuffleWriter.h" -#include "velox/common/file/FileSystems.h" using namespace facebook; @@ -71,7 +70,7 @@ void VeloxExecutionCtx::getInfoAndIds( } } -std::shared_ptr VeloxExecutionCtx::getResultIterator( +ResourceHandle VeloxExecutionCtx::createResultIterator( MemoryManager* memoryManager, const std::string& spillDir, const std::vector>& inputs, @@ -100,27 +99,76 @@ std::shared_ptr VeloxExecutionCtx::getResultIterator( // Source node is not required. auto wholestageIter = std::make_unique( veloxPool, veloxPlan_, streamIds, spillDir, sessionConf, taskInfo_); - return std::make_shared(std::move(wholestageIter), shared_from_this()); + auto resultIter = std::make_shared(std::move(wholestageIter), this); + return resultIteratorHolder_.insert(resultIter); } else { auto wholestageIter = std::make_unique( veloxPool, veloxPlan_, scanIds, scanInfos, streamIds, spillDir, sessionConf, taskInfo_); - return std::make_shared(std::move(wholestageIter), shared_from_this()); + auto resultIter = std::make_shared(std::move(wholestageIter), this); + return resultIteratorHolder_.insert(resultIter); } } -std::shared_ptr VeloxExecutionCtx::getColumnar2RowConverter(MemoryManager* memoryManager) { +ResourceHandle VeloxExecutionCtx::addResultIterator(std::shared_ptr iterator) { + return resultIteratorHolder_.insert(iterator); +} + +std::shared_ptr VeloxExecutionCtx::getResultIterator(ResourceHandle iterHandle) { + auto instance = resultIteratorHolder_.lookup(iterHandle); + if (!instance) { + std::string errorMessage = "invalid handle for ResultIterator " + std::to_string(iterHandle); + throw gluten::GlutenException(errorMessage); + } + return instance; +} + +void VeloxExecutionCtx::releaseResultIterator(ResourceHandle iterHandle) { + resultIteratorHolder_.erase(iterHandle); +} + +ResourceHandle VeloxExecutionCtx::createColumnar2RowConverter(MemoryManager* memoryManager) { auto ctxVeloxPool = getLeafVeloxPool(memoryManager); - return std::make_shared(ctxVeloxPool); + auto converter = std::make_shared(ctxVeloxPool); + return columnarToRowConverterHolder_.insert(converter); +} + +std::shared_ptr VeloxExecutionCtx::getColumnar2RowConverter(ResourceHandle handle) { + return columnarToRowConverterHolder_.lookup(handle); +} + +void VeloxExecutionCtx::releaseColumnar2RowConverter(ResourceHandle handle) { + columnarToRowConverterHolder_.erase(handle); +} + +ResourceHandle VeloxExecutionCtx::addBatch(std::shared_ptr batch) { + return columnarBatchHolder_.insert(batch); } -std::shared_ptr VeloxExecutionCtx::getRowToColumnarConverter( +std::shared_ptr VeloxExecutionCtx::getBatch(ResourceHandle handle) { + return columnarBatchHolder_.lookup(handle); +} + +void VeloxExecutionCtx::releaseBatch(ResourceHandle handle) { + columnarBatchHolder_.erase(handle); +} + +ResourceHandle VeloxExecutionCtx::createRow2ColumnarConverter( MemoryManager* memoryManager, struct ArrowSchema* cSchema) { auto ctxVeloxPool = getLeafVeloxPool(memoryManager); - return std::make_shared(cSchema, ctxVeloxPool); + auto converter = std::make_shared(cSchema, ctxVeloxPool); + return rowToColumnarConverterHolder_.insert(converter); } -std::shared_ptr VeloxExecutionCtx::createShuffleWriter( +std::shared_ptr VeloxExecutionCtx::getRow2ColumnarConverter(ResourceHandle handle) { + return rowToColumnarConverterHolder_.lookup(handle); +} + +void VeloxExecutionCtx::releaseRow2ColumnarConverter(ResourceHandle handle) { + rowToColumnarConverterHolder_.erase(handle); +} + +ResourceHandle VeloxExecutionCtx::createShuffleWriter( int numPartitions, std::shared_ptr partitionWriterCreator, const ShuffleWriterOptions& options, @@ -129,15 +177,75 @@ std::shared_ptr VeloxExecutionCtx::createShuffleWriter( GLUTEN_ASSIGN_OR_THROW( auto shuffle_writer, VeloxShuffleWriter::create(numPartitions, std::move(partitionWriterCreator), std::move(options), ctxPool)); - return shuffle_writer; + return shuffleWriterHolder_.insert(shuffle_writer); +} + +std::shared_ptr VeloxExecutionCtx::getShuffleWriter(ResourceHandle handle) { + return shuffleWriterHolder_.lookup(handle); } -std::shared_ptr VeloxExecutionCtx::getColumnarBatchSerializer( +void VeloxExecutionCtx::releaseShuffleWriter(ResourceHandle handle) { + shuffleWriterHolder_.erase(handle); +} + +ResourceHandle VeloxExecutionCtx::createDatasource( + const std::string& filePath, + MemoryManager* memoryManager, + std::shared_ptr schema) { + auto veloxPool = getAggregateVeloxPool(memoryManager); + auto datasource = std::make_shared(filePath, veloxPool, schema); + return datasourceHolder_.insert(datasource); +} + +std::shared_ptr VeloxExecutionCtx::getDatasource(ResourceHandle handle) { + return datasourceHolder_.lookup(handle); +} + +void VeloxExecutionCtx::releaseDatasource(ResourceHandle handle) { + datasourceHolder_.erase(handle); +} + +ResourceHandle VeloxExecutionCtx::createShuffleReader( + std::shared_ptr schema, + ReaderOptions options, + std::shared_ptr pool, + MemoryManager* memoryManager) { + auto ctxVeloxPool = getLeafVeloxPool(memoryManager); + auto shuffleReader = std::make_shared(schema, options, pool, ctxVeloxPool); + return shuffleReaderHolder_.insert(shuffleReader); +} + +std::shared_ptr VeloxExecutionCtx::getShuffleReader(ResourceHandle handle) { + return shuffleReaderHolder_.lookup(handle); +} + +void VeloxExecutionCtx::releaseShuffleReader(ResourceHandle handle) { + shuffleReaderHolder_.erase(handle); +} + +std::unique_ptr VeloxExecutionCtx::createTempColumnarBatchSerializer( + MemoryManager* memoryManager, + std::shared_ptr arrowPool, + struct ArrowSchema* cSchema) { + auto ctxVeloxPool = getLeafVeloxPool(memoryManager); + return std::make_unique(arrowPool, ctxVeloxPool, cSchema); +} + +ResourceHandle VeloxExecutionCtx::createColumnarBatchSerializer( MemoryManager* memoryManager, std::shared_ptr arrowPool, struct ArrowSchema* cSchema) { auto ctxVeloxPool = getLeafVeloxPool(memoryManager); - return std::make_shared(arrowPool, ctxVeloxPool, cSchema); + auto serializer = std::make_shared(arrowPool, ctxVeloxPool, cSchema); + return columnarBatchSerializerHolder_.insert(serializer); +} + +std::shared_ptr VeloxExecutionCtx::getColumnarBatchSerializer(ResourceHandle handle) { + return columnarBatchSerializerHolder_.lookup(handle); +} + +void VeloxExecutionCtx::releaseColumnarBatchSerializer(ResourceHandle handle) { + columnarBatchSerializerHolder_.erase(handle); } } // namespace gluten diff --git a/cpp/velox/compute/VeloxExecutionCtx.h b/cpp/velox/compute/VeloxExecutionCtx.h index c837189ea1e9..6fba0e428e31 100644 --- a/cpp/velox/compute/VeloxExecutionCtx.h +++ b/cpp/velox/compute/VeloxExecutionCtx.h @@ -17,12 +17,6 @@ #pragma once -#include -#include -#include -#include -#include - #include "WholeStageResultIterator.h" #include "compute/ExecutionCtx.h" #include "memory/VeloxMemoryManager.h" @@ -63,50 +57,65 @@ class VeloxExecutionCtx final : public ExecutionCtx { } // FIXME This is not thread-safe? - std::shared_ptr getResultIterator( + ResourceHandle createResultIterator( MemoryManager* memoryManager, const std::string& spillDir, const std::vector>& inputs = {}, const std::unordered_map& sessionConf = {}) override; + ResourceHandle addResultIterator(std::shared_ptr ptr) override; + std::shared_ptr getResultIterator(ResourceHandle handle) override; + void releaseResultIterator(ResourceHandle handle) override; - std::shared_ptr getColumnar2RowConverter(MemoryManager* memoryManager) override; + ResourceHandle createColumnar2RowConverter(MemoryManager* memoryManager) override; + std::shared_ptr getColumnar2RowConverter(ResourceHandle handle) override; + void releaseColumnar2RowConverter(ResourceHandle handle) override; - std::shared_ptr getRowToColumnarConverter( - MemoryManager* memoryManager, - struct ArrowSchema* cSchema) override; + ResourceHandle addBatch(std::shared_ptr ptr) override; + std::shared_ptr getBatch(ResourceHandle handle) override; + void releaseBatch(ResourceHandle handle) override; - std::shared_ptr createShuffleWriter( + ResourceHandle createRow2ColumnarConverter(MemoryManager* memoryManager, struct ArrowSchema* cSchema) override; + std::shared_ptr getRow2ColumnarConverter(ResourceHandle handle) override; + void releaseRow2ColumnarConverter(ResourceHandle handle) override; + + ResourceHandle createShuffleWriter( int numPartitions, std::shared_ptr partitionWriterCreator, const ShuffleWriterOptions& options, MemoryManager* memoryManager) override; + std::shared_ptr getShuffleWriter(ResourceHandle handle) override; + void releaseShuffleWriter(ResourceHandle handle) override; std::shared_ptr getMetrics(ColumnarBatchIterator* rawIter, int64_t exportNanos) override { auto iter = static_cast(rawIter); return iter->getMetrics(exportNanos); } - std::shared_ptr getDatasource( + ResourceHandle createDatasource( const std::string& filePath, MemoryManager* memoryManager, - std::shared_ptr schema) override { - auto veloxPool = getAggregateVeloxPool(memoryManager); - return std::make_shared(filePath, veloxPool, schema); - } + std::shared_ptr schema) override; + std::shared_ptr getDatasource(ResourceHandle handle) override; + void releaseDatasource(ResourceHandle handle) override; - std::shared_ptr createShuffleReader( + ResourceHandle createShuffleReader( std::shared_ptr schema, ReaderOptions options, std::shared_ptr pool, - MemoryManager* memoryManager) override { - auto ctxVeloxPool = getLeafVeloxPool(memoryManager); - return std::make_shared(schema, options, pool, ctxVeloxPool); - } + MemoryManager* memoryManager) override; + std::shared_ptr getShuffleReader(ResourceHandle handle) override; + void releaseShuffleReader(ResourceHandle handle) override; - std::shared_ptr getColumnarBatchSerializer( + std::unique_ptr createTempColumnarBatchSerializer( MemoryManager* memoryManager, std::shared_ptr arrowPool, struct ArrowSchema* cSchema) override; + ResourceHandle createColumnarBatchSerializer( + MemoryManager* memoryManager, + std::shared_ptr arrowPool, + struct ArrowSchema* cSchema) override; + std::shared_ptr getColumnarBatchSerializer(ResourceHandle handle) override; + void releaseColumnarBatchSerializer(ResourceHandle handle) override; std::shared_ptr getVeloxPlan() { return veloxPlan_; @@ -120,6 +129,15 @@ class VeloxExecutionCtx final : public ExecutionCtx { std::vector& streamIds); private: + ConcurrentMap> columnarBatchHolder_; + ConcurrentMap> datasourceHolder_; + ConcurrentMap> columnarToRowConverterHolder_; + ConcurrentMap> shuffleReaderHolder_; + ConcurrentMap> shuffleWriterHolder_; + ConcurrentMap> columnarBatchSerializerHolder_; + ConcurrentMap> rowToColumnarConverterHolder_; + ConcurrentMap> resultIteratorHolder_; + std::vector> inputIters_; std::shared_ptr veloxPlan_; }; diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 6f27cca08def..38bebc2a48f6 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -16,7 +16,6 @@ */ #include -#include "arrow/c/bridge.h" #include #include @@ -36,9 +35,8 @@ using namespace facebook; namespace { -std::shared_ptr veloxExecutionCtxFactory( - const std::unordered_map& sparkConfs) { - return std::make_shared(sparkConfs); +gluten::ExecutionCtx* veloxExecutionCtxFactory(const std::unordered_map& sparkConfs) { + return new gluten::VeloxExecutionCtx(sparkConfs); } } // namespace @@ -84,6 +82,27 @@ JNIEXPORT void JNICALL Java_io_glutenproject_init_BackendJniWrapper_initializeBa JNI_METHOD_END() } +JNIEXPORT jlong JNICALL Java_io_glutenproject_init_BackendJniWrapper_createExecutionCtx( // NOLINT + JNIEnv* env, + jclass) { + JNI_METHOD_START + auto executionCtx = gluten::createExecutionCtx(); + return reinterpret_cast(executionCtx); + JNI_METHOD_END(-1) +} + +JNIEXPORT void JNICALL Java_io_glutenproject_init_BackendJniWrapper_releaseExecutionCtx( // NOLINT + JNIEnv* env, + jclass, + jlong ctxHandle) { + JNI_METHOD_START + auto executionCtx = reinterpret_cast(ctxHandle); + GLUTEN_CHECK(executionCtx != nullptr, "ExecutionCtx should not be null."); + + gluten::releaseExecutionCtx(executionCtx); + JNI_METHOD_END() +} + JNIEXPORT void JNICALL Java_io_glutenproject_udf_UdfJniWrapper_nativeLoadUdfLibraries( // NOLINT JNIEnv* env, jclass, diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc b/cpp/velox/operators/writer/VeloxParquetDatasource.cc index b9afdb0ea220..7a9267cffa94 100644 --- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc +++ b/cpp/velox/operators/writer/VeloxParquetDatasource.cc @@ -17,13 +17,11 @@ #include "VeloxParquetDatasource.h" -#include #include #include #include #include "arrow/c/bridge.h" -#include "compute/ExecutionCtx.h" #include "compute/VeloxExecutionCtx.h" #include "config/GlutenConfig.h" @@ -38,8 +36,6 @@ using namespace facebook::velox::dwio::common; namespace gluten { void VeloxParquetDatasource::init(const std::unordered_map& sparkConfs) { - auto executionCtx = std::dynamic_pointer_cast(gluten::createExecutionCtx()); - if (strncmp(filePath_.c_str(), "file:", 5) == 0) { sink_ = std::make_unique(filePath_.substr(5)); } else if (strncmp(filePath_.c_str(), "hdfs:", 5) == 0) { diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala index 4afa83ae721f..5388c1173215 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala @@ -17,6 +17,7 @@ package org.apache.spark.shuffle import io.glutenproject.GlutenConfig +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators import io.glutenproject.memory.nmm.NativeMemoryManagers import io.glutenproject.utils.ArrowAbiUtil @@ -61,7 +62,8 @@ private class CelebornColumnarBatchSerializerInstance( extends SerializerInstance with Logging { - private lazy val shuffleReaderHandle = { + private lazy val (executionCtxHandle, shuffleReaderHandle) = { + val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle val allocator: BufferAllocator = ArrowBufferAllocators .contextInstance() .newChildAllocator("GlutenColumnarBatch deserialize", 0, Long.MaxValue) @@ -80,7 +82,8 @@ private class CelebornColumnarBatchSerializerInstance( GlutenConfig.getConf.columnarShuffleCodecBackend.orNull val handle = ShuffleReaderJniWrapper.INSTANCE.make( cSchema.memoryAddress(), - NativeMemoryManagers.contextInstance("ShuffleReader").getNativeInstanceId, + executionCtxHandle, + NativeMemoryManagers.contextInstance("ShuffleReader").getNativeInstanceHandle, compressionCodec, compressionCodecBackend, GlutenConfig.getConf.columnarShuffleCompressionMode @@ -91,17 +94,19 @@ private class CelebornColumnarBatchSerializerInstance( // should keep alive before all buffers to finish consuming. TaskResources.addRecycler(s"CelebornShuffleReaderHandle_$handle", 50) { cSchema.close() - ShuffleReaderJniWrapper.INSTANCE.close(handle) + ShuffleReaderJniWrapper.INSTANCE.close(executionCtxHandle, handle) allocator.close() } - handle + (executionCtxHandle, handle) } override def deserializeStream(in: InputStream): DeserializationStream = { new DeserializationStream { private lazy val byteIn: JniByteInputStream = JniByteInputStreams.create(in) private lazy val wrappedOut: GeneralOutIterator = new ColumnarBatchOutIterator( - ShuffleReaderJniWrapper.INSTANCE.readStream(shuffleReaderHandle, byteIn)) + executionCtxHandle, + ShuffleReaderJniWrapper.INSTANCE + .readStream(executionCtxHandle, shuffleReaderHandle, byteIn)) private var cb: ColumnarBatch = _ diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedVeloxColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedVeloxColumnarShuffleWriter.scala index 1209a6cece09..721dfe3f3f2d 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedVeloxColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedVeloxColumnarShuffleWriter.scala @@ -18,6 +18,7 @@ package org.apache.spark.shuffle import io.glutenproject.GlutenConfig import io.glutenproject.columnarbatch.ColumnarBatches +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.memory.memtarget.spark.Spiller import io.glutenproject.memory.nmm.NativeMemoryManagers import io.glutenproject.vectorized._ @@ -79,6 +80,7 @@ class CelebornHashBasedVeloxColumnarShuffleWriter[K, V]( private var stopping = false private var mapStatus: MapStatus = _ private var nativeShuffleWriter: Long = -1L + private lazy val executionCtxHandle: Long = ExecutionCtxs.contextInstance().getHandle private var splitResult: SplitResult = _ @@ -120,6 +122,7 @@ class CelebornHashBasedVeloxColumnarShuffleWriter[K, V]( GlutenConfig.getConf.columnarShuffleCompressionMode, celebornConf.clientPushBufferMaxSize, celebornPartitionPusher, + executionCtxHandle, NativeMemoryManagers .create( "CelebornShuffleWriter", @@ -134,20 +137,21 @@ class CelebornHashBasedVeloxColumnarShuffleWriter[K, V]( } logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data") // fixme pass true when being called by self - val pushed = jniWrapper.nativeEvict(nativeShuffleWriter, size, false) + val pushed = + jniWrapper.nativeEvict(executionCtxHandle, nativeShuffleWriter, size, false) logInfo(s"Gluten shuffle writer: Pushed $pushed / $size bytes of data") pushed } } ) - .getNativeInstanceId, + .getNativeInstanceHandle, handle, context.taskAttemptId(), "celeborn" ) } val startTime = System.nanoTime() - val bytes = jniWrapper.split(nativeShuffleWriter, cb.numRows, handle) + val bytes = jniWrapper.split(executionCtxHandle, nativeShuffleWriter, cb.numRows, handle) dep.metrics("dataSize").add(bytes) dep.metrics("splitTime").add(System.nanoTime() - startTime) dep.metrics("numInputRows").add(cb.numRows) @@ -159,7 +163,7 @@ class CelebornHashBasedVeloxColumnarShuffleWriter[K, V]( val startTime = System.nanoTime() if (nativeShuffleWriter != -1L) { - splitResult = jniWrapper.stop(nativeShuffleWriter) + splitResult = jniWrapper.stop(executionCtxHandle, nativeShuffleWriter) } dep @@ -202,7 +206,7 @@ class CelebornHashBasedVeloxColumnarShuffleWriter[K, V]( } def closeShuffleWriter(): Unit = { - jniWrapper.close(nativeShuffleWriter) + jniWrapper.close(executionCtxHandle, nativeShuffleWriter) } def getPartitionLengths: Array[Long] = partitionLengths diff --git a/gluten-data/src/main/java/io/glutenproject/columnarbatch/ColumnarBatchJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/columnarbatch/ColumnarBatchJniWrapper.java index fb4d14ab0218..712e84bff4ad 100644 --- a/gluten-data/src/main/java/io/glutenproject/columnarbatch/ColumnarBatchJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/columnarbatch/ColumnarBatchJniWrapper.java @@ -23,19 +23,20 @@ public class ColumnarBatchJniWrapper extends JniInitialized { private ColumnarBatchJniWrapper() {} - public native String getType(long handle); + public native String getType(long executionCtxHandle, long batchHandle); - public native long numColumns(long handle); + public native long numColumns(long executionCtxHandle, long batchHandle); - public native long numRows(long handle); + public native long numRows(long executionCtxHandle, long batchHandle); - public native long numBytes(long handle); + public native long numBytes(long executionCtxHandle, long batchHandle); - public native long compose(long[] handles); + public native long compose(long executionCtxHandle, long[] batchHandles); - public native long createWithArrowArray(long cSchema, long cArray); + public native long createWithArrowArray(long executionCtxHandle, long cSchema, long cArray); - public native void exportToArrow(long handle, long cSchema, long cArray); + public native void exportToArrow( + long executionCtxHandle, long batchHandle, long cSchema, long cArray); - public native void close(long handle); + public native void close(long executionCtxHandle, long batchHandle); } diff --git a/gluten-data/src/main/java/io/glutenproject/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/io/glutenproject/columnarbatch/ColumnarBatches.java index 004c70c39e99..f96016eabaec 100644 --- a/gluten-data/src/main/java/io/glutenproject/columnarbatch/ColumnarBatches.java +++ b/gluten-data/src/main/java/io/glutenproject/columnarbatch/ColumnarBatches.java @@ -17,12 +17,14 @@ package io.glutenproject.columnarbatch; import io.glutenproject.exception.GlutenException; +import io.glutenproject.exec.ExecutionCtxs; import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators; import io.glutenproject.utils.ArrowAbiUtil; import io.glutenproject.utils.ArrowUtil; import io.glutenproject.utils.ImplicitClass; import io.glutenproject.vectorized.ArrowWritableColumnVector; +import com.google.common.base.Preconditions; import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.CDataDictionaryProvider; @@ -147,7 +149,10 @@ private static ColumnarBatch load(BufferAllocator allocator, ColumnarBatch input ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator); CDataDictionaryProvider provider = new CDataDictionaryProvider()) { ColumnarBatchJniWrapper.INSTANCE.exportToArrow( - handle, cSchema.memoryAddress(), cArray.memoryAddress()); + ExecutionCtxs.contextInstance().getHandle(), + handle, + cSchema.memoryAddress(), + cArray.memoryAddress()); Data.exportSchema( allocator, ArrowUtil.toArrowSchema(cSchema, allocator, provider), provider, arrowSchema); @@ -179,14 +184,15 @@ private static ColumnarBatch offload(BufferAllocator allocator, ColumnarBatch in if (!isHeavyBatch(input)) { throw new IllegalArgumentException("batch is not Arrow columnar batch"); } + final long executionCtxHandle = ExecutionCtxs.contextInstance().getHandle(); try (ArrowArray cArray = ArrowArray.allocateNew(allocator); ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) { ArrowAbiUtil.exportFromSparkColumnarBatch( ArrowBufferAllocators.contextInstance(), input, cSchema, cArray); long handle = ColumnarBatchJniWrapper.INSTANCE.createWithArrowArray( - cSchema.memoryAddress(), cArray.memoryAddress()); - ColumnarBatch output = ColumnarBatches.create(handle); + executionCtxHandle, cSchema.memoryAddress(), cArray.memoryAddress()); + ColumnarBatch output = ColumnarBatches.create(executionCtxHandle, handle); // Follow input's reference count. This might be optimized using // automatic clean-up or once the extensibility of ColumnarBatch is enriched @@ -243,11 +249,12 @@ public InternalRow next() { } public static void close(ColumnarBatch input) { - ColumnarBatchJniWrapper.INSTANCE.close(ColumnarBatches.getNativeHandle(input)); + ColumnarBatchJniWrapper.INSTANCE.close( + ColumnarBatches.getExecutionCtxHandle(input), ColumnarBatches.getNativeHandle(input)); } - public static void close(long handle) { - ColumnarBatchJniWrapper.INSTANCE.close(handle); + public static void close(long executionCtxHandle, long handle) { + ColumnarBatchJniWrapper.INSTANCE.close(executionCtxHandle, handle); } /** @@ -256,19 +263,30 @@ public static void close(long handle) { */ public static long compose(ColumnarBatch... batches) { long[] handles = Arrays.stream(batches).mapToLong(ColumnarBatches::getNativeHandle).toArray(); - return ColumnarBatchJniWrapper.INSTANCE.compose(handles); + // we assume all input batches should be managed by same ExecutionCtx. + long[] executionCtxHandles = + Arrays.stream(batches) + .mapToLong(ColumnarBatches::getExecutionCtxHandle) + .distinct() + .toArray(); + Preconditions.checkState( + executionCtxHandles.length == 1, + "All input batches should be managed by same ExecutionCtx."); + return ColumnarBatchJniWrapper.INSTANCE.compose(executionCtxHandles[0], handles); } public static long numBytes(ColumnarBatch input) { - return ColumnarBatchJniWrapper.INSTANCE.numBytes(ColumnarBatches.getNativeHandle(input)); + return ColumnarBatchJniWrapper.INSTANCE.numBytes( + ColumnarBatches.getExecutionCtxHandle(input), ColumnarBatches.getNativeHandle(input)); } public static String getType(ColumnarBatch input) { - return ColumnarBatchJniWrapper.INSTANCE.getType(ColumnarBatches.getNativeHandle(input)); + return ColumnarBatchJniWrapper.INSTANCE.getType( + ColumnarBatches.getExecutionCtxHandle(input), ColumnarBatches.getNativeHandle(input)); } - public static ColumnarBatch create(long nativeHandle) { - final IndicatorVector iv = new IndicatorVector(nativeHandle); + public static ColumnarBatch create(long executionCtxHandle, long nativeHandle) { + final IndicatorVector iv = new IndicatorVector(executionCtxHandle, nativeHandle); int numColumns = Math.toIntExact(iv.getNumColumns()); int numRows = Math.toIntExact(iv.getNumRows()); if (numColumns == 0) { @@ -313,4 +331,14 @@ public static long getNativeHandle(ColumnarBatch batch) { IndicatorVector iv = (IndicatorVector) batch.column(0); return iv.getNativeHandle(); } + + public static long getExecutionCtxHandle(ColumnarBatch batch) { + if (!isLightBatch(batch)) { + throw new UnsupportedOperationException( + "Cannot get native batch handle due to " + + "input batch is not intermediate Gluten batch"); + } + IndicatorVector iv = (IndicatorVector) batch.column(0); + return iv.getExecutionCtxHandle(); + } } diff --git a/gluten-data/src/main/java/io/glutenproject/columnarbatch/IndicatorVector.java b/gluten-data/src/main/java/io/glutenproject/columnarbatch/IndicatorVector.java index 3d2633d6fe96..e0938ad8b3cb 100644 --- a/gluten-data/src/main/java/io/glutenproject/columnarbatch/IndicatorVector.java +++ b/gluten-data/src/main/java/io/glutenproject/columnarbatch/IndicatorVector.java @@ -26,28 +26,34 @@ import java.util.concurrent.atomic.AtomicLong; public class IndicatorVector extends ColumnVector { - private final long nativeHandle; + private final long executionCtxHandle; + private final long batchHandle; private final AtomicLong refCnt = new AtomicLong(1L); - protected IndicatorVector(long nativeHandle) { + protected IndicatorVector(long executionCtxHandle, long batchHandle) { super(DataTypes.NullType); - this.nativeHandle = nativeHandle; + this.executionCtxHandle = executionCtxHandle; + this.batchHandle = batchHandle; } public long getNativeHandle() { - return nativeHandle; + return batchHandle; + } + + public long getExecutionCtxHandle() { + return executionCtxHandle; } public String getType() { - return ColumnarBatchJniWrapper.INSTANCE.getType(nativeHandle); + return ColumnarBatchJniWrapper.INSTANCE.getType(executionCtxHandle, batchHandle); } public long getNumColumns() { - return ColumnarBatchJniWrapper.INSTANCE.numColumns(nativeHandle); + return ColumnarBatchJniWrapper.INSTANCE.numColumns(executionCtxHandle, batchHandle); } public long getNumRows() { - return ColumnarBatchJniWrapper.INSTANCE.numRows(nativeHandle); + return ColumnarBatchJniWrapper.INSTANCE.numRows(executionCtxHandle, batchHandle); } public long refCnt() { @@ -65,7 +71,7 @@ public void close() { return; } if (refCnt.decrementAndGet() == 0) { - ColumnarBatchJniWrapper.INSTANCE.close(nativeHandle); + ColumnarBatchJniWrapper.INSTANCE.close(executionCtxHandle, batchHandle); } } diff --git a/gluten-data/src/main/java/io/glutenproject/init/BackendJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/init/BackendJniWrapper.java index d045060c00b0..889cbff98e93 100644 --- a/gluten-data/src/main/java/io/glutenproject/init/BackendJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/init/BackendJniWrapper.java @@ -16,15 +16,13 @@ */ package io.glutenproject.init; -class BackendJniWrapper { +public class BackendJniWrapper { private BackendJniWrapper() {} - // For global context - static native void initializeBackend(byte[] configPlan); + public static native void initializeBackend(byte[] configPlan); - // For local context - static native long makeTaskContext(); + public static native long createExecutionCtx(); - static native void closeTaskContext(long handle); + public static native void releaseExecutionCtx(long handle); } diff --git a/gluten-data/src/main/java/io/glutenproject/memory/nmm/NativeMemoryManager.java b/gluten-data/src/main/java/io/glutenproject/memory/nmm/NativeMemoryManager.java index e0998b111134..7e877f602fb2 100644 --- a/gluten-data/src/main/java/io/glutenproject/memory/nmm/NativeMemoryManager.java +++ b/gluten-data/src/main/java/io/glutenproject/memory/nmm/NativeMemoryManager.java @@ -28,13 +28,14 @@ public class NativeMemoryManager implements TaskResource { private static final Logger LOGGER = LoggerFactory.getLogger(NativeMemoryManager.class); - private final long nativeInstanceId; + private final long nativeInstanceHandle; private final String name; private final ReservationListener listener; - private NativeMemoryManager(String name, long nativeInstanceId, ReservationListener listener) { + private NativeMemoryManager( + String name, long nativeInstanceHandle, ReservationListener listener) { this.name = name; - this.nativeInstanceId = nativeInstanceId; + this.nativeInstanceHandle = nativeInstanceHandle; this.listener = listener; } @@ -45,19 +46,19 @@ public static NativeMemoryManager create(String name, ReservationListener listen name, create(name, allocatorId, reservationBlockSize, listener), listener); } - public long getNativeInstanceId() { - return this.nativeInstanceId; + public long getNativeInstanceHandle() { + return this.nativeInstanceHandle; } public byte[] collectMemoryUsage() { - return collectMemoryUsage(nativeInstanceId); + return collectMemoryUsage(nativeInstanceHandle); } public long shrink(long size) { - return shrink(nativeInstanceId, size); + return shrink(nativeInstanceHandle, size); } - private static native long shrink(long nativeInstanceId, long size); + private static native long shrink(long nativeInstanceHandle, long size); private static native long create( String name, long allocatorId, long reservationBlockSize, ReservationListener listener); @@ -68,7 +69,7 @@ private static native long create( @Override public void release() throws Exception { - release(nativeInstanceId); + release(nativeInstanceHandle); if (listener.getUsedBytes() != 0) { LOGGER.warn( name diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarBatchOutIterator.java b/gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarBatchOutIterator.java index 4d04dd4cf890..78638f9dfb77 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarBatchOutIterator.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarBatchOutIterator.java @@ -24,54 +24,56 @@ import java.io.IOException; public class ColumnarBatchOutIterator extends GeneralOutIterator { - private final long handle; + private final long executionCtxHandle; + private final long iterHandle; - public ColumnarBatchOutIterator(long handle) throws IOException { + public ColumnarBatchOutIterator(long executionCtxHandle, long iterHandle) throws IOException { super(); - this.handle = handle; + this.executionCtxHandle = executionCtxHandle; + this.iterHandle = iterHandle; } @Override public String getId() { - // Using native handle as identifier - return String.valueOf(handle); + // Using native iterHandle as identifier + return String.valueOf(iterHandle); } - private native boolean nativeHasNext(long nativeHandle); + private native boolean nativeHasNext(long executionCtxHandle, long iterHandle); - private native long nativeNext(long nativeHandle); + private native long nativeNext(long executionCtxHandle, long iterHandle); - private native long nativeSpill(long nativeHandle, long size); + private native long nativeSpill(long executionCtxHandle, long iterHandle, long size); - private native void nativeClose(long nativeHandle); + private native void nativeClose(long executionCtxHandle, long iterHandle); - private native IMetrics nativeFetchMetrics(long nativeHandle); + private native IMetrics nativeFetchMetrics(long executionCtxHandle, long iterHandle); @Override public boolean hasNextInternal() throws IOException { - return nativeHasNext(handle); + return nativeHasNext(executionCtxHandle, iterHandle); } @Override public ColumnarBatch nextInternal() throws IOException { - long batchHandle = nativeNext(handle); + long batchHandle = nativeNext(executionCtxHandle, iterHandle); if (batchHandle == -1L) { return null; // stream ended } - return ColumnarBatches.create(batchHandle); + return ColumnarBatches.create(executionCtxHandle, batchHandle); } @Override public IMetrics getMetricsInternal() throws IOException, ClassNotFoundException { - return nativeFetchMetrics(handle); + return nativeFetchMetrics(executionCtxHandle, iterHandle); } public long spill(long size) { - return nativeSpill(handle, size); + return nativeSpill(executionCtxHandle, iterHandle, size); } @Override public void closeInternal() { - nativeClose(handle); + nativeClose(executionCtxHandle, iterHandle); } } diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarBatchSerializerJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarBatchSerializerJniWrapper.java index 63dfb9773c9f..d32f477a132f 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarBatchSerializerJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarBatchSerializerJniWrapper.java @@ -25,12 +25,13 @@ public class ColumnarBatchSerializerJniWrapper extends JniInitialized { private ColumnarBatchSerializerJniWrapper() {} - public native ColumnarBatchSerializeResult serialize(long[] handles, long memoryManagerId); + public native ColumnarBatchSerializeResult serialize( + long executionCtxHandle, long[] handles, long memoryManagerHandle); // Return the native ColumnarBatchSerializer handle - public native long init(long cSchema, long memoryManagerId); + public native long init(long cSchema, long executionCtxHandle, long memoryManagerHandle); - public native long deserialize(long handle, byte[] data); + public native long deserialize(long executionCtxHandle, long serializerHandle, byte[] data); - public native void close(long handle); + public native void close(long executionCtxHandle, long serializerHandle); } diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/NativeColumnarToRowJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/NativeColumnarToRowJniWrapper.java index e049dbcd4cd6..a89b2c58d848 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/NativeColumnarToRowJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/NativeColumnarToRowJniWrapper.java @@ -24,10 +24,11 @@ public class NativeColumnarToRowJniWrapper extends JniInitialized { public NativeColumnarToRowJniWrapper() throws IOException {} - public native long nativeColumnarToRowInit(long memoryManagerId) throws RuntimeException; + public native long nativeColumnarToRowInit(long executionCtxHandle, long memoryManagerHandle) + throws RuntimeException; public native NativeColumnarToRowInfo nativeColumnarToRowConvert( - long batchHandle, long instanceId) throws RuntimeException; + long executionCtxHandle, long batchHandle, long c2rHandle) throws RuntimeException; - public native void nativeClose(long instanceID); + public native void nativeClose(long executionCtxHandle, long c2rHandle); } diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java index 61dd08a4d361..b5d0427ecb7b 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java @@ -18,6 +18,7 @@ import io.glutenproject.GlutenConfig; import io.glutenproject.backendsapi.BackendsApiManager; +import io.glutenproject.exec.ExecutionCtxs; import io.glutenproject.memory.nmm.NativeMemoryManagers; import io.glutenproject.substrait.expression.ExpressionBuilder; import io.glutenproject.substrait.expression.StringMapNode; @@ -64,8 +65,9 @@ private PlanNode buildNativeConfNode(Map confs) { // return a columnar result iterator. public GeneralOutIterator createKernelWithBatchIterator( Plan wsPlan, List iterList) throws RuntimeException, IOException { + final long executionCtxHandle = ExecutionCtxs.contextInstance().getHandle(); final AtomicReference outIterator = new AtomicReference<>(); - final long memoryManagerId = + final long memoryManagerHandle = NativeMemoryManagers.create( "WholeStageIterator", (size) -> { @@ -80,16 +82,17 @@ public GeneralOutIterator createKernelWithBatchIterator( + "hasNext()/next()")); return instance.spill(size); }) - .getNativeInstanceId(); + .getNativeInstanceHandle(); final String spillDirPath = SparkDirectoryUtil.namespace("gluten-spill") .mkChildDirRoundRobin(UUID.randomUUID().toString()) .getAbsolutePath(); - long handle = + long iterHandle = jniWrapper.nativeCreateKernelWithIterator( - memoryManagerId, + executionCtxHandle, + memoryManagerHandle, getPlanBytesBuf(wsPlan), iterList.toArray(new GeneralInIterator[0]), TaskContext.get().stageId(), @@ -103,12 +106,13 @@ public GeneralOutIterator createKernelWithBatchIterator( SQLConf.get().getAllConfs())) .toProtobuf() .toByteArray()); - outIterator.set(createOutIterator(handle)); + outIterator.set(createOutIterator(executionCtxHandle, iterHandle)); return outIterator.get(); } - private ColumnarBatchOutIterator createOutIterator(long nativeHandle) throws IOException { - return new ColumnarBatchOutIterator(nativeHandle); + private ColumnarBatchOutIterator createOutIterator(long executionCtxHandle, long iterHandle) + throws IOException { + return new ColumnarBatchOutIterator(executionCtxHandle, iterHandle); } private byte[] getPlanBytesBuf(Plan planNode) { diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/NativeRowToColumnarJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/NativeRowToColumnarJniWrapper.java index 83763874a737..b50d7bf2133d 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/NativeRowToColumnarJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/NativeRowToColumnarJniWrapper.java @@ -21,9 +21,10 @@ public class NativeRowToColumnarJniWrapper extends JniInitialized { public NativeRowToColumnarJniWrapper() {} - public native long init(long cSchema, long memoryManagerId); + public native long init(long cSchema, long executionCtxHandle, long memoryManagerHandle); - public native long nativeConvertRowToColumnar(long r2CId, long[] rowLength, long bufferAddress); + public native long nativeConvertRowToColumnar( + long executionCtxHandle, long r2cHandle, long[] rowLength, long bufferAddress); - public native void close(long r2cId); + public native void close(long executionCtxHandle, long r2cHandle); } diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java index 18ce0802717e..1a66e4c3956f 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/PlanEvaluatorJniWrapper.java @@ -40,11 +40,12 @@ public PlanEvaluatorJniWrapper() {} /** * Create a native compute kernel and return a columnar result iterator. * - * @param memoryManagerId NativeMemoryManager instance id + * @param memoryManagerHandle NativeMemoryManager instance handle * @return iterator instance id */ public native long nativeCreateKernelWithIterator( - long memoryManagerId, + long ctxHandle, + long memoryManagerHandle, byte[] wsPlan, GeneralInIterator[] batchItr, int stageId, diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleReaderJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleReaderJniWrapper.java index 02377c02b3ec..a89be49d0244 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleReaderJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleReaderJniWrapper.java @@ -25,14 +25,17 @@ private ShuffleReaderJniWrapper() {} public native long make( long cSchema, - long memoryManagerId, + long executionCtxHandle, + long memoryManagerHandle, String compressionType, String compressionCodecBackend, String compressionMode); - public native long readStream(long handle, JniByteInputStream jniIn); + public native long readStream( + long executionCtxHandle, long shuffleReaderHandle, JniByteInputStream jniIn); - public native void populateMetrics(long handle, ShuffleReaderMetrics metrics); + public native void populateMetrics( + long executionCtxHandle, long shuffleReaderHandle, ShuffleReaderMetrics metrics); - public native void close(long handle); + public native void close(long executionCtxHandle, long shuffleReaderHandle); } diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java index 7f7f19b8a157..92498983b920 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java @@ -35,8 +35,7 @@ public ShuffleWriterJniWrapper() {} * @param subDirsPerLocalDir SparkConf spark.diskStore.subDirectories * @param localDirs configured local directories where Spark can write files * @param preferEvict if true, write the partition buffer to disk once it is full - * @param memoryPoolId - * @return native shuffle writer instance id if created successfully. + * @return native shuffle writer instance handle if created successfully. */ public long make( NativePartitioning part, @@ -50,7 +49,8 @@ public long make( int subDirsPerLocalDir, String localDirs, boolean preferEvict, - long memoryManagerId, + long executionCtxHandle, + long memoryManagerHandle, boolean writeEOS, long handle, long taskAttemptId) { @@ -67,7 +67,8 @@ public long make( subDirsPerLocalDir, localDirs, preferEvict, - memoryManagerId, + executionCtxHandle, + memoryManagerHandle, writeEOS, handle, taskAttemptId, @@ -82,8 +83,7 @@ public long make( * @param part contains the partitioning parameter needed by native shuffle writer * @param bufferSize size of native buffers hold by partition writer * @param codec compression codec - * @param memoryPoolId - * @return native shuffle writer instance id if created successfully. + * @return native shuffle writer instance handle if created successfully. */ public long makeForRSS( NativePartitioning part, @@ -94,7 +94,8 @@ public long makeForRSS( String compressionMode, int pushBufferMaxSize, Object pusher, - long memoryManagerId, + long executionCtxHandle, + long memoryManagerHandle, long handle, long taskAttemptId, String partitionWriterType) { @@ -111,7 +112,8 @@ public long makeForRSS( 0, null, true, - memoryManagerId, + executionCtxHandle, + memoryManagerHandle, true, handle, taskAttemptId, @@ -133,7 +135,8 @@ public native long nativeMake( int subDirsPerLocalDir, String localDirs, boolean preferEvict, - long memoryManagerId, + long executionCtxHandle, + long memoryManagerHandle, boolean writeEOS, long handle, long taskAttemptId, @@ -144,39 +147,43 @@ public native long nativeMake( /** * Evict partition data. * - * @param shuffleWriterId shuffle writer instance id + * @param shuffleWriterHandle shuffle writer instance handle * @param size expected size to Evict (in bytes) * @param callBySelf whether the caller is the shuffle writer itself, true when running out of * off-heap memory due to allocations from the evaluator itself * @return actual spilled size */ - public native long nativeEvict(long shuffleWriterId, long size, boolean callBySelf) + public native long nativeEvict( + long executionCtxHandle, long shuffleWriterHandle, long size, boolean callBySelf) throws RuntimeException; /** * Split one record batch represented by bufAddrs and bufSizes into several batches. The batch is * split according to the first column as partition id. * - * @param shuffleWriterId shuffle writer instance id + * @param shuffleWriterHandle shuffle writer instance handle * @param numRows Rows per batch * @param handler handler of Velox Vector * @return batch bytes. */ - public native long split(long shuffleWriterId, int numRows, long handler) throws IOException; + public native long split( + long executionCtxHandle, long shuffleWriterHandle, int numRows, long handler) + throws IOException; /** * Write the data remained in the buffers hold by native shuffle writer to each partition's * temporary file. And stop processing splitting * - * @param shuffleWriterId shuffle writer instance id + * @param shuffleWriterHandle shuffle writer instance handle * @return GlutenSplitResult */ - public native GlutenSplitResult stop(long shuffleWriterId) throws IOException; + public native GlutenSplitResult stop(long executionCtxHandle, long shuffleWriterHandle) + throws IOException; /** * Release resources associated with designated shuffle writer instance. * - * @param shuffleWriterId shuffle writer instance id + * @param shuffleWriterHandle shuffle writer instance handle */ - public native void close(long shuffleWriterId); + public native void close(long executionCtxHandle, long shuffleWriterHandle); } diff --git a/gluten-data/src/main/scala/io/glutenproject/exec/ExecutionCtx.scala b/gluten-data/src/main/scala/io/glutenproject/exec/ExecutionCtx.scala new file mode 100644 index 000000000000..1edacb79fd95 --- /dev/null +++ b/gluten-data/src/main/scala/io/glutenproject/exec/ExecutionCtx.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.exec + +import io.glutenproject.init.BackendJniWrapper + +import org.apache.spark.util.TaskResource + +class ExecutionCtx extends TaskResource { + private val handle = BackendJniWrapper.createExecutionCtx() + + def getHandle: Long = handle + + override def release(): Unit = BackendJniWrapper.releaseExecutionCtx(handle) + + override def priority(): Int = 10 + + override def resourceName(): String = s"ExecutionCtx_" + handle +} diff --git a/gluten-data/src/main/scala/io/glutenproject/exec/ExecutionCtxs.scala b/gluten-data/src/main/scala/io/glutenproject/exec/ExecutionCtxs.scala new file mode 100644 index 000000000000..27220f67d18f --- /dev/null +++ b/gluten-data/src/main/scala/io/glutenproject/exec/ExecutionCtxs.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.exec + +import org.apache.spark.util.TaskResources + +object ExecutionCtxs { + private val EXECUTION_CTX_NAME = "ExecutionCtx" + + /** Get or create the execution ctx which bound with Spark TaskContext. */ + def contextInstance(): ExecutionCtx = { + if (!TaskResources.inSparkTask()) { + throw new IllegalStateException("This method must be called in a Spark task.") + } + + TaskResources.addResourceIfNotRegistered(EXECUTION_CTX_NAME, () => createExecutionCtx()) + } + + /** Create a temporary execution ctx, caller must invoke ExecutionCtx#release manually. */ + def tmpInstance(): ExecutionCtx = { + createExecutionCtx() + } + + private def createExecutionCtx(): ExecutionCtx = { + new ExecutionCtx + } +} diff --git a/gluten-data/src/main/scala/io/glutenproject/vectorized/ColumnarBatchSerializer.scala b/gluten-data/src/main/scala/io/glutenproject/vectorized/ColumnarBatchSerializer.scala index 86500d4fc402..a1b9166728c3 100644 --- a/gluten-data/src/main/scala/io/glutenproject/vectorized/ColumnarBatchSerializer.scala +++ b/gluten-data/src/main/scala/io/glutenproject/vectorized/ColumnarBatchSerializer.scala @@ -17,6 +17,7 @@ package io.glutenproject.vectorized import io.glutenproject.GlutenConfig +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators import io.glutenproject.memory.nmm.NativeMemoryManagers import io.glutenproject.utils.ArrowAbiUtil @@ -67,7 +68,8 @@ private class ColumnarBatchSerializerInstance( extends SerializerInstance with Logging { - private lazy val shuffleReaderHandle = { + private lazy val (executionCtxHandle, shuffleReaderHandle) = { + val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle val allocator: BufferAllocator = ArrowBufferAllocators .contextInstance() .newChildAllocator("GlutenColumnarBatch deserialize", 0, Long.MaxValue) @@ -84,9 +86,10 @@ private class ColumnarBatchSerializerInstance( } val compressionCodecBackend = GlutenConfig.getConf.columnarShuffleCodecBackend.orNull - val handle = ShuffleReaderJniWrapper.INSTANCE.make( + val shuffleReaderHandle = ShuffleReaderJniWrapper.INSTANCE.make( cSchema.memoryAddress(), - NativeMemoryManagers.contextInstance("ShuffleReader").getNativeInstanceId, + executionCtxHandle, + NativeMemoryManagers.contextInstance("ShuffleReader").getNativeInstanceHandle, compressionCodec, compressionCodecBackend, GlutenConfig.getConf.columnarShuffleCompressionMode @@ -95,24 +98,29 @@ private class ColumnarBatchSerializerInstance( // since the native reader could hold a reference to memory pool that // was used to create all buffers read from shuffle reader. The pool // should keep alive before all buffers finish consuming. - TaskResources.addRecycler(s"ShuffleReaderHandle_$handle", 50) { + TaskResources.addRecycler(s"ShuffleReaderHandle_$shuffleReaderHandle", 50) { // Collect Metrics val readerMetrics = new ShuffleReaderMetrics() - ShuffleReaderJniWrapper.INSTANCE.populateMetrics(handle, readerMetrics) + ShuffleReaderJniWrapper.INSTANCE.populateMetrics( + executionCtxHandle, + shuffleReaderHandle, + readerMetrics) decompressTime += readerMetrics.getDecompressTime cSchema.close() - ShuffleReaderJniWrapper.INSTANCE.close(handle) + ShuffleReaderJniWrapper.INSTANCE.close(executionCtxHandle, shuffleReaderHandle) allocator.close() } - handle + (executionCtxHandle, shuffleReaderHandle) } override def deserializeStream(in: InputStream): DeserializationStream = { new DeserializationStream { private lazy val byteIn: JniByteInputStream = JniByteInputStreams.create(in) private lazy val wrappedOut: GeneralOutIterator = new ColumnarBatchOutIterator( - ShuffleReaderJniWrapper.INSTANCE.readStream(shuffleReaderHandle, byteIn)) + executionCtxHandle, + ShuffleReaderJniWrapper.INSTANCE + .readStream(executionCtxHandle, shuffleReaderHandle, byteIn)) private var cb: ColumnarBatch = _ diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 49ea78c2bed2..f391b78b2899 100644 --- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -18,6 +18,7 @@ package org.apache.spark.shuffle import io.glutenproject.GlutenConfig import io.glutenproject.columnarbatch.ColumnarBatches +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.memory.memtarget.spark.Spiller import io.glutenproject.memory.nmm.NativeMemoryManagers import io.glutenproject.vectorized._ @@ -78,6 +79,8 @@ class ColumnarShuffleWriter[K, V]( private val writeEOS = GlutenConfig.getConf.columnarShuffleWriteEOS + private lazy val executionCtxHandle: Long = ExecutionCtxs.contextInstance().getHandle + private val jniWrapper = new ShuffleWriterJniWrapper private var nativeShuffleWriter: Long = -1L @@ -132,6 +135,7 @@ class ColumnarShuffleWriter[K, V]( blockManager.subDirsPerLocalDir, localDirs, preferSpill, + executionCtxHandle, NativeMemoryManagers .create( "ShuffleWriter", @@ -145,20 +149,21 @@ class ColumnarShuffleWriter[K, V]( } logInfo(s"Gluten shuffle writer: Trying to spill $size bytes of data") // fixme pass true when being called by self - val spilled = jniWrapper.nativeEvict(nativeShuffleWriter, size, false) + val spilled = + jniWrapper.nativeEvict(executionCtxHandle, nativeShuffleWriter, size, false) logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of data") spilled } } ) - .getNativeInstanceId, + .getNativeInstanceHandle, writeEOS, handle, taskContext.taskAttemptId() ) } val startTime = System.nanoTime() - val bytes = jniWrapper.split(nativeShuffleWriter, rows, handle) + val bytes = jniWrapper.split(executionCtxHandle, nativeShuffleWriter, rows, handle) dep.metrics("dataSize").add(bytes) dep.metrics("splitTime").add(System.nanoTime() - startTime) dep.metrics("numInputRows").add(rows) @@ -171,7 +176,7 @@ class ColumnarShuffleWriter[K, V]( val startTime = System.nanoTime() if (nativeShuffleWriter != -1L) { - splitResult = jniWrapper.stop(nativeShuffleWriter) + splitResult = jniWrapper.stop(executionCtxHandle, nativeShuffleWriter) closeShuffleWriter } @@ -216,7 +221,7 @@ class ColumnarShuffleWriter[K, V]( } private def closeShuffleWriter(): Unit = { - jniWrapper.close(nativeShuffleWriter) + jniWrapper.close(executionCtxHandle, nativeShuffleWriter) nativeShuffleWriter = -1L } diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index 85c4467fcd44..20f6653c1933 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution import io.glutenproject.columnarbatch.ColumnarBatches +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.execution.BroadCastHashJoinContext import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators import io.glutenproject.memory.nmm.NativeMemoryManagers @@ -48,6 +49,7 @@ case class ColumnarBuildSideRelation( var batchId = 0 var closed = false private var finalBatch = -1L + val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle val serializeHandle: Long = { val allocator = ArrowBufferAllocators.contextInstance() val cSchema = ArrowSchema.allocateNew(allocator) @@ -57,21 +59,22 @@ case class ColumnarBuildSideRelation( ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema) val handle = ColumnarBatchSerializerJniWrapper.INSTANCE.init( cSchema.memoryAddress(), + executionCtxHandle, NativeMemoryManagers .contextInstance("BuildSideRelation#BatchSerializer") - .getNativeInstanceId) + .getNativeInstanceHandle) cSchema.close() handle } TaskResources.addRecycler(s"BuildSideRelation_deserialized_$serializeHandle", 50) { - ColumnarBatchSerializerJniWrapper.INSTANCE.close(serializeHandle) + ColumnarBatchSerializerJniWrapper.INSTANCE.close(executionCtxHandle, serializeHandle) } override def hasNext: Boolean = { val has = batchId < batches.length if (!has && !closed) { - ColumnarBatches.close(finalBatch) + ColumnarBatches.close(executionCtxHandle, finalBatch) closed = true } has @@ -79,12 +82,15 @@ case class ColumnarBuildSideRelation( override def next: ColumnarBatch = { val handle = - ColumnarBatchSerializerJniWrapper.INSTANCE.deserialize(serializeHandle, batches(batchId)) + ColumnarBatchSerializerJniWrapper.INSTANCE.deserialize( + executionCtxHandle, + serializeHandle, + batches(batchId)) if (batchId == batches.length - 1) { finalBatch = handle } batchId += 1 - ColumnarBatches.create(handle) + ColumnarBatches.create(executionCtxHandle, handle) } } } @@ -98,6 +104,7 @@ case class ColumnarBuildSideRelation( */ override def transform(key: Expression): Array[InternalRow] = { // convert batches: Array[Array[Byte]] to Array[InternalRow] by key and distinct. + val executionCtx = ExecutionCtxs.tmpInstance() val nativeMemoryManager = NativeMemoryManagers.tmpInstance("BuildSideRelation#transform") val serializeHandle = { val allocator = ArrowBufferAllocators.globalInstance() @@ -108,7 +115,8 @@ case class ColumnarBuildSideRelation( ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema) val handle = ColumnarBatchSerializerJniWrapper.INSTANCE.init( cSchema.memoryAddress(), - nativeMemoryManager.getNativeInstanceId) + executionCtx.getHandle, + nativeMemoryManager.getNativeInstanceHandle) cSchema.close() handle } @@ -117,15 +125,20 @@ case class ColumnarBuildSideRelation( // Convert columnar to Row. val jniWrapper = new NativeColumnarToRowJniWrapper() - val c2rId = jniWrapper.nativeColumnarToRowInit(nativeMemoryManager.getNativeInstanceId) + val c2rId = jniWrapper.nativeColumnarToRowInit( + executionCtx.getHandle, + nativeMemoryManager.getNativeInstanceHandle) var batchId = 0 val iterator = if (batches.length > 0) { val res: Iterator[Iterator[InternalRow]] = new Iterator[Iterator[InternalRow]] { override def hasNext: Boolean = { val itHasNext = batchId < batches.length if (!itHasNext && !closed) { - jniWrapper.nativeClose(c2rId) - ColumnarBatchSerializerJniWrapper.INSTANCE.close(serializeHandle) + jniWrapper.nativeClose(executionCtx.getHandle, c2rId) + ColumnarBatchSerializerJniWrapper.INSTANCE.close( + executionCtx.getHandle, + serializeHandle) + executionCtx.release() nativeMemoryManager.release() closed = true } @@ -136,8 +149,11 @@ case class ColumnarBuildSideRelation( val batchBytes = batches(batchId) batchId += 1 val batchHandle = - ColumnarBatchSerializerJniWrapper.INSTANCE.deserialize(serializeHandle, batchBytes) - val batch = ColumnarBatches.create(batchHandle) + ColumnarBatchSerializerJniWrapper.INSTANCE.deserialize( + executionCtx.getHandle, + serializeHandle, + batchBytes) + val batch = ColumnarBatches.create(executionCtx.getHandle, batchHandle) if (batch.numRows == 0) { batch.close() Iterator.empty @@ -148,7 +164,8 @@ case class ColumnarBuildSideRelation( } else { val cols = batch.numCols() val rows = batch.numRows() - val info = jniWrapper.nativeColumnarToRowConvert(batchHandle, c2rId) + val info = + jniWrapper.nativeColumnarToRowConvert(executionCtx.getHandle, batchHandle, c2rId) batch.close() val columnNames = key.flatMap { case expression: AttributeReference => diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala index 5d374541d4b1..1b2a97cef73d 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.utils import io.glutenproject.columnarbatch.ColumnarBatches +import io.glutenproject.exec.ExecutionCtxs import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators import io.glutenproject.memory.nmm.NativeMemoryManagers import io.glutenproject.vectorized.{ArrowWritableColumnVector, NativeColumnarToRowInfo, NativeColumnarToRowJniWrapper, NativePartitioning} @@ -41,23 +42,25 @@ import org.apache.spark.util.{MutablePair, TaskResources} object ExecUtil { def convertColumnarToRow(batch: ColumnarBatch): Iterator[InternalRow] = { + val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle val jniWrapper = new NativeColumnarToRowJniWrapper() var info: NativeColumnarToRowInfo = null val batchHandle = ColumnarBatches.getNativeHandle(batch) - val instanceId = jniWrapper.nativeColumnarToRowInit( + val c2rHandle = jniWrapper.nativeColumnarToRowInit( + executionCtxHandle, NativeMemoryManagers .contextInstance("ExecUtil#ColumnarToRow") - .getNativeInstanceId) - info = jniWrapper.nativeColumnarToRowConvert(batchHandle, instanceId) + .getNativeInstanceHandle) + info = jniWrapper.nativeColumnarToRowConvert(executionCtxHandle, batchHandle, c2rHandle) new Iterator[InternalRow] { var rowId = 0 val row = new UnsafeRow(batch.numCols()) var closed = false - TaskResources.addRecycler(s"ColumnarToRow_$instanceId", 100) { + TaskResources.addRecycler(s"ColumnarToRow_$c2rHandle", 100) { if (!closed) { - jniWrapper.nativeClose(instanceId) + jniWrapper.nativeClose(executionCtxHandle, c2rHandle) closed = true } } @@ -65,7 +68,7 @@ object ExecUtil { override def hasNext: Boolean = { val result = rowId < batch.numRows() if (!result && !closed) { - jniWrapper.nativeClose(instanceId) + jniWrapper.nativeClose(executionCtxHandle, c2rHandle) closed = true } result @@ -144,7 +147,7 @@ object ExecUtil { ArrowBufferAllocators.contextInstance(), new ColumnarBatch(Array[ColumnVector](pidVec), cb.numRows)) val newHandle = ColumnarBatches.compose(pidBatch, cb) - (0, ColumnarBatches.create(newHandle)) + (0, ColumnarBatches.create(ColumnarBatches.getExecutionCtxHandle(cb), newHandle)) } } }