Skip to content

Commit

Permalink
Readd: [velox][PR] Add runner for local distributed execution (facebo…
Browse files Browse the repository at this point in the history
…okincubator#11507)

Summary:
Reintroducing Orri's change based on PR: facebookincubator#11475
This diff attempts to fix the CMAKE errors of the first PR.

Original commit changeset: 6cf86c88d7c8

Original Phabricator Diff: D65631783

Differential Revision: D65779177

Pulled By: kgpai
  • Loading branch information
oerling authored and facebook-github-bot committed Nov 12, 2024
1 parent 7437093 commit d611c94
Show file tree
Hide file tree
Showing 27 changed files with 2,162 additions and 115 deletions.
1 change: 1 addition & 0 deletions velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ add_subdirectory(connectors)

if(${VELOX_ENABLE_EXEC})
add_subdirectory(exec)
add_subdirectory(runner)
endif()

if(${VELOX_ENABLE_DUCKDB})
Expand Down
108 changes: 108 additions & 0 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,112 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
static void registerSerDe();
};

class HiveConnectorSplitBuilder {
public:
explicit HiveConnectorSplitBuilder(std::string filePath)
: filePath_{std::move(filePath)} {
infoColumns_["$path"] = filePath_;
}

HiveConnectorSplitBuilder& start(uint64_t start) {
start_ = start;
return *this;
}

HiveConnectorSplitBuilder& length(uint64_t length) {
length_ = length;
return *this;
}

HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) {
splitWeight_ = splitWeight;
return *this;
}

HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) {
fileFormat_ = format;
return *this;
}

HiveConnectorSplitBuilder& infoColumn(
const std::string& name,
const std::string& value) {
infoColumns_.emplace(std::move(name), std::move(value));
return *this;
}

HiveConnectorSplitBuilder& partitionKey(
std::string name,
std::optional<std::string> value) {
partitionKeys_.emplace(std::move(name), std::move(value));
return *this;
}

HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) {
tableBucketNumber_ = bucket;
infoColumns_["$bucket"] = std::to_string(bucket);
return *this;
}

HiveConnectorSplitBuilder& customSplitInfo(
const std::unordered_map<std::string, std::string>& customSplitInfo) {
customSplitInfo_ = customSplitInfo;
return *this;
}

HiveConnectorSplitBuilder& extraFileInfo(
const std::shared_ptr<std::string>& extraFileInfo) {
extraFileInfo_ = extraFileInfo;
return *this;
}

HiveConnectorSplitBuilder& serdeParameters(
const std::unordered_map<std::string, std::string>& serdeParameters) {
serdeParameters_ = serdeParameters;
return *this;
}

HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) {
connectorId_ = connectorId;
return *this;
}

HiveConnectorSplitBuilder& fileProperties(FileProperties fileProperties) {
fileProperties_ = fileProperties;
return *this;
}

std::shared_ptr<connector::hive::HiveConnectorSplit> build() const {
return std::make_shared<connector::hive::HiveConnectorSplit>(
connectorId_,
filePath_,
fileFormat_,
start_,
length_,
partitionKeys_,
tableBucketNumber_,
customSplitInfo_,
extraFileInfo_,
serdeParameters_,
splitWeight_,
infoColumns_,
fileProperties_);
}

private:
const std::string filePath_;
dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF};
uint64_t start_{0};
uint64_t length_{std::numeric_limits<uint64_t>::max()};
std::unordered_map<std::string, std::optional<std::string>> partitionKeys_;
std::optional<int32_t> tableBucketNumber_;
std::unordered_map<std::string, std::string> customSplitInfo_ = {};
std::shared_ptr<std::string> extraFileInfo_ = {};
std::unordered_map<std::string, std::string> serdeParameters_ = {};
std::unordered_map<std::string, std::string> infoColumns_ = {};
std::string connectorId_;
int64_t splitWeight_{0};
std::optional<FileProperties> fileProperties_;
};

} // namespace facebook::velox::connector::hive
1 change: 1 addition & 0 deletions velox/connectors/hive/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ add_executable(
HiveConnectorSerDeTest.cpp
HivePartitionFunctionTest.cpp
HivePartitionUtilTest.cpp
HiveSplitTest.cpp
PartitionIdGeneratorTest.cpp
TableHandleTest.cpp)
add_test(velox_hive_connector_test velox_hive_connector_test)
Expand Down
64 changes: 64 additions & 0 deletions velox/connectors/hive/tests/HiveSplitTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "gtest/gtest.h"
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"

using namespace facebook::velox;
using namespace facebook::velox::connector::hive;

TEST(HiveSplitTest, builder) {
FileProperties properties = {11, 1111};
auto extra = std::make_shared<std::string>("extra file info");
std::unordered_map<std::string, std::string> custom;
custom["custom1"] = "customValue1";
std::unordered_map<std::string, std::string> serde;
serde["serde1"] = "serdeValue1";
auto split = HiveConnectorSplitBuilder("filepath")
.start(100)
.length(100000)
.splitWeight(1)
.fileFormat(dwio::common::FileFormat::DWRF)
.infoColumn("info1", "infoValue1")
.partitionKey("DS", "2024-11-01")
.tableBucketNumber(11)
.customSplitInfo(custom)
.extraFileInfo(extra)
.serdeParameters(serde)
.connectorId("connectorId")
.fileProperties(properties)
.build();

EXPECT_EQ(100, split->start);
EXPECT_EQ(100000, split->length);
EXPECT_EQ(1, split->splitWeight);
EXPECT_TRUE(dwio::common::FileFormat::DWRF == split->fileFormat);
EXPECT_EQ("infoValue1", split->infoColumns["info1"]);
auto it = split->partitionKeys.find("DS");
EXPECT_TRUE(it != split->partitionKeys.end());
EXPECT_EQ("2024-11-01", it->second.value());
EXPECT_EQ(11, split->tableBucketNumber.value());
EXPECT_EQ("customValue1", split->customSplitInfo["custom1"]);
EXPECT_EQ(std::string("extra file info"), *split->extraFileInfo);
EXPECT_EQ("serdeValue1", split->serdeParameters["serde1"]);
EXPECT_EQ("connectorId", split->connectorId);
EXPECT_EQ(
properties.fileSize.value(), split->properties.value().fileSize.value());
EXPECT_EQ(
properties.modificationTime.value(),
split->properties.value().modificationTime.value());
}
30 changes: 16 additions & 14 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class TableScanTest : public virtual HiveConnectorTestBase {
const std::string& filePath,
const TypePtr& partitionType,
const std::optional<std::string>& partitionValue) {
auto split = HiveConnectorSplitBuilder(filePath)
auto split = exec::test::HiveConnectorSplitBuilder(filePath)
.partitionKey("pkey", partitionValue)
.build();
auto outputType =
Expand Down Expand Up @@ -411,7 +411,7 @@ TEST_F(TableScanTest, partitionKeyAlias) {
{"a", regularColumn("c0", BIGINT())},
{"ds_alias", partitionKey("ds", VARCHAR())}};

auto split = HiveConnectorSplitBuilder(filePath->getPath())
auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("ds", "2021-12-02")
.build();

Expand Down Expand Up @@ -1806,7 +1806,8 @@ TEST_F(TableScanTest, splitOffsetAndLength) {
}

TEST_F(TableScanTest, fileNotFound) {
auto split = HiveConnectorSplitBuilder("/path/to/nowhere.orc").build();
auto split =
exec::test::HiveConnectorSplitBuilder("/path/to/nowhere.orc").build();
auto assertMissingFile = [&](bool ignoreMissingFiles) {
AssertQueryBuilder(tableScanNode())
.connectorSessionProperty(
Expand All @@ -1829,7 +1830,7 @@ TEST_F(TableScanTest, validFileNoData) {

auto filePath = facebook::velox::test::getDataFilePath(
"velox/exec/tests", "data/emptyPresto.dwrf");
auto split = HiveConnectorSplitBuilder(filePath)
auto split = exec::test::HiveConnectorSplitBuilder(filePath)
.start(0)
.length(fs::file_size(filePath) / 2)
.build();
Expand Down Expand Up @@ -1949,7 +1950,7 @@ TEST_F(TableScanTest, partitionedTableDateKey) {

// Test partition filter on date column.
{
auto split = HiveConnectorSplitBuilder(filePath->getPath())
auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("pkey", partitionValue)
.build();
auto outputType = ROW({"pkey", "c0", "c1"}, {DATE(), BIGINT(), DOUBLE()});
Expand Down Expand Up @@ -2851,9 +2852,10 @@ TEST_F(TableScanTest, bucket) {
writeToFile(filePaths[i]->getPath(), rowVector);
rowVectors.emplace_back(rowVector);

splits.emplace_back(HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucket)
.build());
splits.emplace_back(
exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucket)
.build());
}

createDuckDbTable(rowVectors);
Expand All @@ -2877,7 +2879,7 @@ TEST_F(TableScanTest, bucket) {

for (int i = 0; i < buckets.size(); ++i) {
int bucketValue = buckets[i];
auto hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath())
auto hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucketValue)
.build();

Expand All @@ -2897,7 +2899,7 @@ TEST_F(TableScanTest, bucket) {

// Filter on bucket column, but don't project it out
auto rowTypes = ROW({"c0", "c1"}, {INTEGER(), BIGINT()});
hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath())
hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucketValue)
.build();
op = PlanBuilder()
Expand Down Expand Up @@ -4141,7 +4143,7 @@ TEST_F(TableScanTest, reuseRowVector) {
.tableScan(rowType, {}, "c0 < 5")
.project({"c1.c0"})
.planNode();
auto split = HiveConnectorSplitBuilder(file->getPath()).build();
auto split = exec::test::HiveConnectorSplitBuilder(file->getPath()).build();
auto expected = makeRowVector(
{makeFlatVector<int32_t>(10, [](auto i) { return i % 5; })});
AssertQueryBuilder(plan).splits({split, split}).assertResults(expected);
Expand Down Expand Up @@ -4722,7 +4724,7 @@ TEST_F(TableScanTest, varbinaryPartitionKey) {
{"a", regularColumn("c0", BIGINT())},
{"ds_alias", partitionKey("ds", VARBINARY())}};

auto split = HiveConnectorSplitBuilder(filePath->getPath())
auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("ds", "2021-12-02")
.build();

Expand Down Expand Up @@ -4761,7 +4763,7 @@ TEST_F(TableScanTest, timestampPartitionKey) {
ColumnHandleMap assignments = {{"t", partitionKey("t", TIMESTAMP())}};
std::vector<std::shared_ptr<connector::ConnectorSplit>> splits;
for (auto& t : inputs) {
splits.push_back(HiveConnectorSplitBuilder(filePath->getPath())
splits.push_back(exec::test::HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("t", t)
.build());
}
Expand All @@ -4780,7 +4782,7 @@ TEST_F(TableScanTest, partitionKeyNotMatchPartitionKeysHandle) {
writeToFile(filePath->getPath(), vectors);
createDuckDbTable(vectors);

auto split = HiveConnectorSplitBuilder(filePath->getPath())
auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("ds", "2021-12-02")
.build();

Expand Down
3 changes: 3 additions & 0 deletions velox/exec/tests/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ add_library(
AssertQueryBuilder.cpp
ArbitratorTestUtil.cpp
Cursor.cpp
DistributedPlanBuilder.cpp
HiveConnectorTestBase.cpp
LocalExchangeSource.cpp
LocalRunnerTestBase.cpp
OperatorTestBase.cpp
PlanBuilder.cpp
QueryAssertions.cpp
Expand All @@ -35,6 +37,7 @@ add_library(

target_link_libraries(
velox_exec_test_lib
velox_local_runner
velox_vector_test_lib
velox_temp_path
velox_core
Expand Down
20 changes: 20 additions & 0 deletions velox/exec/tests/utils/Cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ class MultiThreadedTaskCursor : public TaskCursorBase {
/// Starts the task if not started yet.
bool moveNext() override {
start();
if (error_) {
std::rethrow_exception(error_);
}

current_ = queue_->dequeue();
if (task_->error()) {
// Wait for the task to finish (there's' a small period of time between
Expand Down Expand Up @@ -302,6 +306,13 @@ class MultiThreadedTaskCursor : public TaskCursorBase {
return current_;
}

void setError(std::exception_ptr error) override {
error_ = error;
if (task_) {
task_->setError(error);
}
}

const std::shared_ptr<Task>& task() override {
return task_;
}
Expand All @@ -316,6 +327,7 @@ class MultiThreadedTaskCursor : public TaskCursorBase {
std::shared_ptr<exec::Task> task_;
RowVectorPtr current_;
bool atEnd_{false};
std::exception_ptr error_;
};

class SingleThreadedTaskCursor : public TaskCursorBase {
Expand Down Expand Up @@ -391,6 +403,13 @@ class SingleThreadedTaskCursor : public TaskCursorBase {
return current_;
}

void setError(std::exception_ptr error) override {
error_ = error;
if (task_) {
task_->setError(error);
}
}

const std::shared_ptr<Task>& task() override {
return task_;
}
Expand All @@ -399,6 +418,7 @@ class SingleThreadedTaskCursor : public TaskCursorBase {
std::shared_ptr<exec::Task> task_;
RowVectorPtr current_;
RowVectorPtr next_;
std::exception_ptr error_;
};

std::unique_ptr<TaskCursor> TaskCursor::create(const CursorParameters& params) {
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/tests/utils/Cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class TaskCursor {

virtual RowVectorPtr& current() = 0;

virtual void setError(std::exception_ptr error) = 0;

virtual const std::shared_ptr<Task>& task() = 0;
};

Expand Down
Loading

0 comments on commit d611c94

Please sign in to comment.