From bfee4b788901fbebf881162074eae3b09c5550d1 Mon Sep 17 00:00:00 2001 From: Hongbin Ma Date: Wed, 29 Nov 2023 20:41:27 +0800 Subject: [PATCH] support rowrange interface --- cpp/src/parquet/CMakeLists.txt | 1 + cpp/src/parquet/arrow/reader.cc | 185 ++++++++- cpp/src/parquet/arrow/reader.h | 10 + cpp/src/parquet/arrow/reader_internal.h | 5 + cpp/src/parquet/column_reader.cc | 32 +- cpp/src/parquet/column_reader.h | 276 +++++++++++++ cpp/src/parquet/range_reader_test.cc | 491 ++++++++++++++++++++++++ 7 files changed, 976 insertions(+), 24 deletions(-) create mode 100644 cpp/src/parquet/range_reader_test.cc diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index e6aad7cee2a3e..0b947af762b20 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -354,6 +354,7 @@ add_parquet_test(reader-test level_conversion_test.cc column_scanner_test.cc reader_test.cc + range_reader_test.cc stream_reader_test.cc test_util.cc) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 40fbdcbb562b1..10c731a6a8b9e 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -17,6 +17,8 @@ #include "parquet/arrow/reader.h" +#include + #include #include #include @@ -72,6 +74,8 @@ using arrow::internal::Iota; // Help reduce verbosity using ParquetReader = parquet::ParquetFileReader; +using parquet::Range; +using parquet::RowRangesPtr; using parquet::internal::RecordReader; namespace bit_util = arrow::bit_util; @@ -200,10 +204,11 @@ class FileReaderImpl : public FileReader { return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out); } - Status GetFieldReader(int i, - const std::shared_ptr>& included_leaves, - const std::vector& row_groups, - std::unique_ptr* out) { + Status GetFieldReader( + int i, const std::shared_ptr>& included_leaves, + const std::vector& row_groups, + const std::shared_ptr>& row_ranges_map, + std::unique_ptr* out) { // Should be covered by GetRecordBatchReader checks but // manifest_.schema_fields is a separate variable so be extra careful. if (ARROW_PREDICT_FALSE(i < 0 || @@ -219,13 +224,15 @@ class FileReaderImpl : public FileReader { ctx->iterator_factory = SomeRowGroupsFactory(row_groups); ctx->filter_leaves = true; ctx->included_leaves = included_leaves; + ctx->row_ranges_map = row_ranges_map; return GetReader(manifest_.schema_fields[i], ctx, out); } - Status GetFieldReaders(const std::vector& column_indices, - const std::vector& row_groups, - std::vector>* out, - std::shared_ptr<::arrow::Schema>* out_schema) { + Status GetFieldReaders( + const std::vector& column_indices, const std::vector& row_groups, + const std::shared_ptr>& row_ranges_map, + std::vector>* out, + std::shared_ptr<::arrow::Schema>* out_schema) { // We only need to read schema fields which have columns indicated // in the indices vector ARROW_ASSIGN_OR_RAISE(std::vector field_indices, @@ -237,8 +244,8 @@ class FileReaderImpl : public FileReader { ::arrow::FieldVector out_fields(field_indices.size()); for (size_t i = 0; i < out->size(); ++i) { std::unique_ptr reader; - RETURN_NOT_OK( - GetFieldReader(field_indices[i], included_leaves, row_groups, &reader)); + RETURN_NOT_OK(GetFieldReader(field_indices[i], included_leaves, row_groups, + row_ranges_map, &reader)); out_fields[i] = reader->field(); out->at(i) = std::move(reader); @@ -265,7 +272,7 @@ class FileReaderImpl : public FileReader { std::vector row_groups = Iota(reader_->metadata()->num_row_groups()); std::unique_ptr reader; - RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader)); + RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, NULLPTR, &reader)); return ReadColumn(i, row_groups, reader.get(), out); } @@ -336,19 +343,26 @@ class FileReaderImpl : public FileReader { return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table); } + Status GetRecordBatchReader( + const std::vector& row_group_indices, const std::vector& column_indices, + const std::shared_ptr>& row_ranges_map, + std::unique_ptr* out) override; + Status GetRecordBatchReader(const std::vector& row_group_indices, const std::vector& column_indices, - std::unique_ptr* out) override; + std::unique_ptr* out) override { + return GetRecordBatchReader(row_group_indices, column_indices, NULLPTR, out); + } Status GetRecordBatchReader(const std::vector& row_group_indices, std::unique_ptr* out) override { return GetRecordBatchReader(row_group_indices, - Iota(reader_->metadata()->num_columns()), out); + Iota(reader_->metadata()->num_columns()), NULLPTR, out); } Status GetRecordBatchReader(std::unique_ptr* out) override { return GetRecordBatchReader(Iota(num_row_groups()), - Iota(reader_->metadata()->num_columns()), out); + Iota(reader_->metadata()->num_columns()), NULLPTR, out); } ::arrow::Result<::arrow::AsyncGenerator>> @@ -451,6 +465,42 @@ class RowGroupReaderImpl : public RowGroupReader { // ---------------------------------------------------------------------- // Column reader implementations +struct RowRangesPageFilter { + explicit RowRangesPageFilter(const RowRangesPtr& row_ranges_, + const RowRangesPtr& page_ranges_) + : row_ranges(row_ranges_), page_ranges(page_ranges_) { + assert(page_ranges != nullptr); + assert(page_ranges->getRanges().size() > 0); + } + + bool operator()(const DataPageStats& stats) { + ++page_range_idx; + + if (row_range_idx >= row_ranges->getRanges().size()) { + return true; + } + + Range current_page_range = (*page_ranges)[page_range_idx]; + + if (current_page_range.isBefore((*row_ranges)[row_range_idx])) { + return true; + } + + while (row_range_idx < row_ranges->getRanges().size() && + current_page_range.isAfter((*row_ranges)[row_range_idx])) { + row_range_idx++; + } + + return row_range_idx >= row_ranges->getRanges().size(); + } + + size_t row_range_idx = 0; + const RowRangesPtr row_ranges; + + int page_range_idx = -1; + const RowRangesPtr page_ranges; +}; + // Leaf reader is for primitive arrays and primitive children of nested arrays class LeafReader : public ColumnReaderImpl { public: @@ -512,8 +562,90 @@ class LeafReader : public ColumnReaderImpl { private: std::shared_ptr out_; + + void checkAndGetPageRanges(const std::shared_ptr& row_ranges, + std::shared_ptr& page_ranges) { + // check offset exists + const auto rg_pg_index_reader = + ctx_->reader->GetPageIndexReader()->RowGroup(input_->current_row_group()); + + if (!rg_pg_index_reader) { + throw ParquetException( + "Attempting to read with Ranges but Page Index is not found for Row " + "Group: " + + std::to_string(input_->current_row_group())); + } + const auto offset_index = rg_pg_index_reader->GetOffsetIndex(input_->column_index()); + + if (!offset_index) { + throw ParquetException( + "Attempting to read with Ranges but Offset index is not found for " + "column: " + + field_->name()); + } + + if (!row_ranges->isValid()) { + throw ParquetException( + "The provided row range is invalid, keep it monotone and non-interleaving: " + + row_ranges->toString()); + } + + const auto page_locations = offset_index->page_locations(); + page_ranges = std::make_shared(); + for (size_t i = 0; i < page_locations.size() - 1; i++) { + page_ranges->add( + {page_locations[i].first_row_index, page_locations[i + 1].first_row_index - 1}, + false); + } + if (page_locations.size() >= 1) { + page_ranges->add( + {page_locations[page_locations.size() - 1].first_row_index, + ctx_->reader->metadata()->RowGroup(input_->current_row_group())->num_rows() - + 1}, + false); + } + + if (row_ranges->getRanges().size() > 0) { + if ((*row_ranges).getRanges().back().to > page_ranges->getRanges().back().to) { + throw ParquetException( + "The provided row range " + row_ranges->toString() + + " exceeds last page :" + page_ranges->getRanges().back().toString()); + } + } + } + void NextRowGroup() { std::unique_ptr page_reader = input_->NextChunk(); + + /// using page index to reduce cost + if (page_reader != nullptr && ctx_->row_ranges_map) { + // reset skipper + record_reader_->set_record_skipper(NULLPTR); + + // if specific row range is provided for this rg + if (const auto iter = ctx_->row_ranges_map->find(input_->current_row_group()); + iter != ctx_->row_ranges_map->end()) { + if (iter->second != nullptr && iter->second->rowCount() != 0) { + std::shared_ptr page_ranges; + checkAndGetPageRanges(iter->second, page_ranges); + + // part 1, skip decompressing & decoding unnecessary pages + page_reader->set_data_page_filter( + RowRangesPageFilter(iter->second, page_ranges)); + + // part 2, skip unnecessary rows in necessary pages + record_reader_->set_record_skipper( + std::make_shared(*page_ranges, + *iter->second)); + } else { + NextRowGroup(); + return; + } + } + // Else iff row_ranges_map exists but no row_ranges is found for this RG key, this RG will be read + } + + record_reader_->reset_current_rg_processed_records(); record_reader_->SetPageReader(std::move(page_reader)); } @@ -982,9 +1114,10 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& } // namespace -Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, - const std::vector& column_indices, - std::unique_ptr* out) { +Status FileReaderImpl::GetRecordBatchReader( + const std::vector& row_groups, const std::vector& column_indices, + const std::shared_ptr>& row_ranges_map, + std::unique_ptr* out) { RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); if (reader_properties_.pre_buffer()) { @@ -997,7 +1130,8 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, std::vector> readers; std::shared_ptr<::arrow::Schema> batch_schema; - RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema)); + RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, row_ranges_map, &readers, + &batch_schema)); if (readers.empty()) { // Just generate all batches right now; they're cheap since they have no columns. @@ -1218,6 +1352,7 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto ctx->pool = pool_; ctx->iterator_factory = iterator_factory; ctx->filter_leaves = false; + std::unique_ptr result; RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result)); *out = std::move(result); @@ -1251,7 +1386,8 @@ Future> FileReaderImpl::DecodeRowGroups( // in a sync context too so use `this` over `self` std::vector> readers; std::shared_ptr<::arrow::Schema> result_schema; - RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema)); + RETURN_NOT_OK( + GetFieldReaders(column_indices, row_groups, NULLPTR, &readers, &result_schema)); // OptionalParallelForAsync requires an executor if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); @@ -1314,6 +1450,17 @@ Status FileReader::GetRecordBatchReader(const std::vector& row_group_indice return Status::OK(); } +Status FileReader::GetRecordBatchReader( + const std::vector& row_group_indices, const std::vector& column_indices, + const std::shared_ptr>& row_ranges_map, + std::shared_ptr* out) { + std::unique_ptr tmp; + RETURN_NOT_OK( + GetRecordBatchReader(row_group_indices, column_indices, row_ranges_map, &tmp)); + out->reset(tmp.release()); + return Status::OK(); +} + Status FileReader::Make(::arrow::MemoryPool* pool, std::unique_ptr reader, const ArrowReaderProperties& properties, diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 2cbd36176f5e3..0cd8f298d79dc 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -23,6 +23,7 @@ #include #include +#include "parquet/column_reader.h" #include "parquet/file_reader.h" #include "parquet/platform.h" #include "parquet/properties.h" @@ -187,6 +188,11 @@ class PARQUET_EXPORT FileReader { const std::vector& row_group_indices, const std::vector& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + virtual ::arrow::Status GetRecordBatchReader( + const std::vector& row_group_indices, const std::vector& column_indices, + const std::shared_ptr>& row_ranges_map, + std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + /// \brief Return a RecordBatchReader of row groups selected from /// row_group_indices, whose columns are selected by column_indices. /// @@ -199,6 +205,10 @@ class PARQUET_EXPORT FileReader { /// /// \returns error Status if either row_group_indices or column_indices /// contains an invalid index + ::arrow::Status GetRecordBatchReader( + const std::vector& row_group_indices, const std::vector& column_indices, + const std::shared_ptr>& row_ranges_map, + std::shared_ptr<::arrow::RecordBatchReader>* out); ::arrow::Status GetRecordBatchReader(const std::vector& row_group_indices, const std::vector& column_indices, std::shared_ptr<::arrow::RecordBatchReader>* out); diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h index cf9dbb86577b5..56be0f93f4140 100644 --- a/cpp/src/parquet/arrow/reader_internal.h +++ b/cpp/src/parquet/arrow/reader_internal.h @@ -76,6 +76,7 @@ class FileColumnIterator { } auto row_group_reader = reader_->RowGroup(row_groups_.front()); + current_rg = row_groups_.front(); row_groups_.pop_front(); return row_group_reader->GetColumnPageReader(column_index_); } @@ -88,11 +89,14 @@ class FileColumnIterator { int column_index() const { return column_index_; } + int current_row_group() const { return current_rg; } + protected: int column_index_; ParquetFileReader* reader_; const SchemaDescriptor* schema_; std::deque row_groups_; + int current_rg = 0; }; using FileColumnIteratorFactory = @@ -109,6 +113,7 @@ struct ReaderContext { FileColumnIteratorFactory iterator_factory; bool filter_leaves; std::shared_ptr> included_leaves; + std::shared_ptr> row_ranges_map; bool IncludesLeaf(int leaf_index) const { if (this->filter_leaves) { diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 3294aaaf283f1..b517ee7c798ea 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1373,7 +1373,7 @@ class TypedRecordReader : public TypedColumnReaderImpl, int64_t records_read = 0; if (has_values_to_process()) { - records_read += ReadRecordData(num_records); + records_read += ReadRecordDataWithSkipCheck(num_records); } int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); @@ -1427,11 +1427,11 @@ class TypedRecordReader : public TypedColumnReaderImpl, } levels_written_ += levels_read; - records_read += ReadRecordData(num_records - records_read); + records_read += ReadRecordDataWithSkipCheck(num_records - records_read); } else { // No repetition or definition levels batch_size = std::min(num_records - records_read, batch_size); - records_read += ReadRecordData(batch_size); + records_read += ReadRecordDataWithSkipCheck(batch_size); } } @@ -1634,10 +1634,12 @@ class TypedRecordReader : public TypedColumnReaderImpl, // Top level required field. Number of records equals to number of levels, // and there is not read-ahead for levels. + int64_t skipped_records = 0; if (this->max_rep_level_ == 0 && this->max_def_level_ == 0) { - return this->Skip(num_records); + skipped_records = this->Skip(num_records); + current_rg_processed_records += skipped_records; + return skipped_records; } - int64_t skipped_records = 0; if (this->max_rep_level_ == 0) { // Non-repeated optional field. // First consume whatever is in the buffer. @@ -1653,6 +1655,8 @@ class TypedRecordReader : public TypedColumnReaderImpl, } else { skipped_records += this->SkipRecordsRepeated(num_records); } + + current_rg_processed_records += skipped_records; return skipped_records; } @@ -1984,9 +1988,27 @@ class TypedRecordReader : public TypedColumnReaderImpl, this->ConsumeBufferedValues(values_to_read); } + current_rg_processed_records += records_read; return records_read; } + int64_t ReadRecordDataWithSkipCheck(const int64_t num_records) { + if (!skipper) { + return ReadRecordData(num_records); + } + + while (true) { + const auto advise = skipper->advise_next(current_rg_processed_records); + if (advise == 0) { + return 0; + } + if (advise > 0) { + return ReadRecordData(std::min(num_records, advise)); + } + SkipRecords(-advise); + } + } + void DebugPrintState() override { const int16_t* def_levels = this->def_levels(); const int16_t* rep_levels = this->rep_levels(); diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 334b8bcffe0b8..dde78d5115c34 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -302,8 +303,275 @@ class TypedColumnReader : public ColumnReader { int32_t* dict_len) = 0; }; +struct Range { + static Range unionRange(const Range& left, const Range& right) { + if (left.from <= right.from) { + if (left.to + 1 >= right.from) { + return {left.from, std::max(left.to, right.to)}; + } + } else if (right.to + 1 >= left.from) { + return {right.from, std::max(left.to, right.to)}; + } + return {-1, -1}; + } + + static Range intersection(const Range& left, const Range& right) { + if (left.from <= right.from) { + if (left.to >= right.from) { + return {right.from, std::min(left.to, right.to)}; + } + } else if (right.to >= left.from) { + return {left.from, std::min(left.to, right.to)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + int64_t from; + int64_t to; + + Range(const int64_t from_, const int64_t to_) : from(from_), to(to_) { + assert(from <= to); + } + + size_t count() const { return to - from + 1; } + + bool isBefore(const Range& other) const { return to < other.from; } + + bool isAfter(const Range& other) const { return from > other.to; } + + bool isOverlap(const Range& other) const { return !isBefore(other) && !isAfter(other); } + + std::string toString() const { + return "[" + std::to_string(from) + ", " + std::to_string(to) + "]"; + } +}; + +class RowRanges { + std::vector ranges; + + public: + RowRanges() = default; + + explicit RowRanges(const Range& range) { ranges.push_back(range); } + + RowRanges(const std::vector& ranges) { this->ranges = ranges; } + + // copy cstr + RowRanges(const RowRanges& other) { ranges = other.ranges; } + + RowRanges(RowRanges&& other) noexcept { ranges = std::move(other.ranges); } + + static RowRanges unionRanges(const RowRanges& left, const RowRanges& right) { + RowRanges result; + auto it1 = left.ranges.begin(); + auto it2 = right.ranges.begin(); + if (it2 != right.ranges.end()) { + Range range2 = *it2; + while (it1 != left.ranges.end()) { + Range range1 = *it1; + if (range1.isAfter(range2)) { + result.add(range2); + range2 = range1; + const auto tmp = it1; + it1 = it2; + it2 = tmp; + } else { + result.add(range1); + } + ++it1; + } + result.add(range2); + } else { + it2 = it1; + } + while (it2 != right.ranges.end()) { + result.add(*it2); + ++it2; + } + + return result; + } + + static RowRanges intersection(const RowRanges& left, const RowRanges& right) { + RowRanges result; + + size_t rightIndex = 0; + for (const Range& l : left.ranges) { + for (size_t i = rightIndex, n = right.ranges.size(); i < n; ++i) { + const Range& r = right.ranges[i]; + if (l.isBefore(r)) { + break; + } else if (l.isAfter(r)) { + rightIndex = i + 1; + continue; + } + result.add(Range::intersection(l, r)); + } + } + + return result; + } + + RowRanges slice(const int64_t from, const int64_t to) const { + RowRanges result; + for (const Range& range : ranges) { + if (range.from >= from && range.to <= to) { + result.add(range); + } + } + return result; + } + + void add(const Range& range, bool merge = true) { + Range rangeToAdd = range; + if (merge) { + for (int i = static_cast(ranges.size()) - 1; i >= 0; --i) { + Range last = ranges[i]; + if (last.isAfter(range)) { + throw ParquetException(range.toString() + " cannot be added to " + + this->toString()); + } + const Range u = Range::unionRange(last, rangeToAdd); + if (u.from == -1 && u.to == -1) { + break; + } + rangeToAdd = u; + ranges.erase(ranges.begin() + i); + } + } else { + if (ranges.size() > 1) assert(rangeToAdd.from > ranges.back().to); + } + ranges.push_back(rangeToAdd); + } + + size_t rowCount() const { + size_t cnt = 0; + for (const Range& range : ranges) { + cnt += range.count(); + } + return cnt; + } + + bool isValid() const { + if (ranges.size() == 0) return true; + if (ranges[0].from < 0) { + return false; + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].from <= ranges[i - 1].to) { + return false; + } + } + return true; + } + + bool isOverlapping(int64_t from, int64_t to) const { + const Range searchRange(from, to); + return isOverlapping(searchRange); + } + + bool isOverlapping(const Range& searchRange) const { + auto it = std::lower_bound( + ranges.begin(), ranges.end(), searchRange, + [](const Range& r1, const Range& r2) { return r1.isBefore(r2); }); + return it != ranges.end() && !(*it).isAfter(searchRange); + } + + std::vector& getRanges() { return ranges; } + + const Range& operator[](size_t index) const { + assert(index < ranges.size()); + return ranges[index]; + } + + std::string toString() const { + std::string result = "["; + for (const Range& range : ranges) { + result += + "(" + std::to_string(range.from) + ", " + std::to_string(range.to) + "), "; + } + if (!ranges.empty()) { + result = result.substr(0, result.size() - 2); + } + result += "]"; + return result; + } +}; + +using RowRangesPtr = std::shared_ptr; + namespace internal { +class PARQUET_EXPORT RecordSkipper { + public: + RecordSkipper(RowRanges& pages, RowRanges& row_ranges_) + : row_ranges(row_ranges_) { // copy row_ranges + RowRanges will_process_pages, skip_pages; + for (auto& page : pages.getRanges()) { + if (!row_ranges.isOverlapping(page)) { + skip_pages.add(page, false); + } + } + + /// Since the skipped pages will be slienly skipped without updating + /// current_rg_processed_records or records_read_, we need to pre-process the row + /// ranges as if these skipped pages never existed + adjust_ranges(skip_pages, row_ranges); + + total_rows_to_process = pages.rowCount() - skip_pages.rowCount(); + } + + /// \brief Return the number of records to read or to skip + /// if return values is positive, it means to read N records + /// if return values is negative, it means to skip N records + /// if return values is 0, it means end of RG + int64_t advise_next(const int64_t current_rg_procesed) { + if (row_ranges.getRanges().size() == row_range_idx) { + return 0; + } + + if (row_ranges[row_range_idx].to < current_rg_procesed) { + row_range_idx++; + if (row_ranges.getRanges().size() == row_range_idx) { + // negative, skip the ramaining rows + return current_rg_procesed - total_rows_to_process; + } + } + + if (row_ranges[row_range_idx].from > current_rg_procesed) { + // negative, skip + return current_rg_procesed - row_ranges[row_range_idx].from; + } + + const auto ret = row_ranges[row_range_idx].to - current_rg_procesed + 1; + assert(ret > 0); + return ret; + } + + private: + /// Keep copy of ranges, because advise_next() will modify them + RowRanges row_ranges; + + size_t row_range_idx = 0; + + size_t total_rows_to_process = 0; + + void adjust_ranges(RowRanges& skip_pages, RowRanges& to_adjust) { + size_t skipped_rows = 0; + auto iter = to_adjust.getRanges().begin(); + auto skip_iter = skip_pages.getRanges().begin(); + while (iter != to_adjust.getRanges().end()) { + while (skip_iter != skip_pages.getRanges().end() && skip_iter->isBefore(*iter)) { + skipped_rows += skip_iter->count(); + ++skip_iter; + } + iter->from -= skipped_rows; + iter->to -= skipped_rows; + ++iter; + } + } +}; + /// \brief Stateful column reader that delimits semantic records for both flat /// and nested columns /// @@ -414,6 +682,10 @@ class PARQUET_EXPORT RecordReader { /// \brief True if reading dense for nullable columns. bool read_dense_for_nullable() const { return read_dense_for_nullable_; } + void reset_current_rg_processed_records() { current_rg_processed_records = 0; } + + void set_record_skipper(std::shared_ptr skipper_) { skipper = skipper_; } + protected: /// \brief Indicates if we can have nullable values. Note that repeated fields /// may or may not be nullable. @@ -422,6 +694,8 @@ class PARQUET_EXPORT RecordReader { bool at_record_start_; int64_t records_read_; + int64_t current_rg_processed_records; // counting both read and skip records + /// \brief Stores values. These values are populated based on each ReadRecords /// call. No extra values are buffered for the next call. SkipRecords will not /// add any value to this buffer. @@ -463,6 +737,8 @@ class PARQUET_EXPORT RecordReader { // If true, we will not leave any space for the null values in the values_ // vector. bool read_dense_for_nullable_ = false; + + std::shared_ptr skipper = NULLPTR; }; class BinaryRecordReader : virtual public RecordReader { diff --git a/cpp/src/parquet/range_reader_test.cc b/cpp/src/parquet/range_reader_test.cc new file mode 100644 index 0000000000000..7a7c7e001bb71 --- /dev/null +++ b/cpp/src/parquet/range_reader_test.cc @@ -0,0 +1,491 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/api.h" +#include "arrow/io/api.h" +#include "arrow/io/memory.h" +#include "arrow/result.h" +#include "arrow/util/type_fwd.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/writer.h" + +#include +#include +#include +#include +#include +#include + +/// The table looks like (with_nulls = false): +// { +// { a: {x: 0, y: 0}, b: {0, 0, 0}, c: "0", d: 0}, +// { a: {x: 1, y: 1}, b: {1, 1, 1}, c: "1", d: 1}, +// ... +// { a: {x: 99, y: 99}, b: {99, 99, 99}, c: "99", d: 99} +// } +arrow::Result> GetTable(bool with_nulls = false) { + // if with_nulls, the generated table should null values + // set first 10 rows and last 10 rows to null + std::shared_ptr null_bitmap; + std::vector flags(100, true); + if (with_nulls) { + std::fill_n(flags.begin(), 10, false); + std::fill_n(flags.begin() + 90, 10, false); + + size_t length = flags.size(); + + ARROW_ASSIGN_OR_RAISE(null_bitmap, arrow::AllocateEmptyBitmap(length)); + + uint8_t* bitmap = null_bitmap->mutable_data(); + for (size_t i = 0; i < length; ++i) { + if (flags[i]) { + arrow::bit_util::SetBit(bitmap, i); + } + } + } + + auto int32_builder = arrow::Int32Builder(); + + // Struct col + std::shared_ptr arr_a_x; + std::shared_ptr arr_a_y; + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(arrow::internal::Iota(0, 100))); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_a_x)); + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(arrow::internal::Iota(0, 100))); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_a_y)); + ARROW_ASSIGN_OR_RAISE(auto arr_a, arrow::StructArray::Make( + {arr_a_x, arr_a_y}, + std::vector{"x", "y"}, null_bitmap)); + + // List col + std::shared_ptr arr_b_values; + std::shared_ptr arr_b_offsets; + std::vector b_values; + for (int i = 0; i < 100; ++i) { + for (int j = 0; j < 3; ++j) { + b_values.push_back(i); + } + } + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(b_values)); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_b_values)); + std::vector offsets = arrow::internal::Iota(0, 101); + std::transform(offsets.begin(), offsets.end(), offsets.begin(), + [](const int x) { return x * 3; }); + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(offsets)); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_b_offsets)); + ARROW_ASSIGN_OR_RAISE(auto arr_b, arrow::ListArray::FromArrays( + *arr_b_offsets, *arr_b_values, + arrow::default_memory_pool(), null_bitmap)); + + // string col + auto string_builder = arrow::StringBuilder(); + std::shared_ptr arr_c; + std::vector strs; + uint8_t valid_bytes[100]; + for (size_t i = 0; i < 100; i++) { + strs.push_back(std::to_string(i)); + valid_bytes[i] = flags[i]; + } + ARROW_RETURN_NOT_OK(string_builder.AppendValues(strs, &valid_bytes[0])); + ARROW_RETURN_NOT_OK(string_builder.Finish(&arr_c)); + + // int col + std::shared_ptr arr_d; + ARROW_RETURN_NOT_OK(int32_builder.AppendValues(arrow::internal::Iota(0, 100), flags)); + ARROW_RETURN_NOT_OK(int32_builder.Finish(&arr_d)); + + auto schema = arrow::schema({ + // complex types prior to simple types + field("a", arr_a->type()), + field("b", list(arrow::int32())), + field("c", arrow::utf8()), + field("d", arrow::int32()), + }); + + return arrow::Table::Make(schema, {arr_a, arr_b, arr_c, arr_d}); +} + +arrow::Result> WriteFullFile( + const bool with_nulls = false) { + using parquet::ArrowWriterProperties; + using parquet::WriterProperties; + + ARROW_ASSIGN_OR_RAISE(const auto table, GetTable(with_nulls)); + + const std::shared_ptr props = + WriterProperties::Builder() + .max_row_group_length(30) + ->enable_write_page_index() + ->write_batch_size(13) + ->data_pagesize(1) // this will cause every batch creating a page + ->compression(arrow::Compression::SNAPPY) + ->build(); + + const std::shared_ptr arrow_props = + ArrowWriterProperties::Builder().store_schema()->build(); + + ARROW_ASSIGN_OR_RAISE(const auto out_stream, ::arrow::io::BufferOutputStream::Create()); + + ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table.get(), + arrow::default_memory_pool(), out_stream, + /*chunk_size=*/100, props, arrow_props)); + + // { + // // output to a local file for debugging + // ARROW_ASSIGN_OR_RAISE(auto outfile, arrow::io::FileOutputStream::Open( + // "/tmp/range_reader_test.parquet")); + // + // ARROW_RETURN_NOT_OK( + // parquet::arrow::WriteTable(*table.get(), arrow::default_memory_pool(), outfile, + // /*chunk_size=*/100, props, arrow_props)); + // } + + return out_stream->Finish(); +} + +bool checking_col(const std::string& col_name, + const std::vector& column_names) { + return std::find(column_names.begin(), column_names.end(), col_name) != + column_names.end(); +} + +void check_rb(std::shared_ptr rb_reader, + const size_t expected_rows, const int64_t expected_sum) { + const std::vector column_names = rb_reader->schema()->field_names(); + + size_t total_rows = 0; + int64_t sum_a = 0; + int64_t sum_b = 0; + int64_t sum_c = 0; + int64_t sum_d = 0; + for (arrow::Result> maybe_batch : *rb_reader) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + total_rows += batch->num_rows(); + + if (checking_col("a", column_names)) { + auto a_array = + std::dynamic_pointer_cast(batch->GetColumnByName("a")); + auto a_x_array = std::dynamic_pointer_cast(a_array->field(0)); + auto a_y_array = std::dynamic_pointer_cast(a_array->field(1)); + for (auto iter = a_x_array->begin(); iter != a_x_array->end(); ++iter) { + sum_a += (*iter).has_value() ? (*iter).value() : 0; + } + for (auto iter = a_y_array->begin(); iter != a_y_array->end(); ++iter) { + sum_a += (*iter).has_value() ? (*iter).value() : 0; + } + } + + if (checking_col("b", column_names)) { + auto b_array = + std::dynamic_pointer_cast(batch->GetColumnByName("b")); + ASSERT_OK_AND_ASSIGN(auto flatten_b_array, b_array->Flatten()); + auto b_array_values = std::dynamic_pointer_cast(flatten_b_array); + for (auto iter = b_array_values->begin(); iter != b_array_values->end(); ++iter) { + sum_b += (*iter).has_value() ? (*iter).value() : 0; + } + } + + if (checking_col("c", column_names)) { + auto c_array = + std::dynamic_pointer_cast(batch->GetColumnByName("c")); + for (auto iter = c_array->begin(); iter != c_array->end(); ++iter) { + sum_c += std::stoi(std::string((*iter).has_value() ? (*iter).value() : "0")); + } + } + + if (checking_col("d", column_names)) { + auto d_array = + std::dynamic_pointer_cast(batch->GetColumnByName("d")); + for (auto iter = d_array->begin(); iter != d_array->end(); ++iter) { + sum_d += (*iter).has_value() ? (*iter).value() : 0; + } + } + } + ASSERT_EQ(expected_rows, total_rows); + + if (checking_col("a", column_names)) ASSERT_EQ(expected_sum * 2, sum_a); + if (checking_col("b", column_names)) ASSERT_EQ(expected_sum * 3, sum_b); + if (checking_col("c", column_names)) ASSERT_EQ(expected_sum, sum_c); + if (checking_col("d", column_names)) ASSERT_EQ(expected_sum, sum_d); +} + +class TestRecordBatchReaderWithRanges : public testing::Test { + public: + void SetUp() { + ASSERT_OK_AND_ASSIGN(auto buffer, WriteFullFile()); + + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + + auto arrow_reader_props = parquet::ArrowReaderProperties(); + arrow_reader_props.set_batch_size(10); // default 64 * 1024 + + parquet::arrow::FileReaderBuilder reader_builder; + const auto in_file = std::make_shared(buffer); + ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + + ASSERT_OK_AND_ASSIGN(arrow_reader, reader_builder.Build()); + } + + void TearDown() {} + + protected: + std::unique_ptr arrow_reader; +}; + +TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForEachRG) { + std::shared_ptr rb_reader; + const auto row_ranges_map = std::make_shared>(); + row_ranges_map->insert({0, std::make_shared(parquet::Range{0, 9})}); + row_ranges_map->insert( + {1, std::make_shared(parquet::Range{10, 19})}); + row_ranges_map->insert( + {2, std::make_shared(parquet::Range{20, 29})}); + row_ranges_map->insert({3, std::make_shared(parquet::Range{0, 9})}); + + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader)); + + // (0+...+9) + (40+...+49) + (80+...+89) + (90+...+99) = 2280 + check_rb(rb_reader, 40, 2280); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) { + std::shared_ptr rb_reader; + const auto row_ranges_map = std::make_shared>(); + row_ranges_map->insert( + {0, std::make_shared(parquet::Range{0, 29})}); + row_ranges_map->insert( + {1, std::make_shared(parquet::Range{0, 29})}); + row_ranges_map->insert( + {2, std::make_shared(parquet::Range{0, 29})}); + row_ranges_map->insert({3, std::make_shared(parquet::Range{0, 9})}); + + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader)); + + // (0+...+99) = 4950 + check_rb(rb_reader, 100, 4950); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectEmptyRange) { + std::shared_ptr rb_reader; + const auto row_ranges_map = std::make_shared>(); + // here we test four kinds of empty range: + + // rg 0 not put into map -> will read + row_ranges_map->insert({1, nullptr}); // value is nullptr -> will skip + row_ranges_map->insert( + {2, std::make_shared( + std::vector())}); // value is empty -> will skip + row_ranges_map->insert({3, std::make_shared()}); // value is empty -> will skip + + const std::vector column_indices{0, 1, 2, 3, 4}; + const auto status = arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader); + ASSERT_OK(status); + // (0+...29) = 435 + check_rb(rb_reader, 30, 435); +} + +TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { + // case 1: only care about RG 0 + { + std::shared_ptr rb_reader; + const auto row_ranges_map = std::make_shared>(); + std::vector ranges; + for (int64_t i = 0; i < 30; i++) { + if (i % 2 == 0) ranges.push_back({i, i}); + } + row_ranges_map->insert({0, std::make_shared(ranges)}); + row_ranges_map->insert({1, nullptr}); + row_ranges_map->insert({2, nullptr}); + row_ranges_map->insert({3, nullptr}); + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader)); + + check_rb(rb_reader, 15, 210); // 0 + 2 + ... + 28 = 210 + } + + // case 2: care about RG 0 and 2 + { + std::shared_ptr rb_reader; + const auto row_ranges_map = std::make_shared>(); + std::vector ranges; + for (int64_t i = 0; i < 30; i++) { + if (i % 2 == 0) ranges.push_back({i, i}); + } + row_ranges_map->insert({0, std::make_shared(ranges)}); + row_ranges_map->insert({1, nullptr}); + row_ranges_map->insert({2, std::make_shared(ranges)}); + row_ranges_map->insert({3, nullptr}); + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader)); + + check_rb(rb_reader, 30, 1320); // (0 + 2 + ... + 28) + (60 + 62 ... + 88) = 1320 + } +} + +TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) { + std::shared_ptr rb_reader; + { + const auto row_ranges_map = std::make_shared>(); + row_ranges_map->insert( + {0, std::make_shared(parquet::Range{-1, 5})}); + const std::vector column_indices{0, 1, 2, 3, 4}; + const auto status = arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader); + ASSERT_NOT_OK(status); + EXPECT_TRUE(status.message().find("The provided row range is invalid, keep it " + "monotone and non-interleaving: [(-1, 5)]") != + std::string::npos); + } + + { + const auto row_ranges_map = std::make_shared>(); + row_ranges_map->insert({0, std::make_shared(std::vector{ + parquet::Range{0, 4}, parquet::Range{2, 5}})}); + const std::vector column_indices{0, 1, 2, 3, 4}; + const auto status = arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader); + ASSERT_NOT_OK(status); + EXPECT_TRUE( + status.message().find("The provided row range is invalid, keep it monotone and " + "non-interleaving: [(0, 4), (2, 5)]") != std::string::npos); + } + { + const auto row_ranges_map = std::make_shared>(); + row_ranges_map->insert( + {0, std::make_shared(std::vector{parquet::Range{0, 30}})}); + const std::vector column_indices{0, 1, 2, 3, 4}; + const auto status = arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader); + ASSERT_NOT_OK(status); + EXPECT_TRUE(status.message().find( + "The provided row range [(0, 30)] exceeds last page :[26, 29]") != + std::string::npos); + } +} + +TEST(TestRecordBatchReaderWithRangesBadCases, NoPageIndex) { + using parquet::ArrowWriterProperties; + using parquet::WriterProperties; + + // write a file without page index + ASSERT_OK_AND_ASSIGN(std::shared_ptr table, GetTable()); + std::shared_ptr props = + WriterProperties::Builder() + .max_row_group_length(30) + ->disable_write_page_index() // NO INDEX !!!! + ->write_batch_size(13) + ->data_pagesize(1) + ->compression(arrow::Compression::SNAPPY) + ->build(); + std::shared_ptr arrow_props = + ArrowWriterProperties::Builder().store_schema()->build(); + ASSERT_OK_AND_ASSIGN(auto out_stream, ::arrow::io::BufferOutputStream::Create()); + ASSERT_OK(parquet::arrow::WriteTable(*table.get(), arrow::default_memory_pool(), + out_stream, + /*chunk_size=*/100, props, arrow_props)); + ASSERT_OK_AND_ASSIGN(auto buffer, out_stream->Finish()); + + // try to read the file with Range + arrow::MemoryPool* pool = arrow::default_memory_pool(); + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + auto arrow_reader_props = parquet::ArrowReaderProperties(); + arrow_reader_props.set_batch_size(10); // default 64 * 1024 + + parquet::arrow::FileReaderBuilder reader_builder; + auto in_file = std::make_shared(buffer); + ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + ASSERT_OK_AND_ASSIGN(auto arrow_reader, reader_builder.Build()); + + std::shared_ptr rb_reader; + auto row_ranges_map = std::make_shared>(); + row_ranges_map->insert( + {0, std::make_shared(parquet::Range{0, 29})}); + std::vector column_indices{0, 1, 2, 3, 4}; + auto status = arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader); + ASSERT_NOT_OK(status); + EXPECT_TRUE(status.message().find("Attempting to read with Ranges but Page Index is " + "not found for Row Group: 0") != std::string::npos); +} + +class TestRecordBatchReaderWithRangesWithNulls : public testing::Test { + public: + void SetUp() { + ASSERT_OK_AND_ASSIGN(auto buffer, WriteFullFile(true)); + + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + + auto arrow_reader_props = parquet::ArrowReaderProperties(); + arrow_reader_props.set_batch_size(10); // default 64 * 1024 + + parquet::arrow::FileReaderBuilder reader_builder; + const auto in_file = std::make_shared(buffer); + ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + + ASSERT_OK_AND_ASSIGN(arrow_reader, reader_builder.Build()); + } + + void TearDown() {} + + protected: + std::unique_ptr arrow_reader; +}; + +TEST_F(TestRecordBatchReaderWithRangesWithNulls, SelectOneRowSkipOneRow) { + { + std::shared_ptr rb_reader; + const auto row_ranges_map = std::make_shared>(); + std::vector ranges; + for (int64_t i = 0; i < 30; i++) { + if (i % 2 == 0) ranges.push_back({i, i}); + } + row_ranges_map->insert({0, std::make_shared(ranges)}); + row_ranges_map->insert({1, nullptr}); + row_ranges_map->insert({2, std::make_shared(ranges)}); + row_ranges_map->insert({3, nullptr}); + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader)); + + // 0-9 is masked as null, so the ramaining is: + // (10 + 12 + ... + 28) + (60 + 62 ... + 88) = 1320 + check_rb(rb_reader, 30, 1300); + } +} \ No newline at end of file