Skip to content

Commit

Permalink
feat(cloud): Add File Cache Consistency Check (apache#41280)
Browse files Browse the repository at this point in the history
Add a feature to verify the consistency of the file cache, checking
whether the disk cache is under the control of the file cache management
system.
  • Loading branch information
yt committed Nov 21, 2024
1 parent bdef601 commit 7db7a4e
Show file tree
Hide file tree
Showing 9 changed files with 435 additions and 2 deletions.
23 changes: 22 additions & 1 deletion be/src/http/action/file_cache_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ constexpr static std::string_view CLEAR = "clear";
constexpr static std::string_view RESET = "reset";
constexpr static std::string_view HASH = "hash";
constexpr static std::string_view LIST_CACHE = "list_cache";
constexpr static std::string_view LIST_BASE_PATHS = "list_base_paths";
constexpr static std::string_view CHECK_CONSISTENCY = "check_consistency";
constexpr static std::string_view CAPACITY = "capacity";
constexpr static std::string_view RELEASE = "release";
constexpr static std::string_view BASE_PATH = "base_path";
Expand Down Expand Up @@ -127,6 +129,25 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
*json_metrics = json.ToString();
}
}
} else if (operation == LIST_BASE_PATHS) {
auto all_cache_base_path = io::FileCacheFactory::instance()->get_base_paths();
EasyJson json;
std::for_each(all_cache_base_path.begin(), all_cache_base_path.end(),
[&json](auto& x) { json.PushBack(std::move(x)); });
*json_metrics = json.ToString();
} else if (operation == CHECK_CONSISTENCY) {
const std::string& cache_base_path = req->param(BASE_PATH.data());
if (cache_base_path.empty()) {
st = Status::InvalidArgument("missing parameter: {} is required", VALUE.data());
} else {
auto* block_file_cache = io::FileCacheFactory::instance()->get_by_path(cache_base_path);
std::vector<std::string> inconsistencies;
RETURN_IF_ERROR(block_file_cache->report_file_cache_inconsistency(inconsistencies));
EasyJson json;
std::for_each(inconsistencies.begin(), inconsistencies.end(),
[&json](auto& x) { json.PushBack(std::move(x)); });
*json_metrics = json.ToString();
}
} else {
st = Status::InternalError("invalid operation: {}", operation);
}
Expand All @@ -145,4 +166,4 @@ void FileCacheAction::handle(HttpRequest* req) {
}
}

} // namespace doris
} // namespace doris
82 changes: 82 additions & 0 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include "io/cache/block_file_cache.h"

#include <unordered_set>

#include "common/status.h"
#include "cpp/sync_point.h"

Expand Down Expand Up @@ -2038,4 +2040,84 @@ std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& block_lock, bool sync);

Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
InconsistencyContext inconsistency_context;
RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));
auto n = inconsistency_context.types.size();
results.reserve(n);
for (size_t i = 0; i < n; i++) {
std::string result;
result += "File cahce info in manager:\n";
result += inconsistency_context.infos_in_manager[i].to_string();
result += "File cahce info in storage:\n";
result += inconsistency_context.infos_in_storage[i].to_string();
result += inconsistency_context.types[i].to_string();
result += "\n";
results.push_back(std::move(result));
}
return Status::OK();
}

Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
std::lock_guard<std::mutex> cache_lock(_mutex);
std::vector<FileCacheInfo> infos_in_storage;
RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));
std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
for (const auto& info_in_storage : infos_in_storage) {
confirmed_blocks.insert({info_in_storage.hash, info_in_storage.offset});
auto* cell = get_cell(info_in_storage.hash, info_in_storage.offset, cache_lock);
if (cell == nullptr) {
inconsistency_context.infos_in_manager.emplace_back();
inconsistency_context.infos_in_storage.push_back(info_in_storage);
inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
continue;
}
FileCacheInfo info_in_manager {
.hash = info_in_storage.hash,
.expiration_time = cell->file_block->expiration_time(),
.size = cell->size(),
.offset = info_in_storage.offset,
.is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
.cache_type = cell->file_block->cache_type()};
InconsistencyType inconsistent_type;
if (info_in_storage.is_tmp != info_in_manager.is_tmp) {
inconsistent_type |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
}
size_t expected_size =
info_in_manager.is_tmp ? cell->dowloading_size() : info_in_manager.size;
if (info_in_storage.size != expected_size) {
inconsistent_type |= InconsistencyType::SIZE_INCONSISTENT;
}
// Only if it is not a tmp file need we check the cache type.
if ((inconsistent_type & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
info_in_storage.cache_type != info_in_manager.cache_type) {
inconsistent_type |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
}
if (info_in_storage.expiration_time != info_in_manager.expiration_time) {
inconsistent_type |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
}
if (inconsistent_type != InconsistencyType::NONE) {
inconsistency_context.infos_in_manager.push_back(info_in_manager);
inconsistency_context.infos_in_storage.push_back(info_in_storage);
inconsistency_context.types.push_back(inconsistent_type);
}
}

for (const auto& [hash, offset_to_cell] : _files) {
for (const auto& [offset, cell] : offset_to_cell) {
if (confirmed_blocks.contains({hash, offset})) {
continue;
}
const auto& block = cell.file_block;
inconsistency_context.infos_in_manager.emplace_back(
hash, block->expiration_time(), cell.size(), offset,
cell.file_block->state() == FileBlock::State::DOWNLOADING, block->cache_type());
inconsistency_context.infos_in_storage.emplace_back();
inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
}
}
return Status::OK();
}

} // namespace doris::io
4 changes: 4 additions & 0 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ class BlockFileCache {
using QueryFileCacheContextHolderPtr = std::unique_ptr<QueryFileCacheContextHolder>;
QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& query_id);

Status report_file_cache_inconsistency(std::vector<std::string>& results);
Status check_file_cache_consistency(InconsistencyContext& inconsistency_context);

private:
struct FileBlockCell {
FileBlockSPtr file_block;
Expand All @@ -349,6 +352,7 @@ class BlockFileCache {
bool releasable() const { return file_block.use_count() == 1; }

size_t size() const { return file_block->_block_range.size(); }
size_t dowloading_size() const { return file_block->_downloaded_size; }

FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock);
FileBlockCell(FileBlockCell&& other) noexcept
Expand Down
52 changes: 52 additions & 0 deletions be/src/io/cache/file_cache_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@

namespace doris::io {

std::string file_cache_type_to_string(FileCacheType type) {
switch (type) {
case FileCacheType::INDEX:
return "INDEX";
case FileCacheType::NORMAL:
return "NORMAL";
case FileCacheType::DISPOSABLE:
return "DISPOSABLE";
case FileCacheType::TTL:
return "TTL";
default:
return "UNKNOWN";
}
}

std::string FileCacheSettings::to_string() const {
std::stringstream ss;
ss << "capacity: " << capacity << ", max_file_block_size: " << max_file_block_size
Expand Down Expand Up @@ -87,4 +102,41 @@ FileBlocksHolderPtr FileCacheAllocatorBuilder::allocate_cache_holder(size_t offs
return std::make_unique<FileBlocksHolder>(std::move(holder));
}

std::string FileCacheInfo::to_string() const {
std::stringstream ss;
ss << "Hash: " << hash.to_string() << "\n"
<< "Expiration Time: " << expiration_time << "\n"
<< "Offset: " << offset << "\n"
<< "Cache Type: " << file_cache_type_to_string(cache_type) << "\n";
return ss.str();
}

std::string InconsistencyType::to_string() const {
std::string result = "Inconsistency Reason: ";
if (type == NONE) {
result += "NONE";
} else {
if (type & NOT_LOADED) {
result += "NOT_LOADED ";
}
if (type & MISSING_IN_STORAGE) {
result += "MISSING_IN_STORAGE ";
}
if (type & SIZE_INCONSISTENT) {
result += "SIZE_INCONSISTENT ";
}
if (type & CACHE_TYPE_INCONSISTENT) {
result += "CACHE_TYPE_INCONSISTENT ";
}
if (type & EXPIRATION_TIME_INCONSISTENT) {
result += "EXPIRATION_TIME_INCONSISTENT ";
}
if (type & TMP_FILE_EXPECT_DOWNLOADING_STATE) {
result += "TMP_FILE_EXPECT_DOWNLOADING_STATE";
}
}
result += "\n";
return result;
}

} // namespace doris::io
47 changes: 47 additions & 0 deletions be/src/io/cache/file_cache_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
// and modified by Doris

#pragma once
#include <cstdint>
#include <vector>

#include "io/io_common.h"
#include "vec/common/uint128.h"

Expand All @@ -39,6 +42,7 @@ enum FileCacheType {
DISPOSABLE = 0,
TTL = 3,
};
std::string file_cache_type_to_string(FileCacheType type);

struct UInt128Wrapper {
uint128_t value_;
Expand Down Expand Up @@ -135,5 +139,48 @@ struct CacheContext {
int64_t expiration_time {0};
bool is_cold_data {false};
};
struct FileCacheInfo {
UInt128Wrapper hash {0};
uint64 expiration_time {0};
uint64_t size {0};
size_t offset {0};
bool is_tmp {false};
FileCacheType cache_type {NORMAL};

std::string to_string() const;
};

class InconsistencyType {
uint32_t type;

public:
enum : uint32_t {
// No anomaly
NONE = 0,
// Missing a block cache metadata in _files
NOT_LOADED = 1 << 0,
// A block cache is missing in storage
MISSING_IN_STORAGE = 1 << 1,
// Size of a block cache recorded in _files is inconsistent with the storage
SIZE_INCONSISTENT = 1 << 2,
// Cache type of a block cache recorded in _files is inconsistent with the storage
CACHE_TYPE_INCONSISTENT = 1 << 3,
// Expiration time of a block cache recorded in _files is inconsistent with the storage
EXPIRATION_TIME_INCONSISTENT = 1 << 4,
// File in storage has a _tmp suffix, but the state of block cache in _files is not set to downloading
TMP_FILE_EXPECT_DOWNLOADING_STATE = 1 << 5
};
InconsistencyType(uint32_t t = 0) : type(t) {}
operator uint32_t&() { return type; }

std::string to_string() const;
};

struct InconsistencyContext {
// The infos in _files of BlockFileCache.
std::vector<FileCacheInfo> infos_in_manager;
std::vector<FileCacheInfo> infos_in_storage;
std::vector<InconsistencyType> types;
};

} // namespace doris::io
7 changes: 7 additions & 0 deletions be/src/io/cache/file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#pragma once

#include <mutex>

#include "common/status.h"
#include "io/cache/file_cache_common.h"
#include "util/slice.h"

Expand Down Expand Up @@ -67,6 +70,10 @@ class FileCacheStorage {
virtual FileCacheStorageType get_type() = 0;
// get local cached file
virtual std::string get_local_file(const FileCacheKey& key) = 0;
virtual Status get_file_cache_infos(std::vector<FileCacheInfo>& infos,
std::lock_guard<std::mutex>& cache_lock) const {
return Status::OK();
};
};

} // namespace doris::io
Loading

0 comments on commit 7db7a4e

Please sign in to comment.