Call commitNull in SimpleFunctionAdapter on exceptions (facebookincubator#10377)

Summary:
Pull Request resolved: facebookincubator#10377

The way the VectorWriters work today, if a writer is writing a variable-length type and the value is
not committed (e.g. because an exception was thrown), the next value written starts from the state of
the previous value rather than from a clean slate. This can result in, e.g., strings starting with the
contents that were written for the previous string.
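
A minimal sketch of this failure mode, using a toy writer rather than Velox's actual VectorWriter
classes. ToyStringWriter and writeWithSkippedCommit are hypothetical names introduced only for
illustration; the assumption is simply that scratch state is cleared on commit and nowhere else.

```cpp
#include <exception>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical toy writer (not a Velox class): it accumulates bytes in a
// scratch buffer and clears that buffer only when a value is committed.
class ToyStringWriter {
 public:
  void append(const std::string& s) {
    scratch_ += s;
  }

  // Publishes the current value as a new row and resets the scratch state.
  void commit() {
    rows_.push_back(scratch_);
    scratch_.clear();
  }

  const std::vector<std::string>& rows() const {
    return rows_;
  }

 private:
  std::string scratch_;
  std::vector<std::string> rows_;
};

// Starts writing "abc", fails before commit, then writes "xyz" successfully.
inline std::vector<std::string> writeWithSkippedCommit() {
  ToyStringWriter writer;
  try {
    writer.append("abc");
    throw std::runtime_error("first row failed"); // commit() is never reached
  } catch (const std::exception&) {
    // The error is swallowed, as it would be under try(); the writer's
    // scratch state is left untouched.
  }
  writer.append("xyz");
  writer.commit();
  return writer.rows();
}
```

In this toy model the only committed row holds "abcxyz": the second value starts with the bytes
written for the failed one.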

SimpleFunctionAdapter tried to compensate for this by making a local copy of the top-level
VectorWriter and only copying it back into the original if processing the current row succeeded. This
does nothing for nested writers (and it wasn't implemented for strings).

To fix this, I've added an optional lambda to applyToSelectedNoThrow that gets invoked when an
exception is caught. We can use this to call commitNull on the writer, which should reset the state
of all writers (top-level and nested). Note that if we're catching exceptions and not throwing
anything, we must be in a try, so committing null is safe and reasonable to do.
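
The shape of the fix can be sketched in isolation. This is a simplified analogue under stated
assumptions, not the Velox implementation: ToyStringWriter, applyNoThrow and writeRows are
hypothetical names, the row loop replaces SelectivityVector, and every std::exception is treated as
a recordable per-row error.

```cpp
#include <exception>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical toy writer (not a Velox class). commitNull() publishes a null
// for the current row and discards whatever was partially written.
class ToyStringWriter {
 public:
  void append(const std::string& s) {
    scratch_ += s;
  }

  void commit() {
    rows_.emplace_back(scratch_);
    scratch_.clear();
  }

  void commitNull() {
    rows_.emplace_back(std::nullopt);
    scratch_.clear();
  }

  const std::vector<std::optional<std::string>>& rows() const {
    return rows_;
  }

 private:
  std::string scratch_;
  std::vector<std::optional<std::string>> rows_;
};

// Simplified analogue of the two-callback overload: runs func on each row and
// calls onError before recording the failure for that row.
template <typename Callable, typename OnError>
void applyNoThrow(
    int numRows,
    std::vector<std::exception_ptr>& errors,
    Callable func,
    OnError onError) {
  errors.clear();
  errors.resize(numRows); // one empty exception_ptr per row
  for (int row = 0; row < numRows; ++row) {
    try {
      func(row);
    } catch (const std::exception&) {
      onError(row);
      errors[row] = std::current_exception();
    }
  }
}

// Writes three rows; the odd row throws after it has started writing.
inline std::vector<std::optional<std::string>> writeRows() {
  ToyStringWriter writer;
  std::vector<std::exception_ptr> errors;
  applyNoThrow(
      3,
      errors,
      [&](int row) {
        writer.append("row" + std::to_string(row));
        if (row % 2 == 1) {
          throw std::runtime_error("odd row");
        }
        writer.commit();
      },
      [&](int /* row */) { writer.commitNull(); });
  return writer.rows();
}
```

With the error hook in place, the failed row becomes null and the following row starts from a clean
slate ("row2" rather than "row1row2").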

This shouldn't impact the performance of the path without exceptions (I ran the ArrayWriterBenchmark
to confirm this). I also do not need to make this change in the fast path, as the fast path is only
invoked if the output type is primitive and fixed width, and in this case there is no state other
than the value in the Vector, so failing to commit does not cause issues.
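
As a rough illustration of why fixed-width output needs no reset, here is a toy model (not the
Velox fast path itself). ToyFlatResult and doubleOddInputs are hypothetical names; the assumption is
that each row owns one preallocated slot and the writer keeps no other state.

```cpp
#include <cstddef>
#include <cstdint>
#include <exception>
#include <stdexcept>
#include <vector>

// Hypothetical fixed-width output: one preallocated slot per row, plus a flag
// marking rows whose evaluation failed.
struct ToyFlatResult {
  std::vector<int64_t> values;
  std::vector<bool> failed;
};

// Doubles each input; even inputs throw after their slot has been written.
inline ToyFlatResult doubleOddInputs(const std::vector<int64_t>& input) {
  ToyFlatResult result{
      std::vector<int64_t>(input.size(), 0),
      std::vector<bool>(input.size(), false)};
  for (std::size_t row = 0; row < input.size(); ++row) {
    try {
      // The write goes straight into this row's own slot.
      result.values[row] = input[row] * 2;
      if (input[row] % 2 == 0) {
        throw std::runtime_error("even input");
      }
    } catch (const std::exception&) {
      // Nothing to reset: no writer state exists outside the slot itself.
      result.failed[row] = true;
    }
  }
  return result;
}
```

A failed row leaves a stale value only in its own slot, which is masked by the error flag; the next
row's slot is unaffected.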

This, combined with facebookincubator#10376, addresses the issue
identified in facebookincubator#10162.

Reviewed By: mbasmanova

Differential Revision: D59292285

fbshipit-source-id: 00c000626bb12451b7c95c24460eb62816532403
Kevin Wilfong authored and facebook-github-bot committed Jul 3, 2024
1 parent a033968 commit decd91e
Showing 8 changed files with 175 additions and 43 deletions.
7 changes: 0 additions & 7 deletions velox/docs/develop/view-and-writer-types.rst
@@ -333,10 +333,3 @@ When Zero-copy optimization is enabled (see zero-copy-string-result section abov
- void **copy_from** (const GenericView& view) : assign data from another GenericView
- template <typename ToType> typename VectorWriter<ToType, void>::exec_out_t& **castTo** () : cast to concrete writer type
- template <typename ToType> typename VectorWriter<ToType, void>::exec_out_t* **tryCastTo** () : best-effort attempt to cast to a concrete writer type

Limitations
-----------

1. If a function throws an exception while writing a complex type, then the output of the
row being written as well as the output of the next row are undefined. Hence, it's recommended
to avoid throwing exceptions after writing has started for a complex output within the function.
19 changes: 16 additions & 3 deletions velox/expression/EvalCtx.h
@@ -278,21 +278,34 @@ class EvalCtx {
const SelectivityVector& rows,
const std::exception_ptr& exceptionPtr);

/// Invokes a function on each selected row. Records per-row exceptions by
/// calling 'setError'. The function must take a single "row" argument of type
/// vector_size_t and return void.
template <typename Callable>
void applyToSelectedNoThrow(const SelectivityVector& rows, Callable func) {
applyToSelectedNoThrow(rows, func, [](auto /* row */) INLINE_LAMBDA {});
}

/// Invokes a function on each selected row. Records per-row exceptions by
/// calling 'setError'. The function onErrorFunc is called before 'setError'
/// when exceptions are thrown. The functions Callable and OnError must take a
/// single "row" argument of type vector_size_t and return void.
template <typename Callable, typename OnError>
void applyToSelectedNoThrow(
const SelectivityVector& rows,
Callable func,
OnError onErrorFunc) {
rows.template applyToSelected([&](auto row) INLINE_LAMBDA {
try {
func(row);
} catch (const VeloxException& e) {
if (!e.isUserError()) {
throw;
}

onErrorFunc(row);

// Avoid double throwing.
setVeloxExceptionError(row, std::current_exception());
} catch (const std::exception&) {
onErrorFunc(row);
setError(row, std::current_exception());
}
});
51 changes: 20 additions & 31 deletions velox/expression/SimpleFunctionAdapter.h
@@ -197,6 +197,12 @@ class SimpleFunctionAdapter : public VectorFunction {
context.template applyToSelectedNoThrow<Callable>(*rows, func);
}

template <typename Callable, typename OnError>
void applyToSelectedNoThrow(Callable func, OnError onErrorFunc) {
context.template applyToSelectedNoThrow<Callable>(
*rows, func, onErrorFunc);
}

void setError(vector_size_t row, Status status) {
context.setStatus(row, status);
}
@@ -742,37 +748,20 @@ class SimpleFunctionAdapter : public VectorFunction {

template <typename Func>
void applyUdf(ApplyContext& applyContext, Func func) const {
if constexpr (IsArrayWriter<T>::value || IsMapWriter<T>::value) {
// An optimization for arrayProxy and mapWriter that force the
// localization of the writer.
auto& currentWriter = applyContext.resultWriter.writer_;

applyContext.applyToSelectedNoThrow([&](auto row) INLINE_LAMBDA {
applyContext.resultWriter.setOffset(row);
// Force local copy of proxy.
auto localWriter = currentWriter;
bool notNull;
auto status = func(localWriter, notNull, row);
if UNLIKELY (!status.ok()) {
applyContext.setError(row, status);
} else {
currentWriter = localWriter;
applyContext.resultWriter.commit(notNull);
}
});
applyContext.resultWriter.finish();
} else {
applyContext.applyToSelectedNoThrow([&](auto row) INLINE_LAMBDA {
applyContext.resultWriter.setOffset(row);
bool notNull;
auto status = func(applyContext.resultWriter.current(), notNull, row);
if UNLIKELY (!status.ok()) {
applyContext.setError(row, status);
} else {
applyContext.resultWriter.commit(notNull);
}
});
}
applyContext.applyToSelectedNoThrow(
[&](auto row) INLINE_LAMBDA {
applyContext.resultWriter.setOffset(row);
bool notNull;
auto status = func(applyContext.resultWriter.current(), notNull, row);
if UNLIKELY (!status.ok()) {
applyContext.setError(row, status);
applyContext.resultWriter.commitNull();
} else {
applyContext.resultWriter.commit(notNull);
}
},
[&](auto /* row */)
INLINE_LAMBDA { applyContext.resultWriter.commitNull(); });
}

// == NULLABLE VARIANTS ==
38 changes: 38 additions & 0 deletions velox/expression/tests/ArrayWriterTest.cpp
@@ -1063,5 +1063,43 @@ TEST_F(ArrayWriterTest, copyFromNestedArrayOfOpaqueUDT) {
}
}

// Throws an error if n is even, otherwise creates a 3x3 array filled with n.
template <typename T>
struct ThrowsErrorsFunc {
template <typename TOut>
void call(TOut& out, const int64_t& n) {
for (auto i = 0; i < 3; i++) {
auto& innerArray = out.add_item();
for (auto j = 0; j < 3; j++) {
// If commit isn't called as part of error handling, the first inner
// array in odd number rows will pick up the elements from the last
// inner array of the previous row.
innerArray.push_back(n);
}
}

VELOX_USER_CHECK_EQ(n % 2, 1);
}
};

TEST_F(ArrayWriterTest, errorHandlingE2E) {
registerFunction<ThrowsErrorsFunc, Array<Array<int64_t>>, int64_t>(
{"throws_errors"});

auto result = evaluate(
"try(throws_errors(c0))",
makeRowVector({makeFlatVector<int64_t>({1, 2, 3, 4, 5, 6})}));

assertEqualVectors(
result,
makeNestedArrayVectorFromJson<int64_t>(
{"[[1, 1, 1], [1, 1, 1], [1, 1, 1]]",
"null",
"[[3, 3, 3], [3, 3, 3], [3, 3, 3]]",
"null",
"[[5, 5, 5], [5, 5, 5], [5, 5, 5]]",
"null"}));
}

} // namespace
} // namespace facebook::velox
48 changes: 48 additions & 0 deletions velox/expression/tests/MapWriterTest.cpp
@@ -755,5 +755,53 @@ TEST_F(MapWriterTest, copyFromViewTypeResizedChildren) {
ASSERT_EQ(outerValues->mapValues()->asFlatVector<int64_t>()->size(), 6);
}

// Throws an error if n is even, otherwise creates a map of maps.
template <typename T>
struct ThrowsErrorsFunc {
template <typename TOut>
void call(TOut& out, const int64_t& n) {
for (auto i = 0; i < 3; i++) {
auto [keyWriter, valueWriter] = out.add_item();
keyWriter = i + n * 3;
for (auto j = 0; j < 3; j++) {
// If commit isn't called as part of error handling, the first inner
// map in odd number rows will pick up the entries from the last
// inner map of the previous row.
auto [innerKeyWriter, innerValueWriter] = valueWriter.add_item();
innerKeyWriter.copy_from(std::string(1, 'a' + (i * 3 + j)));
innerValueWriter = n * 10 + i * 3 + j;
}
}

VELOX_USER_CHECK_EQ(n % 2, 1);
}
};

TEST_F(MapWriterTest, errorHandlingE2E) {
registerFunction<
ThrowsErrorsFunc,
Map<int64_t, Map<Varchar, float>>,
int64_t>({"throws_errors"});

auto result = evaluate(
"try(throws_errors(c0))",
makeRowVector({makeFlatVector<int64_t>({1, 2, 3, 4, 5, 6})}));

auto innerKeys = makeFlatVector<StringView>(
{"a", "b", "c", "d", "e", "f", "g", "h", "i", "a", "b", "c", "d", "e",
"f", "g", "h", "i", "a", "b", "c", "d", "e", "f", "g", "h", "i"});
auto innerValues = makeFlatVector<float>(
{10, 11, 12, 13, 14, 15, 16, 17, 18, 30, 31, 32, 33, 34,
35, 36, 37, 38, 50, 51, 52, 53, 54, 55, 56, 57, 58});
std::vector<vector_size_t> innerOffsets{0, 3, 6, 9, 12, 15, 18, 21, 24};
auto innerMaps = makeMapVector(innerOffsets, innerKeys, innerValues);

auto outerKeys = makeFlatVector<int64_t>({3, 4, 5, 9, 10, 11, 15, 16, 17});
std::vector<vector_size_t> outerOffsets{0, 3, 3, 6, 6, 9};
auto expected = makeMapVector(outerOffsets, outerKeys, innerMaps, {1, 3, 5});

assertEqualVectors(result, expected);
}

} // namespace
} // namespace facebook::velox
50 changes: 50 additions & 0 deletions velox/expression/tests/RowWriterTest.cpp
@@ -537,5 +537,55 @@ TEST_F(RowWriterTest, finishPostSize) {
11);
}

// Throws an error if n is even, otherwise creates a row.
template <typename T>
struct ThrowsErrorsFunc {
template <typename TOut>
void call(TOut& out, const int64_t& n) {
out.template get_writer_at<0>() = n;
auto& stringWriter = out.template get_writer_at<1>();
auto& arrayWriter = out.template get_writer_at<2>();
auto& mapWriter = out.template get_writer_at<3>();

for (auto i = 0; i < 3; i++) {
stringWriter.append(std::string(1, 'a' + i + n * 3));
arrayWriter.add_item() = n + i;
auto [keyWriter, valueWriter] = mapWriter.add_item();
keyWriter = n * 10 + i;
valueWriter = n * 100 + i;
}

VELOX_USER_CHECK_EQ(n % 2, 1);
}
};

TEST_F(RowWriterTest, errorHandlingE2E) {
registerFunction<
ThrowsErrorsFunc,
Row<int64_t, Varchar, Array<float>, Map<int32_t, double>>,
int64_t>({"throws_errors"});

auto result = evaluate(
"try(throws_errors(c0))",
makeRowVector({makeFlatVector<int64_t>({1, 2, 3, 4, 5, 6})}));

auto field1 = makeFlatVector<int64_t>({1, 0, 3, 0, 5, 0});
auto field2 = makeFlatVector<StringView>({"def", "", "jkl", "", "pqr", ""});
auto field3 =
makeArrayVector<float>({{1, 2, 3}, {}, {3, 4, 5}, {}, {5, 6, 7}, {}});
auto field4 = makeMapVector<int32_t, double>(
{{{10, 100}, {11, 101}, {12, 102}},
{},
{{30, 300}, {31, 301}, {32, 302}},
{},
{{50, 500}, {51, 501}, {52, 502}},
{}});

auto expected = makeRowVector(
{field1, field2, field3, field4}, [](auto row) { return row % 2 == 1; });

assertEqualVectors(result, expected);
}

} // namespace
} // namespace facebook::velox::exec
Original file line number Diff line number Diff line change
@@ -130,7 +130,7 @@ class ArrayWriterBenchmark : public functions::test::FunctionBenchmarkBase {
public:
ArrayWriterBenchmark() : FunctionBenchmarkBase() {
registerFunction<SimpleFunctionResize, Array<int64_t>, int64_t>(
{"simpl_resize"});
{"simple_resize"});
registerFunction<SimpleFunctionPushBack, Array<int64_t>, int64_t>(
{"simple_push_back"});
registerFunction<SimpleGeneralInterface, Array<int64_t>, int64_t>(
@@ -276,6 +276,7 @@ BENCHMARK_MULTI(std_reference) {

int main(int argc, char** argv) {
folly::Init init{&argc, &argv};
facebook::velox::memory::MemoryManager::initialize({});

facebook::velox::exec::ArrayWriterBenchmark benchmark;
benchmark.test();
2 changes: 1 addition & 1 deletion velox/vector/tests/utils/VectorTestBase.h
@@ -308,7 +308,7 @@ class VectorTestBase {
if (!vector.has_value()) {
bits::setNull(rawNulls, i, true);
rawSizes[i] = 0;
rawOffsets[i] = 0;
rawOffsets[i] = (i == 0) ? 0 : rawOffsets[i - 1] + rawSizes[i - 1];
} else {
flattenedData.insert(
flattenedData.end(), vector->begin(), vector->end());
