From abaf323d907334fbfe35353893d081eacc46561b Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Sun, 26 May 2024 11:37:05 -0700 Subject: [PATCH] Extend lru cache to support cache entry expiration (#9860) Summary: Extend the simple LRU cache to support cache entry expiration for a Meta-internal remote storage throttling use case. It mainly replaces the folly simple cache with its own cache implementation so that we can customize the caching behavior. We have two folly intrusive lists: one for LRU, and the other an expiration list for cache expiration. The LRU list is updated on each cache lookup according to data recency, while the expiration list is never updated and is ordered by cache insertion time. When creating a simple LRU cache, we need to configure the expiration timeout in milliseconds. If it is zero, then there is no cache expiration enforcement. Otherwise, we check for expiration upon cache access, cache insertion and cache entry release. We will evict an expired cache entry no matter how hot it is. 
Pull Request resolved: https://github.com/facebookincubator/velox/pull/9860 Reviewed By: Yuhta Differential Revision: D57691395 Pulled By: xiaoxmeng fbshipit-source-id: a7ca88fb245aefa0e8b2f8465ea5b6078d991277 --- velox/common/caching/CMakeLists.txt | 1 + velox/common/caching/CachedFactory.h | 471 +++++++++++++----- velox/common/caching/SimpleLRUCache.h | 370 +++++++++++--- velox/common/caching/tests/CMakeLists.txt | 9 +- .../caching/tests/CachedFactoryTest.cpp | 463 ++++++++++++----- .../caching/tests/SimpleLRUCacheTest.cpp | 247 +++++++-- velox/connectors/hive/FileHandle.cpp | 12 +- velox/connectors/hive/FileHandle.h | 14 +- velox/connectors/hive/HiveConnector.cpp | 3 +- velox/connectors/hive/SplitReader.cpp | 16 +- .../iceberg/PositionalDeleteFileReader.cpp | 6 +- .../tests/S3FileSystemRegistrationTest.cpp | 4 +- .../s3fs/tests/S3FileSystemTest.cpp | 34 ++ .../connectors/hive/tests/FileHandleTest.cpp | 5 +- 14 files changed, 1289 insertions(+), 366 deletions(-) diff --git a/velox/common/caching/CMakeLists.txt b/velox/common/caching/CMakeLists.txt index 4917957126ae..9a1d91f1f0f4 100644 --- a/velox/common/caching/CMakeLists.txt +++ b/velox/common/caching/CMakeLists.txt @@ -29,6 +29,7 @@ target_link_libraries( velox_file velox_memory velox_process + velox_time Folly::folly fmt::fmt gflags::gflags diff --git a/velox/common/caching/CachedFactory.h b/velox/common/caching/CachedFactory.h index 70dddc4895ff..e0587ef04aef 100644 --- a/velox/common/caching/CachedFactory.h +++ b/velox/common/caching/CachedFactory.h @@ -31,6 +31,7 @@ #include #include +#include #include #include "folly/container/F14Set.h" @@ -39,93 +40,219 @@ namespace facebook::velox { -// CachedFactory provides a thread-safe way of backing a keyed generator -// (e.g. the key is filename, and the value is the file data) by a cache. -// -// Generator should take a single Key argument and return a Value; -// The Value should be either a value type or should manage its own lifecycle -// (shared_ptr). 
If it is not thread-safe it must do its own internal locking. -template +/// A smart pointer that represents data that may be in a cache and is thus not +/// owned, or is owned like a unique_ptr. We could also implement this by a +/// unique_ptr with a custom deleter. +template < + typename Key, + typename Value, + typename Comparator = std::equal_to, + typename Hash = std::hash> +class CachedPtr { + public: + /// Nullptr case. + CachedPtr(); + + /// Data is not in cache, ownership taken by *this. + explicit CachedPtr(Value* value); + + /// Data is in the provided cache referenced by the given key. The cache is + /// not guarded by a mutex. + CachedPtr( + bool cached, + Value* value, + SimpleLRUCache* cache, + std::unique_ptr key); + + /// Same as above, but the cache is guarded by a mutex. + CachedPtr( + bool cached, + Value* value, + SimpleLRUCache* cache, + std::unique_ptr key, + std::mutex* cacheMu); + + /// The destructor handles the in-cache and non-in-cache cases appropriately. + ~CachedPtr(); + + /// Move allowed, copy disallowed. Moving a new value into a non-null + /// CachedPtr will clear the previous value. + CachedPtr(CachedPtr&&); + CachedPtr& operator=(CachedPtr&&); + CachedPtr(const CachedPtr&) = delete; + CachedPtr& operator=(const CachedPtr&) = delete; + + /// Whether this value is load from cache. If we had to wait for a generation + /// (whether the actual generation was done in this thread or another) then + /// this is false. Has no effect on this behavior, but may be useful for + /// monitoring cache hit rates/etc. + bool fromCache() const { + return fromCache_; + } + + /// Indicates if this value is cached or not. 
+ bool cached() const { + return cache_ != nullptr; + } + + Value* operator->() const { + return value_; + } + Value& operator*() const { + return *value_; + } + Value* get() const { + return value_; + } + + void testingClear() { + clear(); + key_.reset(); + value_ = nullptr; + cache_ = nullptr; + cacheMu_ = nullptr; + } + + private: + // Delete or release owned value. + void clear(); + + bool fromCache_; + std::unique_ptr key_; + Value* value_; + std::mutex* cacheMu_; + // If 'value_' is in cache, 'cache_' and 'key_' will be non-null, and + // 'cacheMu_' may be non-null. If cacheMu_ is non-null, we use it to protect + // our operations to 'cache_'. + SimpleLRUCache* cache_; +}; + +template +struct DefaultSizer { + int64_t operator()(const Value& value) const { + return 1; + } +}; + +/// CachedFactory provides a thread-safe way of backing a keyed generator +/// (e.g. the key is filename, and the value is the file data) by a cache. +/// +/// Generator should take a single Key argument and return a unique_ptr; +/// If it is not thread-safe it must do its own internal locking. +/// Sizer takes a Value and returns how much cache space it will occupy. The +/// DefaultSizer says each value occupies 1 space. +template < + typename Key, + typename Value, + typename Generator, + typename Sizer = DefaultSizer, + typename Comparator = std::equal_to, + typename Hash = std::hash> class CachedFactory { public: - // It is generally expected that most inserts into the cache will succeed, - // i.e. the cache is large compared to the size of the elements and the number - // of elements that are pinned. Everything should still work if this is not - // true, but performance will suffer. - // If 'cache' is nullptr, this means the cache is disabled. 'generator' is - // invoked directly in 'generate' function. + /// It is generally expected that most inserts into the cache will succeed, + /// i.e. 
the cache is large compared to the size of the elements and the + /// number of elements that are pinned. Everything should still work if this + /// is not true, but performance will suffer. If 'cache' is nullptr, this + /// means the cache is disabled. 'generator' is invoked directly in 'generate' + /// function. CachedFactory( - std::unique_ptr> cache, + std::unique_ptr> cache, std::unique_ptr generator) - : cache_(std::move(cache)), generator_(std::move(generator)) {} - - // Returns the generator's output on the given key. If the output is - // in the cache, returns immediately. Otherwise, blocks until the output - // is ready. - // The function returns a pair. The boolean in the pair indicates whether a - // cache hit or miss. The Value is the generator output for the key if cache - // miss, or Value in the cache if cache hit. - std::pair generate(const Key& key); - - // Advanced function taking in a group of keys. Separates those keys into - // one's present in the cache (returning CachedPtrs for them) and those not - // in the cache. Does NOT call the Generator for any key. + : generator_(std::move(generator)), cache_(std::move(cache)) {} + + CachedFactory(std::unique_ptr generator) + : CachedFactory(nullptr, std::move(generator)) {} + + /// Returns the generator's output on the given key. If the output is in the + /// cache, returns immediately. Otherwise, blocks until the output is ready. + /// For a given key we will only ever be running the Generator function once. + /// E.g., if N threads ask for the same key at once, the generator will be + /// fired once and all N will receive a pointer from the cache. + /// + /// Actually the last sentence is not quite true in the edge case where + /// inserts into the cache fail; in that case we will re-run the generator + /// repeatedly, handing off the results to one thread at a time until the + /// all pending requests are satisfied or a cache insert succeeds. 
This + /// will probably mess with your memory model, so really try to avoid it. + CachedPtr generate(const Key& key); + + /// Advanced function taking in a group of keys. Separates those keys into + /// one's present in the cache (returning CachedPtrs for them) and those not + /// in the cache. Does NOT call the Generator for any key. void retrieveCached( const std::vector& keys, - std::vector>* cached, - std::vector* missing); + std::vector>>& + cached, + std::vector& missing); - // Total size of elements cached (NOT the maximum size/limit). + /// Total size of elements cached (NOT the maximum size/limit). int64_t currentSize() const { - if (cache_) { - return cache_->currentSize(); - } else { + if (cache_ == nullptr) { return 0; } + return cache_->currentSize(); } - // The maximum size of the underlying cache. + /// The maximum size of the underlying cache. int64_t maxSize() const { - if (cache_) { - return cache_->maxSize(); - } else { + if (cache_ == nullptr) { return 0; } + return cache_->maxSize(); } SimpleLRUCacheStats cacheStats() { - if (cache_) { - std::lock_guard l(cacheMu_); - return cache_->getStats(); - } else { - return {0, 0, 0, 0}; + if (cache_ == nullptr) { + return {}; } + std::lock_guard l(cacheMu_); + return cache_->stats(); } // Clear the cache and return the current cache status SimpleLRUCacheStats clearCache() { - if (cache_) { - std::lock_guard l(cacheMu_); - cache_->clear(); - return cache_->getStats(); - } else { - return {0, 0, 0, 0}; + if (cache_ == nullptr) { + return {}; } + std::lock_guard l(cacheMu_); + cache_->free(cache_->maxSize()); + return cache_->stats(); } - // Move allowed, copy disallowed. + /// Move allowed, copy disallowed. 
CachedFactory(CachedFactory&&) = default; CachedFactory& operator=(CachedFactory&&) = default; CachedFactory(const CachedFactory&) = delete; CachedFactory& operator=(const CachedFactory&) = delete; private: - std::unique_ptr> cache_; + void removePending(const Key& key) { + std::lock_guard pendingLock(pendingMu_); + pending_.erase(key); + } + + bool addCache(const Key& key, Value* value, int64_t size) { + std::lock_guard cacheLock(cacheMu_); + return cache_->addPinned(key, value, size); + } + + Value* getCache(const Key& key) { + std::lock_guard cacheLock(cacheMu_); + return getCacheLocked(key); + } + + Value* getCacheLocked(const Key& key) { + return cache_->get(key); + } + std::unique_ptr generator_; - folly::F14FastSet pending_; std::mutex cacheMu_; + std::unique_ptr> cache_; + std::mutex pendingMu_; + folly::F14FastSet pending_; std::condition_variable pendingCv_; }; @@ -133,84 +260,198 @@ class CachedFactory { // End of public API. Implementation follows. // -template -std::pair CachedFactory::generate( +template +CachedPtr::CachedPtr() + : fromCache_(false), + key_(nullptr), + value_(nullptr), + cacheMu_(nullptr), + cache_(nullptr) {} + +template +CachedPtr::CachedPtr(Value* value) + : fromCache_(false), + key_(nullptr), + value_(value), + cacheMu_(nullptr), + cache_(nullptr) {} + +template +CachedPtr::CachedPtr( + bool cached, + Value* value, + SimpleLRUCache* cache, + std::unique_ptr key) + : fromCache_(cached), + key_(std::move(key)), + value_(value), + cacheMu_(nullptr), + cache_(cache) {} + +template +CachedPtr::CachedPtr( + bool cached, + Value* value, + SimpleLRUCache* cache, + std::unique_ptr key, + std::mutex* cacheMu) + : fromCache_(cached), + key_(std::move(key)), + value_(value), + cacheMu_(cacheMu), + cache_(cache) {} + +template +CachedPtr::~CachedPtr() { + clear(); +} + +template +CachedPtr::CachedPtr(CachedPtr&& other) { + fromCache_ = other.fromCache_; + value_ = other.value_; + key_ = std::move(other.key_); + cache_ = other.cache_; + 
cacheMu_ = other.cacheMu_; + other.value_ = nullptr; +} + +template +CachedPtr& +CachedPtr::operator=(CachedPtr&& other) { + clear(); + fromCache_ = other.fromCache_; + value_ = other.value_; + key_ = std::move(other.key_); + cache_ = other.cache_; + cacheMu_ = other.cacheMu_; + other.value_ = nullptr; + return *this; +} + +template +void CachedPtr::clear() { + if (value_ == nullptr) { + return; + } + if (cache_ == nullptr) { + delete value_; + return; + } + if (cacheMu_ != nullptr) { + std::lock_guard l(*cacheMu_); + cache_->release(*key_); + } else { + cache_->release(*key_); + } +} + +template < + typename Key, + typename Value, + typename Generator, + typename Sizer, + typename Comparator, + typename Hash> +CachedPtr +CachedFactory::generate( const Key& key) { process::TraceContext trace("CachedFactory::generate"); - if (!cache_) { - return std::make_pair(false, (*generator_)(key)); + if (cache_ == nullptr) { + return CachedPtr{ + /*fromCache=*/false, + (*generator_)(key).release(), + nullptr, + std::make_unique(key)}; } - std::unique_lock pending_lock(pendingMu_); + std::unique_lock pendingLock(pendingMu_); { - std::lock_guard cache_lock(cacheMu_); - auto value = cache_->get(key); - if (value) { - return std::make_pair(true, value.value()); + if (Value* value = getCache(key)) { + return CachedPtr( + /*fromCache=*/true, + value, + cache_.get(), + std::make_unique(key), + &cacheMu_); } } - if (pending_.contains(key)) { - pendingCv_.wait(pending_lock, [&]() { return !pending_.contains(key); }); + pendingCv_.wait(pendingLock, [&]() { return !pending_.contains(key); }); // Will normally hit the cache now. - { - std::lock_guard cache_lock(cacheMu_); - auto value = cache_->get(key); - if (value) { - return std::make_pair(true, value.value()); - } - } - pending_lock.unlock(); - return generate(key); // Regenerate in the edge case. - } else { - pending_.insert(key); - pending_lock.unlock(); - Value generatedValue; - // TODO: consider using folly/ScopeGuard here. 
- try { - generatedValue = (*generator_)(key); - } catch (const std::exception&) { - { - std::lock_guard pending_lock_2(pendingMu_); - pending_.erase(key); - } - pendingCv_.notify_all(); - throw; - } - cacheMu_.lock(); - cache_->add(key, generatedValue); - cacheMu_.unlock(); - - // TODO: this code is exception unsafe and can leave pending_ in an - // inconsistent state. Eventually this code should move to - // folly:synchronized and rewritten with better primitives. - { - std::lock_guard pending_lock_2(pendingMu_); - pending_.erase(key); + if (Value* value = getCache(key)) { + return CachedPtr( + /*fromCache=*/false, + value, + cache_.get(), + std::make_unique(key), + &cacheMu_); } + pendingLock.unlock(); + // Regenerates in the edge case. + return generate(key); + } + + pending_.insert(key); + pendingLock.unlock(); + + SCOPE_EXIT { + removePending(key); pendingCv_.notify_all(); - return std::make_pair(false, generatedValue); + }; + + std::unique_ptr generatedValue = (*generator_)(key); + const uint64_t valueSize = Sizer()(*generatedValue); + Value* rawValue = generatedValue.release(); + const bool inserted = addCache(key, rawValue, valueSize); + + CachedPtr result; + if (inserted) { + result = CachedPtr( + /*fromCache=*/false, + rawValue, + cache_.get(), + std::make_unique(key), + &cacheMu_); + } else { + FB_LOG_EVERY_MS(WARNING, 60'000) << "Unable to insert into cache!"; + result = CachedPtr(rawValue); } + return result; } -template -void CachedFactory::retrieveCached( - const std::vector& keys, - std::vector>* cached, - std::vector* missing) { - if (cache_) { - std::lock_guard cache_lock(cacheMu_); - for (const Key& key : keys) { - auto value = cache_->get(key); - if (value) { - cached->emplace_back(key, value.value()); - } else { - missing->push_back(key); - } - } - } else { - for (const Key& key : keys) { - missing->push_back(key); +template < + typename Key, + typename Value, + typename Generator, + typename Sizer, + typename Comparator, + typename Hash> 
+void CachedFactory:: + retrieveCached( + const std::vector& keys, + std::vector>>& + cached, + std::vector& missing) { + if (cache_ == nullptr) { + missing.insert(missing.end(), keys.begin(), keys.end()); + return; + } + + std::lock_guard l(cacheMu_); + for (const Key& key : keys) { + Value* value = getCacheLocked(key); + if (value != nullptr) { + cached.emplace_back( + key, + CachedPtr( + /*fromCache=*/true, + value, + cache_.get(), + std::make_unique(key), + &cacheMu_)); + } else { + missing.push_back(key); } } } diff --git a/velox/common/caching/SimpleLRUCache.h b/velox/common/caching/SimpleLRUCache.h index cd420995f641..5812a30c99a9 100644 --- a/velox/common/caching/SimpleLRUCache.h +++ b/velox/common/caching/SimpleLRUCache.h @@ -18,147 +18,379 @@ #include #include #include -#include #include -#include "folly/container/EvictingCacheMap.h" +#include "folly/IntrusiveList.h" +#include "folly/container/F14Map.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/time/Timer.h" namespace facebook::velox { struct SimpleLRUCacheStats { SimpleLRUCacheStats( size_t _maxSize, + size_t _expireDurationMs, size_t _curSize, + size_t _pinnedSize, + size_t _numElements, size_t _numHits, size_t _numLookups) : maxSize{_maxSize}, + expireDurationMs(_expireDurationMs), curSize{_curSize}, + pinnedSize{_pinnedSize}, + numElements{_numElements}, numHits{_numHits}, - numLookups{_numLookups}, - numElements{curSize}, - pinnedSize{curSize} {} + numLookups{_numLookups} {} - // Capacity of the cache. - const size_t maxSize; + SimpleLRUCacheStats() = default; - // Current cache size used. - const size_t curSize; + /// Capacity of the cache. + size_t maxSize{0}; - // Total number of cache hits since server start. - const size_t numHits; + size_t expireDurationMs{0}; - // Total number of cache lookups since server start. - const size_t numLookups; + /// Current cache size used. 
+ size_t curSize{0}; - // TODO: These 2 are unused, but open source Presto depends on them - // Remove the usage in open source presto and get rid of them. - const size_t numElements; - const size_t pinnedSize; + /// Current cache size used by pinned entries. + size_t pinnedSize{0}; + + /// Total number of elements in the cache. + size_t numElements{0}; + + /// Total number of cache hits since server start. + size_t numHits{0}; + + /// Total number of cache lookups since server start. + size_t numLookups{0}; std::string toString() const { return fmt::format( "{{\n" " maxSize: {}\n" + " expireDurationMs: {}\n" " curSize: {}\n" + " pinnedSize: {}\n" + " numElements: {}\n" " numHits: {}\n" " numLookups: {}\n" "}}\n", maxSize, + expireDurationMs, curSize, + pinnedSize, + numElements, numHits, numLookups); } - bool operator==(const SimpleLRUCacheStats& rhs) const { - return std::tie(curSize, maxSize, numHits, numLookups) == - std::tie(rhs.curSize, rhs.maxSize, rhs.numHits, rhs.numLookups); + bool operator==(const SimpleLRUCacheStats& other) const { + return std::tie( + curSize, + expireDurationMs, + maxSize, + pinnedSize, + numElements, + numHits, + numLookups) == + std::tie( + other.curSize, + other.expireDurationMs, + other.maxSize, + other.pinnedSize, + other.numElements, + other.numHits, + other.numLookups); } }; -/// A simple wrapper on top of the folly::EvictingCacheMap that tracks -/// hit/miss counters. Key/Value evicted are immediately destructed. -/// So the Key/Value should be a value type or self managing lifecycle -/// shared_ptr. +/// A simple LRU cache that allows each element to occupy an arbitrary amount of +/// space in the cache. Useful when the size of the cached elements can vary a +/// lot; if they are all roughly the same size something that only tracks the +/// number of elements in the cache like common/datastruct/LRUCacheMap.h may be +/// better. /// /// NOTE: -/// 1. 
NOT Thread-Safe: All the public calls modify internal structures -/// and hence require external write locks if used from multiple threads. -template +/// 1. NOT Thread-Safe: All the public calls modify internal structures and +/// hence require external write locks if used from multiple threads. +/// 2. 'Key' is required to be copyable and movable. +template < + typename Key, + typename Value, + typename Comparator = std::equal_to, + typename Hash = std::hash> class SimpleLRUCache { public: - /// Constructs a cache of the specified size. The maxSize represents the - /// number of entries in the cache. clearSize represents the number of entries - /// to evict in a given time, when the cache is full. - explicit SimpleLRUCache(size_t maxSize, size_t clearSize = 1); + /// Constructs a cache of the specified size. This size can represent whatever + /// you want -- slots, or bytes, or etc; you provide the size of each element + /// whenever you add a new value to the cache. If 'expireDurationMs' is not + /// zero, then a cache value will be evicted out of cache after + /// 'expireDurationMs' time passed since its insertion into the cache no + /// matter if it been accessed or not. + explicit SimpleLRUCache(size_t maxSize, size_t expireDurationMs = 0); + + /// Frees all owned data. Check-fails if any element remains pinned. + ~SimpleLRUCache(); + + /// Adds a key-value pair that will occupy the provided size, evicting + /// older elements repeatedly until enough room is avialable in the cache. + /// Returns whether insertion succeeded. If it did, the cache takes + /// ownership of |value|. Insertion will fail in two cases: + /// 1) There isn't enough room in the cache even after all unpinned + /// elements are freed. + /// 2) The key you are adding is already present in the cache. In + /// this case the element currently existing in the cache remains + /// totally unchanged. 
+ /// + /// If you use size to represent in-memory size, keep in mind that the + /// total space used per entry is roughly 2 * key_size + value_size + 30 bytes + /// (nonexact because we use a hash map internally, so the ratio of reserved + /// slot to used slots will vary). + bool add(Key key, Value* value, size_t size); - /// Add an item to the cache. Returns true if the item is successfully - /// added, false otherwise. - bool add(const Key& key, const Value& value); + /// Same as add(), but the value starts pinned. Saves a map lookup if you + /// would otherwise do add() then get(). Keep in mind that if insertion + /// fails the key's pin count has NOT been incremented. + bool addPinned(Key key, Value* value, size_t size); - /// Gets value associated with key. - /// returns std::nullopt when the key is missing - /// returns the cached value, when the key is present. - std::optional get(const Key& key); + /// Gets an unowned pointer to the value associated with key. + /// Returns nullptr if the key is not present in the cache. + /// Once you are done using the returned non-null *value, you must call + /// release with the same key you passed to get. + /// + /// The returned pointer is guaranteed to remain valid until release + /// is called. + /// + /// Note that we return a non-const pointer, and multiple callers + /// can lease the same object, so if you're mutating it you need + /// to manage your own locking. + Value* get(const Key& key); - void clear(); + /// Unpins a key. You MUST call release on every key you have + /// get'd once are you done using the value or bad things will + /// happen (namely, memory leaks). + void release(const Key& key); /// Total size of elements in the cache (NOT the maximum size/limit). size_t currentSize() const { - return lru_.size(); + return curSize_; } /// The maximum size of the cache. 
size_t maxSize() const { - return lru_.getMaxSize(); + return maxSize_; } - SimpleLRUCacheStats getStats() const { + SimpleLRUCacheStats stats() const { return { - lru_.getMaxSize(), - lru_.size(), + maxSize_, + expireDurationMs_, + curSize_, + pinnedSize_, + lruList_.size(), numHits_, numLookups_, }; } + /// Removes unpinned elements until at least size space is freed. Returns + /// the size actually freed, which may be less than requested if the + /// remaining are all pinned. + size_t free(size_t size); + private: + struct Element { + Key key; + Value* value; + size_t size; + uint32_t numPins; + size_t expireTimeMs; + folly::IntrusiveListHook lruEntry; + folly::IntrusiveListHook expireEntry; + }; + using LruList = folly::IntrusiveList; + using ExpireList = folly::IntrusiveList; + + bool addInternal(Key key, Value* value, size_t size, bool pinned); + + // Removes the expired and unpinned cache entries from the cache. The function + // is invoked upon cache lookup, cache insertion and cache entry release. + void removeExpiredEntries(); + + // Removes entry 'e' from cache by unlinking it from 'lruList_' and + // 'expireList_', and destroy the object at the end. + size_t freeEntry(Element* e); + + const size_t maxSize_; + const size_t expireDurationMs_; + size_t curSize_{0}; + size_t pinnedSize_{0}; size_t numHits_{0}; size_t numLookups_{0}; - folly::EvictingCacheMap lru_; + // Elements get newer as we evict from lruList_.begin() to lruList_.end(). + LruList lruList_; + ExpireList expireList_; + folly::F14FastMap keys_; }; -// -// End of public API. Imlementation follows. -// - -template -inline SimpleLRUCache::SimpleLRUCache( +/// +/// End of public API. Implementation follows. 
+/// +template +inline SimpleLRUCache::SimpleLRUCache( size_t maxSize, - size_t clearSize) - : lru_(maxSize, clearSize) {} - -template -inline bool SimpleLRUCache::add( - const Key& key, - const Value& value) { - return lru_.insert(key, value).second; + size_t expireDurationMs) + : maxSize_(maxSize), expireDurationMs_(expireDurationMs) {} + +template +inline SimpleLRUCache::~SimpleLRUCache() { + VELOX_CHECK_EQ(pinnedSize_, 0); + // We could be more optimal than calling free here, but in + // general this destructor will never get called during normal + // usage so we don't bother. + free(maxSize_); + VELOX_CHECK(lruList_.empty()); + VELOX_CHECK(expireList_.empty()); + VELOX_CHECK(keys_.empty()); + VELOX_CHECK_EQ(curSize_, 0); } -template -inline std::optional SimpleLRUCache::get(const Key& key) { - ++numLookups_; - auto it = lru_.find(key); - if (it == lru_.end()) { - return std::nullopt; +template +inline bool SimpleLRUCache::add( + Key key, + Value* value, + size_t size) { + return addInternal(key, value, size, /*pinned=*/false); +} + +template +inline bool SimpleLRUCache::addPinned( + Key key, + Value* value, + size_t size) { + return addInternal(key, value, size, /*pinned=*/true); +} + +template +inline void +SimpleLRUCache::removeExpiredEntries() { + if (expireDurationMs_ == 0) { + return; + } + const auto currentTimeMs = getCurrentTimeMs(); + auto it = expireList_.begin(); + while (it != expireList_.end()) { + if (it->expireTimeMs > currentTimeMs) { + return; + } + if (it->numPins > 0) { + ++it; + continue; + } + Element* expiredEntry = &*it; + it = expireList_.erase(it); + freeEntry(expiredEntry); } +} + +template +inline bool SimpleLRUCache::addInternal( + Key key, + Value* value, + size_t size, + bool pinned) { + removeExpiredEntries(); + + if (keys_.find(key) != keys_.end()) { + return false; + } + if (pinnedSize_ + size > maxSize_) { + return false; + } + const int64_t spaceNeeded = curSize_ + size - maxSize_; + if (spaceNeeded > 0) { + 
free(spaceNeeded); + } + + Element* e = new Element; + e->key = std::move(key); + e->value = value; + e->size = size; + e->numPins = !!pinned; + if (pinned) { + pinnedSize_ += size; + } + keys_.emplace(e->key, e); + lruList_.push_back(*e); + if (expireDurationMs_ != 0) { + e->expireTimeMs = getCurrentTimeMs() + expireDurationMs_; + expireList_.push_back(*e); + } + curSize_ += size; + return true; +} +template +inline Value* SimpleLRUCache::get( + const Key& key) { + removeExpiredEntries(); + + ++numLookups_; + auto it = keys_.find(key); + if (it == keys_.end()) { + return nullptr; + } + Element* entry = it->second; + if (entry->numPins++ == 0) { + pinnedSize_ += entry->size; + } + VELOX_DCHECK(entry->lruEntry.is_linked()); + entry->lruEntry.unlink(); + lruList_.push_back(*entry); ++numHits_; - return it->second; + return it->second->value; +} + +template +inline void SimpleLRUCache::release( + const Key& key) { + Element* e = keys_[key]; + if (--e->numPins == 0) { + pinnedSize_ -= e->size; + } + removeExpiredEntries(); +} + +template +inline size_t SimpleLRUCache::free(size_t size) { + auto it = lruList_.begin(); + size_t freed = 0; + while (it != lruList_.end() && freed < size) { + if (it->numPins == 0) { + Element* evictedEntry = &*it; + it = lruList_.erase(it); + freed += freeEntry(evictedEntry); + } else { + ++it; + } + } + return freed; } -template -inline void SimpleLRUCache::clear() { - lru_.clear(); +template +inline size_t SimpleLRUCache::freeEntry( + Element* e) { + VELOX_CHECK_EQ(e->numPins, 0); + // NOTE: the list hook dtor will unlink the entry from list so we don't need + // to explicitly unlink here. 
+ const auto freedSize = e->size; + curSize_ -= freedSize; + keys_.erase(e->key); + delete e->value; + delete e; + return freedSize; } } // namespace facebook::velox diff --git a/velox/common/caching/tests/CMakeLists.txt b/velox/common/caching/tests/CMakeLists.txt index f02c59b5f573..dac47af8efe8 100644 --- a/velox/common/caching/tests/CMakeLists.txt +++ b/velox/common/caching/tests/CMakeLists.txt @@ -14,8 +14,8 @@ add_executable(simple_lru_cache_test SimpleLRUCacheTest.cpp) add_test(simple_lru_cache_test simple_lru_cache_test) -target_link_libraries(simple_lru_cache_test PRIVATE Folly::folly glog::glog - gtest gtest_main) +target_link_libraries(simple_lru_cache_test PRIVATE Folly::folly velox_time + glog::glog gtest gtest_main) add_executable( velox_cache_test AsyncDataCacheTest.cpp CacheTTLControllerTest.cpp @@ -34,5 +34,6 @@ target_link_libraries( add_executable(cached_factory_test CachedFactoryTest.cpp) add_test(cached_factory_test cached_factory_test) -target_link_libraries(cached_factory_test PRIVATE velox_process Folly::folly - glog::glog gtest gtest_main) +target_link_libraries( + cached_factory_test PRIVATE velox_process Folly::folly velox_time glog::glog + gtest gtest_main) diff --git a/velox/common/caching/tests/CachedFactoryTest.cpp b/velox/common/caching/tests/CachedFactoryTest.cpp index 6ae2d62e7de8..1a9f924c6235 100644 --- a/velox/common/caching/tests/CachedFactoryTest.cpp +++ b/velox/common/caching/tests/CachedFactoryTest.cpp @@ -16,140 +16,160 @@ #include "velox/common/caching/CachedFactory.h" +#include "folly/Random.h" #include "folly/executors/EDFThreadPoolExecutor.h" #include "folly/executors/thread_factory/NamedThreadFactory.h" #include "folly/synchronization/Latch.h" #include "gtest/gtest.h" +#include "velox/common/base/tests/GTestUtils.h" using namespace facebook::velox; -namespace { +namespace { struct DoublerGenerator { - int operator()(const int& value) { - ++generated_; - return value * 2; + std::unique_ptr operator()(const int& value) { 
+ ++generated; + return std::make_unique(value * 2); } - std::atomic generated_ = 0; + std::atomic generated = 0; }; -template -T getCachedValue(std::pair& value) { - return value.second; -} - -template -bool isCached(std::pair& value) { - return value.first; -} - -template -std::pair cacheHit(const T& value) { - return std::make_pair(true, value); -} - -template -std::pair cacheMiss(const T& value) { - return std::make_pair(false, value); -} - +struct IdentityGenerator { + std::unique_ptr operator()(const int& value) { + return std::make_unique(value); + } +}; } // namespace TEST(CachedFactoryTest, basicGeneration) { auto generator = std::make_unique(); - auto* generated = &generator->generated_; + auto* generated = &generator->generated; CachedFactory factory( std::make_unique>(1000), std::move(generator)); - EXPECT_EQ(factory.maxSize(), 1000); + ASSERT_EQ(factory.maxSize(), 1000); + ASSERT_EQ(factory.currentSize(), 0); + { auto val1 = factory.generate(1); - EXPECT_EQ(val1, cacheMiss(2)); - EXPECT_EQ(*generated, 1); - + ASSERT_EQ(*val1, 2); + ASSERT_EQ(*generated, 1); + ASSERT_FALSE(val1.fromCache()); auto val2 = factory.generate(1); - EXPECT_EQ(val2, cacheHit(2)); - EXPECT_EQ(*generated, 1); - EXPECT_EQ(factory.currentSize(), 1); + ASSERT_EQ(*val2, 2); + ASSERT_EQ(*generated, 1); + ASSERT_TRUE(val2.fromCache()); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); } + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + { auto val3 = factory.generate(1); - EXPECT_EQ(val3, cacheHit(2)); - EXPECT_EQ(*generated, 1); - + ASSERT_EQ(*val3, 2); + ASSERT_EQ(*generated, 1); + ASSERT_TRUE(val3.fromCache()); auto val4 = factory.generate(2); - EXPECT_EQ(val4, cacheMiss(4)); - EXPECT_EQ(*generated, 2); - + ASSERT_EQ(*val4, 4); + ASSERT_EQ(*generated, 2); + ASSERT_FALSE(val4.fromCache()); auto val5 = factory.generate(3); - EXPECT_EQ(val5, cacheMiss(6)); - EXPECT_EQ(*generated, 3); - EXPECT_EQ(factory.currentSize(), 3); + ASSERT_EQ(*val5, 6); + 
ASSERT_EQ(*generated, 3); + ASSERT_FALSE(val5.fromCache()); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); } + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); - auto val6 = factory.generate(1); - EXPECT_EQ(val6, cacheHit(2)); - EXPECT_EQ(*generated, 3); - - auto val7 = factory.generate(4); - EXPECT_EQ(val7, cacheMiss(8)); - EXPECT_EQ(*generated, 4); + { + auto val6 = factory.generate(1); + ASSERT_EQ(*val6, 2); + ASSERT_EQ(*generated, 3); + ASSERT_TRUE(val6.fromCache()); + auto val7 = factory.generate(4); + ASSERT_EQ(*val7, 8); + ASSERT_EQ(*generated, 4); + ASSERT_FALSE(val7.fromCache()); + auto val8 = factory.generate(3); + ASSERT_EQ(*val8, 6); + ASSERT_EQ(*generated, 4); + ASSERT_TRUE(val8.fromCache()); + ASSERT_EQ(factory.currentSize(), 4); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + } + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); - auto val8 = factory.generate(3); - EXPECT_EQ(val8, cacheHit(6)); - EXPECT_EQ(*generated, 4); - EXPECT_EQ(factory.currentSize(), 4); + factory.clearCache(); + ASSERT_EQ(factory.currentSize(), 0); + ASSERT_EQ(factory.cacheStats().curSize, 0); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); } struct DoublerWithExceptionsGenerator { - int operator()(const int& value) { + std::unique_ptr operator()(const int& value) { if (value == 3) { - throw std::invalid_argument("3 is bad"); + VELOX_FAIL("3 is bad"); } - ++generated_; - return value * 2; + ++generated; + return std::make_unique(value * 2); } - int generated_ = 0; + int generated = 0; }; TEST(CachedFactoryTest, clearCache) { auto generator = std::make_unique(); CachedFactory factory( std::make_unique>(1000), std::move(generator)); - EXPECT_EQ(factory.maxSize(), 1000); + ASSERT_EQ(factory.maxSize(), 1000); { auto val1 = factory.generate(1); - EXPECT_EQ(val1, cacheMiss(2)); + ASSERT_FALSE(val1.fromCache()); } factory.clearCache(); - EXPECT_EQ(factory.currentSize(), 0); - EXPECT_EQ(factory.generate(1), cacheMiss(2)); + 
ASSERT_EQ(factory.currentSize(), 0); + + ASSERT_FALSE(factory.generate(1).fromCache()); + auto cachedValue = factory.generate(1); + ASSERT_TRUE(cachedValue.fromCache()); + ASSERT_FALSE(factory.generate(2).fromCache()); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); + ASSERT_EQ(factory.cacheStats().curSize, 2); + + factory.clearCache(); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); + + cachedValue.testingClear(); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + + factory.clearCache(); + ASSERT_EQ(factory.currentSize(), 0); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); } TEST(CachedFactoryTest, basicExceptionHandling) { auto generator = std::make_unique(); - int* generated = &generator->generated_; + int* generated = &generator->generated; CachedFactory factory( std::make_unique>(1000), std::move(generator)); auto val1 = factory.generate(1); - EXPECT_EQ(getCachedValue(val1), 2); - EXPECT_EQ(*generated, 1); - try { - auto val2 = factory.generate(3); - FAIL() << "Factory generation should have failed"; - } catch (const std::invalid_argument&) { - // Expected. 
- } + ASSERT_EQ(*val1, 2); + ASSERT_EQ(*generated, 1); + VELOX_ASSERT_THROW(factory.generate(3), "3 is bad"); + val1 = factory.generate(4); - EXPECT_EQ(getCachedValue(val1), 8); - EXPECT_EQ(*generated, 2); + ASSERT_EQ(*val1, 8); + ASSERT_EQ(*generated, 2); val1 = factory.generate(1); - EXPECT_EQ(getCachedValue(val1), 2); - EXPECT_EQ(*generated, 2); + ASSERT_EQ(*val1, 2); + ASSERT_EQ(*generated, 2); } TEST(CachedFactoryTest, multiThreadedGeneration) { auto generator = std::make_unique(); - auto* generated = &generator->generated_; + auto* generated = &generator->generated; CachedFactory factory( std::make_unique>(1000), std::move(generator)); folly::EDFThreadPoolExecutor pool( @@ -157,31 +177,31 @@ TEST(CachedFactoryTest, multiThreadedGeneration) { const int numValues = 5; const int requestsPerValue = 10; folly::Latch latch(numValues * requestsPerValue); - for (int i = 0; i < requestsPerValue; i++) { - for (int j = 0; j < numValues; j++) { + for (int i = 0; i < requestsPerValue; ++i) { + for (int j = 0; j < numValues; ++j) { pool.add([&, j]() { auto value = factory.generate(j); - EXPECT_EQ(getCachedValue(value), 2 * j); + CHECK_EQ(*value, 2 * j); latch.count_down(); }); } } latch.wait(); - EXPECT_EQ(*generated, numValues); + ASSERT_EQ(*generated, numValues); } -// Same as above, but we keep the returned CachedPtrs till the end -// of the function. +// Same as above, but we keep the returned CachedPtrs till the end of the +// function. 
TEST(CachedFactoryTest, multiThreadedGenerationAgain) { auto generator = std::make_unique(); - auto* generated = &generator->generated_; + auto* generated = &generator->generated; CachedFactory factory( std::make_unique>(1000), std::move(generator)); folly::EDFThreadPoolExecutor pool( 100, std::make_shared("test_pool")); const int numValues = 5; const int requestsPerValue = 10; - std::vector> cachedValues(numValues * requestsPerValue); + std::vector> cachedValues(numValues * requestsPerValue); folly::Latch latch(numValues * requestsPerValue); for (int i = 0; i < requestsPerValue; i++) { for (int j = 0; j < numValues; j++) { @@ -195,69 +215,284 @@ TEST(CachedFactoryTest, multiThreadedGenerationAgain) { ASSERT_EQ(*generated, numValues); for (int i = 0; i < requestsPerValue; i++) { for (int j = 0; j < numValues; j++) { - EXPECT_EQ(getCachedValue(cachedValues[i * numValues + j]), 2 * j); + ASSERT_EQ(*cachedValues[i * numValues + j], 2 * j); } } } +TEST(CachedFactoryTest, lruCacheEviction) { + auto generator = std::make_unique(); + CachedFactory factory( + std::make_unique>(3), std::move(generator)); + ASSERT_EQ(factory.maxSize(), 3); + ASSERT_EQ(factory.currentSize(), 0); + + auto val1 = factory.generate(1); + ASSERT_FALSE(val1.fromCache()); + ASSERT_TRUE(val1.cached()); + auto val2 = factory.generate(2); + ASSERT_FALSE(val2.fromCache()); + ASSERT_TRUE(val2.cached()); + auto val3 = factory.generate(3); + ASSERT_FALSE(val3.fromCache()); + ASSERT_TRUE(val3.cached()); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + auto val4 = factory.generate(4); + ASSERT_FALSE(val4.fromCache()); + ASSERT_FALSE(val4.cached()); + + { + auto val = factory.generate(4); + ASSERT_FALSE(val.fromCache()); + ASSERT_FALSE(val.cached()); + val = factory.generate(1); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + val = factory.generate(2); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + val = factory.generate(3); + 
ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + } + { + auto val = factory.generate(1); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + } + val1.testingClear(); + val2.testingClear(); + val3.testingClear(); + + val4 = factory.generate(4); + ASSERT_FALSE(val4.fromCache()); + ASSERT_TRUE(val4.cached()); + ASSERT_EQ(factory.cacheStats().curSize, 3); + { + auto val = factory.generate(4); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + val = factory.generate(1); + ASSERT_TRUE(val.fromCache()); + ASSERT_TRUE(val.cached()); + // Cache entry 2 should be selected for eviction. + val = factory.generate(2); + ASSERT_FALSE(val.fromCache()); + ASSERT_TRUE(val.cached()); + // Cache entry 2 insertion caused cache entry 3 eviction. + val = factory.generate(3); + ASSERT_FALSE(val.fromCache()); + ASSERT_TRUE(val.cached()); + } + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); +} + +TEST(CachedFactoryTest, cacheExpiration) { + auto generator = std::make_unique(); + CachedFactory factory( + std::make_unique>(3, 1'000), + std::move(generator)); + ASSERT_EQ(factory.maxSize(), 3); + ASSERT_EQ(factory.currentSize(), 0); + + auto val1 = factory.generate(1); + ASSERT_FALSE(val1.fromCache()); + ASSERT_TRUE(val1.cached()); + auto val2 = factory.generate(2); + ASSERT_FALSE(val2.fromCache()); + ASSERT_TRUE(val2.cached()); + auto val3 = factory.generate(3); + ASSERT_FALSE(val3.fromCache()); + ASSERT_TRUE(val3.cached()); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + auto val4 = factory.generate(4); + ASSERT_FALSE(val4.fromCache()); + ASSERT_FALSE(val4.cached()); + + std::this_thread::sleep_for(std::chrono::milliseconds{1'500}); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + + val4 = factory.generate(4); + ASSERT_FALSE(val4.fromCache()); + ASSERT_FALSE(val4.cached()); + ASSERT_EQ(factory.currentSize(), 3); + 
ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + + val1.testingClear(); + ASSERT_EQ(factory.currentSize(), 2); + ASSERT_EQ(factory.cacheStats().pinnedSize, 2); + + val4 = factory.generate(4); + ASSERT_FALSE(val4.fromCache()); + ASSERT_TRUE(val4.cached()); + ASSERT_EQ(factory.currentSize(), 3); + ASSERT_EQ(factory.cacheStats().pinnedSize, 3); + + val2.testingClear(); + val3.testingClear(); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); + val4.testingClear(); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + + std::this_thread::sleep_for(std::chrono::milliseconds{1'500}); + + val1 = factory.generate(1); + ASSERT_FALSE(val1.fromCache()); + ASSERT_TRUE(val1.cached()); + ASSERT_EQ(factory.currentSize(), 1); + ASSERT_EQ(factory.cacheStats().pinnedSize, 1); +} + TEST(CachedFactoryTest, retrievedCached) { auto generator = std::make_unique(); - auto* generated = &generator->generated_; + auto* generated = &generator->generated; CachedFactory factory( std::make_unique>(1000), std::move(generator)); - for (int i = 0; i < 10; i += 2) + for (int i = 0; i < 10; i += 2) { factory.generate(i); - EXPECT_EQ(*generated, 5); + } + ASSERT_EQ(*generated, 5); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + ASSERT_EQ(factory.cacheStats().curSize, 5); + std::vector keys(10); - for (int i = 0; i < 10; i += 1) + for (int i = 0; i < 10; ++i) { keys[i] = i; - std::vector> cached; + } + std::vector>> cached; std::vector missing; - factory.retrieveCached(keys, &cached, &missing); - ASSERT_EQ(5, cached.size()); + factory.retrieveCached(keys, cached, missing); + ASSERT_EQ(cached.size(), 5); + ASSERT_EQ(factory.cacheStats().pinnedSize, 5); + ASSERT_EQ(factory.cacheStats().curSize, 5); + for (int i = 0; i < 5; ++i) { - EXPECT_EQ(cached[i].first, 2 * i); - EXPECT_EQ(cached[i].second, 4 * i); + ASSERT_EQ(cached[i].first, 2 * i); + ASSERT_EQ(*cached[i].second, 4 * i); + ASSERT_TRUE(cached[i].second.fromCache()); } - 
ASSERT_EQ(5, missing.size()); + ASSERT_EQ(missing.size(), 5); + for (int i = 0; i < 5; ++i) { - EXPECT_EQ(missing[i], 2 * i + 1); + ASSERT_EQ(missing[i], 2 * i + 1); } - EXPECT_EQ(*generated, 5); + ASSERT_EQ(*generated, 5); } -TEST(CachedFactoryTest, disableCache) { +TEST(CachedFactoryTest, clearCacheWithManyEntries) { auto generator = std::make_unique(); - auto* generated = &generator->generated_; CachedFactory factory( - nullptr, std::move(generator)); + std::make_unique>(1000), std::move(generator)); + for (auto i = 0; i < 1000; ++i) { + factory.generate(i); + } + std::vector keys(500); + for (int i = 0; i < 500; ++i) { + keys[i] = i; + } + { + std::vector>> cached; + std::vector missing; + factory.retrieveCached(keys, cached, missing); + ASSERT_EQ(cached.size(), 500); + auto cacheStats = factory.clearCache(); + ASSERT_EQ(cacheStats.numElements, 500); + ASSERT_EQ(cacheStats.pinnedSize, 500); + } + auto cacheStats = factory.cacheStats(); + ASSERT_EQ(cacheStats.numElements, 500); + ASSERT_EQ(cacheStats.pinnedSize, 0); + + cacheStats = factory.clearCache(); + ASSERT_EQ(cacheStats.numElements, 0); + ASSERT_EQ(cacheStats.pinnedSize, 0); +} + +TEST(CachedFactoryTest, disableCache) { + auto generator = std::make_unique(); + auto* generated = &generator->generated; + CachedFactory factory(std::move(generator)); auto val1 = factory.generate(1); - EXPECT_EQ(val1, cacheMiss(2)); - EXPECT_EQ(*generated, 1); + ASSERT_FALSE(val1.fromCache()); + ASSERT_EQ(*generated, 1); + ASSERT_EQ(factory.currentSize(), 0); + ASSERT_EQ(factory.cacheStats().curSize, 0); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); auto val2 = factory.generate(1); - EXPECT_EQ(val2, cacheMiss(2)); + ASSERT_FALSE(val2.fromCache()); EXPECT_EQ(*generated, 2); + ASSERT_EQ(factory.currentSize(), 0); + ASSERT_EQ(factory.cacheStats().curSize, 0); + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + ASSERT_EQ(factory.cacheStats().expireDurationMs, 0); - EXPECT_EQ(factory.currentSize(), 0); - - 
EXPECT_EQ(factory.maxSize(), 0); + ASSERT_EQ(factory.maxSize(), 0); - EXPECT_EQ(factory.cacheStats(), SimpleLRUCacheStats(0, 0, 0, 0)); + EXPECT_EQ(factory.cacheStats(), SimpleLRUCacheStats{}); - EXPECT_EQ(factory.clearCache(), SimpleLRUCacheStats(0, 0, 0, 0)); + EXPECT_EQ(factory.clearCache(), SimpleLRUCacheStats{}); std::vector keys(10); - for (int i = 0; i < 10; i += 1) { + for (int i = 0; i < 10; ++i) { keys[i] = i; } - std::vector> cached; + + std::vector>> cached; std::vector missing; - factory.retrieveCached(keys, &cached, &missing); - ASSERT_EQ(0, cached.size()); - ASSERT_EQ(10, missing.size()); + factory.retrieveCached(keys, cached, missing); + ASSERT_EQ(cached.size(), 0); + ASSERT_EQ(missing.size(), 10); for (int i = 0; i < 10; ++i) { - EXPECT_EQ(missing[i], i); + ASSERT_EQ(missing[i], i); + } +} + +TEST(CachedFactoryTest, fuzzer) { + const int numThreads = 32; + const int testDurationMs = 5'000; + const size_t expirationDurationMs = 1; + folly::Random::DefaultGenerator rng(23); + for (const bool expireCache : {false, true}) { + SCOPED_TRACE(fmt::format("expireCache: {}", expireCache)); + auto generator = std::make_unique(); + CachedFactory factory( + std::make_unique>( + 128, expireCache ? 
expirationDurationMs : 0), + std::move(generator)); + + std::vector threads; + threads.reserve(numThreads); + for (int i = 0; i < numThreads; ++i) { + threads.emplace_back([&]() { + const auto startTimeMs = getCurrentTimeMs(); + while (startTimeMs + testDurationMs > getCurrentTimeMs()) { + const auto key = folly::Random::rand32(rng) % 256; + const auto val = factory.generate(key); + if (val.fromCache()) { + ASSERT_TRUE(val.cached()); + ASSERT_EQ(*val, key); + } + if (folly::Random::oneIn(4)) { + std::this_thread::sleep_for(std::chrono::microseconds{100}); + } + } + }); + } + for (auto& thread : threads) { + thread.join(); + } + ASSERT_EQ(factory.cacheStats().pinnedSize, 0); + ASSERT_LE(factory.cacheStats().curSize, 128); + ASSERT_LE(factory.cacheStats().numElements, 128); + ASSERT_GT(factory.cacheStats().numHits, 0); + ASSERT_GT(factory.cacheStats().numLookups, 0); } } diff --git a/velox/common/caching/tests/SimpleLRUCacheTest.cpp b/velox/common/caching/tests/SimpleLRUCacheTest.cpp index c7e5b4a9a100..9193d34d0fc0 100644 --- a/velox/common/caching/tests/SimpleLRUCacheTest.cpp +++ b/velox/common/caching/tests/SimpleLRUCacheTest.cpp @@ -15,75 +15,238 @@ */ #include "velox/common/caching/SimpleLRUCache.h" -#include #include "gtest/gtest.h" using namespace facebook::velox; -namespace { -void verifyCacheStats( - const SimpleLRUCacheStats& actual, - size_t maxSize, - size_t curSize, - size_t numHits, - size_t numLookups) { - SimpleLRUCacheStats expectedStats{maxSize, curSize, numHits, numLookups}; - EXPECT_EQ(actual, expectedStats) << " Actual " << actual.toString() - << " Expected " << expectedStats.toString(); -} -} // namespace - TEST(SimpleLRUCache, basicCaching) { SimpleLRUCache cache(1000); - EXPECT_FALSE(cache.get(1).has_value()); - EXPECT_FALSE(cache.get(2).has_value()); - - verifyCacheStats(cache.getStats(), 1000, 0, 0, 2); + ASSERT_TRUE(cache.add(1, new int(11), 1)); + int* value = cache.get(1); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, 11); + 
cache.release(1); - int firstValue = 11; - ASSERT_TRUE(cache.add(1, firstValue)); - auto value = cache.get(1); - ASSERT_EQ(value, std::make_optional(11)); - - int secondValue = 22; - ASSERT_TRUE(cache.add(2, secondValue)); - - verifyCacheStats(cache.getStats(), 1000, 2, 1, 3); + int* secondValue = new int(22); + ASSERT_TRUE(cache.addPinned(2, secondValue, 1)); + *secondValue += 5; + cache.release(2); value = cache.get(1); - ASSERT_EQ(value, std::make_optional(11)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, 11); + cache.release(1); value = cache.get(2); - ASSERT_EQ(value, std::make_optional(22)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, 27); + cache.release(2); value = cache.get(1); - ASSERT_EQ(value, std::make_optional(11)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, 11); + secondValue = cache.get(1); + ASSERT_EQ(value, secondValue); + cache.release(1); + cache.release(1); + + ASSERT_EQ( + cache.stats().toString(), + "{\n maxSize: 1000\n expireDurationMs: 0\n curSize: 2\n pinnedSize: 0\n numElements: 2\n numHits: 5\n numLookups: 5\n}\n"); +} - value = cache.get(2); - ASSERT_EQ(value, std::make_optional(22)); - verifyCacheStats(cache.getStats(), 1000, 2, 5, 7); +TEST(SimpleLRUCache, lruEviction) { + SimpleLRUCache cache(3); - cache.clear(); - verifyCacheStats(cache.getStats(), 1000, 0, 5, 7); - EXPECT_FALSE(cache.get(1).has_value()); - EXPECT_FALSE(cache.get(2).has_value()); + for (int i = 0; i < 3; ++i) { + ASSERT_TRUE(cache.add(i, new int(i), 1)); + } + ASSERT_EQ(cache.stats().numElements, 3); + ASSERT_EQ(*cache.get(0), 0); + cache.release(0); + + ASSERT_TRUE(cache.add(3, new int(3), 1)); + ASSERT_EQ(*cache.get(0), 0); + cache.release(0); + ASSERT_EQ(cache.get(1), nullptr); + ASSERT_EQ(*cache.get(3), 3); + cache.release(3); + ASSERT_EQ(cache.stats().numElements, 3); } TEST(SimpleLRUCache, eviction) { SimpleLRUCache cache(1000); for (int i = 0; i < 1010; ++i) { - ASSERT_TRUE(cache.add(i, i)); + ASSERT_TRUE(cache.add(i, new int(i), 1)); } for 
(int i = 0; i < 10; ++i) { - ASSERT_FALSE(cache.get(i).has_value()); + ASSERT_EQ(cache.get(i), nullptr); } - for (int i = 10; i < 1010; ++i) { - auto value = cache.get(i); - ASSERT_EQ(value, std::make_optional(i)); + int* value = cache.get(i); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, i); + cache.release(i); + } +} + +TEST(SimpleLRUCache, pinnedEviction) { + SimpleLRUCache cache(100); + + for (int i = 0; i < 10; ++i) { + ASSERT_TRUE(cache.addPinned(i, new int(i), 1)); + } + for (int i = 10; i < 110; ++i) { + ASSERT_TRUE(cache.add(i, new int(i), 1)); + } + + for (int i = 0; i < 10; ++i) { + int* value = cache.get(i); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, i); + cache.release(i); + cache.release(i); // Release the original pin too. + } + for (int i = 10; i < 20; ++i) { + ASSERT_EQ(cache.get(i), nullptr); + } + for (int i = 20; i < 110; ++i) { + int* value = cache.get(i); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, i); + cache.release(i); + } +} + +TEST(SimpleLRUCache, fullyPinned) { + SimpleLRUCache cache(10); + + for (int i = 0; i < 10; ++i) { + ASSERT_TRUE(cache.addPinned(i, new int(i), 1)); + } + for (int i = 10; i < 20; ++i) { + int* value = new int(i); + ASSERT_FALSE(cache.add(i, value, 1)); + delete value; + } + for (int i = 20; i < 30; ++i) { + int* value = new int(i); + ASSERT_FALSE(cache.addPinned(i, value, 1)); + delete value; + } + + for (int i = 0; i < 10; ++i) { + int* value = cache.get(i); + ASSERT_NE(value, nullptr); + ASSERT_EQ(*value, i); + cache.release(i); + cache.release(i); // Release the original pin too. 
+ } + for (int i = 10; i < 30; ++i) { + ASSERT_EQ(cache.get(i), nullptr); + } +} + +TEST(SimpleLRUCache, size) { + SimpleLRUCache cache(10); + ASSERT_EQ(cache.maxSize(), 10); + + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(cache.addPinned(i, new int(i), 2)); + ASSERT_EQ(cache.currentSize(), 2 * (i + 1)); + } + int* value = new int(5); + ASSERT_FALSE(cache.addPinned(5, value, 1)); + + for (int i = 0; i < 5; ++i) { + cache.release(i); + } + ASSERT_TRUE(cache.addPinned(5, value, 10)); + ASSERT_EQ(cache.currentSize(), 10); + + for (int i = 0; i < 5; ++i) { + ASSERT_EQ(cache.get(i), nullptr); + } + cache.release(5); +} + +TEST(SimpleLRUCache, insertLargerThanCacheFails) { + SimpleLRUCache cache(10); + + int* value = new int(42); + ASSERT_FALSE(cache.add(123, value, 11)); + delete value; +} + +TEST(SimpleLRUCache, expiredCacheEntries) { + SimpleLRUCache cache(100, 1'000); + + // Expires on insert new entry. + int* value1 = new int(42); + ASSERT_TRUE(cache.add(123, value1, 11)); + ASSERT_EQ(cache.currentSize(), 11); + ASSERT_EQ(cache.get(123), value1); + cache.release(123); + + std::this_thread::sleep_for(std::chrono::seconds{2}); + ASSERT_EQ(cache.currentSize(), 11); + + int* value2 = new int(32); + ASSERT_TRUE(cache.add(122, value2, 22)); + ASSERT_EQ(cache.currentSize(), 22); + ASSERT_EQ(cache.get(123), nullptr); + ASSERT_EQ(cache.get(122), value2); + cache.release(122); + + // Expires when get cache entry. + std::this_thread::sleep_for(std::chrono::seconds{2}); + ASSERT_EQ(cache.currentSize(), 22); + ASSERT_EQ(cache.get(123), nullptr); + ASSERT_EQ(cache.currentSize(), 0); + ASSERT_EQ(cache.get(122), nullptr); + ASSERT_EQ(cache.currentSize(), 0); + + // Expires when get the same cache entry. 
+ value2 = new int(33); + ASSERT_TRUE(cache.add(124, value2, 11)); + ASSERT_EQ(cache.currentSize(), 11); + ASSERT_EQ(cache.get(124), value2); + cache.release(124); + ASSERT_EQ(cache.currentSize(), 11); + std::this_thread::sleep_for(std::chrono::seconds{2}); + ASSERT_EQ(cache.currentSize(), 11); + ASSERT_EQ(cache.get(124), nullptr); + ASSERT_EQ(cache.currentSize(), 0); + + // Adds multiple entries. + int expectedCacheSize{0}; + for (int i = 0; i < 10; ++i) { + int* value = new int(i); + ASSERT_TRUE(cache.add(i, value, i)); + ASSERT_EQ(cache.get(i), value); + cache.release(i); + expectedCacheSize += i; + ASSERT_EQ(cache.currentSize(), expectedCacheSize); } + std::this_thread::sleep_for(std::chrono::seconds{2}); + ASSERT_EQ(cache.currentSize(), expectedCacheSize); + ASSERT_EQ(cache.get(0), nullptr); + ASSERT_EQ(cache.currentSize(), 0); + + // Expire on release. + value2 = new int(64); + ASSERT_TRUE(cache.addPinned(124, value2, 11)); + ASSERT_EQ(cache.currentSize(), 11); + std::this_thread::sleep_for(std::chrono::seconds{2}); + ASSERT_EQ(cache.currentSize(), 11); + ASSERT_EQ(*cache.get(124), 64); + cache.release(124); + ASSERT_EQ(cache.currentSize(), 11); + cache.release(124); + ASSERT_EQ(cache.currentSize(), 0); + ASSERT_EQ(cache.get(124), nullptr); } diff --git a/velox/connectors/hive/FileHandle.cpp b/velox/connectors/hive/FileHandle.cpp index 8deabd1f2d72..40ac778adc4a 100644 --- a/velox/connectors/hive/FileHandle.cpp +++ b/velox/connectors/hive/FileHandle.cpp @@ -24,6 +24,12 @@ namespace facebook::velox { +uint64_t FileHandleSizer::operator()(const FileHandle& fileHandle) { + // TODO: add to support variable file cache size support when the file system + // underneath supports. + return 1; +} + namespace { // The group tracking is at the level of the directory, i.e. Hive partition. 
std::string groupName(const std::string& filename) { @@ -33,16 +39,16 @@ std::string groupName(const std::string& filename) { } } // namespace -std::shared_ptr FileHandleGenerator::operator()( +std::unique_ptr FileHandleGenerator::operator()( const std::string& filename) { // We have seen cases where drivers are stuck when creating file handles. // Adding a trace here to spot this more easily in future. process::TraceContext trace("FileHandleGenerator::operator()"); uint64_t elapsedTimeUs{0}; - std::shared_ptr fileHandle; + std::unique_ptr fileHandle; { MicrosecondTimer timer(&elapsedTimeUs); - fileHandle = std::make_shared(); + fileHandle = std::make_unique(); fileHandle->file = filesystems::getFileSystem(filename, properties_) ->openFileForRead(filename); fileHandle->uuid = StringIdLease(fileIds(), filename); diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h index 6fb6853d7544..e8a9a954094e 100644 --- a/velox/connectors/hive/FileHandle.h +++ b/velox/connectors/hive/FileHandle.h @@ -54,6 +54,11 @@ struct FileHandle { // first diff we'll not include the map. }; +/// Estimates the memory usage of a FileHandle object. +struct FileHandleSizer { + uint64_t operator()(const FileHandle& a); +}; + using FileHandleCache = SimpleLRUCache; // Creates FileHandles via the Generator interface the CachedFactory requires. 
@@ -62,7 +67,7 @@ class FileHandleGenerator { FileHandleGenerator() {} FileHandleGenerator(std::shared_ptr properties) : properties_(std::move(properties)) {} - std::shared_ptr operator()(const std::string& filename); + std::unique_ptr operator()(const std::string& filename); private: const std::shared_ptr properties_; @@ -70,8 +75,11 @@ class FileHandleGenerator { using FileHandleFactory = CachedFactory< std::string, - std::shared_ptr, - FileHandleGenerator>; + FileHandle, + FileHandleGenerator, + FileHandleSizer>; + +using FileHandleCachedPtr = CachedPtr; using FileHandleCacheStats = SimpleLRUCacheStats; diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index c693e8ee0922..00d463b25b91 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -59,8 +59,7 @@ HiveConnector::HiveConnector( hiveConfig_(std::make_shared(config)), fileHandleFactory_( hiveConfig_->isFileHandleCacheEnabled() - ? std::make_unique< - SimpleLRUCache>>( + ? 
std::make_unique>( hiveConfig_->numCacheFileHandles()) : nullptr, std::make_unique(config)), diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 9785c7a3b894..9e10a1bc397d 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -227,18 +227,18 @@ void SplitReader::createReader() { VELOX_CHECK_NE( baseReaderOpts_.getFileFormat(), dwio::common::FileFormat::UNKNOWN); - std::shared_ptr fileHandle; + FileHandleCachedPtr fileHandleCachePtr; try { - fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second; + fileHandleCachePtr = fileHandleFactory_->generate(hiveSplit_->filePath); + VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get()); } catch (const VeloxRuntimeError& e) { if (e.errorCode() == error_code::kFileNotFound && hiveConfig_->ignoreMissingFiles( connectorQueryCtx_->sessionProperties())) { emptySplit_ = true; return; - } else { - throw; } + throw; } // Here we keep adding new entries to CacheTTLController when new fileHandles @@ -246,10 +246,14 @@ void SplitReader::createReader() { // CacheTTLController needs to make sure a size control strategy was available // such as removing aged out entries. 
if (auto* cacheTTLController = cache::CacheTTLController::getInstance()) { - cacheTTLController->addOpenFileInfo(fileHandle->uuid.id()); + cacheTTLController->addOpenFileInfo(fileHandleCachePtr->uuid.id()); } auto baseFileInput = createBufferedInput( - *fileHandle, baseReaderOpts_, connectorQueryCtx_, ioStats_, executor_); + *fileHandleCachePtr, + baseReaderOpts_, + connectorQueryCtx_, + ioStats_, + executor_); baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.getFileFormat()) ->createReader(std::move(baseFileInput), baseReaderOpts_); diff --git a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp index 842c350a9239..9eb56c8e8016 100644 --- a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp +++ b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp @@ -92,10 +92,10 @@ PositionalDeleteFileReader::PositionalDeleteFileReader( deleteFileSchema, deleteSplit_); - auto deleteFileHandle = - fileHandleFactory_->generate(deleteFile_.filePath).second; + auto deleteFileHandleCachePtr = + fileHandleFactory_->generate(deleteFile_.filePath); auto deleteFileInput = createBufferedInput( - *deleteFileHandle, + *deleteFileHandleCachePtr, deleteReaderOpts, connectorQueryCtx, ioStats_, diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp index f2558cbc1d37..36edb9d631e9 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemRegistrationTest.cpp @@ -67,8 +67,8 @@ TEST_F(S3FileSystemRegistrationTest, fileHandle) { std::make_unique< SimpleLRUCache>>(1000), std::make_unique(hiveConfig)); - auto fileHandle = factory.generate(s3File).second; - readData(fileHandle->file.get()); + auto fileHandleCachePtr = factory.generate(s3File); + 
readData(fileHandleCachePtr.get()); } TEST_F(S3FileSystemRegistrationTest, finalize) { diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp index 93de7e79e515..f306c36a380e 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp @@ -60,6 +60,40 @@ TEST_F(S3FileSystemTest, writeAndRead) { readData(readFile.get()); } +TEST_F(S3FileSystemTest, viaRegistry) { + const char* bucketName = "data2"; + const char* file = "test.txt"; + const std::string filename = localPath(bucketName) + "/" + file; + const std::string s3File = s3URI(bucketName, file); + addBucket(bucketName); + { + LocalWriteFile writeFile(filename); + writeData(&writeFile); + } + auto hiveConfig = minioServer_->hiveConfig(); + auto s3fs = filesystems::getFileSystem(s3File, hiveConfig); + auto readFile = s3fs->openFileForRead(s3File); + readData(readFile.get()); +} + +TEST_F(S3FileSystemTest, fileHandle) { + const char* bucketName = "data3"; + const char* file = "test.txt"; + const std::string filename = localPath(bucketName) + "/" + file; + const std::string s3File = s3URI(bucketName, file); + addBucket(bucketName); + { + LocalWriteFile writeFile(filename); + writeData(&writeFile); + } + auto hiveConfig = minioServer_->hiveConfig(); + FileHandleFactory factory( + std::make_unique>(1000), + std::make_unique(hiveConfig)); + auto fileHandle = factory.generate(s3File); + readData(fileHandle->file.get()); +} + TEST_F(S3FileSystemTest, invalidCredentialsConfig) { { const std::unordered_map config( diff --git a/velox/connectors/hive/tests/FileHandleTest.cpp b/velox/connectors/hive/tests/FileHandleTest.cpp index d00c01f8b5de..df045e4fd439 100644 --- a/velox/connectors/hive/tests/FileHandleTest.cpp +++ b/velox/connectors/hive/tests/FileHandleTest.cpp @@ -37,10 +37,9 @@ TEST(FileHandleTest, localFile) { } 
FileHandleFactory factory( - std::make_unique< - SimpleLRUCache>>(1000), + std::make_unique>(1000), std::make_unique()); - auto fileHandle = factory.generate(filename).second; + auto fileHandle = factory.generate(filename); ASSERT_EQ(fileHandle->file->size(), 3); char buffer[3]; ASSERT_EQ(fileHandle->file->pread(0, 3, &buffer), "foo");