From 22ba7ec869e04a04b460418185e3ab673490e3f2 Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Sat, 14 Dec 2024 15:08:06 -0800 Subject: [PATCH] refactor: Add arg cpuExecutor to ConnectorFactory::newConnector (#11861) Summary: Add an additional argument, cpuExecutor, to the ConnectorFactory's newConnector interface. Connectors could send async operators to this cpuExecutor to prevent occupying the driver executor. Pull Request resolved: https://github.com/facebookincubator/velox/pull/11861 Reviewed By: xiaoxmeng Differential Revision: D67231282 Pulled By: gggrace14 fbshipit-source-id: a92bb2c27ca26bf86a083ffea2a62b6c89231093 --- velox/connectors/Connector.h | 3 ++- velox/connectors/fuzzer/FuzzerConnector.h | 5 +++-- velox/connectors/hive/HiveConnector.h | 5 +++-- velox/connectors/tests/ConnectorTest.cpp | 3 ++- velox/connectors/tpch/TpchConnector.h | 5 +++-- velox/exec/tests/AsyncConnectorTest.cpp | 3 ++- 6 files changed, 15 insertions(+), 9 deletions(-) diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 565e64fdb363..b0a0f108aa21 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -482,7 +482,8 @@ class ConnectorFactory { virtual std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* executor = nullptr) = 0; + folly::Executor* ioExecutor = nullptr, + folly::Executor* cpuExecutor = nullptr) = 0; private: const std::string name_; diff --git a/velox/connectors/fuzzer/FuzzerConnector.h b/velox/connectors/fuzzer/FuzzerConnector.h index 4e8f665608b4..64477b73ea36 100644 --- a/velox/connectors/fuzzer/FuzzerConnector.h +++ b/velox/connectors/fuzzer/FuzzerConnector.h @@ -141,8 +141,9 @@ class FuzzerConnectorFactory : public ConnectorFactory { std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* executor = nullptr) override { - return std::make_shared(id, config, executor); + folly::Executor* ioExecutor = nullptr, + folly::Executor* cpuExecutor = nullptr) override { + return std::make_shared(id, config, ioExecutor); } }; diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index a98a7f753289..673c83792a88 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -93,8 +93,9 @@ class HiveConnectorFactory : public ConnectorFactory { std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* executor = nullptr) override { - return std::make_shared(id, config, executor); + folly::Executor* ioExecutor = nullptr, + folly::Executor* cpuExecutor = nullptr) override { + return std::make_shared(id, config, ioExecutor); } }; diff --git a/velox/connectors/tests/ConnectorTest.cpp b/velox/connectors/tests/ConnectorTest.cpp index 5681bc80cb04..96d46e607be9 100644 --- a/velox/connectors/tests/ConnectorTest.cpp +++ b/velox/connectors/tests/ConnectorTest.cpp @@ -59,7 +59,8 @@ class TestConnectorFactory : public connector::ConnectorFactory { std::shared_ptr newConnector( const std::string& id, std::shared_ptr /*config*/, - folly::Executor* /*executor*/ = nullptr) override { + folly::Executor* /*ioExecutor*/ = nullptr, + folly::Executor* /*cpuExecutor*/ = nullptr) override { return std::make_shared(id); } }; diff --git a/velox/connectors/tpch/TpchConnector.h b/velox/connectors/tpch/TpchConnector.h index babcda8e5cb0..02f6579828de 100644 --- a/velox/connectors/tpch/TpchConnector.h +++ b/velox/connectors/tpch/TpchConnector.h @@ -171,8 +171,9 @@ class TpchConnectorFactory : public ConnectorFactory { std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* executor = nullptr) override { - return std::make_shared(id, config, executor); + folly::Executor* ioExecutor = nullptr, + folly::Executor* cpuExecutor = nullptr) override { + return std::make_shared(id, config, ioExecutor); } }; diff --git a/velox/exec/tests/AsyncConnectorTest.cpp b/velox/exec/tests/AsyncConnectorTest.cpp index b4c532bdfa6f..7f2ad55c00d6 100644 --- a/velox/exec/tests/AsyncConnectorTest.cpp +++ b/velox/exec/tests/AsyncConnectorTest.cpp @@ -164,7 +164,8 @@ class TestConnectorFactory : public connector::ConnectorFactory { std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* /* executor */) override { + folly::Executor* /* ioExecutor */, + folly::Executor* /* cpuExecutor */) override { return std::make_shared(id); } };