diff --git a/.github/workflows/velox_be.yml b/.github/workflows/velox_be.yml index bebd3b3c55f5..84e2e51566f6 100644 --- a/.github/workflows/velox_be.yml +++ b/.github/workflows/velox_be.yml @@ -79,11 +79,11 @@ jobs: cd /opt/gluten && \ mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \ mvn test -Pspark-3.2 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest' -# Cpp micro benchmarks will use generated files from unit test in backends-velox module. -# - name: Run micro benchmarks -# run: | -# docker exec ubuntu2004-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/cpp/build/velox/benchmarks && \ -# ./generic_benchmark --with-shuffle --threads 1 --iterations 1' + # Cpp micro benchmarks will use generated files from unit test in backends-velox module. + - name: Run micro benchmarks + run: | + docker exec ubuntu2004-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/cpp/build/velox/benchmarks && \ + ./generic_benchmark --with-shuffle --threads 1 --iterations 1' - name: Exit docker container if: ${{ always() }} run: | diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index c2e5462bc4ab..6e1226b83737 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -49,6 +49,7 @@ option(ENABLE_QAT "Enable QAT for de/compression" OFF) option(ENABLE_IAA "Enable IAA for de/compression" OFF) option(ENABLE_S3 "Enable S3" OFF) option(ENABLE_HDFS "Enable HDFS" OFF) +option(ENABLE_ORC "Enable ORC" OFF) set(root_directory ${PROJECT_BINARY_DIR}) get_filename_component(GLUTEN_HOME ${CMAKE_SOURCE_DIR} DIRECTORY) @@ -187,6 +188,10 @@ if(ENABLE_IAA) add_definitions(-DGLUTEN_ENABLE_IAA) endif() +if(ENABLE_ORC) + add_definitions(-DGLUTEN_ENABLE_ORC) +endif() + # # Subdirectories # diff --git a/cpp/velox/benchmarks/BatchIteratorWrapper.h b/cpp/velox/benchmarks/BatchIteratorWrapper.h deleted 
file mode 100644 index 2539519c1a69..000000000000 --- a/cpp/velox/benchmarks/BatchIteratorWrapper.h +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include "BenchmarkUtils.h" - -#include "compute/ResultIterator.h" -#include "memory/ColumnarBatch.h" -#include "memory/ColumnarBatchIterator.h" -#include "utils/DebugOut.h" - -#include -#include -#include -#include -#include -#include - -namespace gluten { - -using GetInputFunc = std::shared_ptr(const std::string&); - -class BatchIterator : public ColumnarBatchIterator { - public: - explicit BatchIterator(const std::string& path) : path_(path) {} - - virtual ~BatchIterator() = default; - - virtual void createReader() = 0; - - virtual std::shared_ptr getSchema() = 0; - - int64_t getCollectBatchTime() const { - return collectBatchTime_; - } - - protected: - int64_t collectBatchTime_ = 0; - std::string path_; -}; - -class ParquetBatchIterator : public BatchIterator { - public: - explicit ParquetBatchIterator(const std::string& path) : BatchIterator(getExampleFilePath(path)) {} - - void createReader() override { - parquet::ArrowReaderProperties properties = parquet::default_arrow_reader_properties(); - 
GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), parquet::ParquetFileReader::OpenFile(path_), properties, &fileReader_)); - GLUTEN_THROW_NOT_OK( - fileReader_->GetRecordBatchReader(arrow::internal::Iota(fileReader_->num_row_groups()), &recordBatchReader_)); - - auto schema = recordBatchReader_->schema(); - std::cout << "schema:\n" << schema->ToString() << std::endl; - } - - std::shared_ptr getSchema() override { - return recordBatchReader_->schema(); - } - - protected: - std::unique_ptr fileReader_; - std::shared_ptr recordBatchReader_; -}; - -class OrcBatchIterator : public BatchIterator { - public: - explicit OrcBatchIterator(const std::string& path) : BatchIterator(path) {} - - void createReader() override { - // Open File - auto input = arrow::io::ReadableFile::Open(path_); - GLUTEN_THROW_NOT_OK(input); - - // Open ORC File Reader - auto maybeReader = arrow::adapters::orc::ORCFileReader::Open(*input, arrow::default_memory_pool()); - GLUTEN_THROW_NOT_OK(maybeReader); - fileReader_.reset((*maybeReader).release()); - - // get record batch Reader - auto recordBatchReader = fileReader_->GetRecordBatchReader(4096, std::vector()); - GLUTEN_THROW_NOT_OK(recordBatchReader); - recordBatchReader_ = *recordBatchReader; - } - - std::shared_ptr getSchema() override { - auto schema = fileReader_->ReadSchema(); - GLUTEN_THROW_NOT_OK(schema); - return *schema; - } - - protected: - std::unique_ptr fileReader_; - std::shared_ptr recordBatchReader_; -}; - -} // namespace gluten diff --git a/cpp/velox/benchmarks/BatchStreamIterator.h b/cpp/velox/benchmarks/BatchStreamIterator.h deleted file mode 100644 index 44f35384978e..000000000000 --- a/cpp/velox/benchmarks/BatchStreamIterator.h +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include "BatchIteratorWrapper.h" - -namespace gluten { - -class ParquetBatchStreamIterator final : public ParquetBatchIterator { - public: - explicit ParquetBatchStreamIterator(const std::string& path) : ParquetBatchIterator(path) { - createReader(); - DEBUG_OUT << "ParquetBatchStreamIterator open file: " << path << std::endl; - } - - std::shared_ptr next() override { - auto startTime = std::chrono::steady_clock::now(); - GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next()); - DEBUG_OUT << "ParquetBatchStreamIterator get a batch, num rows: " << (batch ? batch->num_rows() : 0) << std::endl; - collectBatchTime_ += - std::chrono::duration_cast(std::chrono::steady_clock::now() - startTime).count(); - if (batch == nullptr) { - return nullptr; - } - return convertBatch(std::make_shared(batch)); - } -}; - -inline std::shared_ptr getParquetInputFromBatchStream(const std::string& path) { - return std::make_shared(std::make_unique(path)); -} - -class OrcBatchStreamIterator final : public OrcBatchIterator { - public: - explicit OrcBatchStreamIterator(const std::string& path) : OrcBatchIterator(path) { - createReader(); - } - - std::shared_ptr next() override { - auto startTime = std::chrono::steady_clock::now(); - GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next()); - DEBUG_OUT << "OrcBatchStreamIterator get a batch, num rows: " << (batch ? 
batch->num_rows() : 0) << std::endl; - collectBatchTime_ += - std::chrono::duration_cast(std::chrono::steady_clock::now() - startTime).count(); - if (batch == nullptr) { - return nullptr; - } - return convertBatch(std::make_shared(batch)); - } -}; - -inline std::shared_ptr getOrcInputFromBatchStream(const std::string& path) { - return std::make_shared(std::make_unique(path)); -} - -} // namespace gluten diff --git a/cpp/velox/benchmarks/CMakeLists.txt b/cpp/velox/benchmarks/CMakeLists.txt index ebdb08364c8f..bc1782337754 100644 --- a/cpp/velox/benchmarks/CMakeLists.txt +++ b/cpp/velox/benchmarks/CMakeLists.txt @@ -15,19 +15,21 @@ find_arrow_lib(${PARQUET_LIB_NAME}) +set(VELOX_BENCHMARK_COMMON_SRCS common/FileReaderIterator.cc common/BenchmarkUtils.cc) +add_library(velox_benchmark_common STATIC ${VELOX_BENCHMARK_COMMON_SRCS}) +target_include_directories(velox_benchmark_common PUBLIC ${CMAKE_SOURCE_DIR}/velox ${CMAKE_SOURCE_DIR}/core) +target_link_libraries(velox_benchmark_common PUBLIC Arrow::parquet velox benchmark::benchmark gflags simdjson) + function(add_velox_benchmark BM_EXEC BM_FILE) - add_executable(${BM_EXEC} ${BM_FILE} BenchmarkUtils.cc) - target_include_directories(${BM_EXEC} PRIVATE ${CMAKE_SOURCE_DIR}/velox ${CMAKE_SOURCE_DIR}/src) - target_link_libraries(${BM_EXEC} gflags Arrow::parquet velox benchmark::benchmark simdjson) + add_executable(${BM_EXEC} ${BM_FILE}) + target_link_libraries(${BM_EXEC} PRIVATE velox_benchmark_common) endfunction() # Query benchmark add_velox_benchmark(query_benchmark QueryBenchmark.cc) # Generic benchmark -# TODO: This benchmark generates ORC file with Arrow ORC file writer. -# We need to compile Arrow with -DARROW_ORC first in Velox. 
-# add_velox_benchmark(generic_benchmark GenericBenchmark.cc) +add_velox_benchmark(generic_benchmark GenericBenchmark.cc) add_velox_benchmark(columnar_to_row_benchmark ColumnarToRowBenchmark.cc) @@ -35,3 +37,6 @@ add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc) add_velox_benchmark(shuffle_split_benchmark ShuffleSplitBenchmark.cc) +if(ENABLE_ORC) + add_velox_benchmark(orc_converter exec/OrcConverter.cc) +endif() diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 48ca2b057794..b5a5b4a2bf11 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -24,9 +24,8 @@ #include #include -#include "BatchStreamIterator.h" -#include "BatchVectorIterator.h" -#include "BenchmarkUtils.h" +#include "benchmarks/common/BenchmarkUtils.h" +#include "benchmarks/common/FileReaderIterator.h" #include "compute/VeloxExecutionCtx.h" #include "compute/VeloxPlanConverter.h" #include "config/GlutenConfig.h" @@ -40,7 +39,6 @@ using namespace gluten; namespace { DEFINE_bool(skip_input, false, "Skip specifying input files."); -DEFINE_bool(gen_orc_input, false, "Generate orc files from parquet as input files."); DEFINE_bool(with_shuffle, false, "Add shuffle split at end."); DEFINE_string(partitioning, "rr", "Short partitioning name. 
Valid options are rr, hash, range, single"); DEFINE_bool(zstd, false, "Use ZSTD as shuffle compression codec"); @@ -49,9 +47,6 @@ DEFINE_bool(qat_zstd, false, "Use QAT ZSTD as shuffle compression codec"); DEFINE_bool(iaa_gzip, false, "Use IAA GZIP as shuffle compression codec"); DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) partitions"); -static const std::string kParquetSuffix = ".parquet"; -static const std::string kOrcSuffix = ".orc"; - struct WriterMetrics { int64_t splitTime; int64_t evictTime; @@ -118,7 +113,7 @@ auto BM_Generic = [](::benchmark::State& state, const std::string& substraitJsonFile, const std::vector& inputFiles, const std::unordered_map& conf, - GetInputFunc* getInputIterator) { + FileReaderType readerType) { // Pin each threads to different CPU# starting from 0 or --cpu. if (FLAGS_cpu != -1) { setCpu(FLAGS_cpu + state.thread_index()); @@ -135,15 +130,17 @@ auto BM_Generic = [](::benchmark::State& state, for (auto _ : state) { std::vector> inputIters; - std::vector inputItersRaw; + std::vector inputItersRaw; if (!inputFiles.empty()) { - std::transform(inputFiles.cbegin(), inputFiles.cend(), std::back_inserter(inputIters), getInputIterator); + for (const auto& input : inputFiles) { + inputIters.push_back(getInputIteratorFromFileReader(input, readerType)); + } std::transform( inputIters.begin(), inputIters.end(), std::back_inserter(inputItersRaw), [](std::shared_ptr iter) { - return static_cast(iter->getInputIter()); + return static_cast(iter->getInputIter()); }); } @@ -197,7 +194,7 @@ auto BM_Generic = [](::benchmark::State& state, } collectBatchTime += - std::accumulate(inputItersRaw.begin(), inputItersRaw.end(), 0, [](int64_t sum, BatchIterator* iter) { + std::accumulate(inputItersRaw.begin(), inputItersRaw.end(), 0LL, [](int64_t sum, FileReaderIterator* iter) { return sum + iter->getCollectBatchTime(); }); @@ -226,80 +223,6 @@ auto BM_Generic = [](::benchmark::State& state, writerMetrics.compressTime,
benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); }; -class OrcFileGuard { - public: - explicit OrcFileGuard(const std::vector& inputFiles) { - orcFiles_.resize(inputFiles.size()); - for (auto i = 0; i != inputFiles.size(); ++i) { - GLUTEN_ASSIGN_OR_THROW(orcFiles_[i], createOrcFile(inputFiles[i])); - } - } - - ~OrcFileGuard() { - for (auto& x : orcFiles_) { - std::filesystem::remove(x); - } - } - - const std::vector& getOrcFiles() { - return orcFiles_; - } - - private: - arrow::Result createOrcFile(const std::string& inputFile) { - ParquetBatchStreamIterator parquetIterator(inputFile); - - std::string outputFile = inputFile; - // Get the filename. - auto pos = inputFile.find_last_of("/"); - if (pos != std::string::npos) { - outputFile = inputFile.substr(pos + 1); - } - // If any suffix is found, replace it with ".orc" - pos = outputFile.find_first_of("."); - if (pos != std::string::npos) { - outputFile = outputFile.substr(0, pos) + kOrcSuffix; - } else { - return arrow::Status::Invalid("Invalid input file: " + inputFile); - } - outputFile = std::filesystem::current_path().string() + "/" + outputFile; - - std::shared_ptr outputStream; - ARROW_ASSIGN_OR_RAISE(outputStream, arrow::io::FileOutputStream::Open(outputFile)); - - auto writerOptions = arrow::adapters::orc::WriteOptions(); - auto maybeWriter = arrow::adapters::orc::ORCFileWriter::Open(outputStream.get(), writerOptions); - GLUTEN_THROW_NOT_OK(maybeWriter); - auto& writer = *maybeWriter; - - while (true) { - // 1. read from Parquet - auto cb = parquetIterator.next(); - if (cb == nullptr) { - break; - } - - auto arrowColumnarBatch = std::dynamic_pointer_cast(cb); - auto recordBatch = arrowColumnarBatch->getRecordBatch(); - - // 2. 
write to Orc - if (!(writer->Write(*recordBatch)).ok()) { - return arrow::Status::IOError("Write failed"); - } - } - - if (!(writer->Close()).ok()) { - return arrow::Status::IOError("Close failed"); - } - - std::cout << "Created orc file: " << outputFile << std::endl; - - return outputFile; - } - - std::vector orcFiles_; -}; - int main(int argc, char** argv) { ::benchmark::Initialize(&argc, argv); gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -307,7 +230,6 @@ int main(int argc, char** argv) { std::string substraitJsonFile; std::vector inputFiles; std::unordered_map conf; - std::shared_ptr orcFileGuard; conf.insert({gluten::kSparkBatchSize, FLAGS_batch_size}); initVeloxBackend(conf); @@ -339,19 +261,19 @@ int main(int argc, char** argv) { std::exit(EXIT_FAILURE); } -#define GENERIC_BENCHMARK(NAME, FUNC) \ - do { \ - auto* bm = ::benchmark::RegisterBenchmark(NAME, BM_Generic, substraitJsonFile, inputFiles, conf, FUNC) \ - ->MeasureProcessCPUTime() \ - ->UseRealTime(); \ - if (FLAGS_threads > 0) { \ - bm->Threads(FLAGS_threads); \ - } else { \ - bm->ThreadRange(1, std::thread::hardware_concurrency()); \ - } \ - if (FLAGS_iterations > 0) { \ - bm->Iterations(FLAGS_iterations); \ - } \ +#define GENERIC_BENCHMARK(NAME, READER_TYPE) \ + do { \ + auto* bm = ::benchmark::RegisterBenchmark(NAME, BM_Generic, substraitJsonFile, inputFiles, conf, READER_TYPE) \ + ->MeasureProcessCPUTime() \ + ->UseRealTime(); \ + if (FLAGS_threads > 0) { \ + bm->Threads(FLAGS_threads); \ + } else { \ + bm->ThreadRange(1, std::thread::hardware_concurrency()); \ + } \ + if (FLAGS_iterations > 0) { \ + bm->Iterations(FLAGS_iterations); \ + } \ } while (0) #if 0 @@ -364,15 +286,10 @@ int main(int argc, char** argv) { #endif if (FLAGS_skip_input) { - GENERIC_BENCHMARK("SkipInput", nullptr); - } else if (FLAGS_gen_orc_input) { - orcFileGuard = std::make_shared(inputFiles); - inputFiles = orcFileGuard->getOrcFiles(); - GENERIC_BENCHMARK("OrcInputFromBatchVector", getOrcInputFromBatchVector); - 
GENERIC_BENCHMARK("OrcInputFromBatchStream", getOrcInputFromBatchStream); + GENERIC_BENCHMARK("SkipInput", FileReaderType::kNone); } else { - GENERIC_BENCHMARK("ParquetInputFromBatchVector", getParquetInputFromBatchVector); - GENERIC_BENCHMARK("ParquetInputFromBatchStream", getParquetInputFromBatchStream); + GENERIC_BENCHMARK("InputFromBatchVector", FileReaderType::kBuffered); + GENERIC_BENCHMARK("InputFromBatchStream", FileReaderType::kStream); } ::benchmark::RunSpecifiedBenchmarks(); diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc index 33e88267e039..3406519bad56 100644 --- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc +++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc @@ -33,7 +33,7 @@ #include -#include "BenchmarkUtils.h" +#include "benchmarks/common/BenchmarkUtils.h" #include "compute/VeloxExecutionCtx.h" #include "memory/ArrowMemoryPool.h" #include "memory/ColumnarBatch.h" diff --git a/cpp/velox/benchmarks/QueryBenchmark.cc b/cpp/velox/benchmarks/QueryBenchmark.cc index 7d26f08c591a..2c9bc0ce4930 100644 --- a/cpp/velox/benchmarks/QueryBenchmark.cc +++ b/cpp/velox/benchmarks/QueryBenchmark.cc @@ -18,7 +18,7 @@ #include #include -#include "BenchmarkUtils.h" +#include "benchmarks/common/BenchmarkUtils.h" #include "compute/VeloxPlanConverter.h" #include "memory/VeloxMemoryManager.h" #include "utils/VeloxArrowUtils.h" diff --git a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc index d5b718dbcb5d..9e1325e5fd82 100644 --- a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc +++ b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc @@ -28,7 +28,7 @@ #include -#include "benchmarks/BenchmarkUtils.h" +#include "benchmarks/common/BenchmarkUtils.h" #include "memory/ColumnarBatch.h" #include "shuffle/LocalPartitionWriter.h" #include "shuffle/VeloxShuffleWriter.h" diff --git a/cpp/velox/benchmarks/BenchmarkUtils.cc b/cpp/velox/benchmarks/common/BenchmarkUtils.cc similarity 
index 100% rename from cpp/velox/benchmarks/BenchmarkUtils.cc rename to cpp/velox/benchmarks/common/BenchmarkUtils.cc diff --git a/cpp/velox/benchmarks/BenchmarkUtils.h b/cpp/velox/benchmarks/common/BenchmarkUtils.h similarity index 99% rename from cpp/velox/benchmarks/BenchmarkUtils.h rename to cpp/velox/benchmarks/common/BenchmarkUtils.h index 8b8fd84835ad..75706f24d223 100644 --- a/cpp/velox/benchmarks/BenchmarkUtils.h +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.h @@ -17,12 +17,12 @@ #pragma once -#include #include #include #include #include #include +#include "benchmark/benchmark.h" #include "substrait/SubstraitToVeloxPlan.h" #include "compute/ProtobufUtils.h" diff --git a/cpp/velox/benchmarks/common/FileReaderIterator.cc b/cpp/velox/benchmarks/common/FileReaderIterator.cc new file mode 100644 index 000000000000..26985c7f03c4 --- /dev/null +++ b/cpp/velox/benchmarks/common/FileReaderIterator.cc @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "FileReaderIterator.h" +#include "benchmarks/common/ParquetReaderIterator.h" +#ifdef GLUTEN_ENABLE_ORC +#include "benchmarks/common/OrcReaderIterator.h" +#endif + +std::shared_ptr gluten::getInputIteratorFromFileReader( + const std::string& path, + gluten::FileReaderType readerType) { + std::filesystem::path input{path}; + auto suffix = input.extension().string(); + if (suffix == kParquetSuffix) { + if (readerType == FileReaderType::kStream) { + return std::make_shared(std::make_unique(path)); + } + if (readerType == FileReaderType::kBuffered) { + return std::make_shared(std::make_unique(path)); + } + } else if (suffix == kOrcSuffix) { +#ifdef GLUTEN_ENABLE_ORC + if (readerType == FileReaderType::kStream) { + return std::make_shared(std::make_unique(path)); + } + if (readerType == FileReaderType::kBuffered) { + return std::make_shared(std::make_unique(path)); + } +#endif + } + throw GlutenException("Unreachable."); /* thrown by value, not as a leaked heap pointer */ +} diff --git a/cpp/velox/benchmarks/common/FileReaderIterator.h b/cpp/velox/benchmarks/common/FileReaderIterator.h new file mode 100644 index 000000000000..c41573f93988 --- /dev/null +++ b/cpp/velox/benchmarks/common/FileReaderIterator.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "BenchmarkUtils.h" + +#include "compute/ResultIterator.h" +#include "memory/ColumnarBatch.h" +#include "memory/ColumnarBatchIterator.h" +#include "utils/DebugOut.h" + +#include +#include +#include +#include + +namespace gluten { + +static const std::string kOrcSuffix = ".orc"; +static const std::string kParquetSuffix = ".parquet"; + +enum FileReaderType { kBuffered, kStream, kNone }; + +class FileReaderIterator : public ColumnarBatchIterator { + public: + explicit FileReaderIterator(const std::string& path) : path_(path) {} + + virtual ~FileReaderIterator() = default; + + virtual void createReader() = 0; + + virtual std::shared_ptr getSchema() = 0; + + int64_t getCollectBatchTime() const { + return collectBatchTime_; + } + + protected: + int64_t collectBatchTime_ = 0; + std::string path_; +}; + +std::shared_ptr getInputIteratorFromFileReader( + const std::string& path, + FileReaderType readerType); + +} // namespace gluten diff --git a/cpp/velox/benchmarks/BatchVectorIterator.h b/cpp/velox/benchmarks/common/OrcReaderIterator.h similarity index 53% rename from cpp/velox/benchmarks/BatchVectorIterator.h rename to cpp/velox/benchmarks/common/OrcReaderIterator.h index 30152bb3d315..8bc2a50c9ec3 100644 --- a/cpp/velox/benchmarks/BatchVectorIterator.h +++ b/cpp/velox/benchmarks/common/OrcReaderIterator.h @@ -16,57 +16,70 @@ */ #pragma once -#include "BatchIteratorWrapper.h" +#include +#include "benchmarks/common/FileReaderIterator.h" namespace gluten { -class ParquetBatchVectorIterator final : public ParquetBatchIterator { +class OrcReaderIterator : public FileReaderIterator { public: - explicit ParquetBatchVectorIterator(const std::string& path) : ParquetBatchIterator(path) { - createReader(); - collectBatches(); + explicit OrcReaderIterator(const std::string& path) : FileReaderIterator(path) {} - iter_ = batches_.begin(); - DEBUG_OUT 
<< "ParquetBatchVectorIterator open file: " << path << std::endl; - DEBUG_OUT << "Number of input batches: " << std::to_string(batches_.size()) << std::endl; - if (iter_ != batches_.cend()) { - DEBUG_OUT << "columns: " << (*iter_)->num_columns() << std::endl; - DEBUG_OUT << "rows: " << (*iter_)->num_rows() << std::endl; - } - } + void createReader() override { + // Open File + auto input = arrow::io::ReadableFile::Open(path_); + GLUTEN_THROW_NOT_OK(input); - std::shared_ptr next() override { - if (iter_ == batches_.cend()) { - return nullptr; - } - return convertBatch(std::make_shared(*iter_++)); + // Open ORC File Reader + auto maybeReader = arrow::adapters::orc::ORCFileReader::Open(*input, arrow::default_memory_pool()); + GLUTEN_THROW_NOT_OK(maybeReader); + fileReader_.reset((*maybeReader).release()); + + // get record batch Reader + auto recordBatchReader = fileReader_->GetRecordBatchReader(4096, std::vector()); + GLUTEN_THROW_NOT_OK(recordBatchReader); + recordBatchReader_ = *recordBatchReader; } - private: - void collectBatches() { - auto startTime = std::chrono::steady_clock::now(); - GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches()); - auto endTime = std::chrono::steady_clock::now(); - collectBatchTime_ += std::chrono::duration_cast(endTime - startTime).count(); + std::shared_ptr getSchema() override { + auto schema = fileReader_->ReadSchema(); + GLUTEN_THROW_NOT_OK(schema); + return *schema; } - arrow::RecordBatchVector batches_; - std::vector>::const_iterator iter_; + protected: + std::unique_ptr fileReader_; + std::shared_ptr recordBatchReader_; }; -inline std::shared_ptr getParquetInputFromBatchVector(const std::string& path) { - return std::make_shared(std::make_unique(path)); -} +class OrcStreamReaderIterator final : public OrcReaderIterator { + public: + explicit OrcStreamReaderIterator(const std::string& path) : OrcReaderIterator(path) { + createReader(); + } -class OrcBatchVectorIterator final : public OrcBatchIterator { + 
std::shared_ptr next() override { + auto startTime = std::chrono::steady_clock::now(); + GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next()); + DEBUG_OUT << "OrcStreamReaderIterator get a batch, num rows: " << (batch ? batch->num_rows() : 0) << std::endl; + collectBatchTime_ += + std::chrono::duration_cast(std::chrono::steady_clock::now() - startTime).count(); + if (batch == nullptr) { + return nullptr; + } + return convertBatch(std::make_shared(batch)); + } +}; + +class OrcBufferedReaderIterator final : public OrcReaderIterator { public: - explicit OrcBatchVectorIterator(const std::string& path) : OrcBatchIterator(path) { + explicit OrcBufferedReaderIterator(const std::string& path) : OrcReaderIterator(path) { createReader(); collectBatches(); iter_ = batches_.begin(); #ifdef GLUTEN_PRINT_DEBUG - DEBUG_OUT << "OrcBatchVectorIterator open file: " << path << std::endl; + DEBUG_OUT << "OrcBufferedReaderIterator open file: " << path << std::endl; DEBUG_OUT << "Number of input batches: " << std::to_string(batches_.size()) << std::endl; if (iter_ != batches_.cend()) { DEBUG_OUT << "columns: " << (*iter_)->num_columns() << std::endl; @@ -94,8 +107,4 @@ class OrcBatchVectorIterator final : public OrcBatchIterator { std::vector>::const_iterator iter_; }; -inline std::shared_ptr getOrcInputFromBatchVector(const std::string& path) { - return std::make_shared(std::make_unique(path)); -} - -} // namespace gluten +} // namespace gluten \ No newline at end of file diff --git a/cpp/velox/benchmarks/common/ParquetReaderIterator.h b/cpp/velox/benchmarks/common/ParquetReaderIterator.h new file mode 100644 index 000000000000..f6621719786a --- /dev/null +++ b/cpp/velox/benchmarks/common/ParquetReaderIterator.h @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "benchmarks/common/FileReaderIterator.h" +#include "utils/macros.h" + +#include + +namespace gluten { + +class ParquetReaderIterator : public FileReaderIterator { + public: + explicit ParquetReaderIterator(const std::string& path) : FileReaderIterator(getExampleFilePath(path)) {} + + void createReader() override { + parquet::ArrowReaderProperties properties = parquet::default_arrow_reader_properties(); + GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), parquet::ParquetFileReader::OpenFile(path_), properties, &fileReader_)); + GLUTEN_THROW_NOT_OK( + fileReader_->GetRecordBatchReader(arrow::internal::Iota(fileReader_->num_row_groups()), &recordBatchReader_)); + + auto schema = recordBatchReader_->schema(); + std::cout << "schema:\n" << schema->ToString() << std::endl; + } + + std::shared_ptr getSchema() override { + return recordBatchReader_->schema(); + } + + protected: + std::unique_ptr fileReader_; + std::shared_ptr recordBatchReader_; +}; + +class ParquetStreamReaderIterator final : public ParquetReaderIterator { + public: + explicit ParquetStreamReaderIterator(const std::string& path) : ParquetReaderIterator(path) { + createReader(); + DEBUG_OUT << "ParquetStreamReaderIterator open file: " << path << std::endl; + } + + std::shared_ptr next() override { + auto startTime = std::chrono::steady_clock::now(); + GLUTEN_ASSIGN_OR_THROW(auto
batch, recordBatchReader_->Next()); + DEBUG_OUT << "ParquetStreamReaderIterator get a batch, num rows: " << (batch ? batch->num_rows() : 0) << std::endl; + collectBatchTime_ += + std::chrono::duration_cast(std::chrono::steady_clock::now() - startTime).count(); + if (batch == nullptr) { + return nullptr; + } + return convertBatch(std::make_shared(batch)); + } +}; + +class ParquetBufferedReaderIterator final : public ParquetReaderIterator { + public: + explicit ParquetBufferedReaderIterator(const std::string& path) : ParquetReaderIterator(path) { + createReader(); + collectBatches(); + + iter_ = batches_.begin(); + DEBUG_OUT << "ParquetBufferedReaderIterator open file: " << path << std::endl; + DEBUG_OUT << "Number of input batches: " << std::to_string(batches_.size()) << std::endl; + if (iter_ != batches_.cend()) { + DEBUG_OUT << "columns: " << (*iter_)->num_columns() << std::endl; + DEBUG_OUT << "rows: " << (*iter_)->num_rows() << std::endl; + } + } + + std::shared_ptr next() override { + if (iter_ == batches_.cend()) { + return nullptr; + } + return convertBatch(std::make_shared(*iter_++)); + } + + private: + void collectBatches() { + auto startTime = std::chrono::steady_clock::now(); + GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches()); + auto endTime = std::chrono::steady_clock::now(); + collectBatchTime_ += std::chrono::duration_cast(endTime - startTime).count(); + } + + arrow::RecordBatchVector batches_; + std::vector>::const_iterator iter_; +}; + +} // namespace gluten diff --git a/cpp/velox/benchmarks/exec/OrcConverter.cc b/cpp/velox/benchmarks/exec/OrcConverter.cc new file mode 100644 index 000000000000..b421ecca3b37 --- /dev/null +++ b/cpp/velox/benchmarks/exec/OrcConverter.cc @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "benchmarks/common/ParquetReaderIterator.h" + +namespace gluten { + +class OrcConverter final { + public: + explicit OrcConverter(const std::vector& inputFiles) : inputFiles_(inputFiles) { + orcFiles_.resize(inputFiles.size()); + } + + const std::vector& getOrcFiles() { + for (auto i = 0; i != inputFiles_.size(); ++i) { + GLUTEN_ASSIGN_OR_THROW(orcFiles_[i], createOrcFile(inputFiles_[i])); + } + return orcFiles_; + } + + private: + arrow::Result createOrcFile(const std::string& inputFile) { + ParquetStreamReaderIterator parquetIterator(inputFile); + + std::string outputFile = inputFile; + // Get the filename. 
+ auto pos = inputFile.find_last_of("/"); + if (pos != std::string::npos) { + outputFile = inputFile.substr(pos + 1); + } + // If any suffix is found, replace it with ".orc" + pos = outputFile.find_first_of("."); + if (pos != std::string::npos) { + outputFile = outputFile.substr(0, pos) + kOrcSuffix; + } else { + return arrow::Status::Invalid("Invalid input file: " + inputFile); + } + outputFile = std::filesystem::current_path().string() + "/" + outputFile; + + std::shared_ptr outputStream; + ARROW_ASSIGN_OR_RAISE(outputStream, arrow::io::FileOutputStream::Open(outputFile)); + + auto writerOptions = arrow::adapters::orc::WriteOptions(); + auto maybeWriter = arrow::adapters::orc::ORCFileWriter::Open(outputStream.get(), writerOptions); + GLUTEN_THROW_NOT_OK(maybeWriter); + auto& writer = *maybeWriter; + + // Read from parquet and write to ORC. + while (auto cb = parquetIterator.next()) { + GLUTEN_ASSIGN_OR_THROW( + auto recordBatch, arrow::ImportRecordBatch(cb->exportArrowArray().get(), parquetIterator.getSchema())); + if (!(writer->Write(*recordBatch)).ok()) { + return arrow::Status::IOError("Write failed"); + } + } + + if (!(writer->Close()).ok()) { + return arrow::Status::IOError("Close failed"); + } + return outputFile; + } + + std::vector inputFiles_; + std::vector orcFiles_; +}; + +} // namespace gluten + +int main(int argc, char** argv) { + if (argc == 1) { + std::cout << "Please specify parquet files as input arguments." << std::endl; + exit(0); + } + + std::vector inputFiles; + for (auto i = 1; i < argc; ++i) { + const auto& file = argv[i]; + if (!std::filesystem::exists(file)) { + std::cout << file << " doesn't exist!" 
<< std::endl; + exit(1); + } + inputFiles.emplace_back(argv[i]); + } + + auto orcConverter = std::make_shared(inputFiles); + auto orcFiles = orcConverter->getOrcFiles(); + std::cout << "Generated output files: " << std::endl; + for (const auto& file : orcFiles) { + std::cout << file << std::endl; + } + return 0; +} diff --git a/cpp/velox/tests/OrcTest.cc b/cpp/velox/tests/OrcTest.cc index 29d4fd3667a5..0861701cf060 100644 --- a/cpp/velox/tests/OrcTest.cc +++ b/cpp/velox/tests/OrcTest.cc @@ -17,7 +17,7 @@ #include "arrow/c/abi.h" #include "benchmarks/BatchStreamIterator.h" -#include "benchmarks/BenchmarkUtils.h" +#include "benchmarks/common/BenchmarkUtils.h" #include "utils/TestUtils.h"