Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Make ExpressionEvaluatorJniWrapper static
Browse files Browse the repository at this point in the history
baibaichen committed Jul 17, 2024
1 parent 9189c9e commit 4837d46
Showing 8 changed files with 29 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -36,15 +36,12 @@
import scala.Tuple2;
import scala.collection.JavaConverters;

public class CHNativeExpressionEvaluator {
private final ExpressionEvaluatorJniWrapper jniWrapper;
public class CHNativeExpressionEvaluator extends ExpressionEvaluatorJniWrapper {

public CHNativeExpressionEvaluator() {
jniWrapper = new ExpressionEvaluatorJniWrapper();
}
private CHNativeExpressionEvaluator() {}

// Used to initialize the native computing.
public void initNative(SparkConf conf) {
public static void initNative(SparkConf conf) {
Tuple2<String, String>[] all = conf.getAll();
Map<String, String> confMap =
Arrays.stream(all).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
@@ -55,19 +52,19 @@ public void initNative(SparkConf conf) {
// Get the customer config from SparkConf for each backend
BackendsApiManager.getTransformerApiInstance().postProcessNativeConfig(nativeConfMap, prefix);

jniWrapper.nativeInitNative(buildNativeConf(nativeConfMap));
nativeInitNative(buildNativeConf(nativeConfMap));
}

public void finalizeNative() {
jniWrapper.nativeFinalizeNative();
public static void finalizeNative() {
nativeFinalizeNative();
}

// Used to validate the Substrait plan in native compute engine.
public boolean doValidate(byte[] subPlan) {
return jniWrapper.nativeDoValidate(subPlan);
public static boolean doValidate(byte[] subPlan) {
throw new UnsupportedOperationException("doValidate is not supported in Clickhouse Backend");
}

private byte[] buildNativeConf(Map<String, String> confs) {
private static byte[] buildNativeConf(Map<String, String> confs) {
StringMapNode stringMapNode = ExpressionBuilder.makeStringMap(confs);
AdvancedExtensionNode extensionNode =
ExtensionBuilder.makeAdvancedExtension(
@@ -76,22 +73,21 @@ private byte[] buildNativeConf(Map<String, String> confs) {
return PlanBuilder.makePlan(extensionNode).toProtobuf().toByteArray();
}

private Map<String, String> getNativeBackendConf() {
private static Map<String, String> getNativeBackendConf() {
return GlutenConfig.getNativeBackendConf(
BackendsApiManager.getSettings().getBackendConfigPrefix(), SQLConf.get().getAllConfs());
}

// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
public BatchIterator createKernelWithBatchIterator(
public static BatchIterator createKernelWithBatchIterator(
byte[] wsPlan,
byte[][] splitInfo,
List<GeneralInIterator> iterList,
boolean materializeInput) {
long allocId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId();
long handle =
jniWrapper.nativeCreateKernelWithIterator(
allocId,
nativeCreateKernelWithIterator(
CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
wsPlan,
splitInfo,
iterList.toArray(new GeneralInIterator[0]),
@@ -101,10 +97,10 @@ public BatchIterator createKernelWithBatchIterator(
}

// Only for UT.
public BatchIterator createKernelWithBatchIterator(
public static BatchIterator createKernelWithBatchIterator(
long allocId, byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList) {
long handle =
jniWrapper.nativeCreateKernelWithIterator(
nativeCreateKernelWithIterator(
allocId,
wsPlan,
splitInfo,
@@ -114,7 +110,7 @@ public BatchIterator createKernelWithBatchIterator(
return createBatchIterator(handle);
}

private BatchIterator createBatchIterator(long nativeHandle) {
private static BatchIterator createBatchIterator(long nativeHandle) {
return new BatchIterator(nativeHandle);
}
}
Original file line number Diff line number Diff line change
@@ -24,37 +24,22 @@
public class ExpressionEvaluatorJniWrapper {

/** Call initNative to initialize native computing. */
native void nativeInitNative(byte[] confAsPlan);
static native void nativeInitNative(byte[] confAsPlan);

/** Call finalizeNative to finalize native computing. */
native void nativeFinalizeNative();

/**
* Validate the Substrait plan in native compute engine.
*
* @param subPlan the Substrait plan in binary format.
* @return whether the computing of this plan is supported in native.
*/
native boolean nativeDoValidate(byte[] subPlan);
static native void nativeFinalizeNative();

/**
* Create a native compute kernel and return a columnar result iterator.
*
* @param allocatorId allocator id
* @return iterator instance id
*/
public native long nativeCreateKernelWithIterator(
public static native long nativeCreateKernelWithIterator(
long allocatorId,
byte[] wsPlan,
byte[][] splitInfo,
GeneralInIterator[] batchItr,
byte[] confArray,
boolean materializeInput);

/**
* Closes the projector referenced by nativeHandler.
*
* @param nativeHandler nativeHandler that needs to be closed
*/
native void nativeClose(long nativeHandler);
}
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
}
.map(it => new ColumnarNativeIterator(it.asJava).asInstanceOf[GeneralInIterator])
.asJava
new CHNativeExpressionEvaluator().createKernelWithBatchIterator(
CHNativeExpressionEvaluator.createKernelWithBatchIterator(
wsPlan,
splitInfoByteArray,
listIterator,
Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@ class CHListenerApi extends ListenerApi with Logging {
val externalSortKey = s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
s".max_bytes_before_external_sort"
if (conf.getLong(externalSortKey, -1) < 0) {
if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = false)) {
val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size"))
if (memSize > 0L) {
val cores = conf.getInt("spark.executor.cores", 1).toLong
@@ -97,8 +97,7 @@ class CHListenerApi extends ListenerApi with Logging {
// Load supported hive/python/scala udfs
UDFMappings.loadFromSparkConf(conf)

val initKernel = new CHNativeExpressionEvaluator()
initKernel.initNative(conf)
CHNativeExpressionEvaluator.initNative(conf)

// inject backend-specific implementations to override spark classes
// FIXME: The following set instances twice in local mode?
@@ -110,7 +109,6 @@ class CHListenerApi extends ListenerApi with Logging {

private def shutdown(): Unit = {
CHBroadcastBuildSideCache.cleanAll()
val kernel = new CHNativeExpressionEvaluator()
kernel.finalizeNative()
CHNativeExpressionEvaluator.finalizeNative()
}
}
Original file line number Diff line number Diff line change
@@ -38,8 +38,7 @@ import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logging {

override def doNativeValidateWithFailureReason(plan: PlanNode): NativePlanValidationInfo = {
val validator = new CHNativeExpressionEvaluator()
if (validator.doValidate(plan.toProtobuf.toByteArray)) {
if (CHNativeExpressionEvaluator.doValidate(plan.toProtobuf.toByteArray)) {
new NativePlanValidationInfo(1, "")
} else {
new NativePlanValidationInfo(0, "CH native check failed.")
Original file line number Diff line number Diff line change
@@ -49,9 +49,8 @@ class NativeFileScanColumnarRDD(

val resIter = GlutenTimeMetric.millis(scanTime) {
_ =>
val transKernel = new CHNativeExpressionEvaluator()
val inBatchIters = new util.ArrayList[GeneralInIterator]()
transKernel.createKernelWithBatchIterator(
CHNativeExpressionEvaluator.createKernelWithBatchIterator(
inputPartition.plan,
splitInfoByteArray,
inBatchIters,
Original file line number Diff line number Diff line change
@@ -45,9 +45,8 @@ object GlutenClickHouseMetricsUTUtils {
SubstraitPlanPrinterUtil.jsonToSubstraitPlan(
substraitPlanJsonStr.replaceAll("basePath", basePath.substring(1)))

val transKernel = new CHNativeExpressionEvaluator()
val mockMemoryAllocator = CHNativeMemoryAllocators.contextInstanceForUT()
val resIter = transKernel.createKernelWithBatchIterator(
val resIter = CHNativeExpressionEvaluator.createKernelWithBatchIterator(
mockMemoryAllocator.getNativeInstanceId,
substraitPlan.toByteArray,
new Array[Array[Byte]](0),
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
@@ -201,7 +201,7 @@ JNIEXPORT void JNI_OnUnload(JavaVM * vm, void * /*reserved*/)
env->DeleteGlobalRef(local_engine::ReservationListenerWrapper::reservation_listener_class);
}

JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jobject, jbyteArray conf_plan)
JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jclass, jbyteArray conf_plan)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan);
@@ -210,7 +210,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n
LOCAL_ENGINE_JNI_METHOD_END(env, )
}

JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeFinalizeNative(JNIEnv * env)
JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeFinalizeNative(JNIEnv * env, jclass)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::BackendFinalizerUtil::finalizeSessionally();
@@ -219,7 +219,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n

JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithIterator(
JNIEnv * env,
jobject /*obj*/,
jclass ,
jlong allocator_id,
jbyteArray plan,
jobjectArray split_infos,

0 comments on commit 4837d46

Please sign in to comment.