diff --git a/.gitignore b/.gitignore index 35ede89817a5..3845572c773f 100644 --- a/.gitignore +++ b/.gitignore @@ -153,3 +153,5 @@ website/package-lock.json /programs/server/metadata /programs/server/store + +utils/local-engine/tests/testConfig.h diff --git a/utils/local-engine/Common/DebugUtils.h b/utils/local-engine/Common/DebugUtils.h new file mode 100644 index 000000000000..c05b9c5fdf24 --- /dev/null +++ b/utils/local-engine/Common/DebugUtils.h @@ -0,0 +1,106 @@ +#pragma once +#include +#include +#include + +namespace debug +{ + +void headBlock(const DB::Block & block, size_t count=10) +{ + std::cerr << "============Block============" << std::endl; + // print header + for (auto name : block.getNames()) + { + std::cerr << name << "\t"; + } + std::cerr << std::endl; + // print rows + for (size_t row = 0; row < std::min(count, block.rows()); ++row) + { + for (size_t column = 0; column < block.columns(); ++column) + { + auto type = block.getByPosition(column).type; + auto col = block.getByPosition(column).column; + DB::WhichDataType which(type); + if (which.isUInt()) + { + auto value = DB::checkAndGetColumn(*col)->getUInt(row); + std::cerr << std::to_string(value) << "\t"; + } + else if (which.isString()) + { + auto value = DB::checkAndGetColumn(*col)->getDataAt(row).toString(); + std::cerr << value << "\t"; + } + else if (which.isInt()) + { + auto value = col->getInt(row); + std::cerr << std::to_string(value) << "\t"; + } + else if (which.isFloat32()) + { + auto value = col->getFloat32(row); + std::cerr << std::to_string(value) << "\t"; + } + else if (which.isFloat64()) + { + auto value = col->getFloat64(row); + std::cerr << std::to_string(value) << "\t"; + } + else + { + std::cerr << "N/A" + << "\t"; + } + } + std::cerr << std::endl; + } +} + +void headColumn(const DB::ColumnPtr column, size_t count=10) +{ + std::cerr << "============Column============" << std::endl; + // print header + + std::cerr << column->getName() << "\t"; + std::cerr << std::endl; + // print rows + for (size_t row = 0; row < std::min(count, column->size()); ++row) + { + auto type = column->getDataType(); + auto col = column; + DB::WhichDataType which(type); + if (which.isUInt()) + { + auto value = DB::checkAndGetColumn(*col)->getUInt(row); + std::cerr << std::to_string(value) << std::endl; + } + else if (which.isString()) + { + auto value = DB::checkAndGetColumn(*col)->getDataAt(row).toString(); + std::cerr << value << std::endl; + } + else if (which.isInt()) + { + auto value = col->getInt(row); + std::cerr << std::to_string(value) << std::endl; + } + else if (which.isFloat32()) + { + auto value = col->getFloat32(row); + std::cerr << std::to_string(value) << std::endl; + } + else if (which.isFloat64()) + { + auto value = col->getFloat64(row); + std::cerr << std::to_string(value) << std::endl; + } + else + { + std::cerr << "N/A" + << std::endl; + } + } +} +} diff --git a/utils/local-engine/Shuffle/ShuffleSplitter.cpp b/utils/local-engine/Shuffle/ShuffleSplitter.cpp index 734e16ef45bc..caf4dea7e572 100644 --- a/utils/local-engine/Shuffle/ShuffleSplitter.cpp +++ b/utils/local-engine/Shuffle/ShuffleSplitter.cpp @@ -1,25 +1,25 @@ #include "ShuffleSplitter.h" #include #include -#include -#include -#include #include -#include +#include #include +#include +#include +#include #include #include namespace local_engine { -void ShuffleSplitter::split(DB::Block& block) +void ShuffleSplitter::split(DB::Block & block) { Stopwatch watch; watch.start(); computeAndCountPartitionId(block); splitBlockByPartition(block); - split_result.total_write_time +=watch.elapsedNanoseconds(); + split_result.total_write_time += watch.elapsedNanoseconds(); } SplitResult ShuffleSplitter::stop() { @@ -108,10 +108,9 @@ void ShuffleSplitter::spillPartition(size_t partition_id) watch.start(); if (!partition_outputs[partition_id]) { - partition_write_buffers[partition_id] - = getPartitionWriteBuffer(partition_id); - partition_outputs[partition_id] = std::make_unique( - *partition_write_buffers[partition_id], 0, partition_buffer[partition_id].getHeader()); + partition_write_buffers[partition_id] = getPartitionWriteBuffer(partition_id); + partition_outputs[partition_id] + = std::make_unique(*partition_write_buffers[partition_id], 0, partition_buffer[partition_id].getHeader()); } DB::Block result = partition_buffer[partition_id].releaseColumns(); partition_outputs[partition_id]->write(result); @@ -142,7 +141,7 @@ void ShuffleSplitter::mergePartitionFiles() data_write_buffer.close(); } -ShuffleSplitter::ShuffleSplitter(SplitOptions&& options_) : options(options_) +ShuffleSplitter::ShuffleSplitter(SplitOptions && options_) : options(options_) { init(); } @@ -157,28 +156,32 @@ ShuffleSplitter::Ptr ShuffleSplitter::create(std::string short_name, SplitOption { return HashSplitter::create(std::move(options_)); } - else if (short_name == "single") { + else if (short_name == "single") + { options_.partition_nums = 1; return RoundRobinSplitter::create(std::move(options_)); } else { - throw "unsupported splitter " + short_name; + throw std::runtime_error("unsupported splitter " + short_name); } } std::string ShuffleSplitter::getPartitionTempFile(size_t partition_id) { - std::string dir = std::filesystem::path(options.local_tmp_dir)/"_shuffle_data"/std::to_string(options.map_id); - if (!std::filesystem::exists(dir)) std::filesystem::create_directories(dir); - return std::filesystem::path(dir)/std::to_string(partition_id); + std::string dir = std::filesystem::path(options.local_tmp_dir) / "_shuffle_data" / std::to_string(options.map_id); + if (!std::filesystem::exists(dir)) + std::filesystem::create_directories(dir); + return std::filesystem::path(dir) / std::to_string(partition_id); } std::unique_ptr ShuffleSplitter::getPartitionWriteBuffer(size_t partition_id) { auto file = getPartitionTempFile(partition_id); if (partition_cached_write_buffers[partition_id] == nullptr) - partition_cached_write_buffers[partition_id] = std::make_unique(file, DBMS_DEFAULT_BUFFER_SIZE, O_CREAT | O_WRONLY | O_APPEND); - if (!options.compress_method.empty() && std::find(compress_methods.begin(), compress_methods.end(), options.compress_method) != compress_methods.end()) + partition_cached_write_buffers[partition_id] + = std::make_unique(file, DBMS_DEFAULT_BUFFER_SIZE, O_CREAT | O_WRONLY | O_APPEND); + if (!options.compress_method.empty() + && std::find(compress_methods.begin(), compress_methods.end(), options.compress_method) != compress_methods.end()) { auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), {}); return std::make_unique(*partition_cached_write_buffers[partition_id], codec); @@ -251,35 +254,36 @@ void RoundRobinSplitter::computeAndCountPartitionId(DB::Block & block) split_result.total_compute_pid_time += watch.elapsedNanoseconds(); } -std::unique_ptr RoundRobinSplitter::create(SplitOptions&& options_) +std::unique_ptr RoundRobinSplitter::create(SplitOptions && options_) { - return std::make_unique( std::move(options_)); + return std::make_unique(std::move(options_)); } std::unique_ptr HashSplitter::create(SplitOptions && options_) { - return std::make_unique( std::move(options_)); + return std::make_unique(std::move(options_)); } void HashSplitter::computeAndCountPartitionId(DB::Block & block) { Stopwatch watch; watch.start(); + ColumnsWithTypeAndName args; + for (auto &name : options.exprs) + { + args.emplace_back(block.getByName(name)); + } if (!hash_function) { auto & factory = DB::FunctionFactory::instance(); auto function = factory.get("murmurHash3_32", local_engine::SerializedPlanParser::global_context); - ColumnsWithTypeAndName args; - for (auto &name : options.exprs) - { - args.emplace_back(block.getByName(name)); - } + hash_function = function->build(args); } auto result_type = hash_function->getResultType(); - auto hash_column = hash_function->execute(block.getColumnsWithTypeAndName(), result_type, block.rows(), false); + auto hash_column = hash_function->execute(args, result_type, block.rows(), false); partition_ids.clear(); - for (size_t i=0; i < block.rows(); i++) + for (size_t i = 0; i < block.rows(); i++) { partition_ids.emplace_back(static_cast(hash_column->getUInt(i) % options.partition_nums)); } diff --git a/utils/local-engine/tests/gtest_ch_functions.cpp b/utils/local-engine/tests/gtest_ch_functions.cpp new file mode 100644 index 000000000000..16b1a9d74c3d --- /dev/null +++ b/utils/local-engine/tests/gtest_ch_functions.cpp @@ -0,0 +1,44 @@ +#include +#include +#include +#include + +TEST(TestFuntion, hash) +{ + auto & factory = DB::FunctionFactory::instance(); + auto function = factory.get("murmurHash2_64", local_engine::SerializedPlanParser::global_context); + auto type0 = DataTypeFactory::instance().get("String"); + auto column0 = type0->createColumn(); + column0->insert("A"); + column0->insert("A"); + column0->insert("B"); + column0->insert("c"); + + auto column1 = type0->createColumn(); + column1->insert("X"); + column1->insert("X"); + column1->insert("Y"); + column1->insert("Z"); + + ColumnsWithTypeAndName columns = {ColumnWithTypeAndName(std::move(column0),type0, "string0"), + ColumnWithTypeAndName(std::move(column1),type0, "string0")}; + Block block(columns); + std::cerr << "input:\n"; + debug::headBlock(block); + auto executable = function->build(block.getColumnsWithTypeAndName()); + auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); + std::cerr << "output:\n"; + debug::headColumn(result); + ASSERT_EQ(result->getUInt(0), result->getUInt(1)); +} + + +//int main(int argc, char ** argv) +//{ +// SharedContextHolder shared_context = Context::createShared(); +// local_engine::SerializedPlanParser::global_context = Context::createGlobal(shared_context.get()); +// local_engine::SerializedPlanParser::global_context->makeGlobalContext(); +// local_engine::SerializedPlanParser::initFunctionEnv(); +// ::testing::InitGoogleTest(&argc, argv); +// return RUN_ALL_TESTS(); +//}