Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent cache write from exceeding IOV_MAX #9438

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 62 additions & 27 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector<iovec>& iovecs) {
};
}
}

// Returns the number of entries in a cache 'entry'.
int32_t numIoVectorsFromEntry(AsyncDataCacheEntry& entry) {
zacw7 marked this conversation as resolved.
Show resolved Hide resolved
if (entry.tinyData() != nullptr) {
return 1;
}
return entry.data().numRuns();
}
} // namespace

SsdPin::SsdPin(SsdFile& file, SsdRun run) : file_(&file), run_(run) {
Expand Down Expand Up @@ -374,46 +382,56 @@ void SsdFile::write(std::vector<CachePin>& pins) {
VELOX_CHECK_NULL(entry->ssdFile());
}

int32_t storeIndex = 0;
while (storeIndex < pins.size()) {
auto space = getSpace(pins, storeIndex);
int32_t writeIndex = 0;
while (writeIndex < pins.size()) {
const auto space = getSpace(pins, writeIndex);
if (!space.has_value()) {
// No space can be reclaimed. The pins are freed when the caller is freed.
return;
}

auto [offset, available] = space.value();
int32_t numWritten = 0;
int32_t bytes = 0;
std::vector<iovec> iovecs;
for (auto i = storeIndex; i < pins.size(); ++i) {
int32_t numWrittenEntries = 0;
uint64_t writeOffset = offset;
int32_t writeLength = 0;
std::vector<iovec> writeIovecs;
for (auto i = writeIndex; i < pins.size(); ++i) {
auto* entry = pins[i].checkedEntry();
const auto entrySize = entry->size();
if (bytes + entrySize > available) {
const auto numIovecs = numIoVectorsFromEntry(*entry);
VELOX_CHECK_LE(numIovecs, IOV_MAX);
if (writeIovecs.size() + numIovecs > IOV_MAX) {
// Writes out the accumulated iovecs if it exceeds IOV_MAX limit.
if (!write(writeOffset, writeLength, writeIovecs)) {
// If write fails, we return without adding the pins to the cache. The
// entries are unchanged.
return;
}
writeIovecs.clear();
writeOffset += writeLength;
writeLength = 0;
}
if (writeLength + entrySize > available) {
break;
}
addEntryToIovecs(*entry, iovecs);
bytes += entrySize;
++numWritten;
}
VELOX_CHECK_GE(fileSize_, offset + bytes);

const auto rc = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset);
if (rc != bytes) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_
<< ", fd: " << fd_ << ", size: " << iovecs.size()
<< ", offset: " << offset << ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
// If write fails, we return without adding the pins to the cache. The
// entries are unchanged.
return;
addEntryToIovecs(*entry, writeIovecs);
writeLength += entrySize;
++numWrittenEntries;
}
if (writeLength > 0) {
VELOX_CHECK(!writeIovecs.empty());
if (!write(writeOffset, writeLength, writeIovecs)) {
return;
}
writeIovecs.clear();
writeOffset += writeLength;
zacw7 marked this conversation as resolved.
Show resolved Hide resolved
writeLength = 0;
}
VELOX_CHECK_GE(fileSize_, writeOffset);

{
std::lock_guard<std::shared_mutex> l(mutex_);
for (auto i = storeIndex; i < storeIndex + numWritten; ++i) {
for (auto i = writeIndex; i < writeIndex + numWrittenEntries; ++i) {
auto* entry = pins[i].checkedEntry();
entry->setSsdFile(this, offset);
const auto size = entry->size();
Expand All @@ -429,7 +447,7 @@ void SsdFile::write(std::vector<CachePin>& pins) {
bytesAfterCheckpoint_ += size;
}
}
storeIndex += numWritten;
writeIndex += numWrittenEntries;
}

if ((checkpointIntervalBytes_ > 0) &&
Expand All @@ -438,6 +456,23 @@ void SsdFile::write(std::vector<CachePin>& pins) {
}
}

bool SsdFile::write(
uint64_t offset,
uint64_t length,
const std::vector<iovec>& iovecs) {
const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset);
if (ret == length) {
return true;
}
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_
<< ", size: " << iovecs.size() << ", offset: " << offset
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
return false;
}

namespace {
int32_t indexOfFirstMismatch(char* x, char* y, int n) {
for (auto i = 0; i < n; ++i) {
Expand Down
5 changes: 5 additions & 0 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ class SsdFile {
// the files for making new checkpoints.
void initializeCheckpoint();

// Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write
// succeeds; otherwise, log the error and return false.
bool
write(uint64_t offset, uint64_t length, const std::vector<iovec>& iovecs);

// Synchronously logs that 'regions' are no longer valid in a possibly xisting
// checkpoint.
void logEviction(const std::vector<int32_t>& regions);
Expand Down
16 changes: 13 additions & 3 deletions velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ class SsdFileTest : public testing::Test {
}
}

// Gets consecutive entries from file 'fileId' starting at 'startOffset' with
// Gets consecutive entries from file 'fileId' starting at 'startOffset' with
// sizes between 'minSize' and 'maxSize'. Sizes start at 'minSize' and double
// each time and go back to 'minSize' after exceeding 'maxSize'. This stops
// after the total size has exceeded 'totalSize'. The entries are returned as
// pins. The pins are exclusive for newly created entries and shared for
// existing ones. New entries are deterministically initialized from 'fileId'
// existing ones. New entries are deterministically initialized from 'fileId'
// and the entry's offset.
std::vector<CachePin> makePins(
uint64_t fileId,
Expand Down Expand Up @@ -280,7 +280,7 @@ TEST_F(SsdFileTest, writeAndRead) {
}
}

// We check howmany entries are found. The earliest writes will have been
// We check how many entries are found. The earliest writes will have been
// evicted. We read back the found entries and check their contents.
int32_t numFound = 0;
for (auto& entry : allEntries) {
Expand All @@ -302,6 +302,16 @@ TEST_F(SsdFileTest, writeAndRead) {
}
}
}

// Test cache writes with different iobufs sizes.
for (int numPins : {0, 1, IOV_MAX - 1, IOV_MAX, IOV_MAX + 1}) {
zacw7 marked this conversation as resolved.
Show resolved Hide resolved
SCOPED_TRACE(fmt::format("numPins: {}", numPins));
auto pins = makePins(fileName_.id(), 0, 4096, 4096, 4096 * numPins);
EXPECT_EQ(pins.size(), numPins);
ssdFile_->write(pins);
readAndCheckPins(pins);
pins.clear();
}
}

#ifdef VELOX_SSD_FILE_TEST_SET_NO_COW_FLAG
Expand Down
Loading