From 5d89e566c17806bae0fd8e8644c09bb08884d155 Mon Sep 17 00:00:00 2001
From: liuneng1994
Date: Wed, 7 Aug 2024 15:40:11 +0800
Subject: [PATCH] fix lost data in shuffle reader

---
 .../vectorized/CHColumnarBatchSerializer.scala       | 10 +++++-----
 .../spark/shuffle/CHColumnarShuffleWriter.scala      |  5 +----
 cpp-ch/local-engine/Parser/SerializedPlanParser.cpp  |  5 ++++-
 cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp    | 12 +++++++++---
 cpp-ch/local-engine/Shuffle/SparkExchangeSink.h      |  7 +++++--
 cpp-ch/local-engine/local_engine_jni.cpp             |  2 +-
 .../shuffle/CHCelebornColumnarBatchSerializer.scala  |  5 +++--
 7 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
index fa6f8addf163a..370d93d7e7fbd 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
@@ -63,12 +63,13 @@ private class CHColumnarBatchSerializerInstance(
     compressionCodec,
     GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
 
+  private val useColumnarShuffle: Boolean = GlutenConfig.getConf.isUseColumnarShuffleManager
+
   override def deserializeStream(in: InputStream): DeserializationStream = {
+    // Don't use GlutenConfig in this method; it may execute on a non-task thread.
     new DeserializationStream {
-      private val reader: CHStreamReader = new CHStreamReader(
-        in,
-        GlutenConfig.getConf.isUseColumnarShuffleManager,
-        CHBackendSettings.useCustomizedShuffleCodec)
+      private val reader: CHStreamReader =
+        new CHStreamReader(in, useColumnarShuffle, CHBackendSettings.useCustomizedShuffleCodec)
       private var cb: ColumnarBatch = _
 
       private var numBatchesTotal: Long = _
@@ -97,7 +98,6 @@ private class CHColumnarBatchSerializerInstance(
           var nativeBlock = reader.next()
           while (nativeBlock.numRows() == 0) {
             if (nativeBlock.numColumns() == 0) {
-              nativeBlock.close()
               this.close()
               throw new EOFException
             }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index 977bc35688c29..3f25762b0947a 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -74,8 +74,6 @@ class CHColumnarShuffleWriter[K, V](
 
   private var rawPartitionLengths: Array[Long] = _
 
-  private var firstRecordBatch: Boolean = true
-
   @throws[IOException]
   override def write(records: Iterator[Product2[K, V]]): Unit = {
     CHThreadGroup.registerNewThreadGroup()
@@ -108,7 +106,6 @@ class CHColumnarShuffleWriter[K, V](
     if (splitResult.getTotalRows > 0) {
       dep.metrics("numInputRows").add(splitResult.getTotalRows)
       dep.metrics("inputBatches").add(splitResult.getTotalBatches)
-      writeMetrics.incRecordsWritten(splitResult.getTotalRows)
       dep.metrics("splitTime").add(splitResult.getSplitTime)
       dep.metrics("IOTime").add(splitResult.getDiskWriteTime)
       dep.metrics("serializeTime").add(splitResult.getSerializationTime)
@@ -118,9 +115,9 @@ class CHColumnarShuffleWriter[K, V](
       dep.metrics("bytesSpilled").add(splitResult.getTotalBytesSpilled)
       dep.metrics("dataSize").add(splitResult.getTotalBytesWritten)
       dep.metrics("shuffleWallTime").add(splitResult.getWallTime)
+      writeMetrics.incRecordsWritten(splitResult.getTotalRows)
       writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
       writeMetrics.incWriteTime(splitResult.getTotalWriteTime + splitResult.getTotalSpillTime)
-
       partitionLengths = splitResult.getPartitionLengths
       rawPartitionLengths = splitResult.getRawPartitionLengths
 
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 87c7943ac414e..3537083d9342e 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -86,6 +86,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -1660,13 +1661,15 @@ void LocalExecutor::cancel()
 {
     if (executor)
         executor->cancel();
+    if (push_executor)
+        push_executor->cancel();
 }
 
 void LocalExecutor::execute()
 {
     chassert(query_pipeline_builder);
     push_executor = query_pipeline_builder->execute();
-    push_executor->execute(1, false);
+    push_executor->execute(local_engine::QueryContextManager::instance().currentQueryContext()->getSettingsRef().max_threads, false);
 }
 
 Block & LocalExecutor::getHeader()
diff --git a/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp
index ee476b2bda8ba..150fbc740f120 100644
--- a/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp
+++ b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp
@@ -24,6 +24,7 @@
 #include
 #include
 #include
+#include
 
 
 namespace DB
@@ -48,6 +49,8 @@ void SparkExchangeSink::consume(Chunk chunk)
     auto aggregate_info = chunk.getChunkInfos().get();
     auto intput = inputs.front().getHeader().cloneWithColumns(chunk.getColumns());
     Stopwatch split_time_watch;
+    if (sort_writer)
+        intput = convertAggregateStateInBlock(intput);
     split_result.total_split_time += split_time_watch.elapsedNanoseconds();
 
     Stopwatch compute_pid_time_watch;
@@ -151,7 +154,7 @@ void SparkExechangeManager::initSinks(size_t num)
     for (size_t i = 0; i < num; ++i)
     {
         partition_writers[i] = createPartitionWriter(options, use_sort_shuffle, std::move(celeborn_client));
-        sinks[i] = std::make_shared<SparkExchangeSink>(input_header, partitioner_creator(options), partition_writers[i], output_columns_indicies);
+        sinks[i] = std::make_shared<SparkExchangeSink>(input_header, partitioner_creator(options), partition_writers[i], output_columns_indicies, use_sort_shuffle);
     }
 }
 
@@ -166,6 +169,7 @@ void SparkExechangeManager::setSinksToPipeline(DB::QueryPipelineBuilder & pipeli
         }
         return std::make_shared(header);
     };
+    chassert(pipeline.getNumStreams() == sinks.size());
     pipeline.setSinks(getter);
 }
 
@@ -210,6 +214,7 @@ void SparkExechangeManager::finish()
        {
            extra_datas.emplace_back(local_partition_writer->getExtraData());
        }
+
    }
    if (!extra_datas.empty())
        chassert(extra_datas.size() == partition_writers.size());
@@ -291,7 +296,7 @@ std::vector SparkExechangeManager::mergeSpills(DB::WriteBuffer & data_fi
        {
            continue;
        }
-       buffer.reserve(size);
+       buffer.resize(size);
        auto count = spill_inputs[i]->readBigAt(buffer.data(), size, offset, nullptr);
        chassert(count == size);
 
@@ -307,7 +312,8 @@ std::vector SparkExechangeManager::mergeSpills(DB::WriteBuffer & data_fi
            if (!extra_data.partition_block_buffer.empty() && !extra_data.partition_block_buffer[partition_id]->empty())
            {
                Block block = extra_data.partition_block_buffer[partition_id]->releaseColumns();
-               extra_data.partition_buffer[partition_id]->addBlock(std::move(block));
+               if (block.rows() > 0)
+                   extra_data.partition_buffer[partition_id]->addBlock(std::move(block));
            }
            if (!extra_data.partition_buffer.empty())
            {
diff --git a/cpp-ch/local-engine/Shuffle/SparkExchangeSink.h b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.h
index 971846816bac2..69215fac09592 100644
--- a/cpp-ch/local-engine/Shuffle/SparkExchangeSink.h
+++ b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.h
@@ -35,12 +35,14 @@ class PartitionWriter;
 class SparkExchangeSink : public DB::ISink
 {
 public:
-    SparkExchangeSink(const DB::Block& header, std::unique_ptr partitioner_, std::shared_ptr<PartitionWriter> partition_writer_,
-        const std::vector & output_columns_indicies_)
+    SparkExchangeSink(const DB::Block& header, std::unique_ptr partitioner_,
+        std::shared_ptr<PartitionWriter> partition_writer_,
+        const std::vector& output_columns_indicies_, bool sort_writer_)
        : DB::ISink(header)
        , partitioner(std::move(partitioner_))
        , partition_writer(partition_writer_)
        , output_columns_indicies(output_columns_indicies_)
+       , sort_writer(sort_writer_)
    {
        initOutputHeader(header);
        partition_writer->initialize(&split_result, output_header);
@@ -72,6 +74,7 @@ class SparkExchangeSink : public DB::ISink
     std::unique_ptr partitioner;
     std::shared_ptr<PartitionWriter> partition_writer;
     std::vector output_columns_indicies;
+    bool sort_writer = false;
     SplitResult split_result;
 };
 
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index f9e7023e7c115..57493f5bfd6bb 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -595,7 +595,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     auto * current_executor = local_engine::LocalExecutor::getCurrentExecutor();
     chassert(current_executor);
     local_engine::SplitterHolder * splitter = new local_engine::SplitterHolder{.exechange_manager = std::make_unique<local_engine::SparkExechangeManager>(current_executor->getHeader(), name, options)};
-    splitter->exechange_manager->initSinks(local_engine::QueryContextManager::instance().currentQueryContext()->getSettingsRef().max_threads);
+    splitter->exechange_manager->initSinks(1);
     current_executor->setSinks([&](auto & pipeline_builder) { splitter->exechange_manager->setSinksToPipeline(pipeline_builder);});
     // execute pipeline
     current_executor->execute();
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
index 5072ce6a1a2eb..360941cb04d15 100644
--- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
+++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
@@ -75,6 +75,8 @@ private class CHCelebornColumnarBatchSerializerInstance(
       }
       private var cb: ColumnarBatch = _
       private val isEmptyStream: Boolean = in.equals(CelebornInputStream.empty())
+      private val forceCompress: Boolean =
+        GlutenConfig.getConf.isUseColumnarShuffleManager || GlutenConfig.getConf.isUseCelebornShuffleManager
 
       private var numBatchesTotal: Long = _
       private var numRowsTotal: Long = _
@@ -177,8 +179,7 @@ private class CHCelebornColumnarBatchSerializerInstance(
       if (reader == null) {
         reader = new CHStreamReader(
           original_in,
-          GlutenConfig.getConf.isUseColumnarShuffleManager
-            || GlutenConfig.getConf.isUseCelebornShuffleManager,
+          forceCompress,
           CHBackendSettings.useCustomizedShuffleCodec
         )
       }
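
Note on the mergeSpills change above: std::vector::reserve only grows capacity and
leaves size() at zero, so after readBigAt fills buffer.data() the vector still
reports an empty buffer; any downstream copy driven by buffer.size() would then
forward none of the spilled partition bytes, which is one plausible way a shuffle
reader observes lost data. The sketch below is a minimal, self-contained
illustration of that failure shape; readBigAt here is a simplified stand-in for
ClickHouse's positional-read API, not its real signature.

#include <cassert>
#include <cstring>
#include <string>
#include <vector>

// Simplified stand-in for a positional read like the one used in mergeSpills:
// copies `size` bytes from a spill image into `to` and reports the count.
static size_t readBigAt(const std::string & spill, char * to, size_t size, size_t offset)
{
    std::memcpy(to, spill.data() + offset, size);
    return size;
}

int main()
{
    const std::string spill = "partition-0-bytes";

    // Buggy shape: reserve() grows capacity but leaves size() at 0, so a
    // consumer writing buffer.size() bytes into the merged data file forwards
    // nothing (and writing through data() past size() is undefined behaviour).
    std::vector<char> buggy;
    buggy.reserve(spill.size());
    assert(buggy.size() == 0); // consumer sees an empty buffer -> data lost

    // Fixed shape, matching the patch: sizing the vector makes size() equal
    // to the bytes about to be read, so data() is valid and size() truthful.
    std::vector<char> fixed(spill.size());
    size_t count = readBigAt(spill, fixed.data(), fixed.size(), 0);
    assert(count == fixed.size());
    assert(std::memcmp(fixed.data(), spill.data(), fixed.size()) == 0);
    return 0;
}

Beyond the size() bookkeeping, writing through data() after a bare reserve() is
undefined behaviour in its own right, so resize() repairs both issues at once.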