-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Create a local thrift service provider for RemoteFunction (#11538)
Summary: This diff is to extract the common logic related to starting RemoteFunctionService to a single place, so that we can later reuse it. - Create a Class RemoteFunctionServiceProvider and extract the logic related to starting a local thrift server for RemoteFunctionService there. - Create a Singleton remoteFunctionServiceProviderSingleton so that the server is only started once per process. - Expose helper methods `getRemoteFunctionServiceParamsForLocalThrift()` - Use this in RemoteFunctionTest.cpp Differential Revision: D65927269
- Loading branch information
1 parent
42bd38a
commit 7692181
Showing
6 changed files
with
198 additions
and
59 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,3 +15,4 @@ | |
add_subdirectory(if) | ||
add_subdirectory(client) | ||
add_subdirectory(server) | ||
add_subdirectory(utils) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# Copyright (c) Facebook, Inc. and its affiliates. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
add_library(velox_functions_remote_utils RemoteFunctionServiceProvider.cpp) | ||
|
||
target_link_libraries( | ||
velox_functions_remote_utils | ||
PUBLIC velox_functions_remote_server) |
76 changes: 76 additions & 0 deletions
76
velox/functions/remote/utils/RemoteFunctionServiceProvider.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include "velox/functions/remote/utils/RemoteFunctionServiceProvider.h" | ||
|
||
#include "thrift/lib/cpp2/server/ThriftServer.h" | ||
#include "velox/functions/remote/server/RemoteFunctionService.h" | ||
|
||
namespace facebook::velox::functions { | ||
|
||
RemoteFunctionServiceParams | ||
RemoteFunctionServiceProviderForLocalThrift::getRemoteFunctionServiceParams() { | ||
folly::call_once(initializeServiceFlag_, [&]() { initializeServer(); }); | ||
return RemoteFunctionServiceParams{ | ||
remotePrefix_, | ||
location_, | ||
}; | ||
} | ||
|
||
void RemoteFunctionServiceProviderForLocalThrift::initializeServer() { | ||
auto handler = | ||
std::make_shared<velox::functions::RemoteFunctionServiceHandler>( | ||
remotePrefix_); | ||
server_ = std::make_shared<apache::thrift::ThriftServer>(); | ||
server_->setInterface(handler); | ||
server_->setAddress(location_); | ||
|
||
thread_ = std::make_unique<std::thread>([&] { server_->serve(); }); | ||
VELOX_CHECK(waitForRunning(), "Unable to initialize thrift server."); | ||
LOG(INFO) << "Thrift server is up and running in local port " << location_; | ||
} | ||
|
||
bool RemoteFunctionServiceProviderForLocalThrift::waitForRunning() { | ||
for (size_t i = 0; i < 100; ++i) { | ||
if (server_->getServerStatus() == | ||
apache::thrift::ThriftServer::ServerStatus::RUNNING) { | ||
return true; | ||
} | ||
std::this_thread::sleep_for(std::chrono::milliseconds(500)); | ||
} | ||
return false; | ||
} | ||
|
||
RemoteFunctionServiceProviderForLocalThrift:: | ||
~RemoteFunctionServiceProviderForLocalThrift() { | ||
server_->stop(); | ||
thread_->join(); | ||
LOG(INFO) << "Thrift server stopped."; | ||
} | ||
|
||
RemoteFunctionServiceParams startLocalThriftServiceAndGetParams() { | ||
static folly::Singleton<IRemoteFunctionServiceProvider> | ||
remoteFunctionServiceProviderForLocalThriftSingleton{ | ||
[]() { return new RemoteFunctionServiceProviderForLocalThrift(); }}; | ||
auto provider = | ||
remoteFunctionServiceProviderForLocalThriftSingleton.try_get(); | ||
if (!provider) { | ||
throw std::runtime_error("local remoteFunctionProvider is not available"); | ||
} | ||
return provider->getRemoteFunctionServiceParams(); | ||
} | ||
|
||
} // namespace facebook::velox::functions |
91 changes: 91 additions & 0 deletions
91
velox/functions/remote/utils/RemoteFunctionServiceProvider.h
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#pragma once | ||
|
||
#include <memory> | ||
#include <string> | ||
#include <thread> | ||
|
||
#include <folly/SocketAddress.h> | ||
#include <folly/synchronization/CallOnce.h> | ||
|
||
namespace apache::thrift { | ||
class ThriftServer; | ||
} // namespace apache::thrift | ||
|
||
namespace facebook::velox::functions { | ||
|
||
constexpr std::string_view kRemoteFunctionPrefix = "remote"; | ||
|
||
struct RemoteFunctionServiceParams { | ||
// Function prefix to be used for registering the actual functions. | ||
// This is needed when server is running in the same process. | ||
std::string functionPrefix; | ||
|
||
// The socket address that the thrift server is running on. | ||
folly::SocketAddress serverAddress; | ||
}; | ||
|
||
class IRemoteFunctionServiceProvider { | ||
public: | ||
virtual ~IRemoteFunctionServiceProvider() = default; | ||
virtual RemoteFunctionServiceParams getRemoteFunctionServiceParams() = 0; | ||
}; | ||
|
||
class RemoteFunctionServiceProviderForLocalThrift | ||
: public IRemoteFunctionServiceProvider { | ||
public: | ||
// Creates a thrift server that runs in a separate thread | ||
// and returns the parameters of the service. | ||
RemoteFunctionServiceParams getRemoteFunctionServiceParams() override; | ||
|
||
~RemoteFunctionServiceProviderForLocalThrift() override; | ||
|
||
private: | ||
void initializeServer(); | ||
|
||
// Loop until the server is up and running. | ||
bool waitForRunning(); | ||
|
||
std::shared_ptr<apache::thrift::ThriftServer> server_; | ||
std::unique_ptr<std::thread> thread_; | ||
|
||
// Creates a random temporary file name to use to communicate as a unix domain | ||
// socket. | ||
folly::SocketAddress location_ = []() { | ||
char name[] = "/tmp/socketXXXXXX"; | ||
int fd = mkstemp(name); | ||
if (fd < 0) { | ||
throw std::runtime_error("Failed to create temporary file for socket"); | ||
} | ||
close(fd); | ||
std::string socketPath(name); | ||
// Cleanup existing socket file if it exists. | ||
unlink(socketPath.c_str()); | ||
return folly::SocketAddress::makeFromPath(socketPath); | ||
}(); | ||
|
||
const std::string remotePrefix_{kRemoteFunctionPrefix}; | ||
folly::once_flag initializeServiceFlag_; | ||
}; | ||
|
||
// If no thrift server is currently running, creates a thrift server | ||
// that runs in a separate thread and returns the parameters of the service. | ||
// If a thrift server is already running, returns the parameters of the service. | ||
RemoteFunctionServiceParams startLocalThriftServiceAndGetParams(); | ||
|
||
} // namespace facebook::velox::functions |