Skip to content

Commit

Permalink
feat(cloud): Add File Cache Consistency Check (apache#41280)
Browse files Browse the repository at this point in the history
Add a feature to verify the consistency of the file cache, checking
whether the disk cache is under the control of the file cache management
system.
  • Loading branch information
yt committed Nov 7, 2024
1 parent d42e829 commit 17369e8
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 0 deletions.
20 changes: 20 additions & 0 deletions be/src/http/action/file_cache_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ constexpr static std::string_view CLEAR = "clear";
constexpr static std::string_view RESET = "reset";
constexpr static std::string_view HASH = "hash";
constexpr static std::string_view LIST_CACHE = "list_cache";
constexpr static std::string_view VIEW = "view";
constexpr static std::string_view CHECK = "check";
constexpr static std::string_view CAPACITY = "capacity";
constexpr static std::string_view RELEASE = "release";
constexpr static std::string_view BASE_PATH = "base_path";
Expand All @@ -60,6 +62,14 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
std::string operation = req->param(OP.data());
Status st = Status::OK();
auto get_json_array = [](const std::vector<std::string>& strs) {
EasyJson json_array;
json_array.SetArray();
for (const string& str : strs) {
json_array.PushBack(str);
}
return json_array;
};
if (operation == RELEASE) {
size_t released = 0;
const std::string& base_path = req->param(BASE_PATH.data());
Expand Down Expand Up @@ -110,6 +120,16 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
json[HASH.data()] = ret.to_string();
*json_metrics = json.ToString();
}
} else if (operation == VIEW) {
auto all_cache_base_path = io::FileCacheFactory::instance()->get_base_paths();
auto json_array = get_json_array(all_cache_base_path);
*json_metrics = json_array.ToString();
} else if (operation == CHECK) {
const std::string& cache_base_path = req->param(BASE_PATH.data());
auto* block_file_cache = io::FileCacheFactory::instance()->get_by_path(cache_base_path);
auto inconsistent_cache_context = block_file_cache->check_file_cache_consistency();
auto json_array = get_json_array(inconsistent_cache_context);
*json_metrics = json_array.ToString();
} else if (operation == LIST_CACHE) {
const std::string& segment_path = req->param(VALUE.data());
if (segment_path.empty()) {
Expand Down
24 changes: 24 additions & 0 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2084,4 +2084,28 @@ std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& block_lock, bool sync);

std::vector<std::string> BlockFileCache::check_file_cache_consistency() {
std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
std::vector<InconsistentCacheContext> inconsistent_cache_context;
std::lock_guard<std::mutex> lock(_mutex);
_storage->check_consistency(this, confirmed_blocks, inconsistent_cache_context, lock);
for (const auto& [hash, offset_to_cell] : _files) {
for (const auto& [offset, cell] : offset_to_cell) {
if (cell.is_deleted || confirmed_blocks.contains({hash, offset})) {
continue;
}
const auto& block = cell.file_block;
inconsistent_cache_context.emplace_back(
hash, block->expiration_time(), offset, block->cache_type(),
InconsistentCacheContext::InconsistentType::STORAGE_INCONSISTENT);
}
}
std::vector<std::string> inconsistent_results;
std::transform(inconsistent_cache_context.begin(), inconsistent_cache_context.end(),
std::back_inserter(inconsistent_results),
[](const InconsistentCacheContext& context) { return context.to_string(); });
return inconsistent_results;
}

} // namespace doris::io
3 changes: 3 additions & 0 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ class BlockFileCache {
using QueryFileCacheContextHolderPtr = std::unique_ptr<QueryFileCacheContextHolder>;
QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& query_id);

std::vector<std::string> check_file_cache_consistency();

private:
struct FileBlockCell {
FileBlockSPtr file_block;
Expand All @@ -349,6 +351,7 @@ class BlockFileCache {
bool releasable() const { return file_block.use_count() == 1; }

size_t size() const { return file_block->_block_range.size(); }
size_t dowloading_size() const { return file_block->_downloaded_size; }

FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock);
FileBlockCell(FileBlockCell&& other) noexcept
Expand Down
25 changes: 25 additions & 0 deletions be/src/io/cache/file_cache_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@

namespace doris::io {

std::string file_cache_type_to_string(FileCacheType type) {
switch (type) {
case FileCacheType::INDEX:
return "INDEX";
case FileCacheType::NORMAL:
return "NORMAL";
case FileCacheType::DISPOSABLE:
return "DISPOSABLE";
case FileCacheType::TTL:
return "TTL";
default:
return "UNKNOWN";
}
}

std::string FileCacheSettings::to_string() const {
std::stringstream ss;
ss << "capacity: " << capacity << ", max_file_block_size: " << max_file_block_size
Expand Down Expand Up @@ -86,4 +101,14 @@ FileBlocksHolderPtr FileCacheAllocatorBuilder::allocate_cache_holder(size_t offs
return std::make_unique<FileBlocksHolder>(std::move(holder));
}

std::string InconsistentCacheContext::to_string() const {
std::stringstream ss;
ss << "InconsistentCacheContext:\n"
<< "Hash: " << hash.to_string() << "\n"
<< "Expiration Time: " << expiration_time << "\n"
<< "Offset: " << offset << "\n"
<< "Cache Type: " << file_cache_type_to_string(cache_type) << "\n"
<< "Inconsistent Type: " << inconsistent_type.to_string();
return ss.str();
}
} // namespace doris::io
50 changes: 50 additions & 0 deletions be/src/io/cache/file_cache_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
// and modified by Doris

#pragma once
#include <cstdint>

#include "io/io_common.h"
#include "vec/common/uint128.h"

Expand All @@ -38,6 +40,7 @@ enum FileCacheType {
DISPOSABLE = 0,
TTL = 3,
};
std::string file_cache_type_to_string(FileCacheType type);

struct UInt128Wrapper {
uint128_t value_;
Expand Down Expand Up @@ -134,4 +137,51 @@ struct CacheContext {
bool is_cold_data {false};
};

struct InconsistentCacheContext {
UInt128Wrapper hash;
int64_t expiration_time;
size_t offset;
FileCacheType cache_type;
class InconsistentType {
uint32_t type;

public:
enum : uint32_t {
NONE = 0,
FILES_INCONSISTENT = 1 << 0,
STORAGE_INCONSISTENT = 1 << 1,
SIZE_INCONSISTENT = 1 << 2,
CACHE_TYPE_INCONSISTENT = 1 << 3,
EXPIRATION_TIME_INCONSISTENT = 1 << 4
};
InconsistentType(uint32_t t = 0) : type(t) {}
operator uint32_t&() { return type; }

std::string to_string() const {
std::stringstream ss;
if (type == NONE) {
ss << "NONE";
} else {
if (type & FILES_INCONSISTENT) {
ss << "FILES_INCONSISTENT ";
}
if (type & STORAGE_INCONSISTENT) {
ss << "STORAGE_INCONSISTENT ";
}
if (type & SIZE_INCONSISTENT) {
ss << "SIZE_INCONSISTENT ";
}
if (type & CACHE_TYPE_INCONSISTENT) {
ss << "CACHE_TYPE_INCONSISTENT ";
}
if (type & EXPIRATION_TIME_INCONSISTENT) {
ss << "EXPIRATION_TIME_INCONSISTENT ";
}
}
return ss.str();
}
} inconsistent_type;
std::string to_string() const;
};

} // namespace doris::io
7 changes: 7 additions & 0 deletions be/src/io/cache/file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <mutex>

#include "io/cache/file_cache_common.h"
#include "util/slice.h"

Expand Down Expand Up @@ -67,6 +69,11 @@ class FileCacheStorage {
virtual FileCacheStorageType get_type() = 0;
// get local cached file
virtual std::string get_local_file(const FileCacheKey& key) = 0;
virtual void check_consistency(
BlockFileCache* _mgr,
std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash>& confirmed_blocks,
std::vector<InconsistentCacheContext>& inconsistent_cache_context,
std::lock_guard<std::mutex>& cache_lock) const {}
};

} // namespace doris::io
108 changes: 108 additions & 0 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,114 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile2");
}

void FSFileCacheStorage::check_consistency(
BlockFileCache* _mgr,
std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash>& confirmed_blocks,
std::vector<InconsistentCacheContext>& inconsistent_cache_context,
std::lock_guard<std::mutex>& cache_lock) const {
std::error_code ec;
auto check = [_mgr, &cache_lock, &confirmed_blocks, &inconsistent_cache_context,
this](std::filesystem::directory_iterator key_it) {
for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
auto key_with_suffix = key_it->path().filename().native();
auto delim_pos = key_with_suffix.find('_');
DCHECK(delim_pos != std::string::npos);
std::string key_str = key_with_suffix.substr(0, delim_pos);
std::string expiration_time_str = key_with_suffix.substr(delim_pos + 1);
auto hash = UInt128Wrapper(vectorized::unhex_uint<uint128_t>(key_str.c_str()));
std::error_code ec;
std::filesystem::directory_iterator offset_it(key_it->path(), ec);
if (ec) [[unlikely]] {
LOG(WARNING) << "filesystem error, failed to remove file, file=" << key_it->path()
<< " error=" << ec.message();
continue;
}
long expiration_time = std::stoul(expiration_time_str);
for (; offset_it != std::filesystem::directory_iterator(); ++offset_it) {
size_t size = offset_it->file_size(ec);
size_t offset = 0;
bool is_tmp = false;
FileCacheType cache_type = FileCacheType::NORMAL;
if (!this->parse_filename_suffix_to_cache_type(
fs, offset_it->path().filename().native(), expiration_time, size,
&offset, &is_tmp, &cache_type)) {
continue;
}
confirmed_blocks.insert({hash, offset});
std::string offset_path = offset_it->path();
auto* cell = _mgr->get_cell(hash, offset, cache_lock);

if (!cell || cell->is_deleted) {
inconsistent_cache_context.emplace_back(
hash, expiration_time, offset, cache_type,
InconsistentCacheContext::InconsistentType::FILES_INCONSISTENT);
continue;
}

size_t expected_size =
(is_tmp && cell->file_block->state() == FileBlock::State::DOWNLOADING)
? cell->dowloading_size()
: cell->size();
InconsistentCacheContext::InconsistentType inconsistent_type;
if (size != expected_size) {
inconsistent_type |=
InconsistentCacheContext::InconsistentType::SIZE_INCONSISTENT;
}
if (cache_type != cell->file_block->cache_type()) {
inconsistent_type |=
InconsistentCacheContext::InconsistentType::CACHE_TYPE_INCONSISTENT;
}
if (expiration_time != cell->file_block->expiration_time()) {
inconsistent_type |= InconsistentCacheContext::InconsistentType::
EXPIRATION_TIME_INCONSISTENT;
}

if (inconsistent_type != InconsistentCacheContext::InconsistentType::NONE) {
inconsistent_cache_context.emplace_back(hash, expiration_time, offset,
cache_type, inconsistent_type);
}
}
}
};
if constexpr (USE_CACHE_VERSION2) {
std::filesystem::directory_iterator key_prefix_it {_cache_base_path, ec};
if (ec) {
LOG(WARNING) << ec.message();
return;
}
for (; key_prefix_it != std::filesystem::directory_iterator(); ++key_prefix_it) {
if (!key_prefix_it->is_directory()) {
// skip version file
continue;
}
if (key_prefix_it->path().filename().native().size() != KEY_PREFIX_LENGTH) {
LOG(WARNING) << "Unknown directory " << key_prefix_it->path().native()
<< ", try to remove it";
std::error_code ec;
std::filesystem::remove(key_prefix_it->path(), ec);
if (ec) {
LOG(WARNING) << "failed to remove=" << key_prefix_it->path()
<< " msg=" << ec.message();
}
continue;
}
std::filesystem::directory_iterator key_it {key_prefix_it->path(), ec};
if (ec) {
LOG(WARNING) << ec.message();
continue;
}
check(key_it);
}
} else {
std::filesystem::directory_iterator key_it {_cache_base_path, ec};
if (ec) {
LOG(WARNING) << ec.message();
return;
}
check(key_it);
}
}

void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, const FileCacheKey& key,
std::lock_guard<std::mutex>& cache_lock) {
// async load, can't find key, need to check exist.
Expand Down
6 changes: 6 additions & 0 deletions be/src/io/cache/fs_file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ class FSFileCacheStorage : public FileCacheStorage {

void load_cache_info_into_memory(BlockFileCache* _mgr) const;

void check_consistency(
BlockFileCache* _mgr,
std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash>& confirmed_blocks,
std::vector<InconsistentCacheContext>& inconsistent_cache_context,
std::lock_guard<std::mutex>& cache_lock) const override;

std::string _cache_base_path;
std::thread _cache_background_load_thread;
const std::shared_ptr<LocalFileSystem>& fs = global_local_filesystem();
Expand Down
Loading

0 comments on commit 17369e8

Please sign in to comment.