Skip to content

Commit

Permalink
GH-37630: [C++][Python][Dataset] Allow disabling fragment metadata ca…
Browse files Browse the repository at this point in the history
…ching
  • Loading branch information
pitrou committed Jan 22, 2025
1 parent 3a3f2ec commit 5b7c3f2
Show file tree
Hide file tree
Showing 17 changed files with 95 additions and 33 deletions.
6 changes: 2 additions & 4 deletions cpp/src/arrow/acero/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/expression.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_util.h"
#include "arrow/util/future.h"

namespace arrow {

Expand All @@ -55,7 +53,7 @@ namespace acero {
/// \brief This must not be used in release-mode
struct DebugOptions;

using AsyncExecBatchGenerator = AsyncGenerator<std::optional<ExecBatch>>;
using AsyncExecBatchGenerator = std::function<Future<std::optional<ExecBatch>>()>;

/// \addtogroup acero-nodes
/// @{
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/source_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "arrow/acero/map_node.h"
#include "arrow/acero/options.h"
#include "arrow/acero/test_nodes.h"
#include "arrow/record_batch.h"

namespace arrow {
namespace acero {
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "arrow/acero/util.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/projector.h"
#include "arrow/dataset/scanner.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
Expand Down Expand Up @@ -75,6 +76,8 @@ Future<std::optional<int64_t>> Fragment::CountRows(compute::Expression,
return Future<std::optional<int64_t>>::MakeFinished(std::nullopt);
}

Status Fragment::ClearCachedMetadata() { return Status::OK(); }

Result<std::shared_ptr<Schema>> InMemoryFragment::ReadPhysicalSchemaImpl() {
return physical_schema_;
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
virtual Future<std::optional<int64_t>> CountRows(
compute::Expression predicate, const std::shared_ptr<ScanOptions>& options);

/// XXX
virtual Status ClearCachedMetadata();

virtual std::string type_name() const = 0;
virtual std::string ToString() const { return type_name(); }

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/discovery.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/projector.h"
#include "arrow/dataset/test_util_internal.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/stl.h"
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "arrow/dataset/partition.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/record_batch.h"
#include "arrow/util/logging.h"
#include "arrow/util/string.h"

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/dataset_writer.h"
#include "arrow/dataset/forest_internal.h"
#include "arrow/dataset/projector.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/subtree_internal.h"
#include "arrow/filesystem/filesystem.h"
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,13 @@ Status ParquetFileFragment::SetMetadata(
return Status::OK();
}

Status ParquetFileFragment::ClearCachedMetadata() {
metadata_.reset();
manifest_.reset();
original_metadata_.reset();
return Status::OK();
}

Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
compute::Expression predicate) {
RETURN_NOT_OK(EnsureCompleteMetadata());
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
/// \brief Ensure this fragment's FileMetaData is in memory.
Status EnsureCompleteMetadata(parquet::arrow::FileReader* reader = NULLPTR);

Status ClearCachedMetadata() override;

/// \brief Return fragment which selects a filtered subset of this fragment's RowGroups.
Result<std::shared_ptr<Fragment>> Subset(compute::Expression predicate);
Result<std::shared_ptr<Fragment>> Subset(std::vector<int> row_group_ids);
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/plan.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/config.h"
Expand Down Expand Up @@ -936,6 +937,11 @@ Status ScannerBuilder::UseThreads(bool use_threads) {
return Status::OK();
}

Status ScannerBuilder::CacheMetadata(bool cache_metadata) {
scan_options_->cache_metadata = cache_metadata;
return Status::OK();
}

Status ScannerBuilder::BatchSize(int64_t batch_size) {
if (batch_size <= 0) {
return Status::Invalid("BatchSize must be greater than 0, got ", batch_size);
Expand Down Expand Up @@ -1058,6 +1064,10 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
batch->values.emplace_back(partial.record_batch.last);
batch->values.emplace_back(partial.fragment.value->ToString());
if (index != compute::kUnsequencedIndex) batch->index = index++;

if (!scan_options->cache_metadata) {
RETURN_NOT_OK(partial.fragment.value->ClearCachedMetadata());
}
return batch;
});

Expand Down
11 changes: 7 additions & 4 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,12 @@
#include "arrow/compute/expression.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/projector.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_generator_fwd.h"
#include "arrow/util/iterator.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/type_fwd.h"

namespace arrow {
Expand Down Expand Up @@ -117,6 +114,9 @@ struct ARROW_DS_EXPORT ScanOptions {
/// If true the scanner will add augmented fields to the output schema.
bool add_augmented_fields = true;

/// XXX
bool cache_metadata = true;

/// Fragment-specific scan options.
std::shared_ptr<FragmentScanOptions> fragment_scan_options;

Expand Down Expand Up @@ -505,6 +505,9 @@ class ARROW_DS_EXPORT ScannerBuilder {
/// ThreadPool found in ScanOptions;
Status UseThreads(bool use_threads = true);

/// XXX
Status CacheMetadata(bool cache_metadata = true);

/// \brief Set the maximum number of rows per RecordBatch.
///
/// \param[in] batch_size the maximum number of rows.
Expand Down
13 changes: 0 additions & 13 deletions cpp/src/arrow/util/async_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -1489,19 +1489,6 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
return MergedGenerator<T>(std::move(source), 1);
}

template <typename T>
struct Enumerated {
T value;
int index;
bool last;
};

template <typename T>
struct IterationTraits<Enumerated<T>> {
static Enumerated<T> End() { return Enumerated<T>{IterationEnd<T>(), -1, false}; }
static bool IsEnd(const Enumerated<T>& val) { return val.index < 0; }
};

/// \see MakeEnumeratedGenerator
template <typename T>
class EnumeratingGenerator {
Expand Down
4 changes: 1 addition & 3 deletions cpp/src/arrow/util/async_generator_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <functional>

#include "arrow/type_fwd.h"
#include "arrow/util/type_fwd.h"

namespace arrow {

Expand Down Expand Up @@ -47,9 +48,6 @@ class PushGenerator;
template <typename T>
class MergedGenerator;

template <typename T>
struct Enumerated;

template <typename T>
class EnumeratingGenerator;

Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/util/iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "arrow/util/compare.h"
#include "arrow/util/functional.h"
#include "arrow/util/macros.h"
#include "arrow/util/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow {
Expand Down Expand Up @@ -82,6 +83,12 @@ struct IterationTraits<std::optional<T>> {
// is nullopt. Add IterationTraits::GetRangeElement() to handle this case
};

template <typename T>
struct IterationTraits<Enumerated<T>> {
static Enumerated<T> End() { return Enumerated<T>{IterationEnd<T>(), -1, false}; }
static bool IsEnd(const Enumerated<T>& val) { return val.index < 0; }
};

/// \brief A generic Iterator that can return errors
template <typename T>
class Iterator : public util::EqualityComparable<Iterator<T>> {
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/util/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,11 @@ class Codec;
class Uri;
} // namespace util

template <typename T>
struct Enumerated {
T value;
int index;
bool last;
};

} // namespace arrow
Loading

0 comments on commit 5b7c3f2

Please sign in to comment.