diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
index b897010d5bb38..6d0d361b7dffe 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
@@ -118,7 +118,6 @@ object OptimizeTableCommandOverwrites extends Logging {
val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
datasourceJniWrapper.nativeMergeMTParts(
- planWithSplitInfo.plan,
planWithSplitInfo.splitInfo,
uuid,
taskId.getId.toString,
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
index b897010d5bb38..6d0d361b7dffe 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
@@ -118,7 +118,6 @@ object OptimizeTableCommandOverwrites extends Logging {
val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
datasourceJniWrapper.nativeMergeMTParts(
- planWithSplitInfo.plan,
planWithSplitInfo.splitInfo,
uuid,
taskId.getId.toString,
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
index ef30aaad2294f..58ec497459d39 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
@@ -120,7 +120,6 @@ object OptimizeTableCommandOverwrites extends Logging {
val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
datasourceJniWrapper.nativeMergeMTParts(
- planWithSplitInfo.plan,
planWithSplitInfo.splitInfo,
uuid,
taskId.getId.toString,
@@ -172,7 +171,7 @@ object OptimizeTableCommandOverwrites extends Logging {
bucketNum: String,
bin: Seq[AddFile],
maxFileSize: Long): Seq[FileAction] = {
- val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog);
+ val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog)
val sparkSession = SparkSession.getActiveSession.get
diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
index f19c5d39df1d8..e8746dbedad62 100644
--- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
+++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
@@ -18,11 +18,15 @@
public class CHDatasourceJniWrapper {
- public native long nativeInitFileWriterWrapper(
- String filePath, byte[] preferredSchema, String formatHint);
+ public native void write(long instanceId, long blockAddress);
+
+ public native String close(long instanceId);
+
+ /// FileWriter
+ public native long createFilerWriter(String filePath, byte[] preferredSchema, String formatHint);
- public native long nativeInitMergeTreeWriterWrapper(
- byte[] plan,
+ /// MergeTreeWriter
+ public native long createMergeTreeWriter(
byte[] splitInfo,
String uuid,
String taskId,
@@ -31,43 +35,28 @@ public native long nativeInitMergeTreeWriterWrapper(
byte[] confArray);
public native String nativeMergeMTParts(
- byte[] plan,
- byte[] splitInfo,
- String uuid,
- String taskId,
- String partition_dir,
- String bucket_dir);
+ byte[] splitInfo, String uuid, String taskId, String partition_dir, String bucket_dir);
public static native String filterRangesOnDriver(byte[] plan, byte[] read);
- public native void write(long instanceId, long blockAddress);
-
- public native void writeToMergeTree(long instanceId, long blockAddress);
-
- public native void close(long instanceId);
-
- public native String closeMergeTreeWriter(long instanceId);
-
- /*-
+ /**
* The input block is already sorted by partition columns + bucket expressions. (check
- * org.apache.spark.sql.execution.datasources.FileFormatWriter#write)
- * However, the input block may contain parts(we call it stripe here) belonging to
- * different partition/buckets.
+ * org.apache.spark.sql.execution.datasources.FileFormatWriter#write) However, the input block may
+ * contain parts(we call it stripe here) belonging to different partition/buckets.
*
- * If bucketing is enabled, the input block's last column is guaranteed to be _bucket_value_.
+   * <p>If bucketing is enabled, the input block's last column is guaranteed to be _bucket_value_.
*
- * This function splits the input block in to several blocks, each of which belonging
- * to the same partition/bucket. Notice the stripe will NOT contain partition columns
+   * <p>This function splits the input block in to several blocks, each of which belonging to the
+ * same partition/bucket. Notice the stripe will NOT contain partition columns
*
- * Since all rows in a stripe share the same partition/bucket,
- * we only need to check the heading row.
- * So, for each stripe, the native code also returns each stripe's first row's index.
- * Caller can use these indice to get UnsafeRows from the input block,
- * to help FileFormatDataWriter to aware partition/bucket changes.
+   * <p>Since all rows in a stripe share the same partition/bucket, we only need to check the
+ * heading row. So, for each stripe, the native code also returns each stripe's first row's index.
+ * Caller can use these indices to get UnsafeRows from the input block, to help
+ * FileFormatDataWriter to aware partition/bucket changes.
*/
public static native BlockStripes splitBlockByPartitionAndBucket(
long blockAddress,
- int[] partitionColIndice,
+ int[] partitionColIndices,
boolean hasBucket,
boolean reserve_partition_columns);
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala
index 547904d7e0373..11013662e3290 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala
@@ -39,7 +39,7 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase {
context: TaskAttemptContext,
nativeConf: java.util.Map[String, String]): OutputWriter = {
val originPath = path
- val datasourceJniWrapper = new CHDatasourceJniWrapper();
+ val datasourceJniWrapper = new CHDatasourceJniWrapper()
CHThreadGroup.registerNewThreadGroup()
val namedStructBuilder = NamedStruct.newBuilder
@@ -52,10 +52,7 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase {
var namedStruct = namedStructBuilder.build
val instance =
- datasourceJniWrapper.nativeInitFileWriterWrapper(
- path,
- namedStruct.toByteArray,
- getFormatName());
+ datasourceJniWrapper.createFilerWriter(path, namedStruct.toByteArray, getFormatName())
new OutputWriter {
override def write(row: InternalRow): Unit = {
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index d203deacc8104..33f0fd05dba2d 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -98,8 +98,7 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {
)
val datasourceJniWrapper = new CHDatasourceJniWrapper()
val instance =
- datasourceJniWrapper.nativeInitMergeTreeWriterWrapper(
- planWithSplitInfo.plan,
+ datasourceJniWrapper.createMergeTreeWriter(
planWithSplitInfo.splitInfo,
uuid,
context.getTaskAttemptID.getTaskID.getId.toString,
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala
index 4f522e218659d..c8554f9b452de 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala
@@ -59,7 +59,7 @@ abstract class MergeTreeFileFormatDataWriter(
protected val updatedPartitions: mutable.Set[String] = mutable.Set[String]()
protected var currentWriter: OutputWriter = _
- protected val returnedMetrics = mutable.HashMap[String, AddFile]()
+ protected val returnedMetrics: mutable.Map[String, AddFile] = mutable.HashMap[String, AddFile]()
/** Trackers for computing various statistics on the data as it's being written out. */
protected val statsTrackers: Seq[WriteTaskStatsTracker] =
@@ -71,10 +71,10 @@ abstract class MergeTreeFileFormatDataWriter(
try {
currentWriter.close()
statsTrackers.foreach(_.closeFile(currentWriter.path()))
- val ret = currentWriter.asInstanceOf[MergeTreeOutputWriter].getAddFiles()
- if (ret.nonEmpty) {
- ret.foreach(addFile => returnedMetrics.put(addFile.path, addFile))
- }
+ currentWriter
+ .asInstanceOf[MergeTreeOutputWriter]
+ .getAddFiles
+ .foreach(addFile => returnedMetrics.put(addFile.path, addFile))
} finally {
currentWriter = null
}
@@ -117,12 +117,7 @@ abstract class MergeTreeFileFormatDataWriter(
releaseResources()
val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
// committer.commitTask(taskAttemptContext)
- val statuses = returnedMetrics
- .map(
- v => {
- v._2
- })
- .toSeq
+ val statuses = returnedMetrics.map(_._2).toSeq
new TaskCommitMessage(statuses)
}
@@ -142,7 +137,7 @@ abstract class MergeTreeFileFormatDataWriter(
override def close(): Unit = {}
- def getReturnedMetrics(): mutable.Map[String, AddFile] = returnedMetrics
+ def getReturnedMetrics: mutable.Map[String, AddFile] = returnedMetrics
}
/** FileFormatWriteTask for empty partitions */
@@ -443,7 +438,11 @@ class MergeTreeDynamicPartitionDataSingleWriter(
case fakeRow: FakeRow =>
if (fakeRow.batch.numRows() > 0) {
val blockStripes = GlutenRowSplitter.getInstance
- .splitBlockByPartitionAndBucket(fakeRow, partitionColIndice, isBucketed, true)
+ .splitBlockByPartitionAndBucket(
+ fakeRow,
+ partitionColIndice,
+ isBucketed,
+ reserve_partition_columns = true)
val iter = blockStripes.iterator()
while (iter.hasNext) {
@@ -526,10 +525,10 @@ class MergeTreeDynamicPartitionDataConcurrentWriter(
if (status.outputWriter != null) {
try {
status.outputWriter.close()
- val ret = status.outputWriter.asInstanceOf[MergeTreeOutputWriter].getAddFiles()
- if (ret.nonEmpty) {
- ret.foreach(addFile => returnedMetrics.put(addFile.path, addFile))
- }
+ status.outputWriter
+ .asInstanceOf[MergeTreeOutputWriter]
+ .getAddFiles
+ .foreach(addFile => returnedMetrics.put(addFile.path, addFile))
} finally {
status.outputWriter = null
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala
index 2ea2906c6a5aa..ac367a9b745be 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala
@@ -42,12 +42,12 @@ class MergeTreeOutputWriter(
if (nextBatch.numRows > 0) {
val col = nextBatch.column(0).asInstanceOf[CHColumnVector]
- datasourceJniWrapper.writeToMergeTree(instance, col.getBlockAddress)
+ datasourceJniWrapper.write(instance, col.getBlockAddress)
} // else just ignore this empty block
}
override def close(): Unit = {
- val returnedMetrics = datasourceJniWrapper.closeMergeTreeWriter(instance)
+ val returnedMetrics = datasourceJniWrapper.close(instance)
if (returnedMetrics != null && returnedMetrics.nonEmpty) {
addFiles.appendAll(
AddFileTags.partsMetricsToAddFile(
@@ -64,7 +64,7 @@ class MergeTreeOutputWriter(
originPath
}
- def getAddFiles(): ArrayBuffer[AddFile] = {
+ def getAddFiles: ArrayBuffer[AddFile] = {
addFiles
}
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index 7b1a79586eee0..776310f62bd35 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -56,6 +56,7 @@ class GlutenClickHouseMergeTreeWriteSuite
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.sql.files.maxPartitionBytes", "20000000")
+ .set("spark.gluten.sql.native.writer.enabled", "true")
.setCHSettings("min_insert_block_size_rows", 100000)
.setCHSettings("mergetree.merge_after_insert", false)
.setCHSettings("input_format_parquet_max_block_size", 8192)
diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt
index ca25692bf0135..d145ed339ff55 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -128,9 +128,9 @@ foreach(child ${children})
add_headers_and_sources(function_parsers ${child})
endforeach()
-# Notice: soures files under Parser/*_udf subdirectories must be built into
+# Notice: sources files under Parser/*_udf subdirectories must be built into
# target ${LOCALENGINE_SHARED_LIB} directly to make sure all function parsers
-# are registered successly.
+# are registered successfully.
add_library(
${LOCALENGINE_SHARED_LIB} SHARED
local_engine_jni.cpp ${local_udfs_sources} ${function_parsers_sources}
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index 5ad0b1b973843..4ec4ab376e6da 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -48,7 +48,6 @@
#include
#include
#include
-#include
#include
#include
#include
@@ -62,7 +61,6 @@
#include
#include
#include
-#include
#include
#include
#include
diff --git a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
index c1d2ee2504ef4..6ee0252d65352 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
@@ -22,7 +22,7 @@
#include
#include
#include
-#include <Storages/Output/FileWriterWrappers.h>
+#include <Storages/Output/NormalFileWriter.h>
#include
#include
#include
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index c1f00c334812a..f4aa38df599b6 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -72,12 +72,10 @@
#include
#include
#include
-#include
#include
#include
#include
#include
-#include
#include
#include
#include
diff --git a/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp b/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp
index e72454ba6fb24..164cf0392dfbf 100644
--- a/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp
+++ b/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp
@@ -26,14 +26,12 @@ namespace local_engine
{
void logDebugMessage(const google::protobuf::Message & message, const char * type)
{
- auto * logger = &Poco::Logger::get("SubstraitPlan");
- if (logger->debug())
+ if (auto * logger = &Poco::Logger::get("SubstraitPlan"); logger->debug())
{
namespace pb_util = google::protobuf::util;
pb_util::JsonOptions options;
std::string json;
- auto s = pb_util::MessageToJsonString(message, &json, options);
- if (!s.ok())
+ if (auto s = pb_util::MessageToJsonString(message, &json, options); !s.ok())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not convert {} to Json", type);
LOG_DEBUG(logger, "{}:\n{}", type, json);
}
diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
index 6cd74c8af343b..cee45aa34c4cb 100644
--- a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
+++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
@@ -21,16 +21,10 @@
#include
#include
#include
-#include
-#include
#include
#include
-#include
#include
-#include
#include
-#include
-#include
namespace local_engine
{
@@ -86,7 +80,7 @@ void NativeSplitter::split(DB::Block & block)
{
if (partition_buffer[i]->size() >= options.buffer_size)
{
-            output_buffer.emplace(std::pair(i, std::make_unique<Block>(partition_buffer[i]->releaseColumns())));
+            output_buffer.emplace(std::pair(i, std::make_unique<DB::Block>(partition_buffer[i]->releaseColumns())));
}
}
}
@@ -116,7 +110,7 @@ bool NativeSplitter::hasNext()
{
if (inputHasNext())
{
-        split(*reinterpret_cast<Block *>(inputNext()));
+        split(*reinterpret_cast<DB::Block *>(inputNext()));
}
else
{
@@ -125,7 +119,7 @@ bool NativeSplitter::hasNext()
auto buffer = partition_buffer.at(i);
if (buffer->size() > 0)
{
- output_buffer.emplace(std::pair(i, new Block(buffer->releaseColumns())));
+ output_buffer.emplace(std::pair(i, new DB::Block(buffer->releaseColumns())));
}
}
break;
@@ -214,7 +208,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject
selector_builder = std::make_unique(options.partition_num, hash_fields, options_.hash_algorithm);
}
-void HashNativeSplitter::computePartitionId(Block & block)
+void HashNativeSplitter::computePartitionId(DB::Block & block)
{
partition_info = selector_builder->build(block);
}
@@ -229,7 +223,7 @@ RoundRobinNativeSplitter::RoundRobinNativeSplitter(NativeSplitter::Options optio
selector_builder = std::make_unique(options_.partition_num);
}
-void RoundRobinNativeSplitter::computePartitionId(Block & block)
+void RoundRobinNativeSplitter::computePartitionId(DB::Block & block)
{
partition_info = selector_builder->build(block);
}
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp
index 1ede4960aafe4..3c3201829885a 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp
@@ -43,11 +43,44 @@ Block removeColumnSuffix(const Block & block)
}
return Block(columns);
}
+
}
namespace local_engine
{
+std::string PartInfo::toJson(const std::vector<PartInfo> & part_infos)
+{
+ rapidjson::StringBuffer result;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(result);
+ writer.StartArray();
+ for (const auto & item : part_infos)
+ {
+ writer.StartObject();
+ writer.Key("part_name");
+ writer.String(item.part_name.c_str());
+ writer.Key("mark_count");
+ writer.Uint(item.mark_count);
+ writer.Key("disk_size");
+ writer.Uint(item.disk_size);
+ writer.Key("row_count");
+ writer.Uint(item.row_count);
+ writer.Key("bucket_id");
+ writer.String(item.bucket_id.c_str());
+ writer.Key("partition_values");
+ writer.StartObject();
+ for (const auto & key_value : item.partition_values)
+ {
+ writer.Key(key_value.first.c_str());
+ writer.String(key_value.second.c_str());
+ }
+ writer.EndObject();
+ writer.EndObject();
+ }
+ writer.EndArray();
+ return result.GetString();
+}
+
std::unique_ptr<SparkMergeTreeWriter> SparkMergeTreeWriter::create(
const MergeTreeTable & merge_tree_table,
const SparkMergeTreeWritePartitionSettings & write_settings_,
@@ -82,7 +115,7 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
{
}
-void SparkMergeTreeWriter::write(const DB::Block & block)
+void SparkMergeTreeWriter::write(DB::Block & block)
{
auto new_block = removeColumnSuffix(block);
auto converter = ActionsDAG::makeConvertingActions(
@@ -92,9 +125,10 @@ void SparkMergeTreeWriter::write(const DB::Block & block)
executor.push(new_block);
}
-void SparkMergeTreeWriter::finalize()
+std::string SparkMergeTreeWriter::close()
{
executor.finish();
+ return PartInfo::toJson(getAllPartInfo());
}
std::vector<PartInfo> SparkMergeTreeWriter::getAllPartInfo() const
@@ -116,36 +150,4 @@ std::vector SparkMergeTreeWriter::getAllPartInfo() const
return res;
}
-String SparkMergeTreeWriter::partInfosToJson(const std::vector<PartInfo> & part_infos)
-{
- rapidjson::StringBuffer result;
-    rapidjson::Writer<rapidjson::StringBuffer> writer(result);
- writer.StartArray();
- for (const auto & item : part_infos)
- {
- writer.StartObject();
- writer.Key("part_name");
- writer.String(item.part_name.c_str());
- writer.Key("mark_count");
- writer.Uint(item.mark_count);
- writer.Key("disk_size");
- writer.Uint(item.disk_size);
- writer.Key("row_count");
- writer.Uint(item.row_count);
- writer.Key("bucket_id");
- writer.String(item.bucket_id.c_str());
- writer.Key("partition_values");
- writer.StartObject();
- for (const auto & key_value : item.partition_values)
- {
- writer.Key(key_value.first.c_str());
- writer.String(key_value.second.c_str());
- }
- writer.EndObject();
- writer.EndObject();
- }
- writer.EndArray();
- return result.GetString();
-}
-
}
\ No newline at end of file
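// ---------------------------------------------------------------------------
// Aside (not part of the patch): a minimal sketch of the new MergeTree writer
// lifecycle on the native side, assuming the caller already obtained a writer
// via SparkMergeTreeWriter::create(...). The old finalize() + getAllPartInfo()
// + partInfosToJson() sequence is collapsed into close(), which now returns
// the part metrics as a JSON string. Include paths assume the ClickHouse and
// local-engine source roots are on the include path; values in the sample
// output are made up.
#include <string>
#include <vector>
#include <Core/Block.h>
#include <Storages/MergeTree/SparkMergeTreeWriter.h>

std::string write_blocks_sketch(local_engine::SparkMergeTreeWriter & writer, std::vector<DB::Block> & blocks)
{
    for (auto & block : blocks)
        writer.write(block); // pushes each block into the internal insert pipeline
    // Finishes the pipeline and serializes the written parts via PartInfo::toJson, e.g.
    // [{"part_name":"...","mark_count":1,"disk_size":1234,"row_count":7,
    //   "bucket_id":"","partition_values":{"l_shipdate":"1995-01-01"}}]
    return writer.close();
}
// ---------------------------------------------------------------------------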
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h
index 699fd3d80b5b5..59535cef094dd 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h
@@ -21,6 +21,7 @@
#include
#include
#include
+#include <Storages/NativeOutputWriter.h>
namespace DB
{
@@ -44,12 +45,13 @@ struct PartInfo
String bucket_id;
bool operator<(const PartInfo & rhs) const { return disk_size < rhs.disk_size; }
+
+    static std::string toJson(const std::vector<PartInfo> & part_infos);
};
-class SparkMergeTreeWriter
+class SparkMergeTreeWriter : public NativeOutputWriter
{
public:
-    static String partInfosToJson(const std::vector<PartInfo> & part_infos);
    static std::unique_ptr<SparkMergeTreeWriter> create(
const MergeTreeTable & merge_tree_table,
const SparkMergeTreeWritePartitionSettings & write_settings_,
@@ -61,9 +63,8 @@ class SparkMergeTreeWriter
DB::QueryPipeline && pipeline_,
std::unordered_map && partition_values_);
- void write(const DB::Block & block);
- void finalize();
-    std::vector<PartInfo> getAllPartInfo() const;
+ void write(DB::Block & block) override;
+ std::string close() override;
private:
DB::Block header;
@@ -71,5 +72,7 @@ class SparkMergeTreeWriter
DB::QueryPipeline pipeline;
DB::PushingPipelineExecutor executor;
std::unordered_map partition_values;
+
+    std::vector<PartInfo> getAllPartInfo() const;
};
}
diff --git a/cpp-ch/local-engine/Storages/NativeOutputWriter.h b/cpp-ch/local-engine/Storages/NativeOutputWriter.h
new file mode 100644
index 0000000000000..bfcbe0b092a53
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/NativeOutputWriter.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <string>
+
+namespace DB
+{
+class Block;
+}
+namespace local_engine
+{
+class NativeOutputWriter
+{
+public:
+ NativeOutputWriter() = default;
+ virtual ~NativeOutputWriter() = default;
+
+ //TODO: change to write(const DB::Block & block)
+ virtual void write(DB::Block & block) = 0;
+ virtual std::string close() = 0;
+};
+}
\ No newline at end of file
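// ---------------------------------------------------------------------------
// Aside (not part of the patch): a minimal NativeOutputWriter implementation,
// to illustrate the contract the JNI layer relies on: write() is invoked once
// per block, close() is invoked exactly once and returns whatever string should
// be handed back to the JVM (NormalFileWriter returns an empty string, the
// MergeTree writer returns part metrics as JSON). RowCountingWriter is a
// hypothetical name; include paths assume the ClickHouse and local-engine roots.
#include <string>
#include <Core/Block.h>
#include <Storages/NativeOutputWriter.h>

namespace local_engine
{
class RowCountingWriter : public NativeOutputWriter
{
public:
    void write(DB::Block & block) override { total_rows += block.rows(); }
    std::string close() override { return std::to_string(total_rows); }

private:
    size_t total_rows = 0;
};
}
// ---------------------------------------------------------------------------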
diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
similarity index 93%
rename from cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp
rename to cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
index 632fb0a4530c0..d90213ff23db4 100644
--- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "FileWriterWrappers.h"
+#include "NormalFileWriter.h"
#include
namespace local_engine
@@ -23,12 +23,11 @@ namespace local_engine
const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"};
const std::string SubstraitPartitionedFileSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"};
-NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_)
- : FileWriterWrapper(file_), context(context_)
+NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) : file(file_), context(context_)
{
}
-void NormalFileWriter::consume(DB::Block & block)
+void NormalFileWriter::write(DB::Block & block)
{
if (!writer) [[unlikely]]
{
@@ -61,12 +60,14 @@ void NormalFileWriter::consume(DB::Block & block)
writer->push(materializeBlock(block));
}
-void NormalFileWriter::close()
+std::string NormalFileWriter::close()
{
/// When insert into a table with empty dataset, NormalFileWriter::consume would be never called.
/// So we need to skip when writer is nullptr.
if (writer)
writer->finish();
+
+ return std::string{};
}
OutputFormatFilePtr createOutputFormatFile(
@@ -82,7 +83,7 @@ OutputFormatFilePtr createOutputFormatFile(
return OutputFormatFileUtil::createFile(context, write_buffer_builder, encoded, preferred_schema, format_hint);
}
-std::unique_ptr<FileWriterWrapper> createFileWriterWrapper(
+std::unique_ptr<NormalFileWriter> NormalFileWriter::create(
const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint)
{
    return std::make_unique<NormalFileWriter>(createOutputFormatFile(context, file_uri, preferred_schema, format_hint), context);
diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
similarity index 93%
rename from cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h
rename to cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index 49383f8de42cb..6d054a04fad39 100644
--- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -26,6 +26,7 @@
#include
#include
#include
+#include <Storages/NativeOutputWriter.h>
#include
#include
#include
@@ -36,31 +37,20 @@
namespace local_engine
{
-class FileWriterWrapper
+class NormalFileWriter : public NativeOutputWriter
{
public:
- explicit FileWriterWrapper(const OutputFormatFilePtr & file_) : file(file_) { }
- virtual ~FileWriterWrapper() = default;
+    static std::unique_ptr<NormalFileWriter> create(
+ const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint);
- virtual void consume(DB::Block & block) = 0;
- virtual void close() = 0;
-
-protected:
- OutputFormatFilePtr file;
-};
-
-using FileWriterWrapperPtr = std::shared_ptr<FileWriterWrapper>;
-
-class NormalFileWriter : public FileWriterWrapper
-{
-public:
NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_);
~NormalFileWriter() override = default;
- void consume(DB::Block & block) override;
- void close() override;
+ void write(DB::Block & block) override;
+ std::string close() override;
private:
+ OutputFormatFilePtr file;
DB::ContextPtr context;
OutputFormatFile::OutputFormatPtr output_format;
@@ -68,9 +58,6 @@ class NormalFileWriter : public FileWriterWrapper
std::unique_ptr writer;
};
-std::unique_ptr<FileWriterWrapper> createFileWriterWrapper(
- const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint);
-
OutputFormatFilePtr createOutputFormatFile(
const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint);
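// ---------------------------------------------------------------------------
// Aside (not part of the patch): the free function createFileWriterWrapper()
// is gone; callers go through the static factory and then only see the shared
// NativeOutputWriter surface. The file path and "parquet" format hint below
// are made-up illustration values.
#include <Core/Block.h>
#include <Storages/Output/NormalFileWriter.h>

void write_single_block_sketch(const DB::ContextPtr & context, DB::Block & block)
{
    auto writer = local_engine::NormalFileWriter::create(
        context, "/tmp/example.parquet", block.cloneEmpty() /* preferred schema */, "parquet");
    writer->write(block); // was consume()
    writer->close();      // now returns a string; empty for plain file writes
}
// ---------------------------------------------------------------------------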
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 68c445863133f..20f527bb4f4ea 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -45,9 +45,8 @@
#include
#include
#include
-#include <Storages/Output/FileWriterWrappers.h>
+#include <Storages/Output/NormalFileWriter.h>
#include
-#include
#include
#include
#include
@@ -908,7 +907,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeC
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
-JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitFileWriterWrapper(
+JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createFilerWriter(
JNIEnv * env, jobject, jstring file_uri_, jbyteArray preferred_schema_, jstring format_hint_)
{
LOCAL_ENGINE_JNI_METHOD_START
@@ -920,8 +919,7 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
reinterpret_cast(preferred_schema_ref.elems()), static_cast(preferred_schema_ref.length())};
substrait::NamedStruct res;
- bool ok = res.ParseFromString(view);
- if (!ok)
+ if (!res.ParseFromString(view))
return {};
return std::move(res);
};
@@ -935,15 +933,14 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
// for HiveFileFormat, the file url may not end with .parquet, so we pass in the format as a hint
const auto format_hint = jstring2string(env, format_hint_);
const auto context = local_engine::QueryContext::instance().currentQueryContext();
- auto * writer = local_engine::createFileWriterWrapper(context, file_uri, preferred_schema, format_hint).release();
+ auto * writer = local_engine::NormalFileWriter::create(context, file_uri, preferred_schema, format_hint).release();
return reinterpret_cast(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
-JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitMergeTreeWriterWrapper(
+JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createMergeTreeWriter(
JNIEnv * env,
jobject,
- jbyteArray plan_,
jbyteArray split_info_,
jstring uuid_,
jstring task_id_,
@@ -1005,54 +1002,24 @@ Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_write(JNI
{
LOCAL_ENGINE_JNI_METHOD_START
-    auto * writer = reinterpret_cast<local_engine::FileWriterWrapper *>(instanceId);
+    auto * writer = reinterpret_cast<local_engine::NativeOutputWriter *>(instanceId);
    auto * block = reinterpret_cast<DB::Block *>(block_address);
- writer->consume(*block);
- LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
-JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId)
-{
- LOCAL_ENGINE_JNI_METHOD_START
-    auto * writer = reinterpret_cast<local_engine::FileWriterWrapper *>(instanceId);
- SCOPE_EXIT({ delete writer; });
- writer->close();
- LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
-JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_writeToMergeTree(
- JNIEnv * env, jobject, jlong instanceId, jlong block_address)
-{
- LOCAL_ENGINE_JNI_METHOD_START
-    auto * writer = reinterpret_cast<local_engine::SparkMergeTreeWriter *>(instanceId);
-    const auto * block = reinterpret_cast<DB::Block *>(block_address);
writer->write(*block);
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
-JNIEXPORT jstring
-Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_closeMergeTreeWriter(JNIEnv * env, jobject, jlong instanceId)
+JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId)
{
LOCAL_ENGINE_JNI_METHOD_START
-    auto * writer = reinterpret_cast<local_engine::SparkMergeTreeWriter *>(instanceId);
+    auto * writer = reinterpret_cast<local_engine::NativeOutputWriter *>(instanceId);
SCOPE_EXIT({ delete writer; });
-
- writer->finalize();
- const auto part_infos = writer->getAllPartInfo();
- const auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos);
- return local_engine::charTojstring(env, json_info.c_str());
+ const auto result = writer->close();
+ return local_engine::charTojstring(env, result.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeMergeMTParts(
- JNIEnv * env,
- jobject,
- jbyteArray plan_,
- jbyteArray split_info_,
- jstring uuid_,
- jstring task_id_,
- jstring partition_dir_,
- jstring bucket_dir_)
+ JNIEnv * env, jobject, jbyteArray split_info_, jstring uuid_, jstring task_id_, jstring partition_dir_, jstring bucket_dir_)
{
LOCAL_ENGINE_JNI_METHOD_START
@@ -1091,8 +1058,7 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count, partition_values, bucket_dir});
}
- auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);
-
+ auto json_info = local_engine::PartInfo::toJson(res);
return local_engine::charTojstring(env, json_info.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
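// ---------------------------------------------------------------------------
// Aside (not part of the patch): sketch of how a writer handle now round-trips
// through the JNI boundary. Both writer kinds travel as a jlong holding a
// NativeOutputWriter pointer, so a single write()/close() pair on the Java side
// serves files and MergeTree alike; deleting through the base pointer is safe
// because NativeOutputWriter declares a virtual destructor. Names and the
// format hint are illustrative only.
#include <jni.h>
#include <string>
#include <Core/Block.h>
#include <Storages/NativeOutputWriter.h>
#include <Storages/Output/NormalFileWriter.h>

std::string jni_handle_roundtrip_sketch(const DB::ContextPtr & context, const DB::Block & schema, DB::Block & block)
{
    // createFilerWriter / createMergeTreeWriter: the pointer is erased to a jlong instance id.
    jlong instance_id = reinterpret_cast<jlong>(
        local_engine::NormalFileWriter::create(context, "/tmp/example.parquet", schema, "parquet").release());

    // write(instanceId, blockAddress) and close(instanceId) on the Java side map to:
    auto * writer = reinterpret_cast<local_engine::NativeOutputWriter *>(instance_id);
    writer->write(block);
    std::string result = writer->close();
    delete writer; // the real close() JNI method does this via SCOPE_EXIT
    return result;
}
// ---------------------------------------------------------------------------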
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
index ac0ec2145757c..5a39580c19fdc 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
@@ -20,6 +20,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -39,21 +40,21 @@
#include
#include
-# define ASSERT_DURATION_LE(secs, stmt) \
- { \
- std::promise completed; \
- auto stmt_future = completed.get_future(); \
- std::thread( \
- [&](std::promise & completed) \
- { \
- stmt; \
- completed.set_value(true); \
- }, \
- std::ref(completed)) \
- .detach(); \
- if (stmt_future.wait_for(std::chrono::seconds(secs)) == std::future_status::timeout) \
- GTEST_FATAL_FAILURE_(" timed out (> " #secs " seconds). Check code for infinite loops"); \
- }
+#define ASSERT_DURATION_LE(secs, stmt) \
+ { \
+ std::promise completed; \
+ auto stmt_future = completed.get_future(); \
+ std::thread( \
+ [&](std::promise & completed) \
+ { \
+ stmt; \
+ completed.set_value(true); \
+ }, \
+ std::ref(completed)) \
+ .detach(); \
+ if (stmt_future.wait_for(std::chrono::seconds(secs)) == std::future_status::timeout) \
+ GTEST_FATAL_FAILURE_(" timed out (> " #secs " seconds). Check code for infinite loops"); \
+ }
namespace DB::ErrorCodes
diff --git a/cpp-ch/local-engine/tests/gtest_parser.cpp b/cpp-ch/local-engine/tests/gtest_parser.cpp
index b6c431ff25f35..da46ff6fdcdea 100644
--- a/cpp-ch/local-engine/tests/gtest_parser.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parser.cpp
@@ -24,7 +24,6 @@
#include
#include
#include
-#include
#include
#include
#include
diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
index 18a18b0e2cbfc..68c83aa8a8293 100644
--- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
+++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
@@ -19,31 +19,19 @@
#include
#include
#include
-#include
#include
#include
-#include
-#include
#include
-#include
-#include
#include
-#include
+#include
#include
-#include
#include
#include
-#include
-#include
-#include
-#include
-#include
#include
#include
-#include
+#include
#include
#include
-#include
#include
#include
@@ -252,197 +240,4 @@ TEST(WritePipeline, ComputePartitionedExpression)
EXPECT_EQ("s_nationkey=1/name=one", partition_by_result_column->getDataAt(0));
EXPECT_EQ("s_nationkey=2/name=two", partition_by_result_column->getDataAt(1));
EXPECT_EQ("s_nationkey=3/name=three", partition_by_result_column->getDataAt(2));
-}
-
-void do_remove(const std::string & folder)
-{
- namespace fs = std::filesystem;
- if (const std::filesystem::path ph(folder); fs::exists(ph))
- fs::remove_all(ph);
-}
-
-Chunk person_chunk()
-{
- auto id = INT()->createColumn();
- id->insert(100);
- id->insert(200);
- id->insert(300);
- id->insert(400);
- id->insert(500);
- id->insert(600);
- id->insert(700);
-
- auto name = STRING()->createColumn();
- name->insert("Joe");
- name->insert("Marry");
- name->insert("Mike");
- name->insert("Fred");
- name->insert("Albert");
- name->insert("Michelle");
- name->insert("Dan");
-
- auto age = makeNullable(INT())->createColumn();
- Field null_field;
- age->insert(30);
- age->insert(null_field);
- age->insert(18);
- age->insert(50);
- age->insert(null_field);
- age->insert(30);
- age->insert(50);
-
-
- MutableColumns x;
- x.push_back(std::move(id));
- x.push_back(std::move(name));
- x.push_back(std::move(age));
- return {std::move(x), 7};
-}
-
-TEST(WritePipeline, MergeTree)
-{
- ThreadStatus thread_status;
-
- const auto context = DB::Context::createCopy(QueryContext::globalContext());
- context->setPath("./");
- const Settings & settings = context->getSettingsRef();
-
- const std::string query
- = R"(create table if not exists person (id Int32, Name String, Age Nullable(Int32)) engine = MergeTree() ORDER BY id)";
-
- const char * begin = query.data();
- const char * end = query.data() + query.size();
- ParserQuery parser(end, settings.allow_settings_after_format_in_insert);
-
- ASTPtr ast = parseQuery(parser, begin, end, "", settings.max_query_size, settings.max_parser_depth, settings.max_parser_backtracks);
-
- EXPECT_TRUE(ast->as());
- auto & create = ast->as();
-
- ColumnsDescription column_descriptions
- = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, context, LoadingStrictnessLevel::CREATE);
-
- StorageInMemoryMetadata metadata;
- metadata.setColumns(column_descriptions);
- metadata.setComment("args.comment");
- ASTPtr partition_by_key;
- metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, context);
-
- MergeTreeData::MergingParams merging_params;
- merging_params.mode = MergeTreeData::MergingParams::Ordinary;
-
-
- /// This merging param maybe used as part of sorting key
- std::optional merging_param_key_arg;
- /// Get sorting key from engine arguments.
- ///
- /// NOTE: store merging_param_key_arg as additional key column. We do it
- /// before storage creation. After that storage will just copy this
- /// column if sorting key will be changed.
- metadata.sorting_key
- = KeyDescription::getSortingKeyFromAST(create.storage->order_by->ptr(), metadata.columns, context, merging_param_key_arg);
-
- std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings());
-
- UUID uuid;
- UUIDHelpers::getHighBytes(uuid) = 0xffffffffffff0fffull | 0x0000000000004000ull;
- UUIDHelpers::getLowBytes(uuid) = 0x3fffffffffffffffull | 0x8000000000000000ull;
-
- SCOPE_EXIT({ do_remove("WritePipeline_MergeTree"); });
-
- auto merge_tree = std::make_shared(
- StorageID("", "", uuid),
- "WritePipeline_MergeTree",
- metadata,
- LoadingStrictnessLevel::CREATE,
- context,
- "",
- merging_params,
- std::move(storage_settings));
-
- Block header{{INT(), "id"}, {STRING(), "Name"}, {makeNullable(INT()), "Age"}};
- DB::Squashing squashing(header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
- squashing.add(person_chunk());
- auto x = Squashing::squash(squashing.flush());
- x.getChunkInfos().add(std::make_shared());
-
- ASSERT_EQ(7, x.getNumRows());
- ASSERT_EQ(3, x.getNumColumns());
-
-
- auto metadata_snapshot = std::make_shared(metadata);
- ASTPtr none;
- auto sink = std::static_pointer_cast(merge_tree->write(none, metadata_snapshot, context, false));
-
- sink->consume(x);
- sink->onFinish();
-}
-
-INCBIN(_1_mergetree_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree.json");
-INCBIN(_1_mergetree_hdfs_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree_hdfs.json");
-INCBIN(_1_read_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_plan.json");
-
-TEST(WritePipeline, SparkMergeTree)
-{
- ThreadStatus thread_status;
-
- const auto context = DB::Context::createCopy(QueryContext::globalContext());
- context->setPath("./");
- const Settings & settings = context->getSettingsRef();
-
- const auto extension_table = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_));
- MergeTreeTableInstance merge_tree_table(extension_table);
-
- EXPECT_EQ(merge_tree_table.database, "default");
- EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree");
- EXPECT_EQ(merge_tree_table.relative_path, "lineitem_mergetree");
- EXPECT_EQ(merge_tree_table.table_configs.storage_policy, "default");
-
- do_remove(merge_tree_table.relative_path);
-
- const auto dest_storage = merge_tree_table.getStorage(QueryContext::globalMutableContext());
- EXPECT_TRUE(dest_storage);
- EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote());
- DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr();
- Block header = metadata_snapshot->getSampleBlock();
-
- constexpr std::string_view split_template
- = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})";
- constexpr std::string_view file{GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")};
-
- SparkMergeTreeWritePartitionSettings gm_write_settings{
- .part_name_prefix{"this_is_prefix"},
- };
- gm_write_settings.set(context);
-
- auto writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, gm_write_settings, context);
- SparkMergeTreeWriter & spark_merge_tree_writer = *writer;
-
- auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_1_read_), split_template, file);
- EXPECT_TRUE(local_executor->hasNext());
-
- do
- {
- spark_merge_tree_writer.write(*local_executor->nextColumnar());
- } while (local_executor->hasNext());
-
- spark_merge_tree_writer.finalize();
- auto part_infos = spark_merge_tree_writer.getAllPartInfo();
- auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos);
- std::cerr << json_info << std::endl;
-
- ///
- {
- const auto extension_table_hdfs
- = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_hdfs_));
- MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs);
- EXPECT_EQ(merge_tree_table_hdfs.database, "default");
- EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs");
- EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs");
- EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main");
-
- const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext());
- EXPECT_TRUE(dest_storage_hdfs);
- EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote());
- }
}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
new file mode 100644
index 0000000000000..caf4481354336
--- /dev/null
+++ b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace local_engine;
+using namespace DB;
+
+namespace
+{
+void do_remove(const std::string & folder)
+{
+ namespace fs = std::filesystem;
+ if (const std::filesystem::path ph(folder); fs::exists(ph))
+ fs::remove_all(ph);
+}
+
+Chunk person_chunk()
+{
+ auto id = INT()->createColumn();
+ id->insert(100);
+ id->insert(200);
+ id->insert(300);
+ id->insert(400);
+ id->insert(500);
+ id->insert(600);
+ id->insert(700);
+
+ auto name = STRING()->createColumn();
+ name->insert("Joe");
+ name->insert("Marry");
+ name->insert("Mike");
+ name->insert("Fred");
+ name->insert("Albert");
+ name->insert("Michelle");
+ name->insert("Dan");
+
+ auto age = makeNullable(INT())->createColumn();
+ Field null_field;
+ age->insert(30);
+ age->insert(null_field);
+ age->insert(18);
+ age->insert(50);
+ age->insert(null_field);
+ age->insert(30);
+ age->insert(50);
+
+
+ MutableColumns x;
+ x.push_back(std::move(id));
+ x.push_back(std::move(name));
+ x.push_back(std::move(age));
+ return {std::move(x), 7};
+}
+}
+
+TEST(MergeTree, ClickhouseMergeTree)
+{
+ ThreadStatus thread_status;
+
+ const auto context = DB::Context::createCopy(QueryContext::globalContext());
+ context->setPath("./");
+ const Settings & settings = context->getSettingsRef();
+
+ const std::string query
+ = R"(create table if not exists person (id Int32, Name String, Age Nullable(Int32)) engine = MergeTree() ORDER BY id)";
+
+ const char * begin = query.data();
+ const char * end = query.data() + query.size();
+ ParserQuery parser(end, settings.allow_settings_after_format_in_insert);
+
+ ASTPtr ast = parseQuery(parser, begin, end, "", settings.max_query_size, settings.max_parser_depth, settings.max_parser_backtracks);
+
+ EXPECT_TRUE(ast->as());
+ auto & create = ast->as();
+
+ ColumnsDescription column_descriptions
+ = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, context, LoadingStrictnessLevel::CREATE);
+
+ StorageInMemoryMetadata metadata;
+ metadata.setColumns(column_descriptions);
+ metadata.setComment("args.comment");
+ ASTPtr partition_by_key;
+ metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, context);
+
+ MergeTreeData::MergingParams merging_params;
+ merging_params.mode = MergeTreeData::MergingParams::Ordinary;
+
+
+ /// This merging param maybe used as part of sorting key
+ std::optional merging_param_key_arg;
+ /// Get sorting key from engine arguments.
+ ///
+ /// NOTE: store merging_param_key_arg as additional key column. We do it
+ /// before storage creation. After that storage will just copy this
+ /// column if sorting key will be changed.
+ metadata.sorting_key
+ = KeyDescription::getSortingKeyFromAST(create.storage->order_by->ptr(), metadata.columns, context, merging_param_key_arg);
+
+ std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings());
+
+ UUID uuid;
+ UUIDHelpers::getHighBytes(uuid) = 0xffffffffffff0fffull | 0x0000000000004000ull;
+ UUIDHelpers::getLowBytes(uuid) = 0x3fffffffffffffffull | 0x8000000000000000ull;
+
+ SCOPE_EXIT({ do_remove("WritePipeline_MergeTree"); });
+
+ auto merge_tree = std::make_shared(
+ StorageID("", "", uuid),
+ "WritePipeline_MergeTree",
+ metadata,
+ LoadingStrictnessLevel::CREATE,
+ context,
+ "",
+ merging_params,
+ std::move(storage_settings));
+
+ Block header{{INT(), "id"}, {STRING(), "Name"}, {makeNullable(INT()), "Age"}};
+ DB::Squashing squashing(header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
+ squashing.add(person_chunk());
+ auto x = Squashing::squash(squashing.flush());
+ x.getChunkInfos().add(std::make_shared());
+
+ ASSERT_EQ(7, x.getNumRows());
+ ASSERT_EQ(3, x.getNumColumns());
+
+
+ auto metadata_snapshot = std::make_shared(metadata);
+ ASTPtr none;
+ auto sink = std::static_pointer_cast(merge_tree->write(none, metadata_snapshot, context, false));
+
+ sink->consume(x);
+ sink->onFinish();
+}
+
+INCBIN(_1_mergetree_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree.json");
+INCBIN(_1_mergetree_hdfs_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree_hdfs.json");
+INCBIN(_1_read_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_plan.json");
+
+TEST(MergeTree, SparkMergeTree)
+{
+ ThreadStatus thread_status;
+
+ const auto context = DB::Context::createCopy(QueryContext::globalContext());
+ context->setPath("./");
+ const Settings & settings = context->getSettingsRef();
+
+ const auto extension_table = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_));
+ MergeTreeTableInstance merge_tree_table(extension_table);
+
+ EXPECT_EQ(merge_tree_table.database, "default");
+ EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree");
+ EXPECT_EQ(merge_tree_table.relative_path, "lineitem_mergetree");
+ EXPECT_EQ(merge_tree_table.table_configs.storage_policy, "default");
+
+ do_remove(merge_tree_table.relative_path);
+
+ const auto dest_storage = merge_tree_table.getStorage(QueryContext::globalMutableContext());
+ EXPECT_TRUE(dest_storage);
+ EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote());
+ DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr();
+ Block header = metadata_snapshot->getSampleBlock();
+
+ constexpr std::string_view split_template
+ = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})";
+ constexpr std::string_view file{GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")};
+
+ SparkMergeTreeWritePartitionSettings gm_write_settings{
+ .part_name_prefix{"this_is_prefix"},
+ };
+ gm_write_settings.set(context);
+
+ auto writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, gm_write_settings, context);
+ SparkMergeTreeWriter & spark_merge_tree_writer = *writer;
+
+ auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_1_read_), split_template, file);
+ EXPECT_TRUE(local_executor->hasNext());
+
+ do
+ {
+ spark_merge_tree_writer.write(*local_executor->nextColumnar());
+ } while (local_executor->hasNext());
+
+ auto json_info = spark_merge_tree_writer.close();
+ std::cerr << json_info << std::endl;
+
+ ///
+ {
+ const auto extension_table_hdfs
+ = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_hdfs_));
+ MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs);
+ EXPECT_EQ(merge_tree_table_hdfs.database, "default");
+ EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs");
+ EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs");
+ EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main");
+
+ const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext());
+ EXPECT_TRUE(dest_storage_hdfs);
+ EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote());
+ }
+}
+
+INCBIN(_2_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/2_one_pipeline.json");
+
+TEST(MergeTree, Pipeline)
+{
+ GTEST_SKIP();
+ const auto context = DB::Context::createCopy(QueryContext::globalContext());
+ GlutenWriteSettings settings{
+ .task_write_tmp_dir = "file:///tmp/lineitem_mergetree",
+ .task_write_filename = "part-00000-a09f9d59-2dc6-43bc-a485-dcab8384b2ff.c000.mergetree",
+ };
+ settings.set(context);
+
+ constexpr std::string_view split_template
+ = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})";
+ auto [_, local_executor] = test::create_plan_and_executor(
+ EMBEDDED_PLAN(_2_mergetree_plan_),
+ split_template,
+ GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet"),
+ context);
+ EXPECT_TRUE(local_executor->hasNext());
+ const Block & x = *local_executor->nextColumnar();
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json b/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json
new file mode 100644
index 0000000000000..fbc593267464d
--- /dev/null
+++ b/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json
@@ -0,0 +1,368 @@
+{
+ "relations": [
+ {
+ "root": {
+ "input": {
+ "write": {
+ "namedTable": {
+ "advancedExtension": {
+ "optimization": {
+ "@type": "type.googleapis.com/google.protobuf.StringValue",
+ "value": "WriteParameters:isSnappy=1;format=mergetree\n"
+ },
+ "enhancement": {
+ "@type": "type.googleapis.com/substrait.Type",
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ],
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ }
+ }
+ },
+ "tableSchema": {
+ "names": [
+ "l_orderkey",
+ "l_partkey",
+ "l_suppkey",
+ "l_linenumber",
+ "l_quantity",
+ "l_extendedprice",
+ "l_discount",
+ "l_tax",
+ "l_returnflag",
+ "l_linestatus",
+ "l_shipdate",
+ "l_commitdate",
+ "l_receiptdate",
+ "l_shipinstruct",
+ "l_shipmode",
+ "l_comment"
+ ],
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ]
+ },
+ "columnTypes": [
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL"
+ ]
+ },
+ "input": {
+ "read": {
+ "common": {
+ "direct": {}
+ },
+ "baseSchema": {
+ "names": [
+ "l_orderkey",
+ "l_partkey",
+ "l_suppkey",
+ "l_linenumber",
+ "l_quantity",
+ "l_extendedprice",
+ "l_discount",
+ "l_tax",
+ "l_returnflag",
+ "l_linestatus",
+ "l_shipdate",
+ "l_commitdate",
+ "l_receiptdate",
+ "l_shipinstruct",
+ "l_shipmode",
+ "l_comment"
+ ],
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ]
+ },
+ "columnTypes": [
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL"
+ ]
+ },
+ "advancedExtension": {
+ "optimization": {
+ "@type": "type.googleapis.com/google.protobuf.StringValue",
+ "value": "isMergeTree=0\n"
+ }
+ }
+ }
+ }
+ }
+ },
+ "outputSchema": {
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ }
+ }
+ ]
+}
\ No newline at end of file