diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 7bd6e791905a0..e90a3821a41ba 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -323,9 +323,11 @@ class CollectMetricIterator(
   private var outputRowCount = 0L
   private var outputVectorCount = 0L
   private var metricsUpdated = false
+  // Whether the stage is executed completely using the ClickHouse pipeline.
   private var wholeStagePipeline = true
 
   override def hasNext: Boolean = {
+    // The hasNext call is triggered only when there is a fallback.
     wholeStagePipeline = false
     nativeIterator.hasNext
   }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index ec7d5baa50901..53f85d84672b9 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -190,26 +190,31 @@ class CHColumnarShuffleWriter[K, V](
 }
 
 object CHColumnarShuffleWriter {
+  private val TOTAL_OUTPUT_ROWS = "total_output_rows"
+
+  private val TOTAL_OUTPUT_BATCHES = "total_output_batches"
+
+  // Pass the statistics of the last operator before the shuffle to CollectMetricIterator.
   def setOutputMetrics(splitResult: CHSplitResult): Unit = {
     TaskContext
       .get()
       .getLocalProperties
-      .setProperty("total_output_rows", splitResult.getTotalRows.toString)
+      .setProperty(TOTAL_OUTPUT_ROWS, splitResult.getTotalRows.toString)
     TaskContext
       .get()
       .getLocalProperties
-      .setProperty("total_output_batches", splitResult.getTotalBatches.toString)
+      .setProperty(TOTAL_OUTPUT_BATCHES, splitResult.getTotalBatches.toString)
   }
 
   def getTotalOutputRows(): Long = {
-    val output_rows = TaskContext.get().getLocalProperty("total_output_rows")
+    val output_rows = TaskContext.get().getLocalProperty(TOTAL_OUTPUT_ROWS)
     var output_rows_value = 0L
     if (output_rows != null && output_rows.nonEmpty) output_rows_value = output_rows.toLong
     output_rows_value
   }
 
   def getTotalOutputBatches(): Long = {
-    val output_batches = TaskContext.get().getLocalProperty("total_output_batches")
+    val output_batches = TaskContext.get().getLocalProperty(TOTAL_OUTPUT_BATCHES)
     var output_batches_value = 0L
     if (output_batches != null) output_batches_value = output_batches.toLong
     output_batches_value
diff --git a/cpp-ch/local-engine/Parser/LocalExecutor.cpp b/cpp-ch/local-engine/Parser/LocalExecutor.cpp
index 17eee75710080..3ab1b71ad5cdd 100644
--- a/cpp-ch/local-engine/Parser/LocalExecutor.cpp
+++ b/cpp-ch/local-engine/Parser/LocalExecutor.cpp
@@ -21,8 +21,7 @@
 #include
 #include
 #include
-
-#include "SerializedPlanParser.h"
+#include
 
 using namespace DB;
 namespace local_engine
@@ -169,4 +168,4 @@ std::string LocalExecutor::dumpPipeline() const
     DB::printPipeline(processors, out);
     return out.str();
 }
-}
\ No newline at end of file
+}
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 3c2984e069f4d..b4f2b326c830b 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -1321,18 +1321,18 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
 
     const Settings & settings = context->getSettingsRef();
     auto builder = buildQueryPipeline(*query_plan);
-    ///
+
     assert(s_plan.relations_size() == 1);
     const substrait::PlanRel & root_rel = s_plan.relations().at(0);
     assert(root_rel.has_root());
     if (root_rel.root().input().has_write())
        addSinkTransfrom(context, root_rel.root().input().write(), builder);
-    ///
+
     auto * logger = &Poco::Logger::get("SerializedPlanParser");
     LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
     LOG_DEBUG(
         logger, "clickhouse plan [optimization={}]:\n{}", settings.query_plan_enable_optimizations, PlanUtil::explainPlan(*query_plan));
-    // LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(pipeline));
+    LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(pipeline));
     auto config = ExecutorConfig::loadFromContext(context);
     return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(builder), config.dump_pipeline);
 
diff --git a/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp
index 5a27807c8847c..ebe44248722c2 100644
--- a/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp
+++ b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp
@@ -386,4 +386,4 @@ std::unordered_map SparkExchangeManager::partition
     {"single", createSingleSelectorBuilder},
     {"range", createRangeSelectorBuilder},
 };
-}
\ No newline at end of file
+}
diff --git a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
index 254ee4f79186a..1e23ee53b8f28 100644
--- a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
+++ b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
@@ -21,7 +21,6 @@
 #include
 #include
 #include
-#include
 
 namespace DB
 {
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index dc8135204d97e..5dd540c47e386 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -56,7 +56,6 @@
 #include
 #include
 #include
-#include
 
 #ifdef __cplusplus
 namespace DB
diff --git a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
index 839f3ea529c1a..7f0243ee5dd07 100644
--- a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
@@ -49,6 +49,7 @@
 #include
 #include
 #include
+#include
 #include "testConfig.h"
 
 #if defined(__SSE2__)
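Note on the metrics handoff: the Scala changes above route the native shuffle writer's split statistics to CollectMetricIterator through the task-local properties of Spark's TaskContext. The sketch below is a minimal, self-contained illustration of that pattern, not Gluten code: FakeTaskContext is a hypothetical stand-in for Spark's TaskContext, and the property keys mirror the constants introduced in CHColumnarShuffleWriter.

    import java.util.Properties

    // Hypothetical stand-in for the per-task local properties carried by Spark's TaskContext.
    final class FakeTaskContext {
      private val localProperties = new Properties()
      def setLocalProperty(key: String, value: String): Unit = localProperties.setProperty(key, value)
      def getLocalProperty(key: String): String = localProperties.getProperty(key)
    }

    object MetricsHandoffSketch {
      private val TOTAL_OUTPUT_ROWS = "total_output_rows"
      private val TOTAL_OUTPUT_BATCHES = "total_output_batches"

      // Writer side: publish the statistics of the last operator before the shuffle.
      def setOutputMetrics(ctx: FakeTaskContext, totalRows: Long, totalBatches: Long): Unit = {
        ctx.setLocalProperty(TOTAL_OUTPUT_ROWS, totalRows.toString)
        ctx.setLocalProperty(TOTAL_OUTPUT_BATCHES, totalBatches.toString)
      }

      // Reader side: parse defensively, since the property is absent until the writer has run.
      def getTotalOutputRows(ctx: FakeTaskContext): Long = {
        val v = ctx.getLocalProperty(TOTAL_OUTPUT_ROWS)
        if (v != null && v.nonEmpty) v.toLong else 0L
      }

      def getTotalOutputBatches(ctx: FakeTaskContext): Long = {
        val v = ctx.getLocalProperty(TOTAL_OUTPUT_BATCHES)
        if (v != null && v.nonEmpty) v.toLong else 0L
      }

      def main(args: Array[String]): Unit = {
        val ctx = new FakeTaskContext
        setOutputMetrics(ctx, totalRows = 1000L, totalBatches = 4L)
        assert(getTotalOutputRows(ctx) == 1000L)
        assert(getTotalOutputBatches(ctx) == 4L)
      }
    }

Introducing named key constants, as the diff does, keeps the writer and reader sides of this handoff from drifting apart, since both now reference the same symbol rather than repeating a string literal.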
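Note on wholeStagePipeline: the CHIteratorApi change relies on hasNext being reached only when a fallback operator drives the iterator, so an untouched flag means the whole stage ran inside the native pipeline and the metrics must come from the writer-published totals instead. The sketch below illustrates that decision under stated assumptions; the class and its rowsOf/nativeTotals parameters are hypothetical, for illustration only.

    // Wraps a batch iterator and decides where its output metrics should come from.
    final class MetricCollectingIterator[T](
        underlying: Iterator[T],
        rowsOf: T => Long,               // assumed: how many rows a batch contains
        nativeTotals: () => (Long, Long) // assumed: totals published via local properties
    ) extends Iterator[T] {
      private var outputRowCount = 0L
      private var outputBatchCount = 0L
      private var wholeStagePipeline = true

      override def hasNext: Boolean = {
        // Only a fallback caller ever gets here; a fully native stage never does.
        wholeStagePipeline = false
        underlying.hasNext
      }

      override def next(): T = {
        val batch = underlying.next()
        outputBatchCount += 1
        outputRowCount += rowsOf(batch)
        batch
      }

      // Returns (rows, batches): the native totals if the stage never fell back,
      // otherwise the counts observed while the fallback consumed the iterator.
      def collectMetrics(): (Long, Long) =
        if (wholeStagePipeline) nativeTotals()
        else (outputRowCount, outputBatchCount)
    }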