diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index ba30a0c19e75..1f99a98a3805 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -266,7 +266,7 @@ uint64_t HiveConfig::sortWriterFinishTimeSliceLimitMs( } uint64_t HiveConfig::footerEstimatedSize() const { - return config_->get(kFooterEstimatedSize, 1UL << 20); + return config_->get(kFooterEstimatedSize, 256UL << 10); } uint64_t HiveConfig::filePreloadThreshold() const { diff --git a/velox/dwio/common/BufferedInput.h b/velox/dwio/common/BufferedInput.h index ac969fdee2cc..2a1f1eba9826 100644 --- a/velox/dwio/common/BufferedInput.h +++ b/velox/dwio/common/BufferedInput.h @@ -91,7 +91,7 @@ class BufferedInput { virtual std::unique_ptr read(uint64_t offset, uint64_t length, LogType logType) const { - std::unique_ptr ret = readBuffer(offset, length); + auto ret = readBuffer(offset, length); if (ret != nullptr) { return ret; } diff --git a/velox/dwio/common/CacheInputStream.cpp b/velox/dwio/common/CacheInputStream.cpp index ca05a178e3b6..dedda0c72f6a 100644 --- a/velox/dwio/common/CacheInputStream.cpp +++ b/velox/dwio/common/CacheInputStream.cpp @@ -160,7 +160,7 @@ std::string CacheInputStream::getName() const { return result; } -size_t CacheInputStream::positionSize() { +size_t CacheInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/CacheInputStream.h b/velox/dwio/common/CacheInputStream.h index 54650d9ebc43..195bdbdd1351 100644 --- a/velox/dwio/common/CacheInputStream.h +++ b/velox/dwio/common/CacheInputStream.h @@ -43,13 +43,18 @@ class CacheInputStream : public SeekableInputStream { ~CacheInputStream() override; + CacheInputStream& operator=(const CacheInputStream&) = delete; + CacheInputStream(const CacheInputStream&) = delete; + CacheInputStream& operator=(CacheInputStream&&) = delete; + CacheInputStream(CacheInputStream&&) = delete; + bool Next(const void** data, int* size) override; void BackUp(int count) override; bool SkipInt64(int64_t count) override; google::protobuf::int64 ByteCount() const override; void seekToPosition(PositionProvider& position) override; std::string getName() const override; - size_t positionSize() override; + size_t positionSize() const override; /// Returns a copy of 'this', ranging over the same bytes. The clone is /// initially positioned at the position of 'this' and can be moved diff --git a/velox/dwio/common/DirectInputStream.cpp b/velox/dwio/common/DirectInputStream.cpp index 3d8c8a13f636..68173b40f259 100644 --- a/velox/dwio/common/DirectInputStream.cpp +++ b/velox/dwio/common/DirectInputStream.cpp @@ -105,7 +105,7 @@ std::string DirectInputStream::getName() const { "DirectInputStream {} of {}", offsetInRegion_, region_.length); } -size_t DirectInputStream::positionSize() { +size_t DirectInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/DirectInputStream.h b/velox/dwio/common/DirectInputStream.h index 3715da666682..3d75b4459568 100644 --- a/velox/dwio/common/DirectInputStream.h +++ b/velox/dwio/common/DirectInputStream.h @@ -48,7 +48,7 @@ class DirectInputStream : public SeekableInputStream { void seekToPosition(PositionProvider& position) override; std::string getName() const override; - size_t positionSize() override; + size_t positionSize() const override; /// Testing function to access loaded state. void testingData( diff --git a/velox/dwio/common/SeekableInputStream.cpp b/velox/dwio/common/SeekableInputStream.cpp index 7773445dea5d..be2fa90f4104 100644 --- a/velox/dwio/common/SeekableInputStream.cpp +++ b/velox/dwio/common/SeekableInputStream.cpp @@ -176,7 +176,7 @@ std::string SeekableArrayInputStream::getName() const { "SeekableArrayInputStream ", position_, " of ", length_); } -size_t SeekableArrayInputStream::positionSize() { +size_t SeekableArrayInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } @@ -257,7 +257,7 @@ std::string SeekableFileInputStream::getName() const { input_->getName(), " from ", start_, " for ", length_); } -size_t SeekableFileInputStream::positionSize() { +size_t SeekableFileInputStream::positionSize() const { // not compressed, so only need 1 position (uncompressed position) return 1; } diff --git a/velox/dwio/common/SeekableInputStream.h b/velox/dwio/common/SeekableInputStream.h index a91fbb379732..71dbdc23c3e9 100644 --- a/velox/dwio/common/SeekableInputStream.h +++ b/velox/dwio/common/SeekableInputStream.h @@ -40,7 +40,7 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream { // Returns the number of position values this input stream uses to identify an // ORC/DWRF stream address. - virtual size_t positionSize() = 0; + virtual size_t positionSize() const = 0; virtual bool SkipInt64(int64_t count) = 0; @@ -82,7 +82,7 @@ class SeekableArrayInputStream : public SeekableInputStream { virtual google::protobuf::int64 ByteCount() const override; virtual void seekToPosition(PositionProvider& position) override; virtual std::string getName() const override; - virtual size_t positionSize() override; + virtual size_t positionSize() const override; /// Return the total number of bytes returned from Next() calls. Intended to /// be used for test validation. @@ -123,7 +123,7 @@ class SeekableFileInputStream : public SeekableInputStream { virtual google::protobuf::int64 ByteCount() const override; virtual void seekToPosition(PositionProvider& position) override; virtual std::string getName() const override; - virtual size_t positionSize() override; + virtual size_t positionSize() const override; private: const std::shared_ptr input_; diff --git a/velox/dwio/common/Statistics.h b/velox/dwio/common/Statistics.h index db5a91b06413..699066c1df4d 100644 --- a/velox/dwio/common/Statistics.h +++ b/velox/dwio/common/Statistics.h @@ -535,16 +535,39 @@ struct RuntimeStatistics { // Number of strides (row groups) skipped based on statistics. int64_t skippedStrides{0}; + int64_t footerBufferOverread{0}; + + int64_t numStripes{0}; + ColumnReaderStatistics columnReaderStatistics; std::unordered_map toMap() { - return { - {"skippedSplits", RuntimeCounter(skippedSplits)}, - {"skippedSplitBytes", - RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)}, - {"skippedStrides", RuntimeCounter(skippedStrides)}, - {"flattenStringDictionaryValues", - RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)}}; + std::unordered_map result; + if (skippedSplits > 0) { + result.emplace("skippedSplits", RuntimeCounter(skippedSplits)); + } + if (skippedSplitBytes > 0) { + result.emplace( + "skippedSplitBytes", + RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)); + } + if (skippedStrides > 0) { + result.emplace("skippedStrides", RuntimeCounter(skippedStrides)); + } + if (footerBufferOverread > 0) { + result.emplace( + "footerBufferOverread", + RuntimeCounter(footerBufferOverread, RuntimeCounter::Unit::kBytes)); + } + if (numStripes > 0) { + result.emplace("numStripes", RuntimeCounter(numStripes)); + } + if (columnReaderStatistics.flattenStringDictionaryValues > 0) { + result.emplace( + "flattenStringDictionaryValues", + RuntimeCounter(columnReaderStatistics.flattenStringDictionaryValues)); + } + return result; } }; diff --git a/velox/dwio/common/compression/PagedInputStream.h b/velox/dwio/common/compression/PagedInputStream.h index b7ecd99fd4c9..15b1acd630a0 100644 --- a/velox/dwio/common/compression/PagedInputStream.h +++ b/velox/dwio/common/compression/PagedInputStream.h @@ -74,7 +74,7 @@ class PagedInputStream : public dwio::common::SeekableInputStream { ")"); } - size_t positionSize() override { + size_t positionSize() const override { // not compressed, so need 2 positions (compressed position + uncompressed // position) return 2; diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index ed9dede68142..42fd560a5830 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -301,6 +301,9 @@ DwrfRowReader::DwrfRowReader( } unitLoader_ = getUnitLoader(); + if (!emptyFile()) { + getReader().loadCache(); + } } std::unique_ptr& DwrfRowReader::getColumnReader() { @@ -339,7 +342,7 @@ std::unique_ptr DwrfRowReader::getUnitLoader() { uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { // Empty file - if (isEmptyFile()) { + if (emptyFile()) { return 0; } nextRowNumber_.reset(); @@ -422,7 +425,7 @@ uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { } uint64_t DwrfRowReader::skipRows(uint64_t numberOfRowsToSkip) { - if (isEmptyFile()) { + if (emptyFile()) { VLOG(1) << "Empty file, nothing to skip"; return 0; } @@ -586,7 +589,7 @@ uint64_t DwrfRowReader::rowNumber() { if (nextRow != kAtEnd) { return nextRow; } - if (isEmptyFile()) { + if (emptyFile()) { return 0; } return getReader().footer().numberOfRows(); @@ -615,7 +618,7 @@ uint64_t DwrfRowReader::next( const dwio::common::Mutation* mutation) { const auto nextRow = nextRowNumber(); if (nextRow == kAtEnd) { - if (!isEmptyFile()) { + if (!emptyFile()) { previousRow_ = firstRowOfStripe_[stripeCeiling_ - 1] + getReader().footer().stripes(stripeCeiling_ - 1).numberOfRows(); } else { diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index a18b6982fa59..39e6f2a534ab 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -108,6 +108,8 @@ class DwrfRowReader : public StrideIndexProvider, void updateRuntimeStats( dwio::common::RuntimeStatistics& stats) const override { stats.skippedStrides += skippedStrides_; + stats.footerBufferOverread += getReader().footerBufferOverread(); + stats.numStripes += stripeCeiling_ - firstStripe_; stats.columnReaderStatistics.flattenStringDictionaryValues += columnReaderStatistics_.flattenStringDictionaryValues; } @@ -152,7 +154,7 @@ class DwrfRowReader : public StrideIndexProvider, const dwio::common::Statistics& stats, uint32_t nodeId) const; - bool isEmptyFile() const { + bool emptyFile() const { return stripeCeiling_ == firstStripe_; } diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index b81c208276e0..71dbc2e41510 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -82,56 +82,76 @@ ReaderBase::ReaderBase( FileFormat fileFormat) : ReaderBase(createReaderOptions(pool, fileFormat), std::move(input)) {} +namespace { + +template +std::unique_ptr parsePostScript(const char* input, int size) { + auto impl = std::make_unique(); + VELOX_CHECK(impl->ParseFromArray(input, size)); + return std::make_unique(std::move(impl)); +} + +template +std::unique_ptr parseFooter( + dwio::common::SeekableInputStream* input, + google::protobuf::Arena* arena) { + auto* impl = google::protobuf::Arena::CreateMessage(arena); + VELOX_CHECK(impl->ParseFromZeroCopyStream(input)); + return std::make_unique(impl); +} + +} // namespace + ReaderBase::ReaderBase( const dwio::common::ReaderOptions& options, std::unique_ptr input) - : arena_(std::make_unique()), - options_{options}, + : options_{options}, input_(std::move(input)), - fileLength_(input_->getReadFile()->size()) { + fileLength_(input_->getReadFile()->size()), + arena_(std::make_unique()) { process::TraceContext trace("ReaderBase::ReaderBase"); // TODO: make a config DWIO_ENSURE(fileLength_ > 0, "ORC file is empty"); VELOX_CHECK_GE(fileLength_, 4, "File size too small"); const auto preloadFile = fileLength_ <= options_.filePreloadThreshold(); - const uint64_t readSize = preloadFile - ? fileLength_ - : std::min(fileLength_, options_.footerEstimatedSize()); + const int64_t footerBufSize = + std::min(fileLength_, options_.footerEstimatedSize()); + const uint64_t readSize = preloadFile ? fileLength_ : footerBufSize; if (input_->supportSyncLoad()) { input_->enqueue({fileLength_ - readSize, readSize, "footer"}); input_->load(preloadFile ? LogType::FILE : LogType::FOOTER); } // TODO: read footer from spectrum - { - const void* buf; - int32_t ignored; - auto lastByteStream = input_->read(fileLength_ - 1, 1, LogType::FOOTER); - const bool ret = lastByteStream->Next(&buf, &ignored); - VELOX_CHECK(ret, "Failed to read"); - // Make sure 'lastByteStream' is live while dereferencing 'buf'. - psLength_ = *static_cast(buf) & 0xff; - } + auto footerBuffer = + AlignedBuffer::allocate(footerBufSize, &options_.memoryPool()); + auto* rawFooterBuffer = footerBuffer->asMutable(); + input_->read(fileLength_ - footerBufSize, footerBufSize, LogType::FOOTER) + ->readFully(rawFooterBuffer, footerBufSize); + int32_t footerOffset = footerBufSize - 1; + psLength_ = static_cast(rawFooterBuffer[footerOffset]); VELOX_CHECK_LE( psLength_ + 4, // 1 byte for post script len, 3 byte "ORC" header. fileLength_, "Corrupted file, Post script size is invalid"); + VELOX_CHECK_GE(footerOffset, psLength_); + footerOffset -= psLength_; if (fileFormat() == FileFormat::DWRF) { - auto postScript = ProtoUtils::readProto( - input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); - postScript_ = std::make_unique(std::move(postScript)); + postScript_ = parsePostScript( + rawFooterBuffer + footerOffset, psLength_); } else { - auto postScript = ProtoUtils::readProto( - input_->read(fileLength_ - psLength_ - 1, psLength_, LogType::FOOTER)); - postScript_ = std::make_unique(std::move(postScript)); + postScript_ = parsePostScript( + rawFooterBuffer + footerOffset, psLength_); } const uint64_t footerSize = postScript_->footerLength(); const uint64_t cacheSize = postScript_->hasCacheSize() ? postScript_->cacheSize() : 0; const uint64_t tailSize = 1 + psLength_ + footerSize + cacheSize; + footerBufferOverread_ = + std::max(0, footerBufSize - static_cast(tailSize)); // There are cases in warehouse, where RC/text files are stored // in ORC partition. This causes the Reader to SIGSEGV. The following @@ -154,29 +174,57 @@ ReaderBase::ReaderBase( input_->load(LogType::FOOTER); } - auto footerStream = input_->read( - fileLength_ - psLength_ - footerSize - 1, footerSize, LogType::FOOTER); + BufferPtr fullFooterBuffer; + char* footerStart; + if (footerOffset >= footerSize) { + footerOffset -= footerSize; + footerStart = rawFooterBuffer + footerOffset; + } else { + fullFooterBuffer = + AlignedBuffer::allocate(footerSize, &options_.memoryPool()); + footerStart = fullFooterBuffer->asMutable(); + auto remainingBytes = footerSize - footerOffset; + input_ + ->read( + fileLength_ - footerSize - psLength_ - 1, + remainingBytes, + LogType::FOOTER) + ->readFully(footerStart, remainingBytes); + ::memcpy(footerStart + remainingBytes, rawFooterBuffer, footerOffset); + footerOffset = 0; + } + auto decompressed = createDecompressedStream( + std::make_unique( + footerStart, footerSize), + "File Footer"); if (fileFormat() == FileFormat::DWRF) { - auto footer = - google::protobuf::Arena::CreateMessage(arena_.get()); - ProtoUtils::readProtoInto( - createDecompressedStream(std::move(footerStream), "File Footer"), - footer); - footer_ = std::make_unique(footer); + footer_ = parseFooter(decompressed.get(), arena_.get()); } else { - auto footer = google::protobuf::Arena::CreateMessage( - arena_.get()); - ProtoUtils::readProtoInto( - createDecompressedStream(std::move(footerStream), "File Footer"), - footer); - footer_ = std::make_unique(footer); + footer_ = parseFooter(decompressed.get(), arena_.get()); } + stripeMetadataCacheBuffer_ = footerBuffer; + stripeMetadataCacheBufferSize_ = footerOffset; + schema_ = std::dynamic_pointer_cast( convertType(*footer_, 0, options_.fileColumnNamesReadAsLowerCase())); VELOX_CHECK_NOT_NULL(schema_, "invalid schema"); - // load stripe index/footer cache + // initialize file decrypter + handler_ = + DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); +} + +void ReaderBase::loadCache() { + if (!stripeMetadataCacheBuffer_) { + // NOTE: we only expect call this once as stripeMetadataCacheBuffer_ is + // reset on the first call. + return; + } + const uint64_t footerSize = postScript_->footerLength(); + const uint64_t cacheSize = + postScript_->hasCacheSize() ? postScript_->cacheSize() : 0; + const uint64_t tailSize = 1 + psLength_ + footerSize + cacheSize; if (cacheSize > 0) { VELOX_CHECK_EQ(format(), DwrfFormat::kDwrf); const uint64_t cacheOffset = fileLength_ - tailSize; @@ -189,8 +237,20 @@ ReaderBase::ReaderBase( } else { auto cacheBuffer = std::make_shared>( options_.memoryPool(), cacheSize); - input_->read(cacheOffset, cacheSize, LogType::FOOTER) - ->readFully(cacheBuffer->data(), cacheSize); + auto* target = cacheBuffer->data(); + auto* source = stripeMetadataCacheBuffer_->as(); + auto copySize = cacheSize; + if (cacheSize > stripeMetadataCacheBufferSize_) { + auto remainingBytes = cacheSize - stripeMetadataCacheBufferSize_; + auto stream = + input_->read(cacheOffset, remainingBytes, LogType::FOOTER); + stream->readFully(target, remainingBytes); + target += remainingBytes; + copySize -= remainingBytes; + } else { + source += stripeMetadataCacheBufferSize_ - cacheSize; + } + ::memcpy(target, source, copySize); cache_ = std::make_unique( postScript_->cacheMode(), *footer_, std::move(cacheBuffer)); } @@ -208,9 +268,8 @@ ReaderBase::ReaderBase( input_->load(LogType::FOOTER); } } - // initialize file decrypter - handler_ = - DecryptionHandler::create(*footer_, options_.decrypterFactory().get()); + // Release the memory as we no longer need it. + stripeMetadataCacheBuffer_.reset(); } std::vector ReaderBase::rowsPerStripe() const { diff --git a/velox/dwio/dwrf/reader/ReaderBase.h b/velox/dwio/dwrf/reader/ReaderBase.h index 11f9070610e7..d43c660dc1ab 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.h +++ b/velox/dwio/dwrf/reader/ReaderBase.h @@ -80,13 +80,13 @@ class ReaderBase { const proto::Footer* footer, std::unique_ptr cache, std::unique_ptr handler = nullptr) - : postScript_{std::move(ps)}, - footer_{std::make_unique(footer)}, - cache_{std::move(cache)}, - handler_{std::move(handler)}, - options_{dwio::common::ReaderOptions(&pool)}, + : options_{dwio::common::ReaderOptions(&pool)}, input_{std::move(input)}, fileLength_{0}, + postScript_{std::move(ps)}, + footer_{std::make_unique(footer)}, + handler_{std::move(handler)}, + cache_{std::move(cache)}, schema_{ std::dynamic_pointer_cast(convertType(*footer_))}, psLength_{0} { @@ -150,6 +150,8 @@ class ReaderBase { return *input_; } + void loadCache(); + const std::unique_ptr& metadataCache() const { return cache_; } @@ -246,6 +248,10 @@ class ReaderBase { return options_.randomSkip(); } + int footerBufferOverread() const { + return footerBufferOverread_; + } + private: static std::shared_ptr convertType( const FooterWrapper& footer, @@ -260,16 +266,19 @@ class ReaderBase { return options; } - std::unique_ptr arena_; - std::unique_ptr postScript_; - std::unique_ptr footer_ = nullptr; - std::unique_ptr cache_; - std::unique_ptr handler_; - const dwio::common::ReaderOptions options_; const std::unique_ptr input_; const uint64_t fileLength_; + BufferPtr stripeMetadataCacheBuffer_; + int32_t stripeMetadataCacheBufferSize_; + int32_t footerBufferOverread_; + std::unique_ptr arena_; + std::unique_ptr postScript_; + std::unique_ptr footer_; + std::unique_ptr handler_; + std::unique_ptr cache_; + RowTypePtr schema_; // Lazily populated mutable std::shared_ptr schemaWithId_; diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index fa2ee0259aa2..ad26b7954e6c 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -1392,7 +1392,7 @@ TEST_F(TestReader, fileColumnNamesReadAsLowerCaseComplexStruct) { TEST_F(TestReader, TestStripeSizeCallback) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< @@ -1420,7 +1420,7 @@ TEST_F(TestReader, TestStripeSizeCallback) { TEST_F(TestReader, TestStripeSizeCallbackLimitsOneStripe) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< @@ -1449,7 +1449,7 @@ TEST_F(TestReader, TestStripeSizeCallbackLimitsOneStripe) { TEST_F(TestReader, TestStripeSizeCallbackLimitsTwoStripe) { dwio::common::ReaderOptions readerOpts{pool()}; readerOpts.setFilePreloadThreshold(0); - readerOpts.setFooterEstimatedSize(4); + readerOpts.setFooterEstimatedSize(17); RowReaderOptions rowReaderOpts; std::shared_ptr requestedType = std::dynamic_pointer_cast< diff --git a/velox/dwio/dwrf/test/TestDecompression.cpp b/velox/dwio/dwrf/test/TestDecompression.cpp index fcba33d80cfd..37b0166558ff 100644 --- a/velox/dwio/dwrf/test/TestDecompression.cpp +++ b/velox/dwio/dwrf/test/TestDecompression.cpp @@ -1054,7 +1054,7 @@ class TestingSeekableInputStream : public SeekableInputStream { return "testing"; } - size_t positionSize() override { + size_t positionSize() const override { return 1; } diff --git a/velox/dwio/dwrf/test/WriterTest.cpp b/velox/dwio/dwrf/test/WriterTest.cpp index 1570f2988865..13d81ff9cb41 100644 --- a/velox/dwio/dwrf/test/WriterTest.cpp +++ b/velox/dwio/dwrf/test/WriterTest.cpp @@ -62,7 +62,9 @@ class WriterTest : public Test { auto readFile = std::make_shared(std::move(data)); auto input = std::make_unique(std::move(readFile), *pool_); dwio::common::ReaderOptions readerOpts{pool_.get()}; - return std::make_unique(readerOpts, std::move(input)); + auto reader = std::make_unique(readerOpts, std::move(input)); + reader->loadCache(); + return reader; } auto& getContext() { diff --git a/velox/exec/tests/PrintPlanWithStatsTest.cpp b/velox/exec/tests/PrintPlanWithStatsTest.cpp index 440f3b47f072..eb9f9bff740c 100644 --- a/velox/exec/tests/PrintPlanWithStatsTest.cpp +++ b/velox/exec/tests/PrintPlanWithStatsTest.cpp @@ -190,12 +190,13 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dynamicFiltersAccepted[ ]* sum: 1, count: 1, min: 1, max: 1"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" numRamRead [ ]* sum: 60, count: 1, min: 60, max: 60"}, {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" prefetchBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" preloadedSplits[ ]+sum: .+, count: .+, min: .+, max: .+", @@ -206,9 +207,6 @@ TEST_F(PrintPlanWithStatsTest, innerJoinWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes [ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}, @@ -283,12 +281,13 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {" Input: 10000 rows \\(.+\\), Output: 10000 rows \\(.+\\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"}, {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" numRamRead [ ]* sum: 7, count: 1, min: 7, max: 7"}, {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" prefetchBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, @@ -300,9 +299,6 @@ TEST_F(PrintPlanWithStatsTest, partialAggregateWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}}); @@ -355,12 +351,13 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {R"( Input: 100 rows \(.+\), Output: 100 rows \(.+\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+))"}, {" dataSourceAddSplitWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" dataSourceReadWallNanos[ ]* sum: .+, count: 1, min: .+, max: .+"}, - {" flattenStringDictionaryValues [ ]* sum: 0, count: 1, min: 0, max: 0"}, + {" footerBufferOverread[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" ioWaitWallNanos [ ]* sum: .+, count: .+ min: .+, max: .+"}, {" maxSingleIoWaitWallNanos[ ]*sum: .+, count: 1, min: .+, max: .+"}, {" numPrefetch [ ]* sum: .+, count: .+, min: .+, max: .+"}, {" numRamRead [ ]* sum: 7, count: 1, min: 7, max: 7"}, {" numStorageRead [ ]* sum: .+, count: 1, min: .+, max: .+"}, + {" numStripes[ ]* sum: .+, count: 1, min: .+, max: .+"}, {" overreadBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, {" prefetchBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, @@ -372,9 +369,6 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) { {" runningAddInputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningFinishWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, {" runningGetOutputWallNanos\\s+sum: .+, count: 1, min: .+, max: .+"}, - {" skippedSplitBytes[ ]* sum: 0B, count: 1, min: 0B, max: 0B"}, - {" skippedSplits [ ]* sum: 0, count: 1, min: 0, max: 0"}, - {" skippedStrides [ ]* sum: 0, count: 1, min: 0, max: 0"}, {" storageReadBytes [ ]* sum: .+, count: 1, min: .+, max: .+"}, {" totalRemainingFilterTime\\s+sum: .+, count: .+, min: .+, max: .+"}, {" totalScanTime [ ]* sum: .+, count: .+, min: .+, max: .+"}}); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 035d1ed42d4d..10ea2cd0c454 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -2388,8 +2388,8 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(31'234, stats.rawInputRows); EXPECT_EQ(31'234, stats.inputRows); EXPECT_EQ(31'234, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedStrides"), 0); task = assertQuery("c0 IS NULL"); @@ -2398,7 +2398,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(0, stats.inputRows); EXPECT_EQ(0, stats.outputRows); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 1); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedStrides"), 0); // c1 IS NULL - first stride should be skipped based on stats task = assertQuery("c1 IS NULL"); @@ -2407,7 +2407,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(size - 10'000, stats.rawInputRows); EXPECT_EQ(size - 11'111, stats.inputRows); EXPECT_EQ(size - 11'111, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 1); // c1 IS NOT NULL - 3rd and 4th strides should be skipped based on stats @@ -2417,7 +2417,7 @@ TEST_F(TableScanTest, statsBasedSkippingNulls) { EXPECT_EQ(20'000, stats.rawInputRows); EXPECT_EQ(11'111, stats.inputRows); EXPECT_EQ(11'111, stats.outputRows); - ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedSplits").sum, 0); + ASSERT_EQ(getTableScanRuntimeStats(task).count("skippedSplits"), 0); ASSERT_EQ(getTableScanRuntimeStats(task).at("skippedStrides").sum, 2); } @@ -5419,3 +5419,20 @@ TEST_F(TableScanTest, rowId) { AssertQueryBuilder(plan).split(split).assertResults(expected); } } + +TEST_F(TableScanTest, footerIOCount) { + // We should issue only 1 IO for a split range that does not contain any + // stripe. + auto vector = makeRowVector({makeFlatVector(10, folly::identity)}); + auto file = TempFilePath::create(); + writeToFile(file->getPath(), {vector}); + auto plan = PlanBuilder().tableScan(asRowType(vector->type())).planNode(); + auto task = + AssertQueryBuilder(plan) + .split(makeHiveConnectorSplit(file->getPath(), 10'000, 10'000)) + .assertResults( + BaseVector::create(vector->type(), 0, pool())); + auto stats = getTableScanRuntimeStats(task); + ASSERT_EQ(stats.at("numStorageRead").sum, 1); + ASSERT_GT(stats.at("footerBufferOverread").sum, 0); +}