diff --git a/velox/functions/remote/client/CMakeLists.txt b/velox/functions/remote/client/CMakeLists.txt index 4fe8172d81d04..46797076b138d 100644 --- a/velox/functions/remote/client/CMakeLists.txt +++ b/velox/functions/remote/client/CMakeLists.txt @@ -18,17 +18,22 @@ velox_link_libraries(velox_functions_remote_thrift_client velox_add_library(velox_functions_remote_rest_client RestClient.cpp) velox_link_libraries(velox_functions_remote_rest_client ${PROXYGEN_LIBRARIES} - Folly::folly) + velox_exec Folly::folly) velox_add_library(velox_functions_remote Remote.cpp) velox_link_libraries( - velox_functions_remote - PUBLIC velox_expression - velox_functions_remote_thrift_client - velox_functions_remote_rest_client - velox_functions_remote_get_serde - velox_type_fbhive - Folly::folly) + velox_functions_remote + PUBLIC velox_functions_remote_rest_client + velox_expression + velox_memory + velox_exec + velox_vector + velox_presto_serializer + velox_functions_remote_thrift_client + velox_functions_remote_get_serde + velox_type_fbhive + Folly::folly +) if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) diff --git a/velox/functions/remote/client/Remote.cpp b/velox/functions/remote/client/Remote.cpp index c6818e970949a..d5d09252199f0 100644 --- a/velox/functions/remote/client/Remote.cpp +++ b/velox/functions/remote/client/Remote.cpp @@ -17,15 +17,23 @@ #include "velox/functions/remote/client/Remote.h" #include +#include "velox/common/memory/ByteStream.h" +// #include "velox/common/memory/StreamArena.h" +#include "velox/exec/ExchangeQueue.h" #include "velox/expression/Expr.h" #include "velox/expression/VectorFunction.h" #include "velox/functions/remote/client/RestClient.h" #include "velox/functions/remote/client/ThriftClient.h" #include "velox/functions/remote/if/GetSerde.h" #include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h" +#include "velox/serializers/PrestoSerializer.h" #include "velox/type/fbhive/HiveTypeSerializer.h" +// #include "velox/vector/ComplexVector.h" +#include "velox/vector/FlatVector.h" #include "velox/vector/VectorStream.h" +using namespace folly; +using namespace proxygen; namespace facebook::velox::functions { namespace { @@ -34,17 +42,6 @@ std::string serializeType(const TypePtr& type) { return type::fbhive::HiveTypeSerializer::serialize(type); } -std::string iobufToString(const folly::IOBuf& buf) { - std::string result; - result.reserve(buf.computeChainDataLength()); - - for (auto range : buf) { - result.append(reinterpret_cast(range.data()), range.size()); - } - - return result; -} - class RemoteFunction : public exec::VectorFunction { public: RemoteFunction( @@ -116,43 +113,7 @@ class RemoteFunction : public exec::VectorFunction { exec::EvalCtx& context, VectorPtr& result) const { try { - std::string responseBody; - - // 1. Create a RowVector for the remote function call - auto remoteRowVector = std::make_shared( - context.pool(), - remoteInputType_, - BufferPtr{}, - rows.end(), - args); - - // 2. Serialize the input RowVector - auto serde = getSerde(metadata_.serdeFormat).get(); - auto serializedInput = - rowVectorToIOBuf(remoteRowVector, rows.end(), *context.pool(), serde); - - // Convert serialized input to a string (e.g., base64 encoding if binary) - std::string serializedInputStr = - serializedInput->moveToFbString().toStdString(); - - // 3. Build the JSON request with function and input details - folly::dynamic remoteFunctionHandle = folly::dynamic::object; - remoteFunctionHandle["functionName"] = functionName_; - remoteFunctionHandle["returnType"] = serializeType(outputType); - remoteFunctionHandle["argumentTypes"] = serializedInputTypes_; - - folly::dynamic inputs = folly::dynamic::object; - inputs["pageFormat"] = static_cast(metadata_.serdeFormat); - inputs["payload"] = serializedInputStr; - inputs["rowCount"] = remoteRowVector->size(); - - // 4. Create the final JSON object to be sent - folly::dynamic jsonObject = folly::dynamic::object; - jsonObject["remoteFunctionHandle"] = remoteFunctionHandle; - jsonObject["inputs"] = inputs; - jsonObject["throwOnError"] = context.throwOnError(); - - // 5. Construct the full URL for the REST request + // Prepare the full URL std::string functionId = metadata_.functionId.value_or("default_function_id"); std::string encodedFunctionId = urlEncode(functionId); @@ -165,37 +126,81 @@ class RemoteFunction : public exec::VectorFunction { encodedFunctionId, metadata_.version.value_or("default_version")); - // 6. Send the request via RestClient - RestClient restClient(fullUrl); - restClient.invoke_function(folly::toJson(jsonObject), responseBody); + // Prepare headers + std::unordered_map headers; + headers["Content-Type"] = "application/octet-stream"; + headers["Accept"] = "application/octet-stream"; + + // Create the RowVector from input arguments + auto remoteRowVector = std::make_shared( + context.pool(), remoteInputType_, BufferPtr{}, rows.end(), args); + + // Create PrestoVectorSerde instance + serializer::presto::PrestoVectorSerde serde; + + // Create options for serialization if needed + serializer::presto::PrestoVectorSerde::PrestoOptions options; - // 7. Parse the JSON response - auto responseJsonObj = parseJson(responseBody); - if (responseJsonObj.count("err") > 0) { - VELOX_FAIL(responseJsonObj["err"].asString()); + // Use OStreamOutputStream for serialization + std::ostringstream out; + serializer::presto::PrestoOutputStreamListener listener; + OStreamOutputStream output(&out, &listener); + + // Obtain a BatchVectorSerializer + auto batchSerializer = + serde.createBatchSerializer(context.pool(), &options); + + // Serialize the vector + batchSerializer->serialize(remoteRowVector, &output); + + // Get the serialized data as a string + std::string serializedData = out.str(); + + // Convert the serialized data into an IOBuf + auto payloadIOBuf = IOBuf::copyBuffer( + serializedData.data(), serializedData.size()); + + // Create a SerializedPage from the IOBuf + exec::SerializedPage requestPage(std::move(payloadIOBuf)); + + // Invoke the REST function with the SerializedPage + RestClient restClient(fullUrl, headers); + + // Send the SerializedPage and receive the response as a SerializedPage + auto [statusCode, responsePage] = restClient.invoke_function(requestPage); + + // Handle HTTP response status + if (statusCode != 200) { + VELOX_FAIL( + "Error while executing remote function '{}': HTTP status code {}", + functionName_, + statusCode); } - // 8. Deserialize the result payload - std::string resultPayloadStr = - responseJsonObj["result"]["payload"].asString(); + // Deserialize the response SerializedPage back into a RowVector + auto inputByteRanges = + byteRangesFromIOBuf(responsePage->getIOBuf().get()); + BufferInputStream inputStream(std::move(inputByteRanges)); - // Convert the payload string back to IOBuf (e.g., if base64 encoded) - auto payloadIobuf = folly::IOBuf::copyBuffer(resultPayloadStr); + // Prepare the output RowVectorPtr + RowVectorPtr outputRowVector; - auto outputRowVector = IOBufToRowVector( - *payloadIobuf, - ROW({outputType}), - *context.pool(), - serde); + // Deserialize using PrestoVectorSerde + serde.deserialize( + &inputStream, + context.pool(), + remoteInputType_, + &outputRowVector, + nullptr); + // Extract the result column result = outputRowVector->childAt(0); } catch (const std::exception& e) { // Log and throw an error if the remote call fails VELOX_FAIL( - "Error while executing remote function '{}' at '{}': {}", + "Error while executing remote function '{}': {}", functionName_, - url_.getUrl(), e.what()); } } @@ -278,9 +283,9 @@ class RemoteFunction : public exec::VectorFunction { const std::string functionName_; - folly::EventBase eventBase_; + EventBase eventBase_; std::unique_ptr thriftClient_; - folly::SocketAddress location_; + SocketAddress location_; proxygen::URL url_; diff --git a/velox/functions/remote/client/RestClient.cpp b/velox/functions/remote/client/RestClient.cpp index f21bb53485498..cdd2d0c377265 100644 --- a/velox/functions/remote/client/RestClient.cpp +++ b/velox/functions/remote/client/RestClient.cpp @@ -1,35 +1,158 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ #include "velox/functions/remote/client/RestClient.h" -#include - -using namespace facebook::velox::functions; +#include "velox/exec/ExchangeQueue.h" namespace facebook::velox::functions { -RestClient::RestClient(const std::string& url) : url_(url) { +// +// RestClient Implementation +// + +RestClient::RestClient( + const std::string& url, + const std::unordered_map& headers) + : url_(proxygen::URL(url)), headers_(headers) { httpClient_ = std::make_shared(url_); -}; - -void RestClient::invoke_function( - const std::string& requestBody, - std::string& responseBody) { - httpClient_->send(requestBody); - responseBody = httpClient_->getResponseBody(); - LOG(INFO) << responseBody; -}; +} + +std::pair> +RestClient::invoke_function(exec::SerializedPage& requestPage) { + httpClient_->setHeaders(headers_); + httpClient_->send(requestPage); + + // Retrieve the response page as a unique_ptr + auto responsePage = httpClient_->getResponsePage(); + + int statusCode = httpClient_->getResponseCode(); + + return {statusCode, std::move(responsePage)}; +} + +// +// HttpClient Implementation +// + +HttpClient::HttpClient(const proxygen::URL& url) + : url_(url), responseCode_(0) {} + +void HttpClient::setHeaders( + const std::unordered_map& headers) { + headers_ = headers; +} + +void HttpClient::send(const exec::SerializedPage& serializedPage) { + // Get the IOBuf from SerializedPage + requestBodyIOBuf_ = serializedPage.getIOBuf(); + + responseBodyIOBuf_.reset(); + responseCode_ = 0; + + // Reset connector and session for resending the request + connector_.reset(); + session_.reset(); + + // Create a new connector for the request + connector_ = std::make_unique( + this, proxygen::WheelTimerInstance(std::chrono::milliseconds(1000))); + + // Initiate connection + connector_->connect( + &evb_, + folly::SocketAddress(url_.getHost(), url_.getPort(), true), + std::chrono::milliseconds(10000)); + + // Run the event loop until we explicitly terminate it + evb_.loopForever(); +} + +std::unique_ptr HttpClient::getResponsePage() { + if (responseBodyIOBuf_) { + // Construct SerializedPage using the response IOBuf + return std::make_unique( + std::move(responseBodyIOBuf_)); + } else { + // Return nullptr or handle error + return nullptr; + } +} + +int HttpClient::getResponseCode() const { + return responseCode_; +} + +// HTTPConnector::Callback methods +void HttpClient::connectSuccess( + proxygen::HTTPUpstreamSession* session) noexcept { + session_ = std::shared_ptr( + session, [](proxygen::HTTPUpstreamSession* /*s*/) { + // No-op deleter, session is managed by Proxygen + }); + sendRequest(); +} + +void HttpClient::connectError( + const folly::AsyncSocketException& ex) noexcept { + LOG(ERROR) << "Failed to connect: " << ex.what(); + evb_.terminateLoopSoon(); +} + +// HTTPTransactionHandler methods +void HttpClient::setTransaction( + proxygen::HTTPTransaction* txn) noexcept { + txn_ = txn; +} + +void HttpClient::detachTransaction() noexcept { + txn_ = nullptr; + session_.reset(); + evb_.terminateLoopSoon(); +} + +void HttpClient::onHeadersComplete( + std::unique_ptr msg) noexcept { + responseCode_ = msg->getStatusCode(); +} + +void HttpClient::onBody( + std::unique_ptr chain) noexcept { + if (chain) { + if (responseBodyIOBuf_) { + responseBodyIOBuf_->prependChain(std::move(chain)); + } else { + responseBodyIOBuf_ = std::move(chain); + } + } +} + +void HttpClient::onEOM() noexcept { + evb_.terminateLoopSoon(); +} + +void HttpClient::onError( + const proxygen::HTTPException& error) noexcept { + LOG(ERROR) << "HTTP Error: " << error.what(); + evb_.terminateLoopSoon(); +} + +void HttpClient::sendRequest() { + auto txn = session_->newTransaction(this); + if (!txn) { + LOG(ERROR) << "Failed to create new transaction"; + evb_.terminateLoopSoon(); + return; + } + + proxygen::HTTPMessage req; + req.setMethod(proxygen::HTTPMethod::POST); + req.setURL(url_.makeRelativeURL()); + + req.getHeaders().add("Host", url_.getHostAndPort()); + for (const auto& header : headers_) { + req.getHeaders().add(header.first, header.second); + } + + txn->sendHeaders(req); + txn->sendBody(std::move(requestBodyIOBuf_)); + txn->sendEOM(); +} } // namespace facebook::velox::functions diff --git a/velox/functions/remote/client/RestClient.h b/velox/functions/remote/client/RestClient.h index ee5ea46ad3237..aff2658fbe212 100644 --- a/velox/functions/remote/client/RestClient.h +++ b/velox/functions/remote/client/RestClient.h @@ -1,19 +1,3 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - #pragma once #include @@ -25,104 +9,71 @@ #include #include #include -#include "velox/functions/remote/client/RestClient.h" - -using namespace proxygen; -using namespace folly; +#include "velox/exec/ExchangeQueue.h" namespace facebook::velox::functions { -class HttpClient : public HTTPConnector::Callback, - public HTTPTransactionHandler { +class HttpClient : public proxygen::HTTPConnector::Callback, + public proxygen::HTTPTransactionHandler { public: - HttpClient(const URL& url) : url_(url) {} - - void send(std::string requestBody) { - requestBody_ = requestBody; - connector_ = std::make_unique( - this, WheelTimerInstance(std::chrono::milliseconds(1000))); - connector_->connect( - &evb_, - SocketAddress(url_.getHost(), url_.getPort(), true), - std::chrono::milliseconds(10000)); - evb_.loop(); - } - - std::string getResponseBody() { - return std::move(responseBody_); - } + explicit HttpClient(const proxygen::URL& url); + + void setHeaders(const std::unordered_map& headers); + + void send(const exec::SerializedPage& serializedPage); + + // Return a unique_ptr to SerializedPage to avoid copy/move + std::unique_ptr getResponsePage(); + + int getResponseCode() const; private: - URL url_; - EventBase evb_; - std::unique_ptr connector_; - std::shared_ptr session_; - std::string requestBody_; - std::string responseBody_; - - void connectSuccess(HTTPUpstreamSession* session) noexcept override { - session_ = std::shared_ptr( - session, [](HTTPUpstreamSession* s) { - // No-op deleter, managed by Proxygen - }); - sendRequest(); - } - - void connectError(const folly::AsyncSocketException& ex) noexcept override { - LOG(ERROR) << "Failed to connect: " << ex.what(); - evb_.terminateLoopSoon(); - } - - void sendRequest() { - auto txn = session_->newTransaction(this); - HTTPMessage req; - req.setMethod(HTTPMethod::POST); - req.setURL(url_.getUrl()); - req.getHeaders().add(HTTP_HEADER_CONTENT_TYPE, "application/json"); - req.getHeaders().add( - HTTP_HEADER_CONTENT_LENGTH, std::to_string(requestBody_.size())); - req.getHeaders().add(HTTP_HEADER_USER_AGENT, "Velox HTTPClient"); - - txn->sendHeaders(req); - txn->sendBody(folly::IOBuf::copyBuffer(requestBody_)); - txn->sendEOM(); - } - - void setTransaction(HTTPTransaction*) noexcept override {} - void detachTransaction() noexcept override { - session_.reset(); - evb_.terminateLoopSoon(); - } - - void onHeadersComplete(std::unique_ptr msg) noexcept override {} - - void onBody(std::unique_ptr chain) noexcept override { - if (chain) { - responseBody_.append( - reinterpret_cast(chain->data()), chain->length()); - } - } - - void onEOM() noexcept override { - session_->drain(); - } - - void onError(const HTTPException& error) noexcept override { - LOG(ERROR) << "Error: " << error.what(); - } - void onUpgrade(UpgradeProtocol) noexcept override {} - void onTrailers(std::unique_ptr) noexcept override {} + // HTTPConnector::Callback methods + void connectSuccess(proxygen::HTTPUpstreamSession* session) noexcept override; + void connectError(const folly::AsyncSocketException& ex) noexcept override; + + // HTTPTransactionHandler methods + void setTransaction(proxygen::HTTPTransaction* txn) noexcept override; + void detachTransaction() noexcept override; + void onHeadersComplete( + std::unique_ptr msg) noexcept override; + void onBody(std::unique_ptr chain) noexcept override; + void onEOM() noexcept override; + void onError(const proxygen::HTTPException& error) noexcept override; + void onUpgrade(proxygen::UpgradeProtocol) noexcept override {} void onEgressPaused() noexcept override {} void onEgressResumed() noexcept override {} + void onTrailers(std::unique_ptr) noexcept override {} + + void sendRequest(); + + proxygen::URL url_; + folly::EventBase evb_; + std::unique_ptr connector_; + std::shared_ptr session_; + std::unordered_map headers_; + int responseCode_{0}; + + // Store request and response bodies as IOBuf pointers + std::unique_ptr requestBodyIOBuf_; + std::unique_ptr responseBodyIOBuf_; + + // Transaction pointer + proxygen::HTTPTransaction* txn_{nullptr}; }; class RestClient { public: - RestClient(const std::string& url); - void invoke_function(const std::string& request, std::string& response); + RestClient( + const std::string& url, + const std::unordered_map& headers = {}); + + std::pair> invoke_function( + exec::SerializedPage& requestPage); private: - URL url_; + proxygen::URL url_; + std::unordered_map headers_; std::shared_ptr httpClient_; };