diff --git a/velox/common/file/FileSystems.h b/velox/common/file/FileSystems.h
index 442bbc0a97516..07b8400fba7b0 100644
--- a/velox/common/file/FileSystems.h
+++ b/velox/common/file/FileSystems.h
@@ -16,6 +16,7 @@
 #pragma once
 
 #include "velox/common/base/Exceptions.h"
+#include "velox/common/memory/MemoryPool.h"
 #include
 #include
@@ -31,8 +32,11 @@ namespace facebook::velox::filesystems {
 
 /// Defines the options for per-file operations. It contains a key-value pairs
 /// which can be easily extended to different storage systems.
+/// MemoryPool to allocate buffers needed to read/write files on FileSystems
+/// such as S3.
 struct FileOptions {
   std::unordered_map<std::string, std::string> values;
+  memory::MemoryPool* pool{nullptr};
 };
 
 /// An abstract FileSystem
diff --git a/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
index 1747142f90888..ac84135d2b039 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
+++ b/velox/connectors/hive/storage_adapters/s3fs/CMakeLists.txt
@@ -19,7 +19,8 @@ if(VELOX_ENABLE_S3)
   target_sources(velox_s3fs PRIVATE S3FileSystem.cpp S3Util.cpp)
 
   target_include_directories(velox_s3fs PUBLIC ${AWSSDK_INCLUDE_DIRS})
-  target_link_libraries(velox_s3fs Folly::folly ${AWSSDK_LIBRARIES} xsimd)
+  target_link_libraries(velox_s3fs velox_dwio_common_exception Folly::folly
+                        ${AWSSDK_LIBRARIES} xsimd)
 
   if(${VELOX_BUILD_TESTING})
     add_subdirectory(tests)
diff --git a/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp
index 6834bf1a7822d..5e8a9fd650c2d 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp
+++ b/velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp
@@ -66,7 +66,7 @@ s3WriteFileSinkGenerator() {
       auto fileSystem =
           filesystems::getFileSystem(fileURI, options.connectorProperties);
       return std::make_unique<dwio::common::WriteFileSink>(
-          fileSystem->openFileForWrite(fileURI),
+          fileSystem->openFileForWrite(fileURI, {{}, options.pool}),
           fileURI,
           options.metricLogger,
           options.stats);
diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
index 372533035001c..13f170da92923 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
+++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
@@ -20,6 +20,7 @@
 #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h"
 #include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h"
 #include "velox/core/Config.h"
+#include "velox/dwio/common/DataBuffer.h"
 
 #include
 #include
@@ -210,10 +211,16 @@ namespace filesystems {
 
 class S3WriteFile::Impl {
  public:
-  explicit Impl(const std::string& path, Aws::S3::S3Client* client)
-      : client_(client) {
+  explicit Impl(
+      const std::string& path,
+      Aws::S3::S3Client* client,
+      memory::MemoryPool* pool)
+      : client_(client), pool_(pool) {
+    VELOX_CHECK_NOT_NULL(client);
+    VELOX_CHECK_NOT_NULL(pool);
     getBucketAndKeyFromS3Path(path, bucket_, key_);
-
+    currentPart_ = std::make_unique<dwio::common::DataBuffer<char>>(*pool_);
+    currentPart_->reserve(kPartUploadSize);
     // Check that the object doesn't exist, if it does throw an error.
     {
       Aws::S3::Model::HeadObjectRequest request;
@@ -255,39 +262,35 @@ class S3WriteFile::Impl {
       uploadState_.id = outcome.GetResult().GetUploadId();
     }
 
-    currentPart_.resize(kPartUploadSize);
-    closed_ = false;
     fileSize_ = 0;
   }
 
   // Appends data to the end of the file.
   void append(std::string_view data) {
-    VELOX_CHECK(!closed_, "File is closed");
-    if (data.size() + currentPartSize_ >= kPartUploadSize) {
+    VELOX_CHECK(!closed(), "File is closed");
+    if (data.size() + currentPart_->size() >= kPartUploadSize) {
       upload(data);
     } else {
       // Append to current part.
-      memcpy(currentPartBuffer() + currentPartSize_, data.data(), data.size());
-      currentPartSize_ += data.size();
+      currentPart_->unsafeAppend(data.data(), data.size());
     }
     fileSize_ += data.size();
   }
 
   // No-op.
   void flush() {
-    VELOX_CHECK(!closed_, "File is closed");
+    VELOX_CHECK(!closed(), "File is closed");
     /// currentPartSize must be less than kPartUploadSize since
     /// append() would have already flushed after reaching kUploadPartSize.
-    VELOX_CHECK_LE(currentPartSize_, kPartUploadSize);
+    VELOX_CHECK_LT(currentPart_->size(), kPartUploadSize);
   }
 
   // Complete the multipart upload and close the file.
   void close() {
-    if (closed_) {
+    if (closed()) {
       return;
     }
-    uploadPart({currentPartBuffer(), currentPartSize_}, true);
-    currentPartSize_ = 0;
+    uploadPart({currentPart_->data(), currentPart_->size()}, true);
     VELOX_CHECK_EQ(uploadState_.partNumber, uploadState_.completedParts.size());
     // Complete the multipart upload.
     {
@@ -303,7 +306,7 @@ class S3WriteFile::Impl {
       VELOX_CHECK_AWS_OUTCOME(
           outcome, "Failed to complete multiple part upload", bucket_, key_);
     }
-    closed_ = true;
+    currentPart_->clear();
   }
 
   // Current file size, i.e. the sum of all previous appends.
@@ -320,6 +323,10 @@ class S3WriteFile::Impl {
   static constexpr const char* kApplicationOctetStream =
       "application/octet-stream";
 
+  bool closed() const {
+    return (currentPart_->capacity() == 0);
+  }
+
   // Holds state for the multipart upload.
   struct UploadState {
     Aws::Vector<Aws::S3::Model::CompletedPart> completedParts;
@@ -329,16 +336,15 @@ class S3WriteFile::Impl {
   UploadState uploadState_;
 
   // Data can be smaller or larger than the kPartUploadSize.
-  // Complete the currentPart_ and chunk the remaining data.
-  // Stash the remaining in the current part.
+  // Complete the currentPart_ and upload kPartUploadSize chunks of data.
+  // Save the remaining into currentPart_.
   void upload(const std::string_view data) {
     auto dataPtr = data.data();
     auto dataSize = data.size();
     // Fill-up the remaining currentPart_.
-    auto remainingBufferSize = kPartUploadSize - currentPartSize_;
-    memcpy(
-        currentPartBuffer() + currentPartSize_, dataPtr, remainingBufferSize);
-    uploadPart({currentPartBuffer(), kPartUploadSize});
+    auto remainingBufferSize = currentPart_->capacity() - currentPart_->size();
+    currentPart_->unsafeAppend(dataPtr, remainingBufferSize);
+    uploadPart({currentPart_->data(), currentPart_->size()});
     dataPtr += remainingBufferSize;
     dataSize -= remainingBufferSize;
     while (dataSize > kPartUploadSize) {
@@ -346,9 +352,8 @@ class S3WriteFile::Impl {
       dataPtr += kPartUploadSize;
       dataSize -= kPartUploadSize;
     }
-    // stash the remaining in currentPart;
-    memcpy(currentPartBuffer(), dataPtr, dataSize);
-    currentPartSize_ = dataSize;
+    // Stash the remaining at the beginning of currentPart.
+    currentPart_->unsafeAppend(0, dataPtr, dataSize);
   }
 
   void uploadPart(const std::string_view part, bool isLast = false) {
@@ -377,22 +382,19 @@ class S3WriteFile::Impl {
     }
   }
 
-  char* currentPartBuffer() {
-    return currentPart_.data();
-  }
-
-  // TODO: Pass a MemoryPool to S3WriteFile use a MemorySink.
-  std::vector<char> currentPart_;
   Aws::S3::S3Client* client_;
+  memory::MemoryPool* pool_;
+  std::unique_ptr<dwio::common::DataBuffer<char>> currentPart_;
   std::string bucket_;
   std::string key_;
   size_t fileSize_ = -1;
-  uint32_t currentPartSize_ = 0;
-  bool closed_ = true;
 };
 
-S3WriteFile::S3WriteFile(const std::string& path, Aws::S3::S3Client* client) {
-  impl_ = std::make_shared<Impl>(path, client);
+S3WriteFile::S3WriteFile(
+    const std::string& path,
+    Aws::S3::S3Client* client,
+    memory::MemoryPool* pool) {
+  impl_ = std::make_shared<Impl>(path, client, pool);
 }
 
 void S3WriteFile::append(std::string_view data) {
@@ -629,9 +631,10 @@ std::unique_ptr<ReadFile> S3FileSystem::openFileForRead(
 
 std::unique_ptr<WriteFile> S3FileSystem::openFileForWrite(
     std::string_view path,
-    const FileOptions& /*unused*/) {
+    const FileOptions& options) {
   const auto file = s3Path(path);
-  auto s3file = std::make_unique<S3WriteFile>(file, impl_->s3Client());
+  auto s3file =
+      std::make_unique<S3WriteFile>(file, impl_->s3Client(), options.pool);
   return s3file;
 }
 
diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h
index 3b49eda32b28d..4240451ea2caa 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h
+++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h
@@ -39,7 +39,7 @@ class S3FileSystem : public FileSystem {
 
   std::unique_ptr<WriteFile> openFileForWrite(
       std::string_view path,
-      const FileOptions& options = {}) override;
+      const FileOptions& options) override;
 
   void remove(std::string_view path) override {
     VELOX_UNSUPPORTED("remove for S3 not implemented");
diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h
index f7d9f4102e6a2..dd2661fad3629 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h
+++ b/velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include "velox/common/file/File.h"
+#include "velox/common/memory/MemoryPool.h"
 
 namespace Aws::S3 {
 class S3Client;
@@ -46,7 +47,10 @@ namespace facebook::velox::filesystems {
 /// TODO: Implement retry on failure.
 class S3WriteFile : public WriteFile {
  public:
-  S3WriteFile(const std::string& path, Aws::S3::S3Client* client);
+  S3WriteFile(
+      const std::string& path,
+      Aws::S3::S3Client* client,
+      memory::MemoryPool* pool);
 
   /// Appends data to the end of the file.
   /// Uploads a part on reaching part size limit.
diff --git a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp
index 58f11509b03e1..d74ba33d2c6ae 100644
--- a/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp
+++ b/velox/connectors/hive/storage_adapters/s3fs/tests/S3FileSystemTest.cpp
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 
+#include "velox/common/memory/Memory.h"
 #include "velox/connectors/hive/storage_adapters/s3fs/S3WriteFile.h"
 #include "velox/connectors/hive/storage_adapters/s3fs/tests/S3Test.h"
 
@@ -186,7 +187,8 @@ TEST_F(S3FileSystemTest, writeFileAndRead) {
   auto hiveConfig = minioServer_->hiveConfig();
   filesystems::S3FileSystem s3fs(hiveConfig);
 
-  auto writeFile = s3fs.openFileForWrite(s3File);
+  auto pool = memory::defaultMemoryManager().addLeafPool("S3FileSystemTest");
+  auto writeFile = s3fs.openFileForWrite(s3File, {{}, pool.get()});
   auto s3WriteFile = dynamic_cast<filesystems::S3WriteFile*>(writeFile.get());
   std::string dataContent =
       "Dance me to your beauty with a burning violin"
diff --git a/velox/dwio/common/DataBuffer.h b/velox/dwio/common/DataBuffer.h
index 219406d81d19e..b94336e10a03a 100644
--- a/velox/dwio/common/DataBuffer.h
+++ b/velox/dwio/common/DataBuffer.h
@@ -177,6 +177,13 @@ class DataBuffer {
     size_ = (offset + items);
   }
 
+  void unsafeAppend(const T* src, uint64_t items) {
+    if (FOLLY_LIKELY(items > 0)) {
+      std::memcpy(data() + size_, src, sizeInBytes(items));
+      size_ += items;
+    }
+  }
+
   void unsafeAppend(T value) {
     buf_[size_++] = value;
   }
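
Note (editor's sketch, not part of the patch): with this change, FileOptions carries an optional memory::MemoryPool*, and S3FileSystem::openFileForWrite forwards it to S3WriteFile, which stages multipart-upload parts in a pool-backed dwio::common::DataBuffer<char> instead of a plain std::vector<char>. The caller-side snippet below mirrors the updated test; the function name writeExample, the s3Config argument, and the s3://bucket/example.txt URI are illustrative placeholders, and it assumes the S3 filesystem has already been registered (see RegisterS3FileSystem.cpp) with valid credentials.

#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/core/Config.h"

using namespace facebook::velox;

void writeExample(std::shared_ptr<const Config> s3Config) {
  // Leaf pool that backs S3WriteFile's staging DataBuffer<char>.
  auto pool = memory::defaultMemoryManager().addLeafPool("s3WriteExample");

  const std::string uri = "s3://bucket/example.txt"; // placeholder URI
  auto fs = filesystems::getFileSystem(uri, s3Config);

  // {{}, pool.get()}: empty key-value map plus the new MemoryPool pointer.
  auto file = fs->openFileForWrite(uri, {{}, pool.get()});
  file->append("hello from velox");
  file->flush();
  file->close();
}

Callers now have to supply a pool: the defaulted FileOptions argument was removed from S3FileSystem::openFileForWrite, and S3WriteFile::Impl rejects a null pool via VELOX_CHECK_NOT_NULL.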