Skip to content

Commit

Permalink
Support TPCH Q1 (#8)
Browse files Browse the repository at this point in the history
* support tpch q1

* clean log

* disable background executors

* add storage cache evict

* enable function compile

* support jemalloc

* fix q6 benchmark failed

* remove shared unwind
  • Loading branch information
liuneng1994 authored Apr 27, 2022
1 parent 04b4e51 commit ee7d3c8
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 240 deletions.
6 changes: 1 addition & 5 deletions cmake/unwind.cmake
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
option (USE_UNWIND "Enable libunwind (better stacktraces)" ${ENABLE_LIBRARIES})

if (USE_UNWIND)
if (MAKE_STATIC_LIBRARIES)
add_subdirectory(contrib/libunwind-cmake)
else()
add_subdirectory(${ClickHouse_SOURCE_DIR}/utils/local-engine/cmake/libunwind)
endif()
add_subdirectory(contrib/libunwind-cmake)
set (UNWIND_LIBRARIES unwind)
set (EXCEPTION_HANDLING_LIBRARY ${UNWIND_LIBRARIES})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@
/* #undef JEMALLOC_MUTEX_INIT_CB */

/* Non-empty if the tls_model attribute is supported. */
#define JEMALLOC_TLS_MODEL __attribute__((tls_model("initial-exec")))
/*#define JEMALLOC_TLS_MODEL __attribute__((tls_model("initial-exec")))*/
#define JEMALLOC_TLS_MODEL

/*
* JEMALLOC_DEBUG enables assertions and other sanity checks, and disables
Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3197,5 +3197,9 @@ ReadSettings Context::getReadSettings() const

return res;
}
void Context::setBackgroundExecutorsInitialized(bool initialized)
{
shared->is_background_executors_initialized = initialized;
}

}
3 changes: 3 additions & 0 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,9 @@ class Context: public std::enable_shared_from_this<Context>
/** Get settings for reading from filesystem. */
ReadSettings getReadSettings() const;

/** Used to disable global context initialized. */
void setBackgroundExecutorsInitialized(bool initialized);

private:
std::unique_lock<std::recursive_mutex> getLock() const;

Expand Down
4 changes: 3 additions & 1 deletion utils/local-engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ target_compile_options(absl_int128 PRIVATE -fPIC)
target_compile_options(absl_raw_hash_set PRIVATE -fPIC)
target_compile_options(absl_raw_hash_set PRIVATE -fPIC)
add_compile_options(-fPIC)

if (ENABLE_JEMALLOC)
target_compile_options(_jemalloc PRIVATE -fPIC)
endif()
#target_compile_options(zstd PRIVATE -fPIC)

#set(CPACK_PACKAGE_VERSION 0.1.0)
Expand Down
2 changes: 1 addition & 1 deletion utils/local-engine/Common/Logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ void local_engine::Logger::initConsoleLogger()
AutoPtr<ConsoleChannel> pCons(new ConsoleChannel);
AutoPtr<AsyncChannel> pAsync(new AsyncChannel(pCons));
Poco::Logger::root().setChannel(pAsync);
Poco::Logger::root().setLevel("debug");
Poco::Logger::root().setLevel("error");
Poco::Logger::root().debug("init logger success");
}

8 changes: 8 additions & 0 deletions utils/local-engine/Common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Interpreters/Context.h>
#include <Parser/SerializedPlanParser.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Common/Logger.h>
#include <jni.h>

Expand All @@ -24,10 +25,17 @@ void init()
registerAllFunctions();
local_engine::SerializedPlanParser::shared_context = SharedContextHolder(Context::createShared());
local_engine::SerializedPlanParser::global_context = Context::createGlobal(local_engine::SerializedPlanParser::shared_context.get());
// disable global context initialized
local_engine::SerializedPlanParser::global_context->setBackgroundExecutorsInitialized(true);
local_engine::SerializedPlanParser::global_context->makeGlobalContext();
local_engine::SerializedPlanParser::global_context->setConfig(local_engine::SerializedPlanParser::config);
local_engine::SerializedPlanParser::global_context->setPath("/");
local_engine::Logger::initConsoleLogger();

/// 128 MB
constexpr size_t compiled_expression_cache_size_default = 1024 * 1024 * 128;
constexpr size_t compiled_expression_cache_elements_size_default = 10000;
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size_default, compiled_expression_cache_size_default);
}

char * createExecutor(std::string plan_string)
Expand Down
102 changes: 86 additions & 16 deletions utils/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#include <Processors/Formats/Impl/ArrowBlockOutputFormat.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
Expand Down Expand Up @@ -86,7 +88,7 @@ DB::QueryPlanPtr local_engine::SerializedPlanParser::parseMergeTreeTable(const s
auto metadata = local_engine::buildMetaData(names_and_types_list, this->context);
auto t_metadata = watch.elapsedMicroseconds();
query_context.metadata = metadata;
auto storage = storageFactory.getStorage(DB::StorageID(merge_tree_table.database, merge_tree_table.table), [merge_tree_table, metadata]() -> local_engine::CustomStorageMergeTreePtr {
auto storage = storageFactory.getStorage(DB::StorageID(merge_tree_table.database, merge_tree_table.table), metadata->getColumns(), [merge_tree_table, metadata]() -> local_engine::CustomStorageMergeTreePtr {
auto custom_storage_merge_tree = std::make_shared<local_engine::CustomStorageMergeTree>(
DB::StorageID(merge_tree_table.database, merge_tree_table.table),
merge_tree_table.relative_path,
Expand Down Expand Up @@ -256,6 +258,12 @@ DB::QueryPlanPtr local_engine::SerializedPlanParser::parseOp(const substrait::Re
}
required_columns.emplace_back(DB::NameWithAlias (name, name));
}
else if (expr.has_literal())
{
auto const_col = parseArgument(actions_dag, expr);
actions_dag->addOrReplaceInIndex(*const_col);
required_columns.emplace_back(DB::NameWithAlias(const_col->result_name, const_col->result_name));
}
else
{
throw std::runtime_error("unsupported projection type");
Expand Down Expand Up @@ -336,7 +344,46 @@ DB::QueryPlanStepPtr local_engine::SerializedPlanParser::parseAggregate(DB::Quer
auto expression_before_aggregate = std::make_unique<DB::ExpressionStep>(input, expression);
plan.addStep(std::move(expression_before_aggregate));

// TODO need support grouping key
std::set<substrait::AggregationPhase> phase_set;
for (int i = 0; i < rel.measures_size(); ++i)
{
const auto & measure = rel.measures(i);
phase_set.emplace(measure.measure().phase());
}
if (phase_set.size() > 1)
{
throw std::runtime_error("two many aggregate phase!");
}
bool final=true;
if (phase_set.contains(substrait::AggregationPhase::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE)
|| phase_set.contains(substrait::AggregationPhase::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE))
final = false;

bool only_merge = phase_set.contains(substrait::AggregationPhase::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT);


DB::ColumnNumbers keys = {};
if (rel.groupings_size() == 1)
{
for (auto group : rel.groupings(0).grouping_expressions())
{
if (group.has_selection() && group.selection().has_direct_reference())
{
keys.emplace_back(group.selection().direct_reference().struct_field().field());
}
else
{
throw std::runtime_error("unsupported group expression");

}
}
}
// only support one grouping or no grouping
else if (rel.groupings_size() != 0)
{
throw std::runtime_error("too many groupings");
}

auto aggregates = DB::AggregateDescriptions();
for (int i = 0; i < rel.measures_size(); ++i)
{
Expand All @@ -346,25 +393,46 @@ DB::QueryPlanStepPtr local_engine::SerializedPlanParser::parseAggregate(DB::Quer
auto function_name_idx = function_signature.find(":");
assert(function_name_idx != function_signature.npos && ("invalid function signature: " + function_signature).c_str());
auto function_name = function_signature.substr(0, function_name_idx);
agg.column_name = function_name +"(" + measure_names.at(i) + ")";
if (only_merge)
{
agg.column_name = measure_names.at(i);
}
else
{
agg.column_name = function_name +"(" + measure_names.at(i) + ")";
}
agg.arguments = DB::ColumnNumbers{plan.getCurrentDataStream().header.getPositionByName(measure_names.at(i))};
agg.argument_names = DB::Names{measure_names.at(i)};
agg.function = ::getAggregateFunction(function_name, {plan.getCurrentDataStream().header.getByName(measure_names.at(i)).type});
aggregates.push_back(agg);
}

auto aggregating_step = std::make_unique<AggregatingStep>(
plan.getCurrentDataStream(),
this->getAggregateParam(plan.getCurrentDataStream().header, {}, aggregates),
true,
1000000,
1,
1,
1,
false,
nullptr,
DB::SortDescription());
return aggregating_step;

if (only_merge)
{
auto transform_params = std::make_shared<AggregatingTransformParams>(this->getMergedAggregateParam(plan.getCurrentDataStream().header, keys, aggregates), final);
return std::make_unique<DB::MergingAggregatedStep>(
plan.getCurrentDataStream(),
transform_params,
false,
1,
1);
}
else
{
auto aggregating_step = std::make_unique<AggregatingStep>(
plan.getCurrentDataStream(),
this->getAggregateParam(plan.getCurrentDataStream().header, keys, aggregates),
final,
1000000,
1,
1,
1,
false,
nullptr,
DB::SortDescription());
return aggregating_step;
}
}

DB::NamesAndTypesList local_engine::SerializedPlanParser::blockToNameAndTypeList(const DB::Block & header)
Expand Down Expand Up @@ -411,6 +479,7 @@ std::string local_engine::SerializedPlanParser::getFunctionName(std::string func
}
else
{
LOG_ERROR(&Poco::Logger::get("SerializedPlanParser"), "doesn't support function {}", function_signature);
throw std::runtime_error("doesn't support function " + function_signature);
}
}
Expand Down Expand Up @@ -521,6 +590,7 @@ DB::QueryPlanPtr local_engine::SerializedPlanParser::parse(std::string & plan)
{
auto plan_ptr = std::make_unique<substrait::Plan>();
plan_ptr->ParseFromString(plan);
// std::cerr << plan_ptr->DebugString() <<std::endl;
return parse(std::move(plan_ptr));
}
void local_engine::SerializedPlanParser::initFunctionEnv()
Expand Down Expand Up @@ -604,7 +674,7 @@ void local_engine::LocalExecutor::execute(DB::QueryPlanPtr query_plan)
.actions_settings = ExpressionActionsSettings{
.can_compile_expressions = true,
.min_count_to_compile_expression = 3,
.compile_expressions = CompileExpressions::no
.compile_expressions = CompileExpressions::yes
}});
this->query_pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder));
auto t_pipeline = stopwatch.elapsedMicroseconds();
Expand Down
21 changes: 16 additions & 5 deletions utils/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,17 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS = {
{"and", "and"},
{"lte", "lessOrEquals"},
{"lt", "less"},
{"multiply", "multiply"},
{"sum", "sum"},
{"TO_DATE", "toDate"},
{"equal", "equals"},
{"cast", ""},
{"alias", "alias"}
{"alias", "alias"},
{"subtract", "minus"},
{"multiply", "multiply"},
{"add", "plus"},
// aggregate functions
{"count", "count"},
{"avg", "avg"},
{"sum", "sum"}
};

static const std::set<std::string> FUNCTION_NEED_KEEP_ARGUMENTS = {"alias"};
Expand Down Expand Up @@ -142,8 +147,14 @@ class SerializedPlanParser
nullptr,
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
false,
settings.min_count_to_compile_aggregate_expression);
true,
3);
}

Aggregator::Params getMergedAggregateParam(const Block & header, const ColumnNumbers & keys, const AggregateDescriptions & aggregates)
{
Settings settings;
return Aggregator::Params(header, keys, aggregates, false, settings.max_threads);
}


Expand Down
8 changes: 6 additions & 2 deletions utils/local-engine/Storages/SourceFromJavaIter.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "SourceFromJavaIter.h"
#include<Processors/Transforms/AggregatingTransform.h>

namespace local_engine
{
Expand All @@ -17,12 +18,15 @@ DB::Chunk SourceFromJavaIter::generate()
DB::Block * data = reinterpret_cast<DB::Block *>(byteArrayToLong(env, block));
size_t rows = data->rows();
auto chunk = DB::Chunk(data->mutateColumns(), rows);
// delete data;
auto info = std::make_shared<DB::AggregatedChunkInfo>();
info->is_overflows = data->info.is_overflows;
info->bucket_num = data->info.bucket_num;
chunk.setChunkInfo(info);
return chunk;
}
else
{
return DB::Chunk();
return {};
}
}
SourceFromJavaIter::~SourceFromJavaIter()
Expand Down
20 changes: 19 additions & 1 deletion utils/local-engine/Storages/StorageMergeTreeFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,33 @@ local_engine::StorageMergeTreeFactory & local_engine::StorageMergeTreeFactory::i
return ret;
}
local_engine::CustomStorageMergeTreePtr
local_engine::StorageMergeTreeFactory::getStorage(StorageID id, std::function<CustomStorageMergeTreePtr()> creator)
local_engine::StorageMergeTreeFactory::getStorage(StorageID id, ColumnsDescription columns, std::function<CustomStorageMergeTreePtr()> creator)
{
auto table_name = id.database_name + "." + id.table_name;

if (!storage_map.contains(table_name))
{
std::lock_guard lock(storage_map_mutex);
if (storage_map.contains(table_name))
{
std::set<string>& existed_columns = storage_columns_map.at(table_name);
for (const auto& column : columns)
{
if (!existed_columns.contains(column.name))
{
storage_map.erase(table_name);
storage_columns_map.erase(table_name);
}
}
}
if (!storage_map.contains(table_name))
{
storage_map.emplace(table_name, creator());
storage_columns_map.emplace(table_name, std::set<std::string>());
for (const auto& column : storage_map.at(table_name)->getInMemoryMetadataPtr()->columns)
{
storage_columns_map.at(table_name).emplace(column.name);
}
}
}
return storage_map.at(table_name);
Expand All @@ -38,6 +55,7 @@ local_engine::StorageInMemoryMetadataPtr local_engine::StorageMergeTreeFactory::


std::unordered_map<std::string , local_engine::CustomStorageMergeTreePtr> local_engine::StorageMergeTreeFactory::storage_map;
std::unordered_map<std::string , std::set<std::string>> local_engine::StorageMergeTreeFactory::storage_columns_map;
std::mutex local_engine::StorageMergeTreeFactory::storage_map_mutex;

std::unordered_map<std::string , local_engine::StorageInMemoryMetadataPtr> local_engine::StorageMergeTreeFactory::metadata_map;
Expand Down
3 changes: 2 additions & 1 deletion utils/local-engine/Storages/StorageMergeTreeFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ class StorageMergeTreeFactory
{
public:
static StorageMergeTreeFactory & instance();
static CustomStorageMergeTreePtr getStorage(StorageID id, std::function<CustomStorageMergeTreePtr()> creator);
static CustomStorageMergeTreePtr getStorage(StorageID id, ColumnsDescription columns, std::function<CustomStorageMergeTreePtr()> creator);
static StorageInMemoryMetadataPtr getMetadata(StorageID id, std::function<StorageInMemoryMetadataPtr()> creator);


private:
static std::unordered_map<std::string , CustomStorageMergeTreePtr> storage_map;
static std::unordered_map<std::string , std::set<std::string>> storage_columns_map;
static std::mutex storage_map_mutex;

static std::unordered_map<std::string , StorageInMemoryMetadataPtr> metadata_map;
Expand Down
Loading

0 comments on commit ee7d3c8

Please sign in to comment.