Skip to content

Commit

Permalink
Recent changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe-Abraham committed Oct 24, 2024
1 parent 98ba3be commit 66dca87
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 205 deletions.
21 changes: 13 additions & 8 deletions velox/functions/remote/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@ velox_link_libraries(velox_functions_remote_thrift_client

velox_add_library(velox_functions_remote_rest_client RestClient.cpp)
velox_link_libraries(velox_functions_remote_rest_client ${PROXYGEN_LIBRARIES}
Folly::folly)
velox_exec Folly::folly)

velox_add_library(velox_functions_remote Remote.cpp)
velox_link_libraries(
velox_functions_remote
PUBLIC velox_expression
velox_functions_remote_thrift_client
velox_functions_remote_rest_client
velox_functions_remote_get_serde
velox_type_fbhive
Folly::folly)
velox_functions_remote
PUBLIC velox_functions_remote_rest_client
velox_expression
velox_memory
velox_exec
velox_vector
velox_presto_serializer
velox_functions_remote_thrift_client
velox_functions_remote_get_serde
velox_type_fbhive
Folly::folly
)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
143 changes: 74 additions & 69 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,23 @@
#include "velox/functions/remote/client/Remote.h"

#include <folly/io/async/EventBase.h>
#include "velox/common/memory/ByteStream.h"
// #include "velox/common/memory/StreamArena.h"
#include "velox/exec/ExchangeQueue.h"
#include "velox/expression/Expr.h"
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/client/RestClient.h"
#include "velox/functions/remote/client/ThriftClient.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/fbhive/HiveTypeSerializer.h"
// #include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/VectorStream.h"

using namespace folly;
using namespace proxygen;
namespace facebook::velox::functions {
namespace {

Expand All @@ -34,17 +42,6 @@ std::string serializeType(const TypePtr& type) {
return type::fbhive::HiveTypeSerializer::serialize(type);
}

// Copies the bytes of every range yielded by iterating `buf` into a single
// std::string and returns it.
std::string iobufToString(const folly::IOBuf& buf) {
  std::string flattened;
  // Pre-size the destination using the chain's reported data length so the
  // appends below do not have to grow the string repeatedly.
  flattened.reserve(buf.computeChainDataLength());

  for (auto it = buf.begin(); it != buf.end(); ++it) {
    const auto& segment = *it;
    // The range exposes raw bytes; std::string::append expects const char*.
    flattened.append(
        reinterpret_cast<const char*>(segment.data()), segment.size());
  }

  return flattened;
}

class RemoteFunction : public exec::VectorFunction {
public:
RemoteFunction(
Expand Down Expand Up @@ -116,43 +113,7 @@ class RemoteFunction : public exec::VectorFunction {
exec::EvalCtx& context,
VectorPtr& result) const {
try {
std::string responseBody;

// 1. Create a RowVector for the remote function call
auto remoteRowVector = std::make_shared<RowVector>(
context.pool(),
remoteInputType_,
BufferPtr{},
rows.end(),
args);

// 2. Serialize the input RowVector
auto serde = getSerde(metadata_.serdeFormat).get();
auto serializedInput =
rowVectorToIOBuf(remoteRowVector, rows.end(), *context.pool(), serde);

// Convert serialized input to a string (e.g., base64 encoding if binary)
std::string serializedInputStr =
serializedInput->moveToFbString().toStdString();

// 3. Build the JSON request with function and input details
folly::dynamic remoteFunctionHandle = folly::dynamic::object;
remoteFunctionHandle["functionName"] = functionName_;
remoteFunctionHandle["returnType"] = serializeType(outputType);
remoteFunctionHandle["argumentTypes"] = serializedInputTypes_;

folly::dynamic inputs = folly::dynamic::object;
inputs["pageFormat"] = static_cast<int>(metadata_.serdeFormat);
inputs["payload"] = serializedInputStr;
inputs["rowCount"] = remoteRowVector->size();

// 4. Create the final JSON object to be sent
folly::dynamic jsonObject = folly::dynamic::object;
jsonObject["remoteFunctionHandle"] = remoteFunctionHandle;
jsonObject["inputs"] = inputs;
jsonObject["throwOnError"] = context.throwOnError();

// 5. Construct the full URL for the REST request
// Prepare the full URL
std::string functionId =
metadata_.functionId.value_or("default_function_id");
std::string encodedFunctionId = urlEncode(functionId);
Expand All @@ -165,37 +126,81 @@ class RemoteFunction : public exec::VectorFunction {
encodedFunctionId,
metadata_.version.value_or("default_version"));

// 6. Send the request via RestClient
RestClient restClient(fullUrl);
restClient.invoke_function(folly::toJson(jsonObject), responseBody);
// Prepare headers
std::unordered_map<std::string, std::string> headers;
headers["Content-Type"] = "application/octet-stream";
headers["Accept"] = "application/octet-stream";

// Create the RowVector from input arguments
auto remoteRowVector = std::make_shared<RowVector>(
context.pool(), remoteInputType_, BufferPtr{}, rows.end(), args);

// Create PrestoVectorSerde instance
serializer::presto::PrestoVectorSerde serde;

// Create options for serialization if needed
serializer::presto::PrestoVectorSerde::PrestoOptions options;

// 7. Parse the JSON response
auto responseJsonObj = parseJson(responseBody);
if (responseJsonObj.count("err") > 0) {
VELOX_FAIL(responseJsonObj["err"].asString());
// Use OStreamOutputStream for serialization
std::ostringstream out;
serializer::presto::PrestoOutputStreamListener listener;
OStreamOutputStream output(&out, &listener);

// Obtain a BatchVectorSerializer
auto batchSerializer =
serde.createBatchSerializer(context.pool(), &options);

// Serialize the vector
batchSerializer->serialize(remoteRowVector, &output);

// Get the serialized data as a string
std::string serializedData = out.str();

// Convert the serialized data into an IOBuf
auto payloadIOBuf = IOBuf::copyBuffer(
serializedData.data(), serializedData.size());

// Create a SerializedPage from the IOBuf
exec::SerializedPage requestPage(std::move(payloadIOBuf));

// Invoke the REST function with the SerializedPage
RestClient restClient(fullUrl, headers);

// Send the SerializedPage and receive the response as a SerializedPage
auto [statusCode, responsePage] = restClient.invoke_function(requestPage);

// Handle HTTP response status
if (statusCode != 200) {
VELOX_FAIL(
"Error while executing remote function '{}': HTTP status code {}",
functionName_,
statusCode);
}

// 8. Deserialize the result payload
std::string resultPayloadStr =
responseJsonObj["result"]["payload"].asString();
// Deserialize the response SerializedPage back into a RowVector
auto inputByteRanges =
byteRangesFromIOBuf(responsePage->getIOBuf().get());
BufferInputStream inputStream(std::move(inputByteRanges));

// Convert the payload string back to IOBuf (e.g., if base64 encoded)
auto payloadIobuf = folly::IOBuf::copyBuffer(resultPayloadStr);
// Prepare the output RowVectorPtr
RowVectorPtr outputRowVector;

auto outputRowVector = IOBufToRowVector(
*payloadIobuf,
ROW({outputType}),
*context.pool(),
serde);
// Deserialize using PrestoVectorSerde
serde.deserialize(
&inputStream,
context.pool(),
remoteInputType_,
&outputRowVector,
nullptr);

// Extract the result column
result = outputRowVector->childAt(0);

} catch (const std::exception& e) {
// Log and throw an error if the remote call fails
VELOX_FAIL(
"Error while executing remote function '{}' at '{}': {}",
"Error while executing remote function '{}': {}",
functionName_,
url_.getUrl(),
e.what());
}
}
Expand Down Expand Up @@ -278,9 +283,9 @@ class RemoteFunction : public exec::VectorFunction {

const std::string functionName_;

folly::EventBase eventBase_;
EventBase eventBase_;
std::unique_ptr<RemoteFunctionClient> thriftClient_;
folly::SocketAddress location_;
SocketAddress location_;

proxygen::URL url_;

Expand Down
Loading

0 comments on commit 66dca87

Please sign in to comment.