diff --git a/velox/docs/develop/view-and-writer-types.rst b/velox/docs/develop/view-and-writer-types.rst index e816a62f3dc2..5c8bf89ee116 100644 --- a/velox/docs/develop/view-and-writer-types.rst +++ b/velox/docs/develop/view-and-writer-types.rst @@ -333,10 +333,3 @@ When Zero-copy optimization is enabled (see zero-copy-string-result section abov - void **copy_from** (const GenericView& view) : assign data from another GenericView - template typename VectorWriter::exec_out_t& **castTo** () : cast to concrete writer type - template typename VectorWriter::exec_out_t* **tryCastTo** () : best-effort attempt to cast to a concrete writer type - -Limitations ------------ - -1. If a function throws an exception while writing a complex type, then the output of the -row being written as well as the output of the next row are undefined. Hence, it's recommended -to avoid throwing exceptions after writing has started for a complex output within the function. diff --git a/velox/expression/EvalCtx.h b/velox/expression/EvalCtx.h index c7f137aa44d2..0d4ba9c655f3 100644 --- a/velox/expression/EvalCtx.h +++ b/velox/expression/EvalCtx.h @@ -278,11 +278,20 @@ class EvalCtx { const SelectivityVector& rows, const std::exception_ptr& exceptionPtr); - /// Invokes a function on each selected row. Records per-row exceptions by - /// calling 'setError'. The function must take a single "row" argument of type - /// vector_size_t and return void. template void applyToSelectedNoThrow(const SelectivityVector& rows, Callable func) { + applyToSelectedNoThrow(rows, func, [](auto /* row */) INLINE_LAMBDA {}); + } + + /// Invokes a function on each selected row. Records per-row exceptions by + /// calling 'setError'. The function onErrorFunc is called before 'setError' + /// when exceptions are thrown. The functions Callable and OnError must take a + /// single "row" argument of type vector_size_t and return void. + template + void applyToSelectedNoThrow( + const SelectivityVector& rows, + Callable func, + OnError onErrorFunc) { rows.template applyToSelected([&](auto row) INLINE_LAMBDA { try { func(row); @@ -290,9 +299,13 @@ class EvalCtx { if (!e.isUserError()) { throw; } + + onErrorFunc(row); + // Avoid double throwing. setVeloxExceptionError(row, std::current_exception()); } catch (const std::exception&) { + onErrorFunc(row); setError(row, std::current_exception()); } }); diff --git a/velox/expression/SimpleFunctionAdapter.h b/velox/expression/SimpleFunctionAdapter.h index 8c0bb54d5b12..37250c7990df 100644 --- a/velox/expression/SimpleFunctionAdapter.h +++ b/velox/expression/SimpleFunctionAdapter.h @@ -197,6 +197,12 @@ class SimpleFunctionAdapter : public VectorFunction { context.template applyToSelectedNoThrow(*rows, func); } + template + void applyToSelectedNoThrow(Callable func, OnError onErrorFunc) { + context.template applyToSelectedNoThrow( + *rows, func, onErrorFunc); + } + void setError(vector_size_t row, Status status) { context.setStatus(row, status); } @@ -742,37 +748,20 @@ class SimpleFunctionAdapter : public VectorFunction { template void applyUdf(ApplyContext& applyContext, Func func) const { - if constexpr (IsArrayWriter::value || IsMapWriter::value) { - // An optimization for arrayProxy and mapWriter that force the - // localization of the writer. - auto& currentWriter = applyContext.resultWriter.writer_; - - applyContext.applyToSelectedNoThrow([&](auto row) INLINE_LAMBDA { - applyContext.resultWriter.setOffset(row); - // Force local copy of proxy. - auto localWriter = currentWriter; - bool notNull; - auto status = func(localWriter, notNull, row); - if UNLIKELY (!status.ok()) { - applyContext.setError(row, status); - } else { - currentWriter = localWriter; - applyContext.resultWriter.commit(notNull); - } - }); - applyContext.resultWriter.finish(); - } else { - applyContext.applyToSelectedNoThrow([&](auto row) INLINE_LAMBDA { - applyContext.resultWriter.setOffset(row); - bool notNull; - auto status = func(applyContext.resultWriter.current(), notNull, row); - if UNLIKELY (!status.ok()) { - applyContext.setError(row, status); - } else { - applyContext.resultWriter.commit(notNull); - } - }); - } + applyContext.applyToSelectedNoThrow( + [&](auto row) INLINE_LAMBDA { + applyContext.resultWriter.setOffset(row); + bool notNull; + auto status = func(applyContext.resultWriter.current(), notNull, row); + if UNLIKELY (!status.ok()) { + applyContext.setError(row, status); + applyContext.resultWriter.commitNull(); + } else { + applyContext.resultWriter.commit(notNull); + } + }, + [&](auto /* row */) + INLINE_LAMBDA { applyContext.resultWriter.commitNull(); }); } // == NULLABLE VARIANTS == diff --git a/velox/expression/tests/ArrayWriterTest.cpp b/velox/expression/tests/ArrayWriterTest.cpp index db2ec74df864..7398a9d6da53 100644 --- a/velox/expression/tests/ArrayWriterTest.cpp +++ b/velox/expression/tests/ArrayWriterTest.cpp @@ -1063,5 +1063,43 @@ TEST_F(ArrayWriterTest, copyFromNestedArrayOfOpaqueUDT) { } } +// Throws an error if n is even, otherwise creates a 3x3 array filled with n. +template +struct ThrowsErrorsFunc { + template + void call(TOut& out, const int64_t& n) { + for (auto i = 0; i < 3; i++) { + auto& innerArray = out.add_item(); + for (auto j = 0; j < 3; j++) { + // If commit isn't called as part of error handling, the first inner + // array in odd number rows will pick up the elements from the last + // inner array of the previous row. + innerArray.push_back(n); + } + } + + VELOX_USER_CHECK_EQ(n % 2, 1); + } +}; + +TEST_F(ArrayWriterTest, errorHandlingE2E) { + registerFunction>, int64_t>( + {"throws_errors"}); + + auto result = evaluate( + "try(throws_errors(c0))", + makeRowVector({makeFlatVector({1, 2, 3, 4, 5, 6})})); + + assertEqualVectors( + result, + makeNestedArrayVectorFromJson( + {"[[1, 1, 1], [1, 1, 1], [1, 1, 1]]", + "null", + "[[3, 3, 3], [3, 3, 3], [3, 3, 3]]", + "null", + "[[5, 5, 5], [5, 5, 5], [5, 5, 5]]", + "null"})); +} + } // namespace } // namespace facebook::velox diff --git a/velox/expression/tests/MapWriterTest.cpp b/velox/expression/tests/MapWriterTest.cpp index 7eec5053b0fb..0e543c2da41a 100644 --- a/velox/expression/tests/MapWriterTest.cpp +++ b/velox/expression/tests/MapWriterTest.cpp @@ -755,5 +755,53 @@ TEST_F(MapWriterTest, copyFromViewTypeResizedChildren) { ASSERT_EQ(outerValues->mapValues()->asFlatVector()->size(), 6); } +// Throws an error if n is even, otherwise creates a map of maps. +template +struct ThrowsErrorsFunc { + template + void call(TOut& out, const int64_t& n) { + for (auto i = 0; i < 3; i++) { + auto [keyWriter, valueWriter] = out.add_item(); + keyWriter = i + n * 3; + for (auto j = 0; j < 3; j++) { + // If commit isn't called as part of error handling, the first inner + // map in odd number rows will pick up the entries from the last + // inner map of the previous row. + auto [innerKeyWriter, innerValueWriter] = valueWriter.add_item(); + innerKeyWriter.copy_from(std::string(1, 'a' + (i * 3 + j))); + innerValueWriter = n * 10 + i * 3 + j; + } + } + + VELOX_USER_CHECK_EQ(n % 2, 1); + } +}; + +TEST_F(MapWriterTest, errorHandlingE2E) { + registerFunction< + ThrowsErrorsFunc, + Map>, + int64_t>({"throws_errors"}); + + auto result = evaluate( + "try(throws_errors(c0))", + makeRowVector({makeFlatVector({1, 2, 3, 4, 5, 6})})); + + auto innerKeys = makeFlatVector( + {"a", "b", "c", "d", "e", "f", "g", "h", "i", "a", "b", "c", "d", "e", + "f", "g", "h", "i", "a", "b", "c", "d", "e", "f", "g", "h", "i"}); + auto innerValues = makeFlatVector( + {10, 11, 12, 13, 14, 15, 16, 17, 18, 30, 31, 32, 33, 34, + 35, 36, 37, 38, 50, 51, 52, 53, 54, 55, 56, 57, 58}); + std::vector innerOffsets{0, 3, 6, 9, 12, 15, 18, 21, 24}; + auto innerMaps = makeMapVector(innerOffsets, innerKeys, innerValues); + + auto outerKeys = makeFlatVector({3, 4, 5, 9, 10, 11, 15, 16, 17}); + std::vector outerOffsets{0, 3, 3, 6, 6, 9}; + auto expected = makeMapVector(outerOffsets, outerKeys, innerMaps, {1, 3, 5}); + + assertEqualVectors(result, expected); +} + } // namespace } // namespace facebook::velox diff --git a/velox/expression/tests/RowWriterTest.cpp b/velox/expression/tests/RowWriterTest.cpp index 242851b52c2e..71c11704b6d8 100644 --- a/velox/expression/tests/RowWriterTest.cpp +++ b/velox/expression/tests/RowWriterTest.cpp @@ -537,5 +537,55 @@ TEST_F(RowWriterTest, finishPostSize) { 11); } +// Throws an error if n is even, otherwise creates a row. +template +struct ThrowsErrorsFunc { + template + void call(TOut& out, const int64_t& n) { + out.template get_writer_at<0>() = n; + auto& stringWriter = out.template get_writer_at<1>(); + auto& arrayWriter = out.template get_writer_at<2>(); + auto& mapWriter = out.template get_writer_at<3>(); + + for (auto i = 0; i < 3; i++) { + stringWriter.append(std::string(1, 'a' + i + n * 3)); + arrayWriter.add_item() = n + i; + auto [keyWriter, valueWriter] = mapWriter.add_item(); + keyWriter = n * 10 + i; + valueWriter = n * 100 + i; + } + + VELOX_USER_CHECK_EQ(n % 2, 1); + } +}; + +TEST_F(RowWriterTest, errorHandlingE2E) { + registerFunction< + ThrowsErrorsFunc, + Row, Map>, + int64_t>({"throws_errors"}); + + auto result = evaluate( + "try(throws_errors(c0))", + makeRowVector({makeFlatVector({1, 2, 3, 4, 5, 6})})); + + auto field1 = makeFlatVector({1, 0, 3, 0, 5, 0}); + auto field2 = makeFlatVector({"def", "", "jkl", "", "pqr", ""}); + auto field3 = + makeArrayVector({{1, 2, 3}, {}, {3, 4, 5}, {}, {5, 6, 7}, {}}); + auto field4 = makeMapVector( + {{{10, 100}, {11, 101}, {12, 102}}, + {}, + {{30, 300}, {31, 301}, {32, 302}}, + {}, + {{50, 500}, {51, 501}, {52, 502}}, + {}}); + + auto expected = makeRowVector( + {field1, field2, field3, field4}, [](auto row) { return row % 2 == 1; }); + + assertEqualVectors(result, expected); +} + } // namespace } // namespace facebook::velox::exec diff --git a/velox/functions/prestosql/benchmarks/ArrayWriterBenchmark.cpp b/velox/functions/prestosql/benchmarks/ArrayWriterBenchmark.cpp index 02372fa5c74f..3afff9df542f 100644 --- a/velox/functions/prestosql/benchmarks/ArrayWriterBenchmark.cpp +++ b/velox/functions/prestosql/benchmarks/ArrayWriterBenchmark.cpp @@ -130,7 +130,7 @@ class ArrayWriterBenchmark : public functions::test::FunctionBenchmarkBase { public: ArrayWriterBenchmark() : FunctionBenchmarkBase() { registerFunction, int64_t>( - {"simpl_resize"}); + {"simple_resize"}); registerFunction, int64_t>( {"simple_push_back"}); registerFunction, int64_t>( @@ -276,6 +276,7 @@ BENCHMARK_MULTI(std_reference) { int main(int argc, char** argv) { folly::Init init{&argc, &argv}; + facebook::velox::memory::MemoryManager::initialize({}); facebook::velox::exec::ArrayWriterBenchmark benchmark; benchmark.test(); diff --git a/velox/vector/tests/utils/VectorTestBase.h b/velox/vector/tests/utils/VectorTestBase.h index c794e18366a0..ef9a8cc84777 100644 --- a/velox/vector/tests/utils/VectorTestBase.h +++ b/velox/vector/tests/utils/VectorTestBase.h @@ -308,7 +308,7 @@ class VectorTestBase { if (!vector.has_value()) { bits::setNull(rawNulls, i, true); rawSizes[i] = 0; - rawOffsets[i] = 0; + rawOffsets[i] = (i == 0) ? 0 : rawOffsets[i - 1] + rawSizes[i - 1]; } else { flattenedData.insert( flattenedData.end(), vector->begin(), vector->end());