Skip to content

Commit

Permalink
[Bug Fix] Only create output file when there is actual output chunks.
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Nov 21, 2024
1 parent 00a721f commit bd0fa0e
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 17 deletions.
20 changes: 13 additions & 7 deletions cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class SubstraitFileSink final : public DB::SinkToStorage
{
const std::string partition_id_;
const std::string relative_path_;
OutputFormatFilePtr format_file_;
OutputFormatFile::OutputFormatPtr output_format_;
std::shared_ptr<WriteStats> stats_;
DeltaStats delta_stats_;
Expand Down Expand Up @@ -272,8 +273,7 @@ class SubstraitFileSink final : public DB::SinkToStorage
: SinkToStorage(header)
, partition_id_(partition_id.empty() ? NO_PARTITION_ID : partition_id)
, relative_path_(relative)
, output_format_(createOutputFormatFile(context, makeAbsoluteFilename(base_path, partition_id, relative), header, format_hint)
->createOutputFormat(header))
, format_file_(createOutputFormatFile(context, makeAbsoluteFilename(base_path, partition_id, relative), header, format_hint))
, stats_(std::dynamic_pointer_cast<WriteStats>(stats))
, delta_stats_(delta_stats)
{
Expand All @@ -285,15 +285,21 @@ class SubstraitFileSink final : public DB::SinkToStorage
void consume(DB::Chunk & chunk) override
{
delta_stats_.update(chunk);
if (!output_format_) [[unlikely]]
output_format_ = format_file_->createOutputFormat();
output_format_->output->write(materializeBlock(getHeader().cloneWithColumns(chunk.detachColumns())));
}
void onFinish() override
{
output_format_->output->finalize();
output_format_->output->flush();
output_format_->write_buffer->finalize();
if (stats_ && delta_stats_.row_count > 0)
stats_->collectStats(relative_path_, partition_id_, delta_stats_);
if (output_format_) [[unlikely]]
{
output_format_->output->finalize();
output_format_->output->flush();
output_format_->write_buffer->finalize();
assert(delta_stats_.row_count > 0);
if (stats_)
stats_->collectStats(relative_path_, partition_id_, delta_stats_);
}
}
};

Expand Down
5 changes: 2 additions & 3 deletions cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include "config.h"

#if USE_ORC
# include <IO/WriteBuffer.h>
# include <Storages/Output/OutputFormatFile.h>
#include <IO/WriteBuffer.h>
#include <Storages/Output/OutputFormatFile.h>

namespace local_engine
{
Expand All @@ -33,7 +33,6 @@ class ORCOutputFormatFile : public OutputFormatFile
const std::string & file_uri_,
WriteBufferBuilderPtr write_buffer_builder_,
const DB::Block & preferred_schema_);
~ORCOutputFormatFile() override = default;

OutputFormatFile::OutputFormatPtr createOutputFormat(const DB::Block & header) override;
};
Expand Down
8 changes: 2 additions & 6 deletions cpp-ch/local-engine/Storages/Output/OutputFormatFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
*/
#pragma once

#include <memory>
#include <optional>
#include <vector>

#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -48,8 +44,8 @@ class OutputFormatFile
virtual ~OutputFormatFile() = default;

virtual OutputFormatPtr createOutputFormat(const DB::Block & header_) = 0;

virtual const DB::Block getPreferredSchema() const { return preferred_schema; }
OutputFormatPtr createOutputFormat() { return createOutputFormat(preferred_schema); }
DB::Block getPreferredSchema() const { return preferred_schema; }

protected:
DB::Block createHeaderWithPreferredSchema(const DB::Block & header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class ParquetOutputFormatFile : public OutputFormatFile
const std::string & file_uri_,
const WriteBufferBuilderPtr & write_buffer_builder_,
const DB::Block & preferred_schema_);
~ParquetOutputFormatFile() override = default;

OutputFormatFile::OutputFormatPtr createOutputFormat(const DB::Block & header) override;
};
Expand Down

0 comments on commit bd0fa0e

Please sign in to comment.