Skip to content

Commit

Permalink
[GLUTEN-7765][CH] Support CACHE META command for MergeTree table (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
loneylee authored Nov 1, 2024
1 parent 0fd4957 commit 62ed4ab
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import java.util.Set;

public class CHNativeCacheManager {
public static String cacheParts(String table, Set<String> columns) {
return nativeCacheParts(table, String.join(",", columns));
public static String cacheParts(String table, Set<String> columns, boolean onlyMetaCache) {
return nativeCacheParts(table, String.join(",", columns), onlyMetaCache);
}

private static native String nativeCacheParts(String table, String columns);
private static native String nativeCacheParts(
String table, String columns, boolean onlyMetaCache);

public static CacheResult getCacheStatus(String jobId) {
return nativeGetCacheStatus(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
case GlutenMergeTreeCacheLoad(mergeTreeTable, columns, onlyMetaCache) =>
try {
val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns, onlyMetaCache)
context.reply(CacheJobInfo(status = true, jobId))
} catch {
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ object GlutenRpcMessages {
extends GlutenRpcMessage

// for mergetree cache
case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String])
/**
 * Request sent to an executor endpoint (via `ask[CacheJobInfo]`) to start a MergeTree
 * cache-load job.
 *
 * The receiving endpoint passes all three fields to `CHNativeCacheManager.cacheParts` and
 * replies with a `CacheJobInfo` carrying the job id that call returns.
 *
 * @param mergeTreeTable
 *   table description string; the native side constructs a `MergeTreeTableInstance` from it
 * @param columns
 *   column names to cache; joined with "," before being handed to the native method
 * @param onlyMetaCache
 *   when true, the native task looks the part up by name, logs success and returns without
 *   reading any column data
 */
case class GlutenMergeTreeCacheLoad(
mergeTreeTable: String,
columns: util.Set[String],
onlyMetaCache: Boolean)
extends GlutenRpcMessage

case class GlutenCacheLoadStatus(jobId: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ case class GlutenCHCacheDataCommand(
(
executorId,
executor.executorEndpointRef.ask[CacheJobInfo](
GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava)
GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava, onlyMetaCache)
)))
})
} else {
Expand All @@ -213,7 +213,7 @@ case class GlutenCHCacheDataCommand(
(
value._1,
executorData.executorEndpointRef.ask[CacheJobInfo](
GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava)
GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava, onlyMetaCache)
)))
})
}
Expand Down
24 changes: 18 additions & 6 deletions cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,31 @@ struct CacheJobContext
MergeTreeTableInstance table;
};

Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns)
Task CacheManager::cachePart(
const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns, bool only_meta_cache)
{
CacheJobContext job_context{table};
job_context.table.parts.clear();
job_context.table.parts.push_back(part);
job_context.table.snapshot_id = "";
Task task = [job_detail = job_context, context = this->context, read_columns = columns]()
Task task = [job_detail = job_context, context = this->context, read_columns = columns, only_meta_cache]()
{
try
{
auto storage = job_detail.table.restoreStorage(context);
std::vector<DataPartPtr> selected_parts
= StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name});

if (only_meta_cache)
{
LOG_INFO(
getLogger("CacheManager"),
"Load meta cache of table {}.{} part {} success.",
job_detail.table.database,
job_detail.table.table,
job_detail.table.parts.front().name);
return;
}

auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
NamesAndTypesList names_and_types_list;
Expand All @@ -102,8 +116,6 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTr
names_and_types_list.push_back(NameAndTypePair(column.name, column.type));
}
auto query_info = buildQueryInfo(names_and_types_list);
std::vector<DataPartPtr> selected_parts
= StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name});
auto read_step = storage->reader.readFromParts(
selected_parts,
storage->getMutationsSnapshot({}),
Expand Down Expand Up @@ -135,13 +147,13 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTr
return std::move(task);
}

JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String>& columns)
JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String>& columns, bool only_meta_cache)
{
JobId id = toString(UUIDHelpers::generateV4());
Job job(id);
for (const auto & part : table.parts)
{
job.addTask(cachePart(table, part, columns));
job.addTask(cachePart(table, part, columns, only_meta_cache));
}
auto& scheduler = JobScheduler::instance();
scheduler.scheduleJob(std::move(job));
Expand Down
5 changes: 3 additions & 2 deletions cpp-ch/local-engine/Storages/Cache/CacheManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ class CacheManager

static CacheManager & instance();
static void initialize(const DB::ContextMutablePtr & context);
JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String> & columns);
JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String> & columns, bool only_meta_cache);
static jobject getCacheStatus(JNIEnv * env, const String & jobId);

Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder);
JobId cacheFiles(substrait::ReadRel::LocalFiles file_infos);
static void removeFiles(String file, String cache_name);

private:
Task cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns);
Task cachePart(
const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns, bool only_meta_cache);
CacheManager() = default;
DB::ContextMutablePtr context;
};
Expand Down
11 changes: 11 additions & 0 deletions cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
#include <Storages/MergeTree/SparkMergeTreeSink.h>
#include <Storages/MergeTree/checkDataPart.h>

/// Declarations of the profile-event counters that this file increments at the end of
/// SparkStorageMergeTree::loadDataPartsWithNames. Their definitions are not part of this file.
namespace ProfileEvents
{
/// Incremented by the number of part names requested in each loadDataPartsWithNames call.
extern const Event LoadedDataParts;
/// Incremented by the elapsed time, in microseconds, of each loadDataPartsWithNames call.
extern const Event LoadedDataPartsMicroseconds;
}

namespace DB
{
namespace MergeTreeSetting
Expand Down Expand Up @@ -176,6 +182,7 @@ void SparkStorageMergeTree::prefetchMetaDataFile(std::unordered_set<std::string>

std::vector<MergeTreeDataPartPtr> SparkStorageMergeTree::loadDataPartsWithNames(const std::unordered_set<std::string> & parts)
{
Stopwatch watch;
prefetchMetaDataFile(parts);
std::vector<MergeTreeDataPartPtr> data_parts;
const auto disk = getStoragePolicy()->getDisks().at(0);
Expand All @@ -187,6 +194,10 @@ std::vector<MergeTreeDataPartPtr> SparkStorageMergeTree::loadDataPartsWithNames(
data_parts.emplace_back(res.part);
}

watch.stop();
LOG_DEBUG(log, "Loaded data parts ({} items) took {} microseconds", parts.size(), watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::LoadedDataParts, parts.size());
ProfileEvents::increment(ProfileEvents::LoadedDataPartsMicroseconds, watch.elapsedMicroseconds());
return data_parts;
}

Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1340,8 +1340,8 @@ JNIEXPORT void Java_org_apache_gluten_utils_TestExceptionUtils_generateNativeExc
}


JNIEXPORT jstring
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * env, jobject, jstring table_, jstring columns_)
JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(
JNIEnv * env, jobject, jstring table_, jstring columns_, jboolean only_meta_cache_)
{
LOCAL_ENGINE_JNI_METHOD_START
auto table_def = jstring2string(env, table_);
Expand All @@ -1351,7 +1351,7 @@ Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv *
for (const auto & col : tokenizer)
column_set.insert(col);
local_engine::MergeTreeTableInstance table(table_def);
auto id = local_engine::CacheManager::instance().cacheParts(table, column_set);
auto id = local_engine::CacheManager::instance().cacheParts(table, column_set, only_meta_cache_);
return local_engine::charTojstring(env, id.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
}
Expand Down

0 comments on commit 62ed4ab

Please sign in to comment.