Skip to content

Commit

Permalink
Data: Add partition stats writer and reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Sep 26, 2024
1 parent 2d9c344 commit 941505a
Show file tree
Hide file tree
Showing 8 changed files with 1,176 additions and 16 deletions.
49 changes: 46 additions & 3 deletions core/src/main/java/org/apache/iceberg/PartitionStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/
package org.apache.iceberg;

import java.util.Objects;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class PartitionStats implements StructLike {

private static final int STATS_COUNT = 12;

private StructLike partition;
private Record partition; // PartitionData as Record
private int specId;
private long dataRecordCount;
private int dataFileCount;
Expand All @@ -37,7 +39,7 @@ public class PartitionStats implements StructLike {
private Long lastUpdatedAt; // null by default
private Long lastUpdatedSnapshotId; // null by default

public PartitionStats(StructLike partition, int specId) {
public PartitionStats(Record partition, int specId) {
this.partition = partition;
this.specId = specId;
}
Expand Down Expand Up @@ -205,7 +207,7 @@ public <T> T get(int pos, Class<T> javaClass) {
public <T> void set(int pos, T value) {
switch (pos) {
case 0:
this.partition = (StructLike) value;
this.partition = (Record) value;
break;
case 1:
this.specId = (int) value;
Expand Down Expand Up @@ -249,4 +251,45 @@ public <T> void set(int pos, T value) {
throw new UnsupportedOperationException("Unknown position: " + pos);
}
}

@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public boolean equals(Object other) {
if (this == other) {
return true;
} else if (!(other instanceof PartitionStats)) {
return false;
}

PartitionStats that = (PartitionStats) other;
return Objects.equals(partition, that.partition)
&& specId == that.specId
&& dataRecordCount == that.dataRecordCount
&& dataFileCount == that.dataFileCount
&& totalDataFileSizeInBytes == that.totalDataFileSizeInBytes
&& positionDeleteRecordCount == that.positionDeleteRecordCount
&& positionDeleteFileCount == that.positionDeleteFileCount
&& equalityDeleteRecordCount == that.equalityDeleteRecordCount
&& equalityDeleteFileCount == that.equalityDeleteFileCount
&& totalRecordCount == that.totalRecordCount
&& Objects.equals(lastUpdatedAt, that.lastUpdatedAt)
&& Objects.equals(lastUpdatedSnapshotId, that.lastUpdatedSnapshotId);
}

@Override
public int hashCode() {
return Objects.hash(
partition,
specId,
dataRecordCount,
dataFileCount,
totalDataFileSizeInBytes,
positionDeleteRecordCount,
positionDeleteFileCount,
equalityDeleteRecordCount,
equalityDeleteFileCount,
totalRecordCount,
lastUpdatedAt,
lastUpdatedSnapshotId);
}
}
23 changes: 19 additions & 4 deletions core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.PartitionMap;
import org.apache.iceberg.util.PartitionUtil;
Expand Down Expand Up @@ -87,13 +90,10 @@ private static PartitionMap<PartitionStats> collectStats(
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
int specId = manifest.partitionSpecId();
PartitionSpec spec = table.specs().get(specId);
PartitionData keyTemplate = new PartitionData(partitionType);

for (ManifestEntry<?> entry : reader.entries()) {
ContentFile<?> file = entry.file();
StructLike coercedPartition =
PartitionUtil.coercePartition(partitionType, spec, file.partition());
StructLike key = keyTemplate.copyFor(coercedPartition);
Record key = coercedPartitionRecord(file, spec, partitionType);
Snapshot snapshot = table.snapshot(entry.snapshotId());
PartitionStats stats =
statsMap.computeIfAbsent(specId, key, () -> new PartitionStats(key, specId));
Expand Down Expand Up @@ -133,4 +133,19 @@ private static Collection<PartitionStats> mergeStats(

return statsMap.values();
}

private static Record coercedPartitionRecord(
ContentFile<?> file, PartitionSpec spec, StructType partitionType) {
// keep the partition data as per the unified spec by coercing
StructLike partition = PartitionUtil.coercePartition(partitionType, spec, file.partition());

GenericRecord record = GenericRecord.create(partitionType);
List<Types.NestedField> fields = partitionType.fields();
for (int index = 0; index < fields.size(); index++) {
Object val = partition.get(index, fields.get(index).type().typeId().javaClass());
record.set(index, val);
}

return record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
*/
package org.apache.iceberg.data;

import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;

public class IdentityPartitionConverters {
Expand Down Expand Up @@ -48,6 +51,18 @@ public static Object convertConstant(Type type, Object value) {
case FIXED:
if (value instanceof GenericData.Fixed) {
return ((GenericData.Fixed) value).bytes();
} else if (value instanceof ByteBuffer) {
return ByteBuffers.toByteArray((ByteBuffer) value);
}
return value;
case UUID:
if (value instanceof ByteBuffer) {
return ByteBuffers.toByteArray((ByteBuffer) value);
}
return value;
case BINARY:
if (value instanceof byte[]) {
return ByteBuffer.wrap((byte[]) value);
}
return value;
default:
Expand Down
170 changes: 170 additions & 0 deletions core/src/main/java/org/apache/iceberg/data/PartitionStatsRecord.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionStats;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;

/** Wraps the {@link PartitionStats} as {@link Record}. Used by generic writers and readers. */
public class PartitionStatsRecord implements Record, StructLike {
private static final LoadingCache<StructType, Map<String, Integer>> NAME_MAP_CACHE =
Caffeine.newBuilder()
.weakKeys()
.build(
struct -> {
Map<String, Integer> idToPos = Maps.newHashMap();
List<Types.NestedField> fields = struct.fields();
for (int index = 0; index < fields.size(); index++) {
idToPos.put(fields.get(index).name(), index);
}
return idToPos;
});

private final StructType struct;
private final PartitionStats partitionStats;
private final Map<String, Integer> nameToPos;

public static PartitionStatsRecord create(Schema schema, PartitionStats partitionStats) {
return new PartitionStatsRecord(schema.asStruct(), partitionStats);
}

public static PartitionStatsRecord create(StructType struct, PartitionStats partitionStats) {
return new PartitionStatsRecord(struct, partitionStats);
}

public PartitionStats unwrap() {
return partitionStats;
}

private PartitionStatsRecord(StructType struct, PartitionStats partitionStats) {
this.struct = struct;
this.partitionStats = partitionStats;
this.nameToPos = NAME_MAP_CACHE.get(struct);
}

private PartitionStatsRecord(PartitionStatsRecord toCopy) {
this.struct = toCopy.struct;
this.partitionStats = toCopy.partitionStats;
this.nameToPos = toCopy.nameToPos;
}

private PartitionStatsRecord(PartitionStatsRecord toCopy, Map<String, Object> overwrite) {
this.struct = toCopy.struct;
this.partitionStats = toCopy.partitionStats;
this.nameToPos = toCopy.nameToPos;
for (Map.Entry<String, Object> entry : overwrite.entrySet()) {
setField(entry.getKey(), entry.getValue());
}
}

@Override
public StructType struct() {
return struct;
}

@Override
public Object getField(String name) {
Integer pos = nameToPos.get(name);
Preconditions.checkArgument(pos != null, "Cannot get unknown field named: %s", name);
return partitionStats.get(pos, Object.class);
}

@Override
public void setField(String name, Object value) {
Integer pos = nameToPos.get(name);
Preconditions.checkArgument(pos != null, "Cannot set unknown field named: %s", name);
partitionStats.set(pos, value);
}

@Override
public int size() {
return partitionStats.size();
}

@Override
public Object get(int pos) {
return partitionStats.get(pos, Object.class);
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
Object value = get(pos);
if (value == null || javaClass.isInstance(value)) {
return javaClass.cast(value);
} else {
throw new IllegalStateException("Not an instance of " + javaClass.getName() + ": " + value);
}
}

@Override
public <T> void set(int pos, T value) {
partitionStats.set(pos, value);
}

@Override
public PartitionStatsRecord copy() {
return new PartitionStatsRecord(this);
}

@Override
public PartitionStatsRecord copy(Map<String, Object> overwriteValues) {
return new PartitionStatsRecord(this, overwriteValues);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Record(");
for (int index = 0; index < partitionStats.size(); index++) {
if (index != 0) {
sb.append(", ");
}
sb.append(partitionStats.get(index, Object.class));
}
sb.append(")");
return sb.toString();
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
} else if (!(other instanceof PartitionStatsRecord)) {
return false;
}

PartitionStatsRecord that = (PartitionStatsRecord) other;
return this.partitionStats.equals(that.partitionStats);
}

@Override
public int hashCode() {
return Objects.hashCode(partitionStats);
}
}
20 changes: 11 additions & 9 deletions core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.assertj.core.groups.Tuple;
Expand Down Expand Up @@ -370,17 +372,17 @@ public void testPartitionStatsWithSchemaEvolution() throws Exception {
snapshot2.snapshotId()));
}

private static PartitionData partitionData(Types.StructType partitionType, String c2, String c3) {
PartitionData partitionData = new PartitionData(partitionType);
partitionData.set(0, c2);
partitionData.set(1, c3);
return partitionData;
private static Record partitionData(Types.StructType partitionType, String c2, String c3) {
GenericRecord record = GenericRecord.create(partitionType);
record.set(0, c2);
record.set(1, c3);
return record;
}

private static PartitionData partitionData(Types.StructType partitionType, String c2) {
PartitionData partitionData = new PartitionData(partitionType);
partitionData.set(0, c2);
return partitionData;
private static Record partitionData(Types.StructType partitionType, String c2) {
GenericRecord record = GenericRecord.create(partitionType);
record.set(0, c2);
return record;
}

private static List<DataFile> prepareDataFiles(Table table) {
Expand Down
20 changes: 20 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTables.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,26 @@ public static TestTable create(
return new TestTable(ops, name, reporter);
}

public static TestTable create(
File temp,
String name,
Schema schema,
PartitionSpec spec,
int formatVersion,
Map<String, String> properties) {
TestTableOperations ops = new TestTableOperations(name, temp);
if (ops.current() != null) {
throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
}

ops.commit(
null,
newTableMetadata(
schema, spec, SortOrder.unsorted(), temp.toString(), properties, formatVersion));

return new TestTable(ops, name);
}

public static Transaction beginCreate(File temp, String name, Schema schema, PartitionSpec spec) {
return beginCreate(temp, name, schema, spec, SortOrder.unsorted());
}
Expand Down
Loading

0 comments on commit 941505a

Please sign in to comment.