Core: Update variant class visibility #12105

Merged 2 commits on Jan 28, 2025
@@ -23,6 +23,7 @@
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.variants.Variants.PhysicalType;
import org.apache.iceberg.variants.Variants.Primitives;

class PrimitiveWrapper<T> implements VariantPrimitive<T> {
@@ -47,17 +48,23 @@ class PrimitiveWrapper<T> implements VariantPrimitive<T> {
private static final byte BINARY_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_BINARY);
private static final byte STRING_HEADER = VariantUtil.primitiveHeader(Primitives.TYPE_STRING);

private final Variants.PhysicalType type;
private final PhysicalType type;
private final T value;
private ByteBuffer buffer = null;

PrimitiveWrapper(Variants.PhysicalType type, T value) {
this.type = type;
PrimitiveWrapper(PhysicalType type, T value) {
if (value instanceof Boolean
Contributor: nit: I prefer the existing implementation, which is cleaner and consistent with the other types.

Contributor Author: The trade-off is that this would require a separate reader just for boolean values. I'm open to that if there are strong objections, but it seems to me that allowing the actual type to be fixed up depending on the value is a reasonable change that saves quite a bit of code. If you object to this, I can revert it and add the boolean-specific reader.

Contributor: Yeah, you are right. The BOOLEAN_TRUE and BOOLEAN_FALSE physical types need special handling. When we shred them, they should be grouped together and considered as one type.

We may also need a type list {NULL, BOOLEAN, INT8, INT16, etc.}, which is almost the same as the physical type list but with BOOLEAN in place of BOOLEAN_TRUE and BOOLEAN_FALSE. Otherwise, we can't represent a shredded column type for true/false.

I'm fine with keeping it simple for now. We can revisit if needed.

&& (type == PhysicalType.BOOLEAN_TRUE || type == PhysicalType.BOOLEAN_FALSE)) {
// set the boolean type from the value so that callers can use BOOLEAN_* interchangeably
this.type = ((Boolean) value) ? PhysicalType.BOOLEAN_TRUE : PhysicalType.BOOLEAN_FALSE;
} else {
this.type = type;
}
this.value = value;
}
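The constructor's boolean fix-up can be seen in isolation with a small standalone sketch. The enum and wrapper below are hypothetical stand-ins, not the Iceberg classes: the stored physical type is derived from the value, so callers may pass either BOOLEAN_* constant interchangeably.

```java
// Standalone sketch of the normalization above; PhysicalType and Wrapper here
// are stand-ins, not the Iceberg types.
enum PhysicalType { BOOLEAN_TRUE, BOOLEAN_FALSE, INT8 }

class Wrapper<T> {
  final PhysicalType type;
  final T value;

  Wrapper(PhysicalType type, T value) {
    if (value instanceof Boolean
        && (type == PhysicalType.BOOLEAN_TRUE || type == PhysicalType.BOOLEAN_FALSE)) {
      // derive the boolean type from the value so BOOLEAN_* are interchangeable
      this.type = ((Boolean) value) ? PhysicalType.BOOLEAN_TRUE : PhysicalType.BOOLEAN_FALSE;
    } else {
      this.type = type;
    }
    this.value = value;
  }
}
```

This avoids a separate boolean reader at the cost of silently correcting a mismatched BOOLEAN_* argument, which is the trade-off discussed in the thread above.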

@Override
public Variants.PhysicalType type() {
public PhysicalType type() {
return type;
}

@@ -29,11 +29,11 @@ class SerializedArray extends Variants.SerializedValue implements VariantArray {
private static final int IS_LARGE = 0b10000;

@VisibleForTesting
static SerializedArray from(SerializedMetadata metadata, byte[] bytes) {
static SerializedArray from(VariantMetadata metadata, byte[] bytes) {
return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
}

static SerializedArray from(SerializedMetadata metadata, ByteBuffer value, int header) {
static SerializedArray from(VariantMetadata metadata, ByteBuffer value, int header) {
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
Variants.BasicType basicType = VariantUtil.basicType(header);
@@ -42,14 +42,14 @@ static SerializedArray from(SerializedMetadata metadata, ByteBuffer value, int h
return new SerializedArray(metadata, value, header);
}

private final SerializedMetadata metadata;
private final VariantMetadata metadata;
private final ByteBuffer value;
private final int offsetSize;
private final int offsetListOffset;
private final int dataOffset;
private final VariantValue[] array;

private SerializedArray(SerializedMetadata metadata, ByteBuffer value, int header) {
private SerializedArray(VariantMetadata metadata, ByteBuffer value, int header) {
this.metadata = metadata;
this.value = value;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
@@ -61,8 +61,8 @@ private SerializedArray(SerializedMetadata metadata, ByteBuffer value, int heade
this.array = new VariantValue[numElements];
}

@VisibleForTesting
int numElements() {
@Override
public int numElements() {
Contributor: Thanks for addressing it. I was trying to make the Parquet change in https://github.com/apache/iceberg/pull/11653/files#diff-b8e8443fcec3843e538dbc702d4c131ff58359cb83ccdb211d8679c1d77c16bd and we need to expose this.

Contributor Author: I needed it for the readers and for the updates here to ShreddedObject, too.
return array.length;
}

@@ -76,7 +76,7 @@ public VariantValue get(int index) {
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
array[index] =
Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
Variants.value(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
}
return array[index];
}
@@ -32,6 +32,7 @@ class SerializedMetadata implements VariantMetadata, Variants.Serialized {

static final ByteBuffer EMPTY_V1_BUFFER =
ByteBuffer.wrap(new byte[] {0x01, 0x00}).order(ByteOrder.LITTLE_ENDIAN);
static final SerializedMetadata EMPTY_V1_METADATA = from(EMPTY_V1_BUFFER);
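The two-byte buffer behind the new EMPTY_V1_METADATA constant can be unpacked in a standalone sketch. By my reading of the variant encoding, 0x01 is the metadata header (version 1) and 0x00 is a dictionary size of zero; treat the exact bit layout as an assumption, not a statement of the spec.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Sketch of the empty v1 metadata buffer above. Assumed layout: byte 0 is the
// header (version 1), byte 1 is the number of dictionary entries.
class EmptyMetadataSketch {
  static final ByteBuffer EMPTY_V1_BUFFER =
      ByteBuffer.wrap(new byte[] {0x01, 0x00}).order(ByteOrder.LITTLE_ENDIAN);

  static int dictionarySize() {
    // the byte after the header holds the dictionary entry count
    return EMPTY_V1_BUFFER.get(1);
  }
}
```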

static SerializedMetadata from(byte[] bytes) {
return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN));
@@ -63,8 +64,8 @@ private SerializedMetadata(ByteBuffer metadata, int header) {
this.dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize);
}

@VisibleForTesting
int dictionarySize() {
@Override
public int dictionarySize() {
return dict.length;
}

@@ -36,11 +36,11 @@ class SerializedObject extends Variants.SerializedValue implements VariantObject
private static final int FIELD_ID_SIZE_SHIFT = 4;
private static final int IS_LARGE = 0b1000000;

static SerializedObject from(SerializedMetadata metadata, byte[] bytes) {
static SerializedObject from(VariantMetadata metadata, byte[] bytes) {
return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
}

static SerializedObject from(SerializedMetadata metadata, ByteBuffer value, int header) {
static SerializedObject from(VariantMetadata metadata, ByteBuffer value, int header) {
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
Variants.BasicType basicType = VariantUtil.basicType(header);
@@ -49,7 +49,7 @@ static SerializedObject from(SerializedMetadata metadata, ByteBuffer value, int
return new SerializedObject(metadata, value, header);
}

private final SerializedMetadata metadata;
private final VariantMetadata metadata;
private final ByteBuffer value;
private final int fieldIdSize;
private final int fieldIdListOffset;
@@ -61,7 +61,7 @@ static SerializedObject from(SerializedMetadata metadata, ByteBuffer value, int
private final int dataOffset;
private final VariantValue[] values;

private SerializedObject(SerializedMetadata metadata, ByteBuffer value, int header) {
private SerializedObject(VariantMetadata metadata, ByteBuffer value, int header) {
this.metadata = metadata;
this.value = value;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
@@ -112,12 +112,13 @@ private void initOffsetsAndLengths(int numElements) {
}
}

@VisibleForTesting
int numElements() {
@Override
public int numFields() {
return fieldIds.length;
}

SerializedMetadata metadata() {
@VisibleForTesting
VariantMetadata metadata() {
return metadata;
}

@@ -140,6 +141,7 @@ public Pair<String, Integer> next() {
};
}

@Override
public Iterable<String> fieldNames() {
return () ->
new Iterator<>() {
@@ -180,7 +182,7 @@ public VariantValue get(String name) {

if (null == values[index]) {
values[index] =
Variants.from(
Variants.value(
metadata, VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]));
}

87 changes: 70 additions & 17 deletions core/src/main/java/org/apache/iceberg/variants/ShreddedObject.java
@@ -21,9 +21,13 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.SortedMerge;

@@ -35,22 +39,55 @@
* fields. This also does not allow updating or replacing the metadata for the unshredded object,
* which could require recursively rewriting field IDs.
*/
class ShreddedObject implements VariantObject {
private final SerializedMetadata metadata;
private final SerializedObject unshredded;
public class ShreddedObject implements VariantObject {
private final VariantMetadata metadata;
private final VariantObject unshredded;
private final Map<String, VariantValue> shreddedFields = Maps.newHashMap();
private final Set<String> removedFields = Sets.newHashSet();
private SerializationState serializationState = null;

ShreddedObject(SerializedMetadata metadata) {
ShreddedObject(VariantMetadata metadata) {
this.metadata = metadata;
this.unshredded = null;
}

ShreddedObject(SerializedObject unshredded) {
this.metadata = unshredded.metadata();
ShreddedObject(VariantMetadata metadata, VariantObject unshredded) {
this.metadata = metadata;
this.unshredded = unshredded;
}

@VisibleForTesting
VariantMetadata metadata() {
return metadata;
}

private Set<String> nameSet() {
Set<String> names = Sets.newHashSet(shreddedFields.keySet());

if (unshredded != null) {
Iterables.addAll(names, unshredded.fieldNames());
}

names.removeAll(removedFields);

return names;
}

@Override
public Iterable<String> fieldNames() {
return nameSet();
}

@Override
public int numFields() {
return nameSet().size();
}

public void remove(String field) {
Contributor: What is this remove() meant to support, and how will it be used?

Contributor Author (@rdblue, Jan 27, 2025): The purpose of this class is to create objects from an unshredded, serialized variant in value and the fields in its corresponding typed_value. The serialized object is used to construct the ShreddedObject instance, and then the shredded fields are set through put.

This is intended to handle fields that are "missing" because the field's value and typed_value are null. In those cases, we basically need to add a null value to the shreddedFields map. We could do that, but the map implementations that we use (from Guava) don't allow null values. Even if we used a map that could handle null, we would have to handle those nulls in places like nameSet and in serialization. That way we correctly record that the field was missing according to the shredding spec, rather than defined and equal to a variant null.

I thought it was cleaner to handle missing fields by calling remove for the field name to show that it is not present in the shredded fields. I also think that using a separate set of field names makes more sense for handling these than using null as a sentinel value in the shreddedFields map.

Contributor: Thanks for the explanation. If a field is missing and we remove the field from shreddedFields, why do we still need removedFields to keep track of it? Would the following get the correct field list?

  private Set<String> nameSet() {
    Set<String> names = Sets.newHashSet(shreddedFields.keySet());

    if (unshredded != null) {
      Iterables.addAll(names, unshredded.fieldNames());
    }

    return names;
  }

Contributor Author: That doesn't handle the case where unshredded incorrectly includes the field. We need to keep track of the shredded fields, whether present or missing, so that the shredded fields are always used.
shreddedFields.remove(field);
removedFields.add(field);
}
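The point the author makes above can be sketched standalone (hypothetical class, not the real ShreddedObject, which reads names from a VariantObject): the unshredded object may still contain a field that shredding determined to be missing, so removedFields must mask it out of the name set.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Standalone sketch: a removed (missing) field may still appear in the
// unshredded object's names, so it is tracked separately and masked out.
class NameSetSketch {
  final Map<String, String> shreddedFields = new HashMap<>();
  final Set<String> unshreddedNames = new HashSet<>();
  final Set<String> removedFields = new HashSet<>();

  void remove(String field) {
    shreddedFields.remove(field);
    removedFields.add(field);
  }

  Set<String> nameSet() {
    Set<String> names = new HashSet<>(shreddedFields.keySet());
    names.addAll(unshreddedNames);
    // without this line, a removed field would leak back in from unshredded
    names.removeAll(removedFields);
    return names;
  }
}
```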

public void put(String field, VariantValue value) {
Preconditions.checkArgument(
metadata.id(field) >= 0, "Cannot find field name in metadata: %s", field);
@@ -63,6 +100,10 @@ public void put(String field, VariantValue value) {

@Override
public VariantValue get(String field) {
if (removedFields.contains(field)) {
return null;
}

// the shredded value takes precedence if there is a conflict
VariantValue value = shreddedFields.get(field);
if (value != null) {
@@ -79,7 +120,8 @@
@Override
public int sizeInBytes() {
if (null == serializationState) {
this.serializationState = new SerializationState(metadata, unshredded, shreddedFields);
this.serializationState =
new SerializationState(metadata, unshredded, shreddedFields, removedFields);
}

return serializationState.size();
@@ -91,15 +133,16 @@ public int writeTo(ByteBuffer buffer, int offset) {
buffer.order() == ByteOrder.LITTLE_ENDIAN, "Invalid byte order: big endian");

if (null == serializationState) {
this.serializationState = new SerializationState(metadata, unshredded, shreddedFields);
this.serializationState =
new SerializationState(metadata, unshredded, shreddedFields, removedFields);
}

return serializationState.writeTo(buffer, offset);
}

/** Common state for {@link #size()} and {@link #writeTo(ByteBuffer, int)} */
private static class SerializationState {
private final SerializedMetadata metadata;
private final VariantMetadata metadata;
private final Map<String, ByteBuffer> unshreddedFields;
private final Map<String, VariantValue> shreddedFields;
private final int dataSize;
@@ -109,28 +152,38 @@ private static class SerializationState {
private final int offsetSize;

private SerializationState(
SerializedMetadata metadata,
SerializedObject unshredded,
Map<String, VariantValue> shreddedFields) {
VariantMetadata metadata,
VariantObject unshredded,
Map<String, VariantValue> shreddedFields,
Set<String> removedFields) {
this.metadata = metadata;
// field ID size is the size needed to store the largest field ID in the data
this.fieldIdSize = VariantUtil.sizeOf(metadata.dictionarySize());
this.shreddedFields = shreddedFields;
this.shreddedFields = Maps.newHashMap(shreddedFields);

int totalDataSize = 0;
// get the unshredded field names and values as byte buffers
ImmutableMap.Builder<String, ByteBuffer> unshreddedBuilder = ImmutableMap.builder();
if (unshredded != null) {
for (Pair<String, Integer> field : unshredded.fields()) {
if (unshredded instanceof SerializedObject) {
// for serialized objects, use existing buffers instead of materializing values
SerializedObject serialized = (SerializedObject) unshredded;
for (Pair<String, Integer> field : serialized.fields()) {
// if the value is replaced by an unshredded field, don't include it
String name = field.first();
boolean replaced = shreddedFields.containsKey(name);
boolean replaced = shreddedFields.containsKey(name) || removedFields.contains(name);
if (!replaced) {
ByteBuffer value = unshredded.sliceValue(field.second());
ByteBuffer value = serialized.sliceValue(field.second());
unshreddedBuilder.put(name, value);
totalDataSize += value.remaining();
}
}
} else if (unshredded != null) {
for (String name : unshredded.fieldNames()) {
boolean replaced = shreddedFields.containsKey(name) || removedFields.contains(name);
if (!replaced) {
shreddedFields.put(name, unshredded.get(name));
}
}
}

this.unshreddedFields = unshreddedBuilder.build();
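The comment earlier in this constructor says the field ID size is "the size needed to store the largest field ID in the data". A minimal unsigned-width helper illustrates that idea; this is a sketch of the assumed behavior of VariantUtil.sizeOf, not its actual implementation.

```java
// Sketch: minimum number of bytes needed to store an unsigned value, as used
// for field IDs bounded by the metadata dictionary size (assumed behavior of
// VariantUtil.sizeOf, not its actual implementation).
class SizeOfSketch {
  static int sizeOf(int maxValue) {
    if (maxValue <= 0xFF) {
      return 1;
    } else if (maxValue <= 0xFFFF) {
      return 2;
    } else if (maxValue <= 0xFFFFFF) {
      return 3;
    }
    return 4;
  }
}
```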
@@ -23,6 +23,9 @@ public interface VariantArray extends VariantValue {
/** Returns the {@link VariantValue} at {@code index} in this array. */
VariantValue get(int index);

/** Returns the number of elements stored in this array. */
int numElements();

@Override
default Variants.PhysicalType type() {
return Variants.PhysicalType.ARRAY;
@@ -31,4 +31,7 @@ public interface VariantMetadata extends Variants.Serialized {
* @throws NoSuchElementException if the dictionary does not contain the ID
*/
String get(int id);

/** Returns the size of the metadata dictionary. */
int dictionarySize();
}
@@ -23,6 +23,12 @@ public interface VariantObject extends VariantValue {
/** Returns the {@link VariantValue} for the field named {@code name} in this object. */
VariantValue get(String name);

/** Returns the names of fields stored in this object. */
Iterable<String> fieldNames();

/** Returns the number of fields stored in this object. */
int numFields();
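The contract the two new interface methods add can be shown with a tiny map-backed sketch (a hypothetical class, not an actual VariantObject implementation, which would also need the VariantValue methods): fieldNames() exposes the field names and numFields() their count.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the new accessors' contract: names and their count stay in sync.
class ObjectSketch {
  final Map<String, Object> fields = new LinkedHashMap<>();

  Iterable<String> fieldNames() {
    return fields.keySet();
  }

  int numFields() {
    return fields.size();
  }
}
```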

@Override
default Variants.PhysicalType type() {
return Variants.PhysicalType.OBJECT;