diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java b/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java index eaaecedcc578..29f972021ad1 100644 --- a/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java @@ -28,11 +28,6 @@ class SerializedArray extends Variants.SerializedValue implements VariantArray { private static final int OFFSET_SIZE_SHIFT = 2; private static final int IS_LARGE = 0b10000; - - static SerializedArray from(Variant variant) { - return from(SerializedMetadata.from(variant.getMetadata()), variant.getValue()); - } - @VisibleForTesting static SerializedArray from(SerializedMetadata metadata, byte[] bytes) { return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java b/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java index 55cf26ddf1ea..1e764dfa93c4 100644 --- a/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.Pair; @@ -36,10 +35,6 @@ class SerializedObject extends Variants.SerializedValue implements VariantObject private static final int FIELD_ID_SIZE_SHIFT = 4; private static final int IS_LARGE = 0b1000000; - static SerializedObject from(Variant variant) { - return from(SerializedMetadata.from(variant.getMetadata()), variant.getValue()); - } - static SerializedObject from(SerializedMetadata metadata, byte[] bytes) { return from(metadata, 
ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); } @@ -116,8 +111,8 @@ private void initOffsetsAndLengths(int numElements) { } } - @VisibleForTesting - int numElements() { + @Override + public int numElements() { return fieldIds.length; } diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java b/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java index eee62bcd37a5..1a6bd37a4ff3 100644 --- a/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java @@ -28,10 +28,6 @@ class SerializedPrimitive extends Variants.SerializedValue implements VariantPri private static final int PRIMITIVE_TYPE_SHIFT = 2; private static final int PRIMITIVE_OFFSET = Variants.HEADER_SIZE; - static SerializedPrimitive from(Variant variant) { - return from(variant.getValue()); - } - static SerializedPrimitive from(byte[] bytes) { return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); } diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java b/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java index 8d66ac2093e3..3004a075def1 100644 --- a/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java @@ -26,11 +26,7 @@ class SerializedShortString extends Variants.SerializedValue implements VariantP private static final int LENGTH_MASK = 0b11111100; private static final int LENGTH_SHIFT = 2; - static SerializedShortString from(Variant variant) { - return from(variant.getValue()); - } - - static SerializedShortString from(byte[] bytes) { + static SerializedShortString from(byte[] bytes) { return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); } diff --git a/core/src/main/java/org/apache/iceberg/variants/Variant.java 
b/core/src/main/java/org/apache/iceberg/variants/Variant.java index 0a01f02d1541..b5606fa094b6 100644 --- a/core/src/main/java/org/apache/iceberg/variants/Variant.java +++ b/core/src/main/java/org/apache/iceberg/variants/Variant.java @@ -18,34 +18,11 @@ */ package org.apache.iceberg.variants; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +/** A variant metadata and value pair. */ +public interface Variant { + /** Returns the metadata for all values in the variant. */ + VariantMetadata metadata(); -public final class Variant { - private final byte[] value; - private final byte[] metadata; - - public Variant(byte[] value, byte[] metadata) { - Preconditions.checkArgument(metadata != null && metadata.length >= 1, - "Metadata must not be null or empty."); - Preconditions.checkArgument(value != null && value.length >= 1, - "Value must not be null or empty."); - - Preconditions.checkArgument((metadata[0] & VariantConstants.VERSION_MASK) == VariantConstants.VERSION, - "Unsupported metadata version."); - - if (value.length > VariantConstants.SIZE_LIMIT || metadata.length > VariantConstants.SIZE_LIMIT) { - throw new VariantSizeLimitException(); - } - - this.value = value; - this.metadata = metadata; - } - - public byte[] getMetadata() { - return metadata; - } - - public byte[] getValue() { - return value; - } + /** Returns the variant value. */ + VariantValue value(); } diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantArray.java b/core/src/main/java/org/apache/iceberg/variants/VariantArray.java index a245d5cb3614..805c54227f06 100644 --- a/core/src/main/java/org/apache/iceberg/variants/VariantArray.java +++ b/core/src/main/java/org/apache/iceberg/variants/VariantArray.java @@ -20,7 +20,9 @@ /** An variant array value. 
*/ public interface VariantArray extends VariantValue { - int numElements(); + default int numElements() { + throw new UnsupportedOperationException(); + } /** Returns the {@link VariantValue} at {@code index} in this array. */ VariantValue get(int index); diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantArrayBuilder.java b/core/src/main/java/org/apache/iceberg/variants/VariantArrayBuilder.java new file mode 100644 index 000000000000..5d28b2aa7467 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantArrayBuilder.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.DateTimeUtil; + +public class VariantArrayBuilder extends VariantBuilderBase { + private final List offsets; + + public VariantArrayBuilder(ByteBufferWrapper buffer, Dictionary dict) { + super(buffer, dict); + offsets = Lists.newArrayList(); + } + + public VariantObjectBuilder startObject() { + addOffset(); + return new VariantObjectBuilder(buffer, dict); + } + + public VariantArrayBuilder startArray() { + addOffset(); + return new VariantArrayBuilder(buffer, dict); + } + + public VariantArrayBuilder writeNull() { + addOffset(); + writeNullInternal(); + return this; + } + + public VariantArrayBuilder writeBoolean(boolean value) { + addOffset(); + writeBooleanInternal(value); + return this; + } + + public VariantArrayBuilder writeNumeric(long value) { + addOffset(); + writeNumericInternal(value); + return this; + } + + public VariantArrayBuilder writeDouble(double value) { + addOffset(); + writeDoubleInternal(value); + return this; + } + + public VariantArrayBuilder writeDecimal(BigDecimal value) { + addOffset(); + writeDecimalInternal(value); + return this; + } + + public VariantArrayBuilder writeDate(LocalDate value) { + addOffset(); + writeDateInternal(DateTimeUtil.daysFromDate(value)); + return this; + } + + public VariantArrayBuilder writeTimestampTz(OffsetDateTime value) { + addOffset(); + writeTimestampTzInternal(DateTimeUtil.microsFromTimestamptz(value)); + return this; + } + + public VariantArrayBuilder writeTimestampNtz(LocalDateTime value) { + addOffset(); + writeTimestampNtzInternal(DateTimeUtil.microsFromTimestamp(value)); + return this; + } + + public VariantArrayBuilder writeFloat(float value) { + addOffset(); + writeFloatInternal(value); + return this; + } + 
+ public VariantArrayBuilder writeBinary(byte[] value) { + addOffset(); + writeBinaryInternal(value); + return this; + } + + public VariantArrayBuilder writeString(String str) { + addOffset(); + writeStringInternal(str); + return this; + } + + private void addOffset() { + offsets.add(buffer.pos - startPos); + } + + public void endArray() { + super.endArray(startPos, offsets); + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantBuilder.java b/core/src/main/java/org/apache/iceberg/variants/VariantBuilder.java index bb47acd9621e..0e4835c6fcbb 100644 --- a/core/src/main/java/org/apache/iceberg/variants/VariantBuilder.java +++ b/core/src/main/java/org/apache/iceberg/variants/VariantBuilder.java @@ -25,25 +25,28 @@ import com.fasterxml.jackson.core.exc.InputCoercionException; import java.io.IOException; import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Map; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -public class VariantBuilder { - private static final int MAX_SHORT_STR_SIZE = 0x3F; +/** A builder class to build a primitive/array/object variant. 
*/ +public class VariantBuilder extends VariantBuilderBase { + public VariantBuilder() { + super(new VariantBuilderBase.ByteBufferWrapper(), new VariantBuilderBase.Dictionary()); + } + + public VariantPrimitiveBuilder createPrimitive() { + VariantPrimitiveBuilder primitiveBuilder = new VariantPrimitiveBuilder(buffer, dict); + return primitiveBuilder; + } - private ByteBufferWrapper buffer = new ByteBufferWrapper(); + public VariantObjectBuilder startObject() { + return new VariantObjectBuilder(buffer, dict); + } - // Store the mapping from a string to a monotonically increasing assigned id - private final Map dictionary = Maps.newHashMap(); - // Store all the strings encoded with UTF8 in `dictionary` in the order of assigned ids. - private final List dictionaryKeys = Lists.newArrayList(); + public VariantArrayBuilder startArray() { + return new VariantArrayBuilder(buffer, dict); + } /** * Parses a JSON string and constructs a Variant object. @@ -66,54 +69,6 @@ public static Variant parseJson(String json) throws IOException { } } - /** - * Builds the variant metadata from `dictionaryKeys` and returns the resulting Variant object. - * - * @return The constructed Variant object. 
- */ - public Variant build() { - int numKeys = dictionaryKeys.size(); - - // Calculate total size of dictionary strings - long numStringBytes = dictionaryKeys.stream().mapToLong(key -> key.length).sum(); - if (numStringBytes > VariantConstants.SIZE_LIMIT) { - throw new VariantSizeLimitException(); - } - - // Determine the number of bytes required for dictionary size and offset entry - int offsetSize = sizeOf(Math.max((int) numStringBytes, numKeys)); - - // metadata: header byte, dictionary size, offsets and string bytes - long metadataSize = 1 + offsetSize + (numKeys + 1) * offsetSize + numStringBytes; - - // Ensure the metadata size is within limits - if (metadataSize > VariantConstants.SIZE_LIMIT) { - throw new VariantSizeLimitException(); - } - - ByteBufferWrapper metadataBuffer = - new ByteBufferWrapper((int) metadataSize, (int) metadataSize); - - // Write header byte (version + offset size) - metadataBuffer.addByte(VariantUtil.metadataHeader(VariantConstants.VERSION, offsetSize)); - - // Write number of keys - metadataBuffer.writeLittleEndianUnsigned(numKeys, offsetSize); - - // Write offsets - int currentOffset = 0; - for (byte[] key : dictionaryKeys) { - metadataBuffer.writeLittleEndianUnsigned(currentOffset, offsetSize); - currentOffset += key.length; - } - metadataBuffer.writeLittleEndianUnsigned(numStringBytes, offsetSize); - - // Write dictionary strings - dictionaryKeys.forEach(metadataBuffer::addBytes); - - return new Variant(buffer.toByteArray(), metadataBuffer.toByteArray()); - } - private void buildJson(JsonParser parser) throws IOException { JsonToken token = parser.currentToken(); @@ -123,36 +78,36 @@ private void buildJson(JsonParser parser) throws IOException { switch (token) { case START_OBJECT: - appendObject(parser); + writeObject(parser); break; case START_ARRAY: - appendArray(parser); + writeArray(parser); break; case VALUE_STRING: - appendString(parser.getText()); + writeStringInternal(parser.getText()); break; case VALUE_NUMBER_INT: - 
appendInteger(parser); + writeInteger(parser); break; case VALUE_NUMBER_FLOAT: - appendFloat(parser); + writeFloat(parser); break; case VALUE_TRUE: - appendBoolean(true); + writeBooleanInternal(true); break; case VALUE_FALSE: - appendBoolean(false); + writeBooleanInternal(false); break; case VALUE_NULL: - appendNull(); + writeNullInternal(); break; default: throw new JsonParseException(parser, "Unexpected token " + token); } } - private void appendObject(JsonParser parser) throws IOException { - List fields = Lists.newArrayList(); + private void writeObject(JsonParser parser) throws IOException { + List fields = Lists.newArrayList(); int startPos = buffer.pos; // Store object keys to dictionary of metadata @@ -160,275 +115,50 @@ private void appendObject(JsonParser parser) throws IOException { String key = parser.currentName(); parser.nextToken(); // Move to the value - int id = - dictionary.computeIfAbsent( - key, - k -> { - int newId = dictionary.size(); - dictionaryKeys.add(k.getBytes(StandardCharsets.UTF_8)); - return newId; - }); - - fields.add(new FieldEntry(key, id, buffer.pos - startPos)); + int id = dict.add(key); + fields.add(new VariantBuilderBase.FieldEntry(key, id, buffer.pos - startPos)); buildJson(parser); } endObject(startPos, fields); } - private void appendArray(JsonParser parser) throws IOException { + private void writeArray(JsonParser parser) throws IOException { List offsets = Lists.newArrayList(); - int start = buffer.pos; + int startPos = buffer.pos; while (parser.nextToken() != JsonToken.END_ARRAY) { - offsets.add(buffer.pos - start); + offsets.add(buffer.pos - startPos); buildJson(parser); } - endArray(start, offsets); + endArray(startPos, offsets); } - private void appendInteger(JsonParser parser) throws IOException { + private void writeInteger(JsonParser parser) throws IOException { try { - appendNumeric(parser.getLongValue()); + writeNumericInternal(parser.getLongValue()); } catch (InputCoercionException ignored) { - 
appendFloat(parser); // Fallback for large integers - } - } - - private void appendString(String str) { - byte[] text = str.getBytes(StandardCharsets.UTF_8); - boolean longStr = text.length > MAX_SHORT_STR_SIZE; - - // Write header - if (longStr) { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_STRING)); - buffer.writeLittleEndianUnsigned(text.length, 4); - } else { - buffer.addByte(VariantUtil.shortStrHeader(text.length)); - } - - // Write string content - buffer.addBytes(text); - } - - public void appendNull() { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_NULL)); - } - - public void appendBoolean(boolean value) { - buffer.addByte( - VariantUtil.primitiveHeader( - value ? Variants.Primitives.TYPE_TRUE : Variants.Primitives.TYPE_FALSE)); - } - - /** - * Appends a numeric value to the variant builder, automatically choosing the smallest type (INT8, - * INT16, INT32, or INT64) to store the value efficiently. - * - * @param value The numeric value to append. 
- */ - public void appendNumeric(long value) { - if (value == (byte) value) { - // INT8: Requires 1 byte for header + 1 byte for value - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT8)); - buffer.writeLittleEndianUnsigned(value, 1); - } else if (value == (short) value) { - // INT16: Requires 1 byte for header + 2 bytes for value - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT16)); - buffer.writeLittleEndianUnsigned(value, 2); - } else if (value == (int) value) { - // INT32: Requires 1 byte for header + 4 bytes for value - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT32)); - buffer.writeLittleEndianUnsigned(value, 4); - } else { - // INT64: Requires 1 byte for header + 8 bytes for value - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT64)); - buffer.writeLittleEndianUnsigned(value, 8); + writeFloat(parser); // Fallback for large integers } } - public void appendDouble(double value) { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DOUBLE)); - buffer.writeLittleEndianUnsigned(Double.doubleToLongBits(value), 8); - } - - /** - * Appends a decimal value to the variant builder, choosing the smallest decimal type (DECIMAL4, - * DECIMAL8, DECIMAL16) that fits its precision and scale. 
- */ - public void appendDecimal(BigDecimal value) { - Preconditions.checkArgument( - value.precision() <= VariantConstants.MAX_DECIMAL16_PRECISION, - "Unsupported Decimal precision: %s", - value.precision()); - - BigInteger unscaled = value.unscaledValue(); - if (value.scale() <= VariantConstants.MAX_DECIMAL4_PRECISION - && value.precision() <= VariantConstants.MAX_DECIMAL4_PRECISION) { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL4)); - buffer.addByte((byte) value.scale()); - buffer.writeLittleEndianUnsigned(unscaled.intValueExact(), 4); - } else if (value.scale() <= VariantConstants.MAX_DECIMAL8_PRECISION - && value.precision() <= VariantConstants.MAX_DECIMAL8_PRECISION) { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL8)); - buffer.addByte((byte) value.scale()); - buffer.writeLittleEndianUnsigned(unscaled.longValueExact(), 8); - } else { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL16)); - buffer.addByte((byte) value.scale()); - byte[] bytes = unscaled.toByteArray(); - // TODO call addBytes - for (int i = 0; i < 16; i++) { - byte byteValue = - i < bytes.length ? bytes[bytes.length - 1 - i] : (byte) (bytes[0] < 0 ? -1 : 0); - buffer.addByte(byteValue); - } + private void writeFloat(JsonParser parser) throws IOException { + if (!tryWriteDecimal(parser.getText())) { + writeDoubleInternal(parser.getDoubleValue()); } } - public void appendDate(int daysSinceEpoch) { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DATE)); - buffer.writeLittleEndianUnsigned(daysSinceEpoch, 4); - } - - /** Appends a timestamp with timezone (microseconds since epoch) to the variant builder. 
*/ - public void appendTimestampTz(long microsSinceEpoch) { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_TIMESTAMPTZ)); - buffer.writeLittleEndianUnsigned(microsSinceEpoch, 8); - } - - /** Appends a timestamp without timezone (microseconds since epoch) to the variant builder. */ - public void appendTimestampNtz(long microsSinceEpoch) { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_TIMESTAMPNTZ)); - buffer.writeLittleEndianUnsigned(microsSinceEpoch, 8); - } - - public void appendFloat(float value) throws VariantSizeLimitException { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_FLOAT)); - buffer.writeLittleEndianUnsigned(Float.floatToIntBits(value), 4); - } - - public void appendBinary(byte[] value) throws VariantSizeLimitException { - buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_BINARY)); - buffer.writeLittleEndianUnsigned(value.length, 4); - buffer.addBytes(value); - } - /** - * Completes writing an object to the buffer. Object fields are already written, and this method - * inserts header including header byte, number of elements, field IDs, and field offsets. + * Attempts to parse a JSON number as a decimal and write it. The input must meet the following + * criteria: - Be in a valid decimal format (integer with an optional '.'). - Not in scientific + * notation. - Fit within the precision and scale limits of decimal types. * - * @param startPos The starting position of the object data in the buffer. - * @param fields The list of field entries (key, ID, offset). + * @param input the input string representing the JSON number + * @return true if the decimal is valid and written successfully; false otherwise */ - private void endObject(int startPos, List fields) { - int numElements = fields.size(); - - // Sort fields by key and ensure no duplicate keys - Collections.sort(fields); - int maxId = numElements == 0 ? 
0 : fields.get(0).id; - for (int i = 1; i < numElements; i++) { - maxId = Math.max(maxId, fields.get(i).id); - if (fields.get(i).key.equals(fields.get(i - 1).key)) { - throw new IllegalStateException("Duplicate key in Variant: " + fields.get(i).key); - } - } - - int dataSize = buffer.pos - startPos; // Total byte size of the object values - boolean isLarge = numElements > 0xFF; // Determine whether to use large format - int sizeBytes = isLarge ? 4 : 1; // Number of bytes for the object size - int fieldIdSize = sizeOf(maxId); // Number of bytes for each field id - int fieldOffsetSize = sizeOf(dataSize); // Number of bytes for each field offset - int headerSize = - 1 + sizeBytes + numElements * fieldIdSize + (numElements + 1) * fieldOffsetSize; - - // Shift existing data to make room for header - buffer.shift(startPos, headerSize); - - buffer.insertByte( - VariantUtil.objectHeader(isLarge, fieldIdSize, fieldOffsetSize), - startPos); // Insert header byte - buffer.insertLittleEndianUnsigned( - numElements, sizeBytes, startPos + 1); // Insert number of elements - - // Insert field IDs and offsets - int fieldIdStart = startPos + 1 + sizeBytes; - int fieldOffsetStart = fieldIdStart + numElements * fieldIdSize; - for (int i = 0; i < numElements; i++) { - buffer.insertLittleEndianUnsigned( - fields.get(i).id, fieldIdSize, fieldIdStart + i * fieldIdSize); - buffer.insertLittleEndianUnsigned( - fields.get(i).offset, fieldOffsetSize, fieldOffsetStart + i * fieldOffsetSize); - } - - // Insert the offset to the end of the data - buffer.insertLittleEndianUnsigned( - dataSize, fieldOffsetSize, fieldOffsetStart + numElements * fieldOffsetSize); - } - - /** - * Completes writing an array to the buffer. Array values are already written, and this method - * inserts header including the header byte, number of elements, and field offsets. - * - * @param startPos The starting position of the array values in the buffer. - * @param offsets The offsets for each array value. 
- */ - private void endArray(int startPos, List offsets) { - int dataSize = buffer.pos - startPos; // Total byte size of the array values - int numElements = offsets.size(); - - boolean isLarge = numElements > 0xFF; // Determine whether to use large format - int sizeBytes = isLarge ? 4 : 1; // Number of bytes for the array size - int fieldOffsetSize = sizeOf(dataSize); // Number of bytes of each field offset - int headerSize = 1 + sizeBytes + (numElements + 1) * fieldOffsetSize; // header size - int offsetStart = startPos + 1 + sizeBytes; // Start position for offsets - - // Shift existing data to make room for header - buffer.shift(startPos, headerSize); - - buffer.insertByte( - VariantUtil.arrayHeader(isLarge, fieldOffsetSize), startPos); // Insert header byte - buffer.insertLittleEndianUnsigned( - numElements, sizeBytes, startPos + 1); // Insert number of elements - - // Insert field offsets - for (int i = 0; i < numElements; i++) { - buffer.insertLittleEndianUnsigned( - offsets.get(i), fieldOffsetSize, offsetStart + i * fieldOffsetSize); - } - - // Insert the offset to the end of the data - buffer.insertLittleEndianUnsigned( - dataSize, fieldOffsetSize, offsetStart + numElements * fieldOffsetSize); - } - - /** Choose the smallest number of bytes to store the given value. */ - private static int sizeOf(int maxValue) { - if (maxValue <= 0xFF) { - return 1; - } else if (maxValue <= 0xFFFF) { - return 2; - } else if (maxValue <= 0xFFFFFF) { - return 3; - } - - return 4; - } - - private void appendFloat(JsonParser parser) throws IOException { - if (!tryAppendDecimal(parser.getText())) { - appendDouble(parser.getDoubleValue()); - } - } - - /** - * Attempts to parse a JSON number as a decimal and append it. The input must: - Use only decimal - * format (integer with an optional '.'). - Avoid scientific notation. - Fit within the precision - * and scale limits of decimal types. 
- */ - private boolean tryAppendDecimal(String input) { - // Validate that the input only contains valid decimal characters. + private boolean tryWriteDecimal(String input) { + // Validate that the input matches a decimal format and is not in scientific notation. if (!input.matches("-?\\d+(\\.\\d+)?")) { return false; } @@ -436,155 +166,13 @@ private boolean tryAppendDecimal(String input) { // Parse the input string to BigDecimal. BigDecimal decimalValue = new BigDecimal(input); - // Check if the decimal value meets precision and scale limits. + // Ensure the decimal value meets precision and scale limits. if (decimalValue.scale() <= VariantConstants.MAX_DECIMAL16_PRECISION && decimalValue.precision() <= VariantConstants.MAX_DECIMAL16_PRECISION) { - appendDecimal(decimalValue); + writeDecimalInternal(decimalValue); return true; } return false; } - - // Temporarily store the information of a field. We need to collect all fields in an JSON object, - // sort them by their keys, and build the variant object in sorted order. - public static final class FieldEntry implements Comparable { - private final String key; - private final int id; - private final int offset; - - public FieldEntry(String key, int id, int offset) { - this.key = key; - this.id = id; - this.offset = offset; - } - - FieldEntry withNewOffset(int newOffset) { - return new FieldEntry(key, id, newOffset); - } - - @Override - public int compareTo(FieldEntry other) { - return key.compareTo(other.key); - } - } - - /** An auto-growing byte buffer that doubles its size whenever the capacity is exceeded. 
*/ - private static class ByteBufferWrapper { - private static final int SIZE_LIMIT = 1 << 24; // 16MB size limit - private static final int INITIAL_CAPACITY = 128; // Starting capacity - private byte[] buffer; - private int pos = 0; - private final int sizeLimit; - - ByteBufferWrapper() { - this(INITIAL_CAPACITY, SIZE_LIMIT); - } - - ByteBufferWrapper(int initialCapacity, int sizeLimit) { - if (initialCapacity <= 0) { - throw new IllegalArgumentException("Initial capacity must be positive"); - } - this.buffer = new byte[initialCapacity]; - this.sizeLimit = sizeLimit; - } - - /** - * Ensures the buffer has enough capacity to hold additional bytes. - * - * @param additional The number of additional bytes required. - * @throws VariantSizeLimitException If the required capacity exceeds the size limit. - */ - private void ensureCapacity(int additional) { - int required = pos + additional; - if (required > buffer.length) { - int newCapacity = Integer.highestOneBit(required); - newCapacity = newCapacity < required ? newCapacity * 2 : newCapacity; // Double the capacity - if (newCapacity > this.sizeLimit) { - throw new VariantSizeLimitException(); - } - - byte[] newBuffer = new byte[newCapacity]; - System.arraycopy(buffer, 0, newBuffer, 0, pos); - buffer = newBuffer; - } - } - - /** Adds a byte to the buffer, growing the buffer if necessary. */ - public void addByte(byte value) throws VariantSizeLimitException { - ensureCapacity(1); - buffer[pos++] = value; - } - - /** Adds an array of bytes to the buffer, growing the buffer if necessary. */ - public void addBytes(byte[] values) throws VariantSizeLimitException { - ensureCapacity(values.length); - System.arraycopy(values, 0, buffer, pos, values.length); - pos += values.length; - } - - /** - * Writes a numeric value in little-endian order to the buffer, growing the buffer if necessary. - * - * @param value The numeric value to write. 
- * @param numBytes The number of bytes to write (e.g., 2 for short, 4 for int, 8 for long). - */ - public void writeLittleEndianUnsigned(long value, int numBytes) { - if (numBytes < 1 || numBytes > 8) { - throw new IllegalArgumentException("numBytes must be between 1 and 8"); - } - ensureCapacity(numBytes); - - for (int i = 0; i < numBytes; ++i) { - buffer[pos + i] = (byte) ((value >>> (8 * i)) & 0xFF); - } - pos += numBytes; - } - - /** - * Move the bytes of buffer range [start, pos) by the provided offset position. This is used for - * writing array/object header. - */ - public void shift(int start, int offset) { - Preconditions.checkArgument(offset > 0, "offset must be positive"); - Preconditions.checkArgument(pos >= start, "start must be no greater than pos"); - ensureCapacity(offset); - - if (pos > start) { - System.arraycopy(buffer, start, buffer, start + offset, pos - start); - } - - pos += offset; - } - - /** - * Insert a byte into the buffer of the provided position. Note: this assumes shift() has been - * called to leave space for insert. - */ - public void insertByte(byte value, int insertPos) { - Preconditions.checkArgument(insertPos < pos, "insertPos must be smaller than pos"); - - buffer[insertPos] = value; - } - - /** - * Insert a number into the buffer of the provided position. Note: this assumes shift() has been - * called to leave space for insert. - */ - public void insertLittleEndianUnsigned(long value, int numBytes, int insertPos) { - Preconditions.checkArgument(insertPos < pos, "insertPos must be smaller than pos"); - if (numBytes < 1 || numBytes > 8) { - throw new IllegalArgumentException("numBytes must be between 1 and 8"); - } - - for (int i = 0; i < numBytes; ++i) { - buffer[insertPos + i] = (byte) ((value >>> (8 * i)) & 0xFF); - } - } - - /** Returns the underlying byte array. 
*/ - public byte[] toByteArray() { - return Arrays.copyOf(buffer, pos); - } - } } diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantBuilderBase.java b/core/src/main/java/org/apache/iceberg/variants/VariantBuilderBase.java new file mode 100644 index 000000000000..81852785c228 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantBuilderBase.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +abstract class VariantBuilderBase { + protected static final int MAX_SHORT_STR_SIZE = 0x3F; + + protected final ByteBufferWrapper buffer; + protected final Dictionary dict; + protected int startPos; + + VariantBuilderBase(ByteBufferWrapper buffer, Dictionary dict) { + this.buffer = buffer; + this.dict = dict; + startPos = buffer.pos; + } + + /** + * Builds the variant metadata from `dictionaryKeys` and returns the resulting Variant object. + * + * @return The constructed Variant object. + */ + public Variant build() { + int numKeys = dict.size(); + + // Calculate total size of dictionary strings + long numStringBytes = dict.totalBytes(); + if (numStringBytes > VariantConstants.SIZE_LIMIT) { + throw new VariantSizeLimitException(); + } + + // Determine the number of bytes required for dictionary size and offset entry + int offsetSize = sizeOf(Math.max((int) numStringBytes, numKeys)); + + // metadata: header byte, dictionary size, offsets and string bytes + long metadataSize = 1 + offsetSize + (numKeys + 1) * offsetSize + numStringBytes; + + // Ensure the metadata size is within limits + if (metadataSize > VariantConstants.SIZE_LIMIT) { + throw new VariantSizeLimitException(); + } + + ByteBufferWrapper metadataBuffer = + new ByteBufferWrapper((int) metadataSize, (int) metadataSize); + + // Write header byte (version + offset size) + metadataBuffer.addByte(VariantUtil.metadataHeader(VariantConstants.VERSION, offsetSize)); + + // Write number of keys + metadataBuffer.writeLittleEndianUnsigned(numKeys, 
offsetSize); + + // Write offsets + int currentOffset = 0; + for (byte[] key : dict.getKeys()) { + metadataBuffer.writeLittleEndianUnsigned(currentOffset, offsetSize); + currentOffset += key.length; + } + metadataBuffer.writeLittleEndianUnsigned(numStringBytes, offsetSize); + + // Write dictionary strings + dict.getKeys().forEach(metadataBuffer::addBytes); + + return new VariantImpl(metadataBuffer.toByteArray(), buffer.toByteArray()); + } + + protected void writeNullInternal() { + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_NULL)); + } + + protected void writeBooleanInternal(boolean value) { + buffer.addByte( + VariantUtil.primitiveHeader( + value ? Variants.Primitives.TYPE_TRUE : Variants.Primitives.TYPE_FALSE)); + } + + /** + * Writes a numeric value to the variant builder, automatically choosing the smallest type (INT8, + * INT16, INT32, or INT64) to store the value efficiently. + * + * @param value The numeric value to append. + */ + protected void writeNumericInternal(long value) { + if (value == (byte) value) { + // INT8: Requires 1 byte for header + 1 byte for value + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT8)); + buffer.writeLittleEndianUnsigned(value, 1); + } else if (value == (short) value) { + // INT16: Requires 1 byte for header + 2 bytes for value + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT16)); + buffer.writeLittleEndianUnsigned(value, 2); + } else if (value == (int) value) { + // INT32: Requires 1 byte for header + 4 bytes for value + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT32)); + buffer.writeLittleEndianUnsigned(value, 4); + } else { + // INT64: Requires 1 byte for header + 8 bytes for value + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT64)); + buffer.writeLittleEndianUnsigned(value, 8); + } + } + + protected void writeDoubleInternal(double value) { + 
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DOUBLE)); + buffer.writeLittleEndianUnsigned(Double.doubleToLongBits(value), 8); + } + + /** + * Writes a decimal value to the variant builder, choosing the smallest decimal type (DECIMAL4, + * DECIMAL8, DECIMAL16) that fits its precision and scale. + */ + public void writeDecimalInternal(BigDecimal value) { + Preconditions.checkArgument( + value.precision() <= VariantConstants.MAX_DECIMAL16_PRECISION, + "Unsupported Decimal precision: %s", + value.precision()); + + BigInteger unscaled = value.unscaledValue(); + if (value.scale() <= VariantConstants.MAX_DECIMAL4_PRECISION + && value.precision() <= VariantConstants.MAX_DECIMAL4_PRECISION) { + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL4)); + buffer.addByte((byte) value.scale()); + buffer.writeLittleEndianUnsigned(unscaled.intValueExact(), 4); + } else if (value.scale() <= VariantConstants.MAX_DECIMAL8_PRECISION + && value.precision() <= VariantConstants.MAX_DECIMAL8_PRECISION) { + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL8)); + buffer.addByte((byte) value.scale()); + buffer.writeLittleEndianUnsigned(unscaled.longValueExact(), 8); + } else { + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DECIMAL16)); + buffer.addByte((byte) value.scale()); + byte[] bytes = unscaled.toByteArray(); + for (int i = 0; i < 16; i++) { + byte byteValue = + i < bytes.length ? bytes[bytes.length - 1 - i] : (byte) (bytes[0] < 0 ? -1 : 0); + buffer.addByte(byteValue); + } + } + } + + protected void writeDateInternal(int daysSinceEpoch) { + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_DATE)); + buffer.writeLittleEndianUnsigned(daysSinceEpoch, 4); + } + + /** Writes a timestamp with timezone (microseconds since epoch) to the variant builder. 
*/ + protected void writeTimestampTzInternal(long microsSinceEpoch) { + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_TIMESTAMPTZ)); + buffer.writeLittleEndianUnsigned(microsSinceEpoch, 8); + } + + /** Writes a timestamp without timezone (microseconds since epoch) to the variant builder. */ + protected void writeTimestampNtzInternal(long microsSinceEpoch) { + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_TIMESTAMPNTZ)); + buffer.writeLittleEndianUnsigned(microsSinceEpoch, 8); + } + + protected void writeFloatInternal(float value) throws VariantSizeLimitException { + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_FLOAT)); + buffer.writeLittleEndianUnsigned(Float.floatToIntBits(value), 4); + } + + protected void writeBinaryInternal(byte[] value) throws VariantSizeLimitException { + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_BINARY)); + buffer.writeLittleEndianUnsigned(value.length, 4); + buffer.addBytes(value); + } + + protected void writeStringInternal(String value) { + byte[] text = value.getBytes(StandardCharsets.UTF_8); + boolean longStr = text.length > MAX_SHORT_STR_SIZE; + + // Write header + if (longStr) { + buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_STRING)); + buffer.writeLittleEndianUnsigned(text.length, 4); + } else { + buffer.addByte(VariantUtil.shortStrHeader(text.length)); + } + + // Write string content + buffer.addBytes(text); + } + + /** Choose the smallest number of bytes to store the given value. */ + protected static int sizeOf(int maxValue) { + if (maxValue <= 0xFF) { + return 1; + } else if (maxValue <= 0xFFFF) { + return 2; + } else if (maxValue <= 0xFFFFFF) { + return 3; + } + + return 4; + } + + /** + * Completes writing an object to the buffer. Object fields are already written, and this method + * inserts header including header byte, number of elements, field IDs, and field offsets. 
+ * + * @param startPos The starting position of the object data in the buffer. + * @param fields The list of field entries (key, ID, offset). + */ + protected void endObject(int startPos, List fields) { + int numElements = fields.size(); + + // Sort fields by key and ensure no duplicate keys + Collections.sort(fields); + int maxId = numElements == 0 ? 0 : fields.get(0).id; + for (int i = 1; i < numElements; i++) { + maxId = Math.max(maxId, fields.get(i).id); + if (fields.get(i).key.equals(fields.get(i - 1).key)) { + throw new IllegalStateException("Duplicate key in Variant: " + fields.get(i).key); + } + } + + int dataSize = buffer.pos - startPos; // Total byte size of the object values + boolean isLarge = numElements > 0xFF; // Determine whether to use large format + int sizeBytes = isLarge ? 4 : 1; // Number of bytes for the object size + int fieldIdSize = sizeOf(maxId); // Number of bytes for each field id + int fieldOffsetSize = sizeOf(dataSize); // Number of bytes for each field offset + int headerSize = + 1 + sizeBytes + numElements * fieldIdSize + (numElements + 1) * fieldOffsetSize; + + // Shift existing data to make room for header + buffer.shift(startPos, headerSize); + + buffer.insertByte( + VariantUtil.objectHeader(isLarge, fieldIdSize, fieldOffsetSize), + startPos); // Insert header byte + buffer.insertLittleEndianUnsigned( + numElements, sizeBytes, startPos + 1); // Insert number of elements + + // Insert field IDs and offsets + int fieldIdStart = startPos + 1 + sizeBytes; + int fieldOffsetStart = fieldIdStart + numElements * fieldIdSize; + for (int i = 0; i < numElements; i++) { + buffer.insertLittleEndianUnsigned( + fields.get(i).id, fieldIdSize, fieldIdStart + i * fieldIdSize); + buffer.insertLittleEndianUnsigned( + fields.get(i).offset, fieldOffsetSize, fieldOffsetStart + i * fieldOffsetSize); + } + + // Insert the offset to the end of the data + buffer.insertLittleEndianUnsigned( + dataSize, fieldOffsetSize, fieldOffsetStart + numElements * 
fieldOffsetSize); + } + + /** + * Completes writing an array to the buffer. Array values are already written, and this method + * inserts header including the header byte, number of elements, and field offsets. + * + * @param startPos The starting position of the array values in the buffer. + * @param offsets The offsets for each array value. + */ + protected void endArray(int startPos, List offsets) { + int dataSize = buffer.pos - startPos; // Total byte size of the array values + int numElements = offsets.size(); + + boolean isLarge = numElements > 0xFF; // Determine whether to use large format + int sizeBytes = isLarge ? 4 : 1; // Number of bytes for the array size + int fieldOffsetSize = sizeOf(dataSize); // Number of bytes of each field offset + int headerSize = 1 + sizeBytes + (numElements + 1) * fieldOffsetSize; // header size + int offsetStart = startPos + 1 + sizeBytes; // Start position for offsets + + // Shift existing data to make room for header + buffer.shift(startPos, headerSize); + + buffer.insertByte( + VariantUtil.arrayHeader(isLarge, fieldOffsetSize), startPos); // Insert header byte + buffer.insertLittleEndianUnsigned( + numElements, sizeBytes, startPos + 1); // Insert number of elements + + // Insert field offsets + for (int i = 0; i < numElements; i++) { + buffer.insertLittleEndianUnsigned( + offsets.get(i), fieldOffsetSize, offsetStart + i * fieldOffsetSize); + } + + // Insert the offset to the end of the data + buffer.insertLittleEndianUnsigned( + dataSize, fieldOffsetSize, offsetStart + numElements * fieldOffsetSize); + } + + /** An auto-growing byte buffer that doubles its size whenever the capacity is exceeded. 
*/ + protected static class ByteBufferWrapper { + private static final int INITIAL_CAPACITY = 128; // Starting capacity + private byte[] buffer; + int pos = 0; + private final int sizeLimit; + + ByteBufferWrapper() { + this(INITIAL_CAPACITY, VariantConstants.SIZE_LIMIT); + } + + ByteBufferWrapper(int initialCapacity, int sizeLimit) { + if (initialCapacity <= 0) { + throw new IllegalArgumentException("Initial capacity must be positive"); + } + this.buffer = new byte[initialCapacity]; + this.sizeLimit = sizeLimit; + } + + /** + * Ensures the buffer has enough capacity to hold additional bytes. + * + * @param additional The number of additional bytes required. + * @throws VariantSizeLimitException If the required capacity exceeds the size limit. + */ + private void ensureCapacity(int additional) { + int required = pos + additional; + if (required > buffer.length) { + int newCapacity = Integer.highestOneBit(required); + newCapacity = newCapacity < required ? newCapacity * 2 : newCapacity; // Double the capacity + if (newCapacity > this.sizeLimit) { + throw new VariantSizeLimitException(); + } + + byte[] newBuffer = new byte[newCapacity]; + System.arraycopy(buffer, 0, newBuffer, 0, pos); + buffer = newBuffer; + } + } + + /** Adds a byte to the buffer, growing the buffer if necessary. */ + void addByte(byte value) throws VariantSizeLimitException { + ensureCapacity(1); + buffer[pos++] = value; + } + + /** Adds an array of bytes to the buffer, growing the buffer if necessary. */ + void addBytes(byte[] values) throws VariantSizeLimitException { + ensureCapacity(values.length); + System.arraycopy(values, 0, buffer, pos, values.length); + pos += values.length; + } + + /** + * Writes a numeric value in little-endian order to the buffer, growing the buffer if necessary. + * + * @param value The numeric value to write. + * @param numBytes The number of bytes to write (e.g., 2 for short, 4 for int, 8 for long). 
+ */ + void writeLittleEndianUnsigned(long value, int numBytes) { + if (numBytes < 1 || numBytes > 8) { + throw new IllegalArgumentException("numBytes must be between 1 and 8"); + } + ensureCapacity(numBytes); + + for (int i = 0; i < numBytes; ++i) { + buffer[pos + i] = (byte) ((value >>> (8 * i)) & 0xFF); + } + pos += numBytes; + } + + /** + * Move the bytes of buffer range [start, pos) by the provided offset position. This is used for + * writing array/object header. + */ + void shift(int start, int offset) { + Preconditions.checkArgument(offset > 0, "offset must be positive"); + Preconditions.checkArgument(pos >= start, "start must be no greater than pos"); + ensureCapacity(offset); + + if (pos > start) { + System.arraycopy(buffer, start, buffer, start + offset, pos - start); + } + + pos += offset; + } + + /** + * Insert a byte into the buffer of the provided position. Note: this assumes shift() has been + * called to leave space for insert. + */ + void insertByte(byte value, int insertPos) { + Preconditions.checkArgument(insertPos < pos, "insertPos must be smaller than pos"); + + buffer[insertPos] = value; + } + + /** + * Insert a number into the buffer of the provided position. Note: this assumes shift() has been + * called to leave space for insert. + */ + void insertLittleEndianUnsigned(long value, int numBytes, int insertPos) { + Preconditions.checkArgument(insertPos < pos, "insertPos must be smaller than pos"); + if (numBytes < 1 || numBytes > 8) { + throw new IllegalArgumentException("numBytes must be between 1 and 8"); + } + + for (int i = 0; i < numBytes; ++i) { + buffer[insertPos + i] = (byte) ((value >>> (8 * i)) & 0xFF); + } + } + + /** Returns the underlying byte array. 
*/ + byte[] toByteArray() { + return Arrays.copyOf(buffer, pos); + } + } + + /** + * A Variant metadata dictionary implementation which assigns a monotonically increasing assigned + * id to newly added string + */ + protected static class Dictionary { + // Store the mapping from a string to a monotonically increasing assigned id + private final Map stringIds = Maps.newHashMap(); + // Store all the strings encoded with UTF8 in `dictionary` in the order of assigned ids. + private final List utf8Strings = Lists.newArrayList(); + + /** Return the assigned id if string exists; otherwise, assign the next id and return. */ + int add(String key) { + return stringIds.computeIfAbsent( + key, + k -> { + int newId = stringIds.size(); + utf8Strings.add(k.getBytes(StandardCharsets.UTF_8)); + return newId; + }); + } + + int size() { + return utf8Strings.size(); + } + + long totalBytes() { + return utf8Strings.stream().mapToLong(key -> key.length).sum(); + } + + List getKeys() { + return utf8Strings; + } + } + + /** + * Temporarily store the information of a field. We need to collect all fields in an JSON object, + * sort them by their keys, and build the variant object in sorted order. 
+ */ + protected static final class FieldEntry implements Comparable { + final String key; + final int id; + final int offset; + + FieldEntry(String key, int id, int offset) { + this.key = key; + this.id = id; + this.offset = offset; + } + + FieldEntry withNewOffset(int newOffset) { + return new FieldEntry(key, id, newOffset); + } + + @Override + public int compareTo(FieldEntry other) { + return key.compareTo(other.key); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantImpl.java b/core/src/main/java/org/apache/iceberg/variants/VariantImpl.java new file mode 100644 index 000000000000..b88a0464f718 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantImpl.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public final class VariantImpl implements Variant { + private final VariantMetadata metadata; + private final VariantValue value; + + public VariantImpl(byte[] metadata, byte[] value) { + Preconditions.checkArgument( + metadata != null && metadata.length >= 1, "Metadata must not be null or empty."); + Preconditions.checkArgument( + value != null && value.length >= 1, "Value must not be null or empty."); + + Preconditions.checkArgument( + (metadata[0] & VariantConstants.VERSION_MASK) == VariantConstants.VERSION, + "Unsupported metadata version."); + + if (value.length > VariantConstants.SIZE_LIMIT + || metadata.length > VariantConstants.SIZE_LIMIT) { + throw new VariantSizeLimitException(); + } + + this.metadata = SerializedMetadata.from(metadata); + + int header = value[0]; + Variants.BasicType basicType = VariantUtil.basicType(header); + switch (basicType) { + case PRIMITIVE: + this.value = SerializedPrimitive.from(value); + break; + case ARRAY: + this.value = SerializedArray.from((SerializedMetadata) this.metadata, value); + break; + case OBJECT: + this.value = SerializedObject.from((SerializedMetadata) this.metadata, value); + break; + case SHORT_STRING: + this.value = SerializedShortString.from(value); + break; + default: + throw new UnsupportedOperationException("Unsupported basic type: " + basicType); + } + } + + @Override + public VariantMetadata metadata() { + return metadata; + } + + @Override + public VariantValue value() { + return value; + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantObject.java b/core/src/main/java/org/apache/iceberg/variants/VariantObject.java index 7bb82f94a467..4df11fa8f259 100644 --- a/core/src/main/java/org/apache/iceberg/variants/VariantObject.java +++ b/core/src/main/java/org/apache/iceberg/variants/VariantObject.java @@ -20,6 +20,10 @@ /** An variant object value. 
*/ public interface VariantObject extends VariantValue { + default int numElements() { + throw new UnsupportedOperationException(); + } + /** Returns the {@link VariantValue} for the field named {@code name} in this object. */ VariantValue get(String name); diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantObjectBuilder.java b/core/src/main/java/org/apache/iceberg/variants/VariantObjectBuilder.java new file mode 100644 index 000000000000..6f70a367d7dc --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantObjectBuilder.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.DateTimeUtil; + +public class VariantObjectBuilder extends VariantBuilderBase { + private final List fields; + + VariantObjectBuilder(ByteBufferWrapper buffer, Dictionary dict) { + super(buffer, dict); + fields = Lists.newArrayList(); + } + + public VariantObjectBuilder startObject(String key) { + writeKey(key); + return new VariantObjectBuilder(buffer, dict); + } + + public VariantArrayBuilder startArray(String key) { + writeKey(key); + return new VariantArrayBuilder(buffer, dict); + } + + private void writeKey(String key) { + int id = dict.add(key); + fields.add(new FieldEntry(key, id, buffer.pos - startPos)); + } + + public VariantObjectBuilder writeNull(String key) { + writeKey(key); + writeNullInternal(); + return this; + } + + public VariantObjectBuilder writeBoolean(String key, boolean value) { + writeKey(key); + writeBooleanInternal(value); + return this; + } + + public VariantObjectBuilder writeNumeric(String key, long value) { + writeKey(key); + writeNumericInternal(value); + return this; + } + + public VariantObjectBuilder writeDouble(String key, double value) { + writeKey(key); + writeDoubleInternal(value); + return this; + } + + public VariantObjectBuilder writeDecimal(String key, BigDecimal value) { + writeKey(key); + writeDecimalInternal(value); + return this; + } + + public VariantObjectBuilder writeDate(String key, LocalDate value) { + writeKey(key); + writeDateInternal(DateTimeUtil.daysFromDate(value)); + return this; + } + + public VariantObjectBuilder writeTimestampTz(String key, OffsetDateTime value) { + writeKey(key); + writeTimestampTzInternal(DateTimeUtil.microsFromTimestamptz(value)); + return this; + } + + public VariantObjectBuilder 
writeTimestampNtz(String key, LocalDateTime value) { + writeKey(key); + writeTimestampNtzInternal(DateTimeUtil.microsFromTimestamp(value)); + return this; + } + + public VariantObjectBuilder writeFloat(String key, float value) { + writeKey(key); + writeFloatInternal(value); + return this; + } + + public VariantObjectBuilder writeBinary(String key, byte[] value) { + writeKey(key); + writeBinaryInternal(value); + return this; + } + + public VariantObjectBuilder writeString(String key, String value) { + writeKey(key); + writeStringInternal(value); + return this; + } + + public void endObject() { + super.endObject(startPos, fields); + } +} diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantPrimitiveBuilder.java b/core/src/main/java/org/apache/iceberg/variants/VariantPrimitiveBuilder.java new file mode 100644 index 000000000000..634705a09245 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/variants/VariantPrimitiveBuilder.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import org.apache.iceberg.util.DateTimeUtil; + +public class VariantPrimitiveBuilder extends VariantBuilderBase { + public VariantPrimitiveBuilder(ByteBufferWrapper buffer, Dictionary dict) { + super(buffer, dict); + } + + public VariantPrimitiveBuilder writeNull() { + writeNullInternal(); + return this; + } + + public VariantPrimitiveBuilder writeBoolean(boolean value) { + writeBooleanInternal(value); + return this; + } + + public VariantPrimitiveBuilder writeNumeric(long value) { + writeNumericInternal(value); + return this; + } + + public VariantPrimitiveBuilder writeDouble(double value) { + writeDoubleInternal(value); + return this; + } + + public VariantPrimitiveBuilder writeDecimal(BigDecimal value) { + writeDecimalInternal(value); + return this; + } + + public VariantPrimitiveBuilder writeDate(LocalDate value) { + writeDateInternal(DateTimeUtil.daysFromDate(value)); + return this; + } + + public VariantPrimitiveBuilder writeTimestampTz(OffsetDateTime value) { + writeTimestampTzInternal(DateTimeUtil.microsFromTimestamptz(value)); + return this; + } + + public VariantPrimitiveBuilder writeTimestampNtz(LocalDateTime value) { + writeTimestampNtzInternal(DateTimeUtil.microsFromTimestamp(value)); + return this; + } + + public VariantPrimitiveBuilder writeFloat(float value) { + writeFloatInternal(value); + return this; + } + + public VariantPrimitiveBuilder writeBinary(byte[] value) { + writeBinaryInternal(value); + return this; + } + + public VariantPrimitiveBuilder writeString(String value) { + writeStringInternal(value); + return this; + } +} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilder.java b/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilder.java deleted file mode 100644 index d77272f0b35d..000000000000 --- 
a/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilder.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.variants; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.stream.Stream; -import net.minidev.json.JSONArray; -import org.apache.iceberg.util.DateTimeUtil; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -public class TestVariantBuilder { - @ParameterizedTest - @MethodSource("primitiveInputs") - public void testPrimitive(String input, Variants.PhysicalType expectedType, Object expectedValue) throws IOException { - Variant variant = VariantBuilder.parseJson(input); - - SerializedPrimitive primitive = SerializedPrimitive.from(variant); - - assertThat(primitive.type()).isEqualTo(expectedType); - assertThat(primitive.get()).isEqualTo(expectedValue); - } - - private static Stream primitiveInputs() { - return Stream.of( - Arguments.of("null", 
Variants.PhysicalType.NULL, null), - Arguments.of("true", Variants.PhysicalType.BOOLEAN_TRUE, true), - Arguments.of("false", Variants.PhysicalType.BOOLEAN_FALSE, false), - Arguments.of("34", Variants.PhysicalType.INT8, (byte)34), - Arguments.of("1234", Variants.PhysicalType.INT16, (short)1234), - Arguments.of("1234567890", Variants.PhysicalType.INT32, 1234567890), - Arguments.of("1234567890987654321", Variants.PhysicalType.INT64, 1234567890987654321L), - Arguments.of("1234e-2", Variants.PhysicalType.DOUBLE, 12.34), - Arguments.of("123456.789", Variants.PhysicalType.DECIMAL4, new BigDecimal("123456.789")), - Arguments.of("123456789.987654321", Variants.PhysicalType.DECIMAL8, new BigDecimal("123456789.987654321")), - Arguments.of("12345678901234567890.987654321", Variants.PhysicalType.DECIMAL16, new BigDecimal("12345678901234567890.987654321")), - Arguments.of("\"This test string is used to generate a primitive string type of variant\"", Variants.PhysicalType.STRING, "This test string is used to generate a primitive string type of variant") - - ); - } - - @Test - public void testPrimitiveFloat() { - VariantBuilder builder = new VariantBuilder(); - builder.appendFloat(12.34f); - Variant variant = builder.build(); - SerializedPrimitive primitive = SerializedPrimitive.from(variant); - - assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.FLOAT); - assertThat(primitive.get()).isEqualTo(12.34f); - } - - @Test - public void testPrimitiveDate() { - String dateString = "2017-08-18"; - int daysSinceEpoch = DateTimeUtil.isoDateToDays(dateString); - - VariantBuilder builder = new VariantBuilder(); - builder.appendDate(daysSinceEpoch); - Variant variant = builder.build(); - SerializedPrimitive primitive = SerializedPrimitive.from(variant); - - assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.DATE); - assertThat(DateTimeUtil.daysToIsoDate((int)primitive.get())).isEqualTo(dateString); - } - - @Test - public void testPrimitiveTimestampTZ() { - String tzString = 
"2017-08-18T14:21:01.919+00:00"; - long microsSinceEpoch = DateTimeUtil.isoTimestamptzToMicros(tzString); - - VariantBuilder builder = new VariantBuilder(); - builder.appendTimestampTz(microsSinceEpoch); - Variant variant = builder.build(); - SerializedPrimitive primitive = SerializedPrimitive.from(variant); - - assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.TIMESTAMPTZ); - assertThat(DateTimeUtil.microsToIsoTimestamptz((long)primitive.get())).isEqualTo(tzString); - } - - @Test - public void testPrimitiveTimestampNTZ() { - String ntzString = "2017-08-18T14:21:01.919"; - long microsSinceEpoch = DateTimeUtil.isoTimestampToMicros(ntzString); - - VariantBuilder builder = new VariantBuilder(); - builder.appendTimestampNtz(microsSinceEpoch); - Variant variant = builder.build(); - SerializedPrimitive primitive = SerializedPrimitive.from(variant); - - assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.TIMESTAMPNTZ); - assertThat(DateTimeUtil.microsToIsoTimestamp((long)primitive.get())).isEqualTo(ntzString); - } - - @Test - public void testPrimitiveBinary() { - VariantBuilder builder = new VariantBuilder(); - builder.appendBinary("iceberg".getBytes()); - Variant variant = builder.build(); - SerializedPrimitive primitive = SerializedPrimitive.from(variant); - - assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.BINARY); - assertThat(primitive.get()).isEqualTo(ByteBuffer.wrap("iceberg".getBytes())); - } - - @Test - public void testShortString() throws IOException { - Variant variant = VariantBuilder.parseJson("\"iceberg\""); - SerializedShortString shortString = SerializedShortString.from(variant); - - assertThat(shortString.type()).isEqualTo(Variants.PhysicalType.STRING); - assertThat(shortString.get()).isEqualTo("iceberg"); - } - - @Test - public void testArray() throws IOException { - List input = List.of("Ford", "BMW", "Fiat"); - Variant variant = VariantBuilder.parseJson(JSONArray.toJSONString(input)); - SerializedArray arr = 
SerializedArray.from(variant); - - assertThat(arr.type()).isEqualTo(Variants.PhysicalType.ARRAY); - for (int i = 0; i < arr.numElements(); i++) { - assertThat(arr.get(i).asPrimitive().get()).isEqualTo(input.get(i)); - } - } - - @Test - public void testEmptyObject() throws IOException { - Variant variant = VariantBuilder.parseJson("{}"); - SerializedObject object = SerializedObject.from(variant); - - assertThat(object.type()).isEqualTo(Variants.PhysicalType.OBJECT); - assertThat(object.numElements()).isEqualTo(0); - } - - @Test - public void testObject() throws IOException { - Variant variant = VariantBuilder.parseJson("{ \"id\": 1234, \"firstName\": \"Joe\", \"lastName\": \"Smith\", \"phones\":[\"123-456-7890\", \"789-123-4560\"] }"); - SerializedObject object = SerializedObject.from(variant); - - assertThat(object.type()).isEqualTo(Variants.PhysicalType.OBJECT); - assertThat(object.numElements()).isEqualTo(4); - - assertThat(object.get("id").asPrimitive().get()).isEqualTo((short)1234); - assertThat(object.get("firstName").asPrimitive().get()).isEqualTo("Joe"); - assertThat(object.get("lastName").asPrimitive().get()).isEqualTo("Smith"); - - VariantArray phones = object.get("phones").asArray(); - assertThat(phones.numElements()).isEqualTo(2); - assertThat(phones.get(0).asPrimitive().get()).isEqualTo("123-456-7890"); - assertThat(phones.get(1).asPrimitive().get()).isEqualTo("789-123-4560"); - } -} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilderArray.java b/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilderArray.java new file mode 100644 index 000000000000..20d820594b30 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilderArray.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.List; +import net.minidev.json.JSONArray; +import org.apache.iceberg.util.DateTimeUtil; +import org.junit.jupiter.api.Test; + +public class TestVariantBuilderArray { + @Test + public void testSimpleArrayJson() throws IOException { + List<String> input = List.of("Ford", "BMW", "Fiat"); + Variant variant = VariantBuilder.parseJson(JSONArray.toJSONString(input)); + VariantArray arr = variant.value().asArray(); + + assertThat(arr.type()).isEqualTo(Variants.PhysicalType.ARRAY); + for (int i = 0; i < arr.numElements(); i++) { + assertThat(arr.get(i).asPrimitive().get()).isEqualTo(input.get(i)); + } + } + + @Test + public void testArrayJson() throws IOException { + String input = + "[{\n" + + " \"firstName\": \"John\"," + + " \"lastName\": \"Smith\"," + + " \"age\": 25,\n" + + " \"address\" : {\n" + + " \"streetAddress\": \"21 2nd Street\",\n" + + " \"city\": \"New York\",\n" + + " \"state\": \"NY\",\n" + + " \"postalCode\": \"10021\"\n" + + " },\n" + + " \"phoneNumber\": [\n" + + " {\"type\": \"home\", \"number\": \"212 555-1234\"},\n" + + " {\"type\": \"fax\", \"number\": \"646 555-4567\"}\n" + +
" ]\n" + + " }]"; + validateVariant(VariantBuilder.parseJson(input)); + } + + @Test + public void testBuildSimpleArray() { + List<String> input = List.of("Ford", "BMW", "Fiat"); + VariantArrayBuilder builder = new VariantBuilder().startArray(); + for (String str : input) { + builder.writeString(str); + } + builder.endArray(); + + Variant variant = builder.build(); + VariantArray arr = variant.value().asArray(); + + assertThat(arr.type()).isEqualTo(Variants.PhysicalType.ARRAY); + assertThat(arr.numElements()).isEqualTo(3); + for (int i = 0; i < arr.numElements(); i++) { + assertThat(arr.get(i).asPrimitive().get()).isEqualTo(input.get(i)); + } + } + + @Test + public void testBuildArray() { + VariantArrayBuilder builder = new VariantBuilder().startArray(); + builder + .writeNull() + .writeBoolean(true) + .writeBoolean(false) + .writeNumeric(34) + .writeNumeric(1234) + .writeNumeric(1234567890) + .writeNumeric(1234567890987654321L) + .writeDouble(1234e-2) + .writeDecimal(new BigDecimal("123456.789")) + .writeDecimal(new BigDecimal("123456789.987654321")) + .writeDecimal(new BigDecimal("12345678901234567890.987654321")) + .writeDate(LocalDate.parse("2017-08-18")) + .writeTimestampTz(OffsetDateTime.parse("2017-08-18T14:21:01.919+00:00")) + .writeTimestampNtz(LocalDateTime.parse("2017-08-18T14:21:01.919")) + .writeFloat(12.34f) + .writeBinary("iceberg".getBytes()) + .writeString("This test string is used to generate a primitive string type of variant") + .writeString("iceberg"); + builder.startArray().writeString("Ford").writeString("BMW").writeString("Fiat").endArray(); + + builder + .startObject() + .writeString("firstName", "John") + .writeString("lastName", "Smith") + .writeNumeric("age", 25) + .endObject(); + builder.endArray(); + + Variant variant = builder.build(); + VariantArray arr = variant.value().asArray(); + assertThat(arr.type()).isEqualTo(Variants.PhysicalType.ARRAY); + assertThat(arr.numElements()).isEqualTo(20); +
assertThat(arr.get(0).asPrimitive().get()).isNull(); + assertThat(arr.get(1).asPrimitive().get()).isEqualTo(true); + assertThat(arr.get(2).asPrimitive().get()).isEqualTo(false); + assertThat(arr.get(3).asPrimitive().get()).isEqualTo((byte) 34); + assertThat(arr.get(4).asPrimitive().get()).isEqualTo((short) 1234); + assertThat(arr.get(5).asPrimitive().get()).isEqualTo(1234567890); + assertThat(arr.get(6).asPrimitive().get()).isEqualTo(1234567890987654321L); + assertThat(arr.get(7).asPrimitive().get()).isEqualTo(12.34); + assertThat(arr.get(8).asPrimitive().get()).isEqualTo(new BigDecimal("123456.789")); + assertThat(arr.get(9).asPrimitive().get()).isEqualTo(new BigDecimal("123456789.987654321")); + assertThat(arr.get(10).asPrimitive().get()) + .isEqualTo(new BigDecimal("12345678901234567890.987654321")); + assertThat(arr.get(11).asPrimitive().get()) + .isEqualTo(DateTimeUtil.daysFromDate(LocalDate.parse("2017-08-18"))); + assertThat(arr.get(12).asPrimitive().get()) + .isEqualTo( + DateTimeUtil.microsFromTimestamptz( + OffsetDateTime.parse("2017-08-18T14:21:01.919+00:00"))); + assertThat(arr.get(13).asPrimitive().get()) + .isEqualTo( + DateTimeUtil.microsFromTimestamp(LocalDateTime.parse("2017-08-18T14:21:01.919"))); + assertThat(arr.get(14).asPrimitive().get()).isEqualTo(12.34f); + assertThat(arr.get(15).asPrimitive().get()).isEqualTo(ByteBuffer.wrap("iceberg".getBytes())); + assertThat(arr.get(16).asPrimitive().get()) + .isEqualTo("This test string is used to generate a primitive string type of variant"); + assertThat(arr.get(17).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(arr.get(18).type()).isEqualTo(Variants.PhysicalType.ARRAY); + + assertThat(arr.get(19).type()).isEqualTo(Variants.PhysicalType.OBJECT); + } + + private void validateVariant(Variant variant) { + VariantArray arr = variant.value().asArray(); + assertThat(arr.numElements()).isEqualTo(1); + + VariantObject object = arr.get(0).asObject(); + 
assertThat(object.type()).isEqualTo(Variants.PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(5); + + assertThat(object.get("firstName").asPrimitive().get()).isEqualTo("John"); + assertThat(object.get("lastName").asPrimitive().get()).isEqualTo("Smith"); + assertThat(object.get("age").asPrimitive().get()).isEqualTo((byte) 25); + + VariantObject address = object.get("address").asObject(); + assertThat(address.type()).isEqualTo(Variants.PhysicalType.OBJECT); + assertThat(address.numElements()).isEqualTo(4); + assertThat(address.get("streetAddress").asPrimitive().get()).isEqualTo("21 2nd Street"); + assertThat(address.get("city").asPrimitive().get()).isEqualTo("New York"); + assertThat(address.get("state").asPrimitive().get()).isEqualTo("NY"); + assertThat(address.get("postalCode").asPrimitive().get()).isEqualTo("10021"); + + VariantArray phoneNumbers = object.get("phoneNumber").asArray(); + assertThat(phoneNumbers.numElements()).isEqualTo(2); + VariantObject phoneNumber1 = phoneNumbers.get(0).asObject(); + assertThat(phoneNumber1.get("type").asPrimitive().get()).isEqualTo("home"); + assertThat(phoneNumber1.get("number").asPrimitive().get()).isEqualTo("212 555-1234"); + VariantObject phoneNumber2 = phoneNumbers.get(1).asObject(); + assertThat(phoneNumber2.get("type").asPrimitive().get()).isEqualTo("fax"); + assertThat(phoneNumber2.get("number").asPrimitive().get()).isEqualTo("646 555-4567"); + } +} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilderObject.java b/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilderObject.java new file mode 100644 index 000000000000..26d6a4712fc6 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilderObject.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import org.junit.jupiter.api.Test; + +public class TestVariantBuilderObject { + @Test + public void testEmptyObjectJson() throws IOException { + Variant variant = VariantBuilder.parseJson("{}"); + VariantObject object = variant.value().asObject(); + + assertThat(object.type()).isEqualTo(Variants.PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(0); + } + + @Test + public void testObjectJson() throws IOException { + String input = + "{\n" + + " \"firstName\": \"John\"," + + " \"lastName\": \"Smith\"," + + " \"age\": 25,\n" + + " \"address\" : {\n" + + " \"streetAddress\": \"21 2nd Street\",\n" + + " \"city\": \"New York\",\n" + + " \"state\": \"NY\",\n" + + " \"postalCode\": \"10021\"\n" + + " },\n" + + " \"phoneNumber\": [\n" + + " {\"type\": \"home\", \"number\": \"212 555-1234\"},\n" + + " {\"type\": \"fax\", \"number\": \"646 555-4567\"}\n" + + " ]\n" + + " }"; + + validateVariant(VariantBuilder.parseJson(input)); + } + + @Test + public void testBuildObject() { + VariantObjectBuilder builder = + new VariantBuilder() + .startObject() + .writeString("firstName", "John") + .writeString("lastName", "Smith") + .writeNumeric("age", 25); + builder + .startObject("address") + .writeString("streetAddress", "21 2nd Street") + 
.writeString("city", "New York") + .writeString("state", "NY") + .writeString("postalCode", "10021") + .endObject(); + VariantArrayBuilder phoneNumberBuilder = builder.startArray("phoneNumber"); + phoneNumberBuilder + .startObject() + .writeString("type", "home") + .writeString("number", "212 555-1234") + .endObject(); + phoneNumberBuilder + .startObject() + .writeString("type", "fax") + .writeString("number", "646 555-4567") + .endObject(); + phoneNumberBuilder.endArray(); + builder.endObject(); + + validateVariant(builder.build()); + } + + private void validateVariant(Variant variant) { + VariantObject object = variant.value().asObject(); + + assertThat(object.type()).isEqualTo(Variants.PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(5); + + assertThat(object.get("firstName").asPrimitive().get()).isEqualTo("John"); + assertThat(object.get("lastName").asPrimitive().get()).isEqualTo("Smith"); + assertThat(object.get("age").asPrimitive().get()).isEqualTo((byte) 25); + + VariantObject address = object.get("address").asObject(); + assertThat(address.type()).isEqualTo(Variants.PhysicalType.OBJECT); + assertThat(address.numElements()).isEqualTo(4); + assertThat(address.get("streetAddress").asPrimitive().get()).isEqualTo("21 2nd Street"); + assertThat(address.get("city").asPrimitive().get()).isEqualTo("New York"); + assertThat(address.get("state").asPrimitive().get()).isEqualTo("NY"); + assertThat(address.get("postalCode").asPrimitive().get()).isEqualTo("10021"); + + VariantArray phoneNumbers = object.get("phoneNumber").asArray(); + assertThat(phoneNumbers.numElements()).isEqualTo(2); + VariantObject phoneNumber1 = phoneNumbers.get(0).asObject(); + assertThat(phoneNumber1.get("type").asPrimitive().get()).isEqualTo("home"); + assertThat(phoneNumber1.get("number").asPrimitive().get()).isEqualTo("212 555-1234"); + VariantObject phoneNumber2 = phoneNumbers.get(1).asObject(); + assertThat(phoneNumber2.get("type").asPrimitive().get()).isEqualTo("fax"); + 
assertThat(phoneNumber2.get("number").asPrimitive().get()).isEqualTo("646 555-4567"); + } +} diff --git a/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilderPrimitive.java b/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilderPrimitive.java new file mode 100644 index 000000000000..aeb3148377bf --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilderPrimitive.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.stream.Stream; +import org.apache.iceberg.util.DateTimeUtil; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestVariantBuilderPrimitive { + private static Stream<Arguments> primitiveInputs() { + return Stream.of( + Arguments.of("null", Variants.PhysicalType.NULL, null), + Arguments.of("true", Variants.PhysicalType.BOOLEAN_TRUE, true), + Arguments.of("false", Variants.PhysicalType.BOOLEAN_FALSE, false), + Arguments.of("34", Variants.PhysicalType.INT8, (byte) 34), + Arguments.of("1234", Variants.PhysicalType.INT16, (short) 1234), + Arguments.of("1234567890", Variants.PhysicalType.INT32, 1234567890), + Arguments.of("1234567890987654321", Variants.PhysicalType.INT64, 1234567890987654321L), + Arguments.of("1234e-2", Variants.PhysicalType.DOUBLE, 12.34), + Arguments.of("123456.789", Variants.PhysicalType.DECIMAL4, new BigDecimal("123456.789")), + Arguments.of( + "123456789.987654321", + Variants.PhysicalType.DECIMAL8, + new BigDecimal("123456789.987654321")), + Arguments.of( + "12345678901234567890.987654321", + Variants.PhysicalType.DECIMAL16, + new BigDecimal("12345678901234567890.987654321")), + Arguments.of( + "\"This test string is used to generate a primitive string type of variant\"", + Variants.PhysicalType.STRING, + "This test string is used to generate a primitive string type of variant")); + } + + @ParameterizedTest + @MethodSource("primitiveInputs") + public void testPrimitiveJson( + String input, Variants.PhysicalType expectedType, Object
expectedValue) throws IOException { + Variant variant = VariantBuilder.parseJson(input); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()).isEqualTo(expectedType); + assertThat(primitive.get()).isEqualTo(expectedValue); + } + + @Test + public void testShortStringJson() throws IOException { + Variant variant = VariantBuilder.parseJson("\"iceberg\""); + VariantPrimitive shortString = variant.value().asPrimitive(); + + assertThat(shortString.type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(shortString.get()).isEqualTo("iceberg"); + } + + @Test + public void testPrimitiveNull() { + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeNull(); + Variant variant = builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.NULL); + assertThat(primitive.get()).isEqualTo(null); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPrimitiveBoolean(boolean value) { + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeBoolean(value); + Variant variant = builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()) + .isEqualTo( + value ? 
Variants.PhysicalType.BOOLEAN_TRUE : Variants.PhysicalType.BOOLEAN_FALSE); + assertThat(primitive.get()).isEqualTo(value); + } + + private static Stream<Arguments> testPrimitiveNumericInputs() { + return Stream.of( + Arguments.of(34, Variants.PhysicalType.INT8, (byte) 34), + Arguments.of(1234, Variants.PhysicalType.INT16, (short) 1234), + Arguments.of(1234567890, Variants.PhysicalType.INT32, 1234567890), + Arguments.of(1234567890987654321L, Variants.PhysicalType.INT64, 1234567890987654321L)); + } + + @ParameterizedTest + @MethodSource("testPrimitiveNumericInputs") + public void testPrimitiveNumeric(long value, Variants.PhysicalType type, Object expectedValue) { + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeNumeric(value); + Variant variant = builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()).isEqualTo(type); + assertThat(primitive.get()).isEqualTo(expectedValue); + } + + @Test + public void testPrimitiveDouble() { + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeDouble(1234e-2); + Variant variant = builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.DOUBLE); + assertThat(primitive.get()).isEqualTo(12.34); + } + + private static Stream<Arguments> testPrimitiveDecimalInputs() { + return Stream.of( + Arguments.of(new BigDecimal("123456.789"), Variants.PhysicalType.DECIMAL4), + Arguments.of(new BigDecimal("123456789.987654321"), Variants.PhysicalType.DECIMAL8), + Arguments.of( + new BigDecimal("12345678901234567890.987654321"), Variants.PhysicalType.DECIMAL16)); + } + + @ParameterizedTest + @MethodSource("testPrimitiveDecimalInputs") + public void testPrimitiveDecimal(BigDecimal value, Variants.PhysicalType type) { + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeDecimal(value); + Variant variant =
builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()).isEqualTo(type); + assertThat(primitive.get()).isEqualTo(value); + } + + @Test + public void testPrimitiveDate() { + String dateString = "2017-08-18"; + LocalDate date = LocalDate.parse(dateString); + + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeDate(date); + Variant variant = builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.DATE); + assertThat(primitive.get()).isEqualTo(DateTimeUtil.daysFromDate(date)); + } + + @Test + public void testPrimitiveTimestampTZ() { + String tzString = "2017-08-18T14:21:01.919+00:00"; + OffsetDateTime ts = OffsetDateTime.parse(tzString); + + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeTimestampTz(ts); + Variant variant = builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.TIMESTAMPTZ); + assertThat(primitive.get()).isEqualTo(DateTimeUtil.microsFromTimestamptz(ts)); + } + + @Test + public void testPrimitiveTimestampNTZ() { + String ntzString = "2017-08-18T14:21:01.919"; + LocalDateTime ts = LocalDateTime.parse(ntzString); + + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeTimestampNtz(ts); + Variant variant = builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.TIMESTAMPNTZ); + assertThat(primitive.get()).isEqualTo(DateTimeUtil.microsFromTimestamp(ts)); + } + + @Test + public void testPrimitiveFloat() { + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeFloat(12.34f); + Variant variant = builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + 
assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.FLOAT); + assertThat(primitive.get()).isEqualTo(12.34f); + } + + @Test + public void testPrimitiveBinary() { + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeBinary("iceberg".getBytes()); + Variant variant = builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.BINARY); + assertThat(primitive.get()).isEqualTo(ByteBuffer.wrap("iceberg".getBytes())); + } + + @Test + public void testPrimitiveString() { + String value = "This test string is used to generate a primitive string type of variant"; + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeString(value); + Variant variant = builder.build(); + VariantPrimitive primitive = variant.value().asPrimitive(); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(primitive.get()).isEqualTo(value); + } + + @Test + public void testPrimitiveShortString() { + String value = "iceberg"; + VariantPrimitiveBuilder builder = new VariantBuilder().createPrimitive(); + builder.writeString(value); + Variant variant = builder.build(); + VariantPrimitive shortString = variant.value().asPrimitive(); + + assertThat(shortString.type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(shortString.get()).isEqualTo("iceberg"); + } +}