Skip to content

Commit

Permalink
Prevent cache write from exceeding IOV_MAX (#9438)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #9438

Differential Revision: D55941557
  • Loading branch information
zacw7 authored and facebook-github-bot committed Apr 12, 2024
1 parent 2b52132 commit a155b0f
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 30 deletions.
87 changes: 60 additions & 27 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector<iovec>& iovecs) {
};
}
}

// Returns the number of entries in a cache 'entry'.
int32_t numIoVectorsFromEntry(AsyncDataCacheEntry& entry) {
if (entry.tinyData() != nullptr) {
return 1;
}
return entry.data().numRuns();
}
} // namespace

SsdPin::SsdPin(SsdFile& file, SsdRun run) : file_(&file), run_(run) {
Expand Down Expand Up @@ -374,46 +382,52 @@ void SsdFile::write(std::vector<CachePin>& pins) {
VELOX_CHECK_NULL(entry->ssdFile());
}

int32_t storeIndex = 0;
while (storeIndex < pins.size()) {
auto space = getSpace(pins, storeIndex);
int32_t writeIndex = 0;
while (writeIndex < pins.size()) {
const auto space = getSpace(pins, writeIndex);
if (!space.has_value()) {
// No space can be reclaimed. The pins are freed when the caller is freed.
return;
}

auto [offset, available] = space.value();
int32_t numWritten = 0;
int32_t bytes = 0;
std::vector<iovec> iovecs;
for (auto i = storeIndex; i < pins.size(); ++i) {
int32_t numWrittenEntries = 0;
uint64_t writeOffset = offset;
int32_t writeLength = 0;
std::vector<iovec> writeIovecs;
for (auto i = writeIndex; i < pins.size(); ++i) {
auto* entry = pins[i].checkedEntry();
const auto entrySize = entry->size();
if (bytes + entrySize > available) {
const auto numIovecs = numIoVectorsFromEntry(*entry);
VELOX_CHECK_LE(numIovecs, IOV_MAX);
if (writeIovecs.size() + numIovecs > IOV_MAX) {
// Writes out the accumulated iovecs if it exceeds IOV_MAX limit.
if (!write(writeOffset, writeLength, writeIovecs)) {
return;
}
writeIovecs.clear();
writeOffset += writeLength;
writeLength = 0;
}
if (writeLength + entrySize > available) {
break;
}
addEntryToIovecs(*entry, iovecs);
bytes += entrySize;
++numWritten;
}
VELOX_CHECK_GE(fileSize_, offset + bytes);

const auto rc = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset);
if (rc != bytes) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_
<< ", fd: " << fd_ << ", size: " << iovecs.size()
<< ", offset: " << offset << ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
// If write fails, we return without adding the pins to the cache. The
// entries are unchanged.
return;
addEntryToIovecs(*entry, writeIovecs);
writeLength += entrySize;
++numWrittenEntries;
}
if (writeLength > 0) {
VELOX_CHECK(!writeIovecs.empty());
if (!write(writeOffset, writeLength, writeIovecs)) {
return;
}
writeOffset += writeLength;
}
VELOX_CHECK_GE(fileSize_, writeOffset);

{
std::lock_guard<std::shared_mutex> l(mutex_);
for (auto i = storeIndex; i < storeIndex + numWritten; ++i) {
for (auto i = writeIndex; i < writeIndex + numWrittenEntries; ++i) {
auto* entry = pins[i].checkedEntry();
entry->setSsdFile(this, offset);
const auto size = entry->size();
Expand All @@ -429,7 +443,7 @@ void SsdFile::write(std::vector<CachePin>& pins) {
bytesAfterCheckpoint_ += size;
}
}
storeIndex += numWritten;
writeIndex += numWrittenEntries;
}

if ((checkpointIntervalBytes_ > 0) &&
Expand All @@ -438,6 +452,25 @@ void SsdFile::write(std::vector<CachePin>& pins) {
}
}

bool SsdFile::write(
uint64_t offset,
uint64_t length,
const std::vector<iovec>& iovecs) {
const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset);
if (ret != length) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_
<< ", size: " << iovecs.size() << ", offset: " << offset
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
// If write fails, we return without adding the pins to the cache. The
// entries are unchanged.
return false;
}
return true;
}

namespace {
int32_t indexOfFirstMismatch(char* x, char* y, int n) {
for (auto i = 0; i < n; ++i) {
Expand Down
5 changes: 5 additions & 0 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ class SsdFile {
// the files for making new checkpoints.
void initializeCheckpoint();

// Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write
// succeeds; otherwise, log the error and return false.
bool
write(uint64_t offset, uint64_t length, const std::vector<iovec>& iovecs);

// Synchronously logs that 'regions' are no longer valid in a possibly xisting
// checkpoint.
void logEviction(const std::vector<int32_t>& regions);
Expand Down
16 changes: 13 additions & 3 deletions velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ class SsdFileTest : public testing::Test {
}
}

// Gets consecutive entries from file 'fileId' starting at 'startOffset' with
// Gets consecutive entries from file 'fileId' starting at 'startOffset' with
// sizes between 'minSize' and 'maxSize'. Sizes start at 'minSize' and double
// each time and go back to 'minSize' after exceeding 'maxSize'. This stops
// after the total size has exceeded 'totalSize'. The entries are returned as
// pins. The pins are exclusive for newly created entries and shared for
// existing ones. New entries are deterministically initialized from 'fileId'
// existing ones. New entries are deterministically initialized from 'fileId'
// and the entry's offset.
std::vector<CachePin> makePins(
uint64_t fileId,
Expand Down Expand Up @@ -280,7 +280,7 @@ TEST_F(SsdFileTest, writeAndRead) {
}
}

// We check howmany entries are found. The earliest writes will have been
// We check how many entries are found. The earliest writes will have been
// evicted. We read back the found entries and check their contents.
int32_t numFound = 0;
for (auto& entry : allEntries) {
Expand All @@ -302,6 +302,16 @@ TEST_F(SsdFileTest, writeAndRead) {
}
}
}

// Test cache writes with different iobufs sizes.
for (int numPins : {0, 1, IOV_MAX - 1, IOV_MAX, IOV_MAX + 1}) {
SCOPED_TRACE(fmt::format("numPins: {}", numPins));
auto pins = makePins(fileName_.id(), 0, 4096, 4096, 4096 * numPins);
EXPECT_EQ(pins.size(), numPins);
ssdFile_->write(pins);
readAndCheckPins(pins);
pins.clear();
}
}

#ifdef VELOX_SSD_FILE_TEST_SET_NO_COW_FLAG
Expand Down

0 comments on commit a155b0f

Please sign in to comment.