Skip to content

Commit

Permalink
refactor: Add arg cpuExecutor to ConnectorFactory::newConnector (#11861)
Browse files Browse the repository at this point in the history
Summary:
Add an additional argument, cpuExecutor, to the ConnectorFactory's
newConnector interface. Connectors could send async operators to this
cpuExecutor to prevent occupying the driver executor.

Pull Request resolved: #11861

Reviewed By: xiaoxmeng

Differential Revision: D67231282

Pulled By: gggrace14

fbshipit-source-id: a92bb2c27ca26bf86a083ffea2a62b6c89231093
  • Loading branch information
gggrace14 authored and facebook-github-bot committed Dec 14, 2024
1 parent 1779351 commit 22ba7ec
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 9 deletions.
3 changes: 2 additions & 1 deletion velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ class ConnectorFactory {
virtual std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) = 0;
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) = 0;

private:
const std::string name_;
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/fuzzer/FuzzerConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ class FuzzerConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, config, ioExecutor);
}
};

Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ class HiveConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<HiveConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<HiveConnector>(id, config, ioExecutor);
}
};

Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/tests/ConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class TestConnectorFactory : public connector::ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> /*config*/,
folly::Executor* /*executor*/ = nullptr) override {
folly::Executor* /*ioExecutor*/ = nullptr,
folly::Executor* /*cpuExecutor*/ = nullptr) override {
return std::make_shared<TestConnector>(id);
}
};
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/tpch/TpchConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ class TpchConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<TpchConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<TpchConnector>(id, config, ioExecutor);
}
};

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/AsyncConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ class TestConnectorFactory : public connector::ConnectorFactory {
std::shared_ptr<connector::Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* /* executor */) override {
folly::Executor* /* ioExecutor */,
folly::Executor* /* cpuExecutor */) override {
return std::make_shared<TestConnector>(id);
}
};
Expand Down

0 comments on commit 22ba7ec

Please sign in to comment.