# Patch summary: wires a Google Cloud Storage (GCS) sink pool into the Velox
# parquet write path, next to the existing S3 sink pool.
#
# Files touched, in order:
#   cpp/velox/CMakeLists.txt
#     - In macro find_gcssdk, CMAKE_FIND_LIBRARY_SUFFIXES changes from ".so" to
#       ".a" before find_package(google_cloud_cpp_storage REQUIRED).
#     - NOTE(review): presumably this is to pick up static archives - confirm.
#     - NOTE(review): the set() sits inside a macro, so the override presumably
#       stays in effect for later find_* calls in the caller's scope - confirm
#       that is intended.
#   cpp/velox/benchmarks/ParquetWriteBenchmark.cc
#     - Passes an extra veloxPool->addLeafChild("gcs_sink_pool") argument after
#       the "s3_sink_pool" leaf child and before localSchema.
#   cpp/velox/compute/VeloxRuntime.cc
#     - VeloxRuntime::createDatasource obtains gcsSinkPool via
#       getLeafVeloxPool(memoryManager) and passes it to std::make_shared
#       between s3SinkPool and schema.
#   cpp/velox/operators/writer/VeloxParquetDatasource.cc
#     - VeloxParquetDatasource::init gains a branch that builds sink_ from
#       gcsFileSystem->openFileForWrite(filePath_, {{}, gcsSinkPool_.get()}).
#     - The #else arm throws std::runtime_error instead.
#     - NOTE(review): the opening preprocessor guard and the path test for this
#       branch are not visible in this text - presumably #ifdef ENABLE_GCS;
#       verify against the original commit.
#     - NOTE(review): gcsFileSystem is dereferenced with no null check visible
#       here; how it is obtained is not visible either - verify.
#   cpp/velox/operators/writer/VeloxParquetDatasource.h
#     - Includes GCSFileSystem.h under #ifdef ENABLE_GCS.
#     - The constructor takes a new gcsSinkPool parameter between s3SinkPool and
#       schema and moves it into the new gcsSinkPool_ member.
#
# NOTE(review): this copy of the patch looks damaged.
#   - Line breaks are collapsed.
#   - Template argument lists appear stripped (e.g. "std::shared_ptr
#     VeloxRuntime::createDatasource", "std::make_shared(filePath, ...").
#   - Part of the VeloxParquetDatasource.cc hunk is missing.
#   It will likely not apply as-is; regenerate it from the original commit
#   before use.
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 0f347d133a046..5cc5b1ca9d151 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -265,7 +265,7 @@ macro(find_awssdk) endmacro() macro(find_gcssdk) - set (CMAKE_FIND_LIBRARY_SUFFIXES ".so") + set (CMAKE_FIND_LIBRARY_SUFFIXES ".a") find_package(google_cloud_cpp_storage REQUIRED) endmacro() diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc index dc32436950376..08e2126e22d11 100644 --- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc +++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc @@ -268,6 +268,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public GoogleBenchmar outputPath_ + "/" + fileName, veloxPool->addAggregateChild("writer_benchmark"), veloxPool->addLeafChild("s3_sink_pool"), + veloxPool->addLeafChild("gcs_sink_pool"), localSchema); veloxParquetDatasource->init(runtime->getConfMap()); diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index a25a74cb32682..4c2cf57ee98c1 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -162,11 +162,12 @@ std::shared_ptr VeloxRuntime::createDatasource( MemoryManager* memoryManager, std::shared_ptr schema) { auto veloxPool = getAggregateVeloxPool(memoryManager); - // Pass a dedicate pool for S3 sink as can't share veloxPool + // Pass dedicated pools for S3 and GCS sinks as can't share veloxPool // with parquet writer. 
auto s3SinkPool = getLeafVeloxPool(memoryManager); + auto gcsSinkPool = getLeafVeloxPool(memoryManager); - return std::make_shared(filePath, veloxPool, s3SinkPool, schema); + return std::make_shared(filePath, veloxPool, s3SinkPool, gcsSinkPool, schema); } std::shared_ptr VeloxRuntime::createShuffleReader( diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc b/cpp/velox/operators/writer/VeloxParquetDatasource.cc index 186338efc6e15..303610326024b 100644 --- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc +++ b/cpp/velox/operators/writer/VeloxParquetDatasource.cc @@ -56,6 +56,16 @@ void VeloxParquetDatasource::init(const std::unordered_map(fileSystem.get()); + sink_ = std::make_unique( + gcsFileSystem->openFileForWrite(filePath_, {{}, gcsSinkPool_.get()}), filePath_); +#else + throw std::runtime_error( + "The write path is GCS path but the GCS haven't been enabled when writing parquet data in velox runtime!"); #endif } else if (strncmp(filePath_.c_str(), "hdfs:", 5) == 0) { #ifdef ENABLE_HDFS diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.h b/cpp/velox/operators/writer/VeloxParquetDatasource.h index e87e271e28b15..5aaeeba0eb49c 100644 --- a/cpp/velox/operators/writer/VeloxParquetDatasource.h +++ b/cpp/velox/operators/writer/VeloxParquetDatasource.h @@ -36,6 +36,9 @@ #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" #endif +#ifdef ENABLE_GCS +#include "velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h" +#endif #ifdef ENABLE_HDFS #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsUtil.h" @@ -54,12 +57,14 @@ class VeloxParquetDatasource final : public Datasource { const std::string& filePath, std::shared_ptr veloxPool, std::shared_ptr s3SinkPool, + std::shared_ptr gcsSinkPool, std::shared_ptr schema) : Datasource(filePath, schema), filePath_(filePath), 
schema_(schema), pool_(std::move(veloxPool)), - s3SinkPool_(std::move(s3SinkPool)) {} + s3SinkPool_(std::move(s3SinkPool)), + gcsSinkPool_(std::move(gcsSinkPool)) {} void init(const std::unordered_map& sparkConfs) override; void inspectSchema(struct ArrowSchema* out) override; @@ -79,6 +84,7 @@ class VeloxParquetDatasource final : public Datasource { std::shared_ptr parquetWriter_; std::shared_ptr pool_; std::shared_ptr s3SinkPool_; + std::shared_ptr gcsSinkPool_; std::unique_ptr sink_; };