diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index 6c104e8ae1d2d..3e613b3c6a358 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -376,7 +376,7 @@ void SsdFile::write(std::vector& pins) { int32_t storeIndex = 0; while (storeIndex < pins.size()) { - auto space = getSpace(pins, storeIndex); + const auto space = getSpace(pins, storeIndex); if (!space.has_value()) { // No space can be reclaimed. The pins are freed when the caller is freed. return; @@ -384,32 +384,42 @@ void SsdFile::write(std::vector& pins) { auto [offset, available] = space.value(); int32_t numWritten = 0; - int32_t bytes = 0; + int32_t bytesToWrite = 0; std::vector iovecs; + uint64_t writeOffset = offset; for (auto i = storeIndex; i < pins.size(); ++i) { auto* entry = pins[i].checkedEntry(); const auto entrySize = entry->size(); - if (bytes + entrySize > available) { + const auto hasSpaceForMore = (bytesToWrite + entrySize <= available); + if (hasSpaceForMore) { + addEntryToIovecs(*entry, iovecs); + bytesToWrite += entrySize; + ++numWritten; + } + if (iovecs.size() == IOV_MAX || (i == pins.size() - 1) || + !hasSpaceForMore) { + const auto rc = + folly::pwritev(fd_, iovecs.data(), iovecs.size(), writeOffset); + if (rc != bytesToWrite) { + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to write to SSD, file name: " << fileName_ + << ", fd: " << fd_ << ", size: " << iovecs.size() + << ", offset: " << writeOffset << ", error code: " << errno + << ", error string: " << folly::errnoStr(errno); + ++stats_.writeSsdErrors; + // If write fails, we return without adding the pins to the cache. The + // entries are unchanged. + return; + } + iovecs.clear(); + writeOffset += bytesToWrite; + bytesToWrite = 0; + } + if (!hasSpaceForMore) { break; } - addEntryToIovecs(*entry, iovecs); - bytes += entrySize; - ++numWritten; - } - VELOX_CHECK_GE(fileSize_, offset + bytes); - - const auto rc = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset); - if (rc != bytes) { - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to write to SSD, file name: " << fileName_ - << ", fd: " << fd_ << ", size: " << iovecs.size() - << ", offset: " << offset << ", error code: " << errno - << ", error string: " << folly::errnoStr(errno); - ++stats_.writeSsdErrors; - // If write fails, we return without adding the pins to the cache. The - // entries are unchanged. - return; } + VELOX_CHECK_GE(fileSize_, writeOffset); { std::lock_guard l(mutex_); diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index 9cb89f2fabb92..79ba159ec39fe 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -118,12 +118,12 @@ class SsdFileTest : public testing::Test { } } - // Gets consecutive entries from file 'fileId' starting at 'startOffset' with + // Gets consecutive entries from file 'fileId' starting at 'startOffset' with // sizes between 'minSize' and 'maxSize'. Sizes start at 'minSize' and double // each time and go back to 'minSize' after exceeding 'maxSize'. This stops // after the total size has exceeded 'totalSize'. The entries are returned as // pins. The pins are exclusive for newly created entries and shared for - // existing ones. New entries are deterministically initialized from 'fileId' + // existing ones. New entries are deterministically initialized from 'fileId' // and the entry's offset. std::vector makePins( uint64_t fileId, @@ -302,6 +302,35 @@ TEST_F(SsdFileTest, writeAndRead) { } } } + + // Test cache writes with different iobufs sizes. + std::vector pins; + ssdFile_->write(pins); + readAndCheckPins(pins); + + pins = makePins(fileName_.id(), 0, 4096, 4096, 4096); + EXPECT_EQ(pins.size(), 1); + ssdFile_->write(pins); + readAndCheckPins(pins); + pins.clear(); + + pins = makePins(fileName_.id(), 0, 4096, 4096, 4096 * (IOV_MAX - 1)); + EXPECT_EQ(pins.size(), IOV_MAX - 1); + ssdFile_->write(pins); + readAndCheckPins(pins); + pins.clear(); + + pins = makePins(fileName_.id(), 0, 4096, 4096, 4096 * IOV_MAX); + EXPECT_EQ(pins.size(), IOV_MAX); + ssdFile_->write(pins); + readAndCheckPins(pins); + pins.clear(); + + pins = makePins(fileName_.id(), 0, 4096, 4096, 4096 * (IOV_MAX + 1)); + EXPECT_EQ(pins.size(), IOV_MAX + 1); + ssdFile_->write(pins); + readAndCheckPins(pins); + pins.clear(); } #ifdef VELOX_SSD_FILE_TEST_SET_NO_COW_FLAG