From 60ac5ab41661a1dc4e5615ec2b7be6b462bdfdbc Mon Sep 17 00:00:00 2001 From: Krishna Pai Date: Wed, 13 Nov 2024 11:51:12 -0800 Subject: [PATCH] Back out "Readd: [velox][PR] Add runner for local distributed execution" Summary: Backing out to unbreak OSS CI. The PR #11507 seems to have broken pyVelox builds. Backing out so we can investigate and fix. Original commit changeset: 4e52927c05ff Original Phabricator Diff: D65779177 Differential Revision: D65900074 --- velox/CMakeLists.txt | 1 - velox/connectors/hive/HiveConnectorSplit.h | 108 ------ velox/connectors/hive/tests/CMakeLists.txt | 1 - velox/connectors/hive/tests/HiveSplitTest.cpp | 64 ---- velox/exec/tests/TableScanTest.cpp | 30 +- velox/exec/tests/utils/CMakeLists.txt | 3 - velox/exec/tests/utils/Cursor.cpp | 20 -- velox/exec/tests/utils/Cursor.h | 2 - .../tests/utils/DistributedPlanBuilder.cpp | 120 ------- .../exec/tests/utils/DistributedPlanBuilder.h | 82 ----- .../exec/tests/utils/HiveConnectorTestBase.h | 106 +++++- .../exec/tests/utils/LocalRunnerTestBase.cpp | 105 ------ velox/exec/tests/utils/LocalRunnerTestBase.h | 95 ------ velox/exec/tests/utils/PlanBuilder.h | 36 +- velox/exec/tests/utils/QueryAssertions.cpp | 12 - velox/exec/tests/utils/QueryAssertions.h | 5 - velox/runner/CMakeLists.txt | 27 -- velox/runner/LocalRunner.cpp | 291 ---------------- velox/runner/LocalRunner.h | 109 ------ velox/runner/LocalSchema.cpp | 321 ------------------ velox/runner/LocalSchema.h | 115 ------- velox/runner/MultiFragmentPlan.h | 96 ------ velox/runner/Runner.cpp | 59 ---- velox/runner/Runner.h | 101 ------ velox/runner/Schema.h | 168 --------- velox/runner/tests/CMakeLists.txt | 25 -- velox/runner/tests/LocalRunnerTest.cpp | 175 ---------- 27 files changed, 115 insertions(+), 2162 deletions(-) delete mode 100644 velox/connectors/hive/tests/HiveSplitTest.cpp delete mode 100644 velox/exec/tests/utils/DistributedPlanBuilder.cpp delete mode 100644 velox/exec/tests/utils/DistributedPlanBuilder.h delete mode 100644 velox/exec/tests/utils/LocalRunnerTestBase.cpp delete mode 100644 velox/exec/tests/utils/LocalRunnerTestBase.h delete mode 100644 velox/runner/CMakeLists.txt delete mode 100644 velox/runner/LocalRunner.cpp delete mode 100644 velox/runner/LocalRunner.h delete mode 100644 velox/runner/LocalSchema.cpp delete mode 100644 velox/runner/LocalSchema.h delete mode 100644 velox/runner/MultiFragmentPlan.h delete mode 100644 velox/runner/Runner.cpp delete mode 100644 velox/runner/Runner.h delete mode 100644 velox/runner/Schema.h delete mode 100644 velox/runner/tests/CMakeLists.txt delete mode 100644 velox/runner/tests/LocalRunnerTest.cpp diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index f9b66d3e3ed9..06ae8bf1c053 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -58,7 +58,6 @@ add_subdirectory(connectors) if(${VELOX_ENABLE_EXEC}) add_subdirectory(exec) - add_subdirectory(runner) endif() if(${VELOX_ENABLE_DUCKDB}) diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 99bca9b309cf..a06ffa668e11 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -111,112 +111,4 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { static void registerSerDe(); }; -class HiveConnectorSplitBuilder { - public: - explicit HiveConnectorSplitBuilder(std::string filePath) - : filePath_{std::move(filePath)} { - infoColumns_["$path"] = filePath_; - } - - HiveConnectorSplitBuilder& start(uint64_t start) { - start_ = start; - return *this; - } - - HiveConnectorSplitBuilder& length(uint64_t length) { - length_ = length; - return *this; - } - - HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) { - splitWeight_ = splitWeight; - return *this; - } - - HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) { - fileFormat_ = format; - return *this; - } - - HiveConnectorSplitBuilder& infoColumn( - const std::string& name, - const std::string& value) { - infoColumns_.emplace(std::move(name), std::move(value)); - return *this; - } - - HiveConnectorSplitBuilder& partitionKey( - std::string name, - std::optional value) { - partitionKeys_.emplace(std::move(name), std::move(value)); - return *this; - } - - HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) { - tableBucketNumber_ = bucket; - infoColumns_["$bucket"] = std::to_string(bucket); - return *this; - } - - HiveConnectorSplitBuilder& customSplitInfo( - const std::unordered_map& customSplitInfo) { - customSplitInfo_ = customSplitInfo; - return *this; - } - - HiveConnectorSplitBuilder& extraFileInfo( - const std::shared_ptr& extraFileInfo) { - extraFileInfo_ = extraFileInfo; - return *this; - } - - HiveConnectorSplitBuilder& serdeParameters( - const std::unordered_map& serdeParameters) { - serdeParameters_ = serdeParameters; - return *this; - } - - HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) { - connectorId_ = connectorId; - return *this; - } - - HiveConnectorSplitBuilder& fileProperties(FileProperties fileProperties) { - fileProperties_ = fileProperties; - return *this; - } - - std::shared_ptr build() const { - return std::make_shared( - connectorId_, - filePath_, - fileFormat_, - start_, - length_, - partitionKeys_, - tableBucketNumber_, - customSplitInfo_, - extraFileInfo_, - serdeParameters_, - splitWeight_, - infoColumns_, - fileProperties_); - } - - private: - const std::string filePath_; - dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; - uint64_t start_{0}; - uint64_t length_{std::numeric_limits::max()}; - std::unordered_map> partitionKeys_; - std::optional tableBucketNumber_; - std::unordered_map customSplitInfo_ = {}; - std::shared_ptr extraFileInfo_ = {}; - std::unordered_map serdeParameters_ = {}; - std::unordered_map infoColumns_ = {}; - std::string connectorId_; - int64_t splitWeight_{0}; - std::optional fileProperties_; -}; - } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt index ad3a373dab11..f4235cfa13c0 100644 --- a/velox/connectors/hive/tests/CMakeLists.txt +++ b/velox/connectors/hive/tests/CMakeLists.txt @@ -21,7 +21,6 @@ add_executable( HiveConnectorSerDeTest.cpp HivePartitionFunctionTest.cpp HivePartitionUtilTest.cpp - HiveSplitTest.cpp PartitionIdGeneratorTest.cpp TableHandleTest.cpp) add_test(velox_hive_connector_test velox_hive_connector_test) diff --git a/velox/connectors/hive/tests/HiveSplitTest.cpp b/velox/connectors/hive/tests/HiveSplitTest.cpp deleted file mode 100644 index 91412caaee2d..000000000000 --- a/velox/connectors/hive/tests/HiveSplitTest.cpp +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "gtest/gtest.h" -#include "velox/common/config/Config.h" -#include "velox/connectors/hive/HiveConnectorSplit.h" - -using namespace facebook::velox; -using namespace facebook::velox::connector::hive; - -TEST(HiveSplitTest, builder) { - FileProperties properties = {11, 1111}; - auto extra = std::make_shared("extra file info"); - std::unordered_map custom; - custom["custom1"] = "customValue1"; - std::unordered_map serde; - serde["serde1"] = "serdeValue1"; - auto split = HiveConnectorSplitBuilder("filepath") - .start(100) - .length(100000) - .splitWeight(1) - .fileFormat(dwio::common::FileFormat::DWRF) - .infoColumn("info1", "infoValue1") - .partitionKey("DS", "2024-11-01") - .tableBucketNumber(11) - .customSplitInfo(custom) - .extraFileInfo(extra) - .serdeParameters(serde) - .connectorId("connectorId") - .fileProperties(properties) - .build(); - - EXPECT_EQ(100, split->start); - EXPECT_EQ(100000, split->length); - EXPECT_EQ(1, split->splitWeight); - EXPECT_TRUE(dwio::common::FileFormat::DWRF == split->fileFormat); - EXPECT_EQ("infoValue1", split->infoColumns["info1"]); - auto it = split->partitionKeys.find("DS"); - EXPECT_TRUE(it != split->partitionKeys.end()); - EXPECT_EQ("2024-11-01", it->second.value()); - EXPECT_EQ(11, split->tableBucketNumber.value()); - EXPECT_EQ("customValue1", split->customSplitInfo["custom1"]); - EXPECT_EQ(std::string("extra file info"), *split->extraFileInfo); - EXPECT_EQ("serdeValue1", split->serdeParameters["serde1"]); - EXPECT_EQ("connectorId", split->connectorId); - EXPECT_EQ( - properties.fileSize.value(), split->properties.value().fileSize.value()); - EXPECT_EQ( - properties.modificationTime.value(), - split->properties.value().modificationTime.value()); -} diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 5b66d6457e00..e6052e4eb91d 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -180,7 +180,7 @@ class TableScanTest : public virtual HiveConnectorTestBase { const std::string& filePath, const TypePtr& partitionType, const std::optional& partitionValue) { - auto split = exec::test::HiveConnectorSplitBuilder(filePath) + auto split = HiveConnectorSplitBuilder(filePath) .partitionKey("pkey", partitionValue) .build(); auto outputType = @@ -411,7 +411,7 @@ TEST_F(TableScanTest, partitionKeyAlias) { {"a", regularColumn("c0", BIGINT())}, {"ds_alias", partitionKey("ds", VARCHAR())}}; - auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) + auto split = HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("ds", "2021-12-02") .build(); @@ -1806,8 +1806,7 @@ TEST_F(TableScanTest, splitOffsetAndLength) { } TEST_F(TableScanTest, fileNotFound) { - auto split = - exec::test::HiveConnectorSplitBuilder("/path/to/nowhere.orc").build(); + auto split = HiveConnectorSplitBuilder("/path/to/nowhere.orc").build(); auto assertMissingFile = [&](bool ignoreMissingFiles) { AssertQueryBuilder(tableScanNode()) .connectorSessionProperty( @@ -1830,7 +1829,7 @@ TEST_F(TableScanTest, validFileNoData) { auto filePath = facebook::velox::test::getDataFilePath( "velox/exec/tests", "data/emptyPresto.dwrf"); - auto split = exec::test::HiveConnectorSplitBuilder(filePath) + auto split = HiveConnectorSplitBuilder(filePath) .start(0) .length(fs::file_size(filePath) / 2) .build(); @@ -1950,7 +1949,7 @@ TEST_F(TableScanTest, partitionedTableDateKey) { // Test partition filter on date column. { - auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) + auto split = HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("pkey", partitionValue) .build(); auto outputType = ROW({"pkey", "c0", "c1"}, {DATE(), BIGINT(), DOUBLE()}); @@ -2852,10 +2851,9 @@ TEST_F(TableScanTest, bucket) { writeToFile(filePaths[i]->getPath(), rowVector); rowVectors.emplace_back(rowVector); - splits.emplace_back( - exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath()) - .tableBucketNumber(bucket) - .build()); + splits.emplace_back(HiveConnectorSplitBuilder(filePaths[i]->getPath()) + .tableBucketNumber(bucket) + .build()); } createDuckDbTable(rowVectors); @@ -2879,7 +2877,7 @@ TEST_F(TableScanTest, bucket) { for (int i = 0; i < buckets.size(); ++i) { int bucketValue = buckets[i]; - auto hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath()) + auto hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath()) .tableBucketNumber(bucketValue) .build(); @@ -2899,7 +2897,7 @@ TEST_F(TableScanTest, bucket) { // Filter on bucket column, but don't project it out auto rowTypes = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); - hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath()) + hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath()) .tableBucketNumber(bucketValue) .build(); op = PlanBuilder() @@ -4143,7 +4141,7 @@ TEST_F(TableScanTest, reuseRowVector) { .tableScan(rowType, {}, "c0 < 5") .project({"c1.c0"}) .planNode(); - auto split = exec::test::HiveConnectorSplitBuilder(file->getPath()).build(); + auto split = HiveConnectorSplitBuilder(file->getPath()).build(); auto expected = makeRowVector( {makeFlatVector(10, [](auto i) { return i % 5; })}); AssertQueryBuilder(plan).splits({split, split}).assertResults(expected); @@ -4724,7 +4722,7 @@ TEST_F(TableScanTest, varbinaryPartitionKey) { {"a", regularColumn("c0", BIGINT())}, {"ds_alias", partitionKey("ds", VARBINARY())}}; - auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) + auto split = HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("ds", "2021-12-02") .build(); @@ -4763,7 +4761,7 @@ TEST_F(TableScanTest, timestampPartitionKey) { ColumnHandleMap assignments = {{"t", partitionKey("t", TIMESTAMP())}}; std::vector> splits; for (auto& t : inputs) { - splits.push_back(exec::test::HiveConnectorSplitBuilder(filePath->getPath()) + splits.push_back(HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("t", t) .build()); } @@ -4782,7 +4780,7 @@ TEST_F(TableScanTest, partitionKeyNotMatchPartitionKeysHandle) { writeToFile(filePath->getPath(), vectors); createDuckDbTable(vectors); - auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) + auto split = HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("ds", "2021-12-02") .build(); diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index 8e78d65f380d..c2d227410c1e 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -22,10 +22,8 @@ add_library( AssertQueryBuilder.cpp ArbitratorTestUtil.cpp Cursor.cpp - DistributedPlanBuilder.cpp HiveConnectorTestBase.cpp LocalExchangeSource.cpp - LocalRunnerTestBase.cpp OperatorTestBase.cpp PlanBuilder.cpp QueryAssertions.cpp @@ -37,7 +35,6 @@ add_library( target_link_libraries( velox_exec_test_lib - velox_local_runner velox_vector_test_lib velox_temp_path velox_core diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index ad7b4133c6c7..1aed8908206b 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -275,10 +275,6 @@ class MultiThreadedTaskCursor : public TaskCursorBase { /// Starts the task if not started yet. bool moveNext() override { start(); - if (error_) { - std::rethrow_exception(error_); - } - current_ = queue_->dequeue(); if (task_->error()) { // Wait for the task to finish (there's' a small period of time between @@ -306,13 +302,6 @@ class MultiThreadedTaskCursor : public TaskCursorBase { return current_; } - void setError(std::exception_ptr error) override { - error_ = error; - if (task_) { - task_->setError(error); - } - } - const std::shared_ptr& task() override { return task_; } @@ -327,7 +316,6 @@ class MultiThreadedTaskCursor : public TaskCursorBase { std::shared_ptr task_; RowVectorPtr current_; bool atEnd_{false}; - std::exception_ptr error_; }; class SingleThreadedTaskCursor : public TaskCursorBase { @@ -403,13 +391,6 @@ class SingleThreadedTaskCursor : public TaskCursorBase { return current_; } - void setError(std::exception_ptr error) override { - error_ = error; - if (task_) { - task_->setError(error); - } - } - const std::shared_ptr& task() override { return task_; } @@ -418,7 +399,6 @@ class SingleThreadedTaskCursor : public TaskCursorBase { std::shared_ptr task_; RowVectorPtr current_; RowVectorPtr next_; - std::exception_ptr error_; }; std::unique_ptr TaskCursor::create(const CursorParameters& params) { diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h index 9c8cfaf7d7fa..314eb8958760 100644 --- a/velox/exec/tests/utils/Cursor.h +++ b/velox/exec/tests/utils/Cursor.h @@ -139,8 +139,6 @@ class TaskCursor { virtual RowVectorPtr& current() = 0; - virtual void setError(std::exception_ptr error) = 0; - virtual const std::shared_ptr& task() = 0; }; diff --git a/velox/exec/tests/utils/DistributedPlanBuilder.cpp b/velox/exec/tests/utils/DistributedPlanBuilder.cpp deleted file mode 100644 index fddb79a6f3ca..000000000000 --- a/velox/exec/tests/utils/DistributedPlanBuilder.cpp +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/exec/tests/utils/DistributedPlanBuilder.h" - -namespace facebook::velox::exec::test { - -DistributedPlanBuilder::DistributedPlanBuilder( - const runner::MultiFragmentPlan::Options& options, - std::shared_ptr planNodeIdGenerator, - memory::MemoryPool* pool) - : PlanBuilder(planNodeIdGenerator, pool), options_(options), root_(this) { - root_->stack_.push_back(this); - newFragment(); - current_->width = options_.numWorkers; -} - -DistributedPlanBuilder::DistributedPlanBuilder(DistributedPlanBuilder& root) - : PlanBuilder(root.planNodeIdGenerator(), root.pool()), - options_(root.options_), - root_(&root) { - root_->stack_.push_back(this); - newFragment(); - current_->width = options_.numWorkers; -} - -std::vector DistributedPlanBuilder::fragments() { - newFragment(); - return std::move(fragments_); -} - -void DistributedPlanBuilder::newFragment() { - if (current_) { - gatherScans(planNode_); - current_->fragment = core::PlanFragment(std::move(planNode_)); - fragments_.push_back(std::move(*current_)); - } - current_ = std::make_unique( - fmt::format("{}.{}", options_.queryId, root_->fragmentCounter_++)); - planNode_ = nullptr; -} - -PlanBuilder& DistributedPlanBuilder::shuffle( - const std::vector& partitionKeys, - int numPartitions, - bool replicateNullsAndAny, - const std::vector& outputLayout) { - partitionedOutput( - partitionKeys, numPartitions, replicateNullsAndAny, outputLayout); - auto* output = - dynamic_cast(planNode_.get()); - VELOX_CHECK_NOT_NULL(output); - auto producerPrefix = current_->taskPrefix; - newFragment(); - current_->width = numPartitions; - exchange(output->outputType(), VectorSerde::Kind::kPresto); - auto* exchange = dynamic_cast(planNode_.get()); - VELOX_CHECK_NOT_NULL(exchange); - current_->inputStages.push_back( - runner::InputStage{exchange->id(), producerPrefix}); - return *this; -} - -core::PlanNodePtr DistributedPlanBuilder::shuffleResult( - const std::vector& partitionKeys, - int numPartitions, - bool replicateNullsAndAny, - const std::vector& outputLayout) { - partitionedOutput( - partitionKeys, numPartitions, replicateNullsAndAny, outputLayout); - auto* output = - dynamic_cast(planNode_.get()); - VELOX_CHECK_NOT_NULL(output); - const auto producerPrefix = current_->taskPrefix; - auto result = planNode_; - newFragment(); - root_->stack_.pop_back(); - auto* consumer = root_->stack_.back(); - if (consumer->current_->width != 0) { - VELOX_CHECK_EQ( - numPartitions, - consumer->current_->width, - "The consumer width should match the producer fanout"); - } else { - consumer->current_->width = numPartitions; - } - - for (auto& fragment : fragments_) { - root_->fragments_.push_back(std::move(fragment)); - } - exchange(output->outputType(), VectorSerde::Kind::kPresto); - auto* exchange = dynamic_cast(planNode_.get()); - consumer->current_->inputStages.push_back( - runner::InputStage{exchange->id(), producerPrefix}); - return std::move(planNode_); -} - -void DistributedPlanBuilder::gatherScans(const core::PlanNodePtr& plan) { - if (auto scan = std::dynamic_pointer_cast(plan)) { - current_->scans.push_back(scan); - return; - } - for (auto& source : plan->sources()) { - gatherScans(source); - } -} -} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/DistributedPlanBuilder.h b/velox/exec/tests/utils/DistributedPlanBuilder.h deleted file mode 100644 index ba091ef8c66a..000000000000 --- a/velox/exec/tests/utils/DistributedPlanBuilder.h +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "velox/exec/tests/utils/PlanBuilder.h" -#include "velox/runner/MultiFragmentPlan.h" - -namespace facebook::velox::exec::test { - -/// Builder for distributed plans. Adds a shuffle() and related -/// methods for building PartitionedOutput-Exchange pairs between -/// fragments. Not thread safe. -class DistributedPlanBuilder : public PlanBuilder { - public: - /// Constructs a top level DistributedPlanBuilder. - DistributedPlanBuilder( - const runner::MultiFragmentPlan::Options& options, - std::shared_ptr planNodeIdGenerator, - memory::MemoryPool* pool = nullptr); - - /// Constructs a child builder. Used for branching plans, e.g. the subplan for - /// a join build side. - DistributedPlanBuilder(DistributedPlanBuilder& root); - - /// Returns the planned fragments. The builder will be empty after this. This - /// is only called on the root builder. - std::vector fragments(); - - PlanBuilder& shuffle( - const std::vector& keys, - int numPartitions, - bool replicateNullsAndAny, - const std::vector& outputLayout = {}) override; - - core::PlanNodePtr shuffleResult( - const std::vector& keys, - int numPartitions, - bool replicateNullsAndAny, - const std::vector& outputLayout = {}) override; - - private: - void newFragment(); - - void gatherScans(const core::PlanNodePtr& plan); - - const runner::MultiFragmentPlan::Options& options_; - DistributedPlanBuilder* const root_; - - // Stack of outstanding builders. The last element is the immediately - // enclosing one. When returning an ExchangeNode from returnShuffle, the stack - // is used to establish the linkage between the fragment of the returning - // builder and the fragment current in the calling builder. Only filled in the - // root builder. - std::vector stack_; - - // Fragment counter. Only used in root builder. - int32_t fragmentCounter_{0}; - - // The fragment being built. Will be moved to the root builder's 'fragments_' - // when complete. - std::unique_ptr current_; - - // The fragments gathered under this builder. Moved to the root builder when - // returning the subplan. - std::vector fragments_; -}; - -} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index a53b2634ef4b..74dd5223307b 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -223,15 +223,109 @@ class HiveConnectorTestBase : public OperatorTestBase { } }; -/// Same as connector::hive::HiveConnectorBuilder, except that this defaults -/// connectorId to kHiveConnectorId. -class HiveConnectorSplitBuilder - : public connector::hive::HiveConnectorSplitBuilder { +class HiveConnectorSplitBuilder { public: explicit HiveConnectorSplitBuilder(std::string filePath) - : connector::hive::HiveConnectorSplitBuilder(filePath) { - connectorId(kHiveConnectorId); + : filePath_{std::move(filePath)} { + infoColumns_["$path"] = filePath_; } + + HiveConnectorSplitBuilder& start(uint64_t start) { + start_ = start; + return *this; + } + + HiveConnectorSplitBuilder& length(uint64_t length) { + length_ = length; + return *this; + } + + HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) { + splitWeight_ = splitWeight; + return *this; + } + + HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) { + fileFormat_ = format; + return *this; + } + + HiveConnectorSplitBuilder& infoColumn( + const std::string& name, + const std::string& value) { + infoColumns_.emplace(std::move(name), std::move(value)); + return *this; + } + + HiveConnectorSplitBuilder& partitionKey( + std::string name, + std::optional value) { + partitionKeys_.emplace(std::move(name), std::move(value)); + return *this; + } + + HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) { + tableBucketNumber_ = bucket; + infoColumns_["$bucket"] = std::to_string(bucket); + return *this; + } + + HiveConnectorSplitBuilder& customSplitInfo( + const std::unordered_map& customSplitInfo) { + customSplitInfo_ = customSplitInfo; + return *this; + } + + HiveConnectorSplitBuilder& extraFileInfo( + const std::shared_ptr& extraFileInfo) { + extraFileInfo_ = extraFileInfo; + return *this; + } + + HiveConnectorSplitBuilder& serdeParameters( + const std::unordered_map& serdeParameters) { + serdeParameters_ = serdeParameters; + return *this; + } + + HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) { + connectorId_ = connectorId; + return *this; + } + + std::shared_ptr build() const { + static const std::unordered_map customSplitInfo; + static const std::shared_ptr extraFileInfo; + static const std::unordered_map serdeParameters; + return std::make_shared( + connectorId_, + filePath_, + fileFormat_, + start_, + length_, + partitionKeys_, + tableBucketNumber_, + customSplitInfo, + extraFileInfo, + serdeParameters, + splitWeight_, + infoColumns_, + std::nullopt); + } + + private: + const std::string filePath_; + dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; + uint64_t start_{0}; + uint64_t length_{std::numeric_limits::max()}; + std::unordered_map> partitionKeys_; + std::optional tableBucketNumber_; + std::unordered_map customSplitInfo_ = {}; + std::shared_ptr extraFileInfo_ = {}; + std::unordered_map serdeParameters_ = {}; + std::unordered_map infoColumns_ = {}; + std::string connectorId_ = kHiveConnectorId; + int64_t splitWeight_{0}; }; } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/LocalRunnerTestBase.cpp b/velox/exec/tests/utils/LocalRunnerTestBase.cpp deleted file mode 100644 index c7919cb66bec..000000000000 --- a/velox/exec/tests/utils/LocalRunnerTestBase.cpp +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/exec/tests/utils/LocalRunnerTestBase.h" -namespace facebook::velox::exec::test { - -void LocalRunnerTestBase::SetUp() { - HiveConnectorTestBase::SetUp(); - exec::ExchangeSource::factories().clear(); - exec::ExchangeSource::registerFactory(createLocalExchangeSource); - ensureTestData(); -} - -std::shared_ptr LocalRunnerTestBase::makeQueryCtx( - const std::string& queryId, - memory::MemoryPool* rootPool) { - auto config = config_; - auto hiveConfig = hiveConfig_; - std::unordered_map> - connectorConfigs; - auto copy = hiveConfig_; - connectorConfigs[kHiveConnectorId] = - std::make_shared(std::move(copy)); - - return core::QueryCtx::create( - schemaExecutor_.get(), - core::QueryConfig(config), - std::move(connectorConfigs), - cache::AsyncDataCache::getInstance(), - rootPool->shared_from_this(), - nullptr, - queryId); -} - -void LocalRunnerTestBase::ensureTestData() { - if (!files_) { - makeTables(testTables_, files_); - } - makeSchema(); - splitSourceFactory_ = - std::make_shared(schema_, 2); -} - -void LocalRunnerTestBase::makeSchema() { - auto schemaQueryCtx = makeQueryCtx("schema", rootPool_.get()); - common::SpillConfig spillConfig; - common::PrefixSortConfig prefixSortConfig(100, 130); - auto leafPool = schemaQueryCtx->pool()->addLeafChild("schemaReader"); - auto connectorQueryCtx = std::make_shared( - leafPool.get(), - schemaQueryCtx->pool(), - schemaQueryCtx->connectorSessionProperties(kHiveConnectorId), - &spillConfig, - prefixSortConfig, - std::make_unique( - schemaQueryCtx.get(), schemaPool_.get()), - schemaQueryCtx->cache(), - "scan_for_schema", - "schema", - "N/a", - 0, - schemaQueryCtx->queryConfig().sessionTimezone()); - auto connector = connector::getConnector(kHiveConnectorId); - schema_ = std::make_shared( - files_->getPath(), - dwio::common::FileFormat::DWRF, - reinterpret_cast(connector.get()), - connectorQueryCtx); -} - -void LocalRunnerTestBase::makeTables( - std::vector specs, - std::shared_ptr& directory) { - directory = exec::test::TempDirectoryPath::create(); - for (auto& spec : specs) { - auto tablePath = fmt::format("{}/{}", directory->getPath(), spec.name); - auto fs = filesystems::getFileSystem(tablePath, {}); - fs->mkdir(tablePath); - for (auto i = 0; i < spec.numFiles; ++i) { - auto vectors = HiveConnectorTestBase::makeVectors( - spec.columns, spec.numVectorsPerFile, spec.rowsPerVector); - if (spec.customizeData) { - for (auto& vector : vectors) { - spec.customizeData(vector); - } - } - writeToFile(fmt::format("{}/f{}", tablePath, i), vectors); - } - } -} - -} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/LocalRunnerTestBase.h b/velox/exec/tests/utils/LocalRunnerTestBase.h deleted file mode 100644 index 58e80954ca5a..000000000000 --- a/velox/exec/tests/utils/LocalRunnerTestBase.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "velox/exec/ExchangeSource.h" -#include "velox/exec/tests/utils/HiveConnectorTestBase.h" -#include "velox/exec/tests/utils/LocalExchangeSource.h" -#include "velox/exec/tests/utils/TempDirectoryPath.h" -#include "velox/runner/LocalRunner.h" - -namespace facebook::velox::exec::test { - -struct TableSpec { - std::string name; - RowTypePtr columns; - int32_t rowsPerVector{10000}; - int32_t numVectorsPerFile{5}; - int32_t numFiles{5}; - - /// Function Applied to generated RowVectors for the table before writing. - /// May be used to insert non-random data on top of the random datafrom - /// HiveConnectorTestBase::makeVectors. - std::function customizeData; -}; - -/// Test helper class that manages a TestCase with a set of generated tables and -/// a LocalSchema and LocalSplitSource covering the test data. The lifetime of -/// the test data is the test case consisting of multiple TEST_F's. -class LocalRunnerTestBase : public HiveConnectorTestBase { - protected: - static void SetUpTestCase() { - HiveConnectorTestBase::SetUpTestCase(); - schemaExecutor_ = std::make_unique(4); - } - - static void TearDownTestCase() { - files_.reset(); - HiveConnectorTestBase::TearDownTestCase(); - } - - void SetUp() override; - - void ensureTestData(); - void makeSchema(); - - void makeTables( - std::vector specs, - std::shared_ptr& directory); - - std::shared_ptr splitSourceFactory( - const runner::LocalSchema& schema); - - // Creates a QueryCtx with 'pool'. 'pool' must be a root pool. - static std::shared_ptr makeQueryCtx( - const std::string& queryId, - memory::MemoryPool* pool); - - // Configs for creating QueryCtx. - inline static std::unordered_map config_; - inline static std::unordered_map hiveConfig_; - - // The specification of the test data. The data is created in ensureTestData() - // called from each SetUp()(. - inline static std::vector testTables_; - - // The top level directory with the test data. - inline static std::shared_ptr files_; - inline static std::unique_ptr schemaExecutor_; - - // The schema built from the data in 'files_'. - std::shared_ptr schema_; - - // Split source factory for making SplitSources that range over tables inside - // 'files_'. - std::shared_ptr splitSourceFactory_; - - // Leaf pool for schema. - std::shared_ptr schemaPool_; -}; - -} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 5ccf3a216775..f380ef4f03c7 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -105,8 +105,6 @@ class PlanBuilder { planNodeIdGenerator_{std::move(planNodeIdGenerator)}, pool_{pool} {} - virtual ~PlanBuilder() = default; - static constexpr const std::string_view kHiveDefaultConnectorId{"test-hive"}; static constexpr const std::string_view kTpchDefaultConnectorId{"test-tpch"}; @@ -1031,9 +1029,7 @@ class PlanBuilder { return *this; } - /// Return the latest plan node, e.g. the root node of the plan - /// tree. The DistributedPlanBuilder override additionally moves stage - /// information to a parent PlanBuilder. + /// Return the latest plan node, e.g. the root node of the plan tree. const core::PlanNodePtr& planNode() const { return planNode_; } @@ -1058,28 +1054,6 @@ class PlanBuilder { return *this; } - /// In a DistributedPlanBuilder, introduces a shuffle boundary. The plan so - /// far is shuffled and subsequent nodes consume the shuffle. Arguments are as - /// in partitionedOutput(). - virtual PlanBuilder& shuffle( - const std::vector& keys, - int numPartitions, - bool replicateNullsAndAny, - const std::vector& outputLayout = {}) { - VELOX_UNSUPPORTED("Needs DistributedPlanBuilder"); - } - - /// In a DistributedPlanBuilder, returns an Exchange on top of the plan built - /// so far and couples it to the current stage in the enclosing builder. - /// Arguments are as in shuffle(). - virtual core::PlanNodePtr shuffleResult( - const std::vector& keys, - int numPartitions, - bool replicateNullsAndAny, - const std::vector& outputLayout = {}) { - VELOX_UNSUPPORTED("Needs DistributedPlanBuilder"); - } - protected: // Users who create custom operators might want to extend the PlanBuilder to // customize extended plan builders. Those functions are needed in such @@ -1089,14 +1063,6 @@ class PlanBuilder { std::shared_ptr inferTypes( const core::ExprPtr& untypedExpr); - std::shared_ptr planNodeIdGenerator() const { - return planNodeIdGenerator_; - } - - memory::MemoryPool* pool() const { - return pool_; - } - private: std::shared_ptr field(column_index_t index); diff --git a/velox/exec/tests/utils/QueryAssertions.cpp b/velox/exec/tests/utils/QueryAssertions.cpp index b6e5d4d195a1..c13a9a049c34 100644 --- a/velox/exec/tests/utils/QueryAssertions.cpp +++ b/velox/exec/tests/utils/QueryAssertions.cpp @@ -1456,18 +1456,6 @@ std::pair, std::vector> readCursor( return {std::move(cursor), std::move(result)}; } -// static -std::vector readCursor( - std::shared_ptr runner) { - // 'result' borrows memory from cursor so the life cycle must be shorter. - std::vector result; - - while (auto rows = runner->next()) { - result.push_back(rows); - } - return result; -} - bool waitForTaskFinish( exec::Task* task, TaskState expectedState, diff --git a/velox/exec/tests/utils/QueryAssertions.h b/velox/exec/tests/utils/QueryAssertions.h index d3fb15cd442a..1b60b56e6e13 100644 --- a/velox/exec/tests/utils/QueryAssertions.h +++ b/velox/exec/tests/utils/QueryAssertions.h @@ -23,7 +23,6 @@ #include "velox/vector/ComplexVector.h" #include // @manual -#include "velox/runner/LocalRunner.h" namespace facebook::velox::exec::test { @@ -180,10 +179,6 @@ std::pair, std::vector> readCursor( std::function addSplits, uint64_t maxWaitMicros = 5'000'000); -/// Reads all results from 'runner'. -std::vector readCursor( - std::shared_ptr runner); - /// The Task can return results before the Driver is finished executing. /// Wait upto maxWaitMicros for the Task to finish as 'expectedState' before /// returning to ensure it's stable e.g. the Driver isn't updating it anymore. diff --git a/velox/runner/CMakeLists.txt b/velox/runner/CMakeLists.txt deleted file mode 100644 index 4bac074d7af9..000000000000 --- a/velox/runner/CMakeLists.txt +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright (c) Facebook, Inc. and its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -add_subdirectory(tests) - -velox_add_library(velox_local_runner LocalRunner.cpp LocalSchema.cpp Runner.cpp) - -velox_link_libraries( - velox_local_runner - velox_common_base - velox_memory - velox_hive_connector - velox_dwio_common - velox_dwio_dwrf_writer - velox_exec_test_lib - velox_exec) diff --git a/velox/runner/LocalRunner.cpp b/velox/runner/LocalRunner.cpp deleted file mode 100644 index d18a32528416..000000000000 --- a/velox/runner/LocalRunner.cpp +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/runner/LocalRunner.h" -#include "velox/common/time/Timer.h" - -#include "velox/connectors/hive/HiveConnectorSplit.h" - -namespace facebook::velox::runner { -namespace { -std::shared_ptr remoteSplit( - const std::string& taskId) { - return std::make_shared(taskId); -} -} // namespace - -RowVectorPtr LocalRunner::next() { - if (!cursor_) { - start(); - } - bool hasNext = cursor_->moveNext(); - if (!hasNext) { - state_ = State::kFinished; - return nullptr; - } - return cursor_->current(); -} - -void LocalRunner::start() { - VELOX_CHECK_EQ(state_, State::kInitialized); - auto lastStage = makeStages(); - params_.planNode = plan_->fragments().back().fragment.planNode; - auto cursor = exec::test::TaskCursor::create(params_); - stages_.push_back({cursor->task()}); - // Add table scan splits to the final gathere stage. - for (auto& scan : fragments_.back().scans) { - auto source = splitSourceFactory_->splitSourceForScan(*scan); - for (;;) { - auto split = source->next(0); - if (!split.hasConnectorSplit()) { - break; - } - cursor->task()->addSplit(scan->id(), std::move(split)); - } - cursor->task()->noMoreSplits(scan->id()); - } - // If the plan only has the final gather stage, there are no shuffles between - // the last - // and previous stages to set up. - if (!lastStage.empty()) { - const auto finalStageConsumer = - fragments_.back().inputStages[0].consumerNodeId; - for (auto& remote : lastStage) { - cursor->task()->addSplit(finalStageConsumer, exec::Split(remote)); - } - cursor->task()->noMoreSplits(finalStageConsumer); - } - { - std::lock_guard l(mutex_); - if (!error_) { - cursor_ = std::move(cursor); - state_ = State::kRunning; - } - } - if (!cursor_) { - // The cursor was not set because previous fragments had an error. - abort(); - std::rethrow_exception(error_); - } -} - -void LocalRunner::abort() { - // If called without previous error, we set the error to be cancellation. - if (!error_) { - try { - state_ = State::kCancelled; - VELOX_FAIL("Query cancelled"); - } catch (const std::exception& e) { - error_ = std::current_exception(); - } - } - VELOX_CHECK(state_ != State::kInitialized); - // Setting errors is thred safe. The stages do not change after - // initialization. - for (auto& stage : stages_) { - for (auto& task : stage) { - task->setError(error_); - } - } - if (cursor_) { - cursor_->setError(error_); - } -} - -void LocalRunner::waitForCompletion(int32_t maxWaitUs) { - VELOX_CHECK_NE(state_, State::kInitialized); - std::vector futures; - { - std::lock_guard l(mutex_); - for (auto& stage : stages_) { - for (auto& task : stage) { - futures.push_back(task->taskDeletionFuture()); - } - stage.clear(); - } - } - auto startTime = getCurrentTimeMicro(); - for (auto& future : futures) { - auto& executor = folly::QueuedImmediateExecutor::instance(); - if (getCurrentTimeMicro() - startTime > maxWaitUs) { - VELOX_FAIL("LocalRunner did not finish within {} us", maxWaitUs); - } - std::move(future) - .within(std::chrono::microseconds(maxWaitUs)) - .via(&executor) - .wait(); - } -} - -std::vector> -LocalRunner::makeStages() { - std::unordered_map stageMap; - auto sharedRunner = shared_from_this(); - auto onError = [self = sharedRunner, this](std::exception_ptr error) { - { - std::lock_guard l(mutex_); - if (error_) { - return; - } - state_ = State::kError; - error_ = error; - } - if (cursor_) { - abort(); - } - }; - - for (auto fragmentIndex = 0; fragmentIndex < fragments_.size() - 1; - ++fragmentIndex) { - auto& fragment = fragments_[fragmentIndex]; - stageMap[fragment.taskPrefix] = stages_.size(); - stages_.emplace_back(); - for (auto i = 0; i < fragment.width; ++i) { - exec::Consumer consumer = nullptr; - auto task = exec::Task::create( - fmt::format( - "local://{}/{}.{}", - params_.queryCtx->queryId(), - fragment.taskPrefix, - i), - fragment.fragment, - i, - params_.queryCtx, - exec::Task::ExecutionMode::kParallel, - consumer, - onError); - stages_.back().push_back(task); - if (fragment.numBroadcastDestinations) { - // TODO: Add support for Arbitrary partition type. - task->updateOutputBuffers(fragment.numBroadcastDestinations, true); - } - task->start(options_.numDrivers); - } - } - - for (auto fragmentIndex = 0; fragmentIndex < fragments_.size() - 1; - ++fragmentIndex) { - auto& fragment = fragments_[fragmentIndex]; - for (auto& scan : fragment.scans) { - auto source = splitSourceFactory_->splitSourceForScan(*scan); - bool allDone = false; - do { - for (auto i = 0; i < stages_[fragmentIndex].size(); ++i) { - auto split = source->next(i); - if (!split.hasConnectorSplit()) { - allDone = true; - break; - } - stages_[fragmentIndex][i]->addSplit(scan->id(), std::move(split)); - } - } while (!allDone); - } - for (auto& scan : fragment.scans) { - for (auto i = 0; i < stages_[fragmentIndex].size(); ++i) { - stages_[fragmentIndex][i]->noMoreSplits(scan->id()); - } - } - - for (auto& input : fragment.inputStages) { - const auto sourceStage = stageMap[input.producerTaskPrefix]; - std::vector> sourceSplits; - for (auto i = 0; i < stages_[sourceStage].size(); ++i) { - sourceSplits.push_back(remoteSplit(stages_[sourceStage][i]->taskId())); - } - for (auto& task : stages_[fragmentIndex]) { - for (auto& remote : sourceSplits) { - task->addSplit(input.consumerNodeId, exec::Split(remote)); - } - task->noMoreSplits(input.consumerNodeId); - } - } - } - if (stages_.empty()) { - return {}; - } - std::vector> lastStage; - for (auto& task : stages_.back()) { - lastStage.push_back(remoteSplit(task->taskId())); - } - return lastStage; -} - -exec::Split LocalSplitSource::next(int32_t /*worker*/) { - if (currentFile_ >= static_cast(table_->files().size())) { - return exec::Split(); - } - - if (currentSplit_ >= fileSplits_.size()) { - fileSplits_.clear(); - ++currentFile_; - if (currentFile_ >= table_->files().size()) { - return exec::Split(); - } - - currentSplit_ = 0; - auto filePath = table_->files()[currentFile_]; - const auto fileSize = fs::file_size(filePath); - // Take the upper bound. - const int splitSize = std::ceil((fileSize) / splitsPerFile_); - for (int i = 0; i < splitsPerFile_; ++i) { - fileSplits_.push_back( - connector::hive::HiveConnectorSplitBuilder(filePath) - .connectorId(table_->schema()->connector()->connectorId()) - .fileFormat(table_->format()) - .start(i * splitSize) - .length(splitSize) - .build()); - } - } - return exec::Split(std::move(fileSplits_[currentSplit_++])); -} - -std::unique_ptr LocalSplitSourceFactory::splitSourceForScan( - const core::TableScanNode& tableScan) { - auto* tableHandle = dynamic_cast( - tableScan.tableHandle().get()); - VELOX_CHECK_NOT_NULL(tableHandle); - auto* table = reinterpret_cast( - schema_->findTable(tableHandle->tableName())); - - return std::make_unique(table, splitsPerFile_); -} - -std::vector LocalRunner::stats() const { - std::vector result; - std::lock_guard l(mutex_); - for (auto i = 0; i < stages_.size(); ++i) { - auto& tasks = stages_[i]; - VELOX_CHECK(!tasks.empty()); - auto stats = tasks[0]->taskStats(); - for (auto j = 1; j < tasks.size(); ++j) { - auto moreStats = tasks[j]->taskStats(); - for (auto pipeline = 0; pipeline < stats.pipelineStats.size(); - ++pipeline) { - for (auto op = 0; - op < stats.pipelineStats[pipeline].operatorStats.size(); - ++op) { - stats.pipelineStats[pipeline].operatorStats[op].add( - moreStats.pipelineStats[pipeline].operatorStats[op]); - } - } - } - result.push_back(std::move(stats)); - } - return result; -} - -} // namespace facebook::velox::runner diff --git a/velox/runner/LocalRunner.h b/velox/runner/LocalRunner.h deleted file mode 100644 index f5d3bb5c211f..000000000000 --- a/velox/runner/LocalRunner.h +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include "velox/connectors/Connector.h" -#include "velox/exec/Exchange.h" -#include "velox/exec/tests/utils/Cursor.h" -#include "velox/runner/LocalSchema.h" -#include "velox/runner/MultiFragmentPlan.h" -#include "velox/runner/Runner.h" - -namespace facebook::velox::runner { - -/// Runner for in-process execution of a distributed plan. -class LocalRunner : public Runner, - public std::enable_shared_from_this { - public: - LocalRunner( - MultiFragmentPlanPtr plan, - std::shared_ptr queryCtx, - std::shared_ptr splitSourceFactory) - : plan_(std::move(plan)), - fragments_(plan_->fragments()), - options_(plan_->options()), - splitSourceFactory_(std::move(splitSourceFactory)) { - params_.queryCtx = std::move(queryCtx); - } - - RowVectorPtr next() override; - - std::vector stats() const override; - - void abort() override; - - void waitForCompletion(int32_t maxWaitMicros) override; - - State state() const override { - return state_; - } - - private: - void start(); - - // Creates all stages except for the single worker final consumer stage. - std::vector> makeStages(); - - // Serializes 'cursor_' and 'error_'. - mutable std::mutex mutex_; - - const MultiFragmentPlanPtr plan_; - const std::vector fragments_; - const MultiFragmentPlan::Options& options_; - const std::shared_ptr splitSourceFactory_; - - exec::test::CursorParameters params_; - - tsan_atomic state_{State::kInitialized}; - - std::unique_ptr cursor_; - std::vector>> stages_; - std::exception_ptr error_; -}; - -/// Split source that produces splits from a LocalSchema. -class LocalSplitSource : public SplitSource { - public: - LocalSplitSource(const LocalTable* table, int32_t splitsPerFile) - : table_(table), splitsPerFile_(splitsPerFile) {} - - exec::Split next(int32_t worker) override; - - private: - const LocalTable* const table_; - const int32_t splitsPerFile_; - - std::vector> fileSplits_; - int32_t currentFile_{-1}; - int32_t currentSplit_{0}; -}; - -class LocalSplitSourceFactory : public SplitSourceFactory { - public: - LocalSplitSourceFactory( - std::shared_ptr schema, - int32_t splitsPerFile) - : schema_(std::move(schema)), splitsPerFile_(splitsPerFile) {} - - std::unique_ptr splitSourceForScan( - const core::TableScanNode& scan) override; - - private: - const std::shared_ptr schema_; - const int32_t splitsPerFile_; -}; - -} // namespace facebook::velox::runner diff --git a/velox/runner/LocalSchema.cpp b/velox/runner/LocalSchema.cpp deleted file mode 100644 index b35b50b416df..000000000000 --- a/velox/runner/LocalSchema.cpp +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/runner/LocalSchema.h" -#include "velox/connectors/hive/HiveConnectorSplit.h" -#include "velox/dwio/common/BufferedInput.h" -#include "velox/dwio/common/Reader.h" -#include "velox/dwio/common/ReaderFactory.h" - -namespace facebook::velox::runner { - -LocalSchema::LocalSchema( - const std::string& path, - dwio::common::FileFormat format, - connector::hive::HiveConnector* hiveConnector, - std::shared_ptr ctx) - : Schema(path, ctx->memoryPool()), - hiveConnector_(hiveConnector), - connectorId_(hiveConnector_->connectorId()), - connectorQueryCtx_(ctx), - format_(format) { - initialize(path); -} - -void LocalSchema::initialize(const std::string& path) { - for (auto const& dirEntry : fs::directory_iterator{path}) { - if (!dirEntry.is_directory() || - dirEntry.path().filename().c_str()[0] == '.') { - continue; - } - loadTable(dirEntry.path().filename(), dirEntry.path()); - } -} -// Feeds the values in 'vector' into 'builder'. -template -void addStats( - velox::dwrf::StatisticsBuilder* builder, - const BaseVector& vector) { - auto* typedVector = vector.asUnchecked>(); - for (auto i = 0; i < typedVector->size(); ++i) { - if (!typedVector->isNullAt(i)) { - reinterpret_cast(builder)->addValues(typedVector->valueAt(i)); - } - } -} - -std::pair LocalTable::sample( - float pct, - const std::vector& fields, - velox::connector::hive::SubfieldFilters filters, - const velox::core::TypedExprPtr& remainingFilter, - HashStringAllocator* /*allocator*/, - std::vector>* - statsBuilders) { - dwrf::StatisticsBuilderOptions options( - /*stringLengthLimit=*/100, /*initialSize=*/0); - std::vector> builders; - auto tableHandle = std::make_shared( - schema_->connector()->connectorId(), - name_, - true, - std::move(filters), - remainingFilter); - - std::unordered_map< - std::string, - std::shared_ptr> - columnHandles; - std::vector names; - std::vector types; - for (auto& field : fields) { - auto& path = field.path(); - auto column = - dynamic_cast(path[0].get()) - ->name(); - const auto idx = type_->getChildIdx(column); - names.push_back(type_->nameOf(idx)); - types.push_back(type_->childAt(idx)); - columnHandles[names.back()] = - std::make_shared( - names.back(), - connector::hive::HiveColumnHandle::ColumnType::kRegular, - types.back(), - types.back()); - switch (types.back()->kind()) { - case TypeKind::BIGINT: - case TypeKind::INTEGER: - case TypeKind::SMALLINT: - builders.push_back( - std::make_unique(options)); - break; - case TypeKind::REAL: - case TypeKind::DOUBLE: - builders.push_back( - std::make_unique(options)); - break; - case TypeKind::VARCHAR: - builders.push_back( - std::make_unique(options)); - break; - - default: - builders.push_back(nullptr); - } - } - - const auto outputType = ROW(std::move(names), std::move(types)); - int64_t passingRows = 0; - int64_t scannedRows = 0; - for (auto& file : files_) { - auto dataSource = schema_->connector()->createDataSource( - outputType, - tableHandle, - columnHandles, - schema_->connectorQueryCtx().get()); - - auto split = connector::hive::HiveConnectorSplitBuilder(file) - .fileFormat(format_) - .connectorId(schema_->connector()->connectorId()) - .build(); - dataSource->addSplit(split); - constexpr int32_t kBatchSize = 1000; - for (;;) { - ContinueFuture ignore{ContinueFuture::makeEmpty()}; - - auto data = dataSource->next(kBatchSize, ignore).value(); - if (data == nullptr) { - break; - } - passingRows += data->size(); - for (auto column = 0; column < builders.size(); ++column) { - if (!builders[column]) { - continue; - } - auto* builder = builders[column].get(); - auto loadChild = [](RowVectorPtr data, int32_t column) { - data->childAt(column) = - BaseVector::loadedVectorShared(data->childAt(column)); - }; - switch (type_->childAt(column)->kind()) { - case TypeKind::SMALLINT: - loadChild(data, column); - addStats( - builder, *data->childAt(column)); - break; - case TypeKind::INTEGER: - loadChild(data, column); - addStats( - builder, *data->childAt(column)); - break; - case TypeKind::BIGINT: - loadChild(data, column); - addStats( - builder, *data->childAt(column)); - break; - case TypeKind::REAL: - loadChild(data, column); - addStats( - builder, *data->childAt(column)); - break; - case TypeKind::DOUBLE: - loadChild(data, column); - addStats( - builder, *data->childAt(column)); - break; - case TypeKind::VARCHAR: - loadChild(data, column); - addStats( - builder, *data->childAt(column)); - break; - - default: - break; - } - } - break; - } - scannedRows += dataSource->getCompletedRows(); - if (scannedRows > numRows_ * (pct / 100)) { - break; - } - } - if (statsBuilders) { - *statsBuilders = std::move(builders); - } - return std::pair(scannedRows, passingRows); -} - -void LocalSchema::loadTable( - const std::string& tableName, - const fs::path& tablePath) { - // open each file in the directory and check their type and add up the row - // counts. - RowTypePtr tableType; - LocalTable* table = nullptr; - - for (auto const& dirEntry : fs::directory_iterator{tablePath}) { - if (!dirEntry.is_regular_file()) { - continue; - } - // Ignore hidden files. - if (dirEntry.path().filename().c_str()[0] == '.') { - continue; - } - auto it = tables_.find(tableName); - if (it != tables_.end()) { - table = reinterpret_cast(it->second.get()); - } else { - tables_[tableName] = - std::make_unique(tableName, format_, this); - table = reinterpret_cast(tables_[tableName].get()); - } - dwio::common::ReaderOptions readerOptions{pool_}; - readerOptions.setFileFormat(format_); - auto input = std::make_unique( - std::make_shared(dirEntry.path().string()), - readerOptions.memoryPool()); - std::unique_ptr reader = - dwio::common::getReaderFactory(readerOptions.fileFormat()) - ->createReader(std::move(input), readerOptions); - const auto fileType = reader->rowType(); - if (!tableType) { - tableType = fileType; - } else if (fileType->size() > tableType->size()) { - // The larger type is the later since there is only addition of columns. - // TODO: Check the column types are compatible where they overlap. - tableType = fileType; - } - const auto rows = reader->numberOfRows(); - - if (rows.has_value()) { - table->numRows_ += rows.value(); - } - for (auto i = 0; i < fileType->size(); ++i) { - auto name = fileType->nameOf(i); - LocalColumn* column; - auto columnIt = table->columns().find(name); - if (columnIt != table->columns().end()) { - column = columnIt->second.get(); - } else { - table->columns()[name] = - std::make_unique(name, fileType->childAt(i)); - column = table->columns()[name].get(); - } - // Initialize the stats from the first file. - if (column->stats() == nullptr) { - column->setStats(reader->columnStatistics(i)); - } - } - table->files_.push_back(dirEntry.path()); - } - VELOX_CHECK_NOT_NULL(table, "Table directory {} is empty", tablePath); - - table->setType(tableType); - table->sampleNumDistincts(2, pool_); -} - -void LocalTable::sampleNumDistincts(float samplePct, memory::MemoryPool* pool) { - std::vector fields; - for (auto i = 0; i < type_->size(); ++i) { - fields.push_back(common::Subfield(type_->nameOf(i))); - } - - // Sample the table. Adjust distinct values according to the samples. - auto allocator = std::make_unique(pool); - std::vector> statsBuilders; - auto [sampled, passed] = - sample(samplePct, fields, {}, nullptr, allocator.get(), &statsBuilders); - numSampledRows_ = sampled; - for (auto i = 0; i < statsBuilders.size(); ++i) { - if (statsBuilders[i]) { - // TODO: Use HLL estimate of distinct values here after this is added to - // the stats builder. Now assume that all rows have a different value. - // Later refine this by observed min-max range. - int64_t approxNumDistinct = numRows_; - // For tiny tables the sample is 100% and the approxNumDistinct is - // accurate. For partial samples, the distinct estimate is left to be the - // distinct estimate of the sample if there are few distincts. This is an - // enumeration where values in unsampled rows are likely the same. If - // there are many distincts, we multiply by 1/sample rate assuming that - // unsampled rows will mostly have new values. - - if (numSampledRows_ < numRows_) { - if (approxNumDistinct > sampled / 50) { - float numDups = - numSampledRows_ / static_cast(approxNumDistinct); - approxNumDistinct = std::min(numRows_, numRows_ / numDups); - - // If the type is an integer type, num distincts cannot be larger than - // max - min. - if (auto* ints = dynamic_cast( - statsBuilders[i].get())) { - auto min = ints->getMinimum(); - auto max = ints->getMaximum(); - if (min.has_value() && max.has_value()) { - auto range = max.value() - min.value(); - approxNumDistinct = std::min(approxNumDistinct, range); - } - } - } - - columns()[type_->nameOf(i)]->setNumDistinct(approxNumDistinct); - } - } - } -} - -} // namespace facebook::velox::runner diff --git a/velox/runner/LocalSchema.h b/velox/runner/LocalSchema.h deleted file mode 100644 index 628dd71c738f..000000000000 --- a/velox/runner/LocalSchema.h +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "velox/common/base/Fs.h" -#include "velox/common/memory/HashStringAllocator.h" -#include "velox/connectors/hive/HiveConnector.h" -#include "velox/connectors/hive/TableHandle.h" -#include "velox/dwio/common/Options.h" -#include "velox/dwio/dwrf/writer/StatisticsBuilder.h" -#include "velox/runner/Schema.h" - -namespace facebook::velox::runner { - -class LocalColumn : public Column { - public: - LocalColumn(const std::string& name, TypePtr type) : Column(name, type) {} - - friend class LocalSchema; -}; - -class LocalSchema; - -class LocalTable : public Table { - public: - LocalTable( - const std::string& name, - dwio::common::FileFormat format, - Schema* schema) - : Table(name, format, schema) {} - - std::unordered_map>& columns() { - return columns_; - } - - LocalSchema* schema() const { - return reinterpret_cast(schema_); - } - - void setType(const RowTypePtr& type) { - type_ = type; - } - - std::pair sample( - float pct, - const std::vector& columns, - connector::hive::SubfieldFilters filters, - const core::TypedExprPtr& remainingFilter, - HashStringAllocator* allocator = nullptr, - std::vector>* statsBuilders = - nullptr) override; - - /// Samples 'samplePct' % rows of the table and sets the num distincts - /// estimate for the columns. uses 'pool' for temporary data. - void sampleNumDistincts(float samplePct, memory::MemoryPool* pool); - - const std::vector& files() const { - return files_; - } - - private: - // All columns. Filled by loadTable(). - std::unordered_map> columns_; - - std::vector files_; - int64_t numRows_{0}; - int64_t numSampledRows_{0}; - - friend class LocalSchema; -}; - -class LocalSchema : public Schema { - public: - /// 'path' is the directory containing a subdirectory per table. - LocalSchema( - const std::string& path, - dwio::common::FileFormat format, - connector::hive::HiveConnector* hiveConector, - std::shared_ptr ctx); - - connector::Connector* connector() const override { - return hiveConnector_; - } - - const std::shared_ptr& connectorQueryCtx() - const override { - return connectorQueryCtx_; - } - - private: - void initialize(const std::string& path); - - void loadTable(const std::string& tableName, const fs::path& tablePath); - - connector::hive::HiveConnector* const hiveConnector_; - const std::string connectorId_; - const std::shared_ptr connectorQueryCtx_; - const dwio::common::FileFormat format_; -}; - -} // namespace facebook::velox::runner diff --git a/velox/runner/MultiFragmentPlan.h b/velox/runner/MultiFragmentPlan.h deleted file mode 100644 index 3bd79907a923..000000000000 --- a/velox/runner/MultiFragmentPlan.h +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include "velox/exec/Task.h" - -namespace facebook::velox::runner { - -/// Describes an exchange source for an ExchangeNode a non-leaf stage. -struct InputStage { - // Id of ExchangeNode in the consumer fragment. - core::PlanNodeId consumerNodeId; - - /// Task prefix of producer stage. - std::string producerTaskPrefix; -}; - -/// Describes a fragment of a distributed plan. This allows a run -/// time to distribute fragments across workers and to set up -/// exchanges. A complete plan is a vector of these with the last -/// being the fragment that gathers results from the complete -/// plan. Different runtimes, e.g. local, streaming or -/// materialized shuffle can use this to describe exchange -/// parallel execution. Decisions on number of workers, location -/// of workers and mode of exchange are up to the runtime. -struct ExecutableFragment { - explicit ExecutableFragment(const std::string& taskPrefix) - : taskPrefix(taskPrefix) {} - std::string taskPrefix; - int32_t width{0}; - velox::core::PlanFragment fragment; - - /// Source fragments and Exchange node ids for remote shuffles producing input - /// for 'this'. - std::vector inputStages; - - /// Table scan nodes in 'this'. - std::vector> scans; - int32_t numBroadcastDestinations{0}; -}; - -/// Describes a distributed plan handed to a Runner for parallel/distributed -/// execution. The last element of 'fragments' is by convention the stage that -/// gathers the query result. Otherwise the order of 'fragments' is not -/// important since the producer-consumer relations are given by 'inputStages' -/// in each fragment. -class MultiFragmentPlan { - public: - /// Describes options for running a MultiFragmentPlan. - struct Options { - /// Query id used as a prefix for tasks ids. - std::string queryId; - - // Maximum Number of independent Tasks for one stage of execution. If 1, - // there are no exchanges. - int32_t numWorkers; - - // Number of threads in a fragment in a worker. If 1, there are no local - // exchanges. - int32_t numDrivers; - }; - - MultiFragmentPlan(std::vector fragments, Options options) - : fragments_(std::move(fragments)), options_(std::move(options)) {} - - const std::vector& fragments() const { - return fragments_; - } - - const Options& options() const { - return options_; - } - - std::string toString() const; - - private: - const std::vector fragments_; - const Options options_; -}; - -using MultiFragmentPlanPtr = std::shared_ptr; - -} // namespace facebook::velox::runner diff --git a/velox/runner/Runner.cpp b/velox/runner/Runner.cpp deleted file mode 100644 index d4f18c29f404..000000000000 --- a/velox/runner/Runner.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/runner/Runner.h" - -namespace facebook::velox::runner { - -// static -std::string Runner::stateString(Runner::State state) { - switch (state) { - case Runner::State::kInitialized: - return "initialized"; - case Runner::State::kRunning: - return "running"; - case Runner::State::kCancelled: - return "cancelled"; - case Runner::State::kError: - return "error"; - case Runner::State::kFinished: - return "finished"; - } - return fmt::format("invalid state {}", static_cast(state)); -} - -std::string MultiFragmentPlan::toString() const { - std::stringstream out; - for (auto i = 0; i < fragments_.size(); ++i) { - out << fmt::format( - "Fragment {}: {} numWorkers={}:\n", - i, - fragments_[i].taskPrefix, - fragments_[i].width); - out << fragments_[i].fragment.planNode->toString(true, true) << std::endl; - if (!fragments_[i].inputStages.empty()) { - out << "Inputs: "; - for (auto& input : fragments_[i].inputStages) { - out << fmt::format( - " {} <- {} ", input.consumerNodeId, input.producerTaskPrefix); - } - out << std::endl; - } - } - return out.str(); -} - -} // namespace facebook::velox::runner diff --git a/velox/runner/Runner.h b/velox/runner/Runner.h deleted file mode 100644 index e570a0df1cf2..000000000000 --- a/velox/runner/Runner.h +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include "velox/connectors/Connector.h" -#include "velox/exec/Exchange.h" -#include "velox/exec/tests/utils/Cursor.h" -#include "velox/runner/LocalSchema.h" -#include "velox/runner/MultiFragmentPlan.h" - -/// Base classes for multifragment Velox query execution. -namespace facebook::velox::runner { - -/// Iterator for obtaining splits for a scan. One is created for each table -/// scan. -class SplitSource { - public: - virtual ~SplitSource() = default; - /// Returns a split for 'worker'. This may implement soft affinity or strict - /// bucket to worker mapping. - virtual exec::Split next(int32_t worker) = 0; -}; - -/// A factory for getting a SplitSource for each TableScan. The splits produced -/// may depend on partition keys, buckets etc mentioned by each tableScan. -class SplitSourceFactory { - public: - virtual ~SplitSourceFactory() = default; - - /// Returns a splitSource for one TableScan across all Tasks of - /// the fragment. The source will be invoked to produce splits for - /// each individual worker running the scan. - virtual std::unique_ptr splitSourceForScan( - const core::TableScanNode& scan) = 0; -}; - -/// Base class for executing multifragment Velox queries. One instance -/// of a Runner coordinates the execution of one multifragment -/// query. Different derived classes can support different shuffles -/// and different scheduling either in process or in a cluster. Unless -/// otherwise stated, the member functions are thread safe as long as -/// the caller holds an owning reference to the runner. -class Runner { - public: - enum class State { kInitialized, kRunning, kFinished, kError, kCancelled }; - - static std::string stateString(Runner::State state); - - virtual ~Runner() = default; - - /// Returns the next batch of results. Returns nullptr when no more results. - /// Throws any execution time errors. The result is allocated in the pool of - /// QueryCtx given to the Runner implementation. The caller is responsible for - /// serializing calls from different threads. - virtual RowVectorPtr next() = 0; - - /// Returns Task stats for each fragment of the plan. The stats correspond 1:1 - /// to the stages in the MultiFragmentPlan. This may be called at any time. - /// before waitForCompletion() or abort(). - virtual std::vector stats() const = 0; - - /// Returns the state of execution. - virtual State state() const = 0; - - /// Cancels the possibly pending execution. Returns immediately, thus before - /// the execution is actually finished. Use waitForCompletion() to wait for - /// all execution resources to be freed. May be called from any thread without - /// serialization. - virtual void abort() = 0; - - /// Waits up to 'maxWaitMicros' for all activity of the execution to cease. - /// This is used in tests to ensure that all pools are empty and unreferenced - /// before teradown. - - virtual void waitForCompletion(int32_t maxWaitMicros) = 0; -}; - -} // namespace facebook::velox::runner - -template <> -struct fmt::formatter - : formatter { - auto format(facebook::velox::runner::Runner::State state, format_context& ctx) - const { - return formatter::format( - facebook::velox::runner::Runner::stateString(state), ctx); - } -}; diff --git a/velox/runner/Schema.h b/velox/runner/Schema.h deleted file mode 100644 index a0d6f35dd9c4..000000000000 --- a/velox/runner/Schema.h +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "velox/common/base/Fs.h" -#include "velox/common/memory/HashStringAllocator.h" -#include "velox/connectors/hive/HiveConnector.h" -#include "velox/connectors/hive/TableHandle.h" -#include "velox/dwio/common/Options.h" -#include "velox/dwio/dwrf/writer/StatisticsBuilder.h" - -/// Base classes for schema elements used in execution. A Schema is a collection -/// of Tables. A Table is a collection of Columns. Tables and Columns have -/// statistics and Tables can be sampled. Derived classes connect to different -/// metadata stores and provide different metadata, e.g. order, partitioning, -/// bucketing etc. -namespace facebook::velox::runner { - -/// Base class for column. The column's name and type are immutable but the -/// stats may be set multiple times. -class Column { - public: - virtual ~Column() = default; - - Column(const std::string& name, TypePtr type) : name_(name), type_(type) {} - - dwio::common::ColumnStatistics* stats() const { - return latestStats_; - } - - /// Sets statistics. May be called multipl times if table contents change. - void setStats(std::unique_ptr stats) { - std::lock_guard l(mutex_); - allStats_.push_back(std::move(stats)); - latestStats_ = allStats_.back().get(); - } - - const std::string& name() const { - return name_; - } - - const TypePtr& type() const { - return type_; - } - - void setNumDistinct(int64_t numDistinct) { - approxNumDistinct_ = numDistinct; - } - - protected: - const std::string name_; - const TypePtr type_; - - // The latest element added to 'allStats_'. - tsan_atomic latestStats_{nullptr}; - - // All statistics recorded for this column. Old values can be purged when the - // containing Schema is not in use. - std::vector> allStats_; - - // Latest approximate count of distinct values. - std::optional approxNumDistinct_; - - private: - // Serializes changes to statistics. - std::mutex mutex_; -}; - -class Schema; - -/// Base class for table. This is used to identify a table for purposes of -/// Split generation, statistics, sampling etc. -class Table { - public: - virtual ~Table() = default; - - Table( - const std::string& name, - dwio::common::FileFormat format, - Schema* schema) - : schema_(schema), name_(name), format_(format) {} - - const std::string& name() const { - return name_; - } - - const RowTypePtr& rowType() const { - return type_; - } - - dwio::common::FileFormat format() const { - return format_; - } - - /// Samples 'pct' percent of rows for 'fields'. Applies 'filters' - /// before sampling. Returns {count of sampled, count matching filters}. - /// Returns statistics for the post-filtering values in 'stats' for each of - /// 'fields'. If 'fields' is empty, simply returns the number of - /// rows matching 'filter' in a sample of 'pct'% of the table. - /// - /// TODO: Introduce generic statistics builder in dwio/common. - virtual std::pair sample( - float pct, - const std::vector& columns, - connector::hive::SubfieldFilters filters, - const core::TypedExprPtr& remainingFilter, - HashStringAllocator* allocator = nullptr, - std::vector>* statsBuilders = - nullptr) { - VELOX_UNSUPPORTED("Table class does not support sampling."); - } - - protected: - Schema* const schema_; - const std::string name_; - - const dwio::common::FileFormat format_; - - // Discovered from data. In the event of different types, we take the - // latest (i.e. widest) table type. - RowTypePtr type_; -}; - -/// Base class for collection of tables. A query executes against a -/// Schema and its tables and columns are resolved against the -/// Schema. The schema is mutable and may acquire tables and the -/// tables may acquire stats during their lifetime. -class Schema { - public: - virtual ~Schema() = default; - - Schema(const std::string& name, memory::MemoryPool* pool) - : name_(name), pool_(std::move(pool)) {} - - Table* findTable(const std::string& name) { - auto it = tables_.find(name); - VELOX_CHECK(it != tables_.end(), "Table {} not found", name); - return it->second.get(); - } - - virtual connector::Connector* connector() const = 0; - - virtual const std::shared_ptr& - connectorQueryCtx() const = 0; - - protected: - const std::string name_; - - memory::MemoryPool* const pool_; - - std::unordered_map> tables_; -}; - -} // namespace facebook::velox::runner diff --git a/velox/runner/tests/CMakeLists.txt b/velox/runner/tests/CMakeLists.txt deleted file mode 100644 index 005161a505c1..000000000000 --- a/velox/runner/tests/CMakeLists.txt +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright (c) Facebook, Inc. and its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -add_executable(velox_local_runner_test LocalRunnerTest.cpp - ../../exec/tests/Main.cpp) - -add_test(velox_local_runner_test velox_local_runner_test) - -target_link_libraries( - velox_local_runner_test - velox_exec_test_lib - velox_dwio_common - velox_parse_parser - velox_parse_expression) diff --git a/velox/runner/tests/LocalRunnerTest.cpp b/velox/runner/tests/LocalRunnerTest.cpp deleted file mode 100644 index ffd05cdc83f0..000000000000 --- a/velox/runner/tests/LocalRunnerTest.cpp +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/exec/tests/utils/DistributedPlanBuilder.h" -#include "velox/exec/tests/utils/LocalRunnerTestBase.h" -#include "velox/exec/tests/utils/QueryAssertions.h" - -using namespace facebook::velox; -using namespace facebook::velox::exec; -using namespace facebook::velox::runner; -using namespace facebook::velox::exec::test; - -class LocalRunnerTest : public LocalRunnerTestBase { - protected: - static constexpr int32_t kNumFiles = 5; - static constexpr int32_t kNumVectors = 5; - static constexpr int32_t kRowsPerVector = 10000; - static constexpr int32_t kNumRows = kNumFiles * kNumVectors * kRowsPerVector; - - static void SetUpTestCase() { - // The lambdas will be run after this scope returns, so make captures - // static. - static int32_t counter1; - // Clear 'counter1' so that --gtest_repeat runs get the same data. - counter1 = 0; - auto customize1 = [&](const RowVectorPtr& rows) { - makeAscending(rows, counter1); - }; - - static int32_t counter2; - counter2 = 0; - auto customize2 = [&](const RowVectorPtr& rows) { - makeAscending(rows, counter2); - }; - - rowType_ = ROW({"c0"}, {BIGINT()}); - testTables_ = { - TableSpec{ - .name = "T", - .columns = rowType_, - .rowsPerVector = kRowsPerVector, - .numVectorsPerFile = kNumVectors, - .numFiles = kNumFiles, - .customizeData = customize1}, - TableSpec{ - .name = "U", - .columns = rowType_, - .rowsPerVector = kRowsPerVector, - .numVectorsPerFile = kNumVectors, - .numFiles = kNumFiles, - .customizeData = customize2}}; - - // Creates the data and schema from 'testTables_'. These are created on the - // first test fixture initialization. - LocalRunnerTestBase::SetUpTestCase(); - } - - // Returns a plan with a table scan. This is a single stage if 'numWorkers' is - // 1, otherwise this is a scan stage plus shuffle to a stage that gathers the - // scan results. - MultiFragmentPlanPtr makeScanPlan(const std::string& id, int32_t numWorkers) { - MultiFragmentPlan::Options options = { - .queryId = id, .numWorkers = numWorkers, .numDrivers = 2}; - const int32_t width = 3; - - DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get()); - rootBuilder.tableScan("T", rowType_); - if (numWorkers > 1) { - rootBuilder.shuffle({}, 1, false); - } - return std::make_shared( - rootBuilder.fragments(), std::move(options)); - } - - MultiFragmentPlanPtr makeJoinPlan(std::string project = "c0") { - MultiFragmentPlan::Options options = { - .queryId = "test.", .numWorkers = 4, .numDrivers = 2}; - const int32_t width = 3; - - DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get()); - rootBuilder.tableScan("T", rowType_) - .project({project}) - .shuffle({"c0"}, 3, false) - .hashJoin( - {"c0"}, - {"b0"}, - DistributedPlanBuilder(rootBuilder) - .tableScan("U", rowType_) - .project({"c0 as b0"}) - .shuffleResult({"b0"}, width, false), - "", - {"c0", "b0"}) - .shuffle({}, 1, false) - .finalAggregation({}, {"count(1)"}, {{BIGINT()}}); - return std::make_shared( - rootBuilder.fragments(), std::move(options)); - } - - static void makeAscending(const RowVectorPtr& rows, int32_t& counter) { - auto ints = rows->childAt(0)->as>(); - for (auto i = 0; i < ints->size(); ++i) { - ints->set(i, counter + i); - } - counter += ints->size(); - } - - void checkScanCount(const std::string& id, int32_t numWorkers) { - auto scan = makeScanPlan(id, numWorkers); - auto localRunner = std::make_shared( - std::move(scan), - makeQueryCtx("q1", rootPool_.get()), - splitSourceFactory_); - auto results = readCursor(localRunner); - - int32_t count = 0; - for (auto& rows : results) { - count += rows->size(); - } - localRunner->waitForCompletion(5000); - EXPECT_EQ(250'000, count); - } - - std::shared_ptr idGenerator_{ - std::make_shared()}; - // The below are declared static to be scoped to TestCase so as to reuse the - // dataset between tests. - - inline static RowTypePtr rowType_; -}; - -TEST_F(LocalRunnerTest, count) { - auto join = makeJoinPlan(); - auto localRunner = std::make_shared( - std::move(join), - makeQueryCtx("q1", rootPool_.get()), - splitSourceFactory_); - auto results = readCursor(localRunner); - auto stats = localRunner->stats(); - EXPECT_EQ(1, results.size()); - EXPECT_EQ(1, results[0]->size()); - EXPECT_EQ( - kNumRows, results[0]->childAt(0)->as>()->valueAt(0)); - results.clear(); - EXPECT_EQ(Runner::State::kFinished, localRunner->state()); - localRunner->waitForCompletion(5000); -} - -TEST_F(LocalRunnerTest, error) { - auto join = makeJoinPlan("if (c0 = 111, c0 / 0, c0 + 1) as c0"); - auto localRunner = std::make_shared( - std::move(join), - makeQueryCtx("q1", rootPool_.get()), - splitSourceFactory_); - EXPECT_THROW(readCursor(localRunner), VeloxUserError); - EXPECT_EQ(Runner::State::kError, localRunner->state()); - localRunner->waitForCompletion(5000); -} - -TEST_F(LocalRunnerTest, scan) { - checkScanCount("s1", 1); - checkScanCount("s2", 3); -}