Skip to content

Commit

Permalink
Add support for REST based remote functions
Browse files Browse the repository at this point in the history
  • Loading branch information
wills-feng authored and Joe-Abraham committed Sep 9, 2024
1 parent 24dd2e9 commit 0cd4510
Show file tree
Hide file tree
Showing 17 changed files with 972 additions and 34 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ projects/*
!projects/*.*
!projects/Makefile
.venv
deps-install
deps-download

#==============================================================================#
# Autotools artifacts
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ option(VELOX_ENABLE_ABFS "Build Abfs Connector" OFF)
option(VELOX_ENABLE_HDFS "Build Hdfs Connector" OFF)
option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF)
option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" ON)
option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON)

option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF)
Expand Down
30 changes: 20 additions & 10 deletions scripts/setup-macos.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ PYTHON_VENV=${PYHTON_VENV:-"${SCRIPTDIR}/../.venv"}
NPROC=$(getconf _NPROCESSORS_ONLN)

DEPENDENCY_DIR=${DEPENDENCY_DIR:-$(pwd)}
MACOS_VELOX_DEPS="bison boost double-conversion flex gflags glog googletest icu4c libevent libsodium lz4 lzo openssl protobuf@21 simdjson snappy thrift xz xsimd zstd"
MACOS_VELOX_DEPS="bison boost double-conversion flex gflags glog googletest icu4c libevent libsodium lz4 lzo openssl protobuf@21 simdjson snappy thrift xz zstd"
MACOS_BUILD_DEPS="ninja cmake"
FB_OS_VERSION="v2024.05.20.00"
FMT_VERSION="10.1.1"
XSIMD_VERSION="10.0.0"

function update_brew {
DEFAULT_BREW_PATH=/usr/local/bin/brew
if [ `arch` == "arm64" ] ;
then
DEFAULT_BREW_PATH=$(which brew) ;
if [ "$(arch)" == "arm64" ]; then
DEFAULT_BREW_PATH=$(which brew)
fi
BREW_PATH=${BREW_PATH:-$DEFAULT_BREW_PATH}
$BREW_PATH update --auto-update --verbose
Expand All @@ -53,17 +53,16 @@ function update_brew {

function install_from_brew {
pkg=$1
if [[ "${pkg}" =~ ^([0-9a-z-]*):([0-9](\.[0-9\])*)$ ]];
then
if [[ "${pkg}" =~ ^([0-9a-z-]*):([0-9](\.[0-9\])*)$ ]]; then
pkg=${BASH_REMATCH[1]}
ver=${BASH_REMATCH[2]}
echo "Installing '${pkg}' at '${ver}'"
tap="velox/local-${pkg}"
brew tap-new "${tap}"
brew extract "--version=${ver}" "${pkg}" "${tap}"
brew install "${tap}/${pkg}@${ver}" || ( echo "Failed to install ${tap}/${pkg}@${ver}" ; exit 1 )
brew install "${tap}/${pkg}@${ver}" || { echo "Failed to install ${tap}/${pkg}@${ver}"; exit 1; }
else
( brew install --formula "${pkg}" && echo "Installation of ${pkg} is successful" || brew upgrade --formula "$pkg" ) || ( echo "Failed to install ${pkg}" ; exit 1 )
(brew install --formula "${pkg}" && echo "Installation of ${pkg} is successful") || brew upgrade --formula "${pkg}" || { echo "Failed to install ${pkg}"; exit 1; }
fi
}

Expand All @@ -82,9 +81,13 @@ function install_build_prerequisites {
mv ccache-4.10.2-darwin/ccache /usr/local/bin/
}

function install_xsimd {
wget_and_untar https://github.com/xtensor-stack/xsimd/archive/refs/tags/${XSIMD_VERSION}.tar.gz xsimd
cmake_install xsimd
}

function install_velox_deps_from_brew {
for pkg in ${MACOS_VELOX_DEPS}
do
for pkg in ${MACOS_VELOX_DEPS}; do
install_from_brew ${pkg}
done
}
Expand All @@ -94,6 +97,11 @@ function install_fmt {
cmake_install fmt -DFMT_TEST=OFF
}

function install_proxygen {
wget_and_untar https://github.com/facebook/proxygen/archive/refs/tags/${FB_OS_VERSION}.tar.gz proxygen
cmake_install proxygen -DBUILD_TESTS=OFF
}

function install_folly {
wget_and_untar https://github.com/facebook/folly/archive/refs/tags/${FB_OS_VERSION}.tar.gz folly
cmake_install folly -DBUILD_TESTS=OFF -DFOLLY_HAVE_INT128_T=ON
Expand Down Expand Up @@ -145,6 +153,8 @@ function install_velox_deps {
run_and_time install_wangle
run_and_time install_mvfst
run_and_time install_fbthrift
run_and_time install_xsimd
run_and_time install_proxygen
}

(return 2> /dev/null) && return # If script was sourced, don't run commands.
Expand Down
7 changes: 3 additions & 4 deletions velox/common/config/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

if (${VELOX_BUILD_TESTING})
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif ()
endif()

velox_add_library(velox_common_config Config.cpp)
velox_link_libraries(
velox_common_config
PUBLIC velox_common_base
velox_exception
PUBLIC velox_common_base velox_exception
PRIVATE re2::re2)
21 changes: 21 additions & 0 deletions velox/functions/remote/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

if(NOT DEFINED PROXYGEN_LIBRARIES)
find_package(Sodium REQUIRED)

find_library(PROXYGEN proxygen)
find_library(PROXYGEN_HTTP_SERVER proxygenhttpserver)
find_library(FIZZ fizz)
find_library(WANGLE wangle)

if(NOT PROXYGEN
OR NOT PROXYGEN_HTTP_SERVER
OR NOT FIZZ
OR NOT WANGLE)
message(
FATAL_ERROR
"One or more proxygen libraries were not found. Please ensure proxygen, proxygenhttpserver, fizz, and wangle are installed."
)
endif()

set(PROXYGEN_LIBRARIES ${PROXYGEN_HTTP_SERVER} ${PROXYGEN} ${WANGLE} ${FIZZ})
endif()

add_subdirectory(if)
add_subdirectory(client)
add_subdirectory(server)
5 changes: 5 additions & 0 deletions velox/functions/remote/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ velox_add_library(velox_functions_remote_thrift_client ThriftClient.cpp)
velox_link_libraries(velox_functions_remote_thrift_client
PUBLIC remote_function_thrift FBThrift::thriftcpp2)

velox_add_library(velox_functions_remote_rest_client RestClient.cpp)
velox_link_libraries(velox_functions_remote_rest_client ${PROXYGEN_LIBRARIES}
Folly::folly)

velox_add_library(velox_functions_remote Remote.cpp)
velox_link_libraries(
velox_functions_remote
PUBLIC velox_expression
velox_functions_remote_thrift_client
velox_functions_remote_rest_client
velox_functions_remote_get_serde
velox_type_fbhive
Folly::folly)
Expand Down
130 changes: 118 additions & 12 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <folly/io/async/EventBase.h>
#include "velox/expression/Expr.h"
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/client/RestClient.h"
#include "velox/functions/remote/client/ThriftClient.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h"
Expand All @@ -33,17 +34,31 @@ std::string serializeType(const TypePtr& type) {
return type::fbhive::HiveTypeSerializer::serialize(type);
}

std::string iobufToString(const folly::IOBuf& buf) {
std::string result;
result.reserve(buf.computeChainDataLength());

for (auto range : buf) {
result.append(reinterpret_cast<const char*>(range.data()), range.size());
}

return result;
}

class RemoteFunction : public exec::VectorFunction {
public:
RemoteFunction(
const std::string& functionName,
const std::vector<exec::VectorFunctionArg>& inputArgs,
const RemoteVectorFunctionMetadata& metadata)
: functionName_(functionName),
location_(metadata.location),
thriftClient_(getThriftClient(location_, &eventBase_)),
serdeFormat_(metadata.serdeFormat),
serde_(getSerde(serdeFormat_)) {
: functionName_(functionName), metadata_(metadata) {
if (metadata.location.type() == typeid(SocketAddress)) {
location_ = boost::get<SocketAddress>(metadata.location);
thriftClient_ = getThriftClient(location_, &eventBase_);
} else if (metadata.location.type() == typeid(URL)) {
url_ = boost::get<URL>(metadata.location);
}

std::vector<TypePtr> types;
types.reserve(inputArgs.size());
serializedInputTypes_.reserve(inputArgs.size());
Expand All @@ -62,7 +77,11 @@ class RemoteFunction : public exec::VectorFunction {
exec::EvalCtx& context,
VectorPtr& result) const override {
try {
applyRemote(rows, args, outputType, context, result);
if ((metadata_.location.type() == typeid(SocketAddress))) {
applyRemote(rows, args, outputType, context, result);
} else if (metadata_.location.type() == typeid(URL)) {
applyRestRemote(rows, args, outputType, context, result);
}
} catch (const VeloxRuntimeError&) {
throw;
} catch (const std::exception&) {
Expand All @@ -71,6 +90,88 @@ class RemoteFunction : public exec::VectorFunction {
}

private:
void applyRestRemote(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx& context,
VectorPtr& result) const {
try {
std::string responseBody;

// Create a RowVector for the remote function call
auto remoteRowVector = std::make_shared<RowVector>(
context.pool(),
remoteInputType_,
BufferPtr{},
rows.end(),
std::move(args));

// Build the JSON request with function and input details
folly::dynamic remoteFunctionHandle = folly::dynamic::object;
remoteFunctionHandle["functionName"] = functionName_;
remoteFunctionHandle["returnType"] = serializeType(outputType);
remoteFunctionHandle["argumentTypes"] = folly::dynamic::array;
for (const auto& value : serializedInputTypes_) {
remoteFunctionHandle["argumentTypes"].push_back(value);
}

folly::dynamic inputs = folly::dynamic::object;
inputs["pageFormat"] = static_cast<int>(metadata_.serdeFormat);
inputs["payload"] = iobufToString(rowVectorToIOBuf(
remoteRowVector,
rows.end(),
*context.pool(),
getSerde(metadata_.serdeFormat).get()));
inputs["rowCount"] = remoteRowVector->size();

// Create the final JSON object to be sent
folly::dynamic jsonObject = folly::dynamic::object;
jsonObject["remoteFunctionHandle"] = remoteFunctionHandle;
jsonObject["inputs"] = inputs;
jsonObject["throwOnError"] = context.throwOnError();

// Construct the full URL for the REST request
std::string fullUrl = fmt::format(
"{}/v1/functions/{}/{}/{}/{}",
url_.getUrl(),
metadata_.schema.value_or("default_schema"),
functionName_,
metadata_.functionId.value_or("default_function_id"),
metadata_.version.value_or("default_version"));

// Invoke the remote function using RestClient
RestClient restClient_(fullUrl);
restClient_.invoke_function(folly::toJson(jsonObject), responseBody);
LOG(INFO) << responseBody;

// Parse the JSON response
auto responseJsonObj = parseJson(responseBody);
if (responseJsonObj.count("err") > 0) {
VELOX_NYI(responseJsonObj["err"].asString());
}

// Deserialize the result payload
auto payloadIObuf = folly::IOBuf::copyBuffer(
responseJsonObj["result"]["payload"].asString());

auto outputRowVector = IOBufToRowVector(
*payloadIObuf,
ROW({outputType}),
*context.pool(),
getSerde(metadata_.serdeFormat).get());
result = outputRowVector->childAt(0);

} catch (const std::exception& e) {
// Log and throw an error if the remote call fails
VELOX_FAIL(
"Error while executing remote function '{}' at '{}': {}",
functionName_,
url_.getUrl(),
e.what());
}
}

void applyRemote(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
Expand All @@ -97,11 +198,14 @@ class RemoteFunction : public exec::VectorFunction {

auto requestInputs = request.inputs_ref();
requestInputs->rowCount_ref() = remoteRowVector->size();
requestInputs->pageFormat_ref() = serdeFormat_;
requestInputs->pageFormat_ref() = metadata_.serdeFormat;

// TODO: serialize only active rows.
requestInputs->payload_ref() = rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), serde_.get());
remoteRowVector,
rows.end(),
*context.pool(),
getSerde(metadata_.serdeFormat).get());

try {
thriftClient_->sync_invokeFunction(remoteResponse, request);
Expand All @@ -117,21 +221,23 @@ class RemoteFunction : public exec::VectorFunction {
remoteResponse.get_result().get_payload(),
ROW({outputType}),
*context.pool(),
serde_.get());
getSerde(metadata_.serdeFormat).get());
result = outputRowVector->childAt(0);
}

const std::string functionName_;
folly::SocketAddress location_;

folly::EventBase eventBase_;
std::unique_ptr<RemoteFunctionClient> thriftClient_;
remote::PageFormat serdeFormat_;
std::unique_ptr<VectorSerde> serde_;
folly::SocketAddress location_;

proxygen::URL url_;

// Structures we construct once to cache:
RowTypePtr remoteInputType_;
std::vector<std::string> serializedInputTypes_;

const RemoteVectorFunctionMetadata metadata_;
};

std::shared_ptr<exec::VectorFunction> createRemoteFunction(
Expand Down
Loading

0 comments on commit 0cd4510

Please sign in to comment.