Skip to content

Commit

Permalink
Add support for $internal$_json_string_to_array/map/cast (facebookinc…
Browse files Browse the repository at this point in the history
…ubator#12159)

Summary:

We add support for $internal$_json_string_to_array/map/cast functions. These are used by the presto co-ordinator when we have expressions of the form 
cast( json as array()/map()/row()) . Consider for example cast(json_parse(x) as array(varchar)) ; Currently we will have to parse the json represented by x twice, once in json_parse and a second time in the cast . We currently spend significant CPU canonicalizing these jsons, and this is wasted if we end up casting the result. The presto plan actually tries to invoke $internal$_json_string_to_array/map/cast which is converted to a cast currently in prestissimo (https://github.com/prestodb/presto/blob/master/presto-native-execution/presto_cpp/main/types/PrestoToVeloxExpr.cpp#L233) . Once this is merged , I will submit a PR to prestissimo to fix this in Presto. 

Note: At the moment these functions do not behave correctly if the type casted to has a JSON , as that still should be canonicalized. I will be changing PrestoToVeloxExpr.cpp to ensure its only called for non json types.

Reviewed By: Yuhta

Differential Revision: D68582388
  • Loading branch information
Krishna Pai authored and facebook-github-bot committed Jan 30, 2025
1 parent 5f8e7f1 commit c484b83
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 76 deletions.
58 changes: 58 additions & 0 deletions velox/functions/prestosql/JsonFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,49 @@ class JsonParseFunction : public exec::VectorFunction {
mutable std::vector<FastSortKey> fastSortKeys_;
};

// This function is called when $internal$json_string_to_array/map/row
// is called. It is used for expressions like 'Cast(json_parse(x) as
// ARRAY<...>)' etc. This is an optimization to avoid parsing the json string
// twice.
class JsonInternalCastFunction : public exec::VectorFunction {
public:
void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& resultType,
exec::EvalCtx& context,
VectorPtr& result) const override {
VELOX_CHECK_EQ(args.size(), 1);
const auto& arg = *args[0];
jsonCastOperator_.castFrom(arg, context, rows, resultType, result);
}

static std::vector<std::shared_ptr<exec::FunctionSignature>>
signaturesArray() {
return {exec::FunctionSignatureBuilder()
.argumentType("varchar")
.returnType("array(unknown)")
.build()};
}

static std::vector<std::shared_ptr<exec::FunctionSignature>> signaturesMap() {
return {exec::FunctionSignatureBuilder()
.argumentType("varchar")
.returnType("map(unknown, unknown)")
.build()};
}

static std::vector<std::shared_ptr<exec::FunctionSignature>> signaturesRow() {
return {exec::FunctionSignatureBuilder()
.argumentType("varchar")
.returnType("row(unknown)")
.build()};
}

private:
mutable JsonCastOperator jsonCastOperator_;
};

} // namespace

VELOX_DECLARE_VECTOR_FUNCTION(
Expand All @@ -490,4 +533,19 @@ VELOX_DECLARE_STATEFUL_VECTOR_FUNCTION(
return std::make_shared<JsonParseFunction>();
});

VELOX_DECLARE_VECTOR_FUNCTION(
udf_$internal$_json_string_to_array,
JsonInternalCastFunction::signaturesArray(),
std::make_unique<JsonInternalCastFunction>());

VELOX_DECLARE_VECTOR_FUNCTION(
udf_$internal$_json_string_to_map,
JsonInternalCastFunction::signaturesMap(),
std::make_unique<JsonInternalCastFunction>());

VELOX_DECLARE_VECTOR_FUNCTION(
udf_$internal$_json_string_to_row,
JsonInternalCastFunction::signaturesRow(),
std::make_unique<JsonInternalCastFunction>());

} // namespace facebook::velox::functions
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ void registerJsonFunctions(const std::string& prefix) {
VELOX_REGISTER_VECTOR_FUNCTION(udf_json_format, prefix + "json_format");

VELOX_REGISTER_VECTOR_FUNCTION(udf_json_parse, prefix + "json_parse");

VELOX_REGISTER_VECTOR_FUNCTION(
udf_$internal$_json_string_to_array,
prefix + "$internal$json_string_to_array_cast");

VELOX_REGISTER_VECTOR_FUNCTION(
udf_$internal$_json_string_to_map,
prefix + "$internal$json_string_to_map_cast");

VELOX_REGISTER_VECTOR_FUNCTION(
udf_$internal$_json_string_to_row,
prefix + "$internal$json_string_to_row_cast");
}

} // namespace facebook::velox::functions
93 changes: 93 additions & 0 deletions velox/functions/prestosql/tests/JsonFunctionsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,26 @@ class JsonFunctionsTest : public functions::test::FunctionBaseTest {
EXPECT_EQ(jsonResult, varcharResult);
return jsonResult;
}

void checkInternalFn(
const std::string& functionName,
const TypePtr& returnType,
const RowVectorPtr& data,
const VectorPtr& expected) {
auto inputFeild =
std::make_shared<core::FieldAccessTypedExpr>(VARCHAR(), "c0");

auto expression = std::make_shared<core::CallTypedExpr>(
returnType, std::vector<core::TypedExprPtr>{inputFeild}, functionName);

SelectivityVector rows(data->size());
std::vector<VectorPtr> result(1);
exec::ExprSet exprSet({expression}, &execCtx_);
exec::EvalCtx evalCtx(&execCtx_, &exprSet, data.get());

exprSet.eval(rows, evalCtx, result);
velox::test::assertEqualVectors(expected, result[0]);
};
};

TEST_F(JsonFunctionsTest, jsonFormat) {
Expand Down Expand Up @@ -876,6 +896,79 @@ TEST_F(JsonFunctionsTest, jsonExtract) {
VELOX_ASSERT_THROW(jsonExtract(kJson, "$.store.keys()"), "Invalid JSON path");
}

// The following tests ensure that the internal json functions
// $internal$json_string_to_array/map/row_cast can be invoked without issues
// from Prestissimo. The actual functionality is tested in JsonCastTest.

TEST_F(JsonFunctionsTest, jsonStringToArrayCast) {
// Array of strings.
auto data = makeRowVector({makeNullableFlatVector<std::string>(
{R"(["red","blue"])"_sv,
R"([null,null,"purple"])"_sv,
"[]"_sv,
"null"_sv})});
auto expected = makeNullableArrayVector<StringView>(
{{{"red"_sv, "blue"_sv}},
{{std::nullopt, std::nullopt, "purple"_sv}},
{{}},
std::nullopt});

checkInternalFn(
"$internal$json_string_to_array_cast", ARRAY(VARCHAR()), data, expected);

// Array of integers.
data = makeRowVector({makeNullableFlatVector<std::string>(
{R"(["10212","1015353"])"_sv, R"(["10322","285000"])"})});
expected =
makeNullableArrayVector<int64_t>({{10212, 1015353}, {10322, 285000}});

checkInternalFn(
"$internal$json_string_to_array_cast", ARRAY(BIGINT()), data, expected);
}

TEST_F(JsonFunctionsTest, jsonStringToMapCast) {
// Map of strings.
auto data = makeRowVector({makeFlatVector<std::string>(
{R"({"red":1,"blue":2})"_sv,
R"({"green":3,"magenta":4})"_sv,
R"({"violet":1,"blue":2})"_sv,
R"({"yellow":1,"blue":2})"_sv,
R"({"purple":10,"cyan":5})"_sv})});

auto expected = makeMapVector<StringView, int64_t>(
{{{"red"_sv, 1}, {"blue"_sv, 2}},
{{"green"_sv, 3}, {"magenta"_sv, 4}},
{{"violet"_sv, 1}, {"blue"_sv, 2}},
{{"yellow"_sv, 1}, {"blue"_sv, 2}},
{{"purple"_sv, 10}, {"cyan"_sv, 5}}});

checkInternalFn(
"$internal$json_string_to_map_cast",
MAP(VARCHAR(), BIGINT()),
data,
expected);
}

TEST_F(JsonFunctionsTest, jsonStringToRowCast) {
// Row of strings.
auto data = makeRowVector({makeFlatVector<std::string>(
{R"({"red":1,"blue":2})"_sv,
R"({"red":3,"blue":4})"_sv,
R"({"red":1,"blue":2})"_sv,
R"({"red":1,"blue":2})"_sv,
R"({"red":10,"blue":5})"_sv})});

auto expected = makeRowVector(
{makeFlatVector<int64_t>({1, 3, 1, 1, 10}),
makeFlatVector<int64_t>({2, 4, 2, 2, 5})});

checkInternalFn(
"$internal$json_string_to_row_cast",
ROW({{"red", BIGINT()}, {"blue", BIGINT()}}),
data,
expected);
}

} // namespace

} // namespace facebook::velox::functions::prestosql
116 changes: 40 additions & 76 deletions velox/functions/prestosql/types/JsonType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "velox/functions/lib/RowsTranslationUtil.h"
#include "velox/functions/lib/string/StringCore.h"
#include "velox/functions/prestosql/json/JsonStringUtil.h"
#include "velox/functions/prestosql/json/SIMDJsonUtil.h"
#include "velox/type/Conversions.h"
#include "velox/type/Type.h"

Expand Down Expand Up @@ -1096,79 +1095,7 @@ bool isSupportedBasicType(const TypePtr& type) {
}
}

/// Custom operator for casts from and to Json type.
class JsonCastOperator : public exec::CastOperator {
public:
bool isSupportedFromType(const TypePtr& other) const override;

bool isSupportedToType(const TypePtr& other) const override;

void castTo(
const BaseVector& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
const TypePtr& resultType,
VectorPtr& result) const override;

void castTo(
const BaseVector& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
const TypePtr& resultType,
VectorPtr& result,
const std::shared_ptr<exec::CastHooks>& hooks) const override;

void castFrom(
const BaseVector& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
const TypePtr& resultType,
VectorPtr& result) const override;

private:
template <TypeKind kind>
void castFromJson(
const BaseVector& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
BaseVector& result) const {
// Result is guaranteed to be a flat writable vector.
auto* flatResult = result.as<typename KindToFlatVector<kind>::type>();
exec::VectorWriter<Any> writer;
writer.init(*flatResult);
// Input is guaranteed to be in flat or constant encodings when passed in.
auto* inputVector = input.as<SimpleVector<StringView>>();
size_t maxSize = 0;
rows.applyToSelected([&](auto row) {
if (inputVector->isNullAt(row)) {
return;
}
auto& input = inputVector->valueAt(row);
maxSize = std::max(maxSize, input.size());
});
paddedInput_.resize(maxSize + simdjson::SIMDJSON_PADDING);
context.applyToSelectedNoThrow(rows, [&](auto row) {
writer.setOffset(row);
if (inputVector->isNullAt(row)) {
writer.commitNull();
return;
}
auto& input = inputVector->valueAt(row);
memcpy(paddedInput_.data(), input.data(), input.size());
simdjson::padded_string_view paddedInput(
paddedInput_.data(), input.size(), paddedInput_.size());
if (auto error = castFromJsonOneRow<kind>(paddedInput, writer)) {
context.setVeloxExceptionError(row, errors_[error]);
writer.commitNull();
}
});
writer.finish();
}

mutable folly::once_flag initializeErrors_;
mutable std::exception_ptr errors_[simdjson::NUM_ERROR_CODES];
mutable std::string paddedInput_;
};
} // namespace

bool JsonCastOperator::isSupportedFromType(const TypePtr& other) const {
if (isSupportedBasicType(other)) {
Expand Down Expand Up @@ -1197,6 +1124,45 @@ bool JsonCastOperator::isSupportedFromType(const TypePtr& other) const {
}
}

template <TypeKind kind>
void JsonCastOperator::castFromJson(
const BaseVector& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
BaseVector& result) const {
// Result is guaranteed to be a flat writable vector.
auto* flatResult = result.as<typename KindToFlatVector<kind>::type>();
exec::VectorWriter<Any> writer;
writer.init(*flatResult);
// Input is guaranteed to be in flat or constant encodings when passed in.
auto* inputVector = input.as<SimpleVector<StringView>>();
size_t maxSize = 0;
rows.applyToSelected([&](auto row) {
if (inputVector->isNullAt(row)) {
return;
}
auto& input = inputVector->valueAt(row);
maxSize = std::max(maxSize, input.size());
});
paddedInput_.resize(maxSize + simdjson::SIMDJSON_PADDING);
context.applyToSelectedNoThrow(rows, [&](auto row) {
writer.setOffset(row);
if (inputVector->isNullAt(row)) {
writer.commitNull();
return;
}
auto& input = inputVector->valueAt(row);
memcpy(paddedInput_.data(), input.data(), input.size());
simdjson::padded_string_view paddedInput(
paddedInput_.data(), input.size(), paddedInput_.size());
if (auto error = castFromJsonOneRow<kind>(paddedInput, writer)) {
context.setVeloxExceptionError(row, errors_[error]);
writer.commitNull();
}
});
writer.finish();
}

bool JsonCastOperator::isSupportedToType(const TypePtr& other) const {
if (other->isDate()) {
return false;
Expand Down Expand Up @@ -1315,8 +1281,6 @@ class JsonTypeFactories : public CustomTypeFactories {
}
};

} // namespace

void registerJsonType() {
registerCustomType("json", std::make_unique<const JsonTypeFactories>());
}
Expand Down
43 changes: 43 additions & 0 deletions velox/functions/prestosql/types/JsonType.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include "velox/expression/CastExpr.h"
#include "velox/functions/prestosql/json/SIMDJsonUtil.h"
#include "velox/type/SimpleFunctionApi.h"
#include "velox/type/Type.h"

Expand Down Expand Up @@ -72,4 +73,46 @@ using Json = CustomType<JsonT>;

void registerJsonType();

/// Custom operator for casts from and to Json type.
class JsonCastOperator : public exec::CastOperator {
public:
bool isSupportedFromType(const TypePtr& other) const override;

bool isSupportedToType(const TypePtr& other) const override;

void castTo(
const BaseVector& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
const TypePtr& resultType,
VectorPtr& result) const override;

void castTo(
const BaseVector& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
const TypePtr& resultType,
VectorPtr& result,
const std::shared_ptr<exec::CastHooks>& hooks) const override;

void castFrom(
const BaseVector& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
const TypePtr& resultType,
VectorPtr& result) const override;

private:
template <TypeKind kind>
void castFromJson(
const BaseVector& input,
exec::EvalCtx& context,
const SelectivityVector& rows,
BaseVector& result) const;

mutable folly::once_flag initializeErrors_;
mutable std::exception_ptr errors_[simdjson::NUM_ERROR_CODES];
mutable std::string paddedInput_;
};

} // namespace facebook::velox

0 comments on commit c484b83

Please sign in to comment.