From cc7143ef4dcf6b41877bcea17a61b03c178da315 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Mon, 28 Oct 2024 13:45:18 -0700 Subject: [PATCH] Add Row ID column reading support (#11363) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11363 Create framework in selective readers for generating row number field and composite with other constant fields. Then use this framework to add support for Row ID column. We move the row number generation logic into column reader during this process, so that we could filter them if needed in the future. Differential Revision: D65028172 --- velox/common/base/RawVector.cpp | 3 +- velox/connectors/hive/HiveConnectorSplit.h | 8 + velox/connectors/hive/HiveConnectorUtil.cpp | 37 +- velox/connectors/hive/HiveConnectorUtil.h | 7 +- velox/connectors/hive/HiveDataSource.cpp | 61 ++- velox/connectors/hive/HiveDataSource.h | 5 +- velox/connectors/hive/SplitReader.cpp | 26 +- velox/connectors/hive/SplitReader.h | 11 +- velox/connectors/hive/TableHandle.h | 3 +- .../hive/iceberg/IcebergSplitReader.cpp | 5 +- .../hive/iceberg/IcebergSplitReader.h | 3 +- .../tests/IcebergSplitReaderBenchmark.cpp | 2 +- .../hive/tests/HiveConnectorTest.cpp | 46 +-- velox/dwio/common/ColumnSelector.cpp | 2 +- velox/dwio/common/Options.h | 8 +- velox/dwio/common/Reader.cpp | 90 ++--- velox/dwio/common/ScanSpec.h | 28 +- velox/dwio/common/SelectiveColumnReader.h | 6 +- .../common/SelectiveStructColumnReader.cpp | 379 +++++++++++------- .../dwio/common/SelectiveStructColumnReader.h | 27 +- velox/dwio/common/tests/OptionsTests.cpp | 2 - velox/dwio/dwrf/reader/DwrfReader.cpp | 15 +- .../reader/SelectiveStructColumnReader.cpp | 6 +- velox/dwio/dwrf/test/ReaderTest.cpp | 45 ++- velox/dwio/parquet/reader/ParquetReader.cpp | 3 +- .../parquet/reader/StructColumnReader.cpp | 2 +- velox/exec/tests/TableScanTest.cpp | 108 +++++ 27 files changed, 570 insertions(+), 368 deletions(-) diff --git a/velox/common/base/RawVector.cpp b/velox/common/base/RawVector.cpp index 6dc1c048d57f6..91ae094b8249e 100644 --- a/velox/common/base/RawVector.cpp +++ b/velox/common/base/RawVector.cpp @@ -21,10 +21,11 @@ namespace facebook::velox { namespace { -std::vector iotaData; +raw_vector iotaData; bool initializeIota() { iotaData.resize(10000); + iotaData.resize(iotaData.capacity()); std::iota(iotaData.begin(), iotaData.end(), 0); return true; } diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index c86c9ef9c2da6..9af4d7ef357a7 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -36,6 +36,12 @@ struct HiveBucketConversion { std::vector> bucketColumnHandles; }; +struct RowIdProperties { + int64_t metadataVersion; + int64_t partitionId; + std::string tableGuid; +}; + struct HiveConnectorSplit : public connector::ConnectorSplit { const std::string filePath; dwio::common::FileFormat fileFormat; @@ -62,6 +68,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { /// the file handle. std::optional properties; + std::optional rowIdProperties; + HiveConnectorSplit( const std::string& connectorId, const std::string& _filePath, diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index ec087c8657b01..42677e87382c5 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -260,10 +260,10 @@ inline bool isSynthesizedColumn( return infoColumns.count(name) != 0; } -inline bool isRowIndexColumn( +bool isSpecialColumn( const std::string& name, - std::shared_ptr rowIndexColumn) { - return rowIndexColumn != nullptr && rowIndexColumn->name() == name; + const std::optional& specialName) { + return specialName.has_value() && name == *specialName; } } // namespace @@ -368,7 +368,7 @@ std::shared_ptr makeScanSpec( partitionKeys, const std::unordered_map>& infoColumns, - const std::shared_ptr& rowIndexColumn, + const SpecialColumnNames& specialColumns, memory::MemoryPool* pool) { auto spec = std::make_shared("root"); folly::F14FastMap> @@ -377,8 +377,9 @@ std::shared_ptr makeScanSpec( for (auto& [subfield, _] : filters) { if (auto name = subfield.toString(); !isSynthesizedColumn(name, infoColumns) && - !isRowIndexColumn(name, rowIndexColumn) && partitionKeys.count(name) == 0) { + VELOX_CHECK(!isSpecialColumn(name, specialColumns.rowIndex)); + VELOX_CHECK(!isSpecialColumn(name, specialColumns.rowId)); filterSubfields[getColumnName(subfield)].push_back(&subfield); } } @@ -387,13 +388,24 @@ std::shared_ptr makeScanSpec( for (int i = 0; i < rowType->size(); ++i) { auto& name = rowType->nameOf(i); auto& type = rowType->childAt(i); + if (isSpecialColumn(name, specialColumns.rowIndex)) { + VELOX_CHECK(type->isBigint()); + auto* fieldSpec = spec->addField(name, i); + fieldSpec->setColumnType(common::ScanSpec::ColumnType::kRowIndex); + continue; + } + if (isSpecialColumn(name, specialColumns.rowId)) { + VELOX_CHECK(type->isRow() && type->size() == 5); + auto& rowType = type->asRow(); + auto* fieldSpec = spec->addFieldRecursively(name, rowType, i); + fieldSpec->setColumnType(common::ScanSpec::ColumnType::kComposite); + fieldSpec->childByName(rowType.nameOf(0)) + ->setColumnType(common::ScanSpec::ColumnType::kRowIndex); + continue; + } auto it = outputSubfields.find(name); if (it == outputSubfields.end()) { auto* fieldSpec = spec->addFieldRecursively(name, *type, i); - if (isRowIndexColumn(name, rowIndexColumn)) { - VELOX_CHECK(type->isBigint()); - fieldSpec->setExplicitRowNumber(true); - } processFieldSpec(dataColumns, type, *fieldSpec); filterSubfields.erase(name); continue; @@ -409,12 +421,6 @@ std::shared_ptr makeScanSpec( filterSubfields.erase(it); } auto* fieldSpec = spec->addField(name, i); - if (isRowIndexColumn(name, rowIndexColumn)) { - VELOX_CHECK(type->isBigint()); - // Set the flag for the case that the row index column only exists in - // remaining filters. - fieldSpec->setExplicitRowNumber(true); - } addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec); processFieldSpec(dataColumns, type, *fieldSpec); subfieldSpecs.clear(); @@ -448,7 +454,6 @@ std::shared_ptr makeScanSpec( if (isSynthesizedColumn(name, infoColumns)) { continue; } - VELOX_CHECK(!isRowIndexColumn(name, rowIndexColumn)); auto fieldSpec = spec->getOrCreateChild(pair.first); fieldSpec->addFilter(*pair.second); } diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 84a2dc67782da..e6480a78165de 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -44,6 +44,11 @@ void checkColumnNameLowerCase( void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr); +struct SpecialColumnNames { + std::optional rowIndex; + std::optional rowId; +}; + std::shared_ptr makeScanSpec( const RowTypePtr& rowType, const folly::F14FastMap>& @@ -54,7 +59,7 @@ std::shared_ptr makeScanSpec( partitionKeys, const std::unordered_map>& infoColumns, - const std::shared_ptr& rowIndexColumn, + const SpecialColumnNames& specialColumns, memory::MemoryPool* pool); void configureReaderOptions( diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 546e6cd9a4cb5..bd930bbffd4f8 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -22,7 +22,6 @@ #include "velox/common/testutil/TestValue.h" #include "velox/connectors/hive/HiveConfig.h" -#include "velox/connectors/hive/HiveConnectorUtil.h" #include "velox/dwio/common/ReaderFactory.h" #include "velox/expression/FieldReference.h" @@ -81,18 +80,21 @@ HiveDataSource::HiveDataSource( handle, "ColumnHandle must be an instance of HiveColumnHandle for {}", canonicalizedName); - - if (handle->columnType() == HiveColumnHandle::ColumnType::kPartitionKey) { - partitionKeys_.emplace(handle->name(), handle); - } - - if (handle->columnType() == HiveColumnHandle::ColumnType::kSynthesized) { - infoColumns_.emplace(handle->name(), handle); - } - - if (handle->columnType() == HiveColumnHandle::ColumnType::kRowIndex) { - VELOX_CHECK_NULL(rowIndexColumn_); - rowIndexColumn_ = handle; + switch (handle->columnType()) { + case HiveColumnHandle::ColumnType::kRegular: + break; + case HiveColumnHandle::ColumnType::kPartitionKey: + partitionKeys_.emplace(handle->name(), handle); + break; + case HiveColumnHandle::ColumnType::kSynthesized: + infoColumns_.emplace(handle->name(), handle); + break; + case HiveColumnHandle::ColumnType::kRowIndex: + specialColumns_.rowIndex = handle->name(); + break; + case HiveColumnHandle::ColumnType::kRowId: + specialColumns_.rowId = handle->name(); + break; } } @@ -192,7 +194,7 @@ HiveDataSource::HiveDataSource( hiveTableHandle_->dataColumns(), partitionKeys_, infoColumns_, - rowIndexColumn_, + specialColumns_, pool_); if (remainingFilter) { metadataFilter_ = std::make_shared( @@ -257,7 +259,7 @@ std::unique_ptr HiveDataSource::setupBucketConversion() { hiveTableHandle_->dataColumns(), partitionKeys_, infoColumns_, - rowIndexColumn_, + specialColumns_, pool_); newScanSpec->moveAdaptationFrom(*scanSpec_); scanSpec_ = std::move(newScanSpec); @@ -266,6 +268,30 @@ std::unique_ptr HiveDataSource::setupBucketConversion() { split_->bucketConversion->tableBucketCount, std::move(bucketChannels)); } +void HiveDataSource::setupRowIdColumn() { + VELOX_CHECK(split_->rowIdProperties.has_value()); + const auto& props = *split_->rowIdProperties; + auto* rowId = scanSpec_->childByName(*specialColumns_.rowId); + VELOX_CHECK_NOT_NULL(rowId); + auto& rowIdType = + readerOutputType_->findChild(*specialColumns_.rowId)->asRow(); + auto rowGroupId = split_->getFileName(); + rowId->childByName(rowIdType.nameOf(1)) + ->setConstantValue( + StringView(rowGroupId), VARCHAR(), connectorQueryCtx_->memoryPool()); + rowId->childByName(rowIdType.nameOf(2)) + ->setConstantValue( + props.metadataVersion, BIGINT(), connectorQueryCtx_->memoryPool()); + rowId->childByName(rowIdType.nameOf(3)) + ->setConstantValue( + props.partitionId, BIGINT(), connectorQueryCtx_->memoryPool()); + rowId->childByName(rowIdType.nameOf(4)) + ->setConstantValue( + StringView(props.tableGuid), + VARCHAR(), + connectorQueryCtx_->memoryPool()); +} + void HiveDataSource::addSplit(std::shared_ptr split) { VELOX_CHECK_NULL( split_, @@ -284,12 +310,15 @@ void HiveDataSource::addSplit(std::shared_ptr split) { } else { partitionFunction_.reset(); } + if (specialColumns_.rowId.has_value()) { + setupRowIdColumn(); + } splitReader_ = createSplitReader(); // Split reader subclasses may need to use the reader options in prepareSplit // so we initialize it beforehand. splitReader_->configureReaderOptions(randomSkip_); - splitReader_->prepareSplit(metadataFilter_, runtimeStats_, rowIndexColumn_); + splitReader_->prepareSplit(metadataFilter_, runtimeStats_); } vector_size_t HiveDataSource::applyBucketConversion( diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index b7ca9bcb63444..a870966603cab 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -20,6 +20,7 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/FileHandle.h" #include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/HiveConnectorUtil.h" #include "velox/connectors/hive/HivePartitionFunction.h" #include "velox/connectors/hive/SplitReader.h" #include "velox/connectors/hive/TableHandle.h" @@ -125,7 +126,6 @@ class HiveDataSource : public DataSource { partitionKeys_; std::shared_ptr ioStats_; - std::shared_ptr rowIndexColumn_; private: std::unique_ptr setupBucketConversion(); @@ -133,6 +133,8 @@ class HiveDataSource : public DataSource { const RowVectorPtr& rowVector, BufferPtr& indices); + void setupRowIdColumn(); + // Evaluates remainingFilter_ on the specified vector. Returns number of rows // passed. Populates filterEvalCtx_.selectedIndices and selectedBits if only // some rows passed the filter. If none or all rows passed @@ -157,6 +159,7 @@ class HiveDataSource : public DataSource { // Column handles for the Split info columns keyed on their column names. std::unordered_map> infoColumns_; + SpecialColumnNames specialColumns_{}; folly::F14FastMap> subfields_; SubfieldFilters filters_; diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 31b5964678178..bfa9e52828c40 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -145,9 +145,8 @@ void SplitReader::configureReaderOptions( void SplitReader::prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats, - const std::shared_ptr& rowIndexColumn) { - createReader(std::move(metadataFilter), rowIndexColumn); + dwio::common::RuntimeStatistics& runtimeStats) { + createReader(std::move(metadataFilter)); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); @@ -222,8 +221,7 @@ std::string SplitReader::toString() const { } void SplitReader::createReader( - std::shared_ptr metadataFilter, - const std::shared_ptr& rowIndexColumn) { + std::shared_ptr metadataFilter) { VELOX_CHECK_NE( baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN); @@ -264,10 +262,6 @@ void SplitReader::createReader( auto& fileType = baseReader_->rowType(); auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema()); auto columnNames = fileType->names(); - if (rowIndexColumn != nullptr) { - bool isExplicit = scanSpec_->childByName(rowIndexColumn->name()) != nullptr; - setRowIndexColumn(rowIndexColumn, isExplicit); - } configureRowReaderOptions( hiveTableHandle_->tableParameters(), scanSpec_, @@ -312,17 +306,6 @@ void SplitReader::createRowReader() { baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_); } -void SplitReader::setRowIndexColumn( - const std::shared_ptr& rowIndexColumn, - bool isExplicit) { - dwio::common::RowNumberColumnInfo rowNumberColumnInfo; - rowNumberColumnInfo.insertPosition = - readerOutputType_->getChildIdx(rowIndexColumn->name()); - rowNumberColumnInfo.name = rowIndexColumn->name(); - rowNumberColumnInfo.isExplicit = isExplicit; - baseRowReaderOpts_.setRowNumberColumnInfo(std::move(rowNumberColumnInfo)); -} - std::vector SplitReader::adaptColumns( const RowTypePtr& fileType, const std::shared_ptr& tableSchema) { @@ -350,7 +333,8 @@ std::vector SplitReader::adaptColumns( connectorQueryCtx_->memoryPool(), connectorQueryCtx_->sessionTimezone()); childSpec->setConstantValue(constant); - } else if (!childSpec->isExplicitRowNumber()) { + } else if ( + childSpec->columnType() == common::ScanSpec::ColumnType::kRegular) { auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName); if (!fileTypeIdx.has_value()) { // Column is missing. Most likely due to schema evolution. diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index 80231d4d563d8..b8ab6e10fd040 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -77,8 +77,7 @@ class SplitReader { /// would be called only once per incoming split virtual void prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats, - const std::shared_ptr& rowIndexColumn); + dwio::common::RuntimeStatistics& runtimeStats); virtual uint64_t next(uint64_t size, VectorPtr& output); @@ -114,9 +113,7 @@ class SplitReader { /// Create the dwio::common::Reader object baseReader_, which will be used to /// read the data file's metadata and schema - void createReader( - std::shared_ptr metadataFilter, - const std::shared_ptr& rowIndexColumn); + void createReader(std::shared_ptr metadataFilter); /// Check if the hiveSplit_ is empty. The split is considered empty when /// 1) The data file is missing but the user chooses to ignore it @@ -136,10 +133,6 @@ class SplitReader { const RowTypePtr& fileType, const std::shared_ptr& tableSchema); - void setRowIndexColumn( - const std::shared_ptr& rowIndexColumn, - bool isExplicit); - void setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, diff --git a/velox/connectors/hive/TableHandle.h b/velox/connectors/hive/TableHandle.h index cfc9295bd4055..9a02baa18d6fe 100644 --- a/velox/connectors/hive/TableHandle.h +++ b/velox/connectors/hive/TableHandle.h @@ -34,7 +34,8 @@ class HiveColumnHandle : public ColumnHandle { kSynthesized, /// A zero-based row number of type BIGINT auto-generated by the connector. /// Rows numbers are unique within a single file only. - kRowIndex + kRowIndex, + kRowId, }; /// NOTE: 'dataType' is the column type in target write table. 'hiveType' is diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index b7c1f6b523408..1923837d112cd 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -54,9 +54,8 @@ IcebergSplitReader::IcebergSplitReader( void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats, - const std::shared_ptr& rowIndexColumn) { - createReader(std::move(metadataFilter), rowIndexColumn); + dwio::common::RuntimeStatistics& runtimeStats) { + createReader(std::move(metadataFilter)); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index b5ab7da64480a..5f1196038f96d 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -43,8 +43,7 @@ class IcebergSplitReader : public SplitReader { void prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats, - const std::shared_ptr& rowIndexColumn) override; + dwio::common::RuntimeStatistics& runtimeStats) override; uint64_t next(uint64_t size, VectorPtr& output) override; diff --git a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp index e0b2a6c31f853..e623a2ec9d413 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp @@ -346,7 +346,7 @@ void IcebergSplitReaderBenchmark::readSingleColumn( std::shared_ptr randomSkip; icebergSplitReader->configureReaderOptions(randomSkip); - icebergSplitReader->prepareSplit(nullptr, runtimeStats_, nullptr); + icebergSplitReader->prepareSplit(nullptr, runtimeStats_); // Filter range is generated from a small sample data of 4096 rows. So the // upperBound and lowerBound are introduced to estimate the result size. diff --git a/velox/connectors/hive/tests/HiveConnectorTest.cpp b/velox/connectors/hive/tests/HiveConnectorTest.cpp index 8a64513b85ec8..126215b9d312d 100644 --- a/velox/connectors/hive/tests/HiveConnectorTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorTest.cpp @@ -92,14 +92,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) { auto rowType = ROW({{"c0", columnType}}); auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"}); auto scanSpec = makeScanSpec( - rowType, - groupSubfields(subfields), - {}, - nullptr, - {}, - {}, - nullptr, - pool_.get()); + rowType, groupSubfields(subfields), {}, nullptr, {}, {}, {}, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); validateNullConstant(*c0c0, *BIGINT()); auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1"); @@ -135,7 +128,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant()); @@ -159,7 +152,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->maxArrayElementsCount(), 2); @@ -178,7 +171,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) { auto groupedSubfields = groupSubfields(subfields); VELOX_ASSERT_USER_THROW( makeScanSpec( - rowType, groupedSubfields, {}, nullptr, {}, {}, nullptr, pool_.get()), + rowType, groupedSubfields, {}, nullptr, {}, {}, {}, pool_.get()), "Non-positive array subscript cannot be push down"); } @@ -194,7 +187,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ( @@ -226,7 +219,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_TRUE(c0->flatMapFeatureSelection().empty()); @@ -247,7 +240,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_TRUE(mapKeyIsNotNull(*c0)); @@ -271,7 +264,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -300,7 +293,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -320,7 +313,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -337,7 +330,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -378,7 +371,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_onlyInFilters) { ROW({{"c0", c0Type}, {"c1", c1Type}}), {}, {}, - nullptr, + {}, pool_.get()); auto c0 = scanSpec->childByName("c0"); @@ -459,7 +452,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); @@ -473,14 +466,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) { SubfieldFilters filters; filters.emplace(Subfield("ds"), exec::equal("2023-10-13")); auto scanSpec = makeScanSpec( - rowType, - {}, - filters, - rowType, - {{"ds", nullptr}}, - {}, - nullptr, - pool_.get()); + rowType, {}, filters, rowType, {{"ds", nullptr}}, {}, {}, pool_.get()); ASSERT_TRUE(scanSpec->childByName("c0")->projectOut()); ASSERT_FALSE(scanSpec->childByName("ds")->projectOut()); } @@ -498,7 +484,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_prunedMapNonNullMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); @@ -513,7 +499,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_prunedMapNonNullMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); diff --git a/velox/dwio/common/ColumnSelector.cpp b/velox/dwio/common/ColumnSelector.cpp index 228da970af4ca..deddf43c644b7 100644 --- a/velox/dwio/common/ColumnSelector.cpp +++ b/velox/dwio/common/ColumnSelector.cpp @@ -344,7 +344,7 @@ std::shared_ptr ColumnSelector::fromScanSpec( const RowTypePtr& rowType) { std::vector columnNames; for (auto& child : spec.children()) { - if (child->isConstant() || child->isExplicitRowNumber()) { + if (!child->readFromFile()) { continue; } std::string name = child->fieldName(); diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 8ba90fa9ac2a0..569d30ffd77fc 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -109,14 +109,12 @@ struct TableParameter { "serialization.null.format"; }; +/// Implicit row number column to be added. This column will be removed in the +/// output of split reader. Should use the ScanSpec::ColumnType::kRowIndex if +/// the column is suppose to be explicit and kept in the output. struct RowNumberColumnInfo { column_index_t insertPosition; std::string name; - // This flag is used to distinguish the explicit and implicit use cases. In - // explicit case, row index column is declared in the output type or used in - // subfield filters or remaining filter. In implicit case, it's not declared - // in the output columns but only in the split reader. - bool isExplicit; }; class FormatSpecificOptions { diff --git a/velox/dwio/common/Reader.cpp b/velox/dwio/common/Reader.cpp index 1beec1b02491c..3fd7241ecb9e4 100644 --- a/velox/dwio/common/Reader.cpp +++ b/velox/dwio/common/Reader.cpp @@ -151,7 +151,6 @@ VectorPtr RowReader::projectColumns( namespace { void fillRowNumberVector( VectorPtr& rowNumVector, - bool contiguousRowNumbers, uint64_t previousRow, uint64_t rowsToRead, const dwio::common::SelectiveColumnReader* columnReader, @@ -174,15 +173,10 @@ void fillRowNumberVector( flatRowNum = rowNumVector->asUnchecked>(); } auto* rawRowNum = flatRowNum->mutableRawValues(); - if (contiguousRowNumbers) { - VELOX_DCHECK_EQ(rowsToRead, result->size()); - std::iota(rawRowNum, rawRowNum + rowsToRead, previousRow); - } else { - const auto rowOffsets = columnReader->outputRows(); - VELOX_DCHECK_EQ(rowOffsets.size(), result->size()); - for (int i = 0; i < rowOffsets.size(); ++i) { - rawRowNum[i] = previousRow + rowOffsets[i]; - } + const auto rowOffsets = columnReader->outputRows(); + VELOX_DCHECK_EQ(rowOffsets.size(), result->size()); + for (int i = 0; i < rowOffsets.size(); ++i) { + rawRowNum[i] = previousRow + rowOffsets[i]; } } } // namespace @@ -199,64 +193,25 @@ void RowReader::readWithRowNumber( const auto rowNumberColumnIndex = rowNumberColumnInfo->insertPosition; const auto& rowNumberColumnName = rowNumberColumnInfo->name; column_index_t numChildren{0}; - column_index_t numNotReadFromFileChildren{0}; for (auto& column : options.scanSpec()->children()) { if (column->projectOut()) { ++numChildren; - if (column->isConstant() || column->isExplicitRowNumber()) { - ++numNotReadFromFileChildren; - } } } VELOX_CHECK_LE(rowNumberColumnIndex, numChildren); - const bool contiguousRowNumbers = - (numChildren == numNotReadFromFileChildren) && !hasDeletion(mutation); - if (rowNumberColumnInfo->isExplicit) { - columnReader->next(rowsToRead, result, mutation); - fillRowNumberVector( - result->asUnchecked()->childAt(rowNumberColumnIndex), - contiguousRowNumbers, - previousRow, - rowsToRead, - columnReader.get(), - result); - } else { - auto* rowVector = result->asUnchecked(); - VectorPtr rowNumVector; - if (rowVector->childrenSize() != numChildren) { - VELOX_CHECK_EQ(rowVector->childrenSize(), numChildren + 1); - rowNumVector = rowVector->childAt(rowNumberColumnIndex); - const auto& rowType = rowVector->type()->asRow(); - auto names = rowType.names(); - auto types = rowType.children(); - auto children = rowVector->children(); - VELOX_DCHECK(!names.empty() && !types.empty() && !children.empty()); - names.erase(names.begin() + rowNumberColumnIndex); - types.erase(types.begin() + rowNumberColumnIndex); - children.erase(children.begin() + rowNumberColumnIndex); - result = std::make_shared( - rowVector->pool(), - ROW(std::move(names), std::move(types)), - rowVector->nulls(), - rowVector->size(), - std::move(children)); - } - columnReader->next(rowsToRead, result, mutation); - fillRowNumberVector( - rowNumVector, - contiguousRowNumbers, - previousRow, - rowsToRead, - columnReader.get(), - result); - rowVector = result->asUnchecked(); - auto& rowType = rowVector->type()->asRow(); + auto* rowVector = result->asUnchecked(); + VectorPtr rowNumVector; + if (rowVector->childrenSize() != numChildren) { + VELOX_CHECK_EQ(rowVector->childrenSize(), numChildren + 1); + rowNumVector = rowVector->childAt(rowNumberColumnIndex); + const auto& rowType = rowVector->type()->asRow(); auto names = rowType.names(); auto types = rowType.children(); auto children = rowVector->children(); - names.insert(names.begin() + rowNumberColumnIndex, rowNumberColumnName); - types.insert(types.begin() + rowNumberColumnIndex, BIGINT()); - children.insert(children.begin() + rowNumberColumnIndex, rowNumVector); + VELOX_DCHECK(!names.empty() && !types.empty() && !children.empty()); + names.erase(names.begin() + rowNumberColumnIndex); + types.erase(types.begin() + rowNumberColumnIndex); + children.erase(children.begin() + rowNumberColumnIndex); result = std::make_shared( rowVector->pool(), ROW(std::move(names), std::move(types)), @@ -264,6 +219,23 @@ void RowReader::readWithRowNumber( rowVector->size(), std::move(children)); } + columnReader->next(rowsToRead, result, mutation); + fillRowNumberVector( + rowNumVector, previousRow, rowsToRead, columnReader.get(), result); + rowVector = result->asUnchecked(); + auto& rowType = rowVector->type()->asRow(); + auto names = rowType.names(); + auto types = rowType.children(); + auto children = rowVector->children(); + names.insert(names.begin() + rowNumberColumnIndex, rowNumberColumnName); + types.insert(types.begin() + rowNumberColumnIndex, BIGINT()); + children.insert(children.begin() + rowNumberColumnIndex, rowNumVector); + result = std::make_shared( + rowVector->pool(), + ROW(std::move(names), std::move(types)), + rowVector->nulls(), + rowVector->size(), + std::move(children)); } } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/ScanSpec.h b/velox/dwio/common/ScanSpec.h index 2639b6c354a68..25bbc543045f3 100644 --- a/velox/dwio/common/ScanSpec.h +++ b/velox/dwio/common/ScanSpec.h @@ -22,6 +22,7 @@ #include "velox/type/Subfield.h" #include "velox/vector/BaseVector.h" #include "velox/vector/ComplexVector.h" +#include "velox/vector/ConstantVector.h" #include "velox/vector/LazyVector.h" #include @@ -39,6 +40,12 @@ namespace common { // mutable by readers to reflect filter order and other adaptation. class ScanSpec { public: + enum class ColumnType : int8_t { + kRegular, // Read from file or constant + kRowIndex, // Row number in the file starting from 0 + kComposite, // A struct with all children not read from file + }; + static constexpr column_index_t kNoChannel = ~0; static constexpr const char* kMapKeysFieldName = "keys"; static constexpr const char* kMapValuesFieldName = "values"; @@ -97,16 +104,26 @@ class ScanSpec { constantValue_ = value; } + template + void setConstantValue(T val, TypePtr type, memory::MemoryPool* pool) { + constantValue_ = std::make_shared>( + pool, 1, false, std::move(type), std::move(val)); + } + bool isConstant() const { return constantValue_ != nullptr; } - void setExplicitRowNumber(bool isExplicitRowNumber) { - isExplicitRowNumber_ = isExplicitRowNumber; + void setColumnType(ColumnType value) { + columnType_ = value; } - bool isExplicitRowNumber() const { - return isExplicitRowNumber_; + ColumnType columnType() const { + return columnType_; + } + + bool readFromFile() const { + return columnType_ == ColumnType::kRegular && !isConstant(); } // Name of the value in its container, i.e. field name in struct or @@ -354,7 +371,8 @@ class ScanSpec { VectorPtr constantValue_; bool projectOut_ = false; - bool isExplicitRowNumber_ = false; + ColumnType columnType_ = ColumnType::kRegular; + // True if a string dictionary or flat map in this field should be // returned as flat. bool makeFlat_ = false; diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h index c3c6585b3dd3a..438bb469bc0b5 100644 --- a/velox/dwio/common/SelectiveColumnReader.h +++ b/velox/dwio/common/SelectiveColumnReader.h @@ -494,11 +494,7 @@ class SelectiveColumnReader { return StringView(data, value.size()); } - // Whether output rows should be filled when there is no column projected out - // and there is delete mutation. Used for row number generation. The case - // for no delete mutation is handled more efficiently outside column reader in - // `RowReader::readWithRowNumber'. - virtual void setFillMutatedOutputRows(bool /*value*/) { + virtual void setCurrentRowNumber(int64_t /*value*/) { VELOX_UNREACHABLE("Only struct reader supports this method"); } diff --git a/velox/dwio/common/SelectiveStructColumnReader.cpp b/velox/dwio/common/SelectiveStructColumnReader.cpp index 839abc8aaad9b..e3bdf614af2a7 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.cpp +++ b/velox/dwio/common/SelectiveStructColumnReader.cpp @@ -21,6 +21,163 @@ namespace facebook::velox::dwio::common { +namespace { + +bool testFilterOnConstant(const velox::common::ScanSpec& spec) { + if (spec.isConstant() && !spec.constantValue()->isNullAt(0)) { + // Non-null constant is known value during split scheduling and filters on + // them should not be handled at execution level. + return true; + } + // Check filter on missing field. + return !spec.hasFilter() || spec.testNull(); +} + +// Recursively makes empty RowVectors for positions in 'children' where the +// corresponding child type in 'rowType' is a row. The reader expects RowVector +// outputs to be initialized so that the content corresponds to the query schema +// regardless of the file schema. An empty RowVector can have nullptr for all +// its non-row children. +void fillRowVectorChildren( + memory::MemoryPool& pool, + const RowType& rowType, + std::vector& children) { + for (auto i = 0; i < children.size(); ++i) { + const auto& type = rowType.childAt(i); + if (type->isRow()) { + std::vector innerChildren(type->size()); + fillRowVectorChildren(pool, type->asRow(), innerChildren); + children[i] = std::make_shared( + &pool, type, nullptr, 0, std::move(innerChildren)); + } + } +} + +VectorPtr tryReuseResult(const VectorPtr& result) { + if (!result.unique()) { + return nullptr; + } + switch (result->encoding()) { + case VectorEncoding::Simple::ROW: + // Do not call prepareForReuse as it would throw away constant vectors + // that can be reused. Reusability of children should be checked in + // getValues of child readers (all readers other than struct are + // recreating the result vector on each batch currently, so no issue with + // reusability now). + result->reuseNulls(); + result->clearContainingLazyAndWrapped(); + return result; + case VectorEncoding::Simple::LAZY: { + auto& lazy = static_cast(*result); + if (!lazy.isLoaded()) { + return nullptr; + } + return tryReuseResult(lazy.loadedVectorShared()); + } + case VectorEncoding::Simple::DICTIONARY: + return tryReuseResult(result->valueVector()); + default: + return nullptr; + } +} + +void prepareResult(VectorPtr& result) { + if (auto reused = tryReuseResult(result)) { + result = std::move(reused); + return; + } + auto& rowType = result->type()->asRow(); + VLOG(1) << "Reallocating result row vector with " << rowType.size() + << " children"; + std::vector children(rowType.size()); + fillRowVectorChildren(*result->pool(), rowType, children); + result = std::make_shared( + result->pool(), result->type(), nullptr, 0, std::move(children)); +} + +void setConstantField( + const VectorPtr& constant, + vector_size_t size, + VectorPtr& field) { + if (field && field->isConstantEncoding() && field.unique() && + field->size() > 0 && field->equalValueAt(constant.get(), 0, 0)) { + field->resize(size); + } else { + field = BaseVector::wrapInConstant(size, 0, constant); + } +} + +void setNullField( + vector_size_t size, + VectorPtr& field, + const TypePtr& type, + memory::MemoryPool* pool) { + if (field && field->isConstantEncoding() && field.unique() && + field->size() > 0 && field->isNullAt(0)) { + field->resize(size); + } else { + field = BaseVector::createNullConstant(type, size, pool); + } +} + +void setRowNumberField( + int64_t offset, + const RowSet& rows, + memory::MemoryPool* pool, + VectorPtr& field) { + VELOX_CHECK_GE(offset, 0); + if (field && BaseVector::isVectorWritable(field)) { + VELOX_CHECK( + *field->type() == *BIGINT(), + "Unexpected row number type: {}", + field->type()->toString()); + field->clearAllNulls(); + field->resize(rows.size()); + } else { + field = std::make_shared>( + pool, + BIGINT(), + nullptr, + rows.size(), + AlignedBuffer::allocate(rows.size(), pool), + std::vector()); + } + auto* values = field->asChecked>()->mutableRawValues(); + for (int i = 0; i < rows.size(); ++i) { + values[i] = offset + rows[i]; + } +} + +void setCompositeField( + const velox::common::ScanSpec& spec, + int64_t offset, + const RowSet& rows, + memory::MemoryPool* pool, + VectorPtr& field) { + VELOX_CHECK(field && field->type()->isRow()); + prepareResult(field); + auto* rowField = field->asChecked(); + rowField->clearAllNulls(); + rowField->unsafeResize(rows.size()); + for (auto& child : spec.children()) { + auto& childField = rowField->childAt(child->channel()); + switch (child->columnType()) { + case velox::common::ScanSpec::ColumnType::kRegular: + VELOX_CHECK(child->isConstant()); + setConstantField(child->constantValue(), rows.size(), childField); + break; + case velox::common::ScanSpec::ColumnType::kRowIndex: + setRowNumberField(offset, rows, pool, childField); + break; + case velox::common::ScanSpec::ColumnType::kComposite: + setCompositeField(*child, offset, rows, pool, childField); + break; + } + } +} + +} // namespace + void SelectiveStructColumnReaderBase::filterRowGroups( uint64_t rowGroupSize, const dwio::common::StatsContext& context, @@ -79,20 +236,6 @@ void SelectiveStructColumnReaderBase::fillOutputRowsFromMutation( } } -namespace { - -bool testFilterOnConstant(const velox::common::ScanSpec& spec) { - if (spec.isConstant() && !spec.constantValue()->isNullAt(0)) { - // Non-null constant is known value during split scheduling and filters on - // them should not be handled at execution level. - return true; - } - // Check filter on missing field. - return !spec.hasFilter() || spec.testNull(); -} - -} // namespace - void SelectiveStructColumnReaderBase::next( uint64_t numValues, VectorPtr& result, @@ -100,52 +243,61 @@ void SelectiveStructColumnReaderBase::next( process::TraceContext trace("SelectiveStructColumnReaderBase::next"); mutation_ = mutation; hasDeletion_ = common::hasDeletion(mutation); - if (children_.empty()) { - if (hasDeletion_) { - if (fillMutatedOutputRows_) { - fillOutputRowsFromMutation(numValues); - numValues = outputRows_.size(); - } else { - if (mutation->deletedRows) { - numValues -= bits::countBits(mutation->deletedRows, 0, numValues); - } - if (mutation->randomSkip) { - numValues *= mutation->randomSkip->sampleRate(); - } - } - } - for (const auto& childSpec : scanSpec_->children()) { - if (isChildConstant(*childSpec) && !testFilterOnConstant(*childSpec)) { - numValues = 0; - break; - } - } + const RowSet rows(iota(numValues, rows_), numValues); - // No readers - // This can be either count(*) query or a query that select only - // constant columns (partition keys or columns missing from an old file - // due to schema evolution) or row number column. - auto resultRowVector = std::dynamic_pointer_cast(result); - resultRowVector->unsafeResize(numValues); - - for (auto& childSpec : scanSpec_->children()) { - VELOX_CHECK(childSpec->isConstant() || childSpec->isExplicitRowNumber()); - if (childSpec->projectOut() && childSpec->isConstant()) { - const auto channel = childSpec->channel(); - resultRowVector->childAt(channel) = BaseVector::wrapInConstant( - numValues, 0, childSpec->constantValue()); - } - } + if (!children_.empty()) { + read(readOffset_, rows, nullptr); + getValues(outputRows(), &result); return; } - const auto oldSize = rows_.size(); - rows_.resize(numValues); - if (numValues > oldSize) { - std::iota(&rows_[oldSize], &rows_[rows_.size()], oldSize); + // No child readers. This can be either count(*) query or a query that select + // only constant columns (partition keys or columns missing from an old file + // due to schema evolution) or columns not from file (e.g. row numbers). + inputRows_ = rows; + outputRows_.clear(); + if (hasDeletion_) { + fillOutputRowsFromMutation(numValues); + numValues = outputRows_.size(); + } + for (const auto& childSpec : scanSpec_->children()) { + if (isChildConstant(*childSpec) && !testFilterOnConstant(*childSpec)) { + outputRows_.clear(); + numValues = 0; + break; + } + } + prepareResult(result); + auto* resultRowVector = result->asChecked(); + resultRowVector->unsafeResize(numValues); + for (auto& childSpec : scanSpec_->children()) { + if (!childSpec->projectOut()) { + continue; + } + auto& childField = resultRowVector->childAt(childSpec->channel()); + if (childSpec->isConstant()) { + childField = + BaseVector::wrapInConstant(numValues, 0, childSpec->constantValue()); + } else if ( + childSpec->columnType() == + velox::common::ScanSpec::ColumnType::kRowIndex) { + VELOX_CHECK(!childSpec->hasFilter()); + setRowNumberField( + currentRowNumber_, outputRows(), resultRowVector->pool(), childField); + } else if ( + childSpec->columnType() == + velox::common::ScanSpec::ColumnType::kComposite) { + VELOX_CHECK(!childSpec->hasFilter()); + setCompositeField( + *childSpec, + currentRowNumber_, + outputRows(), + resultRowVector->pool(), + childField); + } else { + VELOX_UNREACHABLE(); + } } - read(readOffset_, rows_, nullptr); - getValues(outputRows(), &result); } void SelectiveStructColumnReaderBase::read( @@ -201,7 +353,8 @@ void SelectiveStructColumnReaderBase::read( continue; } - if (childSpec->isExplicitRowNumber()) { + if (!childSpec->readFromFile()) { + VELOX_CHECK(!childSpec->hasFilter()); continue; } @@ -258,7 +411,7 @@ void SelectiveStructColumnReaderBase::recordParentNullsInChildren( if (isChildConstant(*childSpec)) { continue; } - if (childSpec->isExplicitRowNumber()) { + if (!childSpec->readFromFile()) { continue; } @@ -290,84 +443,6 @@ bool SelectiveStructColumnReaderBase::isChildMissing( childSpec.channel() >= fileType_->size()); } -namespace { - -// Recursively makes empty RowVectors for positions in 'children' -// where the corresponding child type in 'rowType' is a row. The -// reader expects RowVector outputs to be initialized so that the -// content corresponds to the query schema regardless of the file -// schema. An empty RowVector can have nullptr for all its non-row -// children. -void fillRowVectorChildren( - memory::MemoryPool& pool, - const RowType& rowType, - std::vector& children) { - for (auto i = 0; i < children.size(); ++i) { - const auto& type = rowType.childAt(i); - if (type->isRow()) { - std::vector innerChildren(type->size()); - fillRowVectorChildren(pool, type->asRow(), innerChildren); - children[i] = std::make_shared( - &pool, type, nullptr, 0, std::move(innerChildren)); - } - } -} - -VectorPtr tryReuseResult(const VectorPtr& result) { - if (!result.unique()) { - return nullptr; - } - switch (result->encoding()) { - case VectorEncoding::Simple::ROW: - // Do not call prepareForReuse as it would throw away constant vectors - // that can be reused. Reusability of children should be checked in - // getValues of child readers (all readers other than struct are - // recreating the result vector on each batch currently, so no issue with - // reusability now). - result->reuseNulls(); - result->clearContainingLazyAndWrapped(); - return result; - case VectorEncoding::Simple::LAZY: { - auto& lazy = static_cast(*result); - if (!lazy.isLoaded()) { - return nullptr; - } - return tryReuseResult(lazy.loadedVectorShared()); - } - case VectorEncoding::Simple::DICTIONARY: - return tryReuseResult(result->valueVector()); - default: - return nullptr; - } -} - -void setConstantField( - const VectorPtr& constant, - vector_size_t size, - VectorPtr& field) { - if (field && field->isConstantEncoding() && field.unique() && - field->size() > 0 && field->equalValueAt(constant.get(), 0, 0)) { - field->resize(size); - } else { - field = BaseVector::wrapInConstant(size, 0, constant); - } -} - -void setNullField( - vector_size_t size, - VectorPtr& field, - const TypePtr& type, - memory::MemoryPool* pool) { - if (field && field->isConstantEncoding() && field.unique() && - field->size() > 0 && field->isNullAt(0)) { - field->resize(size); - } else { - field = BaseVector::createNullConstant(type, size, pool); - } -} - -} // namespace - void SelectiveStructColumnReaderBase::getValues( const RowSet& rows, VectorPtr* result) { @@ -378,21 +453,7 @@ void SelectiveStructColumnReaderBase::getValues( result->get()->type()->isRow(), "Struct reader expects a result of type ROW."); auto& rowType = result->get()->type()->asRow(); - if (auto reused = tryReuseResult(*result)) { - *result = std::move(reused); - } else { - VLOG(1) << "Reallocating result row vector with " << rowType.size() - << " children"; - std::vector children(rowType.size()); - fillRowVectorChildren(*result->get()->pool(), rowType, children); - *result = std::make_shared( - result->get()->pool(), - result->get()->type(), - nullptr, - 0, - std::move(children)); - } - + prepareResult(*result); auto* resultRow = static_cast(result->get()); resultRow->unsafeResize(rows.size()); if (rows.empty()) { @@ -407,10 +468,6 @@ void SelectiveStructColumnReaderBase::getValues( continue; } - if (childSpec->isExplicitRowNumber()) { - // Row number data is generated after, skip data loading for it. - continue; - } const auto channel = childSpec->channel(); auto& childResult = resultRow->childAt(channel); if (childSpec->isConstant()) { @@ -418,6 +475,20 @@ void SelectiveStructColumnReaderBase::getValues( continue; } + if (childSpec->columnType() == + velox::common::ScanSpec::ColumnType::kRowIndex) { + setRowNumberField( + currentRowNumber_, rows, resultRow->pool(), childResult); + continue; + } + + if (childSpec->columnType() == + velox::common::ScanSpec::ColumnType::kComposite) { + setCompositeField( + *childSpec, currentRowNumber_, rows, resultRow->pool(), childResult); + continue; + } + const auto index = childSpec->subscript(); // Set missing fields to be null constant, if we're in the top level struct // missing columns should already be a null constant from the check above. diff --git a/velox/dwio/common/SelectiveStructColumnReader.h b/velox/dwio/common/SelectiveStructColumnReader.h index 3b264c948d71e..17d98815c0154 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.h +++ b/velox/dwio/common/SelectiveStructColumnReader.h @@ -98,8 +98,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { return debugString_; } - void setFillMutatedOutputRows(bool value) final { - fillMutatedOutputRows_ = value; + void setCurrentRowNumber(int64_t value) final { + currentRowNumber_ = value; } protected: @@ -121,12 +121,6 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { getExceptionContext().message(VeloxException::Type::kSystem)), isRoot_(isRoot) {} - /// Records the number of nulls added by 'this' between the end position of - /// each child reader and the end of the range of 'read(). This must be done - /// also if a child is not read so that we know how much to skip when seeking - /// forward within the row group. - void recordParentNullsInChildren(int64_t offset, const RowSet& rows); - bool hasDeletion() const final { return hasDeletion_; } @@ -141,8 +135,17 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { isChildMissing(childSpec); } + std::vector children_; + + private: void fillOutputRowsFromMutation(vector_size_t size); + /// Records the number of nulls added by 'this' between the end position of + /// each child reader and the end of the range of 'read(). This must be done + /// also if a child is not read so that we know how much to skip when seeking + /// forward within the row group. + void recordParentNullsInChildren(int64_t offset, const RowSet& rows); + // Context information obtained from ExceptionContext. Stored here // so that LazyVector readers under this can add this to their // ExceptionContext. Allows contextualizing reader errors to split @@ -154,7 +157,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { // table. const bool isRoot_; - std::vector children_; + // Dense set of rows to read in next(). + raw_vector rows_; // Sequence number of output batch. Checked against ColumnLoaders // created by 'this' to verify they are still valid at load. @@ -162,16 +166,13 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { int64_t lazyVectorReadOffset_; - // Dense set of rows to read in next(). - raw_vector rows_; + int64_t currentRowNumber_ = -1; const Mutation* mutation_ = nullptr; // After read() call mutation_ could go out of scope. Need to keep this // around for lazy columns. bool hasDeletion_ = false; - - bool fillMutatedOutputRows_ = false; }; class SelectiveStructColumnReader : public SelectiveStructColumnReaderBase { diff --git a/velox/dwio/common/tests/OptionsTests.cpp b/velox/dwio/common/tests/OptionsTests.cpp index 4492a62420d0a..82335821c5218 100644 --- a/velox/dwio/common/tests/OptionsTests.cpp +++ b/velox/dwio/common/tests/OptionsTests.cpp @@ -30,12 +30,10 @@ TEST(OptionsTests, setRowNumberColumnInfoTest) { RowNumberColumnInfo rowNumberColumnInfo; rowNumberColumnInfo.insertPosition = 0; rowNumberColumnInfo.name = "test"; - rowNumberColumnInfo.isExplicit = true; rowReaderOptions.setRowNumberColumnInfo(rowNumberColumnInfo); auto rowNumberColumn = rowReaderOptions.rowNumberColumnInfo().value(); ASSERT_EQ(rowNumberColumnInfo.insertPosition, rowNumberColumn.insertPosition); ASSERT_EQ(rowNumberColumnInfo.name, rowNumberColumn.name); - ASSERT_EQ(rowNumberColumnInfo.isExplicit, rowNumberColumn.isExplicit); } TEST(OptionsTests, testRowNumberColumnInfoInCopy) { diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index 797c3551b6258..2f63deef5961a 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -169,8 +169,6 @@ void DwrfUnit::ensureDecoders() { flatMapContext, /*isRoot=*/true); selectiveColumnReader_->setIsTopLevel(); - selectiveColumnReader_->setFillMutatedOutputRows( - options_.rowNumberColumnInfo().has_value()); } else { auto requestedType = columnSelector_->getSchemaWithId(); auto factory = &ColumnReaderFactory::defaultFactory(); @@ -526,19 +524,14 @@ void DwrfRowReader::readNext( } return; } - + auto& columnReader = getSelectiveColumnReader(); + columnReader->setCurrentRowNumber(previousRow_); if (!options_.rowNumberColumnInfo().has_value()) { - getSelectiveColumnReader()->next(rowsToRead, result, mutation); + columnReader->next(rowsToRead, result, mutation); return; } - readWithRowNumber( - getSelectiveColumnReader(), - options_, - previousRow_, - rowsToRead, - mutation, - result); + columnReader, options_, previousRow_, rowsToRead, mutation, result); } uint64_t DwrfRowReader::skip(uint64_t numValues) { diff --git a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp index 6e817dc678dbf..c8efa7e409aaa 100644 --- a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp @@ -50,13 +50,13 @@ SelectiveStructColumnReader::SelectiveStructColumnReader( const auto& rowType = requestedType_->asRow(); for (auto i = 0; i < childSpecs.size(); ++i) { auto* childSpec = childSpecs[i]; - if (childSpec->isExplicitRowNumber()) { - continue; - } if (childSpec->isConstant() || isChildMissing(*childSpec)) { childSpec->setSubscript(kConstantChildSpecSubscript); continue; } + if (!childSpec->readFromFile()) { + continue; + } const auto childFileType = fileType_->childByName(childSpec->fieldName()); const auto childRequestedType = rowType.findChild(childSpec->fieldName()); diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index daeb070d1c7c6..1f9fac33ab0a0 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -2054,13 +2054,19 @@ namespace { void verifyRowNumbers( RowReader& rowReader, memory::MemoryPool* pool, - int expectedNumRows) { - auto result = BaseVector::create(ROW({{"c0", INTEGER()}}), 0, pool); + int expectedNumRows, + bool explicitRowNumber = false) { + auto result = explicitRowNumber + ? BaseVector::create( + ROW({{"c0", INTEGER()}, {"$row_number", BIGINT()}}), 0, pool) + : BaseVector::create(ROW({{"c0", INTEGER()}}), 0, pool); int numRows = 0; while (rowReader.next(10, result) > 0) { auto* rowVector = result->asUnchecked(); ASSERT_EQ(2, rowVector->childrenSize()); - ASSERT_EQ(rowVector->type()->asRow().nameOf(1), ""); + ASSERT_EQ( + rowVector->type()->asRow().nameOf(1), + explicitRowNumber ? "$row_number" : ""); DecodedVector values(*rowVector->childAt(0)); DecodedVector rowNumbers(*rowVector->childAt(1)); for (size_t i = 0; i < rowVector->size(); ++i) { @@ -2118,7 +2124,6 @@ TEST_F(TestReader, setRowNumberColumnInfo) { RowNumberColumnInfo rowNumberColumnInfo; rowNumberColumnInfo.insertPosition = 1; rowNumberColumnInfo.name = ""; - rowNumberColumnInfo.isExplicit = false; rowReaderOpts.setRowNumberColumnInfo(rowNumberColumnInfo); { SCOPED_TRACE("Selective no filter"); @@ -2148,7 +2153,6 @@ TEST_F(TestReader, reuseRowNumberColumn) { RowNumberColumnInfo rowNumberColumnInfo; rowNumberColumnInfo.insertPosition = 1; rowNumberColumnInfo.name = ""; - rowNumberColumnInfo.isExplicit = false; rowReaderOpts.setRowNumberColumnInfo(rowNumberColumnInfo); { SCOPED_TRACE("Reuse passed in"); @@ -2197,6 +2201,37 @@ TEST_F(TestReader, reuseRowNumberColumn) { } } +TEST_F(TestReader, explicitRowNumberColumn) { + const std::vector> integerValues{ + {0, 1, 2, 3, 4}, + {5, 6, 7}, + {8}, + {}, + {9, 10, 11, 12, 13, 14, 15}, + }; + auto batches = createBatches(integerValues); + auto [writer, reader] = createWriterReader(batches, pool()); + auto spec = std::make_shared(""); + spec->addField("c0", 0); + spec->addField("$row_number", 1) + ->setColumnType(common::ScanSpec::ColumnType::kRowIndex); + RowReaderOptions rowReaderOpts; + rowReaderOpts.setScanSpec(spec); + { + SCOPED_TRACE("Selective no filter"); + auto rowReader = reader->createRowReader(rowReaderOpts); + verifyRowNumbers(*rowReader, pool(), 16, true); + } + spec->childByName("c0")->setFilter( + common::createBigintValues({1, 4, 5, 7, 11, 14}, false)); + spec->resetCachedValues(true); + { + SCOPED_TRACE("Selective with filter"); + auto rowReader = reader->createRowReader(rowReaderOpts); + verifyRowNumbers(*rowReader, pool(), 6, true); + } +} + TEST_F(TestReader, failToReuseReaderNulls) { auto c0 = makeRowVector( {"a", "b"}, diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 5e2d73221dec7..4e83898b69ff2 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -947,8 +947,6 @@ class ParquetRowReader::Impl { readerBase_->schemaWithId(), // Id is schema id params, *options_.scanSpec()); - columnReader_->setFillMutatedOutputRows( - options_.rowNumberColumnInfo().has_value()); columnReader_->setIsTopLevel(); filterRowGroups(); @@ -1021,6 +1019,7 @@ class ParquetRowReader::Impl { return 0; } VELOX_DCHECK_GT(rowsToRead, 0); + columnReader_->setCurrentRowNumber(nextRowNumber()); if (!options_.rowNumberColumnInfo().has_value()) { columnReader_->next(rowsToRead, result, mutation); } else { diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index 4dfdc4e0f7565..0b808da74e9a5 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -39,7 +39,7 @@ StructColumnReader::StructColumnReader( childSpec->setSubscript(kConstantChildSpecSubscript); continue; } - if (childSpecs[i]->isExplicitRowNumber()) { + if (!childSpecs[i]->readFromFile()) { continue; } auto childFileType = fileType_->childByName(childSpec->fieldName()); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 9d5d4edc1ac2c..da5b7c02147fb 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -5213,3 +5213,111 @@ TEST_F(TableScanTest, hugeStripe) { } ASSERT_EQ(numRows, 4'294'980'000); } + +TEST_F(TableScanTest, rowId) { + const auto rowIdType = + ROW({"row_number", + "row_group_id", + "metadata_version", + "partition_id", + "table_guid"}, + {BIGINT(), VARCHAR(), BIGINT(), BIGINT(), VARCHAR()}); + auto data = makeFlatVector(10, [](auto i) { return i + 1; }); + auto vector = makeRowVector({data}); + auto file = TempFilePath::create(); + writeToFile(file->getPath(), {vector}); + auto makeRowIdColumnHandle = [&](auto& name) { + return std::make_shared( + name, HiveColumnHandle::ColumnType::kRowId, rowIdType, rowIdType); + }; + { + SCOPED_TRACE("Preload"); + auto outputType = ROW({"c0", "c1"}, {BIGINT(), rowIdType}); + auto plan = PlanBuilder() + .startTableScan() + .outputType(outputType) + .assignments({ + {"c0", makeColumnHandle("c0", BIGINT(), {})}, + {"c1", makeRowIdColumnHandle("c1")}, + }) + .endTableScan() + .planNode(); + auto query = AssertQueryBuilder(plan); + query.config(core::QueryConfig::kMaxSplitPreloadPerDriver, "4"); + auto expected = BaseVector::create(outputType, 0, pool()); + for (int i = 0; i < 10; ++i) { + auto split = makeHiveConnectorSplit(file->getPath()); + split->rowIdProperties = { + .metadataVersion = i, + .partitionId = 2 * i, + .tableGuid = fmt::format("table-guid-{}", i), + }; + query.split(split); + auto rowGroupId = split->getFileName(); + auto newExpected = makeRowVector({ + data, + makeRowVector({ + makeFlatVector(10, folly::identity), + makeConstant(StringView(rowGroupId), 10), + makeConstant(split->rowIdProperties->metadataVersion, 10), + makeConstant(split->rowIdProperties->partitionId, 10), + makeConstant(StringView(split->rowIdProperties->tableGuid), 10), + }), + }); + expected->append(newExpected.get()); + } + auto task = query.assertResults(expected); + auto stats = getTableScanRuntimeStats(task); + ASSERT_GT(stats.at("preloadedSplits").sum, 0); + } + { + SCOPED_TRACE("Remaining filter only"); + auto remainingFilter = + parseExpr("c1.row_number % 2 == 0", ROW({"c1"}, {rowIdType})); + auto plan = PlanBuilder() + .startTableScan() + .tableHandle(makeTableHandle({}, remainingFilter)) + .outputType(ROW({"c0"}, {BIGINT()})) + .assignments({ + {"c0", makeColumnHandle("c0", BIGINT(), {})}, + {"c1", makeRowIdColumnHandle("c1")}, + }) + .endTableScan() + .planNode(); + auto split = makeHiveConnectorSplit(file->getPath()); + split->rowIdProperties = { + .metadataVersion = 42, + .partitionId = 24, + .tableGuid = "foo", + }; + auto expected = makeRowVector( + {makeFlatVector(5, [](auto i) { return 1 + 2 * i; })}); + AssertQueryBuilder(plan).split(split).assertResults(expected); + } + { + SCOPED_TRACE("Row ID only"); + auto plan = PlanBuilder() + .startTableScan() + .outputType(ROW({"c0"}, {rowIdType})) + .assignments({{"c0", makeRowIdColumnHandle("c0")}}) + .endTableScan() + .planNode(); + auto split = makeHiveConnectorSplit(file->getPath()); + split->rowIdProperties = { + .metadataVersion = 42, + .partitionId = 24, + .tableGuid = "foo", + }; + auto rowGroupId = split->getFileName(); + auto expected = makeRowVector({ + makeRowVector({ + makeFlatVector(10, folly::identity), + makeConstant(StringView(rowGroupId), 10), + makeConstant(split->rowIdProperties->metadataVersion, 10), + makeConstant(split->rowIdProperties->partitionId, 10), + makeConstant(StringView(split->rowIdProperties->tableGuid), 10), + }), + }); + AssertQueryBuilder(plan).split(split).assertResults(expected); + } +}