Data: Add partition stats writer and reader
ajantha-bhat committed Jan 25, 2025
1 parent d693f83 · commit 0668d6a
Showing 4 changed files with 899 additions and 0 deletions.
24 changes: 24 additions & 0 deletions core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
@@ -24,6 +24,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -133,4 +134,27 @@ private static Collection<PartitionStats> mergeStats(

return statsMap.values();
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
public static boolean isEqual(
Comparator<StructLike> partitionComparator, PartitionStats stats1, PartitionStats stats2) {
if (stats1 == stats2) {
return true;
} else if (stats1 == null || stats2 == null) {
return false;
}

return partitionComparator.compare(stats1.partition(), stats2.partition()) == 0
&& stats1.specId() == stats2.specId()
&& stats1.dataRecordCount() == stats2.dataRecordCount()
&& stats1.dataFileCount() == stats2.dataFileCount()
&& stats1.totalDataFileSizeInBytes() == stats2.totalDataFileSizeInBytes()
&& stats1.positionDeleteRecordCount() == stats2.positionDeleteRecordCount()
&& stats1.positionDeleteFileCount() == stats2.positionDeleteFileCount()
&& stats1.equalityDeleteRecordCount() == stats2.equalityDeleteRecordCount()
&& stats1.equalityDeleteFileCount() == stats2.equalityDeleteFileCount()
&& stats1.totalRecordCount() == stats2.totalRecordCount()
&& Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt())
&& Objects.equals(stats1.lastUpdatedSnapshotId(), stats2.lastUpdatedSnapshotId());
}
}
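The new isEqual helper compares two PartitionStats entries field by field, delegating the partition tuple itself to a caller-supplied comparator. A minimal usage sketch (not part of this diff), assuming the comparator comes from org.apache.iceberg.types.Comparators.forType and that expectedStats and actualStats are existing PartitionStats instances:

// Build a comparator for the table's unified partition type, then compare
// two stats entries for the same partition.
StructType partitionType = Partitioning.partitionType(table);
Comparator<StructLike> comparator = Comparators.forType(partitionType);
boolean same = PartitionStatsUtil.isEqual(comparator, expectedStats, actualStats);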
20 changes: 20 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTables.java
@@ -93,6 +93,26 @@ public static TestTable create(
return new TestTable(ops, name, reporter);
}

public static TestTable create(
File temp,
String name,
Schema schema,
PartitionSpec spec,
int formatVersion,
Map<String, String> properties) {
TestTableOperations ops = new TestTableOperations(name, temp);
if (ops.current() != null) {
throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
}

ops.commit(
null,
newTableMetadata(
schema, spec, SortOrder.unsorted(), temp.toString(), properties, formatVersion));

return new TestTable(ops, name);
}

public static Transaction beginCreate(File temp, String name, Schema schema, PartitionSpec spec) {
return beginCreate(temp, name, schema, spec, SortOrder.unsorted());
}
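The new create overload lets tests pin table properties at creation time; for the partition stats tests this makes it possible to switch the stats file format, for example to Avro instead of the Parquet default. A hypothetical call (tempDir, SCHEMA, and SPEC are assumed test fixtures, not part of this diff):

// Create a format-version-2 test table whose default write format is Avro.
TestTables.TestTable table =
    TestTables.create(
        tempDir,
        "test_partition_stats",
        SCHEMA,
        SPEC,
        2,
        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, "avro"));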
273 changes: 273 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
@@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.PartitionStatsUtil;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.InternalReader;
import org.apache.iceberg.data.parquet.InternalWriter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.SnapshotUtil;

/**
 * Computes, writes, and reads the {@link PartitionStatisticsFile}. Uses generic readers and
 * writers, so the stats are written and read in the table's default file format.
 */
public final class PartitionStatsHandler {

private PartitionStatsHandler() {}

public enum Column {
PARTITION(0),
SPEC_ID(1),
DATA_RECORD_COUNT(2),
DATA_FILE_COUNT(3),
TOTAL_DATA_FILE_SIZE_IN_BYTES(4),
POSITION_DELETE_RECORD_COUNT(5),
POSITION_DELETE_FILE_COUNT(6),
EQUALITY_DELETE_RECORD_COUNT(7),
EQUALITY_DELETE_FILE_COUNT(8),
TOTAL_RECORD_COUNT(9),
LAST_UPDATED_AT(10),
LAST_UPDATED_SNAPSHOT_ID(11);

private final int id;

Column(int id) {
this.id = id;
}

public int id() {
return id;
}
}
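  // Note added for clarity (not in the original diff): these ordinals double as positional
  // indexes into both the stats schema below and the PartitionStats struct itself, which is
  // what allows recordToPartitionStats() at the bottom of this file to copy values by position.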

/**
* Generates the partition stats file schema based on a given partition type.
*
* <p>Note: Provide the unified partition schema type as mentioned in the spec.
*
* @param partitionType unified partition schema type.
* @return a schema that corresponds to the provided unified partition type.
*/
public static Schema schema(StructType partitionType) {
Preconditions.checkState(!partitionType.fields().isEmpty(), "table must be partitioned");
return new Schema(
NestedField.required(1, Column.PARTITION.name(), partitionType),
NestedField.required(2, Column.SPEC_ID.name(), IntegerType.get()),
NestedField.required(3, Column.DATA_RECORD_COUNT.name(), LongType.get()),
NestedField.required(4, Column.DATA_FILE_COUNT.name(), IntegerType.get()),
NestedField.required(5, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), LongType.get()),
NestedField.optional(6, Column.POSITION_DELETE_RECORD_COUNT.name(), LongType.get()),
NestedField.optional(7, Column.POSITION_DELETE_FILE_COUNT.name(), IntegerType.get()),
NestedField.optional(8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), LongType.get()),
NestedField.optional(9, Column.EQUALITY_DELETE_FILE_COUNT.name(), IntegerType.get()),
NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), LongType.get()),
NestedField.optional(11, Column.LAST_UPDATED_AT.name(), LongType.get()),
NestedField.optional(12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), LongType.get()));
}

  /**
   * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot.
   *
   * @param table The {@link Table} for which the partition statistics are computed.
   * @return {@link PartitionStatisticsFile} for the current snapshot, or null if the table has no
   *     snapshots.
   */
  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) {
    return computeAndWriteStatsFile(table, null);
  }

  /**
   * Computes and writes the {@link PartitionStatisticsFile} for a given table and branch.
   *
   * @param table The {@link Table} for which the partition statistics are computed.
   * @param branch The branch whose latest snapshot is used; if null, the main branch is used.
   * @return {@link PartitionStatisticsFile} for the given branch, or null if the branch has no
   *     snapshot.
   */
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, String branch) {
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
if (currentSnapshot == null) {
Preconditions.checkArgument(
branch == null, "Couldn't find the snapshot for the branch %s", branch);
return null;
}

StructType partitionType = Partitioning.partitionType(table);
Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, currentSnapshot);
List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType);
return writePartitionStatsFile(
table, currentSnapshot.snapshotId(), schema(partitionType), sortedStats.iterator());
}

@VisibleForTesting
static PartitionStatisticsFile writePartitionStatsFile(
Table table, long snapshotId, Schema dataSchema, Iterator<PartitionStats> records) {
OutputFile outputFile = newPartitionStatsFile(table, snapshotId);

    try (DataWriter<StructLike> writer = dataWriter(dataSchema, outputFile)) {
records.forEachRemaining(writer::write);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

return ImmutableGenericPartitionStatisticsFile.builder()
.snapshotId(snapshotId)
.path(outputFile.location())
.fileSizeInBytes(outputFile.toInputFile().getLength())
.build();
}

  /**
   * Reads partition statistics from the specified {@link InputFile} using the given schema.
   *
   * @param schema The {@link Schema} of the partition statistics file.
   * @param inputFile An {@link InputFile} pointing to the partition stats file.
   * @return A {@link CloseableIterable} of {@link PartitionStats} read from the file.
   */
public static CloseableIterable<PartitionStats> readPartitionStatsFile(
Schema schema, InputFile inputFile) {
CloseableIterable<StructLike> records = dataReader(schema, inputFile);
return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats);
}

private static FileFormat fileFormat(String fileLocation) {
return FileFormat.fromString(fileLocation.substring(fileLocation.lastIndexOf(".") + 1));
}

private static OutputFile newPartitionStatsFile(Table table, long snapshotId) {
FileFormat fileFormat =
fileFormat(
table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
return table
.io()
.newOutputFile(
((HasTableOperations) table)
.operations()
.metadataFileLocation(
fileFormat.addExtension(
String.format(Locale.ROOT, "partition-stats-%d", snapshotId))));
}
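  // Illustrative example (not in the original diff): for snapshot 42 on a table whose default
  // format is parquet, this resolves to a metadata file named partition-stats-42.parquet.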

private static DataWriter<StructLike> dataWriter(Schema dataSchema, OutputFile outputFile)
throws IOException {
FileFormat fileFormat = fileFormat(outputFile.location());
switch (fileFormat) {
case PARQUET:
return Parquet.writeData(outputFile)
.schema(dataSchema)
.createWriterFunc(InternalWriter::buildWriter)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.build();
case AVRO:
return Avro.writeData(outputFile)
.schema(dataSchema)
.createWriterFunc(org.apache.iceberg.avro.InternalWriter::create)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.build();
case ORC:
// Internal writers are not supported for ORC yet.
default:
        throw new UnsupportedOperationException("Unsupported file format: " + fileFormat.name());
}
}

private static CloseableIterable<StructLike> dataReader(Schema schema, InputFile inputFile) {
FileFormat fileFormat = fileFormat(inputFile.location());
switch (fileFormat) {
case PARQUET:
return Parquet.read(inputFile)
.project(schema)
.createReaderFunc(
fileSchema ->
org.apache.iceberg.data.parquet.InternalReader.buildReader(schema, fileSchema))
.build();
case AVRO:
return Avro.read(inputFile)
.project(schema)
.createReaderFunc(fileSchema -> InternalReader.create(schema))
.build();
case ORC:
// Internal readers are not supported for ORC yet.
default:
        throw new UnsupportedOperationException("Unsupported file format: " + fileFormat.name());
}
}

private static PartitionStats recordToPartitionStats(StructLike record) {
PartitionStats stats =
new PartitionStats(
record.get(Column.PARTITION.id(), StructLike.class),
record.get(Column.SPEC_ID.id(), Integer.class));
stats.set(Column.DATA_RECORD_COUNT.id(), record.get(Column.DATA_RECORD_COUNT.id(), Long.class));
stats.set(Column.DATA_FILE_COUNT.id(), record.get(Column.DATA_FILE_COUNT.id(), Integer.class));
stats.set(
Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(),
record.get(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), Long.class));
stats.set(
Column.POSITION_DELETE_RECORD_COUNT.id(),
record.get(Column.POSITION_DELETE_RECORD_COUNT.id(), Long.class));
stats.set(
Column.POSITION_DELETE_FILE_COUNT.id(),
record.get(Column.POSITION_DELETE_FILE_COUNT.id(), Integer.class));
stats.set(
Column.EQUALITY_DELETE_RECORD_COUNT.id(),
record.get(Column.EQUALITY_DELETE_RECORD_COUNT.id(), Long.class));
stats.set(
Column.EQUALITY_DELETE_FILE_COUNT.id(),
record.get(Column.EQUALITY_DELETE_FILE_COUNT.id(), Integer.class));
stats.set(
Column.TOTAL_RECORD_COUNT.id(), record.get(Column.TOTAL_RECORD_COUNT.id(), Long.class));
stats.set(Column.LAST_UPDATED_AT.id(), record.get(Column.LAST_UPDATED_AT.id(), Long.class));
stats.set(
Column.LAST_UPDATED_SNAPSHOT_ID.id(),
record.get(Column.LAST_UPDATED_SNAPSHOT_ID.id(), Long.class));
return stats;
}
}
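Taken together, the handler supports a compute-write-read round trip using only the APIs shown above. A hedged end-to-end sketch, assuming the usual org.apache.iceberg imports, an existing partitioned table with at least one snapshot, and that the table implementation supports the existing updatePartitionStatistics() API:

// Compute stats for the current snapshot and write them in the table's
// default file format (parquet unless write.format.default says otherwise).
PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);

// Register the stats file against its snapshot in the table metadata.
table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();

// Read the stats back using the same unified partition schema.
Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
try (CloseableIterable<PartitionStats> stats =
    PartitionStatsHandler.readPartitionStatsFile(
        dataSchema, table.io().newInputFile(statsFile.path()))) {
  stats.forEach(s -> System.out.println(s.dataFileCount()));
}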