diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index a51801a62a490..2f99ed0c6e2af 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -35,7 +35,7 @@ namespace facebook::velox::common { /// Defining type for a callback function that returns the spill directory path. /// Implementations can use it to ensure the path exists before returning. -using GetSpillDirectoryPathCB = std::function; +using GetSpillDirectoryPathCB = std::function; /// The callback used to update the aggregated spill bytes of a query. If the /// query spill limit is set, the callback throws if the aggregated spilled diff --git a/velox/common/base/tests/FsTest.cpp b/velox/common/base/tests/FsTest.cpp index a9dbd822c0faf..ebe7a7ee2e51b 100644 --- a/velox/common/base/tests/FsTest.cpp +++ b/velox/common/base/tests/FsTest.cpp @@ -24,7 +24,8 @@ namespace facebook::velox::common { class FsTest : public testing::Test {}; TEST_F(FsTest, createDirectory) { - auto rootPath = exec::test::TempDirectoryPath::createTempDirectory(); + auto dir = exec::test::TempDirectoryPath::create(); + auto rootPath = dir->path(); auto tmpDirectoryPath = rootPath + "/first/second/third"; // First time should generate directory successfully. EXPECT_FALSE(fs::exists(tmpDirectoryPath.c_str())); @@ -34,7 +35,7 @@ TEST_F(FsTest, createDirectory) { // Directory already exist, not creating but should return success. EXPECT_TRUE(generateFileDirectory(tmpDirectoryPath.c_str())); EXPECT_TRUE(fs::exists(tmpDirectoryPath.c_str())); - boost::filesystem::remove_all(rootPath); + dir.reset(); EXPECT_FALSE(fs::exists(rootPath.c_str())); } diff --git a/velox/common/caching/tests/AsyncDataCacheTest.cpp b/velox/common/caching/tests/AsyncDataCacheTest.cpp index b74afaabbab48..56427078007f0 100644 --- a/velox/common/caching/tests/AsyncDataCacheTest.cpp +++ b/velox/common/caching/tests/AsyncDataCacheTest.cpp @@ -106,7 +106,7 @@ class AsyncDataCacheTest : public testing::Test { tempDirectory_ = exec::test::TempDirectoryPath::create(); } ssdCache = std::make_unique( - fmt::format("{}/cache", tempDirectory_->path), + fmt::format("{}/cache", tempDirectory_->path()), ssdBytes, 4, executor(), @@ -816,7 +816,7 @@ TEST_F(AsyncDataCacheTest, DISABLED_ssd) { cache_->ssdCache()->clear(); // We cut the tail off one of the cache shards. - corruptFile(fmt::format("{}/cache0.cpt", tempDirectory_->path)); + corruptFile(fmt::format("{}/cache0.cpt", tempDirectory_->path())); // We open the cache from checkpoint. Reading checks the data integrity, here // we check that more data was read than written. initializeCache(kRamBytes, kSsdBytes); diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index 9cb89f2fabb92..80ca323517ada 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -67,7 +67,7 @@ class SsdFileTest : public testing::Test { tempDirectory_ = exec::test::TempDirectoryPath::create(); ssdFile_ = std::make_unique( - fmt::format("{}/ssdtest", tempDirectory_->path), + fmt::format("{}/ssdtest", tempDirectory_->path()), 0, // shardId bits::roundUp(ssdBytes, SsdFile::kRegionSize) / SsdFile::kRegionSize, 0, // checkpointInternalBytes diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index ea5ef04b4b7e0..a7f3f0c03f459 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -238,10 +238,8 @@ LocalWriteFile::LocalWriteFile( { if (shouldThrowOnFileAlreadyExists) { FILE* exists = fopen(buf.get(), "rb"); - VELOX_CHECK( - !exists, - "Failure in LocalWriteFile: path '{}' already exists.", - path); + VELOX_CHECK_NULL( + exists, "Failure in LocalWriteFile: path '{}' already exists.", path); } } auto* file = fopen(buf.get(), "ab"); diff --git a/velox/common/file/File.h b/velox/common/file/File.h index 2ce64cad3b989..1193f7c65f79b 100644 --- a/velox/common/file/File.h +++ b/velox/common/file/File.h @@ -225,14 +225,15 @@ class InMemoryWriteFile final : public WriteFile { std::string* file_; }; -// Current implementation for the local version is quite simple (e.g. no -// internal arenaing), as local disk writes are expected to be cheap. Local -// files match against any filepath starting with '/'. - +/// Current implementation for the local version is quite simple (e.g. no +/// internal arenaing), as local disk writes are expected to be cheap. Local +/// files match against any filepath starting with '/'. class LocalReadFile final : public ReadFile { public: explicit LocalReadFile(std::string_view path); + /// TODO: deprecate this after creating local file all through velox fs + /// interface. explicit LocalReadFile(int32_t fd); ~LocalReadFile(); diff --git a/velox/common/file/tests/CMakeLists.txt b/velox/common/file/tests/CMakeLists.txt index 446ef6a859e18..654bb05f9f46f 100644 --- a/velox/common/file/tests/CMakeLists.txt +++ b/velox/common/file/tests/CMakeLists.txt @@ -12,8 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +add_subdirectory(utils) +g add_library(velox_file_test_utils TestUtils.cpp) -target_link_libraries(velox_file_test_utils PUBLIC velox_file) +target_link_libraries(velox_file_test_utils + velox_file_test_lib PUBLIC velox_file) add_executable(velox_file_test FileTest.cpp UtilsTest.cpp) add_test(velox_file_test velox_file_test) diff --git a/velox/common/file/tests/FileTest.cpp b/velox/common/file/tests/FileTest.cpp index 9f837fc94a23d..034b44276707c 100644 --- a/velox/common/file/tests/FileTest.cpp +++ b/velox/common/file/tests/FileTest.cpp @@ -19,6 +19,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" +#include "velox/common/file/tests/utils/FaultyFileSystem.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/exec/tests/utils/TempFilePath.h" @@ -26,6 +27,7 @@ using namespace facebook::velox; using facebook::velox::common::Region; +using namespace facebook::velox::tests::utils; constexpr int kOneMB = 1 << 20; @@ -126,45 +128,58 @@ TEST(InMemoryFile, preadv) { EXPECT_EQ(expected, values); } -TEST(LocalFile, writeAndRead) { +class LocalFileTest : public ::testing::TestWithParam { + protected: + LocalFileTest() : useFaultyFs_(GetParam()) {} + + static void SetUpTestCase() { + filesystems::registerLocalFileSystem(); + tests::utils::registerFaultyFileSystem(); + } + + const bool useFaultyFs_; +}; + +TEST_P(LocalFileTest, writeAndRead) { for (bool useIOBuf : {true, false}) { - auto tempFile = ::exec::test::TempFilePath::create(); - const auto& filename = tempFile->path.c_str(); - remove(filename); + SCOPED_TRACE(fmt::format("useIOBuf: {}", useIOBuf)); + + auto tempFile = exec::test::TempFilePath::create(useFaultyFs_); + const auto& filename = tempFile->path(); + auto fs = filesystems::getFileSystem(filename, {}); + fs->remove(filename); { - LocalWriteFile writeFile(filename); - writeData(&writeFile, useIOBuf); - writeFile.close(); - ASSERT_EQ(writeFile.size(), 15 + kOneMB); + auto writeFile = fs->openFileForWrite(filename); + writeData(writeFile.get(), useIOBuf); + writeFile->close(); + ASSERT_EQ(writeFile->size(), 15 + kOneMB); } - LocalReadFile readFile(filename); - readData(&readFile); + auto readFile = fs->openFileForRead(filename); + readData(readFile.get()); } } -TEST(LocalFile, viaRegistry) { - filesystems::registerLocalFileSystem(); - auto tempFile = ::exec::test::TempFilePath::create(); - const auto& filename = tempFile->path.c_str(); - remove(filename); - auto lfs = filesystems::getFileSystem(filename, nullptr); +TEST_P(LocalFileTest, viaRegistry) { + auto tempFile = exec::test::TempFilePath::create(useFaultyFs_); + const auto& filename = tempFile->path(); + auto fs = filesystems::getFileSystem(filename, {}); + fs->remove(filename); { - auto writeFile = lfs->openFileForWrite(filename); + auto writeFile = fs->openFileForWrite(filename); writeFile->append("snarf"); } - auto readFile = lfs->openFileForRead(filename); + auto readFile = fs->openFileForRead(filename); ASSERT_EQ(readFile->size(), 5); char buffer1[5]; ASSERT_EQ(readFile->pread(0, 5, &buffer1), "snarf"); - lfs->remove(filename); + fs->remove(filename); } -TEST(LocalFile, rename) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); - auto a = fmt::format("{}/a", tempFolder->path); - auto b = fmt::format("{}/b", tempFolder->path); - auto newA = fmt::format("{}/newA", tempFolder->path); +TEST_P(LocalFileTest, rename) { + const auto tempFolder = ::exec::test::TempDirectoryPath::create(useFaultyFs_); + const auto a = fmt::format("{}/a", tempFolder->path()); + const auto b = fmt::format("{}/b", tempFolder->path()); + const auto newA = fmt::format("{}/newA", tempFolder->path()); const std::string data("aaaaa"); auto localFs = filesystems::getFileSystem(a, nullptr); { @@ -176,7 +191,7 @@ TEST(LocalFile, rename) { ASSERT_TRUE(localFs->exists(a)); ASSERT_TRUE(localFs->exists(b)); ASSERT_FALSE(localFs->exists(newA)); - EXPECT_THROW(localFs->rename(a, b), VeloxUserError); + VELOX_ASSERT_USER_THROW(localFs->rename(a, b), ""); localFs->rename(a, newA); ASSERT_FALSE(localFs->exists(a)); ASSERT_TRUE(localFs->exists(b)); @@ -187,11 +202,10 @@ TEST(LocalFile, rename) { ASSERT_EQ(readFile->pread(0, 5, &buffer), data); } -TEST(LocalFile, exists) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); - auto a = fmt::format("{}/a", tempFolder->path); - auto b = fmt::format("{}/b", tempFolder->path); +TEST_P(LocalFileTest, exists) { + auto tempFolder = ::exec::test::TempDirectoryPath::create(useFaultyFs_); + auto a = fmt::format("{}/a", tempFolder->path()); + auto b = fmt::format("{}/b", tempFolder->path()); auto localFs = filesystems::getFileSystem(a, nullptr); { auto writeFile = localFs->openFileForWrite(a); @@ -207,47 +221,50 @@ TEST(LocalFile, exists) { ASSERT_FALSE(localFs->exists(b)); } -TEST(LocalFile, list) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); - auto a = fmt::format("{}/1", tempFolder->path); - auto b = fmt::format("{}/2", tempFolder->path); +TEST_P(LocalFileTest, list) { + const auto tempFolder = ::exec::test::TempDirectoryPath::create(useFaultyFs_); + const auto a = fmt::format("{}/1", tempFolder->path()); + const auto b = fmt::format("{}/2", tempFolder->path()); auto localFs = filesystems::getFileSystem(a, nullptr); { auto writeFile = localFs->openFileForWrite(a); writeFile = localFs->openFileForWrite(b); } - auto files = localFs->list(std::string_view(tempFolder->path)); + auto files = localFs->list(std::string_view(tempFolder->path())); std::sort(files.begin(), files.end()); ASSERT_EQ(files, std::vector({a, b})); localFs->remove(a); ASSERT_EQ( - localFs->list(std::string_view(tempFolder->path)), + localFs->list(std::string_view(tempFolder->path())), std::vector({b})); localFs->remove(b); - ASSERT_TRUE(localFs->list(std::string_view(tempFolder->path)).empty()); + ASSERT_TRUE(localFs->list(std::string_view(tempFolder->path())).empty()); } -TEST(LocalFile, readFileDestructor) { - auto tempFile = ::exec::test::TempFilePath::create(); - const auto& filename = tempFile->path.c_str(); - remove(filename); +TEST_P(LocalFileTest, readFileDestructor) { + if (useFaultyFs_) { + return; + } + auto tempFile = exec::test::TempFilePath::create(useFaultyFs_); + const auto& filename = tempFile->path(); + auto fs = filesystems::getFileSystem(filename, {}); + fs->remove(filename); { - LocalWriteFile writeFile(filename); - writeData(&writeFile); + auto writeFile = fs->openFileForWrite(filename); + writeData(writeFile.get()); } { - LocalReadFile readFile(filename); - readData(&readFile); + auto readFile = fs->openFileForRead(filename); + readData(readFile.get()); } int32_t readFd; { - std::unique_ptr buf(new char[tempFile->path.size() + 1]); - buf[tempFile->path.size()] = 0; - memcpy(buf.get(), tempFile->path.data(), tempFile->path.size()); - readFd = open(buf.get(), O_RDONLY); + std::unique_ptr buf(new char[tempFile->path().size() + 1]); + buf[tempFile->path().size()] = 0; + ::memcpy(buf.get(), tempFile->path().c_str(), tempFile->path().size()); + readFd = ::open(buf.get(), O_RDONLY); } { LocalReadFile readFile(readFd); @@ -260,11 +277,10 @@ TEST(LocalFile, readFileDestructor) { } } -TEST(LocalFile, mkdir) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); +TEST_P(LocalFileTest, mkdir) { + auto tempFolder = exec::test::TempDirectoryPath::create(useFaultyFs_); - std::string path = tempFolder->path; + std::string path = tempFolder->path(); auto localFs = filesystems::getFileSystem(path, nullptr); // Create 3 levels of directories and ensure they exist. @@ -286,11 +302,10 @@ TEST(LocalFile, mkdir) { EXPECT_TRUE(localFs->exists(path)); } -TEST(LocalFile, rmdir) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); +TEST_P(LocalFileTest, rmdir) { + auto tempFolder = exec::test::TempDirectoryPath::create(useFaultyFs_); - std::string path = tempFolder->path; + std::string path = tempFolder->path(); auto localFs = filesystems::getFileSystem(path, nullptr); // Create 3 levels of directories and ensure they exist. @@ -309,24 +324,282 @@ TEST(LocalFile, rmdir) { EXPECT_TRUE(localFs->exists(path)); // Now delete the whole temp folder and ensure it is gone. - EXPECT_NO_THROW(localFs->rmdir(tempFolder->path)); - EXPECT_FALSE(localFs->exists(tempFolder->path)); + EXPECT_NO_THROW(localFs->rmdir(tempFolder->path())); + EXPECT_FALSE(localFs->exists(tempFolder->path())); // Delete a non-existing directory. path += "/does_not_exist/subdir"; EXPECT_FALSE(localFs->exists(path)); // The function does not throw, but will return zero files and folders // deleted, which is not an error. - EXPECT_NO_THROW(localFs->rmdir(tempFolder->path)); + EXPECT_NO_THROW(localFs->rmdir(tempFolder->path())); } -TEST(LocalFile, fileNotFound) { - filesystems::registerLocalFileSystem(); - auto tempFolder = ::exec::test::TempDirectoryPath::create(); - auto path = fmt::format("{}/file", tempFolder->path); +TEST_P(LocalFileTest, fileNotFound) { + auto tempFolder = exec::test::TempDirectoryPath::create(useFaultyFs_); + auto path = fmt::format("{}/file", tempFolder->path()); auto localFs = filesystems::getFileSystem(path, nullptr); VELOX_ASSERT_RUNTIME_THROW_CODE( localFs->openFileForRead(path), error_code::kFileNotFound, "No such file or directory"); } + +INSTANTIATE_TEST_SUITE_P( + LocalFileTestSuite, + LocalFileTest, + ::testing::Values(false, true)); + +class FaultyFsTest : public ::testing::Test { + protected: + FaultyFsTest() {} + + static void SetUpTestCase() { + filesystems::registerLocalFileSystem(); + tests::utils::registerFaultyFileSystem(); + } + + void SetUp() { + dir_ = exec::test::TempDirectoryPath::create(true); + fs_ = std::dynamic_pointer_cast( + filesystems::getFileSystem(dir_->path(), {})); + VELOX_CHECK_NOT_NULL(fs_); + filePath_ = fmt::format("{}/faultyTestFile", dir_->path()); + const int bufSize = 1024; + buffer_.resize(bufSize); + for (int i = 0; i < bufSize; ++i) { + buffer_[i] = i % 256; + } + { + auto writeFile = fs_->openFileForWrite(filePath_, {}); + writeData(writeFile.get()); + } + auto readFile = fs_->openFileForRead(filePath_, {}); + readData(readFile.get(), true); + try { + VELOX_FAIL("InjectedFaultFileError"); + } catch (VeloxRuntimeError&) { + fileError_ = std::current_exception(); + } + } + + void TearDown() { + fs_->clearFileFaultInjections(); + } + + void writeData(WriteFile* file) { + file->append(std::string_view(buffer_)); + file->flush(); + } + + void readData(ReadFile* file, bool useReadv = false) { + char readBuf[buffer_.size()]; + if (!useReadv) { + file->pread(0, buffer_.size(), readBuf); + } else { + std::vector> buffers; + buffers.push_back(folly::Range(readBuf, buffer_.size())); + file->preadv(0, buffers); + } + for (int i = 0; i < buffer_.size(); ++i) { + if (buffer_[i] != readBuf[i]) { + VELOX_FAIL("Data Mismatch"); + } + } + } + + std::shared_ptr dir_; + std::string filePath_; + std::shared_ptr fs_; + std::string buffer_; + std::exception_ptr fileError_; +}; + +TEST_F(FaultyFsTest, fileReadErrorInjection) { + // Set read error. + fs_->setFileInjectionError(fileError_, {FaultFileOperation::Type::kRead}); + { + auto readFile = fs_->openFileForRead(filePath_, {}); + VELOX_ASSERT_THROW( + readData(readFile.get(), false), "InjectedFaultFileError"); + } + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + readData(readFile.get(), true); + } + + // Set readv error + fs_->setFileInjectionError(fileError_, {FaultFileOperation::Type::kReadv}); + { + auto readFile = fs_->openFileForRead(filePath_, {}); + VELOX_ASSERT_THROW( + readData(readFile.get(), true), "InjectedFaultFileError"); + } + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + readData(readFile.get(), false); + } + + // Set error for all kinds of operations. + fs_->setFileInjectionError(fileError_); + auto readFile = fs_->openFileForRead(filePath_, {}); + VELOX_ASSERT_THROW(readData(readFile.get(), true), "InjectedFaultFileError"); + VELOX_ASSERT_THROW(readData(readFile.get(), false), "InjectedFaultFileError"); + fs_->remove(filePath_); + // Check there is no interference on write as we don't support it for now. + auto writeFile = fs_->openFileForWrite(filePath_, {}); + writeData(writeFile.get()); +} + +TEST_F(FaultyFsTest, fileReadDelayInjection) { + // Set 3 seconds delay. + const uint64_t injectDelay{2'000'000}; + fs_->setFileInjectionDelay(injectDelay, {FaultFileOperation::Type::kRead}); + { + auto readFile = fs_->openFileForRead(filePath_, {}); + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), false); + } + ASSERT_GE(readDurationUs, injectDelay); + } + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), true); + } + ASSERT_LT(readDurationUs, injectDelay); + } + + // Set readv error + fs_->setFileInjectionDelay(injectDelay, {FaultFileOperation::Type::kReadv}); + { + auto readFile = fs_->openFileForRead(filePath_, {}); + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), true); + } + ASSERT_GE(readDurationUs, injectDelay); + } + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), false); + } + ASSERT_LT(readDurationUs, injectDelay); + } + + // Set error for all kinds of operations. + fs_->setFileInjectionDelay(injectDelay); + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), false); + } + ASSERT_GE(readDurationUs, injectDelay); + } + { + auto readFile = fs_->openFileForRead(filePath_, {}); + // We only inject error for pread API so preadv should be fine. + uint64_t readDurationUs{0}; + { + MicrosecondTimer readTimer(&readDurationUs); + readData(readFile.get(), false); + } + ASSERT_GE(readDurationUs, injectDelay); + } + + fs_->remove(filePath_); + // Check there is no interference on write as we don't support it for now. + auto writeFile = fs_->openFileForWrite(filePath_, {}); + uint64_t writeDurationUs{0}; + { + MicrosecondTimer writeTimer(&writeDurationUs); + writeData(writeFile.get()); + } + ASSERT_LT(writeDurationUs, injectDelay); +} + +TEST_F(FaultyFsTest, fileReadFaultHookInjection) { + const std::string path1 = fmt::format("{}/hookFile1", dir_->path()); + { + auto writeFile = fs_->openFileForWrite(path1, {}); + writeData(writeFile.get()); + auto readFile = fs_->openFileForRead(path1, {}); + readData(readFile.get()); + } + const std::string path2 = fmt::format("{}/hookFile2", dir_->path()); + { + auto writeFile = fs_->openFileForWrite(path2, {}); + writeData(writeFile.get()); + auto readFile = fs_->openFileForRead(path2, {}); + readData(readFile.get()); + } + // Set read error. + fs_->setFileInjectionHook([&](FaultFileOperation* op) { + // Only inject error for readv. + if (op->type != FaultFileOperation::Type::kReadv) { + return; + } + // Only inject error for path2. + if (op->path != path2) { + return; + } + VELOX_FAIL("inject hook read failure"); + }); + { + auto readFile = fs_->openFileForRead(path1, {}); + readData(readFile.get(), false); + readData(readFile.get(), true); + } + { + auto readFile = fs_->openFileForRead(path2, {}); + // Verify only throw for readv. + readData(readFile.get(), false); + VELOX_ASSERT_THROW( + readData(readFile.get(), true), "inject hook read failure"); + } + + // Set to return fake data. + fs_->setFileInjectionHook([&](FaultFileOperation* op) { + // Only inject error for path1. + if (op->path != path1) { + return; + } + // Only inject error for read. + if (op->type != FaultFileOperation::Type::kRead) { + return; + } + auto* readOp = static_cast(op); + char* readBuf = static_cast(readOp->buf); + for (int i = 0; i < readOp->length; ++i) { + readBuf[i] = 0; + } + readOp->delegate = false; + }); + + { + auto readFile = fs_->openFileForRead(path2, {}); + readData(readFile.get(), false); + readData(readFile.get(), true); + } + { + auto readFile = fs_->openFileForRead(path1, {}); + // Verify only throw for read. + readData(readFile.get(), true); + VELOX_ASSERT_THROW(readData(readFile.get(), false), "Data Mismatch"); + } +} \ No newline at end of file diff --git a/velox/common/file/tests/utils/CMakeLists.txt b/velox/common/file/tests/utils/CMakeLists.txt new file mode 100644 index 0000000000000..56bdede2f417e --- /dev/null +++ b/velox/common/file/tests/utils/CMakeLists.txt @@ -0,0 +1,22 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_library( + velox_file_test_lib + FaultyFile.cpp + FaultyFileSystem.cpp) + +target_link_libraries( + velox_file_test_lib + velox_file) diff --git a/velox/common/file/tests/utils/FaultyFile.cpp b/velox/common/file/tests/utils/FaultyFile.cpp new file mode 100644 index 0000000000000..89ecc85e92fc8 --- /dev/null +++ b/velox/common/file/tests/utils/FaultyFile.cpp @@ -0,0 +1,79 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/file/tests/utils/FaultyFile.h" + +namespace facebook::velox::tests::utils { + +FaultyReadFile::FaultyReadFile( + const std::string& path, + std::shared_ptr delegatedFile, + FileFaultInjectionHook injectionHook) + : path_(path), + delegatedFile_(std::move(delegatedFile)), + injectionHook_(std::move(injectionHook)) { + VELOX_CHECK_NOT_NULL(delegatedFile_); +} + +std::string_view +FaultyReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { + if (injectionHook_ != nullptr) { + FaultFileReadOperation op(path_, offset, length, buf); + injectionHook_(&op); + if (!op.delegate) { + return std::string_view(static_cast(op.buf), op.length); + } + } + return delegatedFile_->pread(offset, length, buf); +} + +uint64_t FaultyReadFile::preadv( + uint64_t offset, + const std::vector>& buffers) const { + if (injectionHook_ != nullptr) { + FaultFileReadvOperation op(path_, offset, buffers); + injectionHook_(&op); + if (!op.delegate) { + return op.readBytes; + } + } + return delegatedFile_->preadv(offset, buffers); +} + +FaultyWriteFile::FaultyWriteFile( + std::shared_ptr delegatedFile, + FileFaultInjectionHook injectionHook) + : delegatedFile_(std::move(delegatedFile)), + injectionHook_(std::move(injectionHook)) { + VELOX_CHECK_NOT_NULL(delegatedFile_); +} + +void FaultyWriteFile::append(std::string_view data) { + delegatedFile_->append(data); +} + +void FaultyWriteFile::append(std::unique_ptr data) { + delegatedFile_->append(std::move(data)); +} + +void FaultyWriteFile::flush() { + delegatedFile_->flush(); +} + +void FaultyWriteFile::close() { + delegatedFile_->close(); +} +} // namespace facebook::velox::tests::utils diff --git a/velox/common/file/tests/utils/FaultyFile.h b/velox/common/file/tests/utils/FaultyFile.h new file mode 100644 index 0000000000000..4916fad3e2f83 --- /dev/null +++ b/velox/common/file/tests/utils/FaultyFile.h @@ -0,0 +1,150 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/file/File.h" + +namespace facebook::velox::tests::utils { + +/// Defines the per-file operation fault injection. +struct FaultFileOperation { + enum class Type { + /// Injects faults for file read operations. + kRead, + kReadv, + /// TODO: add to support fault injections for the other file operation + /// types. + }; + + const Type type; + + /// The delegated file path. + const std::string path; + + /// Indicates to forward this operation to the delegated file or not. If not, + /// then the file fault injection hook must have processed the request. For + /// instance, if this is a file read injection, then the hook must have filled + /// the fake read data for data corruption tests. + bool delegate{true}; + + FaultFileOperation(Type _type, const std::string& _path) + : type(_type), path(_path) {} +}; + +/// Fault injection parameters for file read API. +struct FaultFileReadOperation : FaultFileOperation { + const uint64_t offset; + const uint64_t length; + void* const buf; + + FaultFileReadOperation( + const std::string& _path, + uint64_t _offset, + uint64_t _length, + void* _buf) + : FaultFileOperation(FaultFileOperation::Type::kRead, _path), + offset(_offset), + length(_length), + buf(_buf) {} +}; + +/// Fault injection parameters for file readv API. +struct FaultFileReadvOperation : FaultFileOperation { + const uint64_t offset; + const std::vector>& buffers; + uint64_t readBytes{0}; + + FaultFileReadvOperation( + const std::string& _path, + uint64_t _offset, + const std::vector>& _buffers) + : FaultFileOperation(FaultFileOperation::Type::kReadv, _path), + offset(_offset), + buffers(_buffers) {} +}; + +/// The fault injection hook on the file operation path. +using FileFaultInjectionHook = std::function; + +class FaultyReadFile : public ReadFile { + public: + FaultyReadFile( + const std::string& path, + std::shared_ptr delegatedFile, + FileFaultInjectionHook injectionHook); + + ~FaultyReadFile() override{}; + + uint64_t size() const override { + return delegatedFile_->size(); + } + + std::string_view pread(uint64_t offset, uint64_t length, void* buf) + const override; + + uint64_t preadv( + uint64_t offset, + const std::vector>& buffers) const override; + + uint64_t memoryUsage() const override { + return delegatedFile_->memoryUsage(); + } + + bool shouldCoalesce() const override { + return delegatedFile_->shouldCoalesce(); + } + + std::string getName() const override { + return delegatedFile_->getName(); + } + + uint64_t getNaturalReadSize() const override { + return delegatedFile_->getNaturalReadSize(); + } + + private: + const std::string path_; + const std::shared_ptr delegatedFile_; + const FileFaultInjectionHook injectionHook_; +}; + +class FaultyWriteFile : public WriteFile { + public: + FaultyWriteFile( + std::shared_ptr delegatedFile, + FileFaultInjectionHook injectionHook); + + ~FaultyWriteFile() override{}; + + void append(std::string_view data) override; + + void append(std::unique_ptr data) override; + + void flush() override; + + void close() override; + + uint64_t size() const override { + return delegatedFile_->size(); + } + + private: + const std::shared_ptr delegatedFile_; + const FileFaultInjectionHook injectionHook_; +}; + +} // namespace facebook::velox::tests::utils diff --git a/velox/common/file/tests/utils/FaultyFileSystem.cpp b/velox/common/file/tests/utils/FaultyFileSystem.cpp new file mode 100644 index 0000000000000..cb1b79ef44bf9 --- /dev/null +++ b/velox/common/file/tests/utils/FaultyFileSystem.cpp @@ -0,0 +1,192 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/file/tests/utils/FaultyFileSystem.h" +#include + +#include + +namespace facebook::velox::tests::utils { +namespace { +// Extracts the delegated real file path by removing the faulty file system +// scheme prefix. +inline std::string extractPath(std::string_view path) { + VELOX_CHECK_EQ(path.find(FaultyFileSystem::scheme()), 0); + return std::string(path.substr(FaultyFileSystem::scheme().length())); +} + +// Constructs the faulty file path based on the delegated read file 'path'. It +// pre-appends the faulty file system scheme. +inline std::string faultyPath(const std::string& path) { + return fmt::format("{}{}", FaultyFileSystem::scheme(), path); +} + +std::function schemeMatcher() { + // Note: presto behavior is to prefix local paths with 'file:'. + // Check for that prefix and prune to absolute regular paths as needed. + return [](std::string_view filePath) { + return filePath.find(FaultyFileSystem::scheme()) == 0; + }; +} + +folly::once_flag faultFilesystemInitOnceFlag; + +std::function(std::shared_ptr, std::string_view)> +fileSystemGenerator() { + return [](std::shared_ptr properties, + std::string_view /*unused*/) { + // One instance of faulty FileSystem is sufficient. Initializes on first + // access and reuse after that. + static std::shared_ptr lfs; + folly::call_once(faultFilesystemInitOnceFlag, [&properties]() { + lfs = std::make_shared(std::move(properties)); + }); + return lfs; + }; +} +} // namespace + +std::unique_ptr FaultyFileSystem::openFileForRead( + std::string_view path, + const FileOptions& options) { + const std::string delegatedPath = extractPath(path); + auto delegatedFile = getFileSystem(delegatedPath, config_) + ->openFileForRead(delegatedPath, options); + return std::make_unique( + std::string(path), std::move(delegatedFile), [&](FaultFileOperation* op) { + maybeInjectFileFault(op); + }); +} + +std::unique_ptr FaultyFileSystem::openFileForWrite( + std::string_view path, + const FileOptions& options) { + const std::string delegatedPath = extractPath(path); + auto delegatedFile = getFileSystem(delegatedPath, config_) + ->openFileForWrite(delegatedPath, options); + return std::make_unique( + std::move(delegatedFile), + [&](FaultFileOperation* op) { maybeInjectFileFault(op); }); +} + +void FaultyFileSystem::remove(std::string_view path) { + const std::string delegatedPath = extractPath(path); + getFileSystem(delegatedPath, config_)->remove(delegatedPath); +} + +void FaultyFileSystem::rename( + std::string_view oldPath, + std::string_view newPath, + bool overwrite) { + const auto delegatedOldPath = extractPath(oldPath); + const auto delegatedNewPath = extractPath(newPath); + getFileSystem(delegatedOldPath, config_) + ->rename(delegatedOldPath, delegatedNewPath, overwrite); +} + +bool FaultyFileSystem::exists(std::string_view path) { + const auto delegatedPath = extractPath(path); + return getFileSystem(delegatedPath, config_)->exists(delegatedPath); +} + +std::vector FaultyFileSystem::list(std::string_view path) { + const auto delegatedDirPath = extractPath(path); + const auto delegatedFiles = + getFileSystem(delegatedDirPath, config_)->list(delegatedDirPath); + // NOTE: we shall return the faulty file paths instead of the delegated file + // paths for list result. + std::vector files; + files.reserve(delegatedFiles.size()); + for (const auto& delegatedFile : delegatedFiles) { + files.push_back(faultyPath(delegatedFile)); + } + return files; +} + +void FaultyFileSystem::mkdir(std::string_view path) { + const auto delegatedDirPath = extractPath(path); + getFileSystem(delegatedDirPath, config_)->mkdir(delegatedDirPath); +} + +void FaultyFileSystem::rmdir(std::string_view path) { + const auto delegatedDirPath = extractPath(path); + getFileSystem(delegatedDirPath, config_)->rmdir(delegatedDirPath); +} + +void FaultyFileSystem::setFileInjectionHook( + FileFaultInjectionHook injectionHook) { + std::lock_guard l(mu_); + fileInjections_ = FileInjections(std::move(injectionHook)); +} + +void FaultyFileSystem::setFileInjectionError( + std::exception_ptr error, + std::unordered_set opTypes) { + std::lock_guard l(mu_); + fileInjections_ = FileInjections(std::move(error), std::move(opTypes)); +} + +void FaultyFileSystem::setFileInjectionDelay( + uint64_t delayUs, + std::unordered_set opTypes) { + std::lock_guard l(mu_); + fileInjections_ = FileInjections(delayUs, std::move(opTypes)); +} + +void FaultyFileSystem::clearFileFaultInjections() { + std::lock_guard l(mu_); + fileInjections_.reset(); +} + +void FaultyFileSystem::maybeInjectFileFault(FaultFileOperation* op) { + FileInjections injections; + { + std::lock_guard l(mu_); + if (!fileInjections_.has_value()) { + return; + } + injections = fileInjections_.value(); + } + + if (injections.fileInjectionHook != nullptr) { + injections.fileInjectionHook(op); + return; + } + + if (!injections.opTypes.empty() && injections.opTypes.count(op->type) == 0) { + return; + } + + if (injections.fileException != nullptr) { + std::rethrow_exception(injections.fileException); + } + + if (injections.fileDelayUs != 0) { + std::this_thread::sleep_for( + std::chrono::microseconds(injections.fileDelayUs)); + } +} + +void registerFaultyFileSystem() { + registerFileSystem(schemeMatcher(), fileSystemGenerator()); +} + +std::shared_ptr faultyFileSystem() { + return std::dynamic_pointer_cast( + getFileSystem(FaultyFileSystem::scheme(), {})); +} +} // namespace facebook::velox::tests::utils diff --git a/velox/common/file/tests/utils/FaultyFileSystem.h b/velox/common/file/tests/utils/FaultyFileSystem.h new file mode 100644 index 0000000000000..ec9756255e36a --- /dev/null +++ b/velox/common/file/tests/utils/FaultyFileSystem.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/file/FileSystems.h" + +#include +#include +#include +#include "velox/common/file/tests/utils/FaultyFile.h" + +namespace facebook::velox::tests::utils { + +using namespace filesystems; + +/// Implements faulty filesystem for io fault injection in unit test. It is a +/// wrapper on top of a real file system, and by default it delegates the the +/// file operation to the real file system underneath. +class FaultyFileSystem : public FileSystem { + public: + explicit FaultyFileSystem(std::shared_ptr config) + : FileSystem(std::move(config)) {} + + ~FaultyFileSystem() override {} + + static inline std::string scheme() { + return "faulty:"; + } + + std::string name() const override { + return "Faulty FS"; + } + + std::unique_ptr openFileForRead( + std::string_view path, + const FileOptions& options) override; + + std::unique_ptr openFileForWrite( + std::string_view path, + const FileOptions& options) override; + + void remove(std::string_view path) override; + + void rename( + std::string_view oldPath, + std::string_view newPath, + bool overwrite) override; + + bool exists(std::string_view path) override; + + std::vector list(std::string_view path) override; + + void mkdir(std::string_view path) override; + + void rmdir(std::string_view path) override; + + /// Setups hook for file fault injection. + void setFileInjectionHook(FileFaultInjectionHook hook); + + /// Setups to inject 'error' for a particular set of file operation types. If + /// 'opTypes' is empty, it injects error for all kinds of file operation + /// types. + void setFileInjectionError( + std::exception_ptr error, + std::unordered_set opTypes = {}); + + /// Setups to inject delay for a particular set of file operation types. If + /// 'opTypes' is empty, it injects delay for all kinds of file operation + /// types. + void setFileInjectionDelay( + uint64_t delayUs, + std::unordered_set opTypes = {}); + + /// Clears the file fault injections. + void clearFileFaultInjections(); + + private: + // Defines the file injection setup and only one type of injection can be set + // at a time. + struct FileInjections { + FileFaultInjectionHook fileInjectionHook{nullptr}; + + std::exception_ptr fileException{nullptr}; + + uint64_t fileDelayUs{0}; + + std::unordered_set opTypes{}; + + FileInjections() = default; + + explicit FileInjections(FileFaultInjectionHook _fileInjectionHook) + : fileInjectionHook(std::move(_fileInjectionHook)) {} + + FileInjections( + uint64_t _fileDelayUs, + std::unordered_set _opTypes) + : fileDelayUs(_fileDelayUs), opTypes(std::move(_opTypes)) {} + + FileInjections( + std::exception_ptr _fileException, + std::unordered_set _opTypes) + : fileException(std::move(_fileException)), + opTypes(std::move(_opTypes)) {} + }; + + // Invoked to inject file fault to 'op' if configured. + void maybeInjectFileFault(FaultFileOperation* op); + + mutable std::mutex mu_; + std::optional fileInjections_; +}; + +/// Registers the faulty filesystem. +void registerFaultyFileSystem(); + +/// Gets the fault filesystem instance. +std::shared_ptr faultyFileSystem(); +} // namespace facebook::velox::tests::utils diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp index 05badf0306b13..7a5d61bca494c 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/common/memory/tests/SharedArbitratorTest.cpp @@ -723,7 +723,7 @@ DEBUG_ONLY_TEST_F( VELOX_ASSERT_THROW( AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kJoinSpillEnabled, "true") .config(core::QueryConfig::kSpillNumPartitionBits, "2") @@ -794,7 +794,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) { std::thread queryThread([&]() { task = AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kJoinSpillEnabled, "true") .config(core::QueryConfig::kSpillNumPartitionBits, "2") @@ -865,7 +865,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, runtimeStats) { auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->path) + .tableWrite(outputDirectory->path()) .singleAggregation( {}, {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())}) @@ -875,7 +875,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, runtimeStats) { AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) .maxDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kWriterSpillEnabled, "true") // Set 0 file writer flush threshold to always trigger flush in diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp index 44fa5a3c85085..49a730179b109 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp @@ -99,7 +99,7 @@ class AbfsFileSystemTest : public testing::Test { private: static std::shared_ptr<::exec::test::TempFilePath> createFile( uint64_t size = -1) { - auto tempFile = ::exec::test::TempFilePath::create(); + auto tempFile = exec::test::TempFilePath::create(); if (size == -1) { tempFile->append("aaaaa"); tempFile->append("bbbbb"); diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp index ac8ed66f7c0f5..c316d1891e130 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/tests/HdfsFileSystemTest.cpp @@ -64,7 +64,7 @@ class HdfsFileSystemTest : public testing::Test { private: static std::shared_ptr<::exec::test::TempFilePath> createFile() { - auto tempFile = ::exec::test::TempFilePath::create(); + auto tempFile = exec::test::TempFilePath::create(); tempFile->append("aaaaa"); tempFile->append("bbbbb"); tempFile->append(std::string(kOneMB, 'c')); diff --git a/velox/connectors/hive/tests/FileHandleTest.cpp b/velox/connectors/hive/tests/FileHandleTest.cpp index 659f0299f9eeb..3832e303d5ee9 100644 --- a/velox/connectors/hive/tests/FileHandleTest.cpp +++ b/velox/connectors/hive/tests/FileHandleTest.cpp @@ -27,8 +27,8 @@ using namespace facebook::velox; TEST(FileHandleTest, localFile) { filesystems::registerLocalFileSystem(); - auto tempFile = ::exec::test::TempFilePath::create(); - const auto& filename = tempFile->path; + auto tempFile = exec::test::TempFilePath::create(); + const auto& filename = tempFile->path(); remove(filename.c_str()); { diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index 7ee1e3a065dda..1afd9bb98e8ee 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -475,7 +475,7 @@ TEST_F(HiveDataSinkTest, hiveBucketProperty) { TEST_F(HiveDataSinkTest, basic) { const auto outputDirectory = TempDirectoryPath::create(); - auto dataSink = createDataSink(rowType_, outputDirectory->path); + auto dataSink = createDataSink(rowType_, outputDirectory->path()); auto stats = dataSink->stats(); ASSERT_TRUE(stats.empty()) << stats.toString(); ASSERT_EQ( @@ -503,14 +503,14 @@ TEST_F(HiveDataSinkTest, basic) { ASSERT_EQ(partitions.size(), 1); createDuckDbTable(vectors); - verifyWrittenData(outputDirectory->path); + verifyWrittenData(outputDirectory->path()); } TEST_F(HiveDataSinkTest, close) { for (bool empty : {true, false}) { SCOPED_TRACE(fmt::format("Data sink is empty: {}", empty)); const auto outputDirectory = TempDirectoryPath::create(); - auto dataSink = createDataSink(rowType_, outputDirectory->path); + auto dataSink = createDataSink(rowType_, outputDirectory->path()); auto vectors = createVectors(500, 1); @@ -532,7 +532,7 @@ TEST_F(HiveDataSinkTest, close) { ASSERT_EQ(partitions.size(), 1); ASSERT_GT(stats.numWrittenBytes, 0); createDuckDbTable(vectors); - verifyWrittenData(outputDirectory->path); + verifyWrittenData(outputDirectory->path()); } else { ASSERT_TRUE(partitions.empty()); ASSERT_EQ(stats.numWrittenBytes, 0); @@ -544,7 +544,7 @@ TEST_F(HiveDataSinkTest, abort) { for (bool empty : {true, false}) { SCOPED_TRACE(fmt::format("Data sink is empty: {}", empty)); const auto outputDirectory = TempDirectoryPath::create(); - auto dataSink = createDataSink(rowType_, outputDirectory->path); + auto dataSink = createDataSink(rowType_, outputDirectory->path()); auto vectors = createVectors(1, 1); int initialBytes = 0; @@ -634,7 +634,7 @@ TEST_F(HiveDataSinkTest, memoryReclaim) { if (testData.writerSpillEnabled) { spillDirectory = exec::test::TempDirectoryPath::create(); spillConfig = - getSpillConfig(spillDirectory->path, testData.writerFlushThreshold); + getSpillConfig(spillDirectory->path(), testData.writerFlushThreshold); auto connectorQueryCtx = std::make_unique( opPool_.get(), connectorPool_.get(), @@ -664,7 +664,7 @@ TEST_F(HiveDataSinkTest, memoryReclaim) { auto dataSink = createDataSink( rowType_, - outputDirectory->path, + outputDirectory->path(), testData.format, partitionBy, bucketProperty); @@ -771,7 +771,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { std::unique_ptr spillConfig; if (testData.writerSpillEnabled) { spillDirectory = exec::test::TempDirectoryPath::create(); - spillConfig = getSpillConfig(spillDirectory->path, 0); + spillConfig = getSpillConfig(spillDirectory->path(), 0); auto connectorQueryCtx = std::make_unique( opPool_.get(), connectorPool_.get(), @@ -801,7 +801,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { auto dataSink = createDataSink( rowType_, - outputDirectory->path, + outputDirectory->path(), testData.format, partitionBy, bucketProperty); @@ -865,7 +865,7 @@ DEBUG_ONLY_TEST_F(HiveDataSinkTest, sortWriterFailureTest) { const std::shared_ptr spillDirectory = exec::test::TempDirectoryPath::create(); std::unique_ptr spillConfig = - getSpillConfig(spillDirectory->path, 0); + getSpillConfig(spillDirectory->path(), 0); // Triggers the memory reservation in sort buffer. spillConfig->minSpillableReservationPct = 1'000; auto connectorQueryCtx = std::make_unique( @@ -883,7 +883,7 @@ DEBUG_ONLY_TEST_F(HiveDataSinkTest, sortWriterFailureTest) { auto dataSink = createDataSink( rowType_, - outputDirectory->path, + outputDirectory->path(), dwio::common::FileFormat::DWRF, partitionBy, bucketProperty); diff --git a/velox/dwio/common/tests/LocalFileSinkTest.cpp b/velox/dwio/common/tests/LocalFileSinkTest.cpp index 90394f59d1768..cf7795f9de64d 100644 --- a/velox/dwio/common/tests/LocalFileSinkTest.cpp +++ b/velox/dwio/common/tests/LocalFileSinkTest.cpp @@ -34,7 +34,7 @@ class LocalFileSinkTest : public testing::Test { void runTest() { auto root = TempDirectoryPath::create(); - auto filePath = fs::path(root->path) / "xxx/yyy/zzz/test_file.ext"; + auto filePath = fs::path(root->path()) / "xxx/yyy/zzz/test_file.ext"; ASSERT_FALSE(fs::exists(filePath.string())); diff --git a/velox/dwio/common/tests/ReadFileInputStreamTests.cpp b/velox/dwio/common/tests/ReadFileInputStreamTests.cpp index 4beab04458474..0d2ce3bfa5bd0 100644 --- a/velox/dwio/common/tests/ReadFileInputStreamTests.cpp +++ b/velox/dwio/common/tests/ReadFileInputStreamTests.cpp @@ -35,8 +35,8 @@ class ReadFileInputStreamTest : public testing::Test { }; TEST_F(ReadFileInputStreamTest, LocalReadFile) { - auto tempFile = ::exec::test::TempFilePath::create(); - const auto& filename = tempFile->path; + auto tempFile = exec::test::TempFilePath::create(); + const auto& filename = tempFile->path(); remove(filename.c_str()); { LocalWriteFile writeFile(filename); diff --git a/velox/dwio/dwrf/test/CacheInputTest.cpp b/velox/dwio/dwrf/test/CacheInputTest.cpp index 85e8663eb3a86..23cee653e05ab 100644 --- a/velox/dwio/dwrf/test/CacheInputTest.cpp +++ b/velox/dwio/dwrf/test/CacheInputTest.cpp @@ -78,7 +78,7 @@ class CacheTest : public testing::Test { FLAGS_ssd_odirect = false; tempDirectory_ = exec::test::TempDirectoryPath::create(); ssd = std::make_unique( - fmt::format("{}/cache", tempDirectory_->path), + fmt::format("{}/cache", tempDirectory_->path()), ssdBytes, 1, executor_.get()); diff --git a/velox/examples/ScanAndSort.cpp b/velox/examples/ScanAndSort.cpp index ec73732c2d587..0f46019a051a6 100644 --- a/velox/examples/ScanAndSort.cpp +++ b/velox/examples/ScanAndSort.cpp @@ -168,7 +168,7 @@ int main(int argc, char** argv) { // HiveConnectorSplit for each file, using the same HiveConnector id defined // above, the local file path (the "file:" prefix specifies which FileSystem // to use; local, in this case), and the file format (DWRF/ORC). - for (auto& filePath : fs::directory_iterator(tempDir->path)) { + for (auto& filePath : fs::directory_iterator(tempDir->path())) { auto connectorSplit = std::make_shared( kHiveConnectorId, "file:" + filePath.path().string(), diff --git a/velox/exec/fuzzer/AggregationFuzzer.cpp b/velox/exec/fuzzer/AggregationFuzzer.cpp index 89ea2ac08fbb0..8add3432715e4 100644 --- a/velox/exec/fuzzer/AggregationFuzzer.cpp +++ b/velox/exec/fuzzer/AggregationFuzzer.cpp @@ -741,7 +741,7 @@ bool AggregationFuzzer::verifyAggregation( const auto inputRowType = asRowType(input[0]->type()); if (isTableScanSupported(inputRowType) && vectorFuzzer_.coinToss(0.5)) { - auto splits = makeSplits(input, directory->path); + auto splits = makeSplits(input, directory->path()); std::vector tableScanPlans; makeAlternativePlansWithTableScan( @@ -856,7 +856,7 @@ bool AggregationFuzzer::verifySortedAggregation( const auto inputRowType = asRowType(input[0]->type()); if (isTableScanSupported(inputRowType)) { directory = exec::test::TempDirectoryPath::create(); - auto splits = makeSplits(input, directory->path); + auto splits = makeSplits(input, directory->path()); plans.push_back( {PlanBuilder() @@ -1160,7 +1160,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( const auto inputRowType = asRowType(input[0]->type()); if (isTableScanSupported(inputRowType) && vectorFuzzer_.coinToss(0.5)) { directory = exec::test::TempDirectoryPath::create(); - auto splits = makeSplits(input, directory->path); + auto splits = makeSplits(input, directory->path()); plans.push_back( {PlanBuilder() diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp index d826f6fbc2320..397318d576104 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.cpp +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -405,7 +405,7 @@ velox::test::ResultOrError AggregationFuzzerBase::execute( int32_t spillPct{0}; if (injectSpill) { spillDirectory = exec::test::TempDirectoryPath::create(); - builder.spillDirectory(spillDirectory->path) + builder.spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true") .config(core::QueryConfig::kMaxSpillRunRows, randInt(32, 1L << 30)); diff --git a/velox/exec/fuzzer/WindowFuzzer.cpp b/velox/exec/fuzzer/WindowFuzzer.cpp index 3d73cbaa676eb..9d00ed54efb88 100644 --- a/velox/exec/fuzzer/WindowFuzzer.cpp +++ b/velox/exec/fuzzer/WindowFuzzer.cpp @@ -333,7 +333,7 @@ void WindowFuzzer::testAlternativePlans( auto directory = exec::test::TempDirectoryPath::create(); const auto inputRowType = asRowType(input[0]->type()); if (isTableScanSupported(inputRowType)) { - auto splits = makeSplits(input, directory->path); + auto splits = makeSplits(input, directory->path()); plans.push_back( {PlanBuilder() diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index ac721e4420e25..bd1dbeaa3ec3a 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -1130,7 +1130,7 @@ TEST_F(AggregationTest, spillAll) { auto queryCtx = std::make_shared(executor_.get()); TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(plan) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .assertResults(results); @@ -1581,7 +1581,7 @@ TEST_F(AggregationTest, outputBatchSizeCheckWithSpill) { TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .config( @@ -1624,7 +1624,7 @@ TEST_F(AggregationTest, spillDuringOutputProcessing) { TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) // Set very large output buffer size, the number of output rows is @@ -1765,7 +1765,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, minSpillableMemoryReservation) { auto spillDirectory = exec::test::TempDirectoryPath::create(); auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .config( @@ -1791,7 +1791,7 @@ TEST_F(AggregationTest, distinctWithSpilling) { core::PlanNodeId aggrNodeId; TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .plan(PlanBuilder() @@ -1815,7 +1815,7 @@ TEST_F(AggregationTest, spillingForAggrsWithDistinct) { TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .plan(PlanBuilder() @@ -1843,7 +1843,7 @@ TEST_F(AggregationTest, spillingForAggrsWithSorting) { SCOPED_TRACE(sql); TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .plan(plan) @@ -1889,7 +1889,7 @@ TEST_F(AggregationTest, preGroupedAggregationWithSpilling) { TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .plan(PlanBuilder() @@ -2044,7 +2044,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) .planNode()) .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .maxDrivers(1) @@ -2184,7 +2184,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringReserve) { .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) .planNode()) .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .maxDrivers(1) @@ -2300,7 +2300,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringAllocation) { .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) .planNode()) .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .maxDrivers(1) @@ -2414,7 +2414,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringOutputProcessing) { .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) .planNode()) .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .maxDrivers(1) @@ -2588,7 +2588,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringNonReclaimableSection) { .capturePlanNodeId(aggregationPlanNodeId) .planNode()) .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .maxDrivers(1) @@ -2704,7 +2704,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimWithEmptyAggregationTable) { AssertQueryBuilder(nullptr) .plan(aggregationPlan) .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) .maxDrivers(1) @@ -3032,7 +3032,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyInput) { .capturePlanNodeId(aggNodeId) .planNode(), duckDbQueryRunner_) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .queryCtx(queryCtx) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) @@ -3097,7 +3097,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyOutput) { .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) .capturePlanNodeId(aggNodeId) .planNode()) - .spillDirectory(tempDirectory->path) + .spillDirectory(tempDirectory->path()) .queryCtx(queryCtx) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) @@ -3141,7 +3141,7 @@ TEST_F(AggregationTest, maxSpillBytes) { try { TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kAggregationSpillEnabled, true) @@ -3186,7 +3186,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromAggregation) { core::PlanNodeId aggrNodeId; auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kAggregationSpillEnabled, true) .config( @@ -3233,7 +3233,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromDistinctAggregation) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); core::PlanNodeId aggrNodeId; auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kAggregationSpillEnabled, true) .config( @@ -3305,7 +3305,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromAggregationOnNoMoreInput) { std::thread aggregationThread([&]() { auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kAggregationSpillEnabled, true) .queryCtx(aggregationQueryCtx) @@ -3389,7 +3389,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromAggregationDuringOutput) { std::thread aggregationThread([&]() { auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kAggregationSpillEnabled, true) .config( diff --git a/velox/exec/tests/GroupedExecutionTest.cpp b/velox/exec/tests/GroupedExecutionTest.cpp index ed4c8c2647429..2fea9c47b174e 100644 --- a/velox/exec/tests/GroupedExecutionTest.cpp +++ b/velox/exec/tests/GroupedExecutionTest.cpp @@ -180,7 +180,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) { // Create source file - we will read from it in 6 splits. auto vectors = makeVectors(10, 1'000); auto filePath = TempFilePath::create(); - writeToFile(filePath->path, vectors); + writeToFile(filePath->path(), vectors); // A chain of three pipelines separated by local exchange with the leaf one // having scan running grouped execution - this will make all three pipelines @@ -211,7 +211,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) { EXPECT_EQ(0, task->numRunningDrivers()); // Add one split for group (8). - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 8)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 8)); // Only one split group should be in the processing mode, so 9 drivers (3 per // pipeline). @@ -219,11 +219,11 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) { EXPECT_EQ(std::unordered_set{}, getCompletedSplitGroups(task)); // Add the rest of splits - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 1)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 5)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 8)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 5)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 8)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 1)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 5)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 8)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 5)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 8)); // One split group should be in the processing mode, so 9 drivers. EXPECT_EQ(9, task->numRunningDrivers()); @@ -289,7 +289,7 @@ DEBUG_ONLY_TEST_F( // Create source file to read as split input. auto vectors = makeVectors(24, 20); auto filePath = TempFilePath::create(); - writeToFile(filePath->path, vectors); + writeToFile(filePath->path(), vectors); const int numDriversPerGroup{3}; @@ -400,7 +400,7 @@ DEBUG_ONLY_TEST_F( "0", std::move(planFragment), 0, std::move(queryCtx)); const auto spillDirectory = exec::test::TempDirectoryPath::create(); if (testData.enableSpill) { - task->setSpillDirectory(spillDirectory->path); + task->setSpillDirectory(spillDirectory->path()); } // 'numDriversPerGroup' drivers max to execute one group at a time. @@ -409,17 +409,19 @@ DEBUG_ONLY_TEST_F( // Add split(s) to the build scan. if (testData.mixedExecutionMode) { - task->addSplit(buildScanNodeId, makeHiveSplit(filePath->path)); + task->addSplit(buildScanNodeId, makeHiveSplit(filePath->path())); } else { task->addSplit( - buildScanNodeId, makeHiveSplitWithGroup(filePath->path, 0)); + buildScanNodeId, makeHiveSplitWithGroup(filePath->path(), 0)); task->addSplit( - buildScanNodeId, makeHiveSplitWithGroup(filePath->path, 1)); + buildScanNodeId, makeHiveSplitWithGroup(filePath->path(), 1)); } // Add one split for probe split group (0). - task->addSplit(probeScanNodeId, makeHiveSplitWithGroup(filePath->path, 0)); + task->addSplit( + probeScanNodeId, makeHiveSplitWithGroup(filePath->path(), 0)); // Add one split for probe split group (1). - task->addSplit(probeScanNodeId, makeHiveSplitWithGroup(filePath->path, 1)); + task->addSplit( + probeScanNodeId, makeHiveSplitWithGroup(filePath->path(), 1)); // Finalize the build split(s). if (testData.mixedExecutionMode) { @@ -458,7 +460,7 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) { // Create source file - we will read from it in 6 splits. auto vectors = makeVectors(4, 20); auto filePath = TempFilePath::create(); - writeToFile(filePath->path, vectors); + writeToFile(filePath->path(), vectors); // Run the test twice - for Hash and Cross Join. for (size_t i = 0; i < 2; ++i) { @@ -520,10 +522,11 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) { EXPECT_EQ(3, task->numRunningDrivers()); // Add single split to the build scan. - task->addSplit(buildScanNodeId, makeHiveSplit(filePath->path)); + task->addSplit(buildScanNodeId, makeHiveSplit(filePath->path())); // Add one split for group (8). - task->addSplit(probeScanNodeId, makeHiveSplitWithGroup(filePath->path, 8)); + task->addSplit( + probeScanNodeId, makeHiveSplitWithGroup(filePath->path(), 8)); // Only one split group should be in the processing mode, so 9 drivers (3 // per pipeline) grouped + 3 ungrouped. @@ -531,11 +534,16 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) { EXPECT_EQ(std::unordered_set{}, getCompletedSplitGroups(task)); // Add the rest of splits - task->addSplit(probeScanNodeId, makeHiveSplitWithGroup(filePath->path, 1)); - task->addSplit(probeScanNodeId, makeHiveSplitWithGroup(filePath->path, 5)); - task->addSplit(probeScanNodeId, makeHiveSplitWithGroup(filePath->path, 8)); - task->addSplit(probeScanNodeId, makeHiveSplitWithGroup(filePath->path, 5)); - task->addSplit(probeScanNodeId, makeHiveSplitWithGroup(filePath->path, 8)); + task->addSplit( + probeScanNodeId, makeHiveSplitWithGroup(filePath->path(), 1)); + task->addSplit( + probeScanNodeId, makeHiveSplitWithGroup(filePath->path(), 5)); + task->addSplit( + probeScanNodeId, makeHiveSplitWithGroup(filePath->path(), 8)); + task->addSplit( + probeScanNodeId, makeHiveSplitWithGroup(filePath->path(), 5)); + task->addSplit( + probeScanNodeId, makeHiveSplitWithGroup(filePath->path(), 8)); // One split group should be in the processing mode, so 9 drivers (3 per // pipeline) grouped + 3 ungrouped. @@ -621,7 +629,7 @@ TEST_F(GroupedExecutionTest, groupedExecution) { const size_t numSplits{6}; auto vectors = makeVectors(10, 1'000); auto filePath = TempFilePath::create(); - writeToFile(filePath->path, vectors); + writeToFile(filePath->path(), vectors); CursorParameters params; params.planNode = tableScanNode(ROW({}, {})); @@ -641,7 +649,7 @@ TEST_F(GroupedExecutionTest, groupedExecution) { auto task = cursor->task(); // Add one splits before start to ensure we can handle such cases. - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 8)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 8)); // Start task now. cursor->start(); @@ -651,11 +659,11 @@ TEST_F(GroupedExecutionTest, groupedExecution) { EXPECT_EQ(std::unordered_set{}, getCompletedSplitGroups(task)); // Add the rest of splits - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 1)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 5)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 8)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 5)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 8)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 1)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 5)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 8)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 5)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 8)); // Only two split groups should be in the processing mode, so 4 drivers. EXPECT_EQ(4, task->numRunningDrivers()); @@ -714,7 +722,7 @@ TEST_F(GroupedExecutionTest, allGroupSplitsReceivedBeforeTaskStart) { const size_t numSplits{6}; auto vectors = makeVectors(10, 1'000); auto filePath = TempFilePath::create(); - writeToFile(filePath->path, vectors); + writeToFile(filePath->path(), vectors); CursorParameters params; params.planNode = tableScanNode(ROW({}, {})); @@ -729,12 +737,12 @@ TEST_F(GroupedExecutionTest, allGroupSplitsReceivedBeforeTaskStart) { auto task = cursor->task(); // Add all split groups before start to ensure we can handle such cases. - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 0)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 1)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 2)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 0)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 1)); - task->addSplit("0", makeHiveSplitWithGroup(filePath->path, 2)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 0)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 1)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 2)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 0)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 1)); + task->addSplit("0", makeHiveSplitWithGroup(filePath->path(), 2)); task->noMoreSplits("0"); // Start task now. diff --git a/velox/exec/tests/HashJoinBridgeTest.cpp b/velox/exec/tests/HashJoinBridgeTest.cpp index 1f825418a4220..e7e39b547504c 100644 --- a/velox/exec/tests/HashJoinBridgeTest.cpp +++ b/velox/exec/tests/HashJoinBridgeTest.cpp @@ -112,13 +112,13 @@ class HashJoinBridgeTest : public testing::Test, static uint32_t fakeFileId{0}; SpillFiles files; files.reserve(numFiles); - const std::string filePathPrefix = tempDir_->path + "/Spill"; + const std::string filePathPrefix = tempDir_->path() + "/Spill"; for (int32_t i = 0; i < numFiles; ++i) { const auto fileId = fakeFileId; files.push_back( {fileId, rowType_, - tempDir_->path + "/Spill_" + std::to_string(fileId), + tempDir_->path() + "/Spill_" + std::to_string(fileId), 1024, 1, std::vector({}), diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index b859f86351b64..d4807be9a4c94 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -617,7 +617,7 @@ class HashJoinBuilder { int32_t spillPct{0}; if (injectSpill) { spillDirectory = exec::test::TempDirectoryPath::create(); - builder.spillDirectory(spillDirectory->path); + builder.spillDirectory(spillDirectory->path()); config(core::QueryConfig::kSpillEnabled, "true"); config(core::QueryConfig::kMaxSpillLevel, std::to_string(maxSpillLevel)); config(core::QueryConfig::kJoinSpillEnabled, "true"); @@ -5046,7 +5046,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, buildReservationReleaseCheck) { // only gets executed when spilling is enabled. We don't care about if // spilling is really triggered in test or not. auto spillDirectory = exec::test::TempDirectoryPath::create(); - params.spillDirectory = spillDirectory->path; + params.spillDirectory = spillDirectory->path(); params.queryCtx->testingOverrideConfigUnsafe( {{core::QueryConfig::kSpillEnabled, "true"}, {core::QueryConfig::kMaxSpillLevel, "0"}}); @@ -6383,7 +6383,7 @@ TEST_F(HashJoinTest, maxSpillBytes) { try { TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) @@ -6439,7 +6439,7 @@ TEST_F(HashJoinTest, onlyHashBuildMaxSpillBytes) { try { TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) @@ -6784,7 +6784,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredByEnsureJoinTableFit) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) .config(core::QueryConfig::kSpillNumPartitionBits, 2) @@ -6855,7 +6855,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringJoinTableBuild) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) .config(core::QueryConfig::kSpillNumPartitionBits, 2) @@ -6962,7 +6962,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, joinBuildSpillError) { VELOX_ASSERT_THROW( AssertQueryBuilder(plan) .queryCtx(joinQueryCtx) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .copyResults(pool()), injectedErrorMsg); @@ -7137,7 +7137,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpill) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .numDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .probeKeys({"t_k1"}) .probeVectors(std::move(probeVectors)) .buildKeys({"u_k1"}) @@ -7193,7 +7193,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpillInMiddeOfLastOutputProcessing) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .numDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .probeKeys({"t_k1"}) .probeVectors(std::move(probeVectors)) .buildKeys({"u_k1"}) @@ -7266,7 +7266,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpillInMiddeOfOutputProcessing) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .numDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .probeKeys({"t_k1"}) .probeVectors(std::move(probeVectors)) .buildKeys({"u_k1"}) @@ -7324,7 +7324,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpillWhenOneOfProbeFinish) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .numDrivers(numDrivers, true, true) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .keyTypes({BIGINT()}) .probeVectors(32, 5) .buildVectors(32, 5) @@ -7369,7 +7369,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpillExceedLimit) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .numDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .probeKeys({"t_k1"}) .probeVectors(std::move(probeVectors)) .buildKeys({"u_k1"}) @@ -7436,7 +7436,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeSpillUnderNonReclaimableSection) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .numDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .keyTypes({BIGINT()}) .probeVectors(32, 5) .buildVectors(32, 5) @@ -7491,7 +7491,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, spillOutputWithRightSemiJoins) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .numDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .probeType(probeType_) .probeVectors(128, 3) .probeKeys({"t_k1"}) @@ -7613,7 +7613,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, spillCheckOnLeftSemiFilterWithDynamicFilters) { HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .planNode(std::move(op)) .makeInputSplits(makeInputSplits(probeScanId)) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .injectSpill(false) .referenceQuery( "SELECT t.c0, t.c1 + 1 FROM t WHERE t.c0 IN (SELECT c0 FROM u)") diff --git a/velox/exec/tests/JoinFuzzer.cpp b/velox/exec/tests/JoinFuzzer.cpp index 45b4faf775109..170732172b11d 100644 --- a/velox/exec/tests/JoinFuzzer.cpp +++ b/velox/exec/tests/JoinFuzzer.cpp @@ -432,7 +432,7 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { spillDirectory = exec::test::TempDirectoryPath::create(); builder.config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true") - .spillDirectory(spillDirectory->path); + .spillDirectory(spillDirectory->path()); spillPct = 10; } @@ -982,7 +982,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { const auto tableScanDir = exec::test::TempDirectoryPath::create(); addPlansWithTableScan( - tableScanDir->path, + tableScanDir->path(), joinType, nullAware, probeKeys, diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index a269ab2cb09bd..926ebba3a7216 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -205,7 +205,7 @@ class OrderByTest : public OperatorTestBase { CursorParameters params; params.planNode = planNode; params.queryCtx = queryCtx; - params.spillDirectory = spillDirectory->path; + params.spillDirectory = spillDirectory->path(); auto task = assertQueryOrdered(params, duckDbSql, sortingKeys); auto inputRows = toPlanStats(task->taskStats()).at(orderById).inputRows; const uint64_t peakSpillMemoryUsage = @@ -509,7 +509,7 @@ TEST_F(OrderByTest, spill) { auto spillDirectory = exec::test::TempDirectoryPath::create(); TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) .assertResults(expectedResult); @@ -566,7 +566,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringInputProcessing) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); - auto tempDirectory = exec::test::TempDirectoryPath::create(); + auto spillDirectory = exec::test::TempDirectoryPath::create(); auto queryCtx = std::make_shared(executor_.get()); queryCtx->testingOverrideMemoryPool(memory::memoryManager()->addRootPool( queryCtx->queryId(), kMaxBytes, memory::MemoryReclaimer::create())); @@ -626,7 +626,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringInputProcessing) { .orderBy({fmt::format("{} ASC NULLS LAST", "c0")}, false) .planNode()) .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) .maxDrivers(1) @@ -705,7 +705,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringReserve) { batches.push_back(fuzzer.fuzzRow(rowType)); } - auto tempDirectory = exec::test::TempDirectoryPath::create(); + auto spillDirectory = exec::test::TempDirectoryPath::create(); auto queryCtx = std::make_shared(executor_.get()); queryCtx->testingOverrideMemoryPool(memory::memoryManager()->addRootPool( queryCtx->queryId(), kMaxBytes, memory::MemoryReclaimer::create())); @@ -765,7 +765,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringReserve) { .orderBy({fmt::format("{} ASC NULLS LAST", "c0")}, false) .planNode()) .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) .maxDrivers(1) @@ -818,7 +818,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringAllocation) { const std::vector enableSpillings = {false, true}; for (const auto enableSpilling : enableSpillings) { SCOPED_TRACE(fmt::format("enableSpilling {}", enableSpilling)); - auto tempDirectory = exec::test::TempDirectoryPath::create(); + auto spillDirectory = exec::test::TempDirectoryPath::create(); auto queryCtx = std::make_shared(executor_.get()); queryCtx->testingOverrideMemoryPool( memory::memoryManager()->addRootPool(queryCtx->queryId(), kMaxBytes)); @@ -883,7 +883,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringAllocation) { .orderBy({fmt::format("{} ASC NULLS LAST", "c0")}, false) .planNode()) .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) .maxDrivers(1) @@ -948,7 +948,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { const std::vector enableSpillings = {false, true}; for (const auto enableSpilling : enableSpillings) { SCOPED_TRACE(fmt::format("enableSpilling {}", enableSpilling)); - auto tempDirectory = exec::test::TempDirectoryPath::create(); + auto spillDirectory = exec::test::TempDirectoryPath::create(); auto queryCtx = std::make_shared(executor_.get()); queryCtx->testingOverrideMemoryPool(memory::memoryManager()->addRootPool( queryCtx->queryId(), kMaxBytes, memory::MemoryReclaimer::create())); @@ -1000,7 +1000,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { .orderBy({fmt::format("{} ASC NULLS LAST", "c0")}, false) .planNode()) .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) .maxDrivers(1) @@ -1226,7 +1226,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, spillWithNoMoreOutput) { auto spillDirectory = exec::test::TempDirectoryPath::create(); auto task = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) // Set output buffer size to extreme large to read all the @@ -1272,7 +1272,7 @@ TEST_F(OrderByTest, maxSpillBytes) { try { TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) @@ -1341,7 +1341,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimFromOrderBy) { std::thread orderByThread([&]() { auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) .queryCtx(orderByQueryCtx) @@ -1409,7 +1409,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimFromEmptyOrderBy) { std::thread orderByThread([&]() { auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) .queryCtx(orderByQueryCtx) diff --git a/velox/exec/tests/RowNumberTest.cpp b/velox/exec/tests/RowNumberTest.cpp index 6f76dbf2bd8fd..df4f0b025f944 100644 --- a/velox/exec/tests/RowNumberTest.cpp +++ b/velox/exec/tests/RowNumberTest.cpp @@ -211,7 +211,7 @@ TEST_F(RowNumberTest, spill) { core::PlanNodeId rowNumberPlanNodeId; auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kRowNumberSpillEnabled, true) .config( @@ -273,7 +273,7 @@ TEST_F(RowNumberTest, maxSpillBytes) { try { TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kRowNumberSpillEnabled, true) @@ -315,11 +315,11 @@ TEST_F(RowNumberTest, memoryUsage) { std::shared_ptr task; TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, spillEnableConfig) .config(core::QueryConfig::kRowNumberSpillEnabled, spillEnableConfig) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .copyResults(pool_.get(), task); if (spillEnable) { diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index 771b16cad06df..b5158136c096e 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -286,7 +286,7 @@ TEST_F(SortBufferTest, batchOutput) { SCOPED_TRACE(testData.debugString()); auto spillDirectory = exec::test::TempDirectoryPath::create(); auto spillConfig = common::SpillConfig( - [&]() -> const std::string& { return spillDirectory->path; }, + [&]() -> const std::string& { return spillDirectory->path(); }, [&](uint64_t) {}, "0.0.0", 1000, @@ -380,7 +380,7 @@ TEST_F(SortBufferTest, spill) { auto spillableReservationGrowthPct = testData.memoryReservationFailure ? 100000 : 100; auto spillConfig = common::SpillConfig( - [&]() -> const std::string& { return spillDirectory->path; }, + [&]() -> const std::string& { return spillDirectory->path(); }, [&](uint64_t) {}, "0.0.0", 1000, @@ -454,7 +454,7 @@ TEST_F(SortBufferTest, emptySpill) { for (bool hasPostSpillData : {false, true}) { SCOPED_TRACE(fmt::format("hasPostSpillData {}", hasPostSpillData)); auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto spillConfig = getSpillConfig(spillDirectory->path); + auto spillConfig = getSpillConfig(spillDirectory->path()); folly::Synchronized spillStats; auto sortBuffer = std::make_unique( inputType_, diff --git a/velox/exec/tests/SpillTest.cpp b/velox/exec/tests/SpillTest.cpp index d7310ad99058a..8b2de7e77f69e 100644 --- a/velox/exec/tests/SpillTest.cpp +++ b/velox/exec/tests/SpillTest.cpp @@ -157,7 +157,7 @@ class SpillTest : public ::testing::TestWithParam, // partitions produce an ascending sequence of integers without gaps. spillStats_.wlock()->reset(); state_ = std::make_unique( - [&]() -> const std::string& { return tempDir_->path; }, + [&]() -> const std::string { return tempDir_->path(); }, updateSpilledBytesCb_, fileNamePrefix_, numPartitions, @@ -337,7 +337,7 @@ class SpillTest : public ::testing::TestWithParam, ASSERT_EQ(expectedNumSpilledFiles, spilledFileSet.size()); // Verify the spilled file exist on file system. std::shared_ptr fs = - filesystems::getFileSystem(tempDir_->path, nullptr); + filesystems::getFileSystem(tempDir_->path(), nullptr); uint64_t totalFileBytes{0}; for (const auto& spilledFile : spilledFileSet) { auto readFile = fs->openFileForRead(spilledFile); @@ -467,7 +467,7 @@ TEST_P(SpillTest, spillTimestamp) { // read back. auto tempDirectory = exec::test::TempDirectoryPath::create(); std::vector emptyCompareFlags; - const std::string spillPath = tempDirectory->path + "/test"; + const std::string spillPath = tempDirectory->path() + "/test"; std::vector timeValues = { Timestamp{0, 0}, Timestamp{12, 0}, @@ -478,7 +478,7 @@ TEST_P(SpillTest, spillTimestamp) { Timestamp{Timestamp::kMinSeconds, 0}}; SpillState state( - [&]() -> const std::string& { return tempDirectory->path; }, + [&]() -> const std::string { return tempDirectory->path(); }, updateSpilledBytesCb_, "test", 1, @@ -774,13 +774,13 @@ SpillFiles makeFakeSpillFiles(int32_t numFiles) { static uint32_t fakeFileId{0}; SpillFiles files; files.reserve(numFiles); - const std::string filePathPrefix = tempDir->path + "/Spill"; + const std::string filePathPrefix = tempDir->path() + "/Spill"; for (int32_t i = 0; i < numFiles; ++i) { const auto fileId = fakeFileId; files.push_back( {fileId, ROW({"k1", "k2"}, {BIGINT(), BIGINT()}), - tempDir->path + "/Spill_" + std::to_string(fileId), + tempDir->path() + "/Spill_" + std::to_string(fileId), 1024, 1, std::vector({}), diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index 5a98684c4c684..f078fe3888901 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -160,7 +160,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { RowContainerTestBase::SetUp(); rng_.seed(1); tempDirPath_ = exec::test::TempDirectoryPath::create(); - fs_ = filesystems::getFileSystem(tempDirPath_->path, nullptr); + fs_ = filesystems::getFileSystem(tempDirPath_->path(), nullptr); containerType_ = ROW({ {"bool_val", BOOLEAN()}, {"tiny_val", TINYINT()}, @@ -520,10 +520,11 @@ class SpillerTest : public exec::test::RowContainerTestBase { bool makeError, uint64_t maxSpillRunRows = 0) { static const std::string kBadSpillDirPath = "/bad/path"; - common::GetSpillDirectoryPathCB badSpillDirCb = - [&]() -> const std::string& { return kBadSpillDirPath; }; + common::GetSpillDirectoryPathCB badSpillDirCb = [&]() -> const std::string { + return kBadSpillDirPath; + }; common::GetSpillDirectoryPathCB tempSpillDirCb = - [&]() -> const std::string& { return tempDirPath_->path; }; + [&]() -> const std::string { return tempDirPath_->path(); }; stats_.clear(); spillStats_ = folly::Synchronized(); diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 23fc416e08ab9..f172ab2e9a067 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -260,7 +260,7 @@ class TableWriteTest : public HiveConnectorTestBase { bool spillEnabled = false) { std::vector splits; for (const auto& filePath : filePaths) { - splits.push_back(exec::Split(makeHiveConnectorSplit(filePath->path))); + splits.push_back(exec::Split(makeHiveConnectorSplit(filePath->path()))); } if (!spillEnabled) { return AssertQueryBuilder(plan, duckDbQueryRunner_) @@ -279,7 +279,7 @@ class TableWriteTest : public HiveConnectorTestBase { const auto spillDirectory = exec::test::TempDirectoryPath::create(); TestScopedSpillInjection scopedSpillInjection(100); return AssertQueryBuilder(plan, duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .maxDrivers( 2 * std::max(kNumTableWriterCount, kNumPartitionedTableWriterCount)) .config( @@ -317,7 +317,7 @@ class TableWriteTest : public HiveConnectorTestBase { const auto spillDirectory = exec::test::TempDirectoryPath::create(); TestScopedSpillInjection scopedSpillInjection(100); return AssertQueryBuilder(plan, duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .maxDrivers( 2 * std::max(kNumTableWriterCount, kNumPartitionedTableWriterCount)) .config( @@ -350,7 +350,7 @@ class TableWriteTest : public HiveConnectorTestBase { const auto spillDirectory = exec::test::TempDirectoryPath::create(); TestScopedSpillInjection scopedSpillInjection(100); return AssertQueryBuilder(plan, duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .maxDrivers( 2 * std::max(kNumTableWriterCount, kNumPartitionedTableWriterCount)) .config( @@ -408,7 +408,7 @@ class TableWriteTest : public HiveConnectorTestBase { std::vector> makeHiveConnectorSplits( const std::shared_ptr& directoryPath) { - return makeHiveConnectorSplits(directoryPath->path); + return makeHiveConnectorSplits(directoryPath->path()); } std::vector> @@ -1057,18 +1057,18 @@ TEST_F(BasicTableWriteTest, roundTrip) { }); auto sourceFilePath = TempFilePath::create(); - writeToFile(sourceFilePath->path, data); + writeToFile(sourceFilePath->path(), data); auto targetDirectoryPath = TempDirectoryPath::create(); auto rowType = asRowType(data->type()); auto plan = PlanBuilder() .tableScan(rowType) - .tableWrite(targetDirectoryPath->path) + .tableWrite(targetDirectoryPath->path()) .planNode(); auto results = AssertQueryBuilder(plan) - .split(makeHiveConnectorSplit(sourceFilePath->path)) + .split(makeHiveConnectorSplit(sourceFilePath->path())) .copyResults(pool()); ASSERT_EQ(2, results->size()); @@ -1098,7 +1098,7 @@ TEST_F(BasicTableWriteTest, roundTrip) { auto copy = AssertQueryBuilder(plan) .split(makeHiveConnectorSplit(fmt::format( - "{}/{}", targetDirectoryPath->path, writeFileName))) + "{}/{}", targetDirectoryPath->path(), writeFileName))) .copyResults(pool()); assertEqualResults({data}, {copy}); } @@ -1483,7 +1483,7 @@ TEST_P(AllTableWriterTest, scanFilterProjectWrite) { auto filePaths = makeFilePaths(5); auto vectors = makeVectors(filePaths.size(), 500); for (int i = 0; i < filePaths.size(); i++) { - writeToFile(filePaths[i]->path, vectors[i]); + writeToFile(filePaths[i]->path(), vectors[i]); } createDuckDbTable(vectors); @@ -1503,7 +1503,7 @@ TEST_P(AllTableWriterTest, scanFilterProjectWrite) { auto plan = createInsertPlan( project, outputType, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -1523,13 +1523,13 @@ TEST_P(AllTableWriterTest, scanFilterProjectWrite) { PlanBuilder().tableScan(newOutputType).planNode(), makeHiveConnectorSplits(outputDirectory), "SELECT c3, c5, c2 + c3, substr(c5, 1, 1) FROM tmp WHERE c2 <> 0"); - verifyTableWriterOutput(outputDirectory->path, newOutputType, false); + verifyTableWriterOutput(outputDirectory->path(), newOutputType, false); } else { assertQuery( PlanBuilder().tableScan(outputType).planNode(), makeHiveConnectorSplits(outputDirectory), "SELECT c0, c1, c3, c5, c2 + c3, substr(c5, 1, 1) FROM tmp WHERE c2 <> 0"); - verifyTableWriterOutput(outputDirectory->path, outputType, false); + verifyTableWriterOutput(outputDirectory->path(), outputType, false); } } @@ -1537,7 +1537,7 @@ TEST_P(AllTableWriterTest, renameAndReorderColumns) { auto filePaths = makeFilePaths(5); auto vectors = makeVectors(filePaths.size(), 500); for (int i = 0; i < filePaths.size(); ++i) { - writeToFile(filePaths[i]->path, vectors[i]); + writeToFile(filePaths[i]->path(), vectors[i]); } createDuckDbTable(vectors); @@ -1569,7 +1569,7 @@ TEST_P(AllTableWriterTest, renameAndReorderColumns) { PlanBuilder().tableScan(rowType_), inputRowType, tableSchema_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -1586,14 +1586,14 @@ TEST_P(AllTableWriterTest, renameAndReorderColumns) { makeHiveConnectorSplits(outputDirectory), "SELECT c2, c5, c4, c3 FROM tmp"); - verifyTableWriterOutput(outputDirectory->path, newOutputType, false); + verifyTableWriterOutput(outputDirectory->path(), newOutputType, false); } else { HiveConnectorTestBase::assertQuery( PlanBuilder().tableScan(tableSchema_).planNode(), makeHiveConnectorSplits(outputDirectory), "SELECT c2, c5, c4, c1, c0, c3 FROM tmp"); - verifyTableWriterOutput(outputDirectory->path, tableSchema_, false); + verifyTableWriterOutput(outputDirectory->path(), tableSchema_, false); } } @@ -1602,7 +1602,7 @@ TEST_P(AllTableWriterTest, directReadWrite) { auto filePaths = makeFilePaths(5); auto vectors = makeVectors(filePaths.size(), 200); for (int i = 0; i < filePaths.size(); i++) { - writeToFile(filePaths[i]->path, vectors[i]); + writeToFile(filePaths[i]->path(), vectors[i]); } createDuckDbTable(vectors); @@ -1611,7 +1611,7 @@ TEST_P(AllTableWriterTest, directReadWrite) { auto plan = createInsertPlan( PlanBuilder().tableScan(rowType_), rowType_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -1632,14 +1632,14 @@ TEST_P(AllTableWriterTest, directReadWrite) { makeHiveConnectorSplits(outputDirectory), "SELECT c2, c3, c4, c5 FROM tmp"); rowType_ = newOutputType; - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); } else { assertQuery( PlanBuilder().tableScan(rowType_).planNode(), makeHiveConnectorSplits(outputDirectory), "SELECT * FROM tmp"); - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); } } @@ -1656,7 +1656,7 @@ TEST_P(AllTableWriterTest, constantVectors) { auto op = createInsertPlan( PlanBuilder().values({vector}), rowType_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -1673,14 +1673,14 @@ TEST_P(AllTableWriterTest, constantVectors) { makeHiveConnectorSplits(outputDirectory), "SELECT c2, c3, c4, c5 FROM tmp"); rowType_ = newOutputType; - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); } else { assertQuery( PlanBuilder().tableScan(rowType_).planNode(), makeHiveConnectorSplits(outputDirectory), "SELECT * FROM tmp"); - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); } } @@ -1690,7 +1690,7 @@ TEST_P(AllTableWriterTest, emptyInput) { auto op = createInsertPlan( PlanBuilder().values({vector}), rowType_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -1715,7 +1715,7 @@ TEST_P(AllTableWriterTest, commitStrategies) { auto plan = createInsertPlan( PlanBuilder().values(vectors), rowType_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -1734,14 +1734,14 @@ TEST_P(AllTableWriterTest, commitStrategies) { "SELECT c2, c3, c4, c5 FROM tmp"); auto originalRowType = rowType_; rowType_ = newOutputType; - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); rowType_ = originalRowType; } else { assertQuery( PlanBuilder().tableScan(rowType_).planNode(), makeHiveConnectorSplits(outputDirectory), "SELECT * FROM tmp"); - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); } } // Test kNoCommit commit strategy writing to non-temporary files. @@ -1752,7 +1752,7 @@ TEST_P(AllTableWriterTest, commitStrategies) { auto plan = createInsertPlan( PlanBuilder().values(vectors), rowType_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -1770,13 +1770,13 @@ TEST_P(AllTableWriterTest, commitStrategies) { makeHiveConnectorSplits(outputDirectory), "SELECT c2, c3, c4, c5 FROM tmp"); rowType_ = newOutputType; - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); } else { assertQuery( PlanBuilder().tableScan(rowType_).planNode(), makeHiveConnectorSplits(outputDirectory), "SELECT * FROM tmp"); - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); } } } @@ -1839,14 +1839,14 @@ TEST_P(PartitionedTableWriterTest, specialPartitionName) { auto inputFilePaths = makeFilePaths(numBatches); for (int i = 0; i < numBatches; i++) { - writeToFile(inputFilePaths[i]->path, vectors[i]); + writeToFile(inputFilePaths[i]->path(), vectors[i]); } auto outputDirectory = TempDirectoryPath::create(); auto plan = createInsertPlan( PlanBuilder().tableScan(rowType), rowType, - outputDirectory->path, + outputDirectory->path(), partitionKeys, bucketProperty_, compressionKind_, @@ -1857,7 +1857,7 @@ TEST_P(PartitionedTableWriterTest, specialPartitionName) { auto task = assertQuery(plan, inputFilePaths, "SELECT count(*) FROM tmp"); std::set actualPartitionDirectories = - getLeafSubdirectories(outputDirectory->path); + getLeafSubdirectories(outputDirectory->path()); std::set expectedPartitionDirectories; const std::vector expectedCharsAfterEscape = { @@ -1881,7 +1881,7 @@ TEST_P(PartitionedTableWriterTest, specialPartitionName) { auto partitionName = fmt::format( "p0={}/p1=str_{}{}", i, i, expectedCharsAfterEscape.at(i % 15)); expectedPartitionDirectories.emplace( - fs::path(outputDirectory->path) / partitionName); + fs::path(outputDirectory->path()) / partitionName); } EXPECT_EQ(actualPartitionDirectories, expectedPartitionDirectories); } @@ -1925,14 +1925,14 @@ TEST_P(PartitionedTableWriterTest, multiplePartitions) { auto inputFilePaths = makeFilePaths(numBatches); for (int i = 0; i < numBatches; i++) { - writeToFile(inputFilePaths[i]->path, vectors[i]); + writeToFile(inputFilePaths[i]->path(), vectors[i]); } auto outputDirectory = TempDirectoryPath::create(); auto plan = createInsertPlan( PlanBuilder().tableScan(rowType), rowType, - outputDirectory->path, + outputDirectory->path(), partitionKeys, bucketProperty_, compressionKind_, @@ -1944,7 +1944,7 @@ TEST_P(PartitionedTableWriterTest, multiplePartitions) { // Verify that there is one partition directory for each partition. std::set actualPartitionDirectories = - getLeafSubdirectories(outputDirectory->path); + getLeafSubdirectories(outputDirectory->path()); std::set expectedPartitionDirectories; std::set partitionNames; @@ -1952,7 +1952,7 @@ TEST_P(PartitionedTableWriterTest, multiplePartitions) { auto partitionName = fmt::format("p0={}/p1=str_{}", i, i); partitionNames.emplace(partitionName); expectedPartitionDirectories.emplace( - fs::path(outputDirectory->path) / partitionName); + fs::path(outputDirectory->path()) / partitionName); } EXPECT_EQ(actualPartitionDirectories, expectedPartitionDirectories); @@ -2005,7 +2005,7 @@ TEST_P(PartitionedTableWriterTest, singlePartition) { auto inputFilePaths = makeFilePaths(numBatches); for (int i = 0; i < numBatches; i++) { - writeToFile(inputFilePaths[i]->path, vectors[i]); + writeToFile(inputFilePaths[i]->path(), vectors[i]); } auto outputDirectory = TempDirectoryPath::create(); @@ -2013,7 +2013,7 @@ TEST_P(PartitionedTableWriterTest, singlePartition) { auto plan = createInsertPlan( PlanBuilder().tableScan(rowType), rowType, - outputDirectory->path, + outputDirectory->path(), partitionKeys, bucketProperty_, compressionKind_, @@ -2025,13 +2025,13 @@ TEST_P(PartitionedTableWriterTest, singlePartition) { plan, inputFilePaths, "SELECT count(*) FROM tmp"); std::set partitionDirectories = - getLeafSubdirectories(outputDirectory->path); + getLeafSubdirectories(outputDirectory->path()); // Verify only a single partition directory is created. ASSERT_EQ(partitionDirectories.size(), 1); EXPECT_EQ( *partitionDirectories.begin(), - fs::path(outputDirectory->path) / "p0=365"); + fs::path(outputDirectory->path()) / "p0=365"); // Verify all data is written to the single partition directory. auto newOutputType = getNonPartitionsColumns(partitionKeys, rowType); @@ -2073,7 +2073,7 @@ TEST_P(PartitionedWithoutBucketTableWriterTest, fromSinglePartitionToMultiple) { auto plan = createInsertPlan( PlanBuilder().values(vectors), rowType, - outputDirectory->path, + outputDirectory->path(), partitionKeys, nullptr, compressionKind_, @@ -2129,7 +2129,7 @@ TEST_P(PartitionedTableWriterTest, maxPartitions) { auto plan = createInsertPlan( PlanBuilder().values({vector}), rowType, - outputDirectory->path, + outputDirectory->path(), partitionKeys, bucketProperty_, compressionKind_, @@ -2165,7 +2165,7 @@ TEST_P(AllTableWriterTest, writeNoFile) { auto plan = createInsertPlan( PlanBuilder().tableScan(rowType_).filter("false"), rowType_, - outputDirectory->path); + outputDirectory->path()); auto execute = [&](const std::shared_ptr& plan, std::shared_ptr queryCtx) { @@ -2176,7 +2176,7 @@ TEST_P(AllTableWriterTest, writeNoFile) { }; execute(plan, std::make_shared(executor_.get())); - ASSERT_TRUE(fs::is_empty(outputDirectory->path)); + ASSERT_TRUE(fs::is_empty(outputDirectory->path())); } TEST_P(UnpartitionedTableWriterTest, differentCompression) { @@ -2198,7 +2198,7 @@ TEST_P(UnpartitionedTableWriterTest, differentCompression) { createInsertPlan( PlanBuilder().values(input), rowType_, - outputDirectory->path, + outputDirectory->path(), {}, nullptr, compressionKind, @@ -2210,7 +2210,7 @@ TEST_P(UnpartitionedTableWriterTest, differentCompression) { auto plan = createInsertPlan( PlanBuilder().values(input), rowType_, - outputDirectory->path, + outputDirectory->path(), {}, nullptr, compressionKind, @@ -2290,7 +2290,7 @@ TEST_P(UnpartitionedTableWriterTest, runtimeStatsCheck) { auto plan = createInsertPlan( PlanBuilder().values(vectors), rowType, - outputDirectory->path, + outputDirectory->path(), {}, nullptr, compressionKind_, @@ -2350,7 +2350,7 @@ TEST_P(UnpartitionedTableWriterTest, immutableSettings) { auto plan = createInsertPlan( PlanBuilder().values(input), rowType_, - outputDirectory->path, + outputDirectory->path(), {}, nullptr, CompressionKind_NONE, @@ -2404,7 +2404,7 @@ TEST_P(BucketedTableOnlyWriteTest, bucketCountLimit) { auto plan = createInsertPlan( PlanBuilder().values({input}), rowType_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -2433,14 +2433,14 @@ TEST_P(BucketedTableOnlyWriteTest, bucketCountLimit) { "SELECT c2, c3, c4, c5 FROM tmp"); auto originalRowType = rowType_; rowType_ = newOutputType; - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); rowType_ = originalRowType; } else { assertQuery( PlanBuilder().tableScan(rowType_).planNode(), makeHiveConnectorSplits(outputDirectory), "SELECT * FROM tmp"); - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); } } } @@ -2463,7 +2463,7 @@ TEST_P(BucketedTableOnlyWriteTest, mismatchedBucketTypes) { auto plan = createInsertPlan( PlanBuilder().values({input}), rowType_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -2491,7 +2491,7 @@ TEST_P(AllTableWriterTest, tableWriteOutputCheck) { auto plan = createInsertPlan( PlanBuilder().values({input}), rowType_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -2529,8 +2529,8 @@ TEST_P(AllTableWriterTest, tableWriteOutputCheck) { ASSERT_FALSE(fragmentVector->isNullAt(i)); folly::dynamic obj = folly::parseJson(fragmentVector->valueAt(i)); if (testMode_ == TestMode::kUnpartitioned) { - ASSERT_EQ(obj["targetPath"], outputDirectory->path); - ASSERT_EQ(obj["writePath"], outputDirectory->path); + ASSERT_EQ(obj["targetPath"], outputDirectory->path()); + ASSERT_EQ(obj["writePath"], outputDirectory->path()); } else { std::string partitionDirRe; for (const auto& partitionBy : partitionedBy_) { @@ -2538,11 +2538,11 @@ TEST_P(AllTableWriterTest, tableWriteOutputCheck) { } ASSERT_TRUE(RE2::FullMatch( obj["targetPath"].asString(), - fmt::format("{}{}", outputDirectory->path, partitionDirRe))) + fmt::format("{}{}", outputDirectory->path(), partitionDirRe))) << obj["targetPath"].asString(); ASSERT_TRUE(RE2::FullMatch( obj["writePath"].asString(), - fmt::format("{}{}", outputDirectory->path, partitionDirRe))) + fmt::format("{}{}", outputDirectory->path(), partitionDirRe))) << obj["writePath"].asString(); } numRows += obj["rowCount"].asInt(); @@ -2588,7 +2588,7 @@ TEST_P(AllTableWriterTest, tableWriteOutputCheck) { ASSERT_GT(writeFiles.size(), 0); ASSERT_LE(writeFiles.size(), numTableWriterCount_); } - auto diskFiles = listAllFiles(outputDirectory->path); + auto diskFiles = listAllFiles(outputDirectory->path()); std::sort(diskFiles.begin(), diskFiles.end()); std::sort(writeFiles.begin(), writeFiles.end()); ASSERT_EQ(diskFiles, writeFiles) @@ -2765,7 +2765,7 @@ TEST_P(AllTableWriterTest, columnStatsDataTypes) { rowType_->children(), partitionedBy_, nullptr, - makeLocationHandle(outputDirectory->path))), + makeLocationHandle(outputDirectory->path()))), false, CommitStrategy::kNoCommit)) .planNode(); @@ -2854,7 +2854,7 @@ TEST_P(AllTableWriterTest, columnStats) { rowType_->children(), partitionedBy_, bucketProperty_, - makeLocationHandle(outputDirectory->path))), + makeLocationHandle(outputDirectory->path()))), false, commitStrategy_)) .planNode(); @@ -2953,7 +2953,7 @@ TEST_P(AllTableWriterTest, columnStatsWithTableWriteMerge) { rowType_->children(), partitionedBy_, bucketProperty_, - makeLocationHandle(outputDirectory->path))), + makeLocationHandle(outputDirectory->path()))), false, commitStrategy_)); @@ -3045,7 +3045,7 @@ TEST_P(AllTableWriterTest, tableWriterStats) { auto inputFilePaths = makeFilePaths(numBatches); for (int i = 0; i < numBatches; i++) { - writeToFile(inputFilePaths[i]->path, vectors[i]); + writeToFile(inputFilePaths[i]->path(), vectors[i]); } auto outputDirectory = TempDirectoryPath::create(); @@ -3053,7 +3053,7 @@ TEST_P(AllTableWriterTest, tableWriterStats) { auto plan = createInsertPlan( PlanBuilder().tableScan(rowType), rowType, - outputDirectory->path, + outputDirectory->path(), partitionKeys, bucketProperty_, compressionKind_, @@ -3137,7 +3137,7 @@ DEBUG_ONLY_TEST_P( auto op = createInsertPlan( PlanBuilder().values(vectors), rowType_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -3186,7 +3186,7 @@ DEBUG_ONLY_TEST_P(UnpartitionedTableWriterTest, dataSinkAbortError) { auto outputDirectory = TempDirectoryPath::create(); auto plan = PlanBuilder() .values({vector}) - .tableWrite(outputDirectory->path, fileFormat_) + .tableWrite(outputDirectory->path(), fileFormat_) .planNode(); VELOX_ASSERT_THROW( AssertQueryBuilder(plan).copyResults(pool()), "inject writer error"); @@ -3204,7 +3204,7 @@ TEST_P(BucketSortOnlyTableWriterTest, sortWriterSpill) { auto op = createInsertPlan( PlanBuilder().values(vectors), rowType_, - outputDirectory->path, + outputDirectory->path(), partitionedBy_, bucketProperty_, compressionKind_, @@ -3217,9 +3217,9 @@ TEST_P(BucketSortOnlyTableWriterTest, sortWriterSpill) { assertQueryWithWriterConfigs(op, fmt::format("SELECT {}", 5 * 500), true); if (partitionedBy_.size() > 0) { rowType_ = getNonPartitionsColumns(partitionedBy_, rowType_); - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); } else { - verifyTableWriterOutput(outputDirectory->path, rowType_); + verifyTableWriterOutput(outputDirectory->path(), rowType_); } const auto updatedSpillStats = globalSpillStats(); @@ -3299,7 +3299,7 @@ DEBUG_ONLY_TEST_P(BucketSortOnlyTableWriterTest, outputBatchRows) { auto plan = createInsertPlan( PlanBuilder().values({vectors}), rowType, - outputDirectory->path, + outputDirectory->path(), partitionKeys, bucketProperty_, compressionKind_, @@ -3437,7 +3437,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) { auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->path) + .tableWrite(outputDirectory->path()) .capturePlanNodeId(tableWriteNodeId) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( @@ -3450,7 +3450,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) { AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) .maxDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, writerSpillEnabled) .config( core::QueryConfig::kWriterSpillEnabled, writerSpillEnabled) @@ -3548,7 +3548,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) { auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->path, {"c0"}, 4, {"c1"}, {"c2"}) + .tableWrite(outputDirectory->path(), {"c0"}, 4, {"c1"}, {"c2"}) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, @@ -3559,7 +3559,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) { AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) .maxDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, writerSpillEnabled) .config(core::QueryConfig::kWriterSpillEnabled, writerSpillEnabled) // Set 0 file writer flush threshold to always trigger flush in test. @@ -3644,7 +3644,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) { auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->path) + .tableWrite(outputDirectory->path()) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, @@ -3655,7 +3655,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) { AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) .maxDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kWriterSpillEnabled, true) .config( @@ -3720,7 +3720,7 @@ DEBUG_ONLY_TEST_F( auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->path) + .tableWrite(outputDirectory->path()) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, @@ -3731,7 +3731,7 @@ DEBUG_ONLY_TEST_F( AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) .maxDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kWriterSpillEnabled, true) // Set file writer flush threshold of zero to always trigger flush in @@ -3812,7 +3812,7 @@ DEBUG_ONLY_TEST_F( auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->path) + .tableWrite(outputDirectory->path()) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, @@ -3823,7 +3823,7 @@ DEBUG_ONLY_TEST_F( AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) .maxDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kWriterSpillEnabled, true) // Set 0 file writer flush threshold to always trigger flush in test. @@ -3900,7 +3900,7 @@ DEBUG_ONLY_TEST_F( auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->path, {"c0"}, 4, {"c1"}, {"c2"}) + .tableWrite(outputDirectory->path(), {"c0"}, 4, {"c1"}, {"c2"}) .project({TableWriteTraits::rowCountColumnName()}) .singleAggregation( {}, @@ -3912,7 +3912,7 @@ DEBUG_ONLY_TEST_F( AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) .maxDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kWriterSpillEnabled, "true") // Set file writer flush threshold of zero to always trigger flush in @@ -3983,13 +3983,13 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableFileWriteError) { const auto outputDirectory = TempDirectoryPath::create(); auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->path) + .tableWrite(outputDirectory->path()) .planNode(); VELOX_ASSERT_THROW( AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) .maxDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kWriterSpillEnabled, true) // Set 0 file writer flush threshold to always reclaim memory from @@ -4072,13 +4072,13 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteSpillUseMoreMemory) { const auto outputDirectory = TempDirectoryPath::create(); auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->path) + .tableWrite(outputDirectory->path()) .planNode(); VELOX_ASSERT_THROW( AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) .maxDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kWriterSpillEnabled, true) // Set 0 file writer flush threshold to always trigger flush in test. @@ -4162,7 +4162,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteReclaimOnClose) { auto writerPlan = PlanBuilder() .values(vectors) - .tableWrite(outputDirectory->path) + .tableWrite(outputDirectory->path()) .singleAggregation( {}, {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())}) @@ -4171,7 +4171,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteReclaimOnClose) { AssertQueryBuilder(duckDbQueryRunner_) .queryCtx(queryCtx) .maxDrivers(1) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kWriterSpillEnabled, true) // Set 0 file writer flush threshold to always trigger flush in test. diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 3bf0a1d487a6b..8aa1484d20e35 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -767,7 +767,7 @@ TEST_F(TaskTest, singleThreadedExecution) { // Project + Aggregation over TableScan. auto filePath = TempFilePath::create(); - writeToFile(filePath->path, {data, data}); + writeToFile(filePath->path(), {data, data}); core::PlanNodeId scanId; plan = PlanBuilder() @@ -779,7 +779,7 @@ TEST_F(TaskTest, singleThreadedExecution) { { auto [task, results] = - executeSingleThreaded(plan, {{scanId, {filePath->path}}}); + executeSingleThreaded(plan, {{scanId, {filePath->path()}}}); assertEqualResults({expectedResult}, results); } @@ -796,7 +796,7 @@ TEST_F(TaskTest, singleThreadedHashJoin) { makeFlatVector({10, 20, 30, 40}), }); auto leftPath = TempFilePath::create(); - writeToFile(leftPath->path, {left}); + writeToFile(leftPath->path(), {left}); auto right = makeRowVector( {"u_c0"}, @@ -804,7 +804,7 @@ TEST_F(TaskTest, singleThreadedHashJoin) { makeFlatVector({0, 1, 3, 5}), }); auto rightPath = TempFilePath::create(); - writeToFile(rightPath->path, {right}); + writeToFile(rightPath->path(), {right}); auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId leftScanId; @@ -832,7 +832,7 @@ TEST_F(TaskTest, singleThreadedHashJoin) { { auto [task, results] = executeSingleThreaded( plan, - {{leftScanId, {leftPath->path}}, {rightScanId, {rightPath->path}}}); + {{leftScanId, {leftPath->path()}}, {rightScanId, {rightPath->path()}}}); assertEqualResults({expectedResult}, results); } } @@ -840,11 +840,11 @@ TEST_F(TaskTest, singleThreadedHashJoin) { TEST_F(TaskTest, singleThreadedCrossJoin) { auto left = makeRowVector({"t_c0"}, {makeFlatVector({1, 2, 3})}); auto leftPath = TempFilePath::create(); - writeToFile(leftPath->path, {left}); + writeToFile(leftPath->path(), {left}); auto right = makeRowVector({"u_c0"}, {makeFlatVector({10, 20})}); auto rightPath = TempFilePath::create(); - writeToFile(rightPath->path, {right}); + writeToFile(rightPath->path(), {right}); auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId leftScanId; @@ -869,7 +869,7 @@ TEST_F(TaskTest, singleThreadedCrossJoin) { { auto [task, results] = executeSingleThreaded( plan, - {{leftScanId, {leftPath->path}}, {rightScanId, {rightPath->path}}}); + {{leftScanId, {leftPath->path()}}, {rightScanId, {rightPath->path()}}}); assertEqualResults({expectedResult}, results); } } @@ -1349,7 +1349,7 @@ DEBUG_ONLY_TEST_F(TaskTest, driverCounters) { makeFlatVector(1'000, [](auto row) { return row; }), }); auto filePath = TempFilePath::create(); - writeToFile(filePath->path, {data, data}); + writeToFile(filePath->path(), {data, data}); core::PlanNodeId scanNodeId; auto plan = PlanBuilder() @@ -1434,7 +1434,7 @@ DEBUG_ONLY_TEST_F(TaskTest, driverCounters) { // Now add a split, finalize splits and wait for the task to finish. auto split = exec::Split(makeHiveConnectorSplit( - filePath->path, 0, std::numeric_limits::max(), 1)); + filePath->path(), 0, std::numeric_limits::max(), 1)); task->addSplit(scanNodeId, std::move(split)); task->noMoreSplits(scanNodeId); taskThread.join(); @@ -1500,7 +1500,7 @@ TEST_F(TaskTest, spillDirectoryLifecycleManagement) { std::shared_ptr task = cursor->task(); auto rootTempDir = exec::test::TempDirectoryPath::create(); auto tmpDirectoryPath = - rootTempDir->path + "/spillDirectoryLifecycleManagement"; + rootTempDir->path() + "/spillDirectoryLifecycleManagement"; task->setSpillDirectory(tmpDirectoryPath, false); TestScopedSpillInjection scopedSpillInjection(100); @@ -1556,7 +1556,7 @@ TEST_F(TaskTest, spillDirNotCreated) { auto cursor = TaskCursor::create(params); auto* task = cursor->task().get(); auto rootTempDir = exec::test::TempDirectoryPath::create(); - auto tmpDirectoryPath = rootTempDir->path + "/spillDirNotCreated"; + auto tmpDirectoryPath = rootTempDir->path() + "/spillDirNotCreated"; task->setSpillDirectory(tmpDirectoryPath, false); while (cursor->moveNext()) { diff --git a/velox/exec/tests/TopNRowNumberTest.cpp b/velox/exec/tests/TopNRowNumberTest.cpp index bbc9bcd1c61a5..7dd58fe0d0246 100644 --- a/velox/exec/tests/TopNRowNumberTest.cpp +++ b/velox/exec/tests/TopNRowNumberTest.cpp @@ -139,7 +139,7 @@ TEST_F(TopNRowNumberTest, largeOutput) { .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .assertResults(sql); auto taskStats = exec::toPlanStats(task->taskStats()); @@ -223,7 +223,7 @@ TEST_F(TopNRowNumberTest, manyPartitions) { fmt::format("{}", outputBatchBytes)) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .assertResults(sql); auto taskStats = exec::toPlanStats(task->taskStats()); @@ -359,7 +359,7 @@ TEST_F(TopNRowNumberTest, maxSpillBytes) { try { TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index cfc45ee5afd65..c5d07cd632667 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -66,7 +66,7 @@ TEST_F(WindowTest, spill) { .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kWindowSpillEnabled, "true") - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .assertResults( "SELECT *, row_number() over (partition by p order by s) FROM tmp"); diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.cpp b/velox/exec/tests/utils/ArbitratorTestUtil.cpp index 58c5d0af4b25e..d5311e7dc66ec 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.cpp +++ b/velox/exec/tests/utils/ArbitratorTestUtil.cpp @@ -94,7 +94,7 @@ QueryTestResult runHashJoinTask( if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) .config(core::QueryConfig::kSpillStartPartitionBit, "29") @@ -137,7 +137,7 @@ QueryTestResult runAggregateTask( const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true") .queryCtx(queryCtx) @@ -179,7 +179,7 @@ QueryTestResult runOrderByTask( if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kOrderBySpillEnabled, "true") .queryCtx(queryCtx) @@ -221,7 +221,7 @@ QueryTestResult runRowNumberTask( if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kRowNumberSpillEnabled, "true") .queryCtx(queryCtx) @@ -264,7 +264,7 @@ QueryTestResult runTopNTask( const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") .queryCtx(queryCtx) @@ -306,12 +306,12 @@ QueryTestResult runWriteTask( const RowVectorPtr& expectedResult) { QueryTestResult result; const auto outputDirectory = TempDirectoryPath::create(); - auto plan = writePlan(vectors, outputDirectory->path, result.planNodeId); + auto plan = writePlan(vectors, outputDirectory->path(), result.planNodeId); if (enableSpilling) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); result.data = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "false") .config(core::QueryConfig::kWriterSpillEnabled, "true") diff --git a/velox/exec/tests/utils/TempDirectoryPath.cpp b/velox/exec/tests/utils/TempDirectoryPath.cpp index b34815a0cd5a9..74fffa9241be5 100644 --- a/velox/exec/tests/utils/TempDirectoryPath.cpp +++ b/velox/exec/tests/utils/TempDirectoryPath.cpp @@ -20,21 +20,26 @@ namespace facebook::velox::exec::test { -std::shared_ptr TempDirectoryPath::create() { - struct SharedTempDirectoryPath : public TempDirectoryPath { - SharedTempDirectoryPath() : TempDirectoryPath() {} - }; - return std::make_shared(); +std::shared_ptr TempDirectoryPath::create(bool injectFault) { + auto* tempDirPath = new TempDirectoryPath(injectFault); + return std::shared_ptr(tempDirPath); } TempDirectoryPath::~TempDirectoryPath() { - LOG(INFO) << "TempDirectoryPath:: removing all files from " << path; + LOG(INFO) << "TempDirectoryPath:: removing all files from " << path_; try { - boost::filesystem::remove_all(path.c_str()); + boost::filesystem::remove_all(path_.c_str()); } catch (...) { LOG(WARNING) << "TempDirectoryPath:: destructor failed while calling boost::filesystem::remove_all"; } } +std::string TempDirectoryPath::createTempDirectory() { + char path[] = "/tmp/velox_test_XXXXXX"; + const char* tempDirectoryPath = ::mkdtemp(path); + VELOX_CHECK_NOT_NULL(tempDirectoryPath, "Cannot open temp directory"); + return tempDirectoryPath; +} + } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/TempDirectoryPath.h b/velox/exec/tests/utils/TempDirectoryPath.h index 31d8d5dbea161..c61bd6d84c3f6 100644 --- a/velox/exec/tests/utils/TempDirectoryPath.h +++ b/velox/exec/tests/utils/TempDirectoryPath.h @@ -24,27 +24,35 @@ namespace facebook::velox::exec::test { -// It manages the lifetime of a temporary directory. +/// Manages the lifetime of a temporary directory. class TempDirectoryPath { public: - static std::shared_ptr create(); + /// If 'enableFaultInjection' is true, we enable fault injection on the + /// created file directory. + static std::shared_ptr create( + bool enableFaultInjection = false); virtual ~TempDirectoryPath(); - const std::string path; - TempDirectoryPath(const TempDirectoryPath&) = delete; TempDirectoryPath& operator=(const TempDirectoryPath&) = delete; - TempDirectoryPath() : path(createTempDirectory()) {} - - static std::string createTempDirectory() { - char path[] = "/tmp/velox_test_XXXXXX"; - const char* tempDirectoryPath = mkdtemp(path); - if (tempDirectoryPath == nullptr) { - throw std::logic_error("Cannot open temp directory"); - } - return tempDirectoryPath; + /// If fault injection is enabled, the returned the file path has the faulty + /// file system prefix scheme. The velox fs then opens the directory through + /// the faulty file system. The actual file operation might either fails or + /// delegate to the actual file. + const std::string path() const { + return enableFaultInjection_ ? fmt::format("faulty:{}", path_) : path_; } + + private: + static std::string createTempDirectory(); + + explicit TempDirectoryPath(bool enableFaultInjection) + : path_(createTempDirectory()), + enableFaultInjection_(enableFaultInjection) {} + + const std::string path_; + const bool enableFaultInjection_{false}; }; } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/TempFilePath.cpp b/velox/exec/tests/utils/TempFilePath.cpp index 7c5cbe7370c18..7097539b9a1a8 100644 --- a/velox/exec/tests/utils/TempFilePath.cpp +++ b/velox/exec/tests/utils/TempFilePath.cpp @@ -18,11 +18,22 @@ namespace facebook::velox::exec::test { -std::shared_ptr TempFilePath::create() { - struct SharedTempFilePath : public TempFilePath { - SharedTempFilePath() : TempFilePath() {} - }; - return std::make_shared(); +TempFilePath::~TempFilePath() { + ::unlink(path_.c_str()); + ::close(fd_); } +std::shared_ptr TempFilePath::create(bool enableFaultInjection) { + auto* tempFilePath = new TempFilePath(enableFaultInjection); + return std::shared_ptr(tempFilePath); +} + +std::string TempFilePath::createTempFile(TempFilePath* tempFilePath) { + char path[] = "/tmp/velox_test_XXXXXX"; + tempFilePath->fd_ = ::mkstemp(path); + if (tempFilePath->fd_ == -1) { + VELOX_FAIL("Cannot open temp file: {}", folly::errnoStr(errno)); + } + return path; +} } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/TempFilePath.h b/velox/exec/tests/utils/TempFilePath.h index d993795f1e3a0..2cc86582687cd 100644 --- a/velox/exec/tests/utils/TempFilePath.h +++ b/velox/exec/tests/utils/TempFilePath.h @@ -26,23 +26,21 @@ namespace facebook::velox::exec::test { -// It manages the lifetime of a temporary file. +/// Manages the lifetime of a temporary file. class TempFilePath { public: - static std::shared_ptr create(); + /// If 'enableFaultInjection' is true, we enable fault injection on the + /// created file. + static std::shared_ptr create( + bool enableFaultInjection = false); - virtual ~TempFilePath() { - unlink(path.c_str()); - close(fd); - } - - const std::string path; + ~TempFilePath(); TempFilePath(const TempFilePath&) = delete; TempFilePath& operator=(const TempFilePath&) = delete; void append(std::string data) { - std::ofstream file(path, std::ios_base::app); + std::ofstream file(path_, std::ios_base::app); file << data; file.flush(); file.close(); @@ -50,31 +48,37 @@ class TempFilePath { const int64_t fileSize() { struct stat st; - stat(path.data(), &st); + ::stat(path_.data(), &st); return st.st_size; } - const int64_t fileModifiedTime() { + int64_t fileModifiedTime() { struct stat st; - stat(path.data(), &st); + ::stat(path_.data(), &st); return st.st_mtime; } + /// If fault injection is enabled, the returned the file path has the faulty + /// file system prefix scheme. The velox fs then opens the file through the + /// faulty file system. The actual file operation might either fails or + /// delegate to the actual file. + std::string path() const { + return enableFaultInjection_ ? fmt::format("faulty:{}", path_) : path_; + } + private: - int fd; + static std::string createTempFile(TempFilePath* tempFilePath); - TempFilePath() : path(createTempFile(this)) { - VELOX_CHECK_NE(fd, -1); + TempFilePath(bool enableFaultInjection) + : enableFaultInjection_(enableFaultInjection), + path_(createTempFile(this)) { + VELOX_CHECK_NE(fd_, -1); } - static std::string createTempFile(TempFilePath* tempFilePath) { - char path[] = "/tmp/velox_test_XXXXXX"; - tempFilePath->fd = mkstemp(path); - if (tempFilePath->fd == -1) { - throw std::logic_error("Cannot open temp file"); - } - return path; - } + const bool enableFaultInjection_; + const std::string path_; + + int fd_; }; } // namespace facebook::velox::exec::test diff --git a/velox/expression/tests/ExpressionRunnerUnitTest.cpp b/velox/expression/tests/ExpressionRunnerUnitTest.cpp index 845325636d487..25f0d3f7095c0 100644 --- a/velox/expression/tests/ExpressionRunnerUnitTest.cpp +++ b/velox/expression/tests/ExpressionRunnerUnitTest.cpp @@ -49,8 +49,10 @@ TEST_F(ExpressionRunnerUnitTest, run) { auto inputFile = exec::test::TempFilePath::create(); auto sqlFile = exec::test::TempFilePath::create(); auto resultFile = exec::test::TempFilePath::create(); - const char* inputPath = inputFile->path.data(); - const char* resultPath = resultFile->path.data(); + const auto inputPathStr = inputFile->path(); + const char* inputPath = inputPathStr.data(); + const auto resultPathStr = resultFile->path(); + const char* resultPath = resultPathStr.data(); const int vectorSize = 100; VectorMaker vectorMaker(pool_.get()); @@ -108,8 +110,10 @@ TEST_F(ExpressionRunnerUnitTest, persistAndReproComplexSql) { // Emulate a reproduce from complex constant SQL auto sqlFile = exec::test::TempFilePath::create(); auto complexConstantsFile = exec::test::TempFilePath::create(); - auto sqlPath = sqlFile->path.c_str(); - auto complexConstantsPath = complexConstantsFile->path.c_str(); + const auto complexConstantsFilePathStr = complexConstantsFile->path(); + auto sqlPathStr = sqlFile->path(); + auto sqlPath = sqlPathStr.c_str(); + auto complexConstantsPath = complexConstantsFilePathStr.c_str(); // Write to file.. saveStringToFile(complexConstantsSql, sqlPath); diff --git a/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp b/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp index 7a04de0324948..ae7b3b9d1fa79 100644 --- a/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp +++ b/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp @@ -404,7 +404,7 @@ void AggregationTestBase::testAggregationsWithCompanion( queryBuilder.configs(config) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kAggregationSpillEnabled, true) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .maxDrivers(4); exec::TestScopedSpillInjection scopedSpillInjection(100); @@ -647,10 +647,10 @@ void AggregationTestBase::testReadFromFiles( for (auto& vector : inputs) { auto file = exec::test::TempFilePath::create(); - writeToFile(file->path, vector, writerPool.get()); + writeToFile(file->path(), vector, writerPool.get()); files.push_back(file); splits.emplace_back(std::make_shared( - kHiveConnectorId, file->path, dwio::common::FileFormat::DWRF)); + kHiveConnectorId, file->path(), dwio::common::FileFormat::DWRF)); } // No need to test streaming as the streaming test generates its own inputs, // so it would be the same as the original test. @@ -666,7 +666,7 @@ void AggregationTestBase::testReadFromFiles( } for (const auto& file : files) { - remove(file->path.c_str()); + remove(file->path().c_str()); } } @@ -843,7 +843,7 @@ void AggregationTestBase::testAggregationsImpl( queryBuilder.configs(config) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kAggregationSpillEnabled, true) - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .maxDrivers(4); exec::TestScopedSpillInjection scopedSpillInjection(100); @@ -945,7 +945,7 @@ void AggregationTestBase::testAggregationsImpl( queryBuilder.configs(config) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true") - .spillDirectory(spillDirectory->path); + .spillDirectory(spillDirectory->path()); TestScopedSpillInjection scopedSpillInjection(100); auto task = assertResults(queryBuilder); diff --git a/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp b/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp index 4ae59387b1e2d..d5b609130288a 100644 --- a/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp +++ b/velox/functions/prestosql/aggregates/tests/ArbitraryTest.cpp @@ -402,7 +402,7 @@ TEST_F(ArbitraryTest, spilling) { exec::TestScopedSpillInjection scopedSpillInjection(100); spillDirectory = exec::test::TempDirectoryPath::create(); - builder.spillDirectory(spillDirectory->path) + builder.spillDirectory(spillDirectory->path()) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true"); diff --git a/velox/functions/sparksql/aggregates/tests/FirstAggregateTest.cpp b/velox/functions/sparksql/aggregates/tests/FirstAggregateTest.cpp index 2088d088644be..712a81bb2c01c 100644 --- a/velox/functions/sparksql/aggregates/tests/FirstAggregateTest.cpp +++ b/velox/functions/sparksql/aggregates/tests/FirstAggregateTest.cpp @@ -539,7 +539,7 @@ TEST_F(FirstAggregateTest, spillingAndSorting) { results = AssertQueryBuilder(plan) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true") - .spillDirectory(spillDirectory->path) + .spillDirectory(spillDirectory->path()) .copyResults(pool()); exec::test::assertEqualResults({expected}, {results}); } diff --git a/velox/vector/tests/VectorSaverTest.cpp b/velox/vector/tests/VectorSaverTest.cpp index e2735201ce333..c06acc4140fd7 100644 --- a/velox/vector/tests/VectorSaverTest.cpp +++ b/velox/vector/tests/VectorSaverTest.cpp @@ -120,11 +120,11 @@ class VectorSaverTest : public testing::Test, public VectorTestBase { VectorPtr takeRoundTrip(const VectorPtr& vector) { auto path = exec::test::TempFilePath::create(); - std::ofstream outputFile(path->path, std::ofstream::binary); + std::ofstream outputFile(path->path(), std::ofstream::binary); saveVector(*vector, outputFile); outputFile.close(); - std::ifstream inputFile(path->path, std::ifstream::binary); + std::ifstream inputFile(path->path(), std::ifstream::binary); auto copy = restoreVector(inputFile, pool()); inputFile.close(); return copy; @@ -139,11 +139,11 @@ class VectorSaverTest : public testing::Test, public VectorTestBase { void testTypeRoundTrip(const TypePtr& type) { auto path = exec::test::TempFilePath::create(); - std::ofstream outputFile(path->path, std::ofstream::binary); + std::ofstream outputFile(path->path(), std::ofstream::binary); saveType(type, outputFile); outputFile.close(); - std::ifstream inputFile(path->path, std::ifstream::binary); + std::ifstream inputFile(path->path(), std::ifstream::binary); auto copy = restoreType(inputFile); inputFile.close(); @@ -625,8 +625,8 @@ TEST_F(VectorSaverTest, LazyVector) { TEST_F(VectorSaverTest, stdVector) { std::vector intVector = {1, 2, 3, 4, 5}; auto path = exec::test::TempFilePath::create(); - saveStdVectorToFile(intVector, path->path.c_str()); - auto copy = restoreStdVectorFromFile(path->path.c_str()); + saveStdVectorToFile(intVector, path->path().c_str()); + auto copy = restoreStdVectorFromFile(path->path().c_str()); ASSERT_EQ(intVector, copy); } @@ -661,7 +661,8 @@ TEST_F(VectorSaverTest, exceptionContext) { }; VectorPtr data = makeFlatVector({1, 2, 3, 4, 5}); - VectorSaverInfo info{tempDirectory.get()->path.c_str(), data.get()}; + const auto path = tempDirectory->path(); + VectorSaverInfo info{path.c_str(), data.get()}; { ExceptionContextSetter context({messageFunction, &info}); try {