Skip to content

Commit

Permalink
support exchange sink processor
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Aug 12, 2024
1 parent 077a48a commit 8998c4e
Show file tree
Hide file tree
Showing 16 changed files with 766 additions and 630 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ public native long nativeMakeForRSS(
Object pusher,
boolean forceMemorySort);

public native void split(long splitterId, long block);

public native CHSplitResult stop(long splitterId) throws IOException;

public native void close(long splitterId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public class CHSplitResult extends SplitResult {
private final long splitTime;
private final long diskWriteTime;
private final long serializationTime;
private final long totalRows;
private final long totalBatches;
private final long wallTime;

public CHSplitResult(long totalComputePidTime,
long totalWriteTime,
Expand All @@ -31,7 +34,10 @@ public CHSplitResult(long totalComputePidTime,
long[] rawPartitionLengths,
long splitTime,
long diskWriteTime,
long serializationTime) {
long serializationTime,
long totalRows,
long totalBatches,
long wallTime) {
super(totalComputePidTime,
totalWriteTime,
totalEvictTime,
Expand All @@ -43,6 +49,9 @@ public CHSplitResult(long totalComputePidTime,
this.splitTime = splitTime;
this.diskWriteTime = diskWriteTime;
this.serializationTime = serializationTime;
this.totalRows = totalRows;
this.totalBatches = totalBatches;
this.wallTime = wallTime;
}

public long getSplitTime() {
Expand All @@ -56,4 +65,16 @@ public long getDiskWriteTime() {
public long getSerializationTime() {
return serializationTime;
}

public long getTotalRows() {
return totalRows;
}

public long getTotalBatches() {
return totalBatches;
}

public long getWallTime() {
return wallTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.gluten.vectorized._
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SparkDirectoryUtil, Utils}

import java.io.IOException
Expand Down Expand Up @@ -85,17 +84,7 @@ class CHColumnarShuffleWriter[K, V](

private def internalCHWrite(records: Iterator[Product2[K, V]]): Unit = {
val splitterJniWrapper: CHShuffleSplitterJniWrapper = jniWrapper
if (!records.hasNext) {
partitionLengths = new Array[Long](dep.partitioner.numPartitions)
shuffleBlockResolver.writeMetadataFileAndCommit(
dep.shuffleId,
mapId,
partitionLengths,
Array[Long](),
null)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
return
}

val dataTmp = Utils.tempFileWith(shuffleBlockResolver.getDataFile(dep.shuffleId, mapId))
if (nativeSplitter == 0) {
nativeSplitter = splitterJniWrapper.make(
Expand All @@ -114,50 +103,51 @@ class CHColumnarShuffleWriter[K, V](
forceMemorySortShuffle
)
}
while (records.hasNext) {
val cb = records.next()._2.asInstanceOf[ColumnarBatch]
if (cb.numRows == 0 || cb.numCols == 0) {
logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols")
} else {
firstRecordBatch = false
val col = cb.column(0).asInstanceOf[CHColumnVector]
val block = col.getBlockAddress
splitterJniWrapper
.split(nativeSplitter, block)
dep.metrics("numInputRows").add(cb.numRows)
dep.metrics("inputBatches").add(1)
writeMetrics.incRecordsWritten(cb.numRows)
}
}
splitResult = splitterJniWrapper.stop(nativeSplitter)

dep.metrics("splitTime").add(splitResult.getSplitTime)
dep.metrics("IOTime").add(splitResult.getDiskWriteTime)
dep.metrics("serializeTime").add(splitResult.getSerializationTime)
dep.metrics("spillTime").add(splitResult.getTotalSpillTime)
dep.metrics("compressTime").add(splitResult.getTotalCompressTime)
dep.metrics("computePidTime").add(splitResult.getTotalComputePidTime)
dep.metrics("bytesSpilled").add(splitResult.getTotalBytesSpilled)
dep.metrics("dataSize").add(splitResult.getTotalBytesWritten)
writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
writeMetrics.incWriteTime(splitResult.getTotalWriteTime + splitResult.getTotalSpillTime)

partitionLengths = splitResult.getPartitionLengths
rawPartitionLengths = splitResult.getRawPartitionLengths
try {
splitResult = splitterJniWrapper.stop(nativeSplitter)
if (splitResult.getTotalRows > 0) {
dep.metrics("numInputRows").add(splitResult.getTotalRows)
dep.metrics("inputBatches").add(splitResult.getTotalBatches)
writeMetrics.incRecordsWritten(splitResult.getTotalRows)
dep.metrics("splitTime").add(splitResult.getSplitTime)
dep.metrics("IOTime").add(splitResult.getDiskWriteTime)
dep.metrics("serializeTime").add(splitResult.getSerializationTime)
dep.metrics("spillTime").add(splitResult.getTotalSpillTime)
dep.metrics("compressTime").add(splitResult.getTotalCompressTime)
dep.metrics("computePidTime").add(splitResult.getTotalComputePidTime)
dep.metrics("bytesSpilled").add(splitResult.getTotalBytesSpilled)
dep.metrics("dataSize").add(splitResult.getTotalBytesWritten)
dep.metrics("shuffleWallTime").add(splitResult.getWallTime)
writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
writeMetrics.incWriteTime(splitResult.getTotalWriteTime + splitResult.getTotalSpillTime)

partitionLengths = splitResult.getPartitionLengths
rawPartitionLengths = splitResult.getRawPartitionLengths

try {
shuffleBlockResolver.writeMetadataFileAndCommit(
dep.shuffleId,
mapId,
partitionLengths,
Array[Long](),
dataTmp)
} finally {
if (dataTmp.exists() && !dataTmp.delete()) {
logError(s"Error while deleting temp file ${dataTmp.getAbsolutePath}")
}
}
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
} else {
partitionLengths = new Array[Long](dep.partitioner.numPartitions)
shuffleBlockResolver.writeMetadataFileAndCommit(
dep.shuffleId,
mapId,
partitionLengths,
Array[Long](),
dataTmp)
} finally {
if (dataTmp.exists() && !dataTmp.delete()) {
logError(s"Error while deleting temp file ${dataTmp.getAbsolutePath}")
}
null)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}

mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
closeCHSplitter()
}

override def stop(success: Boolean): Option[MapStatus] = {
Expand All @@ -172,15 +162,15 @@ class CHColumnarShuffleWriter[K, V](
None
}
} finally {
if (nativeSplitter != 0) {
closeCHSplitter()
nativeSplitter = 0
}
closeCHSplitter()
}
}

private def closeCHSplitter(): Unit = {
jniWrapper.close(nativeSplitter)
if (nativeSplitter != 0) {
jniWrapper.close(nativeSplitter)
nativeSplitter = 0
}
}

// VisibleForTesting
Expand Down
7 changes: 2 additions & 5 deletions cpp-ch/local-engine/Common/QueryContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ void QueryContextManager::finalizeQuery(int64_t id)
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
}
std::shared_ptr<QueryContext> context;
{
context = query_map.get(id);
}
std::shared_ptr<QueryContext> context = query_map.get(id);
auto query_context = context->thread_status->getQueryContext();
if (!query_context)
{
Expand All @@ -138,7 +135,7 @@ void QueryContextManager::finalizeQuery(int64_t id)
context->thread_status->finalizePerformanceCounters();
LOG_INFO(logger, "Task finished, peak memory usage: {} bytes", currentPeakMemory(id));

if (currentThreadGroupMemoryUsage() > 1_MiB)
if (currentThreadGroupMemoryUsage() > 2_MiB)
{
LOG_WARNING(logger, "{} bytes memory didn't release, There may be a memory leak!", currentThreadGroupMemoryUsage());
}
Expand Down
37 changes: 28 additions & 9 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
#include <Common/MergeTreeTool.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>

namespace DB
{
Expand Down Expand Up @@ -1321,16 +1323,14 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
if (root_rel.root().input().has_write())
addSinkTransfrom(context, root_rel.root().input().write(), builder);
///
QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));

auto * logger = &Poco::Logger::get("SerializedPlanParser");
LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
LOG_DEBUG(
logger, "clickhouse plan [optimization={}]:\n{}", settings.query_plan_enable_optimizations, PlanUtil::explainPlan(*query_plan));
LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(pipeline));
// LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(pipeline));

auto config = ExecutorConfig::loadFromContext(context);
return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(pipeline), config.dump_pipeline);
return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(builder), config.dump_pipeline);
}

SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_) : context(context_)
Expand Down Expand Up @@ -1597,8 +1597,19 @@ std::unique_ptr<SparkRowInfo> LocalExecutor::writeBlockToSparkRow(const Block &
return ch_column_to_spark_row->convertCHColumnToSparkRow(block);
}

void LocalExecutor::initPullingPipelineExecutor()
{
if (!executor)
{
query_pipeline = QueryPipelineBuilder::getPipeline(std::move(*query_pipeline_builder));
query_pipeline.setNumThreads(1);
executor = std::make_unique<PullingAsyncPipelineExecutor>(query_pipeline);
}
}

bool LocalExecutor::hasNext()
{
initPullingPipelineExecutor();
size_t columns = currentBlock().columns();
if (columns == 0 || isConsumed())
{
Expand Down Expand Up @@ -1651,21 +1662,29 @@ void LocalExecutor::cancel()
executor->cancel();
}

void LocalExecutor::execute()
{
chassert(query_pipeline_builder);
push_executor = query_pipeline_builder->execute();
push_executor->execute(1, false);
}

Block & LocalExecutor::getHeader()
{
return header;
}

LocalExecutor::LocalExecutor(QueryPlanPtr query_plan, QueryPipeline && pipeline, bool dump_pipeline_)
: query_pipeline(std::move(pipeline))
, executor(std::make_unique<PullingPipelineExecutor>(query_pipeline))
LocalExecutor::LocalExecutor(QueryPlanPtr query_plan, QueryPipelineBuilderPtr pipeline_builder, bool dump_pipeline_)
: query_pipeline_builder(std::move(pipeline_builder))
, header(query_plan->getCurrentDataStream().header.cloneEmpty())
, dump_pipeline(dump_pipeline_)
, ch_column_to_spark_row(std::make_unique<CHColumnToSparkRow>())
, current_query_plan(std::move(query_plan))
{
chassert(!current_executor);
current_executor = this;
}

thread_local LocalExecutor * LocalExecutor::current_executor = nullptr;
std::string LocalExecutor::dumpPipeline() const
{
const auto & processors = query_pipeline.getProcessors();
Expand All @@ -1674,7 +1693,7 @@ std::string LocalExecutor::dumpPipeline() const
WriteBufferFromOwnString buffer;
auto data_stats = processor->getProcessorDataStats();
buffer << "(";
buffer << "\nexcution time: " << processor->getElapsedNs() / 1000U << " us.";
buffer << "\nexecution time: " << processor->getElapsedNs() / 1000U << " us.";
buffer << "\ninput wait time: " << processor->getInputWaitElapsedNs() / 1000U << " us.";
buffer << "\noutput wait time: " << processor->getOutputWaitElapsedNs() / 1000U << " us.";
buffer << "\ninput rows: " << data_stats.input_rows;
Expand Down
20 changes: 15 additions & 5 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
#include <Interpreters/Aggregator.h>
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/RelMetric.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Storages/CustomStorageMergeTree.h>
#include <Storages/SourceFromJavaIter.h>
#include <base/types.h>
#include <substrait/plan.pb.h>
#include <Common/BlockIterator.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

namespace local_engine
{
Expand Down Expand Up @@ -210,7 +211,10 @@ struct SparkBuffer
class LocalExecutor : public BlockIterator
{
public:
LocalExecutor(QueryPlanPtr query_plan, QueryPipeline && pipeline, bool dump_pipeline_ = false);
static thread_local LocalExecutor * current_executor;
static LocalExecutor * getCurrentExecutor() { return current_executor; }
static void resetCurrentExecutor() { current_executor = nullptr; }
LocalExecutor(QueryPlanPtr query_plan, QueryPipelineBuilderPtr pipeline, bool dump_pipeline_ = false);
~LocalExecutor();

SparkRowInfoPtr next();
Expand All @@ -219,20 +223,26 @@ class LocalExecutor : public BlockIterator

/// Stop execution, used when task receives shutdown command or executor receives SIGTERM signal
void cancel();

void setSinks(std::function<void(QueryPipelineBuilder &)> setter)
{
setter(*query_pipeline_builder);
}
void execute();
Block & getHeader();
RelMetricPtr getMetric() const { return metric; }
void setMetric(const RelMetricPtr & metric_) { metric = metric_; }
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); }

private:
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(const DB::Block & block) const;

void initPullingPipelineExecutor();
/// Dump processor runtime information to log
std::string dumpPipeline() const;

QueryPipelineBuilderPtr query_pipeline_builder;
QueryPipeline query_pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
std::unique_ptr<DB::PullingAsyncPipelineExecutor> executor = nullptr;
PipelineExecutorPtr push_executor = nullptr;
Block header;
bool dump_pipeline;
std::unique_ptr<CHColumnToSparkRow> ch_column_to_spark_row;
Expand Down
Loading

0 comments on commit 8998c4e

Please sign in to comment.