Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhli1142015 committed Dec 12, 2024
1 parent 3986f3b commit 68dab93
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 60 deletions.
10 changes: 5 additions & 5 deletions velox/docs/functions/spark/json.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ JSON Functions
match the field names of the ROW exactly (case-sensitive).
Behaviors of the casts are shown with the examples below. ::

SELECT from_json('{"a": true}'); -- {'a'=true} // Output type: ROW(a BOOLEAN)
SELECT from_json('{"a": 1}'); -- {'a'=1} // Output type: ROW(a INTEGER)
SELECT from_json('{"a": 1.0}'); -- {'a'=1.0} // Output type: ROW(a DOUBLE)
SELECT from_json('["name", "age", "id"]'); -- ['name', 'age', 'id'] // Output type: ARRAY(VARCHAR)
SELECT from_json('{"a": 1, "b": 2}'); -- {'a'=1, 'b'=2} // Output type: MAP(VARCHAR,INTEGER)
SELECT from_json('{"a": true}'); -- {'a'=true} // Output type: ROW({"a"}, {BOOLEAN()})
SELECT from_json('{"a": 1}'); -- {'a'=1} // Output type: ROW({"a"}, {INTEGER()})
SELECT from_json('{"a": 1.0}'); -- {'a'=1.0} // Output type: ROW({"a"}, {DOUBLE()})
SELECT from_json('["name", "age", "id"]'); -- ['name', 'age', 'id'] // Output type: ARRAY(VARCHAR())
SELECT from_json('{"a": 1, "b": 2}'); -- {'a'=1, 'b'=2} // Output type: MAP(VARCHAR(),INTEGER())
102 changes: 47 additions & 55 deletions velox/functions/sparksql/specialforms/FromJson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
#include <limits>
#include <stdexcept>

#include "velox/expression/CastExpr.h"
#include "velox/expression/EvalCtx.h"
#include "velox/expression/PeeledEncoding.h"
#include "velox/expression/ScopedVarSetter.h"
#include "velox/expression/SpecialForm.h"
#include "velox/expression/VectorWriters.h"
#include "velox/functions/prestosql/json/SIMDJsonUtil.h"
Expand All @@ -33,8 +30,9 @@ using namespace facebook::velox::exec;
namespace facebook::velox::functions::sparksql {
namespace {

/// Struct for extracting JSON data and writing it with type-specific handling.
template <typename Input>
struct ParseJsonTypedImpl {
struct ExtractJsonTypedImpl {
template <TypeKind kind>
static simdjson::error_code
apply(Input input, exec::GenericWriter& writer, bool isRoot) {
Expand All @@ -55,7 +53,7 @@ struct ParseJsonTypedImpl {
template <typename Dummy>
struct KindDispatcher<TypeKind::VARCHAR, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
SIMDJSON_ASSIGN_OR_RAISE(auto type, value.type());
std::string_view s;
switch (type) {
Expand All @@ -78,7 +76,7 @@ struct ParseJsonTypedImpl {
template <typename Dummy>
struct KindDispatcher<TypeKind::BOOLEAN, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
SIMDJSON_ASSIGN_OR_RAISE(auto type, value.type());
auto& w = writer.castTo<bool>();
switch (type) {
Expand All @@ -96,47 +94,47 @@ struct ParseJsonTypedImpl {
template <typename Dummy>
struct KindDispatcher<TypeKind::TINYINT, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToInt<int8_t>(value, writer);
}
};

template <typename Dummy>
struct KindDispatcher<TypeKind::SMALLINT, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToInt<int16_t>(value, writer);
}
};

template <typename Dummy>
struct KindDispatcher<TypeKind::INTEGER, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToInt<int32_t>(value, writer);
}
};

template <typename Dummy>
struct KindDispatcher<TypeKind::BIGINT, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToInt<int64_t>(value, writer);
}
};

template <typename Dummy>
struct KindDispatcher<TypeKind::REAL, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToFloatingPoint<float>(value, writer);
}
};

template <typename Dummy>
struct KindDispatcher<TypeKind::DOUBLE, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
return castJsonToFloatingPoint<double>(value, writer);
}
};
Expand All @@ -146,7 +144,7 @@ struct ParseJsonTypedImpl {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
auto& writerTyped = writer.castTo<Array<Any>>();
auto& elementType = writer.type()->childAt(0);
const auto& elementType = writer.type()->childAt(0);
SIMDJSON_ASSIGN_OR_RAISE(auto type, value.type());
if (type == simdjson::ondemand::json_type::array) {
SIMDJSON_ASSIGN_OR_RAISE(auto array, value.get_array());
Expand All @@ -158,7 +156,7 @@ struct ParseJsonTypedImpl {
writerTyped.add_null();
} else {
SIMDJSON_TRY(VELOX_DYNAMIC_TYPE_DISPATCH(
ParseJsonTypedImpl<simdjson::ondemand::value>::apply,
ExtractJsonTypedImpl<simdjson::ondemand::value>::apply,
elementType->kind(),
element,
writerTyped.add_item(),
Expand All @@ -167,7 +165,7 @@ struct ParseJsonTypedImpl {
}
} else if (elementType->kind() == TypeKind::ROW && isRoot) {
SIMDJSON_TRY(VELOX_DYNAMIC_TYPE_DISPATCH(
ParseJsonTypedImpl<simdjson::ondemand::value>::apply,
ExtractJsonTypedImpl<simdjson::ondemand::value>::apply,
elementType->kind(),
value,
writerTyped.add_item(),
Expand All @@ -182,10 +180,9 @@ struct ParseJsonTypedImpl {
template <typename Dummy>
struct KindDispatcher<TypeKind::MAP, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
apply(Input value, exec::GenericWriter& writer, bool /*isRoot*/) {
auto& writerTyped = writer.castTo<Map<Any, Any>>();
auto& keyType = writer.type()->childAt(0);
auto& valueType = writer.type()->childAt(1);
const auto& valueType = writer.type()->childAt(1);
SIMDJSON_ASSIGN_OR_RAISE(auto object, value.get_object());
for (auto fieldResult : object) {
SIMDJSON_ASSIGN_OR_RAISE(auto field, fieldResult);
Expand All @@ -198,7 +195,7 @@ struct ParseJsonTypedImpl {
auto writers = writerTyped.add_item();
std::get<0>(writers).castTo<Varchar>().append(key);
SIMDJSON_TRY(VELOX_DYNAMIC_TYPE_DISPATCH(
ParseJsonTypedImpl<simdjson::ondemand::value>::apply,
ExtractJsonTypedImpl<simdjson::ondemand::value>::apply,
valueType->kind(),
field.value(),
std::get<1>(writers),
Expand All @@ -213,7 +210,7 @@ struct ParseJsonTypedImpl {
struct KindDispatcher<TypeKind::ROW, Dummy> {
static simdjson::error_code
apply(Input value, exec::GenericWriter& writer, bool isRoot) {
auto& rowType = writer.type()->asRow();
const auto& rowType = writer.type()->asRow();
auto& writerTyped = writer.castTo<DynamicRow>();
if (value.type().error() != ::simdjson::SUCCESS) {
writerTyped.set_null_at(0);
Expand Down Expand Up @@ -245,7 +242,7 @@ struct ParseJsonTypedImpl {
it->second = -1;

auto res = VELOX_DYNAMIC_TYPE_DISPATCH(
ParseJsonTypedImpl<simdjson::ondemand::value>::apply,
ExtractJsonTypedImpl<simdjson::ondemand::value>::apply,
rowType.childAt(index)->kind(),
field.value(),
writerTyped.get_writer_at(index),
Expand All @@ -257,12 +254,14 @@ struct ParseJsonTypedImpl {
}
}

for (const auto& [key, index] : fieldIndices) {
for (const auto& [_, index] : fieldIndices) {
if (index >= 0) {
writerTyped.set_null_at(index);
}
}
} else {
// Handle other JSON types: set null to the writer if it's the root doc,
// otherwise return INCORRECT_TYPE to the caller.
if (isRoot) {
writerTyped.set_null_at(0);
return simdjson::SUCCESS;
Expand All @@ -274,23 +273,6 @@ struct ParseJsonTypedImpl {
}
};

static simdjson::simdjson_result<std::string_view> rawJson(
Input value,
simdjson::ondemand::json_type type) {
switch (type) {
case simdjson::ondemand::json_type::array: {
SIMDJSON_ASSIGN_OR_RAISE(auto array, value.get_array());
return array.raw_json();
}
case simdjson::ondemand::json_type::object: {
SIMDJSON_ASSIGN_OR_RAISE(auto object, value.get_object());
return object.raw_json();
}
default:
return value.raw_json_token();
}
}

template <typename T>
static simdjson::error_code castJsonToInt(
Input value,
Expand All @@ -307,14 +289,15 @@ struct ParseJsonTypedImpl {
default:
return simdjson::INCORRECT_TYPE;
}
break;
}
default:
return simdjson::INCORRECT_TYPE;
}
return simdjson::SUCCESS;
}

// Casts a JSON value to a float point, handling both numeric Special cases
// for NaN and Infinity.
template <typename T>
static simdjson::error_code castJsonToFloatingPoint(
Input value,
Expand Down Expand Up @@ -368,8 +351,9 @@ struct ParseJsonTypedImpl {
/// - Failure Handling: Returns `NULL` for invalid JSON or incompatible values.
/// - Boolean: Only `true` and `false` are valid; others return `NULL`.
/// - Integral Types: Accepts only integers; floats or strings return `NULL`.
/// - Float/Double: All numbers are valid; strings like `"NaN"` , `"INF"`
/// `"Infinity"` are accepted, others return `NULL`.
/// - Float/Double: All numbers are valid; strings like `"NaN"`, `"+INF"`,
/// `"+Infinity"`, `"Infinity"`, `"-INF"`,
/// `"-Infinity"` are accepted, others return `NULL`.
/// - Array: Accepts JSON objects only if the array is the root type with ROW
/// child type.
/// - Map: Keys must be `VARCHAR` type.
Expand Down Expand Up @@ -425,7 +409,7 @@ class FromJsonFunction final : public exec::VectorFunction {
context.applyToSelectedNoThrow(rows, [&](auto row) {
writer.setOffset(row);
if (error != simdjson::SUCCESS ||
paseJsonOneRow(jsonDoc, writer) != simdjson::SUCCESS) {
extractJsonToWriter(jsonDoc, writer) != simdjson::SUCCESS) {
writer.commitNull();
}
});
Expand All @@ -448,7 +432,7 @@ class FromJsonFunction final : public exec::VectorFunction {
if (inputVector->isNullAt(row)) {
return;
}
auto& input = inputVector->valueAt(row);
const auto& input = inputVector->valueAt(row);
maxSize = std::max(maxSize, input.size());
});
paddedInput_.resize(maxSize + simdjson::SIMDJSON_PADDING);
Expand All @@ -458,52 +442,61 @@ class FromJsonFunction final : public exec::VectorFunction {
writer.commitNull();
return;
}
auto& input = inputVector->valueAt(row);
const auto& input = inputVector->valueAt(row);
memcpy(paddedInput_.data(), input.data(), input.size());
simdjson::padded_string_view paddedInput(
paddedInput_.data(), input.size(), paddedInput_.size());
simdjson::ondemand::document doc;
auto error = simdjsonParse(paddedInput).get(doc);
if (error != simdjson::SUCCESS ||
paseJsonOneRow(doc, writer) != simdjson::SUCCESS) {
extractJsonToWriter(doc, writer) != simdjson::SUCCESS) {
writer.commitNull();
}
});
writer.finish();
}

static simdjson::error_code paseJsonOneRow(
// Extracts data from json doc and writes it to writer.
static simdjson::error_code extractJsonToWriter(
simdjson::ondemand::document& doc,
exec::VectorWriter<Any>& writer) {
if (doc.is_null()) {
writer.commitNull();
} else {
SIMDJSON_TRY(
ParseJsonTypedImpl<simdjson::ondemand::document&>::apply<kind>(
ExtractJsonTypedImpl<simdjson::ondemand::document&>::apply<kind>(
doc, writer.current(), true));
writer.commit(true);
}
return simdjson::SUCCESS;
}

// The buffer with extra bytes for parser::parse(),
mutable std::string paddedInput_;
};

/// Determines whether a given type is supported.
/// @param isRootType. A flag indicating whether the type is the root type in
/// the evaluation context. Only ROW, ARRAY, and MAP are allowed as root types;
/// this flag helps differentiate such cases.
bool isSupportedType(const TypePtr& type, bool isRootType) {
switch (type->kind()) {
case TypeKind::ARRAY:
case TypeKind::ARRAY: {
return isSupportedType(type->childAt(0), false);
case TypeKind::ROW:
for (const auto& child : type->as<TypeKind::ROW>().children()) {
}
case TypeKind::ROW: {
for (const auto& child : asRowType(type)->children()) {
if (!isSupportedType(child, false)) {
return false;
}
}
return true;
case TypeKind::MAP:
}
case TypeKind::MAP: {
return (
type->childAt(0)->kind() == TypeKind::VARCHAR &&
isSupportedType(type->childAt(1), false));
}
case TypeKind::BIGINT: {
if (type->isDecimal()) {
return false;
Expand Down Expand Up @@ -547,9 +540,8 @@ exec::ExprPtr FromJsonCallToSpecialForm::constructSpecialForm(
TypeKind::VARCHAR,
"The first argument of from_json should be of varchar type.");

if (!isSupportedType(type, true)) {
VELOX_UNSUPPORTED("Unsupported type {}.", type->toString());
}
VELOX_USER_CHECK(
isSupportedType(type, true), "Unsupported type {}.", type->toString());

std::shared_ptr<exec::VectorFunction> func;
if (type->kind() == TypeKind::ARRAY) {
Expand Down

0 comments on commit 68dab93

Please sign in to comment.