From bd0fa0eb785973f0fd3978192ce2b698b4870218 Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Thu, 21 Nov 2024 11:26:31 +0800 Subject: [PATCH] [Bug Fix] Only create output file when there is actual output chunks. --- .../Storages/Output/NormalFileWriter.h | 20 ++++++++++++------- .../Storages/Output/ORCOutputFormatFile.h | 5 ++--- .../Storages/Output/OutputFormatFile.h | 8 ++------ .../Storages/Output/ParquetOutputFormatFile.h | 1 - 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h index 79c99f02db55..d55703741845 100644 --- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h +++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h @@ -245,6 +245,7 @@ class SubstraitFileSink final : public DB::SinkToStorage { const std::string partition_id_; const std::string relative_path_; + OutputFormatFilePtr format_file_; OutputFormatFile::OutputFormatPtr output_format_; std::shared_ptr stats_; DeltaStats delta_stats_; @@ -272,8 +273,7 @@ class SubstraitFileSink final : public DB::SinkToStorage : SinkToStorage(header) , partition_id_(partition_id.empty() ? NO_PARTITION_ID : partition_id) , relative_path_(relative) - , output_format_(createOutputFormatFile(context, makeAbsoluteFilename(base_path, partition_id, relative), header, format_hint) - ->createOutputFormat(header)) + , format_file_(createOutputFormatFile(context, makeAbsoluteFilename(base_path, partition_id, relative), header, format_hint)) , stats_(std::dynamic_pointer_cast(stats)) , delta_stats_(delta_stats) { @@ -285,15 +285,21 @@ class SubstraitFileSink final : public DB::SinkToStorage void consume(DB::Chunk & chunk) override { delta_stats_.update(chunk); + if (!output_format_) [[unlikely]] + output_format_ = format_file_->createOutputFormat(); output_format_->output->write(materializeBlock(getHeader().cloneWithColumns(chunk.detachColumns()))); } void onFinish() override { - output_format_->output->finalize(); - output_format_->output->flush(); - output_format_->write_buffer->finalize(); - if (stats_ && delta_stats_.row_count > 0) - stats_->collectStats(relative_path_, partition_id_, delta_stats_); + if (output_format_) [[unlikely]] + { + output_format_->output->finalize(); + output_format_->output->flush(); + output_format_->write_buffer->finalize(); + assert(delta_stats_.row_count > 0); + if (stats_) + stats_->collectStats(relative_path_, partition_id_, delta_stats_); + } } }; diff --git a/cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.h b/cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.h index 2ea197cddaa0..ba9b2a3c473a 100644 --- a/cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.h +++ b/cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.h @@ -20,8 +20,8 @@ #include "config.h" #if USE_ORC -# include -# include +#include +#include namespace local_engine { @@ -33,7 +33,6 @@ class ORCOutputFormatFile : public OutputFormatFile const std::string & file_uri_, WriteBufferBuilderPtr write_buffer_builder_, const DB::Block & preferred_schema_); - ~ORCOutputFormatFile() override = default; OutputFormatFile::OutputFormatPtr createOutputFormat(const DB::Block & header) override; }; diff --git a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h index 7dcffc867aa3..e94923f77a43 100644 --- a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h +++ b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h @@ -16,10 +16,6 @@ */ #pragma once -#include -#include -#include - #include #include #include @@ -48,8 +44,8 @@ class OutputFormatFile virtual ~OutputFormatFile() = default; virtual OutputFormatPtr createOutputFormat(const DB::Block & header_) = 0; - - virtual const DB::Block getPreferredSchema() const { return preferred_schema; } + OutputFormatPtr createOutputFormat() { return createOutputFormat(preferred_schema); } + DB::Block getPreferredSchema() const { return preferred_schema; } protected: DB::Block createHeaderWithPreferredSchema(const DB::Block & header); diff --git a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.h b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.h index cc87da7da854..37b843a628ec 100644 --- a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.h +++ b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.h @@ -33,7 +33,6 @@ class ParquetOutputFormatFile : public OutputFormatFile const std::string & file_uri_, const WriteBufferBuilderPtr & write_buffer_builder_, const DB::Block & preferred_schema_); - ~ParquetOutputFormatFile() override = default; OutputFormatFile::OutputFormatPtr createOutputFormat(const DB::Block & header) override; };