[GLUTEN-6067][CH] [Part 3-2] Basic support for Native Write in Spark 3.5 (#6586)

* 1. Reduce duplicated code: BatchIterator::nextInternal now calls CHNativeBlock::toColumnarBatch() to return a ColumnarBatch
2. Extract a new function, SerializedPlanParser::buildPipeline, which is used in follow-up PRs
3. Refactor the file wrapper; extract create_output_format_file for later use
4. Add GLUTEN_SOURCE_DIR so that gtest can read Java resources
5. Add SubstraitParserUtils.h so that we can remove parseJson
6. Many small refactors

* Make ExpressionEvaluatorJniWrapper static

* Refactor GlutenClickHouseNativeWriteTableSuite and NativeWriteChecker

* Add BlockTypeUtils.h

* Support Native Write
baibaichen authored Jul 26, 2024
1 parent a7fb09e commit d90a7f4
Showing 61 changed files with 12,925 additions and 1,026 deletions.
@@ -27,6 +27,9 @@ public class OperatorMetrics implements IOperatorMetrics {
public JoinParams joinParams;
public AggregationParams aggParams;

public long physicalWrittenBytes;
public long numWrittenFiles;

/** Create an instance for operator metrics. */
public OperatorMetrics(
List<MetricsData> metricsList, JoinParams joinParams, AggregationParams aggParams) {
@@ -19,11 +19,8 @@
import org.apache.gluten.metrics.IMetrics;
import org.apache.gluten.metrics.NativeMetrics;

import org.apache.spark.sql.execution.utils.CHExecUtil;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class BatchIterator extends GeneralOutIterator {
@@ -43,8 +40,6 @@ public String getId() {

private native boolean nativeHasNext(long nativeHandle);

private native byte[] nativeNext(long nativeHandle);

private native long nativeCHNext(long nativeHandle);

private native void nativeClose(long nativeHandle);
@@ -54,22 +49,15 @@ public String getId() {
private native String nativeFetchMetrics(long nativeHandle);

@Override
public boolean hasNextInternal() throws IOException {
public boolean hasNextInternal() {
return nativeHasNext(handle);
}

@Override
public ColumnarBatch nextInternal() throws IOException {
public ColumnarBatch nextInternal() {
long block = nativeCHNext(handle);
CHNativeBlock nativeBlock = new CHNativeBlock(block);
int cols = nativeBlock.numColumns();
ColumnVector[] columnVectors = new ColumnVector[cols];
for (int i = 0; i < cols; i++) {
columnVectors[i] =
new CHColumnVector(
CHExecUtil.inferSparkDataType(nativeBlock.getTypeByPosition(i)), block, i);
}
return new ColumnarBatch(columnVectors, nativeBlock.numRows());
return nativeBlock.toColumnarBatch();
}

@Override
@@ -25,7 +25,7 @@

public class CHColumnVector extends ColumnVector {
private final int columnPosition;
private long blockAddress;
private final long blockAddress;

public CHColumnVector(DataType type, long blockAddress, int columnPosition) {
super(type);
@@ -90,15 +90,13 @@ public static void closeFromColumnarBatch(ColumnarBatch cb) {
}

public ColumnarBatch toColumnarBatch() {
ColumnVector[] vectors = new ColumnVector[numColumns()];
for (int i = 0; i < numColumns(); i++) {
int numRows = numRows();
int cols = numColumns();
ColumnVector[] vectors = new ColumnVector[cols];
for (int i = 0; i < cols; i++) {
vectors[i] =
new CHColumnVector(CHExecUtil.inferSparkDataType(getTypeByPosition(i)), blockAddress, i);
}
int numRows = 0;
if (numColumns() != 0) {
numRows = numRows();
}
return new ColumnarBatch(vectors, numRows);
}
}
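The hunk above is the dedup mentioned in the commit message: CHNativeBlock.toColumnarBatch() becomes the single place that builds column vectors, and BatchIterator.nextInternal() now just delegates to it. Below is a minimal, self-contained Scala sketch of the same consolidation pattern; every name (FakeNativeBlock, FakeBatch, ...) is an illustrative stand-in, not Gluten's real API.

// Illustrative stand-ins only; not Gluten's CHNativeBlock/BatchIterator.
final case class FakeVector(blockAddress: Long, column: Int)
final case class FakeBatch(vectors: Array[FakeVector], numRows: Int)

final class FakeNativeBlock(blockAddress: Long, cols: Int, rows: Int) {
  def numColumns(): Int = cols
  def numRows(): Int = rows

  // The one place that knows how to turn a native block into a batch.
  def toColumnarBatch(): FakeBatch = {
    val vectors = Array.tabulate(numColumns())(i => FakeVector(blockAddress, i))
    FakeBatch(vectors, numRows())
  }
}

// The iterator no longer rebuilds the column-vector loop itself.
final class FakeBatchIterator(fetchNext: () => FakeNativeBlock) {
  def nextInternal(): FakeBatch = fetchNext().toColumnarBatch()
}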
@@ -28,6 +28,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.sql.internal.SQLConf;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -36,15 +37,12 @@
import scala.Tuple2;
import scala.collection.JavaConverters;

public class CHNativeExpressionEvaluator {
private final ExpressionEvaluatorJniWrapper jniWrapper;
public class CHNativeExpressionEvaluator extends ExpressionEvaluatorJniWrapper {

public CHNativeExpressionEvaluator() {
jniWrapper = new ExpressionEvaluatorJniWrapper();
}
private CHNativeExpressionEvaluator() {}

// Used to initialize the native computing.
public void initNative(SparkConf conf) {
public static void initNative(SparkConf conf) {
Tuple2<String, String>[] all = conf.getAll();
Map<String, String> confMap =
Arrays.stream(all).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
@@ -55,19 +53,19 @@ public void initNative(SparkConf conf) {
// Get the customer config from SparkConf for each backend
BackendsApiManager.getTransformerApiInstance().postProcessNativeConfig(nativeConfMap, prefix);

jniWrapper.nativeInitNative(buildNativeConf(nativeConfMap));
nativeInitNative(buildNativeConf(nativeConfMap));
}

public void finalizeNative() {
jniWrapper.nativeFinalizeNative();
public static void finalizeNative() {
nativeFinalizeNative();
}

// Used to validate the Substrait plan in native compute engine.
public boolean doValidate(byte[] subPlan) {
return jniWrapper.nativeDoValidate(subPlan);
public static boolean doValidate(byte[] subPlan) {
throw new UnsupportedOperationException("doValidate is not supported in Clickhouse Backend");
}

private byte[] buildNativeConf(Map<String, String> confs) {
private static byte[] buildNativeConf(Map<String, String> confs) {
StringMapNode stringMapNode = ExpressionBuilder.makeStringMap(confs);
AdvancedExtensionNode extensionNode =
ExtensionBuilder.makeAdvancedExtension(
@@ -76,27 +74,28 @@ private byte[] buildNativeConf(Map<String, String> confs) {
return PlanBuilder.makePlan(extensionNode).toProtobuf().toByteArray();
}

private Map<String, String> getNativeBackendConf() {
private static Map<String, String> getNativeBackendConf() {
return GlutenConfig.getNativeBackendConf(
BackendsApiManager.getSettings().getBackendConfigPrefix(), SQLConf.get().getAllConfs());
}

public static void injectWriteFilesTempPath(String path, String fileName) {
throw new UnsupportedOperationException(
"injectWriteFilesTempPath Not supported in CHNativeExpressionEvaluator");
ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath(
CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
path.getBytes(StandardCharsets.UTF_8),
fileName.getBytes(StandardCharsets.UTF_8));
}

// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
public BatchIterator createKernelWithBatchIterator(
public static BatchIterator createKernelWithBatchIterator(
byte[] wsPlan,
byte[][] splitInfo,
List<GeneralInIterator> iterList,
boolean materializeInput) {
long allocId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId();
long handle =
jniWrapper.nativeCreateKernelWithIterator(
allocId,
nativeCreateKernelWithIterator(
CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
wsPlan,
splitInfo,
iterList.toArray(new GeneralInIterator[0]),
@@ -106,10 +105,10 @@ public BatchIterator createKernelWithBatchIterator(
}

// Only for UT.
public BatchIterator createKernelWithBatchIterator(
public static BatchIterator createKernelWithBatchIterator(
long allocId, byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList) {
long handle =
jniWrapper.nativeCreateKernelWithIterator(
nativeCreateKernelWithIterator(
allocId,
wsPlan,
splitInfo,
@@ -119,7 +118,7 @@ public BatchIterator createKernelWithBatchIterator(
return createBatchIterator(handle);
}

private BatchIterator createBatchIterator(long nativeHandle) {
private static BatchIterator createBatchIterator(long nativeHandle) {
return new BatchIterator(nativeHandle);
}
}
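Since CHNativeExpressionEvaluator now extends ExpressionEvaluatorJniWrapper and exposes only static methods, call sites no longer instantiate an evaluator. Here is a short Scala sketch of the resulting usage, mirroring the CHListenerApi and CHIteratorApi hunks further down; the import path is assumed, and the snippet only compiles against a Gluten ClickHouse build.

import org.apache.spark.SparkConf

import org.apache.gluten.vectorized.CHNativeExpressionEvaluator // package path assumed

// Lifecycle calls after the refactor: no evaluator instance is created.
object EvaluatorLifecycleSketch {
  def onStart(conf: SparkConf): Unit =
    CHNativeExpressionEvaluator.initNative(conf) // was: new CHNativeExpressionEvaluator().initNative(conf)

  def onShutdown(): Unit =
    CHNativeExpressionEvaluator.finalizeNative() // was: kernel.finalizeNative()
}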
@@ -24,26 +24,18 @@
public class ExpressionEvaluatorJniWrapper {

/** Call initNative to initialize native computing. */
native void nativeInitNative(byte[] confAsPlan);
static native void nativeInitNative(byte[] confAsPlan);

/** Call finalizeNative to finalize native computing. */
native void nativeFinalizeNative();

/**
* Validate the Substrait plan in native compute engine.
*
* @param subPlan the Substrait plan in binary format.
* @return whether the computing of this plan is supported in native.
*/
native boolean nativeDoValidate(byte[] subPlan);
static native void nativeFinalizeNative();

/**
* Create a native compute kernel and return a columnar result iterator.
*
* @param allocatorId allocator id
* @return iterator instance id
*/
public native long nativeCreateKernelWithIterator(
public static native long nativeCreateKernelWithIterator(
long allocatorId,
byte[] wsPlan,
byte[][] splitInfo,
@@ -52,9 +44,11 @@ public native long nativeCreateKernelWithIterator(
boolean materializeInput);

/**
* Closes the projector referenced by nativeHandler.
* Set the temp path for writing files.
*
* @param nativeHandler nativeHandler that needs to be closed
* @param allocatorId allocator id for current task attempt(or thread)
* @param path the temp path for writing files
*/
native void nativeClose(long nativeHandler);
public static native void injectWriteFilesTempPath(
long allocatorId, byte[] path, byte[] filename);
}
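The new injectWriteFilesTempPath entry point receives the task's native allocator id plus the temp path and target file name as raw UTF-8 byte arrays, which is exactly what the CHNativeExpressionEvaluator wrapper above passes down. A hedged Scala sketch of a caller follows; the gluten import paths are assumptions, and a Gluten ClickHouse build (with the native library loaded) is required to actually run it.

import java.nio.charset.StandardCharsets

import org.apache.gluten.memory.alloc.CHNativeMemoryAllocators    // package path assumed
import org.apache.gluten.vectorized.ExpressionEvaluatorJniWrapper // package path assumed

// Per-task injection of the temp write path before the native pipeline
// starts writing files.
object WritePathInjectionSketch {
  def inject(path: String, fileName: String): Unit =
    ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath(
      CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
      path.getBytes(StandardCharsets.UTF_8),     // native side expects raw UTF-8 bytes
      fileName.getBytes(StandardCharsets.UTF_8))
}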
@@ -18,20 +18,25 @@ package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.{CH_BRANCH, CH_COMMIT, GlutenConfig}
import org.apache.gluten.backendsapi._
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, StructField, StructType}

import java.util.Locale

@@ -187,6 +192,73 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult = {

def validateCompressionCodec(): Option[String] = {
// FIXME: verify Support compression codec
val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options)
None
}

def validateFileFormat(): Option[String] = {
format match {
case _: ParquetFileFormat => None
case _: OrcFileFormat => None
case f: FileFormat => Some(s"Not support FileFormat: ${f.getClass.getSimpleName}")
}
}

// Validate if all types are supported.
def validateDateTypes(): Option[String] = {
None
}

def validateFieldMetadata(): Option[String] = {
// copy CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY
val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"
fields
.find(_.metadata != Metadata.empty)
.filterNot(_.metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY))
.map {
filed =>
s"StructField contain the metadata information: $filed, metadata: ${filed.metadata}"
}
}
def validateWriteFilesOptions(): Option[String] = {
val maxRecordsPerFile = options
.get("maxRecordsPerFile")
.map(_.toLong)
.getOrElse(SQLConf.get.maxRecordsPerFile)
if (maxRecordsPerFile > 0) {
Some("Unsupported native write: maxRecordsPerFile not supported.")
} else {
None
}
}

def validateBucketSpec(): Option[String] = {
if (bucketSpec.nonEmpty) {
Some("Unsupported native write: bucket write is not supported.")
} else {
None
}
}

validateCompressionCodec()
.orElse(validateFileFormat())
.orElse(validateFieldMetadata())
.orElse(validateDateTypes())
.orElse(validateWriteFilesOptions())
.orElse(validateBucketSpec()) match {
case Some(reason) => ValidationResult.failed(reason)
case _ => ValidationResult.succeeded
}
}

override def supportShuffleWithProject(
outputPartitioning: Partitioning,
child: SparkPlan): Boolean = {
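The supportWriteFilesExec hunk above is built from small validators that each return Option[String] (Some(reason) for the first unsupported feature) and are chained with orElse before being mapped to a ValidationResult. Below is a standalone Scala sketch of that pattern; ValidationResultSketch and the simplified validators are stand-ins, not Gluten's API.

// Standalone sketch of the Option-chaining validation used above.
sealed trait ValidationResultSketch
case object Succeeded extends ValidationResultSketch
final case class Failed(reason: String) extends ValidationResultSketch

def validateNativeWrite(
    formatName: String,
    hasBucketSpec: Boolean,
    maxRecordsPerFile: Long): ValidationResultSketch = {

  def validateFileFormat(): Option[String] = formatName match {
    case "parquet" | "orc" => None
    case other             => Some(s"Not support FileFormat: $other")
  }

  def validateWriteFilesOptions(): Option[String] =
    if (maxRecordsPerFile > 0) Some("maxRecordsPerFile not supported") else None

  def validateBucketSpec(): Option[String] =
    if (hasBucketSpec) Some("bucket write is not supported") else None

  // orElse takes its argument by name, so the first Some(reason) wins
  // and later validators are not evaluated.
  validateFileFormat()
    .orElse(validateWriteFilesOptions())
    .orElse(validateBucketSpec()) match {
    case Some(reason) => Failed(reason)
    case None         => Succeeded
  }
}

For example, validateNativeWrite("parquet", hasBucketSpec = false, maxRecordsPerFile = 0) yields Succeeded, while any bucketed write fails fast with the bucket-spec reason.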
@@ -76,7 +76,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
}
.map(it => new ColumnarNativeIterator(it.asJava).asInstanceOf[GeneralInIterator])
.asJava
new CHNativeExpressionEvaluator().createKernelWithBatchIterator(
CHNativeExpressionEvaluator.createKernelWithBatchIterator(
wsPlan,
splitInfoByteArray,
listIterator,
@@ -83,7 +83,7 @@ class CHListenerApi extends ListenerApi with Logging {
val externalSortKey = s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
s".max_bytes_before_external_sort"
if (conf.getLong(externalSortKey, -1) < 0) {
if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = false)) {
val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size"))
if (memSize > 0L) {
val cores = conf.getInt("spark.executor.cores", 1).toLong
@@ -97,8 +97,7 @@ class CHListenerApi extends ListenerApi with Logging {
// Load supported hive/python/scala udfs
UDFMappings.loadFromSparkConf(conf)

val initKernel = new CHNativeExpressionEvaluator()
initKernel.initNative(conf)
CHNativeExpressionEvaluator.initNative(conf)

// inject backend-specific implementations to override spark classes
// FIXME: The following set instances twice in local mode?
@@ -110,7 +109,6 @@ class CHListenerApi extends ListenerApi with Logging {

private def shutdown(): Unit = {
CHBroadcastBuildSideCache.cleanAll()
val kernel = new CHNativeExpressionEvaluator()
kernel.finalizeNative()
CHNativeExpressionEvaluator.finalizeNative()
}
}
@@ -383,13 +383,13 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
s"SampleTransformer metrics update is not supported in CH backend")
}

def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
throw new UnsupportedOperationException(
s"WriteFilesTransformer metrics update is not supported in CH backend")
}
def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"physicalWrittenBytes" -> SQLMetrics.createMetric(sparkContext, "number of written bytes"),
"numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files")
)

def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = {
throw new UnsupportedOperationException(
s"WriteFilesTransformer metrics update is not supported in CH backend")
new WriteFilesMetricsUpdater(metrics)
}
}
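genWriteFilesTransformerMetrics now returns a concrete metrics map (physicalWrittenBytes, numWrittenFiles) instead of throwing, and genWriteFilesTransformerMetricsUpdater hands it to a WriteFilesMetricsUpdater whose body is not part of this excerpt. Below is a hedged Scala sketch of how those two SQLMetrics could be created and bumped; the update helper is an assumption for illustration, not the real updater.

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Mirrors the metric map added above.
def writeFilesMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
  "physicalWrittenBytes" -> SQLMetrics.createMetric(sc, "number of written bytes"),
  "numWrittenFiles" -> SQLMetrics.createMetric(sc, "number of written files")
)

// Hypothetical update step; the real WriteFilesMetricsUpdater is not shown here.
def updateWriteMetrics(
    metrics: Map[String, SQLMetric],
    writtenBytes: Long,
    writtenFiles: Long): Unit = {
  metrics("physicalWrittenBytes").add(writtenBytes)
  metrics("numWrittenFiles").add(writtenFiles)
}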