
Commit

fix more
loneylee committed Aug 16, 2024
1 parent 25cacc5 commit 1b545be
Showing 4 changed files with 42 additions and 30 deletions.
@@ -16,7 +16,9 @@
*/
package org.apache.gluten.execution;

public class CacheResult {
import java.io.Serializable;

public class CacheResult implements Serializable {
public enum Status {
RUNNING(0),
SUCCESS(1),
51 changes: 28 additions & 23 deletions cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -45,6 +45,30 @@ extern const Event CachedReadBufferReadFromCacheMisses;
namespace local_engine
{

static void writeCacheHits(Writer<StringBuffer> & writer)
{
auto & counters = DB::CurrentThread::getProfileEvents();
auto read_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load();
auto miss_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load();
auto read_cache_bytes = counters[ProfileEvents::CachedReadBufferReadFromCacheBytes].load();
auto read_miss_bytes = counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load();
auto read_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 1000;
auto miss_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 1000;

writer.Key("read_cache_hits");
writer.Uint64(read_cache_hits);
writer.Key("miss_cache_hits");
writer.Uint64(miss_cache_hits);
writer.Key("read_cache_bytes");
writer.Uint64(read_cache_bytes);
writer.Key("read_miss_bytes");
writer.Uint64(read_miss_bytes);
writer.Key("read_cache_millisecond");
writer.Uint64(read_cache_millisecond);
writer.Key("miss_cache_millisecond");
writer.Uint64(miss_cache_millisecond);
}

RelMetric::RelMetric(size_t id_, const String & name_, std::vector<DB::IQueryPlanStep *> & steps_) : id(id_), name(name_), steps(steps_)
{
}
@@ -138,7 +162,7 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, bool) const
}
writer.EndArray();

if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree*>(step))
if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree *>(step))
{
auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk;
auto selected_marks = read_mergetree->getAnalysisResult().selected_marks;
@@ -149,30 +173,11 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, bool) const
writer.Uint64(selected_marks);
writer.Key("total_marks_pk");
writer.Uint64(total_marks_pk);
writeCacheHits(writer);
}

if (auto read_from_file = dynamic_cast<local_engine::SubstraitFileSourceStep *>(step))
else if (dynamic_cast<SubstraitFileSourceStep *>(step))
{
auto & counters = DB::CurrentThread::getProfileEvents();
auto read_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load();
auto miss_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load();
auto read_cache_bytes = counters[ProfileEvents::CachedReadBufferReadFromCacheBytes].load();
auto read_miss_bytes = counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load();
auto read_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 1000;
auto miss_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 1000;

writer.Key("read_cache_hits");
writer.Uint64(read_cache_hits);
writer.Key("miss_cache_hits");
writer.Uint64(miss_cache_hits);
writer.Key("read_cache_bytes");
writer.Uint64(read_cache_bytes);
writer.Key("read_miss_bytes");
writer.Uint64(read_miss_bytes);
writer.Key("read_cache_millisecond");
writer.Uint64(read_cache_millisecond);
writer.Key("miss_cache_millisecond");
writer.Uint64(miss_cache_millisecond);
writeCacheHits(writer);
}

writer.EndObject();
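
For context, the new writeCacheHits helper emits plain key/value pairs with rapidjson's SAX-style Writer, which is why it can be shared by both the MergeTree and the file-source branches above. Below is a minimal standalone sketch of that pattern; the key names mirror the ones used in the diff, but the values are illustrative only, not taken from a real query.

#include <iostream>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

int main()
{
    rapidjson::StringBuffer buffer;
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);

    // Each Key()/Uint64() pair becomes one field of the enclosing JSON object,
    // just as writeCacheHits() appends fields to the per-step metrics object.
    writer.StartObject();
    writer.Key("read_cache_hits");
    writer.Uint64(42);
    writer.Key("miss_cache_hits");
    writer.Uint64(7);
    writer.EndObject();

    // Prints: {"read_cache_hits":42,"miss_cache_hits":7}
    std::cout << buffer.GetString() << std::endl;
    return 0;
}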
14 changes: 9 additions & 5 deletions cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -195,6 +195,7 @@ Task CacheManager::cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles &
catch (std::exception & e)
{
LOG_ERROR(getLogger("CacheManager"), "Load cache file {} failed.\n {}", file.uri_file(), e.what());
std::rethrow_exception(std::current_exception());
}
};
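
The added std::rethrow_exception(std::current_exception()) call propagates the failure to the caller instead of only logging it. A minimal sketch of this log-then-rethrow pattern follows; std::cerr stands in for the ClickHouse logger, and loadFile and the URI are hypothetical.

#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the real cache-loading work.
void loadFile(const std::string & uri)
{
    throw std::runtime_error("cannot open " + uri);
}

int main()
{
    try
    {
        try
        {
            loadFile("hdfs://example/file.parquet");
        }
        catch (const std::exception & e)
        {
            // Log first, then rethrow so the caller (here the outer handler,
            // in the real code the job scheduler) still observes the failure.
            std::cerr << "Load cache file failed: " << e.what() << std::endl;
            std::rethrow_exception(std::current_exception());
        }
    }
    catch (const std::exception & e)
    {
        std::cerr << "caller observed failure: " << e.what() << std::endl;
    }
    return 0;
}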

@@ -211,8 +212,11 @@ JobId CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos)
const Poco::URI file_uri(file_infos.items().Get(0).uri_file());
const auto read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context);

for (const auto & file : file_infos.items())
job.addTask(cacheFile(file, read_buffer_builder));
if (read_buffer_builder->file_cache)
for (const auto & file : file_infos.items())
job.addTask(cacheFile(file, read_buffer_builder));
else
LOG_WARNING(getLogger("CacheManager"), "Load cache skipped because cache not enabled.");
}

auto & scheduler = JobScheduler::instance();
@@ -223,12 +227,12 @@ JobId CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos)
void CacheManager::removeFiles(String file, String cache_name)
{
// only for ut
for (auto cache_data : FileCacheFactory::instance().getAll())
for (const auto & [name, file_cache] : FileCacheFactory::instance().getAll())
{
if (cache_data.first != cache_name)
if (name != cache_name)
continue;

if (const auto cache = cache_data.second->cache)
if (const auto cache = file_cache->cache)
cache->removePathIfExists(file, DB::FileCache::getCommonUser().user_id);
}
}
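
The removeFiles loop above now iterates with structured bindings instead of reading .first and .second from the pair. A small standalone sketch of that idiom; the map name and contents are illustrative only, not the real FileCacheFactory API.

#include <iostream>
#include <map>
#include <string>

int main()
{
    // Illustrative stand-in for a name-to-cache mapping such as
    // FileCacheFactory::instance().getAll().
    std::map<std::string, int> caches{{"local_cache", 1}, {"remote_cache", 2}};

    // Structured bindings name the key and value directly.
    for (const auto & [name, cache] : caches)
        std::cout << name << " -> " << cache << '\n';

    return 0;
}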
@@ -41,8 +41,9 @@ class ReadBufferBuilder

protected:
DB::ReadSettings getReadSettings(DB::ContextPtr context) const;

DB::ContextPtr context;

public:
DB::FileCachePtr file_cache = nullptr;
};
