
Core: Update variant class visibility.
rdblue committed Jan 25, 2025
1 parent d693f83 commit a5229ee
Showing 15 changed files with 186 additions and 111 deletions.
@@ -23,6 +23,7 @@
 import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.variants.Variants.PhysicalType;
 import org.apache.iceberg.variants.Variants.Primitives;
 
 class PrimitiveWrapper<T> implements VariantPrimitive<T> {
@@ -47,17 +48,23 @@ class PrimitiveWrapper<T> implements VariantPrimitive<T> {
   private static final byte BINARY_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_BINARY);
   private static final byte STRING_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_STRING);
 
-  private final Variants.PhysicalType type;
+  private final PhysicalType type;
   private final T value;
   private ByteBuffer buffer = null;
 
-  PrimitiveWrapper(Variants.PhysicalType type, T value) {
-    this.type = type;
+  PrimitiveWrapper(PhysicalType type, T value) {
+    if (value instanceof Boolean
+        && (type == PhysicalType.BOOLEAN_TRUE || type == PhysicalType.BOOLEAN_FALSE)) {
+      // set the boolean type from the value so that callers can use BOOLEAN_* interchangeably
+      this.type = ((Boolean) value) ? PhysicalType.BOOLEAN_TRUE : PhysicalType.BOOLEAN_FALSE;
+    } else {
+      this.type = type;
+    }
     this.value = value;
   }
 
   @Override
-  public Variants.PhysicalType type() {
+  public PhysicalType type() {
     return type;
   }
 
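The constructor change above makes BOOLEAN_TRUE and BOOLEAN_FALSE interchangeable whenever a Boolean value is supplied, because the stored type is derived from the value. A minimal sketch of that behavior, not part of this commit (the demo class is hypothetical and must live in org.apache.iceberg.variants, since PrimitiveWrapper and its constructor are package-private):

package org.apache.iceberg.variants;

import org.apache.iceberg.variants.Variants.PhysicalType;

// Hypothetical demo, not part of this commit.
class BooleanWrapperDemo {
  static void demo() {
    // the requested constant and the supplied value deliberately disagree
    PrimitiveWrapper<Boolean> wrapped = new PrimitiveWrapper<>(PhysicalType.BOOLEAN_TRUE, false);

    // the constructor normalizes the type to match the value
    System.out.println(wrapped.type()); // prints BOOLEAN_FALSE
  }
}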
@@ -29,11 +29,11 @@ class SerializedArray extends Variants.SerializedValue implements VariantArray {
   private static final int IS_LARGE = 0b10000;
 
   @VisibleForTesting
-  static SerializedArray from(SerializedMetadata metadata, byte[] bytes) {
+  static SerializedArray from(VariantMetadata metadata, byte[] bytes) {
     return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
   }
 
-  static SerializedArray from(SerializedMetadata metadata, ByteBuffer value, int header) {
+  static SerializedArray from(VariantMetadata metadata, ByteBuffer value, int header) {
     Preconditions.checkArgument(
         value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
     Variants.BasicType basicType = VariantUtil.basicType(header);
@@ -42,14 +42,14 @@ static SerializedArray from(SerializedMetadata metadata, ByteBuffer value, int header) {
     return new SerializedArray(metadata, value, header);
   }
 
-  private final SerializedMetadata metadata;
+  private final VariantMetadata metadata;
   private final ByteBuffer value;
   private final int offsetSize;
   private final int offsetListOffset;
   private final int dataOffset;
   private final VariantValue[] array;
 
-  private SerializedArray(SerializedMetadata metadata, ByteBuffer value, int header) {
+  private SerializedArray(VariantMetadata metadata, ByteBuffer value, int header) {
     this.metadata = metadata;
     this.value = value;
     this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
@@ -61,8 +61,8 @@ private SerializedArray(SerializedMetadata metadata, ByteBuffer value, int header) {
     this.array = new VariantValue[numElements];
   }
 
-  @VisibleForTesting
-  int numElements() {
+  @Override
+  public int numElements() {
     return array.length;
   }
 
@@ -76,7 +76,7 @@ public VariantValue get(int index) {
           VariantUtil.readLittleEndianUnsigned(
               value, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
       array[index] =
-          Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
+          Variants.value(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
     }
     return array[index];
   }
@@ -32,6 +32,7 @@ class SerializedMetadata implements VariantMetadata, Variants.Serialized {
 
   static final ByteBuffer EMPTY_V1_BUFFER =
       ByteBuffer.wrap(new byte[] {0x01, 0x00}).order(ByteOrder.LITTLE_ENDIAN);
+  static final SerializedMetadata EMPTY_V1_METADATA = from(EMPTY_V1_BUFFER);
 
   static SerializedMetadata from(byte[] bytes) {
     return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN));
@@ -63,8 +64,8 @@ private SerializedMetadata(ByteBuffer metadata, int header) {
     this.dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize);
   }
 
-  @VisibleForTesting
-  int dictionarySize() {
+  @Override
+  public int dictionarySize() {
     return dict.length;
   }
 
@@ -36,11 +36,11 @@ class SerializedObject extends Variants.SerializedValue implements VariantObject {
   private static final int FIELD_ID_SIZE_SHIFT = 4;
   private static final int IS_LARGE = 0b1000000;
 
-  static SerializedObject from(SerializedMetadata metadata, byte[] bytes) {
+  static SerializedObject from(VariantMetadata metadata, byte[] bytes) {
     return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
   }
 
-  static SerializedObject from(SerializedMetadata metadata, ByteBuffer value, int header) {
+  static SerializedObject from(VariantMetadata metadata, ByteBuffer value, int header) {
     Preconditions.checkArgument(
         value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
     Variants.BasicType basicType = VariantUtil.basicType(header);
@@ -49,7 +49,7 @@ static SerializedObject from(SerializedMetadata metadata, ByteBuffer value, int header) {
     return new SerializedObject(metadata, value, header);
   }
 
-  private final SerializedMetadata metadata;
+  private final VariantMetadata metadata;
   private final ByteBuffer value;
   private final int fieldIdSize;
   private final int fieldIdListOffset;
@@ -61,7 +61,7 @@ static SerializedObject from(SerializedMetadata metadata, ByteBuffer value, int header) {
   private final int dataOffset;
   private final VariantValue[] values;
 
-  private SerializedObject(SerializedMetadata metadata, ByteBuffer value, int header) {
+  private SerializedObject(VariantMetadata metadata, ByteBuffer value, int header) {
     this.metadata = metadata;
     this.value = value;
     this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
@@ -112,12 +112,13 @@ private void initOffsetsAndLengths(int numElements) {
     }
   }
 
-  @VisibleForTesting
-  int numElements() {
+  @Override
+  public int numElements() {
     return fieldIds.length;
   }
 
-  SerializedMetadata metadata() {
+  @VisibleForTesting
+  VariantMetadata metadata() {
     return metadata;
   }
 
@@ -140,6 +141,7 @@ public Pair<String, Integer> next() {
         };
   }
 
+  @Override
   public Iterable<String> fieldNames() {
     return () ->
         new Iterator<>() {
@@ -180,7 +182,7 @@ public VariantValue get(String name) {
 
     if (null == values[index]) {
       values[index] =
-          Variants.from(
+          Variants.value(
               metadata, VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]));
     }
 
85 changes: 68 additions & 17 deletions core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java
@@ -21,9 +21,13 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.SortedMerge;
 
@@ -35,22 +39,53 @@
  * fields. This also does not allow updating or replacing the metadata for the unshredded object,
  * which could require recursively rewriting field IDs.
  */
-class ShreddedObject implements VariantObject {
-  private final SerializedMetadata metadata;
-  private final SerializedObject unshredded;
+public class ShreddedObject implements VariantObject {
+  private final VariantMetadata metadata;
+  private final VariantObject unshredded;
   private final Map<String, VariantValue> shreddedFields = Maps.newHashMap();
+  private final Set<String> removedFields = Sets.newHashSet();
   private SerializationState serializationState = null;
 
-  ShreddedObject(SerializedMetadata metadata) {
+  ShreddedObject(VariantMetadata metadata) {
     this.metadata = metadata;
     this.unshredded = null;
   }
 
-  ShreddedObject(SerializedObject unshredded) {
-    this.metadata = unshredded.metadata();
+  ShreddedObject(VariantMetadata metadata, VariantObject unshredded) {
+    this.metadata = metadata;
     this.unshredded = unshredded;
   }
 
+  @VisibleForTesting
+  VariantMetadata metadata() {
+    return metadata;
+  }
+
+  private Set<String> nameSet() {
+    Set<String> names = Sets.newHashSet(shreddedFields.keySet());
+
+    if (unshredded != null) {
+      Iterables.addAll(names, unshredded.fieldNames());
+    }
+
+    return names;
+  }
+
+  @Override
+  public Iterable<String> fieldNames() {
+    return nameSet();
+  }
+
+  @Override
+  public int numElements() {
+    return nameSet().size();
+  }
+
+  public void remove(String field) {
+    shreddedFields.remove(field);
+    removedFields.add(field);
+  }
+
   public void put(String field, VariantValue value) {
     Preconditions.checkArgument(
         metadata.id(field) >= 0, "Cannot find field name in metadata: %s", field);
@@ -63,6 +98,10 @@ public void put(String field, VariantValue value) {
 
   @Override
   public VariantValue get(String field) {
+    if (removedFields.contains(field)) {
+      return null;
+    }
+
     // the shredded value takes precedence if there is a conflict
     VariantValue value = shreddedFields.get(field);
     if (value != null) {
@@ -79,7 +118,8 @@ public int sizeInBytes() {
   @Override
   public int sizeInBytes() {
     if (null == serializationState) {
-      this.serializationState = new SerializationState(metadata, unshredded, shreddedFields);
+      this.serializationState =
+          new SerializationState(metadata, unshredded, shreddedFields, removedFields);
     }
 
     return serializationState.size();
@@ -91,15 +131,16 @@ public int writeTo(ByteBuffer buffer, int offset) {
         buffer.order() == ByteOrder.LITTLE_ENDIAN, "Invalid byte order: big endian");
 
     if (null == serializationState) {
-      this.serializationState = new SerializationState(metadata, unshredded, shreddedFields);
+      this.serializationState =
+          new SerializationState(metadata, unshredded, shreddedFields, removedFields);
     }
 
     return serializationState.writeTo(buffer, offset);
   }
 
   /** Common state for {@link #size()} and {@link #writeTo(ByteBuffer, int)} */
   private static class SerializationState {
-    private final SerializedMetadata metadata;
+    private final VariantMetadata metadata;
     private final Map<String, ByteBuffer> unshreddedFields;
     private final Map<String, VariantValue> shreddedFields;
     private final int dataSize;
@@ -109,28 +150,38 @@ private static class SerializationState {
     private final int offsetSize;
 
     private SerializationState(
-        SerializedMetadata metadata,
-        SerializedObject unshredded,
-        Map<String, VariantValue> shreddedFields) {
+        VariantMetadata metadata,
+        VariantObject unshredded,
+        Map<String, VariantValue> shreddedFields,
+        Set<String> removedFields) {
       this.metadata = metadata;
       // field ID size is the size needed to store the largest field ID in the data
      this.fieldIdSize = VariantUtil.sizeOf(metadata.dictionarySize());
-      this.shreddedFields = shreddedFields;
+      this.shreddedFields = Maps.newHashMap(shreddedFields);
 
       int totalDataSize = 0;
       // get the unshredded field names and values as byte buffers
       ImmutableMap.Builder<String, ByteBuffer> unshreddedBuilder = ImmutableMap.builder();
-      if (unshredded != null) {
-        for (Pair<String, Integer> field : unshredded.fields()) {
+      if (unshredded instanceof SerializedObject) {
+        // for serialized objects, use existing buffers instead of materializing values
+        SerializedObject serialized = (SerializedObject) unshredded;
+        for (Pair<String, Integer> field : serialized.fields()) {
           // if the value is replaced by an unshredded field, don't include it
           String name = field.first();
-          boolean replaced = shreddedFields.containsKey(name);
+          boolean replaced = shreddedFields.containsKey(name) || removedFields.contains(name);
           if (!replaced) {
-            ByteBuffer value = unshredded.sliceValue(field.second());
+            ByteBuffer value = serialized.sliceValue(field.second());
             unshreddedBuilder.put(name, value);
             totalDataSize += value.remaining();
           }
         }
+      } else if (unshredded != null) {
+        for (String name : unshredded.fieldNames()) {
+          boolean replaced = shreddedFields.containsKey(name) || removedFields.contains(name);
+          if (!replaced) {
+            shreddedFields.put(name, unshredded.get(name));
+          }
+        }
       }
 
       this.unshreddedFields = unshreddedBuilder.build();
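The new removedFields bookkeeping means a removed field disappears from get(), fieldNames(), numElements(), and the serialized output, even when the unshredded base object still carries it. A small sketch of the put/remove semantics, not part of this commit (the helper class and method names are hypothetical; it must sit in org.apache.iceberg.variants because the ShreddedObject constructors are package-private, and the metadata dictionary is assumed to already contain the field names used):

package org.apache.iceberg.variants;

// Hypothetical helper illustrating ShreddedObject.put/remove, not part of this commit.
class ShreddedObjectDemo {
  static ShreddedObject addAndRemove(
      VariantMetadata metadata, VariantObject base, VariantValue newValue) {
    ShreddedObject object = new ShreddedObject(metadata, base);

    // "a" must already be in the metadata dictionary, or put() fails its precondition
    object.put("a", newValue);

    // a removed field returns null from get() and is dropped when serializing,
    // even if the unshredded base object still contains it
    object.remove("b");

    return object;
  }
}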
@@ -23,6 +23,9 @@ public interface VariantArray extends VariantValue {
   /** Returns the {@link VariantValue} at {@code index} in this array. */
   VariantValue get(int index);
 
+  /** Returns the number of fields stored in this array. */
+  int numElements();
+
   @Override
   default Variants.PhysicalType type() {
     return Variants.PhysicalType.ARRAY;
@@ -31,4 +31,7 @@ public interface VariantMetadata extends Variants.Serialized {
    * @throws NoSuchElementException if the dictionary does not contain the ID
    */
   String get(int id);
+
+  /** Returns the size of the metadata dictionary. */
+  int dictionarySize();
 }
@@ -23,6 +23,12 @@ public interface VariantObject extends VariantValue {
   /** Returns the {@link VariantValue} for the field named {@code name} in this object. */
   VariantValue get(String name);
 
+  /** Returns the names of fields stored in this object. */
+  Iterable<String> fieldNames();
+
+  /** Returns the number of fields stored in this object. */
+  int numElements();
+
   @Override
   default Variants.PhysicalType type() {
     return Variants.PhysicalType.OBJECT;
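With fieldNames() and numElements() promoted to the public VariantObject interface, callers can walk any variant object generically. A brief usage sketch, not part of this commit (the demo class is hypothetical; it assumes VariantValue is public, which the public interfaces extending it suggest):

import org.apache.iceberg.variants.VariantObject;
import org.apache.iceberg.variants.VariantValue;

// Hypothetical usage of the interface methods added in this commit.
class VariantObjectDemo {
  static void printFields(VariantObject object) {
    System.out.println("field count: " + object.numElements());

    for (String name : object.fieldNames()) {
      VariantValue value = object.get(name);
      System.out.println(name + " -> " + (value == null ? "null" : value.type()));
    }
  }
}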

