Skip to content

Commit

Permalink
1. remove TempStorageFreer
Browse files Browse the repository at this point in the history
2. code cleanup
  • Loading branch information
baibaichen committed Aug 27, 2024
1 parent bfeafec commit 9cbb5c8
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 68 deletions.
49 changes: 15 additions & 34 deletions cpp-ch/local-engine/Common/MergeTreeTool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Poco/StringTokenizer.h>

Expand Down Expand Up @@ -67,7 +67,7 @@ void setSecondaryIndex(
ss << ", ";
else
first = false;
ss << "_minmax_" << column.name << " " << column.name << " TYPE minmax GRANULARITY 1";
ss << "_minmax_" << column.name << " " << column.name << " TYPE minmax GRANULARITY 1";
}

if (bf_index_cols.contains(column.name))
Expand All @@ -76,7 +76,7 @@ void setSecondaryIndex(
ss << ", ";
else
first = false;
ss << "_bloomfilter_" << column.name << " " << column.name << " TYPE bloom_filter GRANULARITY 1";
ss << "_bloomfilter_" << column.name << " " << column.name << " TYPE bloom_filter GRANULARITY 1";
}

if (set_index_cols.contains(column.name))
Expand All @@ -91,34 +91,26 @@ void setSecondaryIndex(
metadata->setSecondaryIndices(IndicesDescription::parse(ss.str(), metadata->getColumns(), context));
}

/// Assembles the in-memory storage metadata for a MergeTree table.
///
/// Steps, in order:
///  1. every entry of `columns` is added to the column description;
///  2. secondary indices are attached via setSecondaryIndex();
///  3. the partition key receives an empty expression list AST;
///  4. the sorting key is parsed from `table.order_by_key`;
///  5. the primary key is parsed from `table.primary_key` when that string is
///     non-empty, otherwise from `table.order_by_key`, unless the latter equals
///     MergeTreeTable::TUPLE, in which case only an ExpressionActions built
///     from an empty ActionsDAG is assigned.
///
/// @param columns  names and types of the table columns
/// @param context  context forwarded to the key / index parsers
/// @param table    table description supplying the key strings
/// @return the newly created metadata object
std::shared_ptr<DB::StorageInMemoryMetadata>
buildMetaData(const DB::NamesAndTypesList & columns, ContextPtr context, const MergeTreeTable & table)
{
    auto metadata = std::make_shared<DB::StorageInMemoryMetadata>();

    ColumnsDescription described_columns;
    for (const auto & column : columns)
        described_columns.add(ColumnDescription(column.name, column.type));
    metadata->setColumns(std::move(described_columns));

    setSecondaryIndex(columns, context, table, metadata);

    metadata->partition_key.expression_list_ast = std::make_shared<ASTExpressionList>();
    metadata->sorting_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context);

    // An explicit primary key wins; otherwise fall back to the ORDER BY key.
    if (!table.primary_key.empty())
        metadata->primary_key = KeyDescription::parse(table.primary_key, metadata->getColumns(), context);
    else if (table.order_by_key != MergeTreeTable::TUPLE)
        metadata->primary_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context);
    else
        metadata->primary_key.expression = std::make_shared<ExpressionActions>(ActionsDAG{});

    return metadata;
}

Expand All @@ -142,13 +134,12 @@ std::unique_ptr<SelectQueryInfo> buildQueryInfo(NamesAndTypesList & names_and_ty
}


void parseTableConfig(MergeTreeTableSettings & settings, String config_json)
void parseTableConfig(MergeTreeTableSettings & settings, const String & config_json)
{
rapidjson::Document doc;
doc.Parse(config_json.c_str());
if (doc.HasMember("storage_policy"))
settings.storage_policy = doc["storage_policy"].GetString();

}

MergeTreeTable parseMergeTreeTableString(const std::string & info)
Expand Down Expand Up @@ -240,27 +231,17 @@ bool sameColumns(const substrait::NamedStruct & left, const substrait::NamedStru
for (size_t i = 0; i < left.names_size(); i++)
map.emplace(left.names(i), left.struct_().types(i).kind_case());
for (size_t i = 0; i < right.names_size(); i++)
{
if (!map.contains(right.names(i)) || map[right.names(i)] != right.struct_().types(i).kind_case())
return false;
}
return true;
}

/// Tells whether `other` describes the same table structure as this object.
///
/// Compared, in this order: database, table, snapshot id, schema columns (via
/// sameColumns), the order-by / low-cardinality / minmax / bloom-filter / set
/// index / primary key strings, the relative and absolute paths, and the
/// configured storage policy. The first mismatch ends the comparison.
///
/// @param other  table description to compare against
/// @return true only if every compared field matches
bool MergeTreeTable::sameStructWith(const MergeTreeTable & other) const
{
    // Identity of the table.
    if (!(database == other.database))
        return false;
    if (!(table == other.table))
        return false;
    if (!(snapshot_id == other.snapshot_id))
        return false;

    // Column layout.
    if (!sameColumns(schema, other.schema))
        return false;

    // Key and index definitions.
    if (!(order_by_key == other.order_by_key))
        return false;
    if (!(low_card_key == other.low_card_key))
        return false;
    if (!(minmax_index_key == other.minmax_index_key))
        return false;
    if (!(bf_index_key == other.bf_index_key))
        return false;
    if (!(set_index_key == other.set_index_key))
        return false;
    if (!(primary_key == other.primary_key))
        return false;

    // Location and storage policy.
    if (!(relative_path == other.relative_path))
        return false;
    if (!(absolute_path == other.absolute_path))
        return false;

    return table_configs.storage_policy == other.table_configs.storage_policy;
}
}
16 changes: 6 additions & 10 deletions cpp-ch/local-engine/Common/MergeTreeTool.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,15 @@
* limitations under the License.
*/
#pragma once
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>

#include <Interpreters/TableJoin.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>

#include <Interpreters/MergeTreeTransaction.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <substrait/plan.pb.h>

namespace local_engine
Expand Down Expand Up @@ -67,10 +62,11 @@ struct MergeTreeTable
std::vector<MergeTreePart> parts;
std::unordered_set<String> getPartNames() const;
RangesInDataParts extractRange(DataPartsVector parts_vector) const;
bool sameStructWith(const MergeTreeTable& other) const;
bool sameStructWith(const MergeTreeTable & other) const;
};

std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const DB::NamesAndTypesList &columns, ContextPtr context, const MergeTreeTable &);
std::shared_ptr<DB::StorageInMemoryMetadata>
buildMetaData(const DB::NamesAndTypesList & columns, ContextPtr context, const MergeTreeTable &);

std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings(const MergeTreeTableSettings & config);

Expand Down
8 changes: 4 additions & 4 deletions cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ MergeTreeTable MergeTreeRelParser::parseMergeTreeTable(const substrait::ReadRel:

CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context, bool restore)
{
DB::Block header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key);
auto names_and_types_list = header.getNamesAndTypesList();
auto metadata = buildMetaData(names_and_types_list, context, merge_tree_table);
const DB::Block header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key);
const auto names_and_types_list = header.getNamesAndTypesList();
const auto metadata = buildMetaData(names_and_types_list, context, merge_tree_table);

// use instance global table (without uuid) to restore metadata folder on current instance
// we need its lock
Expand Down Expand Up @@ -396,7 +396,7 @@ String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_
query_info->prewhere_info = parsePreWhereInfo(read_rel.filter(), input);

auto storage_factory = StorageMergeTreeFactory::instance();
std::vector<DataPartPtr> selected_parts = storage_factory.getDataPartsByNames(
std::vector<DataPartPtr> selected_parts = StorageMergeTreeFactory::getDataPartsByNames(
StorageID(merge_tree_table.database, merge_tree_table.table), merge_tree_table.snapshot_id, merge_tree_table.getPartNames());

auto storage_snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ void StorageMergeTreeFactory::freeStorage(const StorageID & id, const String & s
}


CustomStorageMergeTreePtr
StorageMergeTreeFactory::getStorage(const StorageID& id, const String & snapshot_id, MergeTreeTable merge_tree_table, std::function<CustomStorageMergeTreePtr()> creator)
CustomStorageMergeTreePtr StorageMergeTreeFactory::getStorage(
const StorageID & id, const String & snapshot_id, MergeTreeTable merge_tree_table, std::function<CustomStorageMergeTreePtr()> creator)
{
auto table_name = getTableName(id, snapshot_id);
std::lock_guard lock(storage_map_mutex);
Expand All @@ -78,7 +78,8 @@ StorageMergeTreeFactory::getStorage(const StorageID& id, const String & snapshot
return storage_map->get(table_name)->first;
}

DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID & id, const String & snapshot_id, std::unordered_set<String> part_name)
DataPartsVector
StorageMergeTreeFactory::getDataPartsByNames(const StorageID & id, const String & snapshot_id, std::unordered_set<String> part_name)
{
DataPartsVector res;
auto table_name = getTableName(id, snapshot_id);
Expand Down Expand Up @@ -121,8 +122,8 @@ DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i
return res;
}
// Initialized during the native init phase.
std::unique_ptr<Poco::LRUCache<std::string, std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>> StorageMergeTreeFactory::storage_map = nullptr;
std::unique_ptr<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartStorageHolderPtr>>>> StorageMergeTreeFactory::datapart_map = nullptr;
std::unique_ptr<storage_map_cache> StorageMergeTreeFactory::storage_map = nullptr;
std::unique_ptr<datapart_map_cache> StorageMergeTreeFactory::datapart_map = nullptr;
std::recursive_mutex StorageMergeTreeFactory::storage_map_mutex;
std::recursive_mutex StorageMergeTreeFactory::datapart_mutex;

Expand Down
21 changes: 7 additions & 14 deletions cpp-ch/local-engine/Storages/Mergetree/StorageMergeTreeFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <Common/GlutenConfig.h>
#include <Common/MergeTreeTool.h>


namespace local_engine
{
using CustomStorageMergeTreePtr = std::shared_ptr<CustomStorageMergeTree>;
Expand Down Expand Up @@ -56,7 +55,10 @@ class DataPartStorageHolder
DataPartPtr data_part_;
CustomStorageMergeTreePtr storage_;
};

using DataPartStorageHolderPtr = std::shared_ptr<DataPartStorageHolder>;
using storage_map_cache = Poco::LRUCache<std::string, std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>;
using datapart_map_cache = Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartStorageHolderPtr>>>;

class StorageMergeTreeFactory
{
Expand All @@ -72,7 +74,7 @@ class StorageMergeTreeFactory
auto & storage_map_v = storage_map;
if (!storage_map_v)
{
storage_map_v = std::make_unique<Poco::LRUCache<std::string, std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>>(config.table_metadata_cache_max_count);
storage_map_v = std::make_unique<storage_map_cache>(config.table_metadata_cache_max_count);
}
else
{
Expand All @@ -81,8 +83,7 @@ class StorageMergeTreeFactory
auto & datapart_map_v = datapart_map;
if (!datapart_map_v)
{
datapart_map_v = std::make_unique<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartStorageHolderPtr>>>>(
config.table_metadata_cache_max_count);
datapart_map_v = std::make_unique<datapart_map_cache>(config.table_metadata_cache_max_count);
}
else
{
Expand All @@ -98,19 +99,11 @@ class StorageMergeTreeFactory
static String getTableName(const StorageID & id, const String & snapshot_id);

private:
static std::unique_ptr<Poco::LRUCache<std::string, std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>> storage_map;
static std::unique_ptr<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartStorageHolderPtr>>>> datapart_map;
static std::unique_ptr<storage_map_cache> storage_map;
static std::unique_ptr<datapart_map_cache> datapart_map;

static std::recursive_mutex storage_map_mutex;
static std::recursive_mutex datapart_mutex;
};

struct TempStorageFreer
{
StorageID id;
~TempStorageFreer()
{
StorageMergeTreeFactory::instance().freeStorage(id);
}
};
}
5 changes: 4 additions & 1 deletion cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,10 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
// prefetch all needed parts metadata before merge
local_engine::restoreMetaData(temp_storage, merge_tree_table, *context);

local_engine::TempStorageFreer freer{temp_storage->getStorageID()}; // to release temp CustomStorageMergeTree with RAII
// Release the temporary CustomStorageMergeTree when this scope exits (RAII).
DB::StorageID storage_id = temp_storage->getStorageID();
SCOPE_EXIT({ local_engine::StorageMergeTreeFactory::freeStorage(storage_id);});

std::vector<DB::DataPartPtr> selected_parts = local_engine::StorageMergeTreeFactory::instance().getDataPartsByNames(
temp_storage->getStorageID(), "", merge_tree_table.getPartNames());

Expand Down

0 comments on commit 9cbb5c8

Please sign in to comment.