Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe-Abraham committed Sep 6, 2024
1 parent 75ccdfe commit e597f34
Showing 1 changed file with 27 additions and 24 deletions.
51 changes: 27 additions & 24 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,12 @@ class RemoteFunction : public exec::VectorFunction {
const std::string& functionName,
const std::vector<exec::VectorFunctionArg>& inputArgs,
const RemoteVectorFunctionMetadata& metadata)
: functionName_(functionName),
metadata_(metadata),
serde_(getSerde(metadata_.serdeFormat)) {
: functionName_(functionName), metadata_(metadata) {
if (metadata.location.type() == typeid(SocketAddress)) {
location_ = boost::get<SocketAddress>(metadata.location);
thriftClient_ = getThriftClient(location_, &eventBase_);
} else if (metadata.location.type() == typeid(URL)) {
url_ = boost::get<URL>(metadata.location);
restClient_ = std::make_unique<RestClient>(url_.getUrl());
}

std::vector<TypePtr> types;
Expand All @@ -80,9 +77,9 @@ class RemoteFunction : public exec::VectorFunction {
exec::EvalCtx& context,
VectorPtr& result) const override {
try {
if (thriftClient_) {
if ((metadata_.location.type() == typeid(SocketAddress))) {
applyRemote(rows, args, outputType, context, result);
} else if (restClient_) {
} else if (metadata_.location.type() == typeid(URL)) {
applyRestRemote(rows, args, outputType, context, result);
}
} catch (const VeloxRuntimeError&) {
Expand All @@ -101,14 +98,16 @@ class RemoteFunction : public exec::VectorFunction {
VectorPtr& result) const {
try {
std::string responseBody;

// Create a RowVector for the remote function call
auto remoteRowVector = std::make_shared<RowVector>(
context.pool(),
remoteInputType_,
BufferPtr{},
rows.end(),
std::move(args));

// Construct JSON request
// Build the JSON request with function and input details
folly::dynamic remoteFunctionHandle = folly::dynamic::object;
remoteFunctionHandle["functionName"] = functionName_;
remoteFunctionHandle["returnType"] = serializeType(outputType);
Expand All @@ -119,18 +118,20 @@ class RemoteFunction : public exec::VectorFunction {

folly::dynamic inputs = folly::dynamic::object;
inputs["pageFormat"] = static_cast<int>(metadata_.serdeFormat);
// Use existing serializer (PrestoPage or SparkUnsafeRow)
inputs["payload"] = iobufToString(rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), serde_.get()));
remoteRowVector,
rows.end(),
*context.pool(),
getSerde(metadata_.serdeFormat).get()));
inputs["rowCount"] = remoteRowVector->size();

// Create the final JSON object to be sent
folly::dynamic jsonObject = folly::dynamic::object;
jsonObject["remoteFunctionHandle"] = remoteFunctionHandle;
jsonObject["inputs"] = inputs;
jsonObject["throwOnError"] = context.throwOnError();

// URL format -
// {endpoint}/v1/functions/{schema}/{functionName}/{functionId}/{version}
// Construct the full URL for the REST request
std::string fullUrl = fmt::format(
"{}/v1/functions/{}/{}/{}/{}",
url_.getUrl(),
Expand All @@ -139,28 +140,30 @@ class RemoteFunction : public exec::VectorFunction {
metadata_.functionId.value_or("default_function_id"),
metadata_.version.value_or("default_version"));

// Set the full URL on the REST client.
restClient_->setUrl(fullUrl);

// Call Rest client to send request
restClient_->invoke_function(folly::toJson(jsonObject), responseBody);
// Invoke the remote function using RestClient
RestClient restClient_(fullUrl);
restClient_.invoke_function(folly::toJson(jsonObject), responseBody);
LOG(INFO) << responseBody;

// Parse JSON response
// Parse the JSON response
auto responseJsonObj = parseJson(responseBody);
if (responseJsonObj.count("err") > 0) {
VELOX_NYI(responseJsonObj["err"].asString());
}

// Deserialize the result payload
auto payloadIObuf = folly::IOBuf::copyBuffer(
responseJsonObj["result"]["payload"].asString());

// Use existing deserializer (PrestoPage or SparkUnsafeRow)
auto outputRowVector = IOBufToRowVector(
*payloadIObuf, ROW({outputType}), *context.pool(), serde_.get());
*payloadIObuf,
ROW({outputType}),
*context.pool(),
getSerde(metadata_.serdeFormat).get());
result = outputRowVector->childAt(0);

} catch (const std::exception& e) {
// Log and throw an error if the remote call fails
VELOX_FAIL(
"Error while executing remote function '{}' at '{}': {}",
functionName_,
Expand Down Expand Up @@ -199,7 +202,10 @@ class RemoteFunction : public exec::VectorFunction {

// TODO: serialize only active rows.
requestInputs->payload_ref() = rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), serde_.get());
remoteRowVector,
rows.end(),
*context.pool(),
getSerde(metadata_.serdeFormat).get());

try {
thriftClient_->sync_invokeFunction(remoteResponse, request);
Expand All @@ -215,7 +221,7 @@ class RemoteFunction : public exec::VectorFunction {
remoteResponse.get_result().get_payload(),
ROW({outputType}),
*context.pool(),
serde_.get());
getSerde(metadata_.serdeFormat).get());
result = outputRowVector->childAt(0);
}

Expand All @@ -225,11 +231,8 @@ class RemoteFunction : public exec::VectorFunction {
std::unique_ptr<RemoteFunctionClient> thriftClient_;
folly::SocketAddress location_;

std::unique_ptr<RestClient> restClient_;
proxygen::URL url_;

std::unique_ptr<VectorSerde> serde_;

// Structures we construct once to cache:
RowTypePtr remoteInputType_;
std::vector<std::string> serializedInputTypes_;
Expand Down

0 comments on commit e597f34

Please sign in to comment.