From 0668d6ac80efd81d58403ba6d2b698e1ff6458cf Mon Sep 17 00:00:00 2001
From: ajantha-bhat
Date: Wed, 25 Sep 2024 18:23:09 +0530
Subject: [PATCH] Data: Add partition stats writer and reader

---
 .../apache/iceberg/PartitionStatsUtil.java    |  24 +
 .../java/org/apache/iceberg/TestTables.java   |  20 +
 .../iceberg/data/PartitionStatsHandler.java   | 273 ++++++++
 .../data/TestPartitionStatsHandler.java       | 582 ++++++++++++++++++
 4 files changed, 899 insertions(+)
 create mode 100644 data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
 create mode 100644 data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java

diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
index 1fe4e6767fe6..d902e5f2e8f7 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
@@ -24,6 +24,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Queue;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -133,4 +134,27 @@ private static Collection<PartitionStats> mergeStats(
     return statsMap.values();
   }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public static boolean isEqual(
+      Comparator<StructLike> partitionComparator, PartitionStats stats1, PartitionStats stats2) {
+    if (stats1 == stats2) {
+      return true;
+    } else if (stats1 == null || stats2 == null) {
+      return false;
+    }
+
+    return partitionComparator.compare(stats1.partition(), stats2.partition()) == 0
+        && stats1.specId() == stats2.specId()
+        && stats1.dataRecordCount() == stats2.dataRecordCount()
+        && stats1.dataFileCount() == stats2.dataFileCount()
+        && stats1.totalDataFileSizeInBytes() == stats2.totalDataFileSizeInBytes()
+        && stats1.positionDeleteRecordCount() == stats2.positionDeleteRecordCount()
+        && stats1.positionDeleteFileCount() == stats2.positionDeleteFileCount()
+        && stats1.equalityDeleteRecordCount() == stats2.equalityDeleteRecordCount()
+        && stats1.equalityDeleteFileCount() == stats2.equalityDeleteFileCount()
+        && stats1.totalRecordCount() == stats2.totalRecordCount()
+        && Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt())
+        && Objects.equals(stats1.lastUpdatedSnapshotId(), stats2.lastUpdatedSnapshotId());
+  }
 
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index eeff5db8e5a6..ca9dbe5e9fcb 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -93,6 +93,26 @@ public static TestTable create(
     return new TestTable(ops, name, reporter);
   }
 
+  public static TestTable create(
+      File temp,
+      String name,
+      Schema schema,
+      PartitionSpec spec,
+      int formatVersion,
+      Map<String, String> properties) {
+    TestTableOperations ops = new TestTableOperations(name, temp);
+    if (ops.current() != null) {
+      throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
+    }
+
+    ops.commit(
+        null,
+        newTableMetadata(
+            schema, spec, SortOrder.unsorted(), temp.toString(), properties, formatVersion));
+
+    return new TestTable(ops, name);
+  }
+
   public static Transaction beginCreate(File temp, String name, Schema schema, PartitionSpec spec) {
     return beginCreate(temp, name, schema, spec, SortOrder.unsorted());
   }
diff --git
a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java new file mode 100644 index 000000000000..95546e483e97 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionStatisticsFile; +import org.apache.iceberg.PartitionStats; +import org.apache.iceberg.PartitionStatsUtil; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.InternalReader; +import org.apache.iceberg.data.parquet.InternalWriter; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.util.SnapshotUtil; + +/** + * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers + * to support writing and reading of the stats in table default format. 
+ */ +public final class PartitionStatsHandler { + + private PartitionStatsHandler() {} + + public enum Column { + PARTITION(0), + SPEC_ID(1), + DATA_RECORD_COUNT(2), + DATA_FILE_COUNT(3), + TOTAL_DATA_FILE_SIZE_IN_BYTES(4), + POSITION_DELETE_RECORD_COUNT(5), + POSITION_DELETE_FILE_COUNT(6), + EQUALITY_DELETE_RECORD_COUNT(7), + EQUALITY_DELETE_FILE_COUNT(8), + TOTAL_RECORD_COUNT(9), + LAST_UPDATED_AT(10), + LAST_UPDATED_SNAPSHOT_ID(11); + + private final int id; + + Column(int id) { + this.id = id; + } + + public int id() { + return id; + } + } + + /** + * Generates the partition stats file schema based on a given partition type. + * + *

Note: Provide the unified partition schema type as mentioned in the spec. + * + * @param partitionType unified partition schema type. + * @return a schema that corresponds to the provided unified partition type. + */ + public static Schema schema(StructType partitionType) { + Preconditions.checkState(!partitionType.fields().isEmpty(), "table must be partitioned"); + return new Schema( + NestedField.required(1, Column.PARTITION.name(), partitionType), + NestedField.required(2, Column.SPEC_ID.name(), IntegerType.get()), + NestedField.required(3, Column.DATA_RECORD_COUNT.name(), LongType.get()), + NestedField.required(4, Column.DATA_FILE_COUNT.name(), IntegerType.get()), + NestedField.required(5, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), LongType.get()), + NestedField.optional(6, Column.POSITION_DELETE_RECORD_COUNT.name(), LongType.get()), + NestedField.optional(7, Column.POSITION_DELETE_FILE_COUNT.name(), IntegerType.get()), + NestedField.optional(8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), LongType.get()), + NestedField.optional(9, Column.EQUALITY_DELETE_FILE_COUNT.name(), IntegerType.get()), + NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), LongType.get()), + NestedField.optional(11, Column.LAST_UPDATED_AT.name(), LongType.get()), + NestedField.optional(12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), LongType.get())); + } + + /** + * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot. + * + * @param table The {@link Table} for which the partition statistics is computed. + * @return {@link PartitionStatisticsFile} for the current snapshot. + */ + public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException { + return computeAndWriteStatsFile(table, null); + } + + /** + * Computes and writes the {@link PartitionStatisticsFile} for a given table and branch. + * + * @param table The {@link Table} for which the partition statistics is computed. + * @param branch A branch information to select the required snapshot. + * @return {@link PartitionStatisticsFile} for the given branch. + */ + public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, String branch) { + Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); + if (currentSnapshot == null) { + Preconditions.checkArgument( + branch == null, "Couldn't find the snapshot for the branch %s", branch); + return null; + } + + StructType partitionType = Partitioning.partitionType(table); + Collection stats = PartitionStatsUtil.computeStats(table, currentSnapshot); + List sortedStats = PartitionStatsUtil.sortStats(stats, partitionType); + return writePartitionStatsFile( + table, currentSnapshot.snapshotId(), schema(partitionType), sortedStats.iterator()); + } + + @VisibleForTesting + static PartitionStatisticsFile writePartitionStatsFile( + Table table, long snapshotId, Schema dataSchema, Iterator records) { + OutputFile outputFile = newPartitionStatsFile(table, snapshotId); + + try (DataWriter writer = dataWriter(dataSchema, outputFile); ) { + records.forEachRemaining(writer::write); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .path(outputFile.location()) + .fileSizeInBytes(outputFile.toInputFile().getLength()) + .build(); + } + + /** + * Reads partition statistics from the specified {@link InputFile} using given schema. + * + * @param schema The {@link Schema} of the partition statistics file. 
+ * @param inputFile An {@link InputFile} pointing to the partition stats file. + */ + public static CloseableIterable readPartitionStatsFile( + Schema schema, InputFile inputFile) { + CloseableIterable records = dataReader(schema, inputFile); + return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats); + } + + private static FileFormat fileFormat(String fileLocation) { + return FileFormat.fromString(fileLocation.substring(fileLocation.lastIndexOf(".") + 1)); + } + + private static OutputFile newPartitionStatsFile(Table table, long snapshotId) { + FileFormat fileFormat = + fileFormat( + table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)); + return table + .io() + .newOutputFile( + ((HasTableOperations) table) + .operations() + .metadataFileLocation( + fileFormat.addExtension( + String.format(Locale.ROOT, "partition-stats-%d", snapshotId)))); + } + + private static DataWriter dataWriter(Schema dataSchema, OutputFile outputFile) + throws IOException { + FileFormat fileFormat = fileFormat(outputFile.location()); + switch (fileFormat) { + case PARQUET: + return Parquet.writeData(outputFile) + .schema(dataSchema) + .createWriterFunc(InternalWriter::buildWriter) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build(); + case AVRO: + return Avro.writeData(outputFile) + .schema(dataSchema) + .createWriterFunc(org.apache.iceberg.avro.InternalWriter::create) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build(); + case ORC: + // Internal writers are not supported for ORC yet. + default: + throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name()); + } + } + + private static CloseableIterable dataReader(Schema schema, InputFile inputFile) { + FileFormat fileFormat = fileFormat(inputFile.location()); + switch (fileFormat) { + case PARQUET: + return Parquet.read(inputFile) + .project(schema) + .createReaderFunc( + fileSchema -> + org.apache.iceberg.data.parquet.InternalReader.buildReader(schema, fileSchema)) + .build(); + case AVRO: + return Avro.read(inputFile) + .project(schema) + .createReaderFunc(fileSchema -> InternalReader.create(schema)) + .build(); + case ORC: + // Internal readers are not supported for ORC yet. 
+ default: + throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name()); + } + } + + private static PartitionStats recordToPartitionStats(StructLike record) { + PartitionStats stats = + new PartitionStats( + record.get(Column.PARTITION.id(), StructLike.class), + record.get(Column.SPEC_ID.id(), Integer.class)); + stats.set(Column.DATA_RECORD_COUNT.id(), record.get(Column.DATA_RECORD_COUNT.id(), Long.class)); + stats.set(Column.DATA_FILE_COUNT.id(), record.get(Column.DATA_FILE_COUNT.id(), Integer.class)); + stats.set( + Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), + record.get(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), Long.class)); + stats.set( + Column.POSITION_DELETE_RECORD_COUNT.id(), + record.get(Column.POSITION_DELETE_RECORD_COUNT.id(), Long.class)); + stats.set( + Column.POSITION_DELETE_FILE_COUNT.id(), + record.get(Column.POSITION_DELETE_FILE_COUNT.id(), Integer.class)); + stats.set( + Column.EQUALITY_DELETE_RECORD_COUNT.id(), + record.get(Column.EQUALITY_DELETE_RECORD_COUNT.id(), Long.class)); + stats.set( + Column.EQUALITY_DELETE_FILE_COUNT.id(), + record.get(Column.EQUALITY_DELETE_FILE_COUNT.id(), Integer.class)); + stats.set( + Column.TOTAL_RECORD_COUNT.id(), record.get(Column.TOTAL_RECORD_COUNT.id(), Long.class)); + stats.set(Column.LAST_UPDATED_AT.id(), record.get(Column.LAST_UPDATED_AT.id(), Long.class)); + stats.set( + Column.LAST_UPDATED_SNAPSHOT_ID.id(), + record.get(Column.LAST_UPDATED_SNAPSHOT_ID.id(), Long.class)); + return stats; + } +} diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java new file mode 100644 index 000000000000..11ef516f2e48 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import static org.apache.iceberg.data.PartitionStatsHandler.Column; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionStatisticsFile; +import org.apache.iceberg.PartitionStats; +import org.apache.iceberg.PartitionStatsUtil; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assumptions; +import org.assertj.core.groups.Tuple; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestPartitionStatsHandler { + private static final Schema SCHEMA = + new Schema( + optional(1, "c1", Types.IntegerType.get()), + optional(2, "c2", Types.StringType.get()), + optional(3, "c3", Types.StringType.get())); + + protected static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build(); + + @TempDir public File temp; + + private static final Random RANDOM = ThreadLocalRandom.current(); + + @Parameters(name = "fileFormat = {0}") + public static List parameters() { + return Arrays.asList(FileFormat.PARQUET, FileFormat.ORC, FileFormat.AVRO); + } + + @Parameter private FileFormat format; + + @Test + public void testPartitionStatsOnEmptyTable() throws Exception { + Table testTable = TestTables.create(tempDir("empty_table"), "empty_table", SCHEMA, SPEC, 2); + assertThat(PartitionStatsHandler.computeAndWriteStatsFile(testTable)).isNull(); + } + + @Test + public void testPartitionStatsOnEmptyBranch() throws Exception { + Table testTable = TestTables.create(tempDir("empty_branch"), "empty_branch", SCHEMA, SPEC, 2); + testTable.manageSnapshots().createBranch("b1").commit(); + PartitionStatisticsFile partitionStatisticsFile = + 
PartitionStatsHandler.computeAndWriteStatsFile(testTable, "b1"); + // creates an empty stats file since the dummy snapshot exist + assertThat(partitionStatisticsFile.fileSizeInBytes()).isEqualTo(0L); + assertThat(partitionStatisticsFile.snapshotId()) + .isEqualTo(testTable.refs().get("b1").snapshotId()); + } + + @Test + public void testPartitionStatsOnInvalidSnapshot() throws Exception { + Table testTable = + TestTables.create(tempDir("invalid_snapshot"), "invalid_snapshot", SCHEMA, SPEC, 2); + assertThatThrownBy( + () -> PartitionStatsHandler.computeAndWriteStatsFile(testTable, "INVALID_BRANCH")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Couldn't find the snapshot for the branch INVALID_BRANCH"); + } + + @Test + public void testPartitionStatsOnUnPartitionedTable() throws Exception { + Table testTable = + TestTables.create( + tempDir("unpartitioned_table"), + "unpartitioned_table", + SCHEMA, + PartitionSpec.unpartitioned(), + 2); + + List records = prepareRecords(testTable.schema()); + DataFile dataFile = FileHelpers.writeDataFile(testTable, outputFile(), records); + testTable.newAppend().appendFile(dataFile).commit(); + + assertThatThrownBy(() -> PartitionStatsHandler.computeAndWriteStatsFile(testTable)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("table must be partitioned"); + } + + @Test + public void testAllDatatypePartitionWriting() throws Exception { + Schema schema = + new Schema( + required(100, "id", Types.LongType.get()), + optional(101, "data", Types.StringType.get()), + required(102, "b", Types.BooleanType.get()), + optional(103, "i", Types.IntegerType.get()), + required(104, "l", Types.LongType.get()), + optional(105, "f", Types.FloatType.get()), + required(106, "d", Types.DoubleType.get()), + optional(107, "date", Types.DateType.get()), + required(108, "ts", Types.TimestampType.withoutZone()), + required(110, "s", Types.StringType.get()), + required(111, "uuid", Types.UUIDType.get()), + required(112, "fixed", Types.FixedType.ofLength(7)), + optional(113, "bytes", Types.BinaryType.get()), + required(114, "dec_9_0", Types.DecimalType.of(9, 0)), + required(115, "dec_11_2", Types.DecimalType.of(11, 2)), + required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision + required(117, "time", Types.TimeType.get())); + + PartitionSpec spec = + PartitionSpec.builderFor(schema) + .identity("b") + .identity("i") + .identity("l") + .identity("f") + .identity("d") + .identity("date") + .identity("ts") + .identity("s") + .identity("uuid") + .identity("fixed") + .identity("bytes") + .identity("dec_9_0") + .identity("dec_11_2") + .identity("dec_38_10") + .identity("time") + .build(); + + Table testTable = + TestTables.create( + tempDir("test_all_type"), "test_all_type", schema, spec, SortOrder.unsorted(), 2); + + Types.StructType partitionSchema = Partitioning.partitionType(testTable); + Schema dataSchema = PartitionStatsHandler.schema(partitionSchema); + + PartitionData partitionData = + new PartitionData(dataSchema.findField(Column.PARTITION.name()).type().asStructType()); + partitionData.set(0, true); + partitionData.set(1, 42); + partitionData.set(2, 42L); + partitionData.set(3, 3.14f); + partitionData.set(4, 3.141592653589793); + partitionData.set(5, Literal.of("2022-01-01").to(Types.DateType.get()).value()); + partitionData.set( + 6, Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone()).value()); + partitionData.set(7, "string"); + partitionData.set(8, UUID.randomUUID()); + partitionData.set(9, 
ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6})); + partitionData.set(10, ByteBuffer.wrap(new byte[] {1, 2, 3})); + partitionData.set(11, new BigDecimal("123456789")); + partitionData.set(12, new BigDecimal("1234567.89")); + partitionData.set(13, new BigDecimal("12345678901234567890.1234567890")); + partitionData.set(14, Literal.of("10:10:10").to(Types.TimeType.get()).value()); + + PartitionStats partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10)); + partitionStats.set(Column.DATA_RECORD_COUNT.id(), RANDOM.nextLong()); + partitionStats.set(Column.DATA_FILE_COUNT.id(), RANDOM.nextInt()); + partitionStats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.id(), 1024L * RANDOM.nextInt(20)); + List expected = Collections.singletonList(partitionStats); + PartitionStatisticsFile statisticsFile = + PartitionStatsHandler.writePartitionStatsFile( + testTable, 42L, dataSchema, expected.iterator()); + + List written; + try (CloseableIterable recordIterator = + PartitionStatsHandler.readPartitionStatsFile( + dataSchema, Files.localInput(statisticsFile.path()))) { + written = Lists.newArrayList(recordIterator); + } + + assertThat(written).hasSize(expected.size()); + Comparator comparator = Comparators.forType(partitionSchema); + for (int i = 0; i < written.size(); i++) { + assertThat(PartitionStatsUtil.isEqual(comparator, written.get(i), expected.get(i))).isTrue(); + } + } + + @Test + public void testOptionalFieldsWriting() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); + Table testTable = + TestTables.create( + tempDir("test_partition_stats_optional"), + "test_partition_stats_optional", + SCHEMA, + spec, + SortOrder.unsorted(), + 2); + + Types.StructType partitionSchema = Partitioning.partitionType(testTable); + Schema dataSchema = PartitionStatsHandler.schema(partitionSchema); + + ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); + for (int i = 0; i < 5; i++) { + PartitionData partitionData = + new PartitionData(dataSchema.findField(Column.PARTITION.name()).type().asStructType()); + partitionData.set(0, RANDOM.nextInt()); + + PartitionStats stats = new PartitionStats(partitionData, RANDOM.nextInt(10)); + stats.set(Column.PARTITION.ordinal(), partitionData); + stats.set(Column.DATA_RECORD_COUNT.ordinal(), RANDOM.nextLong()); + stats.set(Column.DATA_FILE_COUNT.ordinal(), RANDOM.nextInt()); + stats.set(Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.ordinal(), 1024L * RANDOM.nextInt(20)); + stats.set(Column.POSITION_DELETE_RECORD_COUNT.ordinal(), null); + stats.set(Column.POSITION_DELETE_FILE_COUNT.ordinal(), null); + stats.set(Column.EQUALITY_DELETE_RECORD_COUNT.ordinal(), null); + stats.set(Column.EQUALITY_DELETE_FILE_COUNT.ordinal(), null); + stats.set(Column.TOTAL_RECORD_COUNT.ordinal(), null); + stats.set(Column.LAST_UPDATED_AT.ordinal(), null); + stats.set(Column.LAST_UPDATED_SNAPSHOT_ID.ordinal(), null); + + partitionListBuilder.add(stats); + } + + List expected = partitionListBuilder.build(); + + assertThat(expected.get(0)) + .extracting( + PartitionStats::positionDeleteRecordCount, + PartitionStats::positionDeleteFileCount, + PartitionStats::equalityDeleteRecordCount, + PartitionStats::equalityDeleteFileCount, + PartitionStats::totalRecordCount, + PartitionStats::lastUpdatedAt, + PartitionStats::lastUpdatedSnapshotId) + .isEqualTo( + Arrays.asList( + 0L, 0, 0L, 0, 0L, null, null)); // null counters must be initialized to zero. 
+ + PartitionStatisticsFile statisticsFile = + PartitionStatsHandler.writePartitionStatsFile( + testTable, 42L, dataSchema, expected.iterator()); + + List written; + try (CloseableIterable recordIterator = + PartitionStatsHandler.readPartitionStatsFile( + dataSchema, Files.localInput(statisticsFile.path()))) { + written = Lists.newArrayList(recordIterator); + } + + assertThat(written).hasSize(expected.size()); + Comparator comparator = Comparators.forType(partitionSchema); + for (int i = 0; i < written.size(); i++) { + assertThat(PartitionStatsUtil.isEqual(comparator, written.get(i), expected.get(i))).isTrue(); + } + } + + @SuppressWarnings("checkstyle:MethodLength") + @TestTemplate // Tests for all the table formats (PARQUET, ORC, AVRO) + public void testPartitionStats() throws Exception { + Assumptions.assumeThat(format) + .as("ORC internal readers and writers are not supported") + .isNotEqualTo(FileFormat.ORC); + + Table testTable = + TestTables.create( + tempDir("partition_stats_" + format.name()), + "partition_stats_compute_" + format.name(), + SCHEMA, + SPEC, + 2, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + + List records = prepareRecords(testTable.schema()); + DataFile dataFile1 = + FileHelpers.writeDataFile( + testTable, outputFile(), TestHelpers.Row.of("foo", "A"), records.subList(0, 3)); + DataFile dataFile2 = + FileHelpers.writeDataFile( + testTable, outputFile(), TestHelpers.Row.of("foo", "B"), records.subList(3, 4)); + DataFile dataFile3 = + FileHelpers.writeDataFile( + testTable, outputFile(), TestHelpers.Row.of("bar", "A"), records.subList(4, 5)); + DataFile dataFile4 = + FileHelpers.writeDataFile( + testTable, outputFile(), TestHelpers.Row.of("bar", "B"), records.subList(5, 7)); + + for (int i = 0; i < 3; i++) { + // insert same set of seven records thrice to have a new manifest files + testTable + .newAppend() + .appendFile(dataFile1) + .appendFile(dataFile2) + .appendFile(dataFile3) + .appendFile(dataFile4) + .commit(); + } + + Snapshot snapshot1 = testTable.currentSnapshot(); + Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); + Types.StructType partitionType = + recordSchema.findField(Column.PARTITION.name()).type().asStructType(); + computeAndValidatePartitionStats( + testTable, + recordSchema, + Tuple.tuple( + partitionRecord(partitionType, "foo", "A"), + 0, + 9L, + 3, + 3 * dataFile1.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + 0L, + snapshot1.timestampMillis(), + snapshot1.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "foo", "B"), + 0, + 3L, + 3, + 3 * dataFile2.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + 0L, + snapshot1.timestampMillis(), + snapshot1.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "bar", "A"), + 0, + 3L, + 3, + 3 * dataFile3.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + 0L, + snapshot1.timestampMillis(), + snapshot1.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "bar", "B"), + 0, + 6L, + 3, + 3 * dataFile4.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + 0L, + snapshot1.timestampMillis(), + snapshot1.snapshotId())); + + DeleteFile posDeletes = commitPositionDeletes(testTable, dataFile1); + Snapshot snapshot2 = testTable.currentSnapshot(); + + DeleteFile eqDeletes = commitEqualityDeletes(testTable); + Snapshot snapshot3 = testTable.currentSnapshot(); + + recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); + partitionType = recordSchema.findField(Column.PARTITION.name()).type().asStructType(); + 
computeAndValidatePartitionStats( + testTable, + recordSchema, + Tuple.tuple( + partitionRecord(partitionType, "foo", "A"), + 0, + 9L, + 3, + 3 * dataFile1.fileSizeInBytes(), + 0L, + 0, + eqDeletes.recordCount(), + 1, + 0L, + snapshot3.timestampMillis(), + snapshot3.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "foo", "B"), + 0, + 3L, + 3, + 3 * dataFile2.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + 0L, + snapshot1.timestampMillis(), + snapshot1.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "bar", "A"), + 0, + 3L, + 3, + 3 * dataFile3.fileSizeInBytes(), + posDeletes.recordCount(), + 1, + 0L, + 0, + 0L, + snapshot2.timestampMillis(), + snapshot2.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "bar", "B"), + 0, + 6L, + 3, + 3 * dataFile4.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + 0L, + snapshot1.timestampMillis(), + snapshot1.snapshotId())); + } + + private OutputFile outputFile() throws IOException { + return Files.localOutput(File.createTempFile("data", null, tempDir("stats"))); + } + + private static StructLike partitionRecord( + Types.StructType partitionType, String val1, String val2) { + GenericRecord record = GenericRecord.create(partitionType); + record.set(0, val1); + record.set(1, val2); + return record; + } + + private static List prepareRecords(Schema schema) { + GenericRecord record = GenericRecord.create(schema); + List records = Lists.newArrayList(); + // foo 4 records, bar 3 records + // foo, A -> 3 records + records.add(record.copy("c1", 0, "c2", "foo", "c3", "A")); + records.add(record.copy("c1", 1, "c2", "foo", "c3", "A")); + records.add(record.copy("c1", 2, "c2", "foo", "c3", "A")); + // foo, B -> 1 record + records.add(record.copy("c1", 3, "c2", "foo", "c3", "B")); + // bar, A -> 1 record + records.add(record.copy("c1", 4, "c2", "bar", "c3", "A")); + // bar, B -> 2 records + records.add(record.copy("c1", 5, "c2", "bar", "c3", "B")); + records.add(record.copy("c1", 6, "c2", "bar", "c3", "B")); + return records; + } + + private static void computeAndValidatePartitionStats( + Table testTable, Schema recordSchema, Tuple... 
expectedValues) throws IOException { + // compute and commit partition stats file + Snapshot currentSnapshot = testTable.currentSnapshot(); + PartitionStatisticsFile result = PartitionStatsHandler.computeAndWriteStatsFile(testTable); + testTable.updatePartitionStatistics().setPartitionStatistics(result).commit(); + assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId()); + + // read the partition entries from the stats file + List partitionStats; + try (CloseableIterable recordIterator = + PartitionStatsHandler.readPartitionStatsFile( + recordSchema, Files.localInput(result.path()))) { + partitionStats = Lists.newArrayList(recordIterator); + } + + assertThat(partitionStats) + .extracting( + PartitionStats::partition, + PartitionStats::specId, + PartitionStats::dataRecordCount, + PartitionStats::dataFileCount, + PartitionStats::totalDataFileSizeInBytes, + PartitionStats::positionDeleteRecordCount, + PartitionStats::positionDeleteFileCount, + PartitionStats::equalityDeleteRecordCount, + PartitionStats::equalityDeleteFileCount, + PartitionStats::totalRecordCount, + PartitionStats::lastUpdatedAt, + PartitionStats::lastUpdatedSnapshotId) + .containsExactlyInAnyOrder(expectedValues); + } + + private DeleteFile commitEqualityDeletes(Table testTable) throws IOException { + Schema deleteRowSchema = testTable.schema().select("c1"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = + Lists.newArrayList(dataDelete.copy("c1", 1), dataDelete.copy("c1", 2)); + + DeleteFile eqDeletes = + FileHelpers.writeDeleteFile( + testTable, + Files.localOutput(File.createTempFile("junit", null, tempDir("eq_delete"))), + TestHelpers.Row.of("foo", "A"), + dataDeletes, + deleteRowSchema); + testTable.newRowDelta().addDeletes(eqDeletes).commit(); + return eqDeletes; + } + + private DeleteFile commitPositionDeletes(Table testTable, DataFile dataFile1) throws IOException { + List> deletes = Lists.newArrayList(); + for (long i = 0; i < 2; i++) { + deletes.add( + positionDelete(testTable.schema(), dataFile1.location(), i, (int) i, String.valueOf(i))); + } + + DeleteFile posDeletes = + FileHelpers.writePosDeleteFile( + testTable, + Files.localOutput(File.createTempFile("junit", null, tempDir("pos_delete"))), + TestHelpers.Row.of("bar", "A"), + deletes); + testTable.newRowDelta().addDeletes(posDeletes).commit(); + return posDeletes; + } + + private static PositionDelete positionDelete( + Schema tableSchema, CharSequence path, Long position, Object... values) { + PositionDelete posDelete = PositionDelete.create(); + GenericRecord nested = GenericRecord.create(tableSchema); + for (int i = 0; i < values.length; i++) { + nested.set(i, values[i]); + } + + posDelete.set(path, position, nested); + return posDelete; + } + + private File tempDir(String folderName) throws IOException { + return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile(); + } +}
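
Below is a minimal usage sketch of the API this patch adds, following the same flow the new tests exercise: compute partition stats for the current snapshot, register the resulting file in table metadata, and read the per-partition entries back. The `table` variable stands in for any partitioned Iceberg table obtained from a catalog, and the class and method names of the sketch itself are illustrative, not part of the patch.

import java.io.IOException;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.PartitionStatsHandler;
import org.apache.iceberg.io.CloseableIterable;

public class PartitionStatsExample {

  // Compute partition stats for the table's current snapshot, register the
  // resulting file in table metadata, and read the per-partition entries back.
  static void computeCommitAndRead(Table table) throws IOException {
    // writes the stats file in the table's default file format (Parquet or Avro)
    PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
    if (statsFile == null) {
      return; // table has no current snapshot, so there is nothing to register
    }

    // attach the stats file to table metadata for the snapshot it was computed for
    table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();

    // the stats file schema is derived from the table's unified partition type
    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));

    try (CloseableIterable<PartitionStats> entries =
        PartitionStatsHandler.readPartitionStatsFile(
            dataSchema, table.io().newInputFile(statsFile.path()))) {
      for (PartitionStats entry : entries) {
        System.out.printf(
            "partition=%s dataFiles=%d dataRecords=%d%n",
            entry.partition(), entry.dataFileCount(), entry.dataRecordCount());
      }
    }
  }
}

As the handler code above shows, computeAndWriteStatsFile returns null for a table without a snapshot, fails for an unpartitioned table, and relies on the internal Parquet and Avro readers and writers; ORC is not supported yet.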