Skip to content

Commit

Permalink
fix rebase error
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Apr 13, 2022
1 parent 8c6bb93 commit c342630
Show file tree
Hide file tree
Showing 18 changed files with 94 additions and 83 deletions.
4 changes: 4 additions & 0 deletions base/base/unit.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#pragma once
#include <cstddef>

#ifdef HAS_RESERVED_IDENTIFIER
#pragma clang diagnostic ignored "-Wreserved-identifier"
#endif

constexpr size_t KiB = 1024;
constexpr size_t MiB = 1024 * KiB;
constexpr size_t GiB = 1024 * MiB;
Expand Down
97 changes: 48 additions & 49 deletions utils/local-engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC
clickhouse_aggregate_functions
clickhouse_common_config
clickhouse_common_io
clickhouse_dictionaries
clickhouse_functions
clickhouse_parsers
clickhouse_storages_system
Expand All @@ -57,41 +56,41 @@ if (USE_LIBCXX)
target_compile_options(cxxabi PRIVATE -fPIC)
target_compile_options(cxx PRIVATE -fPIC)
endif()
target_compile_options(ssl PRIVATE -fPIC)
target_compile_options(crypto PRIVATE -fPIC)
target_compile_options(libpq PRIVATE -fPIC)
target_compile_options(libpqxx PRIVATE -fPIC)
target_compile_options(lz4 PRIVATE -fPIC)
target_compile_options(snappy PRIVATE -fPIC)
target_compile_options(zstd PRIVATE -fPIC)
target_compile_options(s2 PRIVATE -fPIC)
target_compile_options(sqlite PRIVATE -fPIC)
target_compile_options(zlib PRIVATE -fPIC)
target_compile_options(krb5 PRIVATE -fPIC)
target_compile_options(libprotobuf PRIVATE -fPIC)
#target_compile_options(ssl PRIVATE -fPIC)
#target_compile_options(crypto PRIVATE -fPIC)
#target_compile_options(libpq PRIVATE -fPIC)
#target_compile_options(libpqxx PRIVATE -fPIC)
#target_compile_options(lz4 PRIVATE -fPIC)
#target_compile_options(snappy PRIVATE -fPIC)
#target_compile_options(zstd PRIVATE -fPIC)
#target_compile_options(s2 PRIVATE -fPIC)
#target_compile_options(sqlite PRIVATE -fPIC)
#target_compile_options(zlib PRIVATE -fPIC)
#target_compile_options(krb5 PRIVATE -fPIC)
#target_compile_options(libprotobuf PRIVATE -fPIC)
target_compile_options(re2 PRIVATE -fPIC)
target_compile_options(ldap_r PRIVATE -fPIC)
target_compile_options(thrift_static PRIVATE -fPIC)
target_compile_options(double-conversion PRIVATE -fPIC)
target_compile_options(arrow_static PRIVATE -fPIC)
target_compile_options(parquet_static PRIVATE -fPIC)
target_compile_options(lber PRIVATE -fPIC)
#target_compile_options(ldap_r PRIVATE -fPIC)
#target_compile_options(thrift_static PRIVATE -fPIC)
#target_compile_options(double-conversion PRIVATE -fPIC)
#target_compile_options(arrow_static PRIVATE -fPIC)
#target_compile_options(parquet_static PRIVATE -fPIC)
#target_compile_options(lber PRIVATE -fPIC)
target_compile_options(_poco_foundation PRIVATE -fPIC)
target_compile_options(fmt PRIVATE -fPIC)
target_compile_options(cctz PRIVATE -fPIC)
#target_compile_options(fmt PRIVATE -fPIC)
#target_compile_options(cctz PRIVATE -fPIC)
target_compile_options(_poco_xml PRIVATE -fPIC)
target_compile_options(_poco_util PRIVATE -fPIC)
target_compile_options(_poco_json PRIVATE -fPIC)
target_compile_options(_poco_net PRIVATE -fPIC)
target_compile_options(_poco_net_ssl PRIVATE -fPIC)
target_compile_options(common PRIVATE -fPIC)
target_compile_options(mysqlxx PRIVATE -fPIC)
target_compile_options(nuraft PRIVATE -fPIC)
#target_compile_options(nuraft PRIVATE -fPIC)
target_compile_options(glibc-compatibility PRIVATE -fPIC)
target_compile_options(_poco_foundation_pcre PRIVATE -fPIC)
target_compile_options(_poco_xml_expat PRIVATE -fPIC)
target_compile_options(_poco_crypto PRIVATE -fPIC)
target_compile_options(rdkafka PRIVATE -fPIC)
#target_compile_options(rdkafka PRIVATE -fPIC)
target_compile_options(LLVMDemangle PRIVATE -fPIC)
target_compile_options(LLVMSupport PRIVATE -fPIC)
target_compile_options(LLVMDebugInfoCodeView PRIVATE -fPIC)
Expand Down Expand Up @@ -128,9 +127,9 @@ target_compile_options(LLVMAsmPrinter PRIVATE -fPIC)
target_compile_options(LLVMX86Desc PRIVATE -fPIC)
target_compile_options(LLVMX86CodeGen PRIVATE -fPIC)
target_compile_options(LLVMRuntimeDyld PRIVATE -fPIC)
target_compile_options(icuuc PRIVATE -fPIC)
target_compile_options(icui18n PRIVATE -fPIC)
target_compile_options(cpuid PRIVATE -fPIC)
#target_compile_options(icuuc PRIVATE -fPIC)
#target_compile_options(icui18n PRIVATE -fPIC)
#target_compile_options(cpuid PRIVATE -fPIC)
target_compile_options(re2_st PRIVATE -fPIC)
target_compile_options(_boost_program_options PRIVATE -fPIC)
target_compile_options(clickhouse_common_io PRIVATE -fPIC)
Expand All @@ -148,37 +147,37 @@ target_compile_options(LLVMMCDisassembler PRIVATE -fPIC)
target_compile_options(LLVMX86Info PRIVATE -fPIC)
target_compile_options(LLVMCFGuard PRIVATE -fPIC)
target_compile_options(LLVMExecutionEngine PRIVATE -fPIC)
target_compile_options(roaring PRIVATE -fPIC)
target_compile_options(yaml-cpp PRIVATE -fPIC)
#target_compile_options(roaring PRIVATE -fPIC)
#target_compile_options(yaml-cpp PRIVATE -fPIC)
target_compile_options(dbms PRIVATE -fPIC)
target_compile_options(clickhouse_functions_jsonpath PRIVATE -fPIC)
target_compile_options(dragonbox_to_chars PRIVATE -fPIC)
#target_compile_options(dragonbox_to_chars PRIVATE -fPIC)
target_compile_options(clickhouse_functions_gatherutils PRIVATE -fPIC)
target_compile_options(clickhouse_functions_array PRIVATE -fPIC)
target_compile_options(hyperscan PRIVATE -fPIC)
target_compile_options(simdjson PRIVATE -fPIC)
#target_compile_options(hyperscan PRIVATE -fPIC)
#target_compile_options(simdjson PRIVATE -fPIC)
target_compile_options(divide_impl_avx2 PRIVATE -fPIC)
target_compile_options(divide_impl PRIVATE -fPIC)
target_compile_options(murmurhash PRIVATE -fPIC)
target_compile_options(fastops PRIVATE -fPIC)
target_compile_options(base64 PRIVATE -fPIC)
target_compile_options(base64_avx PRIVATE -fPIC)
target_compile_options(base64_avx2 PRIVATE -fPIC)
target_compile_options(base64_ssse3 PRIVATE -fPIC)
target_compile_options(base64_scalar PRIVATE -fPIC)
#target_compile_options(murmurhash PRIVATE -fPIC)
#target_compile_options(fastops PRIVATE -fPIC)
#target_compile_options(base64 PRIVATE -fPIC)
#target_compile_options(base64_avx PRIVATE -fPIC)
#target_compile_options(base64_avx2 PRIVATE -fPIC)
#target_compile_options(base64_ssse3 PRIVATE -fPIC)
#target_compile_options(base64_scalar PRIVATE -fPIC)
target_compile_options(clickhouse_functions_url PRIVATE -fPIC)
target_compile_options(divide_impl_sse2 PRIVATE -fPIC)
target_compile_options(metrohash PRIVATE -fPIC)
target_compile_options(consistent-hashing PRIVATE -fPIC)
target_compile_options(h3 PRIVATE -fPIC)
target_compile_options(farmhash PRIVATE -fPIC)
target_compile_options(cityhash PRIVATE -fPIC)
target_compile_options(liblzma PRIVATE -fPIC)
target_compile_options(brotli PRIVATE -fPIC)
target_compile_options(bzip2 PRIVATE -fPIC)
target_compile_options(stemmer PRIVATE -fPIC)
target_compile_options(wnb PRIVATE -fPIC)
target_compile_options(lemmagen PRIVATE -fPIC)
#target_compile_options(metrohash PRIVATE -fPIC)
#target_compile_options(consistent-hashing PRIVATE -fPIC)
#target_compile_options(h3 PRIVATE -fPIC)
#target_compile_options(farmhash PRIVATE -fPIC)
#target_compile_options(cityhash PRIVATE -fPIC)
#target_compile_options(liblzma PRIVATE -fPIC)
#target_compile_options(brotli PRIVATE -fPIC)
#target_compile_options(bzip2 PRIVATE -fPIC)
#target_compile_options(stemmer PRIVATE -fPIC)
#target_compile_options(wnb PRIVATE -fPIC)
#target_compile_options(lemmagen PRIVATE -fPIC)

#target_compile_options(zstd PRIVATE -fPIC)

Expand Down
2 changes: 1 addition & 1 deletion utils/local-engine/Parser/CHColumnToSparkRow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void writeValue(
}
else
{
throw std::runtime_error("doesn't support type " + std::string(getTypeName(nested_col->getDataType())));
throw std::runtime_error("doesn't support type " + String(magic_enum::enum_name(nested_col->getDataType())));
}
}

Expand Down
20 changes: 11 additions & 9 deletions utils/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Formats/Impl/ArrowBlockOutputFormat.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <Processors/Pipe.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/LimitStep.h>
Expand All @@ -29,7 +29,7 @@
#include <Storages/StorageMergeTreeFactory.h>
#include <Storages/ParquetRowInputFormat.h>
#include <Poco/Util/MapConfiguration.h>
#include <common/logger_useful.h>
#include <base/logger_useful.h>

bool local_engine::SerializedPlanParser::isReadRelFromJava(const substrait::ReadRel& rel) {
assert(rel.has_local_files());
Expand Down Expand Up @@ -99,6 +99,7 @@ DB::QueryPlanPtr local_engine::SerializedPlanParser::parseMergeTreeTable(const s
custom_storage_merge_tree->loadDataParts(false);
return custom_storage_merge_tree;
});
query_context.storage_snapshot = std::make_shared<StorageSnapshot>(*storage, metadata);
auto t_storage = watch.elapsedMicroseconds() - t_metadata;
query_context.custom_storage_merge_tree = storage;
auto query_info = local_engine::buildQueryInfo(names_and_types_list);
Expand All @@ -115,8 +116,7 @@ DB::QueryPlanPtr local_engine::SerializedPlanParser::parseMergeTreeTable(const s
}
auto query = query_context.custom_storage_merge_tree->reader.readFromParts(selected_parts,
names_and_types_list.getNames(),
query_context.metadata,
query_context.metadata,
query_context.storage_snapshot,
*query_info,
this->context,
4096 * 2,
Expand Down Expand Up @@ -567,10 +567,11 @@ DB::Chunk local_engine::BatchParquetFileSource::generate()
read_buf = std::move(nested_buffer);
ProcessorPtr format = std::make_shared<local_engine::ParquetRowInputFormat>(*read_buf, header);
// auto format = DB::ParquetBlockInputFormat::getParquetFormat(*read_buf, header);
pipeline = std::make_unique<QueryPipeline>();
pipeline->init(Pipe(format));
QueryPipelineBuilder builder;
builder.init(Pipe(format));
pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));

reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
reader = std::make_unique<PullingPipelineExecutor>(pipeline);
}

Chunk chunk;
Expand Down Expand Up @@ -598,14 +599,15 @@ void local_engine::LocalExecutor::execute(DB::QueryPlanPtr query_plan)
Stopwatch stopwatch;
stopwatch.start();
QueryPlanOptimizationSettings optimization_settings{.optimize_plan = true};
this->query_pipeline = query_plan->buildQueryPipeline(optimization_settings, BuildQueryPipelineSettings{
auto pipeline_builder = query_plan->buildQueryPipeline(optimization_settings, BuildQueryPipelineSettings{
.actions_settings = ExpressionActionsSettings{
.can_compile_expressions = true,
.min_count_to_compile_expression = 3,
.compile_expressions = CompileExpressions::no
}});
this->query_pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder));
auto t_pipeline = stopwatch.elapsedMicroseconds();
this->executor = std::make_unique<DB::PullingPipelineExecutor>(*query_pipeline);
this->executor = std::make_unique<DB::PullingPipelineExecutor>(query_pipeline);
auto t_executor = stopwatch.elapsedMicroseconds() - t_pipeline;
stopwatch.stop();
LOG_INFO(&Poco::Logger::get("SerializedPlanParser"),
Expand Down
7 changes: 4 additions & 3 deletions utils/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/Impl/CHColumnToArrowColumn.h>
#include <arrow/ipc/writer.h>
Expand Down Expand Up @@ -44,7 +44,7 @@ class BatchParquetFileSource : public DB::SourceWithProgress
private:
FilesInfoPtr files_info;
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
QueryPipeline pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
bool finished_generate = false;
std::string current_path;
Expand Down Expand Up @@ -78,6 +78,7 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS = {
static const std::set<std::string> FUNCTION_NEED_KEEP_ARGUMENTS = {"alias"};

struct QueryContext {
StorageSnapshotPtr storage_snapshot;
std::shared_ptr<DB::StorageInMemoryMetadata> metadata;
std::shared_ptr<local_engine::CustomStorageMergeTree> custom_storage_merge_tree;
};
Expand Down Expand Up @@ -186,7 +187,7 @@ class LocalExecutor
private:
QueryContext query_context;
std::unique_ptr<local_engine::SparkRowInfo> writeBlockToSparkRow(DB::Block & block);
QueryPipelinePtr query_pipeline;
QueryPipeline query_pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
Block header;
std::unique_ptr<local_engine::CHColumnToSparkRow> ch_column_to_spark_row;
Expand Down
2 changes: 1 addition & 1 deletion utils/local-engine/Parser/SparkColumnToCHColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static void writeRowToColumns(std::vector<MutableColumnPtr> & columns, SparkRowR
}
else
{
throw std::runtime_error("doesn't support type " + std::string(getTypeName(columns[i]->getDataType())));
throw std::runtime_error("doesn't support type " + String(magic_enum::enum_name(columns[i]->getDataType())));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion utils/local-engine/Parser/SparkColumnToCHColumn.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <Core/Block.h>
#include <common/StringRef.h>
#include <base/StringRef.h>
#include <Parser/CHColumnToSparkRow.h>

namespace local_engine
Expand Down
4 changes: 2 additions & 2 deletions utils/local-engine/Shuffle/ShuffleReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ local_engine::ShuffleReader::ShuffleReader(std::unique_ptr<ReadBuffer> in_, bool
if (compressed)
{
compressed_in = std::make_unique<CompressedReadBuffer>(*in);
input_stream = std::make_unique<NativeBlockInputStream>(*compressed_in, 0);
input_stream = std::make_unique<NativeReader>(*compressed_in, 0);
}
else
{
input_stream = std::make_unique<NativeBlockInputStream>(*in, 0);
input_stream = std::make_unique<NativeReader>(*in, 0);
}
}
Block* local_engine::ShuffleReader::read()
Expand Down
4 changes: 2 additions & 2 deletions utils/local-engine/Shuffle/ShuffleReader.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Formats/NativeReader.h>
#include <Compression/CompressedReadBuffer.h>
#include <jni.h>

Expand All @@ -23,7 +23,7 @@ class ShuffleReader

private:
std::unique_ptr<CompressedReadBuffer> compressed_in;
std::unique_ptr<NativeBlockInputStream> input_stream;
std::unique_ptr<NativeReader> input_stream;
Block header;
};

Expand Down
4 changes: 3 additions & 1 deletion utils/local-engine/Shuffle/ShuffleSplitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <IO/WriteHelpers.h>
#include <Functions/FunctionFactory.h>
#include <Parser/SerializedPlanParser.h>
#include <boost/algorithm/string/case_conv.hpp>


namespace local_engine
{
Expand Down Expand Up @@ -108,7 +110,7 @@ void ShuffleSplitter::spillPartition(size_t partition_id)
{
partition_write_buffers[partition_id]
= getPartitionWriteBuffer(partition_id);
partition_outputs[partition_id] = std::make_unique<DB::NativeBlockOutputStream>(
partition_outputs[partition_id] = std::make_unique<DB::NativeWriter>(
*partition_write_buffers[partition_id], 0, partition_buffer[partition_id].getHeader());
}
DB::Block result = partition_buffer[partition_id].releaseColumns();
Expand Down
4 changes: 2 additions & 2 deletions utils/local-engine/Shuffle/ShuffleSplitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <Columns/IColumn.h>
#include <Common/PODArray_fwd.h>
#include <Common/PODArray.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Formats/NativeWriter.h>
#include <IO/WriteBufferFromFile.h>
#include <Functions/IFunction.h>

Expand Down Expand Up @@ -82,7 +82,7 @@ class ShuffleSplitter
bool stopped = false;
std::vector<DB::IColumn::ColumnIndex> partition_ids;
std::vector<ColumnsBuffer> partition_buffer;
std::vector<std::unique_ptr<DB::NativeBlockOutputStream>> partition_outputs;
std::vector<std::unique_ptr<DB::NativeWriter>> partition_outputs;
std::vector<std::unique_ptr<DB::WriteBuffer>> partition_write_buffers;
std::vector<std::unique_ptr<DB::WriteBuffer>> partition_cached_write_buffers;
SplitOptions options;
Expand Down
6 changes: 3 additions & 3 deletions utils/local-engine/Storages/CustomMergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

void local_engine::CustomMergeTreeSink::consume(Chunk chunk)
{
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
DB::BlockWithPartition block_with_partition(Block(block), DB::Row{});
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(block_with_partition, metadata_snapshot, context);
storage.renameTempPartAndAdd(part, &storage.increment, nullptr, nullptr);
auto part = storage.writer.writeTempPart(block_with_partition, metadata_snapshot, context);
storage.renameTempPartAndAdd(part.part, &storage.increment, nullptr, nullptr);
}
2 changes: 1 addition & 1 deletion utils/local-engine/Storages/CustomStorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ std::vector<MergeTreeMutationStatus> local_engine::CustomStorageMergeTree::getMu
{
throw std::runtime_error("not implement");
}
bool local_engine::CustomStorageMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & executor)
bool local_engine::CustomStorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & executor)
{
throw std::runtime_error("not implement");
}
Expand Down
4 changes: 2 additions & 2 deletions utils/local-engine/Storages/CustomStorageMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <common/shared_ptr_helper.h>
#include <base/shared_ptr_helper.h>
#include <Storages/MutationCommands.h>

using namespace DB;
Expand All @@ -29,7 +29,7 @@ class CustomStorageMergeTree final : public shared_ptr_helper<CustomStorageMerge
bool has_force_restore_data_flag = false);
string getName() const override;
vector<MergeTreeMutationStatus> getMutationsStatus() const override;
bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) override;
bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;

MergeTreeDataWriter writer;
MergeTreeDataSelectExecutor reader;
Expand Down
Loading

0 comments on commit c342630

Please sign in to comment.