Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-7765][CH] Support CACHE META command for MergeTree table #7774

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import java.util.Set;

public class CHNativeCacheManager {
public static String cacheParts(String table, Set<String> columns) {
return nativeCacheParts(table, String.join(",", columns));
public static String cacheParts(String table, Set<String> columns, boolean onlyMetaCache) {
return nativeCacheParts(table, String.join(",", columns), onlyMetaCache);
}

private static native String nativeCacheParts(String table, String columns);
private static native String nativeCacheParts(
String table, String columns, boolean onlyMetaCache);

public static CacheResult getCacheStatus(String jobId) {
return nativeGetCacheStatus(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
case GlutenMergeTreeCacheLoad(mergeTreeTable, columns, onlyMetaCache) =>
try {
val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns, onlyMetaCache)
context.reply(CacheJobInfo(status = true, jobId))
} catch {
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ object GlutenRpcMessages {
extends GlutenRpcMessage

// for mergetree cache
case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String])
case class GlutenMergeTreeCacheLoad(
mergeTreeTable: String,
columns: util.Set[String],
onlyMetaCache: Boolean)
extends GlutenRpcMessage

case class GlutenCacheLoadStatus(jobId: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ case class GlutenCHCacheDataCommand(
(
executorId,
executor.executorEndpointRef.ask[CacheJobInfo](
GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava)
GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava, onlyMetaCache)
)))
})
} else {
Expand All @@ -213,7 +213,7 @@ case class GlutenCHCacheDataCommand(
(
value._1,
executorData.executorEndpointRef.ask[CacheJobInfo](
GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava)
GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava, onlyMetaCache)
)))
})
}
Expand Down
24 changes: 18 additions & 6 deletions cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,31 @@ struct CacheJobContext
MergeTreeTableInstance table;
};

Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns)
Task CacheManager::cachePart(
const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns, bool only_meta_cache)
{
CacheJobContext job_context{table};
job_context.table.parts.clear();
job_context.table.parts.push_back(part);
job_context.table.snapshot_id = "";
Task task = [job_detail = job_context, context = this->context, read_columns = columns]()
Task task = [job_detail = job_context, context = this->context, read_columns = columns, only_meta_cache]()
{
try
{
auto storage = job_detail.table.restoreStorage(context);
std::vector<DataPartPtr> selected_parts
= StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name});

if (only_meta_cache)
{
LOG_INFO(
getLogger("CacheManager"),
"Load meta cache of table {}.{} part {} success.",
job_detail.table.database,
job_detail.table.table,
job_detail.table.parts.front().name);
return;
}

auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
NamesAndTypesList names_and_types_list;
Expand All @@ -102,8 +116,6 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTr
names_and_types_list.push_back(NameAndTypePair(column.name, column.type));
}
auto query_info = buildQueryInfo(names_and_types_list);
std::vector<DataPartPtr> selected_parts
= StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name});
auto read_step = storage->reader.readFromParts(
selected_parts,
storage->getMutationsSnapshot({}),
Expand Down Expand Up @@ -135,13 +147,13 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTr
return std::move(task);
}

JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String>& columns)
JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String>& columns, bool only_meta_cache)
{
JobId id = toString(UUIDHelpers::generateV4());
Job job(id);
for (const auto & part : table.parts)
{
job.addTask(cachePart(table, part, columns));
job.addTask(cachePart(table, part, columns, only_meta_cache));
}
auto& scheduler = JobScheduler::instance();
scheduler.scheduleJob(std::move(job));
Expand Down
5 changes: 3 additions & 2 deletions cpp-ch/local-engine/Storages/Cache/CacheManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ class CacheManager

static CacheManager & instance();
static void initialize(const DB::ContextMutablePtr & context);
JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String> & columns);
JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String> & columns, bool only_meta_cache);
static jobject getCacheStatus(JNIEnv * env, const String & jobId);

Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder);
JobId cacheFiles(substrait::ReadRel::LocalFiles file_infos);
static void removeFiles(String file, String cache_name);

private:
Task cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns);
Task cachePart(
const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns, bool only_meta_cache);
CacheManager() = default;
DB::ContextMutablePtr context;
};
Expand Down
11 changes: 11 additions & 0 deletions cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
#include <Storages/MergeTree/SparkMergeTreeSink.h>
#include <Storages/MergeTree/checkDataPart.h>

namespace ProfileEvents
{
extern const Event LoadedDataParts;
extern const Event LoadedDataPartsMicroseconds;
}

namespace DB
{
namespace MergeTreeSetting
Expand Down Expand Up @@ -176,6 +182,7 @@ void SparkStorageMergeTree::prefetchMetaDataFile(std::unordered_set<std::string>

std::vector<MergeTreeDataPartPtr> SparkStorageMergeTree::loadDataPartsWithNames(const std::unordered_set<std::string> & parts)
{
Stopwatch watch;
prefetchMetaDataFile(parts);
std::vector<MergeTreeDataPartPtr> data_parts;
const auto disk = getStoragePolicy()->getDisks().at(0);
Expand All @@ -187,6 +194,10 @@ std::vector<MergeTreeDataPartPtr> SparkStorageMergeTree::loadDataPartsWithNames(
data_parts.emplace_back(res.part);
}

watch.stop();
LOG_DEBUG(log, "Loaded data parts ({} items) took {} microseconds", parts.size(), watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::LoadedDataParts, parts.size());
ProfileEvents::increment(ProfileEvents::LoadedDataPartsMicroseconds, watch.elapsedMicroseconds());
return data_parts;
}

Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1340,8 +1340,8 @@ JNIEXPORT void Java_org_apache_gluten_utils_TestExceptionUtils_generateNativeExc
}


JNIEXPORT jstring
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * env, jobject, jstring table_, jstring columns_)
JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(
JNIEnv * env, jobject, jstring table_, jstring columns_, jboolean only_meta_cache_)
{
LOCAL_ENGINE_JNI_METHOD_START
auto table_def = jstring2string(env, table_);
Expand All @@ -1351,7 +1351,7 @@ Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv *
for (const auto & col : tokenizer)
column_set.insert(col);
local_engine::MergeTreeTableInstance table(table_def);
auto id = local_engine::CacheManager::instance().cacheParts(table, column_set);
auto id = local_engine::CacheManager::instance().cacheParts(table, column_set, only_meta_cache_);
return local_engine::charTojstring(env, id.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
}
Expand Down
Loading