diff --git a/curvefs/proto/cache.proto b/curvefs/proto/cache.proto
new file mode 100644
index 0000000000..119fc1c85a
--- /dev/null
+++ b/curvefs/proto/cache.proto
@@ -0,0 +1,59 @@
+syntax = "proto2";
+
+enum CacheErrorCode {
+    CACHE_OK = 0;
+    CACHE_FULL = 1;
+    CACHE_NOT_FOUND = 2;
+    CACHE_INTERNAL_ERROR = 3;
+    // ...
+};
+
+message GetObjectRequest {
+    message Range {
+        required uint32 offset = 1;
+        required uint32 len = 2;
+    }
+
+    required string key = 1;                              // object name
+    optional Range range = 2;                             // byte range of the object to read; the whole object is fetched when unset
+    optional bool read_through = 3 [default = true];      // on a cache miss, whether to fetch the object from object storage
+    optional bool cache_on_worker = 4 [default = true];   // whether to cache the object on this worker after fetching it from object storage
+}
+
+message GetObjectResponse {
+    required CacheErrorCode err_code = 1;
+
+    // on success, the data is carried in the controller attachment
+}
+
+message PutObjectRequest {
+    required string key = 1;                              // object name
+    optional bool pin = 2 [default = false];              // whether to pin the object
+    optional int64 cache_ttl = 3 [default = -1];          // cache TTL in seconds; -1 means no TTL
+    optional uint64 upload_deadline = 4 [default = 0];    // for the write cache: upload deadline in seconds; 0 means upload immediately after the request is received
+
+    // the data is carried in the controller attachment
+}
+
+message PutObjectResponse {
+    required CacheErrorCode err_code = 1;
+}
+
+message RemoveObjectRequest {
+    required string key = 1;
+}
+
+message RemoveObjectResponse {
+    required CacheErrorCode err_code = 1;
+}
+
+message PinObjectRequest {
+    required string key = 1;
+    required bool pin = 2;  // pin or unpin
+}
+
+message PinObjectResponse {
+    required CacheErrorCode err_code = 1;
+}
+
+// batch version
\ No newline at end of file
diff --git a/curvefs/proto/metaserver.proto b/curvefs/proto/metaserver.proto
index f0ab8167a2..5b1e517c30 100644
--- a/curvefs/proto/metaserver.proto
+++ b/curvefs/proto/metaserver.proto
@@ -239,7 +239,13 @@ message S3ChunkInfo {
     required uint64 offset = 3;  // file offset
     required uint64 len = 4;  // file logic length
     required uint64 size = 5;  // file size in object storage
-    required bool zero = 6;   //
+    required bool zero = 6;
+
+    message RemoteCacheWorker {
+        required string id = 1;  // identifier of cache worker
+    }
+
+    repeated RemoteCacheWorker cacheWorkers = 7;
 };
 
 message S3ChunkInfoList {
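
Note: cache.proto above defines only the request/response messages; no service is declared in this patch. Assuming a brpc service is added alongside them (hypothetically `service CacheService { rpc GetObject(GetObjectRequest) returns (GetObjectResponse); ... }` together with `option cc_generic_services = true;`), a client call could look roughly like the sketch below. The object bytes travel in the RPC controller attachment, as the message comments state; the stub name, worker address, and object key here are illustrative assumptions, not part of the patch.

// Sketch only: CacheService, its GetObject method, the worker address and the
// object key are assumptions; only the messages above exist in this patch.
#include <brpc/channel.h>
#include <brpc/controller.h>
#include <butil/iobuf.h>
#include <iostream>

#include "curvefs/proto/cache.pb.h"  // assumed generated header path

int main() {
    brpc::Channel channel;
    if (channel.Init("127.0.0.1:19000", nullptr) != 0) {  // hypothetical worker address
        std::cerr << "failed to init channel" << std::endl;
        return -1;
    }

    CacheService_Stub stub(&channel);  // assumes a CacheService is added to cache.proto

    GetObjectRequest request;
    request.set_key("chunkid_1_0");          // hypothetical object name
    request.mutable_range()->set_offset(0);  // read only the first 4 KiB
    request.mutable_range()->set_len(4096);
    request.set_read_through(true);          // fall back to object storage on a miss
    request.set_cache_on_worker(true);       // let the worker keep what it fetched

    GetObjectResponse response;
    brpc::Controller cntl;
    stub.GetObject(&cntl, &request, &response, nullptr);  // synchronous call

    if (cntl.Failed() || response.err_code() != CACHE_OK) {
        std::cerr << "get object failed" << std::endl;
        return -1;
    }

    // On success the object bytes are carried in the controller attachment.
    butil::IOBuf data = cntl.response_attachment();
    std::cout << "read " << data.size() << " bytes" << std::endl;
    return 0;
}

PutObject would be symmetric: the caller fills cntl.request_attachment() with the object bytes before invoking the RPC, matching the "data is carried in the controller attachment" comment on PutObjectRequest.
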
diff --git a/curvefs/src/cachecluster/buffer.cpp b/curvefs/src/cachecluster/buffer.cpp
new file mode 100644
index 0000000000..40821935ce
--- /dev/null
+++ b/curvefs/src/cachecluster/buffer.cpp
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2021 NetEase Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "cachelib/datatype/Buffer.h"
+
+namespace curvefs {
+namespace cachelib {
+namespace detail {
+constexpr uint32_t Buffer::kInvalidOffset;
+
+bool Buffer::canAllocate(uint32_t size) const {
+  return Slot::getAllocSize(size) <= (remainingBytes() + wastedBytes());
+}
+
+bool Buffer::canAllocateWithoutCompaction(uint32_t size) const {
+  return Slot::getAllocSize(size) <= remainingBytes();
+}
+
+Buffer::Buffer(uint32_t capacity, const Buffer& other)
+    : capacity_(capacity),
+      deletedBytes_(other.deletedBytes_),
+      nextByte_(other.nextByte_) {
+  std::memcpy(&data_, &other.data_, other.nextByte_);
+}
+
+uint32_t Buffer::allocate(uint32_t size) {
+  const uint32_t allocSize = static_cast<uint32_t>(sizeof(Slot)) + size;
+  if (nextByte_ + allocSize > capacity_) {
+    return kInvalidOffset;
+  }
+
+  auto* slot = new (&data_[nextByte_]) Slot(size);
+  XDCHECK_EQ(allocSize, slot->getAllocSize());
+
+  nextByte_ += slot->getAllocSize();
+
+  const uint32_t offset = static_cast<uint32_t>(
+      reinterpret_cast<uint8_t*>(slot->getData()) - data_);
+  return offset;
+}
+
+void Buffer::remove(uint32_t offset) {
+  auto* slot = getSlot(offset);
+  if (slot && !slot->isRemoved()) {
+    slot->markRemoved();
+    deletedBytes_ += slot->getAllocSize();
+  }
+}
+
+void* Buffer::getData(uint32_t offset) { return data_ + offset; }
+
+const void* Buffer::getData(uint32_t offset) const { return data_ + offset; }
+
+void Buffer::compact(Buffer& dest) const {
+  XDCHECK_EQ(dest.capacity(), dest.remainingBytes());
+  if (dest.capacity() < capacity() - wastedBytes() - remainingBytes()) {
+    throw std::invalid_argument(folly::sformat(
+        "destination buffer is too small. Dest Capacity: {}, "
+        "Current Capacity: {}, Current Wasted Space: {}, Current Remaining "
+        "Capacity: {}",
+        dest.capacity(), capacity(), wastedBytes(), remainingBytes()));
+  }
+
+  for (uint32_t srcOffset = 0; srcOffset < nextByte_;) {
+    auto* slot = reinterpret_cast<const Slot*>(&data_[srcOffset]);
+    if (!slot->isRemoved()) {
+      const uint32_t offset = dest.allocate(slot->getSize());
+      XDCHECK_NE(kInvalidOffset, offset);
+
+      std::memcpy(dest.getData(offset), slot->getData(), slot->getSize());
+    }
+
+    srcOffset += slot->getAllocSize();
+  }
+}
+
+Buffer::Slot* Buffer::getSlot(uint32_t dataOffset) {
+  return const_cast<Slot*>(getSlotImpl(dataOffset));
+}
+
+const Buffer::Slot* Buffer::getSlot(uint32_t dataOffset) const {
+  return getSlotImpl(dataOffset);
+}
+
+const Buffer::Slot* Buffer::getSlotImpl(uint32_t dataOffset) const {
+  if (dataOffset >= nextByte_ || dataOffset < sizeof(Slot)) {
+    // Need this to compile due to alignment requirement for uint32_t& in
+    // folly::sformat()
+    const auto tmp = nextByte_;
+    throw std::invalid_argument(folly::sformat(
+        "invalid dataOffset. dataOffset: {}, nextByte: {}, sizeof(Slot): {}",
+        dataOffset, tmp, sizeof(Slot)));
+  }
+  const uint32_t slotOffset = dataOffset - static_cast<uint32_t>(sizeof(Slot));
+  const auto* slot = reinterpret_cast<const Slot*>(&data_[slotOffset]);
+  return slot;
+}
+
+Buffer::Slot* Buffer::getFirstSlot() {
+  if (nextByte_ < Slot::getAllocSize(0)) {
+    return nullptr;
+  }
+  return getSlot(Slot::getAllocSize(0));
+}
+
+Buffer::Slot* Buffer::getNextSlot(const Slot& curSlot) {
+  const uint32_t curDataOffset = getDataOffset(curSlot);
+  const uint32_t nextDataOffset = curDataOffset + curSlot.getAllocSize();
+  if (nextDataOffset >= nextByte_) {
+    return nullptr;
+  }
+
+  return getSlot(nextDataOffset);
+}
+
+uint32_t Buffer::getDataOffset(const Slot& slot) const {
+  return static_cast<uint32_t>(
+      reinterpret_cast<const uint8_t*>(slot.getData()) -
+      reinterpret_cast<const uint8_t*>(data_));
+}
+
+constexpr uint32_t BufferAddr::kInvalidOffset;
+constexpr uint32_t BufferAddr::kByteOffsetBits;
+constexpr uint32_t BufferAddr::kByteOffsetMask;
+constexpr uint32_t BufferAddr::kMaxNumChainedItems;
+}  // namespace detail
+}  // namespace cachelib
+}  // namespace curvefs
\ No newline at end of file
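
buffer.cpp above is adapted from CacheLib's datatype Buffer: an append-only region carved into Slots, where remove() only tombstones a slot and compact() copies the surviving slots into a fresh Buffer. A minimal usage sketch follows, assuming the Buffer declaration in buffer.h together with the computeStorageSize()/capacity() helpers that buffer.h already calls; the storage here is plain heap memory, whereas inside the cache it would be a chained item's payload.

// Sketch only: relies on the Buffer API visible in this patch; the storage
// allocation and the payload string are illustrative.
#include <cstdint>
#include <cstring>
#include <memory>
#include <new>
#include <string>

using curvefs::cachelib::detail::Buffer;

void bufferSketch() {
  const uint32_t capacity = 4096;

  // A Buffer lives inside a raw memory area (normally a chained cachelib item).
  auto storage =
      std::make_unique<uint8_t[]>(Buffer::computeStorageSize(capacity));
  Buffer* buf = new (storage.get()) Buffer(capacity);

  // Append-only allocation: allocate() returns a byte offset into the buffer.
  const std::string payload = "hello cache";
  const uint32_t off = buf->allocate(static_cast<uint32_t>(payload.size()));
  if (off != Buffer::kInvalidOffset) {
    std::memcpy(buf->getData(off), payload.data(), payload.size());
  }

  // remove() only marks the slot deleted; the bytes are reclaimed by compact().
  buf->remove(off);

  auto compactStorage =
      std::make_unique<uint8_t[]>(Buffer::computeStorageSize(capacity));
  Buffer* compacted = new (compactStorage.get()) Buffer(capacity);
  buf->compact(*compacted);  // copies only the live slots into the fresh buffer
}
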
diff --git a/curvefs/src/cachecluster/buffer.h b/curvefs/src/cachecluster/buffer.h
new file mode 100644
index 0000000000..d7deb36fda
--- /dev/null
+++ b/curvefs/src/cachecluster/buffer.h
@@ -0,0 +1,251 @@
+/*
+ * Copyright (c) 2021 NetEase Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+namespace curvefs {
+namespace cachelib {
+namespace detail {
+
+template <typename C>
+constexpr uint32_t BufferManager<C>::kMaxBufferCapacity;
+
+template <typename C>
+constexpr uint32_t BufferManager<C>::kExpansionFactor;
+
+template <typename C>
+BufferAddr BufferManager<C>::allocate(uint32_t size) {
+  XDCHECK((*parent_)->hasChainedItem());
+
+  const uint32_t requiredSize = Buffer::getAllocSize(size);
+  if (requiredSize >= kMaxBufferCapacity) {
+    return nullptr;
+  }
+
+  // First we try iterating through all the items in the chain to look for
+  // available storage.
+  for (uint32_t i = 0; i < buffers_.size(); i++) {
+    const auto addr =
+        allocateFrom(buffers_[i]->template getMemoryAs<Buffer>(), size, i);
+    if (addr) {
+      return addr;
+    }
+  }
+  return nullptr;
+}
+
+template <typename C>
+bool BufferManager<C>::expand(uint32_t size) {
+  XDCHECK((*parent_)->hasChainedItem());
+
+  const uint32_t requiredSize = Buffer::getAllocSize(size);
+  if (requiredSize > kMaxBufferCapacity) {
+    return false;
+  }
+
+  Item* itemToExpand = *buffers_.rbegin();
+  uint32_t numChainedItems = buffers_.size();
+
+  // For the first item, double the capacity or expand to fit the required size
+  auto* expandedBuffer = expandBuffer(*itemToExpand, requiredSize);
+  if (expandedBuffer) {
+    return true;
+  }
+
+  // Ensure we've not reached upper bound of chained items yet
+  if (numChainedItems >= BufferAddr::kMaxNumChainedItems) {
+    return false;
+  }
+
+  // Since we already maxed out at least one chained item, we'll
+  // start at max capacity right away
+  Buffer* newBuffer = addNewBuffer(kMaxBufferCapacity);
+
+  return newBuffer ? true : false;
+}
+
+template <typename C>
+void BufferManager<C>::remove(BufferAddr addr) {
+  if (!addr) {
+    throw std::invalid_argument("cannot remove null address");
+  }
+
+  const uint32_t itemOffset = addr.getItemOffset();
+  const uint32_t byteOffset = addr.getByteOffset();
+  getBuffer(itemOffset)->remove(byteOffset);
+}
+
+template <typename C>
+template <typename T>
+T* BufferManager<C>::get(BufferAddr addr) const {
+  if (!addr) {
+    throw std::invalid_argument("cannot get null address");
+  }
+  return reinterpret_cast<T*>(getImpl(addr));
+}
+
+template <typename C>
+BufferManager<C> BufferManager<C>::clone(WriteHandle& newParent) const {
+  for (auto item : buffers_) {
+    auto handle = cache_->allocateChainedItem(newParent, item->getSize());
+    if (!handle) {
+      return BufferManager(nullptr);
+    }
+    std::memcpy(
+        handle->getMemory(), item->getMemory(), cache_->getUsableSize(*item));
+    cache_->addChainedItem(newParent, std::move(handle));
+  }
+  BufferManager newManager(*cache_, newParent);
+  return newManager;
+}
+
+template <typename C>
+void* BufferManager<C>::getImpl(BufferAddr addr) const {
+  const uint32_t itemOffset = addr.getItemOffset();
+  const uint32_t byteOffset = addr.getByteOffset();
+  return getBuffer(itemOffset)->getData(byteOffset);
+}
+
+template <typename C>
+Buffer* BufferManager<C>::getBuffer(uint32_t index) const {
+  return buffers_.at(index)->template getMemoryAs<Buffer>();
+}
+
+template <typename C>
+void BufferManager<C>::materializeChainedAllocs() {
+  // Copy in reverse order since then the index into the vector will line up
+  // with our chained item indices given out in BufferAddr
+  auto allocs = cache_->viewAsWritableChainedAllocs(*parent_);
+  buffers_.clear();
+  for (auto& item : allocs.getChain()) {
+    buffers_.push_back(&item);
+  }
+  std::reverse(buffers_.begin(), buffers_.end());
+}
+
+template <typename C>
+BufferAddr BufferManager<C>::allocateFrom(Buffer* buffer,
+                                          uint32_t size,
+                                          uint32_t nthChainedItem) {
+  if (buffer->canAllocateWithoutCompaction(size)) {
+    return BufferAddr{nthChainedItem, buffer->allocate(size)};
+  }
+  return nullptr;
+}
+
+template <typename C>
+Buffer* BufferManager<C>::addNewBuffer(uint32_t capacity) {
+  auto chainedItem = cache_->allocateChainedItem(
+      *parent_, Buffer::computeStorageSize(capacity));
+  if (!chainedItem) {
+    return nullptr;
+  }
+
+  Buffer* buffer = new (chainedItem->getMemory()) Buffer(capacity);
+  cache_->addChainedItem(*parent_, std::move(chainedItem));
+
+  materializeChainedAllocs();
+
+  return buffer;
+}
+
+template <typename C>
+Buffer* BufferManager<C>::expandBuffer(Item& itemToExpand,
+                                       uint32_t minAdditionalSize) {
+  Buffer* oldBuffer = itemToExpand.template getMemoryAs<Buffer>();
+
+  const uint32_t currentCapacity = oldBuffer->capacity();
+  if ((currentCapacity + minAdditionalSize) >= kMaxBufferCapacity) {
+    return nullptr;
+  }
+
+  // We try to grow by kExpansionFactor unless minAdditionalSize is greater
+  // than currentCapacity;
+  const uint32_t normalDesiredCapacity =
+      std::min(kMaxBufferCapacity, currentCapacity * kExpansionFactor);
+  const uint32_t minDesiredCapacity = currentCapacity + minAdditionalSize;
+  const uint32_t desiredCapacity =
+      std::max(minDesiredCapacity, normalDesiredCapacity);
+
+  auto chainedItem = cache_->allocateChainedItem(
+      *parent_, Buffer::computeStorageSize(desiredCapacity));
+  if (!chainedItem) {
+    return nullptr;
+  }
+
+  Buffer* buffer =
+      new (chainedItem->getMemory()) Buffer(desiredCapacity, *oldBuffer);
+  cache_->replaceChainedItem(itemToExpand, std::move(chainedItem), **parent_);
+
+  materializeChainedAllocs();
+
+  return buffer;
+}
+
+template <typename C>
+void BufferManager<C>::compact() {
+  // O(M + N) where M is the number of allocations in the map to compact
+  // and N is the number of chained items to compact.
+  auto allocs = cache_->viewAsWritableChainedAllocs(*parent_);
+  for (auto& item : allocs.getChain()) {
+    Buffer* buffer = item.template getMemoryAs<Buffer>();
+
+    if (buffer->wastedBytes() == 0) {
+      continue;
+    }
+
+    auto tmpBufferStorage = std::make_unique<uint8_t[]>(item.getSize());
+    Buffer* tmpBuffer = new (tmpBufferStorage.get()) Buffer(buffer->capacity());
+    XDCHECK_EQ(buffer->capacity(), tmpBuffer->capacity());
+
+    buffer->compact(*tmpBuffer);
+
+    new (buffer) Buffer(tmpBuffer->capacity(), *tmpBuffer);
+  }
+}
+
+template <typename C>
+size_t BufferManager<C>::remainingBytes() const {
+  size_t remainingBytes = 0;
+  for (const auto& b : buffers_) {
+    remainingBytes += b->template getMemoryAs<Buffer>()->remainingBytes();
+  }
+  return remainingBytes;
+}
+
+template <typename C>
+size_t BufferManager<C>::wastedBytes() const {
+  size_t wastedBytes = 0;
+  for (const auto& b : buffers_) {
+    wastedBytes += b->template getMemoryAs<Buffer>()->wastedBytes();
+  }
+  return wastedBytes;
+}
+
+template <typename C>
+size_t BufferManager<C>::wastedBytesPct() const {
+  size_t wastedBytes = 0;
+  size_t capacity = 0;
+  for (const auto& b : buffers_) {
+    Buffer* buffer = b->template getMemoryAs<Buffer>();
+    wastedBytes += buffer->wastedBytes();
+    capacity += buffer->capacity();
+  }
+  return wastedBytes * 100 / capacity;
+}
+}  // namespace detail
+}  // namespace cachelib
+}  // namespace curvefs
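
buffer.h above carries only the BufferManager member definitions; the class declaration (its constructors, the Item/WriteHandle aliases, kMaxBufferCapacity, and the cache_/parent_/buffers_ members) is not part of this patch. Assuming a declaration shaped like CacheLib's datatype BufferManager, templated on the cache allocator and constructed from the cache plus a parent item handle (and assumed here to attach an initial chained Buffer), the intended calling pattern would be roughly the sketch below: allocate() returns a BufferAddr encoding a chained-item index and a byte offset, expand() grows the chain when allocation fails, and remove()/compact() mirror the Buffer-level calls. The CacheType alias, constructor signature, pool id, and keys are assumptions.

// Sketch only: the constructor, the parent-item setup, and the namespace
// spelling (which follows main.cpp below) are assumptions.
#include <cstring>
#include <string>

#include "cachelib/allocator/CacheAllocator.h"
#include "curvefs/src/cachecluster/buffer.h"  // hypothetical include path

using CacheType = cachelib::LruAllocator;
using Manager = curvefs::cachelib::detail::BufferManager<CacheType>;
using BufferAddr = curvefs::cachelib::detail::BufferAddr;

void bufferManagerSketch(CacheType& cache, cachelib::PoolId pool) {
  // The parent item anchors the chain of Buffer-carrying chained items.
  auto parent = cache.allocate(pool, "parent-key", 0);  // assumed parent setup
  Manager manager(cache, parent);                       // assumed constructor

  const std::string payload = "remote cache object metadata";
  BufferAddr addr = manager.allocate(static_cast<uint32_t>(payload.size()));
  if (!addr) {
    // No room in the existing chain: grow it, then retry.
    manager.expand(static_cast<uint32_t>(payload.size()));
    addr = manager.allocate(static_cast<uint32_t>(payload.size()));
  }

  std::memcpy(manager.get<char>(addr), payload.data(), payload.size());

  // remove() tombstones the slot; compact() reclaims the wasted bytes.
  manager.remove(addr);
  manager.compact();
}
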
diff --git a/curvefs/src/cachecluster/main.cpp b/curvefs/src/cachecluster/main.cpp
new file mode 100644
index 0000000000..87c36eb2ef
--- /dev/null
+++ b/curvefs/src/cachecluster/main.cpp
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2021 NetEase Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "cachelib/allocator/CacheAllocator.h"
+#include "cachelib/allocator/MemoryTierCacheConfig.h"
+#include "folly/init/Init.h"
+
+#include <cassert>
+#include <cstring>
+#include <iostream>
+#include <string>
+#include <tuple>
+
+namespace curvefs {
+namespace cachelib_examples {
+using Cache = cachelib::LruAllocator;  // or Lru2QAllocator, or TinyLFUAllocator
+using CacheConfig = typename Cache::Config;
+using CacheKey = typename Cache::Key;
+using CacheReadHandle = typename Cache::ReadHandle;
+using MemoryTierCacheConfig = typename cachelib::MemoryTierCacheConfig;
+using NumaBitMask = typename cachelib::NumaBitMask;
+
+// Global cache object and a default cache pool
+std::unique_ptr<Cache> gCache_;
+cachelib::PoolId defaultPool_;
+
+void initializeCache() {
+  CacheConfig config;
+  config
+      .setCacheSize(48 * 1024 * 1024)  // 48 MB
+      .setCacheName("SingleTier Cache")
+      .enableCachePersistence("/tmp/simple-tier-cache")
+      .setAccessConfig(
+          {25 /* bucket power */, 10 /* lock power */})  // assuming caching 20
+                                                         // million items
+      .configureMemoryTiers(
+          {MemoryTierCacheConfig::fromShm().setRatio(1).setMemBind(
+              NumaBitMask().setBit(0))})  // allocate only from NUMA node 0
+      .validate();  // will throw if bad config
+
+  gCache_ = std::make_unique<Cache>(Cache::SharedMemNew, config);
+  defaultPool_ =
+      gCache_->addPool("default", gCache_->getCacheMemoryStats().ramCacheSize);
+}
+
+void destroyCache() { gCache_.reset(); }
+
+CacheReadHandle get(CacheKey key) { return gCache_->find(key); }
+
+bool put(CacheKey key, const std::string& value) {
+  auto handle = gCache_->allocate(defaultPool_, key, value.size());
+  if (!handle) {
+    return false;  // cache may fail to evict due to too many pending writes
+  }
+  std::memcpy(handle->getMemory(), value.data(), value.size());
+  gCache_->insertOrReplace(handle);
+  return true;
+}
+
+}  // namespace cachelib_examples
+}  // namespace curvefs
+
+using namespace curvefs::cachelib_examples;
+
+int main(int argc, char** argv) {
+  folly::init(&argc, &argv);
+
+  initializeCache();
+
+  std::string value(4 * 1024, 'X');  // 4 KB value
+
+  const size_t NUM_ITEMS = 13000;
+
+  // Use cache
+  {
+    for (size_t i = 0; i < NUM_ITEMS; ++i) {
+      std::string key = "key" + std::to_string(i);
+
+      auto res = put(key, value);
+
+      std::ignore = res;
+      assert(res);
+    }
+
+    size_t nFound = 0;
+    size_t nNotFound = 0;
+    for (size_t i = 0; i < NUM_ITEMS; ++i) {
+      std::string key = "key" + std::to_string(i);
+      auto item = get(key);
+      if (item) {
+        ++nFound;
+        folly::StringPiece sp{reinterpret_cast<const char*>(item->getMemory()),
+                              item->getSize()};
+        std::ignore = sp;
+        assert(sp == value);
+      } else {
+        ++nNotFound;
+      }
+    }
+    std::cout << "Found:\t\t" << nFound << " items\n"
+              << "Not found:\t" << nNotFound << " items" << std::endl;
+  }
+
+  destroyCache();
+}