-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(functions): Add support for REST based remote functions #10911
base: main
Are you sure you want to change the base?
Conversation
✅ Deploy Preview for meta-velox canceled.
|
b88d136
to
b85e0e6
Compare
0cd4510
to
74023dc
Compare
Pretty cool! I see the PR is still as draft, but I can help review when it's ready. Would also be nice to add some documentation on how to use it, the configs parameters, etc. |
abe87e1
to
6c1606e
Compare
05115f4
to
2ffec26
Compare
@aditi-pandit Can you please review the changes? |
/// (non-remote) function registered with the same name. The `overwrite` flag | ||
/// controls whether to overwrite in these cases. | ||
/// (non-remote) function registered with the same name. The `overwrite` | ||
/// flagwrite controls whether to overwrite in these cases. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit : wording... maybe write is not needed here.
#include <folly/io/async/EventBase.h> | ||
#include <sstream> | ||
#include <string> | ||
#include "velox/common/memory/ByteStream.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add empty line between the system and velox includes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added new line
#include "velox/vector/VectorStream.h" | ||
|
||
#include "velox/functions/remote/client/RestClient.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this include to the correct alphabetical order in the previous velox includes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
corrected the inclusion list
/// Network address of the servr to communicate with. Note that this can hold | ||
/// a network location (ip/port pair) or a unix domain socket path (see | ||
/// URL of the HTTP/REST server for remote function. | ||
/// Or Network address of the servr to communicate with. Note that this can |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit : spelling "server"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected it
} | ||
size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userdata) { | ||
auto* outputBuf = static_cast<IOBufQueue*>(userdata); | ||
size_t total_size = size * nmemb; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : Use camelCase naming -> totalSize
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected it
return totalCopied; | ||
} | ||
size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userdata) { | ||
auto* outputBuf = static_cast<IOBufQueue*>(userdata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use camel case "userData"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected it
using namespace folly; | ||
namespace facebook::velox::functions { | ||
namespace { | ||
size_t readCallback(char* dest, size_t size, size_t nmemb, void* userp) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please write comments explaining the signature and the parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the documentation
|
||
class RestClient : public HttpClient { | ||
public: | ||
std::unique_ptr<folly::IOBuf> performCurlRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please write the documentation for this API. What is it for ? What do the parameters mean ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the documentation
memory::memoryManager()->addLeafPool()}; | ||
}; | ||
|
||
class listener : public std::enable_shared_from_this<listener> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add some documentation about these classes and what are they for ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added documentation
// called to use the functions mentioned in this map | ||
}; | ||
|
||
TypePtr deserializeType(const std::string& input) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a lot of repetition between this code and https://github.com/facebookincubator/velox/blob/main/velox/functions/remote/server/RemoteFunctionService.cpp. Please can you refactor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
introduced the RemoteFunctionHelper.h and moved the duplicate code.
394cc7f
to
98a6281
Compare
@@ -16,11 +16,23 @@ velox_add_library(velox_functions_remote_thrift_client ThriftClient.cpp) | |||
velox_link_libraries(velox_functions_remote_thrift_client | |||
PUBLIC remote_function_thrift FBThrift::thriftcpp2) | |||
|
|||
set(curl_SOURCE BUNDLED) | |||
velox_resolve_dependency(curl) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't actually build curl. We only use it to force cpr to use the version we want. So if you require curl I would say use set_source
to allow system curl to be used (this might require changes to cpr.cmake but iirc cpr can work with system curl as well).
Also move this into the root cml within an if().
|
||
velox_add_library(velox_functions_remote_rest_client RestClient.cpp) | ||
velox_link_libraries(velox_functions_remote_rest_client Folly::folly | ||
${CURL_LIBRARIES}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
${CURL_LIBRARIES}) | |
CURL::libcurl) |
Always prefer targets over variables.
d031069
to
c0dd88d
Compare
CMakeLists.txt
Outdated
velox_set_source(curl) | ||
velox_resolve_dependency(curl) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. If you now try a build without testing you will see that curl is not installed.
But it should be enough to add FetchContent_MakeAvailable(curl)
to curl.cmake. This might interact weirdly with cpr trying it's own curl build but you'll just have to test that (I recommend make clean
before).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @assignUser for the review points and I have made the necessary changes.
I am bit confused where to add FetchContent_MakeAvailable(curl)
. Can you please suggest the change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Joe-Abraham : @assignUser has suggested to add it to curl.cmake
a4a213f
to
8da88f3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Joe-Abraham : A quick round of comments. I have to look at the server/ files in more detail still.
#include <folly/init/Init.h> | ||
#include <gmock/gmock.h> | ||
#include <gtest/gtest.h> | ||
#include <cstdio> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This header should move to the top as it is a standard C library.
CMakeLists.txt
Outdated
velox_set_source(curl) | ||
velox_resolve_dependency(curl) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Joe-Abraham : @assignUser has suggested to add it to curl.cmake
class RemoteFunction : public exec::VectorFunction { | ||
public: | ||
RemoteFunction( | ||
const std::string& functionName, | ||
const std::vector<exec::VectorFunctionArg>& inputArgs, | ||
const RemoteVectorFunctionMetadata& metadata) | ||
const RemoteVectorFunctionMetadata& metadata, | ||
std::unique_ptr<HttpClient> httpClient = nullptr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we prefer to pass an HttpClient here ? For thrift, a ThriftClient is created per RemoteFunction. Might be better to do so for HttpClient as well. You have an eventBase_ to work with as well.
serde_(getSerde(serdeFormat_)) { | ||
restClient_(httpClient ? std::move(httpClient) : getRestClient()), | ||
metadata_(metadata) { | ||
if (metadata.location.type() == typeid(SocketAddress)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can have a member variable of enum for RemoteType = {REST, HTTP}, and set it in the constructor and use it in the apply instead of doing the type check each time.
std::unique_ptr<RemoteFunctionClient> thriftClient_; | ||
remote::PageFormat serdeFormat_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you remove the serdeFormat_ and serde_ member variables ? It was better to construct them once in the constructor and use them in the function apply code, instead of call getSerde each time.
IOBufQueue inputBufQueue(IOBufQueue::cacheChainLength()); | ||
inputBufQueue.append(std::move(requestPayload)); | ||
|
||
CURL* curl = curl_easy_init(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this initialization need to be done each time invokeFunction is called ? Can these be member variables initialized in the constructor ?
} | ||
} | ||
|
||
VELOX_INSTANTIATE_TEST_SUITE_P( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be better to enhance RemoteFunctionTest fixtures to do both Rest and Thrift Server testing. You could use Rest/Thrift as a parameterization to TEST_SUITE.
// Always registers all Presto functions and make them available under a | ||
// certain prefix/namespace. | ||
LOG(INFO) << "Registering Presto functions"; | ||
functions::prestosql::registerAllScalarFunctions(FLAGS_function_prefix); | ||
|
||
std::remove(FLAGS_uds_path.c_str()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't understand this change.
@@ -18,6 +18,7 @@ | |||
#include <gflags/gflags.h> | |||
#include <glog/logging.h> | |||
#include <thrift/lib/cpp2/server/ThriftServer.h> | |||
#include "velox/functions/prestosql/StringFunctions.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this needed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be good to separate the changes in this file into a separate PR. Though the only change needed here really is the memory::initializeMemoryManager({}); line.
I'll send out a PR.
Co-authored-by: Wills Feng <[email protected]>
set(cpr_SOURCE BUNDLED) | ||
velox_resolve_dependency(cpr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixes - #11036