[GLUTEN-3077][VL] part-2: Use ExecutionCtx to manage resources' lifecycle

Yohahaha authored Sep 15, 2023
1 parent ec71a19 commit d5f6f6e
Showing 43 changed files with 1,050 additions and 500 deletions.
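
The recurring pattern across the JVM-side changes: every JNI entry point gains an explicit executionCtxHandle, obtained from ExecutionCtxs.contextInstance().getHandle, passed alongside the native memory manager handle (getNativeInstanceId is renamed getNativeInstanceHandle), so the native side can tie each resource to the ExecutionCtx that owns it. A minimal Scala sketch of the convention, assuming the APIs exactly as they appear in the diffs below (cSchema setup and teardown elided, error handling omitted):

    import io.glutenproject.exec.ExecutionCtxs
    import io.glutenproject.memory.nmm.NativeMemoryManagers

    val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle
    val memoryManagerHandle =
      NativeMemoryManagers.contextInstance("RowToColumnar").getNativeInstanceHandle

    val jniWrapper = new NativeRowToColumnarJniWrapper()
    // Creation registers the native object under the given ExecutionCtx.
    val r2cHandle =
      jniWrapper.init(cSchema.memoryAddress(), executionCtxHandle, memoryManagerHandle)
    try {
      // Every later call passes executionCtxHandle plus the per-object handle.
    } finally {
      jniWrapper.close(executionCtxHandle, r2cHandle)
    }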

@@ -30,16 +30,26 @@ public class DatasourceJniWrapper extends JniInitialized {
   public DatasourceJniWrapper() throws IOException {}
 
   public long nativeInitDatasource(
-      String filePath, long cSchema, long memoryManagerId, Map<String, String> options) {
-    return nativeInitDatasource(filePath, cSchema, memoryManagerId, JniUtils.toNativeConf(options));
+      String filePath,
+      long cSchema,
+      long executionCtxHandle,
+      long memoryManagerHandle,
+      Map<String, String> options) {
+    return nativeInitDatasource(
+        filePath, cSchema, executionCtxHandle, memoryManagerHandle, JniUtils.toNativeConf(options));
   }
 
   public native long nativeInitDatasource(
-      String filePath, long cSchema, long memoryManagerId, byte[] options);
+      String filePath,
+      long cSchema,
+      long executionCtxHandle,
+      long memoryManagerHandle,
+      byte[] options);
 
-  public native void inspectSchema(long instanceId, long cSchemaAddress);
+  public native void inspectSchema(long executionCtxHandle, long dsHandle, long cSchemaAddress);
 
-  public native void close(long instanceId);
+  public native void close(long executionCtxHandle, long dsHandle);
 
-  public native void write(long instanceId, VeloxColumnarBatchIterator iterator);
+  public native void write(
+      long executionCtxHandle, long dsHandle, VeloxColumnarBatchIterator iterator);
 }
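
For orientation, a hedged usage sketch of the updated datasource API (init, write, close), threading both handles through. The path and the empty options map are illustrative, and cSchema and batchIterator are assumed to be in scope; the real callers appear in DatasourceUtil and VeloxFormatWriterInjects further down:

    val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle
    val dsJni = new DatasourceJniWrapper()
    val dsHandle = dsJni.nativeInitDatasource(
      "/tmp/example.parquet", // illustrative path
      cSchema.memoryAddress(), // exported Arrow schema address; -1 when reading, as in DatasourceUtil
      executionCtxHandle,
      NativeMemoryManagers.contextInstance("VeloxWriter").getNativeInstanceHandle,
      new java.util.HashMap[String, String]())
    try {
      dsJni.write(executionCtxHandle, dsHandle, batchIterator) // a VeloxColumnarBatchIterator
    } finally {
      dsJni.close(executionCtxHandle, dsHandle)
    }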

@@ -19,6 +19,7 @@ package io.glutenproject.backendsapi.velox
 import io.glutenproject.GlutenConfig
 import io.glutenproject.backendsapi.SparkPlanExecApi
 import io.glutenproject.columnarbatch.ColumnarBatches
+import io.glutenproject.exec.ExecutionCtxs
 import io.glutenproject.execution._
 import io.glutenproject.expression._
 import io.glutenproject.expression.ConverterUtils.FunctionConfig
@@ -288,10 +289,11 @@ class SparkPlanExecHandler extends SparkPlanExecApi {
     } else {
       val handleArray = input.map(ColumnarBatches.getNativeHandle).toArray
       val serializeResult = ColumnarBatchSerializerJniWrapper.INSTANCE.serialize(
+        ExecutionCtxs.contextInstance().getHandle,
         handleArray,
         NativeMemoryManagers
           .contextInstance("BroadcastRelation")
-          .getNativeInstanceId)
+          .getNativeInstanceHandle)
       input.foreach(ColumnarBatches.release)
       Iterator((serializeResult.getNumRows, serializeResult.getSerialized))
     }

@@ -18,6 +18,7 @@ package io.glutenproject.execution
 
 import io.glutenproject.backendsapi.velox.Validator
 import io.glutenproject.columnarbatch.ColumnarBatches
+import io.glutenproject.exec.ExecutionCtxs
 import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
 import io.glutenproject.memory.nmm.NativeMemoryManagers
 import io.glutenproject.utils.ArrowAbiUtil
@@ -63,25 +64,27 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBase
     } else {
       val arrowSchema =
         SparkArrowUtil.toArrowSchema(localSchema, SQLConf.get.sessionLocalTimeZone)
+      val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle
       val jniWrapper = new NativeRowToColumnarJniWrapper()
       val allocator = ArrowBufferAllocators.contextInstance()
       val cSchema = ArrowSchema.allocateNew(allocator)
       var closed = false
-      val r2cId =
+      val r2cHandle =
         try {
           ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
           jniWrapper.init(
             cSchema.memoryAddress(),
+            executionCtxHandle,
             NativeMemoryManagers
               .contextInstance("RowToColumnar")
-              .getNativeInstanceId)
+              .getNativeInstanceHandle)
         } finally {
           cSchema.close()
         }
 
-      TaskResources.addRecycler(s"RowToColumnar_$r2cId", 100) {
+      TaskResources.addRecycler(s"RowToColumnar_$r2cHandle", 100) {
         if (!closed) {
-          jniWrapper.close(r2cId)
+          jniWrapper.close(executionCtxHandle, r2cHandle)
           closed = true
         }
       }
@@ -91,7 +94,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBase
       override def hasNext: Boolean = {
        val itHasNext = rowIterator.hasNext
        if (!itHasNext && !closed) {
-         jniWrapper.close(r2cId)
+         jniWrapper.close(executionCtxHandle, r2cHandle)
          closed = true
        }
        itHasNext
@@ -150,8 +153,12 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBase
           numInputRows += rowCount
           try {
             val handle = jniWrapper
-              .nativeConvertRowToColumnar(r2cId, rowLength.toArray, arrowBuf.memoryAddress())
-            ColumnarBatches.create(handle)
+              .nativeConvertRowToColumnar(
+                executionCtxHandle,
+                r2cHandle,
+                rowLength.toArray,
+                arrowBuf.memoryAddress())
+            ColumnarBatches.create(executionCtxHandle, handle)
           } finally {
             arrowBuf.close()
             arrowBuf = null
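
Worth noting in the hunk above: the native converter is closed exactly once, either eagerly when the row iterator drains or by the TaskResources recycler at task cleanup, with the closed flag guarding against a double close. A condensed sketch of that pattern, assuming jniWrapper, rowIterator, executionCtxHandle and r2cHandle are in scope:

    var closed = false
    def closeOnce(): Unit = if (!closed) {
      // Both handles are needed now: the ctx handle locates the ExecutionCtx
      // that owns the native object behind r2cHandle.
      jniWrapper.close(executionCtxHandle, r2cHandle)
      closed = true
    }
    // Safety net if the iterator is never fully consumed:
    TaskResources.addRecycler(s"RowToColumnar_$r2cHandle", 100) { closeOnce() }
    // Fast path, mirroring hasNext in the diff: release as soon as input is exhausted.
    if (!rowIterator.hasNext) closeOnce()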

@@ -16,6 +16,7 @@
  */
 package io.glutenproject.utils
 
+import io.glutenproject.exec.ExecutionCtxs
 import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
 import io.glutenproject.memory.nmm.NativeMemoryManagers
 import io.glutenproject.spark.sql.execution.datasources.velox.DatasourceJniWrapper
@@ -37,20 +38,23 @@ object DatasourceUtil {
   }
 
   def readSchema(file: FileStatus): Option[StructType] = {
+    val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle
     val allocator = ArrowBufferAllocators.contextInstance()
     val datasourceJniWrapper = new DatasourceJniWrapper()
-    val instanceId = datasourceJniWrapper.nativeInitDatasource(
+    val dsHandle = datasourceJniWrapper.nativeInitDatasource(
       file.getPath.toString,
       -1,
-      NativeMemoryManagers.contextInstance("VeloxWriter").getNativeInstanceId,
-      new util.HashMap[String, String]())
+      executionCtxHandle,
+      NativeMemoryManagers.contextInstance("VeloxWriter").getNativeInstanceHandle,
+      new util.HashMap[String, String]()
+    )
     val cSchema = ArrowSchema.allocateNew(allocator)
-    datasourceJniWrapper.inspectSchema(instanceId, cSchema.memoryAddress())
+    datasourceJniWrapper.inspectSchema(executionCtxHandle, dsHandle, cSchema.memoryAddress())
     try {
       Option(SparkSchemaUtil.fromArrowSchema(ArrowAbiUtil.importToSchema(allocator, cSchema)))
     } finally {
       cSchema.close()
-      datasourceJniWrapper.close(instanceId)
+      datasourceJniWrapper.close(executionCtxHandle, dsHandle)
     }
   }
 }

@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution
 
 import io.glutenproject.columnarbatch.ColumnarBatches
+import io.glutenproject.exec.ExecutionCtxs
 import io.glutenproject.execution.ColumnarToRowExecBase
 import io.glutenproject.extension.ValidationResult
 import io.glutenproject.memory.nmm.NativeMemoryManagers
@@ -104,15 +105,17 @@ class ColumnarToRowRDD(
     if (batches.isEmpty) {
       Iterator.empty
     } else {
+      val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle
       // TODO:: pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast
       val jniWrapper = new NativeColumnarToRowJniWrapper()
       var closed = false
       val c2rId = jniWrapper.nativeColumnarToRowInit(
-        NativeMemoryManagers.contextInstance("ColumnarToRow").getNativeInstanceId)
+        executionCtxHandle,
+        NativeMemoryManagers.contextInstance("ColumnarToRow").getNativeInstanceHandle)
 
       TaskResources.addRecycler(s"ColumnarToRow_$c2rId", 100) {
         if (!closed) {
-          jniWrapper.nativeClose(c2rId)
+          jniWrapper.nativeClose(executionCtxHandle, c2rId)
           closed = true
         }
       }
@@ -122,7 +125,7 @@ class ColumnarToRowRDD(
         override def hasNext: Boolean = {
           val hasNext = batches.hasNext
           if (!hasNext && !closed) {
-            jniWrapper.nativeClose(c2rId)
+            jniWrapper.nativeClose(executionCtxHandle, c2rId)
             closed = true
           }
           hasNext
@@ -156,7 +159,8 @@ class ColumnarToRowRDD(
           val rows = batch.numRows()
           val beforeConvert = System.currentTimeMillis()
           val batchHandle = ColumnarBatches.getNativeHandle(batch)
-          val info = jniWrapper.nativeColumnarToRowConvert(batchHandle, c2rId)
+          val info =
+            jniWrapper.nativeColumnarToRowConvert(executionCtxHandle, batchHandle, c2rId)
 
           convertTime += (System.currentTimeMillis() - beforeConvert)
           // batch.close()

@@ -29,7 +29,8 @@ import java.util.concurrent.atomic.AtomicReference
 import java.util.regex.Pattern
 
 class VeloxWriteQueue(
-    instanceId: Long,
+    executionCtxHandle: Long,
+    dsHandle: Long,
     schema: Schema,
     allocator: BufferAllocator,
     datasourceJniWrapper: DatasourceJniWrapper,
@@ -41,7 +42,7 @@ class VeloxWriteQueue(
   private val writeThread = new Thread(
     () => {
       try {
-        datasourceJniWrapper.write(instanceId, scanner)
+        datasourceJniWrapper.write(executionCtxHandle, dsHandle, scanner)
       } catch {
        case e: Throwable =>
          writeException.set(e)
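
A design note carried over unchanged here: datasourceJniWrapper.write(executionCtxHandle, dsHandle, scanner) drains the batch iterator on a dedicated thread, so the Spark task thread stays free to enqueue batches. A failure is stashed in the AtomicReference rather than killing the queue silently, presumably to be rethrown when the queue is closed or next used (the rethrow site is outside this hunk).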

@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.velox
 
 import io.glutenproject.columnarbatch.ColumnarBatches
 import io.glutenproject.exception.GlutenException
+import io.glutenproject.exec.ExecutionCtxs
 import io.glutenproject.execution.datasource.GlutenRowSplitter
 import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
 import io.glutenproject.memory.nmm.NativeMemoryManagers
@@ -50,15 +51,17 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
     val arrowSchema =
       SparkArrowUtil.toArrowSchema(dataSchema, SQLConf.get.sessionLocalTimeZone)
     val cSchema = ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance())
-    var instanceId = -1L
+    var dsHandle = -1L
     val datasourceJniWrapper = new DatasourceJniWrapper()
     val allocator = ArrowBufferAllocators.contextInstance()
+    val executionCtxHandle = ExecutionCtxs.contextInstance().getHandle
     try {
       ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
-      instanceId = datasourceJniWrapper.nativeInitDatasource(
+      dsHandle = datasourceJniWrapper.nativeInitDatasource(
         originPath,
         cSchema.memoryAddress(),
-        NativeMemoryManagers.contextInstance("VeloxWriter").getNativeInstanceId,
+        executionCtxHandle,
+        NativeMemoryManagers.contextInstance("VeloxWriter").getNativeInstanceHandle,
         nativeConf)
     } catch {
       case e: IOException =>
@@ -68,7 +71,13 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
     }
 
     val writeQueue =
-      new VeloxWriteQueue(instanceId, arrowSchema, allocator, datasourceJniWrapper, originPath)
+      new VeloxWriteQueue(
+        executionCtxHandle,
+        dsHandle,
+        arrowSchema,
+        allocator,
+        datasourceJniWrapper,
+        originPath)
 
     new OutputWriter {
       override def write(row: InternalRow): Unit = {
@@ -80,7 +89,7 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
 
       override def close(): Unit = {
        writeQueue.close()
-       datasourceJniWrapper.close(instanceId)
+       datasourceJniWrapper.close(executionCtxHandle, dsHandle)
      }
 
      // Do NOT add override keyword for compatibility on spark 3.1.

2 changes: 0 additions & 2 deletions cpp/core/benchmarks/CompressionBenchmark.cc
@@ -23,13 +23,11 @@
 #include <arrow/record_batch.h>
 #include <arrow/type.h>
 #include <arrow/type_fwd.h>
-#include <arrow/util/io_util.h>
 #include <benchmark/benchmark.h>
 #include <execinfo.h>
 #include <parquet/arrow/reader.h>
 #include <parquet/file_reader.h>
 #include <sched.h>
-#include <sys/mman.h>
 
 #include <chrono>
 #include <iostream>

6 changes: 5 additions & 1 deletion cpp/core/compute/ExecutionCtx.cc
@@ -40,8 +40,12 @@ void setExecutionCtxFactory(ExecutionCtxFactory factory) {
 #endif
 }
 
-std::shared_ptr<ExecutionCtx> createExecutionCtx() {
+ExecutionCtx* createExecutionCtx() {
   return getExecutionCtxFactoryContext()->create();
 }
 
+void releaseExecutionCtx(ExecutionCtx* executionCtx) {
+  delete executionCtx;
+}
+
 } // namespace gluten
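
The C++ half of the lifecycle change: createExecutionCtx now hands back a raw ExecutionCtx* paired with an explicit releaseExecutionCtx, replacing the previous std::shared_ptr return. This matches the handle-passing scheme on the JVM side: ownership crosses the JNI boundary as an opaque handle, so destruction must be an explicit call driven from ExecutionCtxs rather than C++ reference counting.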