Skip to content

Commit

Permalink
Back out "Readd: [velox][PR] Add runner for local distributed execution"
Browse files Browse the repository at this point in the history
Summary:
Backing out to unbreak OSS CI. The PR facebookincubator#11507 seems to have broken pyVelox builds. Backing out so we can investigate and fix.

Original commit changeset: 4e52927c05ff

Original Phabricator Diff: D65779177

Differential Revision: D65900074
  • Loading branch information
Krishna Pai authored and facebook-github-bot committed Nov 13, 2024
1 parent 6c25a91 commit 60ac5ab
Show file tree
Hide file tree
Showing 27 changed files with 115 additions and 2,162 deletions.
1 change: 0 additions & 1 deletion velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ add_subdirectory(connectors)

if(${VELOX_ENABLE_EXEC})
add_subdirectory(exec)
add_subdirectory(runner)
endif()

if(${VELOX_ENABLE_DUCKDB})
Expand Down
108 changes: 0 additions & 108 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,112 +111,4 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
static void registerSerDe();
};

/// Fluent builder for HiveConnectorSplit.
///
/// Each setter stores one attribute and returns *this so calls can be
/// chained; build() then creates the split from the accumulated state.
/// Attributes that are never set keep their defaults: DWRF file format, start
/// offset 0, length std::numeric_limits<uint64_t>::max(), split weight 0, an
/// empty connector id, and empty/unset maps and optionals.
class HiveConnectorSplitBuilder {
 public:
  /// @param filePath Path of the file the split refers to. It is also
  /// published as the "$path" info column.
  explicit HiveConnectorSplitBuilder(std::string filePath)
      : filePath_{std::move(filePath)} {
    infoColumns_["$path"] = filePath_;
  }

  /// Sets the 'start' value passed to the split.
  HiveConnectorSplitBuilder& start(uint64_t start) {
    start_ = start;
    return *this;
  }

  /// Sets the 'length' value passed to the split.
  HiveConnectorSplitBuilder& length(uint64_t length) {
    length_ = length;
    return *this;
  }

  /// Sets the split weight passed to the split.
  HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) {
    splitWeight_ = splitWeight;
    return *this;
  }

  /// Sets the file format passed to the split.
  HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) {
    fileFormat_ = format;
    return *this;
  }

  /// Adds an info column. Uses emplace(), so if 'name' is already present
  /// (including the "$path" entry added by the constructor and the "$bucket"
  /// entry added by tableBucketNumber()) the existing value is kept.
  ///
  /// The arguments are taken by value and moved into the map. The previous
  /// signature took const references and applied std::move to them, which
  /// silently degraded to a copy.
  HiveConnectorSplitBuilder& infoColumn(std::string name, std::string value) {
    infoColumns_.emplace(std::move(name), std::move(value));
    return *this;
  }

  /// Adds a partition key with an optional value. Uses emplace(), so a key
  /// that is already present keeps its existing value.
  HiveConnectorSplitBuilder& partitionKey(
      std::string name,
      std::optional<std::string> value) {
    partitionKeys_.emplace(std::move(name), std::move(value));
    return *this;
  }

  /// Sets the table bucket number and sets (overwriting any previous value)
  /// the "$bucket" info column to its decimal string form.
  HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) {
    tableBucketNumber_ = bucket;
    infoColumns_["$bucket"] = std::to_string(bucket);
    return *this;
  }

  /// Replaces the custom split info map with a copy of 'customSplitInfo'.
  HiveConnectorSplitBuilder& customSplitInfo(
      const std::unordered_map<std::string, std::string>& customSplitInfo) {
    customSplitInfo_ = customSplitInfo;
    return *this;
  }

  /// Sets the extra file info pointer; the pointee is shared, not copied.
  HiveConnectorSplitBuilder& extraFileInfo(
      const std::shared_ptr<std::string>& extraFileInfo) {
    extraFileInfo_ = extraFileInfo;
    return *this;
  }

  /// Replaces the serde parameter map with a copy of 'serdeParameters'.
  HiveConnectorSplitBuilder& serdeParameters(
      const std::unordered_map<std::string, std::string>& serdeParameters) {
    serdeParameters_ = serdeParameters;
    return *this;
  }

  /// Sets the connector id passed to the split.
  HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) {
    connectorId_ = connectorId;
    return *this;
  }

  /// Sets the file properties passed to the split.
  HiveConnectorSplitBuilder& fileProperties(FileProperties fileProperties) {
    // The parameter is already a private copy; move it instead of copying
    // a second time.
    fileProperties_ = std::move(fileProperties);
    return *this;
  }

  /// Creates a new HiveConnectorSplit from the current state. The builder is
  /// left unchanged, so it can be modified further and built again.
  std::shared_ptr<connector::hive::HiveConnectorSplit> build() const {
    return std::make_shared<connector::hive::HiveConnectorSplit>(
        connectorId_,
        filePath_,
        fileFormat_,
        start_,
        length_,
        partitionKeys_,
        tableBucketNumber_,
        customSplitInfo_,
        extraFileInfo_,
        serdeParameters_,
        splitWeight_,
        infoColumns_,
        fileProperties_);
  }

 private:
  const std::string filePath_;
  dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF};
  uint64_t start_{0};
  uint64_t length_{std::numeric_limits<uint64_t>::max()};
  std::unordered_map<std::string, std::optional<std::string>> partitionKeys_;
  std::optional<int32_t> tableBucketNumber_;
  std::unordered_map<std::string, std::string> customSplitInfo_ = {};
  std::shared_ptr<std::string> extraFileInfo_ = {};
  std::unordered_map<std::string, std::string> serdeParameters_ = {};
  std::unordered_map<std::string, std::string> infoColumns_ = {};
  std::string connectorId_;
  int64_t splitWeight_{0};
  std::optional<FileProperties> fileProperties_;
};

} // namespace facebook::velox::connector::hive
1 change: 0 additions & 1 deletion velox/connectors/hive/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ add_executable(
HiveConnectorSerDeTest.cpp
HivePartitionFunctionTest.cpp
HivePartitionUtilTest.cpp
HiveSplitTest.cpp
PartitionIdGeneratorTest.cpp
TableHandleTest.cpp)
add_test(velox_hive_connector_test velox_hive_connector_test)
Expand Down
64 changes: 0 additions & 64 deletions velox/connectors/hive/tests/HiveSplitTest.cpp

This file was deleted.

30 changes: 14 additions & 16 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class TableScanTest : public virtual HiveConnectorTestBase {
const std::string& filePath,
const TypePtr& partitionType,
const std::optional<std::string>& partitionValue) {
auto split = exec::test::HiveConnectorSplitBuilder(filePath)
auto split = HiveConnectorSplitBuilder(filePath)
.partitionKey("pkey", partitionValue)
.build();
auto outputType =
Expand Down Expand Up @@ -411,7 +411,7 @@ TEST_F(TableScanTest, partitionKeyAlias) {
{"a", regularColumn("c0", BIGINT())},
{"ds_alias", partitionKey("ds", VARCHAR())}};

auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
auto split = HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("ds", "2021-12-02")
.build();

Expand Down Expand Up @@ -1806,8 +1806,7 @@ TEST_F(TableScanTest, splitOffsetAndLength) {
}

TEST_F(TableScanTest, fileNotFound) {
auto split =
exec::test::HiveConnectorSplitBuilder("/path/to/nowhere.orc").build();
auto split = HiveConnectorSplitBuilder("/path/to/nowhere.orc").build();
auto assertMissingFile = [&](bool ignoreMissingFiles) {
AssertQueryBuilder(tableScanNode())
.connectorSessionProperty(
Expand All @@ -1830,7 +1829,7 @@ TEST_F(TableScanTest, validFileNoData) {

auto filePath = facebook::velox::test::getDataFilePath(
"velox/exec/tests", "data/emptyPresto.dwrf");
auto split = exec::test::HiveConnectorSplitBuilder(filePath)
auto split = HiveConnectorSplitBuilder(filePath)
.start(0)
.length(fs::file_size(filePath) / 2)
.build();
Expand Down Expand Up @@ -1950,7 +1949,7 @@ TEST_F(TableScanTest, partitionedTableDateKey) {

// Test partition filter on date column.
{
auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
auto split = HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("pkey", partitionValue)
.build();
auto outputType = ROW({"pkey", "c0", "c1"}, {DATE(), BIGINT(), DOUBLE()});
Expand Down Expand Up @@ -2852,10 +2851,9 @@ TEST_F(TableScanTest, bucket) {
writeToFile(filePaths[i]->getPath(), rowVector);
rowVectors.emplace_back(rowVector);

splits.emplace_back(
exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucket)
.build());
splits.emplace_back(HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucket)
.build());
}

createDuckDbTable(rowVectors);
Expand All @@ -2879,7 +2877,7 @@ TEST_F(TableScanTest, bucket) {

for (int i = 0; i < buckets.size(); ++i) {
int bucketValue = buckets[i];
auto hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath())
auto hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucketValue)
.build();

Expand All @@ -2899,7 +2897,7 @@ TEST_F(TableScanTest, bucket) {

// Filter on bucket column, but don't project it out
auto rowTypes = ROW({"c0", "c1"}, {INTEGER(), BIGINT()});
hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath())
hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath())
.tableBucketNumber(bucketValue)
.build();
op = PlanBuilder()
Expand Down Expand Up @@ -4143,7 +4141,7 @@ TEST_F(TableScanTest, reuseRowVector) {
.tableScan(rowType, {}, "c0 < 5")
.project({"c1.c0"})
.planNode();
auto split = exec::test::HiveConnectorSplitBuilder(file->getPath()).build();
auto split = HiveConnectorSplitBuilder(file->getPath()).build();
auto expected = makeRowVector(
{makeFlatVector<int32_t>(10, [](auto i) { return i % 5; })});
AssertQueryBuilder(plan).splits({split, split}).assertResults(expected);
Expand Down Expand Up @@ -4724,7 +4722,7 @@ TEST_F(TableScanTest, varbinaryPartitionKey) {
{"a", regularColumn("c0", BIGINT())},
{"ds_alias", partitionKey("ds", VARBINARY())}};

auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
auto split = HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("ds", "2021-12-02")
.build();

Expand Down Expand Up @@ -4763,7 +4761,7 @@ TEST_F(TableScanTest, timestampPartitionKey) {
ColumnHandleMap assignments = {{"t", partitionKey("t", TIMESTAMP())}};
std::vector<std::shared_ptr<connector::ConnectorSplit>> splits;
for (auto& t : inputs) {
splits.push_back(exec::test::HiveConnectorSplitBuilder(filePath->getPath())
splits.push_back(HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("t", t)
.build());
}
Expand All @@ -4782,7 +4780,7 @@ TEST_F(TableScanTest, partitionKeyNotMatchPartitionKeysHandle) {
writeToFile(filePath->getPath(), vectors);
createDuckDbTable(vectors);

auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
auto split = HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("ds", "2021-12-02")
.build();

Expand Down
3 changes: 0 additions & 3 deletions velox/exec/tests/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ add_library(
AssertQueryBuilder.cpp
ArbitratorTestUtil.cpp
Cursor.cpp
DistributedPlanBuilder.cpp
HiveConnectorTestBase.cpp
LocalExchangeSource.cpp
LocalRunnerTestBase.cpp
OperatorTestBase.cpp
PlanBuilder.cpp
QueryAssertions.cpp
Expand All @@ -37,7 +35,6 @@ add_library(

target_link_libraries(
velox_exec_test_lib
velox_local_runner
velox_vector_test_lib
velox_temp_path
velox_core
Expand Down
20 changes: 0 additions & 20 deletions velox/exec/tests/utils/Cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,6 @@ class MultiThreadedTaskCursor : public TaskCursorBase {
/// Starts the task if not started yet.
bool moveNext() override {
start();
if (error_) {
std::rethrow_exception(error_);
}

current_ = queue_->dequeue();
if (task_->error()) {
// Wait for the task to finish (there's a small period of time between
Expand Down Expand Up @@ -306,13 +302,6 @@ class MultiThreadedTaskCursor : public TaskCursorBase {
return current_;
}

void setError(std::exception_ptr error) override {
error_ = error;
if (task_) {
task_->setError(error);
}
}

/// Returns a reference to the task pointer held by this cursor.
const std::shared_ptr<Task>& task() override {
return task_;
}
Expand All @@ -327,7 +316,6 @@ class MultiThreadedTaskCursor : public TaskCursorBase {
std::shared_ptr<exec::Task> task_;
RowVectorPtr current_;
bool atEnd_{false};
std::exception_ptr error_;
};

class SingleThreadedTaskCursor : public TaskCursorBase {
Expand Down Expand Up @@ -403,13 +391,6 @@ class SingleThreadedTaskCursor : public TaskCursorBase {
return current_;
}

void setError(std::exception_ptr error) override {
error_ = error;
if (task_) {
task_->setError(error);
}
}

/// Returns a reference to the task pointer held by this cursor.
const std::shared_ptr<Task>& task() override {
return task_;
}
Expand All @@ -418,7 +399,6 @@ class SingleThreadedTaskCursor : public TaskCursorBase {
std::shared_ptr<exec::Task> task_;
RowVectorPtr current_;
RowVectorPtr next_;
std::exception_ptr error_;
};

std::unique_ptr<TaskCursor> TaskCursor::create(const CursorParameters& params) {
Expand Down
2 changes: 0 additions & 2 deletions velox/exec/tests/utils/Cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ class TaskCursor {

virtual RowVectorPtr& current() = 0;

virtual void setError(std::exception_ptr error) = 0;

virtual const std::shared_ptr<Task>& task() = 0;
};

Expand Down
Loading

0 comments on commit 60ac5ab

Please sign in to comment.