Skip to content

Commit

Permalink
Add near zero-copy Import & Export for Arrow BinaryView/Utf8View form…
Browse files Browse the repository at this point in the history
…ats (facebookincubator#9726)

Summary:
This change adds support for importing and exporting BinaryView and StringView from/to Arrow, significantly boosting the performance of string import and export through the Arrow bridge.

Benchmark results show that, compared to utf8 strings, utf8view strings are 4.3x (non-inline strings) to 18x (inline strings) faster to export and 2.2x (non-inline strings) to 7x (inline strings) faster to import.

Pull Request resolved: facebookincubator#9726

Reviewed By: xiaoxmeng

Differential Revision: D61394952

Pulled By: kgpai

fbshipit-source-id: 79114b7fac980d82f38389298318bf7976396833
  • Loading branch information
urvishdesai authored and facebook-github-bot committed Aug 20, 2024
1 parent 82bad79 commit 08dd2d4
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 41 deletions.
203 changes: 196 additions & 7 deletions velox/vector/arrow/Bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,20 @@ static constexpr size_t kMaxBuffers{3};
// carried by ArrowArray.private_data
class VeloxToArrowBridgeHolder {
public:
VeloxToArrowBridgeHolder() {
for (size_t i = 0; i < kMaxBuffers; ++i) {
VeloxToArrowBridgeHolder() = default;

// Grows the holder so that it can track at least `bufferCount` buffers; a
// request that does not exceed the current count is a no-op. Growing may
// reallocate the underlying storage, so callers should fetch the buffer array
// again through `getArrowBuffers()` afterwards.
void resizeBuffers(size_t bufferCount) {
  if (bufferCount > numBuffers_) {
    // Newly added slots start out as null pointers.
    buffers_.resize(bufferCount, nullptr);
    bufferPtrs_.resize(bufferCount);
    numBuffers_ = bufferCount;
  }
}

// Acquires a buffer at index `idx`.
Expand All @@ -58,7 +68,7 @@ class VeloxToArrowBridgeHolder {
}

const void** getArrowBuffers() {
return buffers_;
return (const void**)&(buffers_[0]);
}

// Allocates space for `numChildren` ArrowArray pointers.
Expand Down Expand Up @@ -88,12 +98,15 @@ class VeloxToArrowBridgeHolder {
}

private:
// Holds the count of total buffers
size_t numBuffers_ = kMaxBuffers;

// Holds the pointers to the arrow buffers.
const void* buffers_[kMaxBuffers];
std::vector<const void*> buffers_{numBuffers_, nullptr};

// Holds ownership over the Buffers being referenced by the buffers vector
// above.
BufferPtr bufferPtrs_[kMaxBuffers];
std::vector<BufferPtr> bufferPtrs_{numBuffers_};

// Auxiliary buffers to hold ownership over ArrowArray children structures.
std::vector<std::unique_ptr<ArrowArray>> childrenPtrs_;
Expand Down Expand Up @@ -277,8 +290,14 @@ const char* exportArrowFormatStr(
// We always map VARCHAR and VARBINARY to the "small" version (lower case
// format string), which uses 32 bit offsets.
case TypeKind::VARCHAR:
if (options.exportToStringView) {
return "vu";
}
return "u"; // utf-8 string
case TypeKind::VARBINARY:
if (options.exportToStringView) {
return "vz";
}
return "z"; // binary
case TypeKind::UNKNOWN:
return "n"; // NullType
Expand Down Expand Up @@ -528,6 +547,59 @@ VectorPtr createFlatVector(
using WrapInBufferViewFunc =
std::function<BufferPtr(const void* buffer, size_t length)>;

// Builds a FlatVector<StringView> from an Arrow BinaryView/Utf8View array.
//
// The Arrow buffers are read as:
//   buffers[0]          validity bitmap (already wrapped by the caller and
//                       passed in as `nulls`)
//   buffers[1]          16-byte views, one per row
//   buffers[2 .. n-2]   variadic data buffers referenced by non-inline views
//   buffers[n-1]        one 64-bit size per variadic data buffer
//
// The data buffers are wrapped through `wrapInBufferView` and attached to the
// result as string buffers; only the 16-byte views are rewritten into a newly
// allocated buffer.
//
// @param pool Memory pool used to allocate the StringView buffer.
// @param type Type assigned to the resulting vector.
// @param nulls Nulls buffer handed to the resulting vector as is.
// @param arrowArray Arrow array to import; must expose at least 3 buffers.
// @param wrapInBufferView Wraps a raw (pointer, size) pair into a BufferPtr.
// @return The imported flat vector.
//
// NOTE(review): `arrowArray.offset` is not consulted here - confirm callers
// only pass arrays with a zero offset.
// NOTE(review): views of null rows are converted like any other row; a
// non-inline null row with an arbitrary buffer index would index
// `arrowArray.buffers` out of range - confirm what producers guarantee.
VectorPtr createStringFlatVectorFromUtf8View(
    memory::MemoryPool* pool,
    const TypePtr& type,
    BufferPtr nulls,
    const ArrowArray& arrowArray,
    WrapInBufferViewFunc wrapInBufferView) {
  // Strings of up to this many bytes are stored inside the view itself.
  constexpr uint32_t kMaxInlineLength = 12;

  const int64_t numBuffers = arrowArray.n_buffers;
  VELOX_USER_CHECK_GE(
      numBuffers,
      3,
      "Expecting three or more buffers as input for string view types.");

  // Everything except nulls, views and the trailing sizes buffer is string
  // data.
  const int64_t numDataBuffers = numBuffers - 3;

  // The last C data buffer stores buffer sizes.
  const auto* bufferSizes =
      static_cast<const uint64_t*>(arrowArray.buffers[numBuffers - 1]);

  std::vector<BufferPtr> stringViewBuffers(numDataBuffers);
  for (int64_t i = 0; i < numDataBuffers; ++i) {
    stringViewBuffers[i] =
        wrapInBufferView(arrowArray.buffers[2 + i], bufferSizes[i]);
  }

  BufferPtr stringViews =
      AlignedBuffer::allocate<StringView>(arrowArray.length, pool);
  auto* rawStringViews = stringViews->asMutable<uint64_t>();
  const auto* arrowViews =
      static_cast<const uint64_t*>(arrowArray.buffers[1]);

  // Full copy for inline strings (length <= 12). For non-inline strings,
  // convert 16-byte Arrow Utf8View [4-byte length, 4-byte prefix, 4-byte
  // buffer-index, 4-byte buffer-offset] to 16-byte Velox StringView [4-byte
  // length, 4-byte prefix, 8-byte buffer-ptr].
  for (int64_t row = 0; row < arrowArray.length; ++row) {
    const auto* view =
        reinterpret_cast<const uint32_t*>(arrowViews + 2 * row);

    // Length and prefix share the same layout on both sides.
    rawStringViews[2 * row] = arrowViews[2 * row];

    if (view[0] > kMaxInlineLength) {
      // Resolve (buffer-index, buffer-offset) into an absolute address.
      rawStringViews[2 * row + 1] =
          reinterpret_cast<uint64_t>(arrowArray.buffers[2 + view[2]]) +
          view[3];
    } else {
      rawStringViews[2 * row + 1] = arrowViews[2 * row + 1];
    }
  }

  return std::make_shared<FlatVector<StringView>>(
      pool,
      type,
      std::move(nulls),
      arrowArray.length,
      std::move(stringViews),
      std::move(stringViewBuffers),
      SimpleVectorStats<StringView>{},
      std::nullopt,
      optionalNullCount(arrowArray.null_count));
}

template <typename TOffset>
VectorPtr createStringFlatVector(
memory::MemoryPool* pool,
Expand Down Expand Up @@ -648,6 +720,94 @@ void exportValues(
holder.setBuffer(1, values);
}

// Completes the export of a flat string vector as an Arrow
// BinaryView/Utf8View array.
//
// On entry `out.buffers[1]` is expected to already hold the 16-byte views in
// Velox layout (exportFlat() calls exportValues() right before this
// function). This function:
//   - grows the holder to 3 + N buffers (nulls, views, N string buffers,
//     variadic buffer sizes) and re-points `out.buffers` at it,
//   - registers every string buffer of `vec` plus the buffer of their sizes,
//   - rewrites each non-null, non-inline view in place from Velox layout to
//     Arrow layout.
//
// @param vec Flat vector being exported.
// @param rows Rows to export.
// @param out Arrow array under construction; `buffers` and `n_buffers` are
//     updated.
// @param pool Memory pool used to allocate the variadic buffer sizes.
// @param holder Owner of the exported buffers.
//
// NOTE(review): views are modified in place through `out.buffers[1]`; this is
// only safe if that buffer is not shared with `vec` - confirm against
// exportValues().
// NOTE(review): the row number `i` from `rows` indexes both `vec` and the
// exported views buffer - confirm this holds when `rows` is not the identity
// selection.
void exportViews(
    const FlatVector<StringView>& vec,
    const Selection& rows,
    ArrowArray& out,
    memory::MemoryPool* pool,
    VeloxToArrowBridgeHolder& holder) {
  // Strings of up to this many bytes are stored inside the view itself.
  constexpr uint32_t kMaxInlineLength = 12;

  const auto& stringBuffers = vec.stringBuffers();
  const size_t numStringBuffers = stringBuffers.size();
  // Buffers for nulls, values, variadic_buffer_sizes, and all stringBuffers.
  const size_t numBuffers = 3 + numStringBuffers;

  // Resize and reassign holder buffers.
  holder.resizeBuffers(numBuffers);
  out.buffers = holder.getArrowBuffers();

  // Given the difference b/w structures of the non-inline Arrow Utf8View and
  // Velox::StringView as
  //
  // Velox: [4-byte len, 4-byte prefix, 8-byte pointer]
  // Arrow: [4-byte len, 4-byte prefix, 4-byte buffer_idx, 4-byte buffer_offset]
  //
  // Velox::StringView only has a pointer into the buffer, but Arrow requires
  // the index of the string buffer holding the value and the offset from the
  // start of that buffer. Hence, we must derive the buffer index and offset
  // from the string buffers. This is done in two parts:
  // 1. Create a vector of string buffer indices and sort it by ascending
  //    buffer address to prepare for binary search.
  // 2. Find the owning string buffer with a binary search over the sorted
  //    indices, using the Velox buffer pointer as key.
  //    We optimize further by caching the previously found buffer's address
  //    and index, and only search again if the key pointer does not lie
  //    within the cached buffer.

  // Vector to store stringBuffer indices.
  std::vector<int32_t> sortedBufferIndices(numStringBuffers);

  BufferPtr variadicBufferSizes =
      AlignedBuffer::allocate<size_t>(numStringBuffers, pool);
  auto* rawVariadicBufferSizes = variadicBufferSizes->asMutable<uint64_t>();
  for (size_t idx = 0; idx < numStringBuffers; ++idx) {
    rawVariadicBufferSizes[idx] = stringBuffers[idx]->size();
    holder.setBuffer(2 + idx, stringBuffers[idx]);
    // 1a. Insert index into the vector.
    sortedBufferIndices[idx] = static_cast<int32_t>(idx);
  }
  holder.setBuffer(numBuffers - 1, variadicBufferSizes);
  out.n_buffers = numBuffers;

  // Start address of the idx-th string buffer as exported in `out.buffers`.
  const auto bufferAddress = [&out](int32_t idx) {
    return reinterpret_cast<uint64_t>(out.buffers[2 + idx]);
  };

  // 1b. Sort the indices by buffer start address.
  std::stable_sort(
      sortedBufferIndices.begin(),
      sortedBufferIndices.end(),
      [&bufferAddress](int32_t lhs, int32_t rhs) {
        return bufferAddress(lhs) < bufferAddress(rhs);
      });

  auto* utf8Views =
      static_cast<uint64_t*>(const_cast<void*>(out.buffers[1]));

  // Most recently matched string buffer. `cacheValid` stays false until the
  // first lookup, so the cache is never consulted before it is populated,
  // whichever row happens to be visited first.
  bool cacheValid = false;
  int32_t bufferIdxCache = 0;
  uint64_t bufferAddrCache = 0;

  rows.apply([&](vector_size_t i) {
    auto* view = reinterpret_cast<uint32_t*>(&utf8Views[2 * i]);
    if (vec.isNullAt(i) || view[0] <= kMaxInlineLength) {
      // Null and inline views need no translation.
      return;
    }

    const uint64_t currAddr = utf8Views[2 * i + 1];

    // A hit requires the address to lie in [start, start + size). The end is
    // exclusive: an address one past the cached buffer belongs to another
    // buffer.
    const bool cacheHit = cacheValid && currAddr >= bufferAddrCache &&
        (currAddr - bufferAddrCache) < rawVariadicBufferSizes[bufferIdxCache];
    if (!cacheHit) {
      // 2. Search for the last buffer starting at or before `currAddr`.
      // NOTE(review): this relies on every non-inline view pointing into one
      // of `vec.stringBuffers()`; otherwise upper_bound() may return begin()
      // and std::prev() is undefined.
      const auto it = std::prev(std::upper_bound(
          sortedBufferIndices.begin(),
          sortedBufferIndices.end(),
          currAddr,
          [&bufferAddress](uint64_t addr, int32_t idx) {
            return addr < bufferAddress(idx);
          }));
      bufferIdxCache = *it;
      bufferAddrCache = bufferAddress(bufferIdxCache);
      cacheValid = true;
    }

    view[2] = static_cast<uint32_t>(bufferIdxCache);
    view[3] = static_cast<uint32_t>(currAddr - bufferAddrCache);
  });
}

void exportStrings(
const FlatVector<StringView>& vec,
const Selection& rows,
Expand Down Expand Up @@ -705,8 +865,21 @@ void exportFlat(
break;
case TypeKind::VARCHAR:
case TypeKind::VARBINARY:
exportStrings(
*vec.asUnchecked<FlatVector<StringView>>(), rows, out, pool, holder);
if (options.exportToStringView) {
exportValues(vec, rows, options, out, pool, holder);
exportViews(
*vec.asUnchecked<FlatVector<StringView>>(),
rows,
out,
pool,
holder);
} else
exportStrings(
*vec.asUnchecked<FlatVector<StringView>>(),
rows,
out,
pool,
holder);
break;
default:
VELOX_NYI(
Expand Down Expand Up @@ -1103,6 +1276,15 @@ TypePtr importFromArrowImpl(
case 'Z':
return VARBINARY();

case 'v':
if (format[1] == 'u') {
return VARCHAR();
}
if (format[1] == 'z') {
return VARBINARY();
}
break;

case 't': // temporal types.
if (format[1] == 's') {
return TIMESTAMP();
Expand Down Expand Up @@ -1734,6 +1916,13 @@ VectorPtr importFromArrowImpl(

// String data types (VARCHAR and VARBINARY).
if (type->isVarchar() || type->isVarbinary()) {
// Import StringView from Utf8View/BinaryView (Zero-copy)
if (arrowSchema.format[0] == 'v') {
return createStringFlatVectorFromUtf8View(
pool, type, nulls, arrowArray, wrapInBufferView);
}

// Import StringView from Utf8/Binary
VELOX_USER_CHECK_EQ(
arrowArray.n_buffers,
3,
Expand Down
2 changes: 2 additions & 0 deletions velox/vector/arrow/Bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ struct ArrowOptions {
bool flattenConstant{false};
TimestampUnit timestampUnit = TimestampUnit::kNano;
std::optional<std::string> timestampTimeZone{std::nullopt};
// Export VARCHAR and VARBINARY to Arrow 15 StringView format
bool exportToStringView = false;
};

namespace facebook::velox {
Expand Down
54 changes: 50 additions & 4 deletions velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1511,15 +1511,18 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest {
}

template <typename F>
void testArrowRoundTrip(const arrow::Array& array, F validateVector) {
void testArrowRoundTrip(
const arrow::Array& array,
F validateVector,
const ArrowOptions& options = ArrowOptions{}) {
VectorPtr vec;
toVeloxVector(array, vec);
validateVector(*vec);

ArrowSchema schema;
ArrowArray data;
velox::exportToArrow(vec, schema, options_);
velox::exportToArrow(vec, data, pool_.get(), options_);
velox::exportToArrow(vec, schema, options);
velox::exportToArrow(vec, data, pool_.get(), options);
ASSERT_OK_AND_ASSIGN(auto arrowType, arrow::ImportType(&schema));
ASSERT_OK_AND_ASSIGN(auto array2, arrow::ImportArray(&data, arrowType));
ASSERT_OK(array2->ValidateFull());
Expand Down Expand Up @@ -1582,6 +1585,42 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest {
});
}

void testImportStringView() {
arrow::StringViewBuilder sb(arrow::default_memory_pool());
ASSERT_OK(sb.Append("hello world", 11));
ASSERT_OK(sb.Append("larger string which should not be inlined...", 44));
ASSERT_OK(sb.AppendNull());
ASSERT_OK(sb.Append("hello", 5));
ASSERT_OK(sb.Append("", 0));
ASSERT_OK(sb.Append("my string", 9));
ASSERT_OK(sb.Append("another slightly longer string", 30));
ASSERT_OK(sb.AppendNull());
ASSERT_OK(sb.Append("a", 1));
ASSERT_OK(sb.Append(
"another even longer string to ensure it's for sure not stored inline!!!",
71));
ASSERT_OK(sb.AppendNull());
ASSERT_OK(sb.AppendNull());
ASSERT_OK_AND_ASSIGN(auto array, sb.Finish());
const void* stringBuffers = array->data()->buffers[2]->data();

testArrowRoundTrip(
*array,
[stringBuffers](const BaseVector& vec) {
// assert zero-copy import to velox for stringBuffers
ASSERT_EQ(
vec.asFlatVector<StringView>()
->stringBuffers()
.data()
->get()
->as<void*>(),
stringBuffers);
ASSERT_EQ(*vec.type(), *VARCHAR());
EXPECT_EQ(vec.size(), 12);
},
ArrowOptions{.exportToStringView = true});
}

void testImportREE() {
testImportREENoRuns();
testImportREESingleRun();
Expand Down Expand Up @@ -1758,7 +1797,6 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest {
EXPECT_NO_THROW(importFromArrow(arrowSchema, arrowArray, pool_.get()));
}

ArrowOptions options_;
std::shared_ptr<memory::MemoryPool> pool_{
memory::memoryManager()->addLeafPool()};
};
Expand Down Expand Up @@ -1791,6 +1829,10 @@ TEST_F(ArrowBridgeArrayImportAsViewerTest, string) {
testImportString();
}

// Runs the shared testImportStringView() scenario under the
// ArrowBridgeArrayImportAsViewerTest fixture.
TEST_F(ArrowBridgeArrayImportAsViewerTest, stringview) {
testImportStringView();
}

TEST_F(ArrowBridgeArrayImportAsViewerTest, row) {
testImportRow();
}
Expand Down Expand Up @@ -1844,6 +1886,10 @@ TEST_F(ArrowBridgeArrayImportAsOwnerTest, string) {
testImportString();
}

// Runs the shared testImportStringView() scenario under the
// ArrowBridgeArrayImportAsOwnerTest fixture.
TEST_F(ArrowBridgeArrayImportAsOwnerTest, stringview) {
testImportStringView();
}

TEST_F(ArrowBridgeArrayImportAsOwnerTest, row) {
testImportRow();
}
Expand Down
Loading

0 comments on commit 08dd2d4

Please sign in to comment.