From d01107b4b440d9d83f3a94805e2a314cd6cadba0 Mon Sep 17 00:00:00 2001 From: rui-mo Date: Fri, 17 Nov 2023 10:44:32 +0800 Subject: [PATCH] Support timestamp units in arrow bridge (7625) --- velox/connectors/hive/HiveConfig.cpp | 5 + velox/connectors/hive/HiveConfig.h | 8 + velox/connectors/hive/HiveDataSink.cpp | 2 + velox/core/QueryConfig.h | 11 + velox/dwio/common/Options.h | 1 + velox/dwio/parquet/writer/Writer.cpp | 6 + velox/dwio/parquet/writer/Writer.h | 4 + velox/exec/ArrowStream.cpp | 4 +- velox/exec/ArrowStream.h | 1 + velox/exec/tests/ArrowStreamTest.cpp | 5 +- velox/vector/arrow/Bridge.cpp | 241 +++++++++++++----- velox/vector/arrow/Bridge.h | 13 +- .../arrow/tests/ArrowBridgeArrayTest.cpp | 194 ++++++++++---- .../arrow/tests/ArrowBridgeSchemaTest.cpp | 26 +- 14 files changed, 394 insertions(+), 127 deletions(-) diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index 604a352d0f6f..4da536f5b106 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -52,6 +52,11 @@ std::string HiveConfig::insertExistingPartitionsBehaviorString( } } +// static. +uint8_t HiveConfig::arrowBridgeTimestampUnit(const Config* config) { + return config->get(kArrowBridgeTimestampUnit, 9 /* nano */); +} + HiveConfig::InsertExistingPartitionsBehavior HiveConfig::insertExistingPartitionsBehavior(const Config* session) const { if (session->isValueExists(kInsertExistingPartitionsBehaviorSession)) { diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 39be13d51d47..237d3c0623bb 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -136,6 +136,10 @@ class HiveConfig { static constexpr const char* kOrcWriterMaxDictionaryMemorySession = "orc_optimized_writer_max_dictionary_memory"; + // Timestamp unit used during Velox-Arrow conversion. + static constexpr const char* kArrowBridgeTimestampUnit = + "arrow_bridge_timestamp_unit"; + /// Maximum number of rows for sort writer in one batch of output. static constexpr const char* kSortWriterMaxOutputRows = "sort-writer-max-output-rows"; @@ -184,6 +188,10 @@ class HiveConfig { std::string gcsCredentials() const; + /// Returns the timestamp unit used in Velox-Arrow conversion. + /// 0: second, 3: milli, 6: micro, 9: nano. + static uint8_t arrowBridgeTimestampUnit(const Config* config); + bool isOrcUseColumnNames(const Config* session) const; bool isFileColumnNamesReadAsLowerCase(const Config* session) const; diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 289c8a5897c7..5e30a09f6174 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -603,6 +603,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { if (canReclaim()) { options.spillConfig = spillConfig_; } + options.arrowBridgeTimestampUnit = HiveConfig::arrowBridgeTimestampUnit( + connectorQueryCtx_->sessionProperties()); options.nonReclaimableSection = writerInfo_.back()->nonReclaimableSectionHolder.get(); options.maxStripeSize = std::optional( diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 6e847771ed6b..00c83f068738 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -369,6 +369,10 @@ class QueryConfig { static constexpr const char* kDriverCpuTimeSliceLimitMs = "driver_cpu_time_slice_limit_ms"; + // Timestamp unit used during Velox-Arrow conversion. + static constexpr const char* kArrowBridgeTimestampUnit = + "arrow_bridge_timestamp_unit"; + uint64_t queryMaxMemoryPerNode() const { return toCapacity( get(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE); @@ -602,6 +606,13 @@ class QueryConfig { return get(kSpillStartPartitionBit, kDefaultStartBit); } + /// Returns the timestamp unit used in Velox-Arrow conversion. + /// 0: second, 3: milli, 6: micro, 9: nano. + uint8_t arrowBridgeTimestampUnit() const { + constexpr uint8_t kDefaultUnit = 9; + return get(kArrowBridgeTimestampUnit, kDefaultUnit); + } + /// Returns the number of bits used to calculate the spilling partition /// number for hash join. The number of spilling partitions will be power of /// two. diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 619f8ea16648..9c7ed53bb5da 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -564,6 +564,7 @@ struct WriterOptions { std::optional maxStripeSize{std::nullopt}; std::optional maxDictionaryMemory{std::nullopt}; std::map serdeParameters; + std::optional arrowBridgeTimestampUnit; }; } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index cda965fb4a84..5ef5785cb13f 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -227,6 +227,8 @@ Writer::Writer( } else { flushPolicy_ = std::make_unique(); } + options_.timestampUnit = + static_cast(options.arrowBridgeTimestampUnit); arrowContext_->properties = getArrowParquetWriterOptions(options, flushPolicy_); } @@ -378,6 +380,10 @@ parquet::WriterOptions getParquetOptions( if (options.compressionKind.has_value()) { parquetOptions.compression = options.compressionKind.value(); } + if (options.arrowBridgeTimestampUnit.has_value()) { + parquetOptions.arrowBridgeTimestampUnit = + options.arrowBridgeTimestampUnit.value(); + } return parquetOptions; } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 0be94ea3881b..89bb76385254 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -25,6 +25,7 @@ #include "velox/dwio/common/WriterFactory.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" +#include "velox/vector/arrow/Bridge.h" namespace facebook::velox::parquet { @@ -98,6 +99,7 @@ struct WriterOptions { // policy with the configs in its ctor. std::function()> flushPolicyFactory; std::shared_ptr codecOptions; + uint8_t arrowBridgeTimestampUnit = static_cast(TimestampUnit::kNano); }; // Writes Velox vectors into a DataSink using Arrow Parquet writer. @@ -151,6 +153,8 @@ class Writer : public dwio::common::Writer { std::unique_ptr flushPolicy_; const RowTypePtr schema_; + + ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true}; }; class ParquetWriterFactory : public dwio::common::WriterFactory { diff --git a/velox/exec/ArrowStream.cpp b/velox/exec/ArrowStream.cpp index 863e43f8ba22..d90734e842e4 100644 --- a/velox/exec/ArrowStream.cpp +++ b/velox/exec/ArrowStream.cpp @@ -27,6 +27,8 @@ ArrowStream::ArrowStream( operatorId, arrowStreamNode->id(), "ArrowStream") { + options_.timestampUnit = static_cast( + driverCtx->queryConfig().arrowBridgeTimestampUnit()); arrowStream_ = arrowStreamNode->arrowStream(); } @@ -66,7 +68,7 @@ RowVectorPtr ArrowStream::getOutput() { // Convert Arrow Array into RowVector and return. return std::dynamic_pointer_cast( - importFromArrowAsOwner(arrowSchema, arrowArray, pool())); + importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool())); } bool ArrowStream::isFinished() { diff --git a/velox/exec/ArrowStream.h b/velox/exec/ArrowStream.h index c35894d0d283..34225f5f44c5 100644 --- a/velox/exec/ArrowStream.h +++ b/velox/exec/ArrowStream.h @@ -45,6 +45,7 @@ class ArrowStream : public SourceOperator { bool finished_ = false; std::shared_ptr arrowStream_; + ArrowOptions options_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/ArrowStreamTest.cpp b/velox/exec/tests/ArrowStreamTest.cpp index f0fe9b37e04e..1b450e7200a3 100644 --- a/velox/exec/tests/ArrowStreamTest.cpp +++ b/velox/exec/tests/ArrowStreamTest.cpp @@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase { int getNext(struct ArrowArray* outArray) { if (vectorIndex_ < vectors_.size()) { - exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get()); + exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get(), options_); vectorIndex_ += 1; } else { // End of stream. Mark the array released. @@ -56,12 +56,13 @@ class ArrowStreamTest : public OperatorTestBase { } int getArrowSchema(ArrowSchema& out) { - exportToArrow(BaseVector::create(type_, 0, pool_.get()), out); + exportToArrow(BaseVector::create(type_, 0, pool_.get()), out, options_); return failGetSchema_ ? (int)ErrorCode::kGetSchemaFailed : (int)ErrorCode::kNoError; } private: + ArrowOptions options_; const std::shared_ptr pool_; const std::vector& vectors_; const TypePtr type_; diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index 180bc720de95..3d36aa3f8b5c 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -215,6 +215,7 @@ static void releaseArrowSchema(ArrowSchema* arrowSchema) { // Returns the Arrow C data interface format type for a given Velox type. const char* exportArrowFormatStr( const TypePtr& type, + const ArrowOptions& options, std::string& formatBuffer) { if (type->isDecimal()) { // Decimal types encode the precision, scale values. @@ -248,9 +249,20 @@ const char* exportArrowFormatStr( return "u"; // utf-8 string case TypeKind::VARBINARY: return "z"; // binary - case TypeKind::TIMESTAMP: - return "ttn"; // time64 [nanoseconds] + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + return "tss:"; + case TimestampUnit::kMilli: + return "tsm:"; + case TimestampUnit::kMicro: + return "tsu:"; + case TimestampUnit::kNano: + return "tsn:"; + default: + VELOX_USER_FAIL(fmt::format( + "Unsupported timestamp unit: {}.", options.timestampUnit)); + } // Complex/nested types. case TypeKind::ARRAY: static_assert(sizeof(vector_size_t) == 4); @@ -333,6 +345,7 @@ void gatherFromBuffer( const Type& type, const Buffer& buf, const Selection& rows, + const ArrowOptions& options, Buffer& out) { auto src = buf.as(); auto dst = out.asMutable(); @@ -350,8 +363,24 @@ void gatherFromBuffer( } else if (type.isTimestamp()) { auto srcTs = buf.as(); auto dstTs = out.asMutable(); - - rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toNanos(); }); + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + rows.apply( + [&](vector_size_t i) { dstTs[j++] = srcTs[i].getSeconds(); }); + break; + case TimestampUnit::kMilli: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toMillis(); }); + break; + case TimestampUnit::kMicro: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toMicros(); }); + break; + case TimestampUnit::kNano: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toNanos(); }); + break; + default: + VELOX_USER_FAIL(fmt::format( + "Unsupported timestamp unit: {}.", options.timestampUnit)); + } } else { auto typeSize = type.cppSizeInBytes(); rows.apply([&](vector_size_t i) { @@ -381,9 +410,9 @@ struct BufferViewReleaser { const std::shared_ptr arrayReleaser_; }; -// Wraps a naked pointer using a Velox buffer view, without copying it. Adding a -// dummy releaser as the buffer lifetime is fully controled by the client of the -// API. +// Wraps a naked pointer using a Velox buffer view, without copying it. Adding +// a dummy releaser as the buffer lifetime is fully controled by the client of +// the API. BufferPtr wrapInBufferViewAsViewer(const void* buffer, size_t length) { static const BufferViewReleaser kViewerReleaser; return BufferView::create( @@ -470,11 +499,12 @@ VectorPtr createStringFlatVector( optionalNullCount(nullCount)); } -// This functions does two things: (a) sets the value of null_count, and (b) the -// validity buffer (if there is at least one null row). +// This functions does two things: (a) sets the value of null_count, and (b) +// the validity buffer (if there is at least one null row). void exportValidityBitmap( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -488,7 +518,7 @@ void exportValidityBitmap( // If we're only exporting a subset, create a new validity buffer. if (rows.changed()) { nulls = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, *nulls); + gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, options, *nulls); } // Set null counts. @@ -504,8 +534,8 @@ void exportValidityBitmap( } bool isFlatScalarZeroCopy(const TypePtr& type) { - // - Short decimals need to be converted to 128 bit values as they are mapped - // to Arrow Decimal128. + // - Short decimals need to be converted to 128 bit values as they are + // mapped to Arrow Decimal128. // - Velox's Timestamp representation (2x 64bit values) does not have an // equivalent in Arrow. return !type->isShortDecimal() && !type->isTimestamp(); @@ -526,6 +556,7 @@ size_t getArrowElementSize(const TypePtr& type) { void exportValues( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -543,7 +574,7 @@ void exportValues( ? AlignedBuffer::allocate(out.length, pool) : AlignedBuffer::allocate( checkedMultiply(out.length, size), pool); - gatherFromBuffer(*type, *vec.values(), rows, *values); + gatherFromBuffer(*type, *vec.values(), rows, options, *values); holder.setBuffer(1, values); } @@ -583,6 +614,7 @@ void exportStrings( void exportFlat( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -598,7 +630,7 @@ void exportFlat( case TypeKind::REAL: case TypeKind::DOUBLE: case TypeKind::TIMESTAMP: - exportValues(vec, rows, out, pool, holder); + exportValues(vec, rows, options, out, pool, holder); break; case TypeKind::VARCHAR: case TypeKind::VARBINARY: @@ -615,17 +647,17 @@ void exportFlat( void exportToArrowImpl( const BaseVector&, const Selection&, + const ArrowOptions& options, ArrowArray&, - memory::MemoryPool*, - const ArrowOptions& options); + memory::MemoryPool*); void exportRows( const RowVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { out.n_buffers = 1; holder.resizeChildren(vec.childrenSize()); out.n_children = vec.childrenSize(); @@ -635,9 +667,9 @@ void exportRows( exportToArrowImpl( *vec.childAt(i)->loadedVector(), rows, + options, *holder.allocateChild(i), - pool, - options); + pool); } catch (const VeloxException&) { for (column_index_t j = 0; j < i; ++j) { // When exception is thrown, i th child is guaranteed unset. @@ -696,19 +728,19 @@ void exportOffsets( void exportArrays( const ArrayVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { Selection childRows(vec.elements()->size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); exportToArrowImpl( *vec.elements()->loadedVector(), childRows, + options, *holder.allocateChild(0), - pool, - options); + pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -716,10 +748,10 @@ void exportArrays( void exportMaps( const MapVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { RowVector child( pool, ROW({"key", "value"}, {vec.mapKeys()->type(), vec.mapValues()->type()}), @@ -729,7 +761,7 @@ void exportMaps( Selection childRows(child.size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); - exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool, options); + exportToArrowImpl(child, childRows, options, *holder.allocateChild(0), pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -738,6 +770,7 @@ template void flattenAndExport( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -755,19 +788,20 @@ void flattenAndExport( flatVector->set(row, decoded.valueAt(row)); } }); - exportValidityBitmap(*flatVector, rows, out, pool, holder); - exportFlat(*flatVector, rows, out, pool, holder); + exportValidityBitmap(*flatVector, rows, options, out, pool, holder); + exportFlat(*flatVector, rows, options, out, pool, holder); } else { allRows.applyToSelected([&](vector_size_t row) { flatVector->set(row, decoded.valueAt(row)); }); - exportFlat(*flatVector, rows, out, pool, holder); + exportFlat(*flatVector, rows, options, out, pool, holder); } } void exportDictionary( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -775,7 +809,7 @@ void exportDictionary( out.n_children = 0; if (rows.changed()) { auto indices = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, *indices); + gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, options, *indices); holder.setBuffer(1, indices); } else { holder.setBuffer(1, vec.wrapInfo()); @@ -783,12 +817,13 @@ void exportDictionary( auto& values = *vec.valueVector()->loadedVector(); out.dictionary = holder.allocateDictionary(); exportToArrowImpl( - values, Selection(values.size()), *out.dictionary, pool, ArrowOptions()); + values, Selection(values.size()), options, *out.dictionary, pool); } void exportFlattenedVector( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -797,7 +832,7 @@ void exportFlattenedVector( "An unsupported nested encoding was found."); VELOX_CHECK(vec.isScalar(), "Flattening is only supported for scalar types."); VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( - flattenAndExport, vec.typeKind(), vec, rows, out, pool, holder); + flattenAndExport, vec.typeKind(), vec, rows, options, out, pool, holder); } // Set the array as using "Null Layout" - no buffers are allocated. @@ -815,6 +850,7 @@ void setNullArray(ArrowArray& array, size_t length) { void exportConstantValue( const BaseVector& vec, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool) { VectorPtr valuesVector; @@ -844,7 +880,7 @@ void exportConstantValue( wrapInBufferViewAsViewer(vec.valuesAsVoid(), bufferSize), vec.mayHaveNulls() ? 1 : 0); } - exportToArrowImpl(*valuesVector, selection, out, pool, ArrowOptions()); + exportToArrowImpl(*valuesVector, selection, options, out, pool); } // Velox constant vectors are exported as Arrow REE containing a single run @@ -852,6 +888,7 @@ void exportConstantValue( void exportConstant( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -863,7 +900,7 @@ void exportConstant( out.n_children = 2; holder.resizeChildren(2); out.children = holder.getChildrenArrays(); - exportConstantValue(vec, *holder.allocateChild(1), pool); + exportConstantValue(vec, options, *holder.allocateChild(1), pool); // Create the run ends child. auto* runEnds = holder.allocateChild(0); @@ -890,41 +927,41 @@ void exportConstant( void exportToArrowImpl( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, - memory::MemoryPool* pool, - const ArrowOptions& options) { + memory::MemoryPool* pool) { auto holder = std::make_unique(); out.buffers = holder->getArrowBuffers(); out.length = rows.count(); out.offset = 0; out.dictionary = nullptr; - exportValidityBitmap(vec, rows, out, pool, *holder); + exportValidityBitmap(vec, rows, options, out, pool, *holder); switch (vec.encoding()) { case VectorEncoding::Simple::FLAT: - exportFlat(vec, rows, out, pool, *holder); + exportFlat(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ROW: exportRows( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ARRAY: exportArrays( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::MAP: exportMaps( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::DICTIONARY: options.flattenDictionary - ? exportFlattenedVector(vec, rows, out, pool, *holder) - : exportDictionary(vec, rows, out, pool, *holder); + ? exportFlattenedVector(vec, rows, options, out, pool, *holder) + : exportDictionary(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::CONSTANT: options.flattenConstant - ? exportFlattenedVector(vec, rows, out, pool, *holder) - : exportConstant(vec, rows, out, pool, *holder); + ? exportFlattenedVector(vec, rows, options, out, pool, *holder) + : exportConstant(vec, rows, options, out, pool, *holder); break; default: VELOX_NYI("{} cannot be exported to Arrow yet.", vec.encoding()); @@ -965,8 +1002,7 @@ TypePtr importFromArrowImpl( return VARBINARY(); case 't': // temporal types. - // Mapping it to ttn for now. - if (format[1] == 't' && format[2] == 'n') { + if (format[1] == 's') { return TIMESTAMP(); } if (format[1] == 'd' && format[2] == 'D') { @@ -1051,7 +1087,7 @@ void exportToArrow( memory::MemoryPool* pool, const ArrowOptions& options) { exportToArrowImpl( - *vector, Selection(vector->size()), arrowArray, pool, options); + *vector, Selection(vector->size()), options, arrowArray, pool); } void exportToArrow( @@ -1078,12 +1114,12 @@ void exportToArrow( // Dictionary data is flattened. Set the underlying data types. arrowSchema.dictionary = nullptr; arrowSchema.format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } else { arrowSchema.format = "i"; bridgeHolder->dictionary = std::make_unique(); arrowSchema.dictionary = bridgeHolder->dictionary.get(); - exportToArrow(vec->valueVector(), *arrowSchema.dictionary); + exportToArrow(vec->valueVector(), *arrowSchema.dictionary, options); } } else if ( vec->encoding() == VectorEncoding::Simple::CONSTANT && @@ -1102,7 +1138,7 @@ void exportToArrow( exportToArrow(valueVector, *valuesChild, options); } else { valuesChild->format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } valuesChild->name = "values"; @@ -1110,7 +1146,8 @@ void exportToArrow( 0, newArrowSchema("i", "run_ends"), arrowSchema); bridgeHolder->setChildAtIndex(1, std::move(valuesChild), arrowSchema); } else { - arrowSchema.format = exportArrowFormatStr(type, bridgeHolder->formatBuffer); + arrowSchema.format = + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); arrowSchema.dictionary = nullptr; if (type->kind() == TypeKind::MAP) { @@ -1205,12 +1242,14 @@ TypePtr importFromArrow(const ArrowSchema& arrowSchema) { namespace { VectorPtr importFromArrowImpl( + const ArrowOptions& options, ArrowSchema& arrowSchema, ArrowArray& arrowArray, memory::MemoryPool* pool, bool isViewer); RowVectorPtr createRowVector( + const ArrowOptions& options, memory::MemoryPool* pool, const RowTypePtr& rowType, BufferPtr nulls, @@ -1225,7 +1264,11 @@ RowVectorPtr createRowVector( for (size_t i = 0; i < arrowArray.n_children; ++i) { childrenVector.push_back(importFromArrowImpl( - *arrowSchema.children[i], *arrowArray.children[i], pool, isViewer)); + options, + *arrowSchema.children[i], + *arrowArray.children[i], + pool, + isViewer)); } return std::make_shared( pool, @@ -1250,6 +1293,7 @@ BufferPtr computeSizes( } ArrayVectorPtr createArrayVector( + const ArrowOptions& options, memory::MemoryPool* pool, const TypePtr& type, BufferPtr nulls, @@ -1265,7 +1309,11 @@ ArrayVectorPtr createArrayVector( auto sizes = computeSizes(offsets->as(), arrowArray.length, pool); auto elements = importFromArrowImpl( - *arrowSchema.children[0], *arrowArray.children[0], pool, isViewer); + options, + *arrowSchema.children[0], + *arrowArray.children[0], + pool, + isViewer); return std::make_shared( pool, type, @@ -1278,6 +1326,7 @@ ArrayVectorPtr createArrayVector( } MapVectorPtr createMapVector( + const ArrowOptions& options, memory::MemoryPool* pool, const TypePtr& type, BufferPtr nulls, @@ -1293,7 +1342,11 @@ MapVectorPtr createMapVector( computeSizes(offsets->as(), arrowArray.length, pool); // Arrow wraps keys and values into a struct. auto entries = importFromArrowImpl( - *arrowSchema.children[0], *arrowArray.children[0], pool, isViewer); + options, + *arrowSchema.children[0], + *arrowArray.children[0], + pool, + isViewer); VELOX_CHECK(entries->type()->isRow()); const auto& rows = *entries->asUnchecked(); VELOX_CHECK_EQ(rows.childrenSize(), 2); @@ -1310,6 +1363,7 @@ MapVectorPtr createMapVector( } VectorPtr createDictionaryVector( + const ArrowOptions& options, memory::MemoryPool* pool, const TypePtr& indexType, BufferPtr nulls, @@ -1328,7 +1382,7 @@ VectorPtr createDictionaryVector( arrowArray.buffers[1], arrowArray.length * sizeof(vector_size_t)); auto type = importFromArrow(*arrowSchema.dictionary); auto wrapped = importFromArrowImpl( - *arrowSchema.dictionary, *arrowArray.dictionary, pool, isViewer); + options, *arrowSchema.dictionary, *arrowArray.dictionary, pool, isViewer); return BaseVector::wrapInDictionary( std::move(nulls), std::move(indices), @@ -1337,6 +1391,7 @@ VectorPtr createDictionaryVector( } VectorPtr createTimestampVector( + const ArrowOptions& options, memory::MemoryPool* pool, const TypePtr& type, BufferPtr nulls, @@ -1348,10 +1403,38 @@ VectorPtr createTimestampVector( const auto* rawNulls = nulls->as(); if (length > nullCount) { - for (size_t i = 0; i < length; ++i) { - if (!bits::isBitNull(rawNulls, i)) { - rawTimestamps[i] = Timestamp::fromNanos(input[i]); - } + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp(input[i], 0); + } + } + break; + case TimestampUnit::kMilli: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMillis(input[i]); + } + } + break; + case TimestampUnit::kMicro: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMicros(input[i]); + } + } + break; + case TimestampUnit::kNano: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromNanos(input[i]); + } + } + break; + default: + VELOX_USER_FAIL(fmt::format( + "Unsupported timestamp unit: {}.", options.timestampUnit)); } } return std::make_shared>( @@ -1367,6 +1450,7 @@ VectorPtr createTimestampVector( } VectorPtr importFromArrowImpl( + const ArrowOptions& options, ArrowSchema& arrowSchema, ArrowArray& arrowArray, memory::MemoryPool* pool, @@ -1404,6 +1488,7 @@ VectorPtr importFromArrowImpl( if (arrowSchema.dictionary) { auto indexType = importFromArrowImpl(arrowSchema.format, arrowSchema); return createDictionaryVector( + options, pool, indexType, nulls, @@ -1430,6 +1515,7 @@ VectorPtr importFromArrowImpl( wrapInBufferView); } else if (type->isTimestamp()) { return createTimestampVector( + options, pool, type, nulls, @@ -1439,6 +1525,7 @@ VectorPtr importFromArrowImpl( } else if (type->isRow()) { // Row/structs. return createRowVector( + options, pool, std::dynamic_pointer_cast(type), nulls, @@ -1447,10 +1534,24 @@ VectorPtr importFromArrowImpl( isViewer); } else if (type->isArray()) { return createArrayVector( - pool, type, nulls, arrowSchema, arrowArray, isViewer, wrapInBufferView); + options, + pool, + type, + nulls, + arrowSchema, + arrowArray, + isViewer, + wrapInBufferView); } else if (type->isMap()) { return createMapVector( - pool, type, nulls, arrowSchema, arrowArray, isViewer, wrapInBufferView); + options, + pool, + type, + nulls, + arrowSchema, + arrowArray, + isViewer, + wrapInBufferView); } else if (type->isPrimitiveType()) { // Other primitive types. @@ -1478,13 +1579,19 @@ VectorPtr importFromArrowImpl( } VectorPtr importFromArrowImpl( + const ArrowOptions& options, ArrowSchema& arrowSchema, ArrowArray& arrowArray, memory::MemoryPool* pool, bool isViewer) { if (isViewer) { return importFromArrowImpl( - arrowSchema, arrowArray, pool, isViewer, wrapInBufferViewAsViewer); + options, + arrowSchema, + arrowArray, + pool, + isViewer, + wrapInBufferViewAsViewer); } // This Vector will take over the ownership of `arrowSchema` and `arrowArray` @@ -1511,6 +1618,7 @@ VectorPtr importFromArrowImpl( } }); VectorPtr imported = importFromArrowImpl( + options, arrowSchema, arrowArray, pool, @@ -1531,8 +1639,10 @@ VectorPtr importFromArrowImpl( VectorPtr importFromArrowAsViewer( const ArrowSchema& arrowSchema, const ArrowArray& arrowArray, + const ArrowOptions& options, memory::MemoryPool* pool) { return importFromArrowImpl( + options, const_cast(arrowSchema), const_cast(arrowArray), pool, @@ -1542,8 +1652,9 @@ VectorPtr importFromArrowAsViewer( VectorPtr importFromArrowAsOwner( ArrowSchema& arrowSchema, ArrowArray& arrowArray, + const ArrowOptions& options, memory::MemoryPool* pool) { - return importFromArrowImpl(arrowSchema, arrowArray, pool, false); + return importFromArrowImpl(options, arrowSchema, arrowArray, pool, false); } } // namespace facebook::velox diff --git a/velox/vector/arrow/Bridge.h b/velox/vector/arrow/Bridge.h index ba30068f8957..d50fae740cb2 100644 --- a/velox/vector/arrow/Bridge.h +++ b/velox/vector/arrow/Bridge.h @@ -25,13 +25,15 @@ struct ArrowArray; struct ArrowSchema; +enum class TimestampUnit { kSecond = 0, kMilli = 3, kMicro = 6, kNano = 9 }; + struct ArrowOptions { bool flattenDictionary{false}; bool flattenConstant{false}; + TimestampUnit timestampUnit = TimestampUnit::kNano; }; namespace facebook::velox { - /// Export a generic Velox Vector to an ArrowArray, as defined by Arrow's C data /// interface: /// @@ -63,7 +65,7 @@ void exportToArrow( const VectorPtr& vector, ArrowArray& arrowArray, memory::MemoryPool* pool, - const ArrowOptions& options = ArrowOptions{}); + const ArrowOptions& options); /// Export the type of a Velox vector to an ArrowSchema. /// @@ -84,10 +86,7 @@ void exportToArrow( /// /// NOTE: Since Arrow couples type and encoding, we need both Velox type and /// actual data (containing encoding) to create an ArrowSchema. -void exportToArrow( - const VectorPtr&, - ArrowSchema&, - const ArrowOptions& = ArrowOptions{}); +void exportToArrow(const VectorPtr&, ArrowSchema&, const ArrowOptions&); /// Import an ArrowSchema into a Velox Type object. /// @@ -135,6 +134,7 @@ TypePtr importFromArrow(const ArrowSchema& arrowSchema); VectorPtr importFromArrowAsViewer( const ArrowSchema& arrowSchema, const ArrowArray& arrowArray, + const ArrowOptions& options, memory::MemoryPool* pool); /// Import an ArrowArray and ArrowSchema into a Velox vector, acquiring @@ -152,6 +152,7 @@ VectorPtr importFromArrowAsViewer( VectorPtr importFromArrowAsOwner( ArrowSchema& arrowSchema, ArrowArray& arrowArray, + const ArrowOptions& options, memory::MemoryPool* pool); } // namespace facebook::velox diff --git a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp index 2cecd0740194..7a5573e93066 100644 --- a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp @@ -54,7 +54,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const TypePtr& type = CppToType::create()) { auto flatVector = vectorMaker_.flatVectorNullable(inputData, type); ArrowArray arrowArray; - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, arrowArray, pool_.get(), options_); validateArray(inputData, arrowArray); @@ -67,7 +67,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { void testArrayVector(const T& inputData) { auto arrayVector = vectorMaker_.arrayVectorNullable(inputData); ArrowArray arrowArray; - velox::exportToArrow(arrayVector, arrowArray, pool_.get()); + velox::exportToArrow(arrayVector, arrowArray, pool_.get(), options_); validateListArray(inputData, arrowArray); @@ -93,7 +93,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const VectorPtr& constantVector, const TInput& input) { ArrowArray arrowArray; - velox::exportToArrow(constantVector, arrowArray, pool_.get()); + velox::exportToArrow(constantVector, arrowArray, pool_.get(), options_); validateConstant( input, constantVector->size(), @@ -161,8 +161,33 @@ class ArrowBridgeArrayExportTest : public testing::Test { bits::isBitSet(reinterpret_cast(values), i)) << "mismatch at index " << i; } else if constexpr (std::is_same_v) { - EXPECT_EQ(inputData[i], Timestamp::fromNanos(values[i])) - << "mismatch at index " << i; + switch (options_.timestampUnit) { + case TimestampUnit::kSecond: + EXPECT_EQ( + Timestamp(inputData[i].value().getSeconds(), 0), + Timestamp(values[i], 0)) + << "mismatch at index " << i; + break; + case TimestampUnit::kMilli: + EXPECT_EQ( + Timestamp::fromMillis(inputData[i].value().toMillis()), + Timestamp::fromMillis(values[i])) + << "mismatch at index " << i; + break; + case TimestampUnit::kMicro: + EXPECT_EQ( + Timestamp::fromMicros(inputData[i].value().toMicros()), + Timestamp::fromMicros(values[i])) + << "mismatch at index " << i; + break; + case TimestampUnit::kNano: + EXPECT_EQ(inputData[i], Timestamp::fromNanos(values[i])) + << "mismatch at index " << i; + break; + default: + VELOX_USER_FAIL(fmt::format( + "Timestamp unit not supported: {}.", options_.timestampUnit)); + } } else { EXPECT_EQ(inputData[i], values[i]) << "mismatch at index " << i; } @@ -353,10 +378,12 @@ class ArrowBridgeArrayExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } // Boiler plate structures required by vectorMaker. + ArrowOptions options_; std::shared_ptr queryCtx_{std::make_shared()}; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; @@ -371,7 +398,7 @@ TEST_F(ArrowBridgeArrayExportTest, flatNotNull) { // Make sure that ArrowArray is correctly acquiring ownership, even after // the initial vector shared_ptr is gone. auto flatVector = vectorMaker_.flatVector(inputData); - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, arrowArray, pool_.get(), options_); } EXPECT_EQ(inputData.size(), arrowArray.length); @@ -494,16 +521,23 @@ TEST_F(ArrowBridgeArrayExportTest, flatDate) { } TEST_F(ArrowBridgeArrayExportTest, flatTimestamp) { - testFlatVector( - { - Timestamp(0, 0), - std::nullopt, - Timestamp(1699300965, 12'349), - Timestamp(-2208960000, 0), // 1900-01-01 - Timestamp(3155788800, 999'999'999), - std::nullopt, - }, - TIMESTAMP()); + for (uint8_t unit : + {(uint8_t)TimestampUnit::kSecond, + (uint8_t)TimestampUnit::kMilli, + (uint8_t)TimestampUnit::kMicro, + (uint8_t)TimestampUnit::kNano}) { + options_.timestampUnit = static_cast(unit); + testFlatVector( + { + Timestamp(0, 0), + std::nullopt, + Timestamp(1699300965, 12'349), + Timestamp(-2208960000, 0), // 1900-01-01 + Timestamp(3155788800, 999'999'999), + std::nullopt, + }, + TIMESTAMP()); + } // Out of range. If nanosecond precision is represented in Arrow, timestamps // starting around 2263-01-01 should overflow and throw a user exception. @@ -542,7 +576,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVector) { }); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, arrowArray, pool_.get(), options_); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(0, arrowArray.null_count); @@ -579,7 +613,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { vector->setNullCount(3); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, arrowArray, pool_.get(), options_); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(3, arrowArray.null_count); @@ -607,7 +641,8 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { ArrowArray arrowArray; - velox::exportToArrow(vectorMaker_.rowVector({}), arrowArray, pool_.get()); + velox::exportToArrow( + vectorMaker_.rowVector({}), arrowArray, pool_.get(), options_); EXPECT_EQ(0, arrowArray.n_children); EXPECT_EQ(1, arrowArray.n_buffers); EXPECT_EQ(nullptr, arrowArray.children); @@ -617,11 +652,12 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { std::shared_ptr toArrow( const VectorPtr& vec, + const ArrowOptions& options, memory::MemoryPool* pool) { ArrowSchema schema; ArrowArray array; - exportToArrow(vec, schema); - exportToArrow(vec, array, pool); + exportToArrow(vec, schema, options); + exportToArrow(vec, array, pool, options); EXPECT_OK_AND_ASSIGN(auto type, arrow::ImportType(&schema)); EXPECT_OK_AND_ASSIGN(auto ans, arrow::ImportArray(&array, type)); return ans; @@ -655,7 +691,7 @@ TEST_F(ArrowBridgeArrayExportTest, arraySimple) { TEST_F(ArrowBridgeArrayExportTest, arrayCrossValidate) { auto vec = vectorMaker_.arrayVector({{1, 2, 3}, {4, 5}}); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -682,10 +718,10 @@ TEST_F(ArrowBridgeArrayExportTest, arrayDictionary) { ArrowSchema schema; ArrowArray data; - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, vec->pool()); + velox::exportToArrow(vec, schema, options_); + velox::exportToArrow(vec, data, vec->pool(), options_); - auto result = importFromArrowAsViewer(schema, data, vec->pool()); + auto result = importFromArrowAsViewer(schema, data, options_, vec->pool()); test::assertEqualVectors(result, vec); schema.release(&schema); data.release(&data); @@ -699,7 +735,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayGap) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -724,7 +760,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayReorder) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -754,7 +790,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::list(arrow::list(arrow::int64()))); auto& listArray = static_cast(*array); @@ -771,7 +807,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapSimple) { auto allOnes = [](vector_size_t) { return 1; }; auto vec = vectorMaker_.mapVector(2, allOnes, allOnes, allOnes); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::map(arrow::int64(), arrow::int64())); @@ -806,7 +842,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapNested) { std::make_shared( pool_.get(), type, nullptr, 2, offsets, sizes, keys, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ( @@ -838,7 +874,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionarySimple) { allocateIndices(3, pool_.get()), 3, vectorMaker_.flatVector({1, 2, 3})); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::dictionary(arrow::int32(), arrow::int64())); auto& dict = static_cast(*array); @@ -864,7 +900,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionaryNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ( *array->type(), @@ -914,7 +950,7 @@ TEST_F(ArrowBridgeArrayExportTest, constantComplex) { TEST_F(ArrowBridgeArrayExportTest, constantCrossValidate) { auto vector = BaseVector::createConstant(VARCHAR(), "hello", 100, pool_.get()); - auto array = toArrow(vector, pool_.get()); + auto array = toArrow(vector, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); @@ -983,7 +1019,23 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { if constexpr (std::is_same_v) { bits::setBit(rawValues, i, *inputValues[i]); } else if constexpr (std::is_same_v) { - rawValues[i] = inputValues[i]->toNanos(); + switch (options_.timestampUnit) { + case TimestampUnit::kSecond: + rawValues[i] = inputValues[i]->getSeconds(); + break; + case TimestampUnit::kMilli: + rawValues[i] = inputValues[i]->toMillis(); + break; + case TimestampUnit::kMicro: + rawValues[i] = inputValues[i]->toMicros(); + break; + case TimestampUnit::kNano: + rawValues[i] = inputValues[i]->toNanos(); + break; + default: + VELOX_USER_FAIL(fmt::format( + "Timestamp unit not supported: {}.", options_.timestampUnit)); + } } else { rawValues[i] = *inputValues[i]; } @@ -1096,9 +1148,46 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { // Assert new vector contents. for (vector_size_t i = 0; i < convertedVector->size(); ++i) { - ASSERT_TRUE(expected->equalValueAt(convertedVector.get(), i, i)) - << "at " << i << ": " << expected->toString(i) << " vs. " - << convertedVector->toString(i); + if constexpr (std::is_same_v) { + if (expected->isNullAt(i)) { + ASSERT_TRUE(convertedVector->isNullAt(i)) + << "at " << i << ": " << expected->toString(i) << " vs. " + << convertedVector->toString(i); + } else { + auto convertedFlat = convertedVector->asFlatVector(); + switch (options_.timestampUnit) { + case TimestampUnit::kSecond: + EXPECT_EQ( + Timestamp(expected->valueAt(i).getSeconds(), 0), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + case TimestampUnit::kMilli: + EXPECT_EQ( + Timestamp::fromMillis(expected->valueAt(i).toMillis()), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + case TimestampUnit::kMicro: + EXPECT_EQ( + Timestamp::fromMicros(expected->valueAt(i).toMicros()), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + case TimestampUnit::kNano: + EXPECT_EQ(expected->valueAt(i), convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + default: + VELOX_USER_FAIL(fmt::format( + "Timestamp unit not supported: {}.", options_.timestampUnit)); + } + } + } else { + ASSERT_TRUE(expected->equalValueAt(convertedVector.get(), i, i)) + << "at " << i << ": " << expected->toString(i) << " vs. " + << convertedVector->toString(i); + } } } @@ -1131,8 +1220,15 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { testArrowImport("g", {-99.9, 4.3, 31.1, 129.11, -12}); testArrowImport("f", {-99.9, 4.3, 31.1, 129.11, -12}); - testArrowImport( - "ttn", {Timestamp(0, 0), std::nullopt, Timestamp(1699308257, 1234)}); + for (uint8_t unit : + {(uint8_t)TimestampUnit::kSecond, + (uint8_t)TimestampUnit::kMilli, + (uint8_t)TimestampUnit::kMicro, + (uint8_t)TimestampUnit::kNano}) { + options_.timestampUnit = static_cast(unit); + testArrowImport( + "ts", {Timestamp(0, 0), std::nullopt, Timestamp(1699308257, 1234)}); + } } void testImportWithoutNullsBuffer() { @@ -1309,8 +1405,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { EXPECT_FALSE(schema.release); EXPECT_FALSE(data.release); } - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, pool_.get()); + velox::exportToArrow(vec, schema, options_); + velox::exportToArrow(vec, data, pool_.get(), options_); ASSERT_OK_AND_ASSIGN(auto arrowType, arrow::ImportType(&schema)); ASSERT_OK_AND_ASSIGN(auto array2, arrow::ImportArray(&data, arrowType)); ASSERT_OK(array2->ValidateFull()); @@ -1433,6 +1529,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; + + ArrowOptions options_; }; class ArrowBridgeArrayImportAsViewerTest : public ArrowBridgeArrayImportTest { @@ -1445,7 +1543,7 @@ class ArrowBridgeArrayImportAsViewerTest : public ArrowBridgeArrayImportTest { ArrowArray& arrowArray, memory::MemoryPool* pool) override { return facebook::velox::importFromArrowAsViewer( - arrowSchema, arrowArray, pool); + arrowSchema, arrowArray, options_, pool); } }; @@ -1492,7 +1590,7 @@ class ArrowBridgeArrayImportAsOwnerTest ArrowArray& arrowArray, memory::MemoryPool* pool) override { return facebook::velox::importFromArrowAsOwner( - arrowSchema, arrowArray, pool); + arrowSchema, arrowArray, options_, pool); } }; @@ -1535,7 +1633,8 @@ TEST_F(ArrowBridgeArrayImportAsOwnerTest, inputsMarkedReleased) { ArrowSchema arrowSchema = makeArrowSchema("i"); ArrowArray arrowArray = makeArrowArray(buffers, 2, 4, 0); - auto _ = importFromArrowAsOwner(arrowSchema, arrowArray, pool_.get()); + auto _ = + importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool_.get()); EXPECT_EQ(arrowSchema.release, nullptr); EXPECT_EQ(arrowArray.release, nullptr); @@ -1568,7 +1667,10 @@ TEST_F(ArrowBridgeArrayImportAsOwnerTest, releaseCalled) { // Create a Velox Vector from Arrow and then destruct it to trigger the // release callback calling - { auto _ = importFromArrowAsOwner(arrowSchema, arrowArray, pool_.get()); } + { + auto _ = + importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool_.get()); + } EXPECT_TRUE(TestReleaseCalled::schemaReleaseCalled); EXPECT_TRUE(TestReleaseCalled::arrayReleaseCalled); diff --git a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp index 8def65ce8e8e..88bf8d502db0 100644 --- a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp @@ -120,7 +120,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { 3, // index to use for the constant BaseVector::create(type, 100, pool_.get())); - velox::exportToArrow(constantVector, arrowSchema); + velox::exportToArrow(constantVector, arrowSchema, options_); EXPECT_STREQ("+r", arrowSchema.format); EXPECT_EQ(nullptr, arrowSchema.name); @@ -155,7 +155,8 @@ class ArrowBridgeSchemaExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } ArrowSchema makeArrowSchema(const char* format) { @@ -174,6 +175,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; + ArrowOptions options_; }; TEST_F(ArrowBridgeSchemaExportTest, scalar) { @@ -190,7 +192,15 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { testScalarType(VARCHAR(), "u"); testScalarType(VARBINARY(), "z"); - testScalarType(TIMESTAMP(), "ttn"); + options_.timestampUnit = TimestampUnit::kSecond; + testScalarType(TIMESTAMP(), "tss:"); + options_.timestampUnit = TimestampUnit::kMilli; + testScalarType(TIMESTAMP(), "tsm:"); + options_.timestampUnit = TimestampUnit::kMicro; + testScalarType(TIMESTAMP(), "tsu:"); + options_.timestampUnit = TimestampUnit::kNano; + testScalarType(TIMESTAMP(), "tsn:"); + testScalarType(DATE(), "tdD"); testScalarType(DECIMAL(10, 4), "d:10,4"); @@ -354,7 +364,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { EXPECT_EQ(*VARBINARY(), *testSchemaImport("Z")); // Temporal. - EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("ttn")); + EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("tsu:")); EXPECT_EQ(*DATE(), *testSchemaImport("tdD")); EXPECT_EQ(*DECIMAL(10, 4), *testSchemaImport("d:10,4")); @@ -367,7 +377,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { TEST_F(ArrowBridgeSchemaImportTest, complexTypes) { // Array. EXPECT_EQ(*ARRAY(BIGINT()), *testSchemaImportComplex("+l", {"l"})); - EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"ttn"})); + EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"tsn:"})); EXPECT_EQ(*ARRAY(DATE()), *testSchemaImportComplex("+l", {"tdD"})); EXPECT_EQ(*ARRAY(VARCHAR()), *testSchemaImportComplex("+l", {"U"})); @@ -433,11 +443,13 @@ class ArrowBridgeSchemaTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; + ArrowOptions options_; }; TEST_F(ArrowBridgeSchemaTest, roundtrip) { @@ -512,7 +524,7 @@ TEST_F(ArrowBridgeSchemaImportTest, dictionaryTypeTest) { *testSchemaDictionaryImport( "i", makeComplexArrowSchema( - schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"ttn"}))); + schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"ts"}))); EXPECT_EQ( *ARRAY(DATE()), *testSchemaDictionaryImport(