From 0914f82f7f28b1f22069c366058c22c71a25bd83 Mon Sep 17 00:00:00 2001 From: Ankita Victor Date: Fri, 31 May 2024 12:44:15 +0530 Subject: [PATCH] Resolve merge conflicts --- velox/common/caching/CachedFactory.h | 19 ++++++++----- .../caching/tests/CachedFactoryTest.cpp | 13 ++++++--- velox/connectors/hive/FileHandle.cpp | 11 ++++++-- velox/connectors/hive/FileHandle.h | 6 ++++- velox/connectors/hive/HiveConnectorSplit.h | 4 +++ velox/connectors/hive/SplitReader.cpp | 5 +++- .../connectors/hive/iceberg/IcebergSplit.cpp | 4 +++ velox/connectors/hive/iceberg/IcebergSplit.h | 2 ++ .../hive/iceberg/tests/IcebergReadTest.cpp | 1 + .../abfs/tests/AbfsFileSystemTest.cpp | 27 +++++++++++++++++++ .../connectors/hive/tests/FileHandleTest.cpp | 26 ++++++++++++++++++ .../hive/tests/HiveConnectorUtilTest.cpp | 1 + .../exec/tests/utils/HiveConnectorTestBase.h | 2 ++ 13 files changed, 107 insertions(+), 14 deletions(-) diff --git a/velox/common/caching/CachedFactory.h b/velox/common/caching/CachedFactory.h index e0587ef04aef..09b129981791 100644 --- a/velox/common/caching/CachedFactory.h +++ b/velox/common/caching/CachedFactory.h @@ -145,6 +145,7 @@ template < typename Key, typename Value, typename Generator, + typename Properties = void, typename Sizer = DefaultSizer, typename Comparator = std::equal_to, typename Hash = std::hash> @@ -175,7 +176,9 @@ class CachedFactory { /// repeatedly, handing off the results to one thread at a time until the /// all pending requests are satisfied or a cache insert succeeds. This /// will probably mess with your memory model, so really try to avoid it. - CachedPtr generate(const Key& key); + CachedPtr generate( + const Key& key, + const Properties* properties = nullptr); /// Advanced function taking in a group of keys. Separates those keys into /// one's present in the cache (returning CachedPtrs for them) and those not @@ -350,17 +353,18 @@ template < typename Key, typename Value, typename Generator, + typename Properties, typename Sizer, typename Comparator, typename Hash> CachedPtr -CachedFactory::generate( - const Key& key) { +CachedFactory:: + generate(const Key& key, const Properties* properties) { process::TraceContext trace("CachedFactory::generate"); if (cache_ == nullptr) { return CachedPtr{ /*fromCache=*/false, - (*generator_)(key).release(), + (*generator_)(key, properties).release(), nullptr, std::make_unique(key)}; } @@ -389,7 +393,7 @@ CachedFactory::generate( } pendingLock.unlock(); // Regenerates in the edge case. - return generate(key); + return generate(key, properties); } pending_.insert(key); @@ -400,7 +404,7 @@ CachedFactory::generate( pendingCv_.notify_all(); }; - std::unique_ptr generatedValue = (*generator_)(key); + std::unique_ptr generatedValue = (*generator_)(key, properties); const uint64_t valueSize = Sizer()(*generatedValue); Value* rawValue = generatedValue.release(); const bool inserted = addCache(key, rawValue, valueSize); @@ -424,10 +428,11 @@ template < typename Key, typename Value, typename Generator, + typename Properties, typename Sizer, typename Comparator, typename Hash> -void CachedFactory:: +void CachedFactory:: retrieveCached( const std::vector& keys, std::vector>>& diff --git a/velox/common/caching/tests/CachedFactoryTest.cpp b/velox/common/caching/tests/CachedFactoryTest.cpp index 1a9f924c6235..d5b8bd43022b 100644 --- a/velox/common/caching/tests/CachedFactoryTest.cpp +++ b/velox/common/caching/tests/CachedFactoryTest.cpp @@ -26,8 +26,11 @@ using namespace facebook::velox; namespace { + struct DoublerGenerator { - std::unique_ptr operator()(const int& value) { + std::unique_ptr operator()( + const int& value, + const void* properties = nullptr) { ++generated; return std::make_unique(value * 2); } @@ -35,7 +38,9 @@ struct DoublerGenerator { }; struct IdentityGenerator { - std::unique_ptr operator()(const int& value) { + std::unique_ptr operator()( + const int& value, + const void* properties = nullptr) { return std::make_unique(value); } }; @@ -106,7 +111,9 @@ TEST(CachedFactoryTest, basicGeneration) { } struct DoublerWithExceptionsGenerator { - std::unique_ptr operator()(const int& value) { + std::unique_ptr operator()( + const int& value, + const void* properties = nullptr) { if (value == 3) { VELOX_FAIL("3 is bad"); } diff --git a/velox/connectors/hive/FileHandle.cpp b/velox/connectors/hive/FileHandle.cpp index 40ac778adc4a..de611098d877 100644 --- a/velox/connectors/hive/FileHandle.cpp +++ b/velox/connectors/hive/FileHandle.cpp @@ -40,7 +40,8 @@ std::string groupName(const std::string& filename) { } // namespace std::unique_ptr FileHandleGenerator::operator()( - const std::string& filename) { + const std::string& filename, + const FileProperties* properties) { // We have seen cases where drivers are stuck when creating file handles. // Adding a trace here to spot this more easily in future. process::TraceContext trace("FileHandleGenerator::operator()"); @@ -49,8 +50,14 @@ std::unique_ptr FileHandleGenerator::operator()( { MicrosecondTimer timer(&elapsedTimeUs); fileHandle = std::make_unique(); + filesystems::FileOptions options; + if (properties) { + options.fileSize = properties->fileSize == -1 + ? std::nullopt + : std::optional{properties->fileSize}; + } fileHandle->file = filesystems::getFileSystem(filename, properties_) - ->openFileForRead(filename); + ->openFileForRead(filename, options); fileHandle->uuid = StringIdLease(fileIds(), filename); fileHandle->groupId = StringIdLease(fileIds(), groupName(filename)); VLOG(1) << "Generating file handle for: " << filename diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h index e8a9a954094e..18ff0a8b00ed 100644 --- a/velox/connectors/hive/FileHandle.h +++ b/velox/connectors/hive/FileHandle.h @@ -28,6 +28,7 @@ #include "velox/common/caching/CachedFactory.h" #include "velox/common/caching/FileIds.h" #include "velox/common/file/File.h" +#include "velox/connectors/hive/FileProperties.h" namespace facebook::velox { @@ -67,7 +68,9 @@ class FileHandleGenerator { FileHandleGenerator() {} FileHandleGenerator(std::shared_ptr properties) : properties_(std::move(properties)) {} - std::unique_ptr operator()(const std::string& filename); + std::unique_ptr operator()( + const std::string& filename, + const FileProperties* properties); private: const std::shared_ptr properties_; @@ -77,6 +80,7 @@ using FileHandleFactory = CachedFactory< std::string, FileHandle, FileHandleGenerator, + FileProperties, FileHandleSizer>; using FileHandleCachedPtr = CachedPtr; diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 48f39f64bec2..782097a7d221 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -18,6 +18,7 @@ #include #include #include "velox/connectors/Connector.h" +#include "velox/connectors/hive/FileProperties.h" #include "velox/dwio/common/Options.h" namespace facebook::velox::connector::hive { @@ -27,6 +28,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { dwio::common::FileFormat fileFormat; const uint64_t start; const uint64_t length; + std::optional properties; /// Mapping from partition keys to values. Values are specified as strings /// formatted the same way as CAST(x as VARCHAR). Null values are specified as @@ -49,6 +51,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { dwio::common::FileFormat _fileFormat, uint64_t _start = 0, uint64_t _length = std::numeric_limits::max(), + std::optional _properties = std::nullopt, const std::unordered_map>& _partitionKeys = {}, std::optional _tableBucketNumber = std::nullopt, @@ -62,6 +65,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { fileFormat(_fileFormat), start(_start), length(_length), + properties(_properties), partitionKeys(_partitionKeys), tableBucketNumber(_tableBucketNumber), customSplitInfo(_customSplitInfo), diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 9e10a1bc397d..9cde8af84377 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -229,7 +229,10 @@ void SplitReader::createReader() { FileHandleCachedPtr fileHandleCachePtr; try { - fileHandleCachePtr = fileHandleFactory_->generate(hiveSplit_->filePath); + fileHandleCachePtr = fileHandleFactory_->generate( + hiveSplit_->filePath, + hiveSplit_->properties.has_value() ? &*hiveSplit_->properties + : nullptr); VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get()); } catch (const VeloxRuntimeError& e) { if (e.errorCode() == error_code::kFileNotFound && diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp index 747d70869f53..3474703b4688 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -26,6 +26,7 @@ HiveIcebergSplit::HiveIcebergSplit( dwio::common::FileFormat _fileFormat, uint64_t _start, uint64_t _length, + FileProperties _properties, const std::unordered_map>& _partitionKeys, std::optional _tableBucketNumber, @@ -38,6 +39,7 @@ HiveIcebergSplit::HiveIcebergSplit( _fileFormat, _start, _length, + _properties, _partitionKeys, _tableBucketNumber, _customSplitInfo, @@ -55,6 +57,7 @@ HiveIcebergSplit::HiveIcebergSplit( dwio::common::FileFormat _fileFormat, uint64_t _start, uint64_t _length, + FileProperties _properties, const std::unordered_map>& _partitionKeys, std::optional _tableBucketNumber, @@ -68,6 +71,7 @@ HiveIcebergSplit::HiveIcebergSplit( _fileFormat, _start, _length, + _properties, _partitionKeys, _tableBucketNumber, _customSplitInfo, diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h index 972a48c8f5e9..4d7fda3c6e55 100644 --- a/velox/connectors/hive/iceberg/IcebergSplit.h +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -32,6 +32,7 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { dwio::common::FileFormat _fileFormat, uint64_t _start = 0, uint64_t _length = std::numeric_limits::max(), + FileProperties fileProperties = {}, const std::unordered_map>& _partitionKeys = {}, std::optional _tableBucketNumber = std::nullopt, @@ -46,6 +47,7 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { dwio::common::FileFormat _fileFormat, uint64_t _start = 0, uint64_t _length = std::numeric_limits::max(), + FileProperties fileProperties = {}, const std::unordered_map>& _partitionKeys = {}, std::optional _tableBucketNumber = std::nullopt, diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 2585d41b2737..8547d92b907c 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -163,6 +163,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { fileFomat_, 0, fileSize, + FileProperties(), partitionKeys, std::nullopt, customSplitInfo, diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp index 66ababa2ea3e..965e419eba75 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp @@ -200,6 +200,33 @@ TEST_F(AbfsFileSystemTest, openFileForReadWithInvalidOptions) { "File size must be non-negative"); } +TEST_F(AbfsFileSystemTest, fileHandleWithProperties) { + auto hiveConfig = AbfsFileSystemTest::hiveConfig( + {{"fs.azure.account.key.test.dfs.core.windows.net", + azuriteServer->connectionStr()}}); + FileHandleFactory factory( + std::make_unique< + SimpleLRUCache>>(1), + std::make_unique(hiveConfig)); + FileProperties properties = { + 15 + kOneMB, + 1 + } auto fileHandle = factory.generate(fullFilePath, &properties).second; + readData(fileHandle->file.get()); +} + +TEST_F(AbfsFileSystemTest, fileHandleWithoutProperties) { + auto hiveConfig = AbfsFileSystemTest::hiveConfig( + {{"fs.azure.account.key.test.dfs.core.windows.net", + azuriteServer->connectionStr()}}); + FileHandleFactory factory( + std::make_unique< + SimpleLRUCache>>(1), + std::make_unique(hiveConfig)); + auto fileHandle = factory.generate(fullFilePath).second; + readData(fileHandle->file.get()); +} + TEST_F(AbfsFileSystemTest, multipleThreadsWithReadFile) { std::atomic startThreads = false; auto hiveConfig = AbfsFileSystemTest::hiveConfig( diff --git a/velox/connectors/hive/tests/FileHandleTest.cpp b/velox/connectors/hive/tests/FileHandleTest.cpp index df045e4fd439..558e4be6a4fe 100644 --- a/velox/connectors/hive/tests/FileHandleTest.cpp +++ b/velox/connectors/hive/tests/FileHandleTest.cpp @@ -47,3 +47,29 @@ TEST(FileHandleTest, localFile) { // Clean up remove(filename.c_str()); } + +TEST(FileHandleTest, localFileWithProperties) { + filesystems::registerLocalFileSystem(); + + auto tempFile = exec::test::TempFilePath::create(); + const auto& filename = tempFile->getPath(); + remove(filename.c_str()); + + { + LocalWriteFile writeFile(filename); + writeFile.append("foo"); + } + + FileHandleFactory factory( + std::make_unique>(1000), + std::make_unique()); + FileProperties properties = { + tempFile->fileSize(), tempFile->fileModifiedTime()}; + auto fileHandle = factory.generate(filename, &properties); + ASSERT_EQ(fileHandle->file->size(), 3); + char buffer[3]; + ASSERT_EQ(fileHandle->file->pread(0, 3, &buffer), "foo"); + + // Clean up + remove(filename.c_str()); +} \ No newline at end of file diff --git a/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp b/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp index 0ebf989dbc2d..dbdd0d4e1404 100644 --- a/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp @@ -74,6 +74,7 @@ TEST_F(HiveConnectorUtilTest, configureReaderOptions) { fileFormat, 0UL, std::numeric_limits::max(), + facebook::velox::FileProperties(), partitionKeys, std::nullopt, customSplitInfo, diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index ebc4437fbe2a..f362f7f8f616 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -284,6 +284,7 @@ class HiveConnectorSplitBuilder { fileFormat_, start_, length_, + properties_, partitionKeys_, tableBucketNumber_, customSplitInfo, @@ -298,6 +299,7 @@ class HiveConnectorSplitBuilder { dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; uint64_t start_{0}; uint64_t length_{std::numeric_limits::max()}; + facebook::velox::FileProperties properties_; std::unordered_map> partitionKeys_; std::optional tableBucketNumber_; std::unordered_map customSplitInfo_ = {};