Skip to content

Commit

Permalink
move thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Sep 3, 2024
1 parent 190e062 commit 73ee427
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 215 deletions.
62 changes: 6 additions & 56 deletions cpp-ch/local-engine/Common/CHUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class BlockUtil

/// The column names may be different in two blocks.
/// and the nullability also could be different, with TPCDS-Q1 as an example.
static DB::ColumnWithTypeAndName convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column);
static DB::ColumnWithTypeAndName
convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column);
};

class PODArrayUtil
Expand Down Expand Up @@ -214,7 +215,8 @@ class BackendInitializerUtil
static void registerAllFactories();
static void applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr, DB::Settings &);
static void updateNewSettings(const DB::ContextMutablePtr &, const DB::Settings &);
static std::vector<String> wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config);
static std::vector<String>
wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config);


static std::map<std::string, std::string> getBackendConfMap(std::string_view plan);
Expand Down Expand Up @@ -260,64 +262,12 @@ class MemoryUtil
static UInt64 getMemoryRSS();
};

template <typename T>
class ConcurrentDeque
{
public:
std::optional<T> pop_front()
{
std::lock_guard<std::mutex> lock(mtx);

if (deq.empty())
return {};

T t = deq.front();
deq.pop_front();
return t;
}

void emplace_back(T value)
{
std::lock_guard<std::mutex> lock(mtx);
deq.emplace_back(value);
}

void emplace_back(std::vector<T> values)
{
std::lock_guard<std::mutex> lock(mtx);
deq.insert(deq.end(), values.begin(), values.end());
}

void emplace_front(T value)
{
std::lock_guard<std::mutex> lock(mtx);
deq.emplace_front(value);
}

size_t size()
{
std::lock_guard<std::mutex> lock(mtx);
return deq.size();
}

bool empty()
{
std::lock_guard<std::mutex> lock(mtx);
return deq.empty();
}

std::deque<T> unsafeGet() { return deq; }

private:
std::deque<T> deq;
mutable std::mutex mtx;
};

class JoinUtil
{
public:
static void reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols);
static std::pair<DB::JoinKind, DB::JoinStrictness> getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join);
static std::pair<DB::JoinKind, DB::JoinStrictness>
getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join);
static std::pair<DB::JoinKind, DB::JoinStrictness> getCrossJoinKindAndStrictness(substrait::CrossRel_JoinType join_type);
};

Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ std::vector<MergeTreeDataPartPtr> mergeParts(
std::vector<DB::DataPartPtr> selected_parts,
std::unordered_map<String, String> & partition_values,
const String & new_part_uuid,
CustomStorageMergeTreePtr storage,
CustomStorageMergeTree & storage,
const String & partition_dir,
const String & bucket_dir)
{
Expand All @@ -179,15 +179,15 @@ std::vector<MergeTreeDataPartPtr> mergeParts(
// Copying a vector of columns `deduplicate by columns.
DB::IExecutableTask::TaskResultCallback f = [](bool) {};
auto task = std::make_shared<local_engine::MergeSparkMergeTreeTask>(
*storage, storage->getInMemoryMetadataPtr(), false, std::vector<std::string>{}, false, entry,
storage, storage.getInMemoryMetadataPtr(), false, std::vector<std::string>{}, false, entry,
DB::TableLockHolder{}, f);

task->setCurrentTransaction(DB::MergeTreeTransactionHolder{}, DB::MergeTreeTransactionPtr{});

executeHere(task);

std::unordered_set<std::string> to_load{future_part->name};
std::vector<MergeTreeDataPartPtr> merged = storage->loadDataPartsWithNames(to_load);
std::vector<MergeTreeDataPartPtr> merged = storage.loadDataPartsWithNames(to_load);
return merged;
}

Expand Down
7 changes: 2 additions & 5 deletions cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@ namespace local_engine
void restoreMetaData(const CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context);

void saveFileStatus(
const DB::MergeTreeData & storage,
const DB::ContextPtr& context,
const String & part_name,
IDataPartStorage & data_part_storage);
const DB::MergeTreeData & storage, const DB::ContextPtr & context, const String & part_name, IDataPartStorage & data_part_storage);

std::vector<MergeTreeDataPartPtr> mergeParts(
std::vector<DB::DataPartPtr> selected_parts,
std::unordered_map<String, String> & partition_values,
const String & new_part_uuid,
CustomStorageMergeTreePtr storage,
CustomStorageMergeTree & storage,
const String & partition_dir,
const String & bucket_dir);

Expand Down
Loading

0 comments on commit 73ee427

Please sign in to comment.