From 1b545be24d199eb7b009ffdd5d3858269773d1c4 Mon Sep 17 00:00:00 2001 From: loneylee Date: Fri, 16 Aug 2024 15:03:31 +0800 Subject: [PATCH] fix more --- .../apache/gluten/execution/CacheResult.java | 4 +- cpp-ch/local-engine/Parser/RelMetric.cpp | 51 ++++++++++--------- .../Storages/Cache/CacheManager.cpp | 14 +++-- .../SubstraitSource/ReadBufferBuilder.h | 3 +- 4 files changed, 42 insertions(+), 30 deletions(-) diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java index 0fa69e0d0b1f..b6d538039ec4 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java @@ -16,7 +16,9 @@ */ package org.apache.gluten.execution; -public class CacheResult { +import java.io.Serializable; + +public class CacheResult implements Serializable { public enum Status { RUNNING(0), SUCCESS(1), diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp index 8762c154f807..039d978bf77d 100644 --- a/cpp-ch/local-engine/Parser/RelMetric.cpp +++ b/cpp-ch/local-engine/Parser/RelMetric.cpp @@ -45,6 +45,30 @@ extern const Event CachedReadBufferReadFromCacheMisses; namespace local_engine { +static void writeCacheHits(Writer & writer) +{ + auto & counters = DB::CurrentThread::getProfileEvents(); + auto read_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load(); + auto miss_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load(); + auto read_cache_bytes = counters[ProfileEvents::CachedReadBufferReadFromCacheBytes].load(); + auto read_miss_bytes = counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load(); + auto read_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 1000; + auto miss_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 1000; + + writer.Key("read_cache_hits"); + writer.Uint64(read_cache_hits); + writer.Key("miss_cache_hits"); + writer.Uint64(miss_cache_hits); + writer.Key("read_cache_bytes"); + writer.Uint64(read_cache_bytes); + writer.Key("read_miss_bytes"); + writer.Uint64(read_miss_bytes); + writer.Key("read_cache_millisecond"); + writer.Uint64(read_cache_millisecond); + writer.Key("miss_cache_millisecond"); + writer.Uint64(miss_cache_millisecond); +} + RelMetric::RelMetric(size_t id_, const String & name_, std::vector & steps_) : id(id_), name(name_), steps(steps_) { } @@ -138,7 +162,7 @@ void RelMetric::serialize(Writer & writer, bool) const } writer.EndArray(); - if (auto read_mergetree = dynamic_cast(step)) + if (auto read_mergetree = dynamic_cast(step)) { auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk; auto selected_marks = read_mergetree->getAnalysisResult().selected_marks; @@ -149,30 +173,11 @@ void RelMetric::serialize(Writer & writer, bool) const writer.Uint64(selected_marks); writer.Key("total_marks_pk"); writer.Uint64(total_marks_pk); + writeCacheHits(writer); } - - if (auto read_from_file = dynamic_cast(step)) + else if (dynamic_cast(step)) { - auto & counters = DB::CurrentThread::getProfileEvents(); - auto read_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load(); - auto miss_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load(); - auto read_cache_bytes = counters[ProfileEvents::CachedReadBufferReadFromCacheBytes].load(); - auto read_miss_bytes = counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load(); - auto read_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 1000; - auto miss_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 1000; - - writer.Key("read_cache_hits"); - writer.Uint64(read_cache_hits); - writer.Key("miss_cache_hits"); - writer.Uint64(miss_cache_hits); - writer.Key("read_cache_bytes"); - writer.Uint64(read_cache_bytes); - writer.Key("read_miss_bytes"); - writer.Uint64(read_miss_bytes); - writer.Key("read_cache_millisecond"); - writer.Uint64(read_cache_millisecond); - writer.Key("miss_cache_millisecond"); - writer.Uint64(miss_cache_millisecond); + writeCacheHits(writer); } writer.EndObject(); diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp index 65d130662f41..0dc852a90110 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp @@ -195,6 +195,7 @@ Task CacheManager::cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & catch (std::exception & e) { LOG_ERROR(getLogger("CacheManager"), "Load cache file {} failed.\n {}", file.uri_file(), e.what()); + std::rethrow_exception(std::current_exception()); } }; @@ -211,8 +212,11 @@ JobId CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos) const Poco::URI file_uri(file_infos.items().Get(0).uri_file()); const auto read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context); - for (const auto & file : file_infos.items()) - job.addTask(cacheFile(file, read_buffer_builder)); + if (read_buffer_builder->file_cache) + for (const auto & file : file_infos.items()) + job.addTask(cacheFile(file, read_buffer_builder)); + else + LOG_WARNING(getLogger("CacheManager"), "Load cache skipped because cache not enabled."); } auto & scheduler = JobScheduler::instance(); @@ -223,12 +227,12 @@ JobId CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos) void CacheManager::removeFiles(String file, String cache_name) { // only for ut - for (auto cache_data : FileCacheFactory::instance().getAll()) + for (const auto & [name, file_cache] : FileCacheFactory::instance().getAll()) { - if (cache_data.first != cache_name) + if (name != cache_name) continue; - if (const auto cache = cache_data.second->cache) + if (const auto cache = file_cache->cache) cache->removePathIfExists(file, DB::FileCache::getCommonUser().user_id); } } diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h index 2e1bd5e8e680..92d8d41c1290 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h @@ -41,8 +41,9 @@ class ReadBufferBuilder protected: DB::ReadSettings getReadSettings(DB::ContextPtr context) const; - DB::ContextPtr context; + +public: DB::FileCachePtr file_cache = nullptr; };