diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc index dc3243695037..08e2126e22d1 100644 --- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc +++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc @@ -268,6 +268,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public GoogleBenchmar outputPath_ + "/" + fileName, veloxPool->addAggregateChild("writer_benchmark"), veloxPool->addLeafChild("s3_sink_pool"), + veloxPool->addLeafChild("gcs_sink_pool"), localSchema); veloxParquetDatasource->init(runtime->getConfMap()); diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 02869701e866..66f0962a47d7 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -163,11 +163,12 @@ std::shared_ptr VeloxRuntime::createDatasource( std::shared_ptr schema) { static std::atomic_uint32_t id{0UL}; auto veloxPool = getAggregateVeloxPool(memoryManager)->addAggregateChild("datasource." + std::to_string(id++)); - // Pass a dedicate pool for S3 sink as can't share veloxPool + // Pass a dedicate pool for S3 and GCS sinks as can't share veloxPool // with parquet writer. auto s3SinkPool = getLeafVeloxPool(memoryManager); + auto gcsSinkPool = getLeafVeloxPool(memoryManager); - return std::make_shared(filePath, veloxPool, s3SinkPool, schema); + return std::make_shared(filePath, veloxPool, s3SinkPool, gcsSinkPool, schema); } std::shared_ptr VeloxRuntime::createShuffleReader( diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc b/cpp/velox/operators/writer/VeloxParquetDatasource.cc index c6621581b5ca..26c420316fd6 100644 --- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc +++ b/cpp/velox/operators/writer/VeloxParquetDatasource.cc @@ -57,6 +57,16 @@ void VeloxParquetDatasource::init(const std::unordered_map(fileSystem.get()); + sink_ = std::make_unique( + gcsFileSystem->openFileForWrite(filePath_, {{}, gcsSinkPool_.get()}), filePath_); +#else + throw std::runtime_error( + "The write path is GCS path but the GCS haven't been enabled when writing parquet data in velox runtime!"); #endif } else if (strncmp(filePath_.c_str(), "hdfs:", 5) == 0) { #ifdef ENABLE_HDFS diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.h b/cpp/velox/operators/writer/VeloxParquetDatasource.h index 4a0f95bc9949..3daad8fe3952 100644 --- a/cpp/velox/operators/writer/VeloxParquetDatasource.h +++ b/cpp/velox/operators/writer/VeloxParquetDatasource.h @@ -36,6 +36,9 @@ #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" #include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h" #endif +#ifdef ENABLE_GCS +#include "velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h" +#endif #ifdef ENABLE_HDFS #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsUtil.h" @@ -54,12 +57,14 @@ class VeloxParquetDatasource final : public Datasource { const std::string& filePath, std::shared_ptr veloxPool, std::shared_ptr s3SinkPool, + std::shared_ptr gcsSinkPool, std::shared_ptr schema) : Datasource(filePath, schema), filePath_(filePath), schema_(schema), pool_(std::move(veloxPool)), - s3SinkPool_(std::move(s3SinkPool)) {} + s3SinkPool_(std::move(s3SinkPool)), + gcsSinkPool_(std::move(gcsSinkPool)) {} void init(const std::unordered_map& sparkConfs) override; void inspectSchema(struct ArrowSchema* out) override; @@ -92,6 +97,7 @@ class VeloxParquetDatasource final : public Datasource { std::shared_ptr parquetWriter_; std::shared_ptr pool_; std::shared_ptr s3SinkPool_; + std::shared_ptr gcsSinkPool_; std::unique_ptr sink_; };