diff --git a/velox/functions/sparksql/specialforms/CMakeLists.txt b/velox/functions/sparksql/specialforms/CMakeLists.txt index 8360c79b5dd0e..cfce8dc29df28 100644 --- a/velox/functions/sparksql/specialforms/CMakeLists.txt +++ b/velox/functions/sparksql/specialforms/CMakeLists.txt @@ -15,8 +15,8 @@ velox_add_library( velox_functions_spark_specialforms AtLeastNNonNulls.cpp - FromJson.cpp DecimalRound.cpp + FromJson.cpp MakeDecimal.cpp SparkCastExpr.cpp SparkCastHooks.cpp) diff --git a/velox/functions/sparksql/specialforms/FromJson.cpp b/velox/functions/sparksql/specialforms/FromJson.cpp index ad7e6628909d6..954b80118274c 100644 --- a/velox/functions/sparksql/specialforms/FromJson.cpp +++ b/velox/functions/sparksql/specialforms/FromJson.cpp @@ -48,8 +48,8 @@ struct ParseJsonTypedImpl { struct KindDispatcher { static simdjson::error_code apply(Input, exec::GenericWriter&, bool) { VELOX_NYI( - "Casting from JSON to {} is not supported.", TypeTraits::name); - return simdjson::error_code::UNEXPECTED_ERROR; // Make compiler happy. + "Parse json to {} is not supported.", TypeTraits::name); + return simdjson::error_code::UNEXPECTED_ERROR; } }; @@ -367,136 +367,55 @@ struct ParseJsonTypedImpl { } }; -template -simdjson::error_code parseJsonOneRow( - simdjson::padded_string_view input, - exec::VectorWriter& writer) { - SIMDJSON_ASSIGN_OR_RAISE(auto doc, simdjsonParse(input)); - if (doc.is_null()) { - writer.commitNull(); - } else { - SIMDJSON_TRY(ParseJsonTypedImpl::apply( - doc, writer.current(), true)); - writer.commit(true); - } - return simdjson::SUCCESS; -} - -class FromJsonExpr : public SpecialForm { +/// @brief Parses a JSON string into the specified data type. Supports ROW, +/// ARRAY, and MAP as root types. Key Behavior: +/// - Failure Handling: Returns `NULL` for invalid JSON or incompatible values. +/// - Boolean: Only `true` and `false` are valid; others return `NULL`. +/// - Integral Types: Accepts only integers; floats or strings return `NULL`. +/// - Float/Double: All numbers are valid; strings like `"NaN"` , `"INF"` +/// `"Infinity"` are accepted, others return `NULL`. +/// - Array: Accepts JSON objects only if the array is the root type with ROW +/// child type. +/// - Map: Keys must be `VARCHAR` type. +/// - Row: Partial parsing is supported, but JSON arrays cannot be parsed into a +/// ROW type. +class FromJsonFunction final : public exec::VectorFunction { public: - /// @param type The target type of the cast expression - /// @param expr The expression to gerenate input - /// @param trackCpuUsage Whether to track CPU usage - FromJsonExpr(TypePtr type, ExprPtr&& expr, bool trackCpuUsage) - : SpecialForm( - type, - std::vector({expr}), - FromJsonCallToSpecialForm::kFromJson, - false /* supportsFlatNoNullsFastPath */, - trackCpuUsage) { - if (!isSupportedType(type)) { - VELOX_UNSUPPORTED("Unsupported type {}.", type->toString()); - } - simdjsonErrorsToExceptions(errors_); - } - - void evalSpecialForm( - const SelectivityVector& rows, - EvalCtx& context, - VectorPtr& result) override { - VectorPtr input; - inputs_[0]->eval(rows, context, input); - auto toType = std::const_pointer_cast(type_); - apply(rows, input, context, toType, result); - // Return 'input' back to the vector pool in 'context' so it can be reused. - context.releaseVector(input); - } - - private: - void computePropagatesNulls() override { - propagatesNulls_ = false; - } - - // Peal data. void apply( const SelectivityVector& rows, - const VectorPtr& input, - exec::EvalCtx& context, - const TypePtr& toType, - VectorPtr& result) { - LocalSelectivityVector remainingRows(context, rows); - - context.deselectErrors(*remainingRows); - - LocalDecodedVector decoded(context, *input, *remainingRows); - auto* rawNulls = decoded->nulls(remainingRows.get()); - - if (rawNulls) { - remainingRows->deselectNulls( - rawNulls, remainingRows->begin(), remainingRows->end()); - } - - VectorPtr localResult; - if (!remainingRows->hasSelections()) { - localResult = - BaseVector::createNullConstant(toType, rows.end(), context.pool()); - } else if (decoded->isIdentityMapping()) { - applyPeeled( - *remainingRows, *decoded->base(), context, toType, localResult); - } else { - withContextSaver([&](ContextSaver& saver) { - LocalSelectivityVector newRowsHolder(*context.execCtx()); - - LocalDecodedVector localDecoded(context); - std::vector peeledVectors; - auto peeledEncoding = PeeledEncoding::peel( - {input}, *remainingRows, localDecoded, true, peeledVectors); - VELOX_CHECK_EQ(peeledVectors.size(), 1); - if (peeledVectors[0]->isLazy()) { - peeledVectors[0] = - peeledVectors[0]->as()->loadedVectorShared(); - } - auto newRows = - peeledEncoding->translateToInnerRows(*remainingRows, newRowsHolder); - // Save context and set the peel. - context.saveAndReset(saver, *remainingRows); - context.setPeeledEncoding(peeledEncoding); - applyPeeled(*newRows, *peeledVectors[0], context, toType, localResult); - - localResult = context.getPeeledEncoding()->wrap( - toType, context.pool(), localResult, *remainingRows); - }); - } - context.moveOrCopyResult(localResult, *remainingRows, result); - context.releaseVector(localResult); - - // If there are nulls or rows that encountered errors in the input, add - // nulls to the result at the same rows. - VELOX_CHECK_NOT_NULL(result); - if (rawNulls || context.errors()) { - EvalCtx::addNulls( - rows, remainingRows->asRange().bits(), context, toType, result); - } - } - - void applyPeeled( - const SelectivityVector& rows, - const BaseVector& input, + std::vector& args, // Not using const ref so we can reuse args + const TypePtr& outputType, exec::EvalCtx& context, - const TypePtr& toType, - VectorPtr& result) { - context.ensureWritable(rows, toType, result); + VectorPtr& result) const final { + VELOX_USER_CHECK_EQ(args.size(), 1, "from_json expects one argument."); + VELOX_USER_CHECK( + args[0]->isConstantEncoding() || args[0]->isFlatEncoding(), + "Single-arg deterministic functions receive their only argument as flat or constant vector."); + context.ensureWritable(rows, outputType, result); + result->clearNulls(rows); switch (result->typeKind()) { case TypeKind::ARRAY: { - parseJson(input, context, rows, *result); + if (args[0]->isConstantEncoding()) { + parseJsonConstant(args[0], context, rows, *result); + } else { + parseJsonFlat(args[0], context, rows, *result); + } break; } case TypeKind::MAP: { - parseJson(input, context, rows, *result); + if (args[0]->isConstantEncoding()) { + parseJsonConstant(args[0], context, rows, *result); + } else { + parseJsonFlat(args[0], context, rows, *result); + } break; } case TypeKind::ROW: { - parseJson(input, context, rows, *result); + if (args[0]->isConstantEncoding()) { + parseJsonConstant(args[0], context, rows, *result); + } else { + parseJsonFlat(args[0], context, rows, *result); + } break; } default: @@ -504,9 +423,10 @@ class FromJsonExpr : public SpecialForm { } } + private: template - void parseJson( - const BaseVector& input, + void parseJsonConstant( + VectorPtr& input, exec::EvalCtx& context, const SelectivityVector& rows, BaseVector& result) const { @@ -514,8 +434,44 @@ class FromJsonExpr : public SpecialForm { auto* flatResult = result.as::type>(); exec::VectorWriter writer; writer.init(*flatResult); - // Input is guaranteed to be in flat or constant encodings when passed in. - auto* inputVector = input.as>(); + const auto constInput = input->asUnchecked>(); + if (constInput->isNullAt(0)) { + context.applyToSelectedNoThrow(rows, [&](auto row) { + writer.setOffset(row); + writer.commitNull(); + }); + } else { + const auto constant = constInput->valueAt(0); + paddedInput_.resize(constant.size() + simdjson::SIMDJSON_PADDING); + memcpy(paddedInput_.data(), constant.data(), constant.size()); + simdjson::padded_string_view paddedInput( + paddedInput_.data(), constant.size(), paddedInput_.size()); + + simdjson::ondemand::document jsonDoc; + auto error = simdjsonParse(paddedInput).get(jsonDoc); + + context.applyToSelectedNoThrow(rows, [&](auto row) { + writer.setOffset(row); + if (error != simdjson::SUCCESS || + paseJsonOneRow(jsonDoc, writer) != simdjson::SUCCESS) { + writer.commitNull(); + } + }); + } + + writer.finish(); + } + + template + void parseJsonFlat( + VectorPtr& input, + exec::EvalCtx& context, + const SelectivityVector& rows, + BaseVector& result) const { + auto* flatResult = result.as::type>(); + exec::VectorWriter writer; + writer.init(*flatResult); + auto* inputVector = input->asUnchecked>(); size_t maxSize = 0; rows.applyToSelected([&](auto row) { if (inputVector->isNullAt(row)) { @@ -535,49 +491,67 @@ class FromJsonExpr : public SpecialForm { memcpy(paddedInput_.data(), input.data(), input.size()); simdjson::padded_string_view paddedInput( paddedInput_.data(), input.size(), paddedInput_.size()); - if (auto error = parseJsonOneRow(paddedInput, writer)) { + simdjson::ondemand::document doc; + auto error = simdjsonParse(paddedInput).get(doc); + if (error != simdjson::SUCCESS || + paseJsonOneRow(doc, writer) != simdjson::SUCCESS) { writer.commitNull(); } }); writer.finish(); } - bool isSupportedType(const TypePtr& other, bool isRootType = true) const { - switch (other->kind()) { - case TypeKind::ARRAY: - return isSupportedType(other->childAt(0), false); - case TypeKind::ROW: - for (const auto& child : other->as().children()) { - if (!isSupportedType(child, false)) { - return false; - } - } - return true; - case TypeKind::MAP: - return ( - other->childAt(0)->kind() == TypeKind::VARCHAR && - isSupportedType(other->childAt(1), false)); - case TypeKind::BOOLEAN: - case TypeKind::BIGINT: - case TypeKind::INTEGER: - case TypeKind::SMALLINT: - case TypeKind::TINYINT: - case TypeKind::DOUBLE: - case TypeKind::REAL: - case TypeKind::VARCHAR: { - if (other->isDate() || other->isDecimal()) { + template + static simdjson::error_code paseJsonOneRow( + simdjson::ondemand::document& doc, + exec::VectorWriter& writer) { + if (doc.is_null()) { + writer.commitNull(); + } else { + SIMDJSON_TRY( + ParseJsonTypedImpl::apply( + doc, writer.current(), true)); + writer.commit(true); + } + return simdjson::SUCCESS; + } + + mutable std::string paddedInput_; +}; + +bool isSupportedType(const TypePtr& other, bool isRootType = true) { + switch (other->kind()) { + case TypeKind::ARRAY: + return isSupportedType(other->childAt(0), false); + case TypeKind::ROW: + for (const auto& child : other->as().children()) { + if (!isSupportedType(child, false)) { return false; } - return !isRootType; } - default: + return true; + case TypeKind::MAP: + return ( + other->childAt(0)->kind() == TypeKind::VARCHAR && + isSupportedType(other->childAt(1), false)); + case TypeKind::BOOLEAN: + case TypeKind::BIGINT: + case TypeKind::INTEGER: + case TypeKind::SMALLINT: + case TypeKind::TINYINT: + case TypeKind::DOUBLE: + case TypeKind::REAL: + case TypeKind::VARCHAR: { + if (other->isDate() || other->isDecimal()) { return false; + } + return !isRootType; } + default: + return false; } +} - std::exception_ptr errors_[simdjson::NUM_ERROR_CODES]; - mutable std::string paddedInput_; -}; } // namespace TypePtr FromJsonCallToSpecialForm::resolveType( @@ -596,7 +570,16 @@ exec::ExprPtr FromJsonCallToSpecialForm::constructSpecialForm( TypeKind::VARCHAR, "The first argument of from_json should be of varchar type."); - return std::make_shared( - type, std::move(args[0]), trackCpuUsage); + if (!isSupportedType(type)) { + VELOX_UNSUPPORTED("Unsupported type {}.", type->toString()); + } + + return std::make_shared( + type, + std::move(args), + std::make_shared(), + exec::VectorFunctionMetadata{}, + kFromJson, + trackCpuUsage); } } // namespace facebook::velox::functions::sparksql