From afd08b1f9cf48769f5d5d9f20da07b28c16cdcb2 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 22 Jan 2025 15:45:03 -0800 Subject: [PATCH 01/12] Core: Add InternalData read and write builders. --- .../java/org/apache/iceberg/InternalData.java | 148 ++++++++++++++++++ .../org/apache/iceberg/ManifestReader.java | 33 ++-- .../java/org/apache/iceberg/avro/Avro.java | 39 ++++- .../apache/iceberg/avro/InternalReader.java | 11 +- .../iceberg/avro/SupportsCustomRecords.java | 2 +- .../iceberg/avro/SupportsCustomTypes.java | 31 ++++ 6 files changed, 237 insertions(+), 27 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/InternalData.java create mode 100644 core/src/main/java/org/apache/iceberg/avro/SupportsCustomTypes.java diff --git a/core/src/main/java/org/apache/iceberg/InternalData.java b/core/src/main/java/org/apache/iceberg/InternalData.java new file mode 100644 index 000000000000..2faf567c7acc --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/InternalData.java @@ -0,0 +1,148 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. 
+ * + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.Map; +import java.util.function.Function; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InternalData { + private InternalData() {} + + private static final Logger LOG = LoggerFactory.getLogger(InternalData.class); + private static final Map> WRITE_BUILDERS = + Maps.newConcurrentMap(); + private static final Map> READ_BUILDERS = + Maps.newConcurrentMap(); + + static { + Avro.register(); + + try { + DynMethods.StaticMethod registerParquet = + DynMethods.builder("register") + .impl("org.apache.iceberg.parquet.Parquet") + .buildStaticChecked(); + + registerParquet.invoke(); + + } catch (NoSuchMethodException e) { + // failing to load Parquet is normal and does not require a stack trace + LOG.info("Unable to register Parquet for metadata files: {}", e.getMessage()); + } + } + + public static void register( + FileFormat format, + Function writeBuilder, + Function readBuilder) { + WRITE_BUILDERS.put(format, writeBuilder); + READ_BUILDERS.put(format, readBuilder); + } + + public static WriteBuilder write(FileFormat format, OutputFile file) { + Function writeBuilder = WRITE_BUILDERS.get(format); + if (writeBuilder != null) { + return writeBuilder.apply(file); + } + + throw new UnsupportedOperationException( + "Cannot write using unregistered internal data format: " + format); + } + + public static ReadBuilder read(FileFormat format, InputFile file) { + Function readBuilder = READ_BUILDERS.get(format); + if (readBuilder != null) { + return readBuilder.apply(file); + } + + throw new UnsupportedOperationException( + "Cannot read using unregistered internal data format: " + format); + } + + public interface WriteBuilder { + /** Set the file schema. */ + WriteBuilder schema(Schema schema); + + /** Set the file schema's root name. */ + WriteBuilder named(String name); + + /** + * Set a writer configuration property. + * + *
<p>
Write configuration affects writer behavior. To add file metadata properties, use {@link + * #meta(String, String)}. + * + * @param property a writer config property name + * @param value config value + * @return this for method chaining + */ + WriteBuilder set(String property, String value); + + /** + * Set a file metadata property. + * + *
<p>
Metadata properties are written into file metadata. To alter a writer configuration + * property, use {@link #set(String, String)}. + * + * @param property a file metadata property name + * @param value config value + * @return this for method chaining + */ + WriteBuilder meta(String property, String value); + + /** Overwrite the file if it already exists. */ + WriteBuilder overwrite(); + + /** Build the configured {@link FileAppender}. */ + FileAppender build() throws IOException; + } + + public interface ReadBuilder { + /** Set the projection schema. */ + ReadBuilder project(Schema projectedSchema); + + /** Read only the split that is {@code length} bytes starting at {@code start}. */ + ReadBuilder split(long start, long length); + + /** Reuse container classes, like structs, lists, and maps. */ + ReadBuilder reuseContainers(); + + /** Set a custom class for in-memory objects at the schema root. */ + ReadBuilder setRootType(Class rootClass); + + /** Set a custom class for in-memory objects at the given field ID. */ + ReadBuilder setCustomType(int fieldId, Class structClass); + + /** Build the configured reader. */ + CloseableIterable build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index cf04eb7c472a..22a93a85ca6b 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -25,10 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.avro.io.DatumReader; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.AvroIterable; -import org.apache.iceberg.avro.InternalReader; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; @@ -258,29 +256,18 @@ private CloseableIterable> open(Schema projection) { fields.addAll(projection.asStruct().fields()); fields.add(MetadataColumns.ROW_POSITION); - switch (format) { - case AVRO: - AvroIterable> reader = - Avro.read(file) - .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields))) - .createResolvingReader(this::newReader) - .reuseContainers() - .build(); + CloseableIterable> reader = + InternalData.read(format, file) + .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields))) + .setRootType(GenericManifestEntry.class) + .setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass()) + .setCustomType(DataFile.PARTITION_ID, PartitionData.class) + .reuseContainers() + .build(); - addCloseable(reader); + addCloseable(reader); - return CloseableIterable.transform(reader, inheritableMetadata::apply); - - default: - throw new UnsupportedOperationException("Invalid format for manifest file: " + format); - } - } - - private DatumReader newReader(Schema schema) { - return InternalReader.create(schema) - .setRootType(GenericManifestEntry.class) - .setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass()) - .setCustomType(DataFile.PARTITION_ID, PartitionData.class); + return CloseableIterable.transform(reader, inheritableMetadata::apply); } CloseableIterable> liveEntries() { diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 0eaa3f2d2400..4588ec3f80df 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -46,6 +46,7 @@ import org.apache.avro.specific.SpecificData; 
import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.InternalData; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.SchemaParser; @@ -71,6 +72,18 @@ public class Avro { private Avro() {} + public static void register() { + InternalData.register(FileFormat.AVRO, Avro::writeInternal, Avro::readInternal); + } + + private static WriteBuilder writeInternal(OutputFile outputFile) { + return write(outputFile).createWriterFunc(InternalWriter::create); + } + + private static ReadBuilder readInternal(InputFile inputFile) { + return read(inputFile).createResolvingReader(InternalReader::create); + } + private enum Codec { UNCOMPRESSED, SNAPPY, @@ -90,6 +103,10 @@ private enum Codec { } public static WriteBuilder write(OutputFile file) { + if (file instanceof EncryptedOutputFile) { + return write((EncryptedOutputFile) file); + } + return new WriteBuilder(file); } @@ -97,7 +114,7 @@ public static WriteBuilder write(EncryptedOutputFile file) { return new WriteBuilder(file.encryptingOutputFile()); } - public static class WriteBuilder { + public static class WriteBuilder implements InternalData.WriteBuilder { private final OutputFile file; private final Map config = Maps.newHashMap(); private final Map metadata = Maps.newLinkedHashMap(); @@ -615,9 +632,11 @@ public static ReadBuilder read(InputFile file) { return new ReadBuilder(file); } - public static class ReadBuilder { + public static class ReadBuilder implements InternalData.ReadBuilder { private final InputFile file; private final Map renames = Maps.newLinkedHashMap(); + private final Map> typeMap = Maps.newHashMap(); + private Class rootType = null; private ClassLoader loader = Thread.currentThread().getContextClassLoader(); private NameMapping nameMapping; private boolean reuseContainers = false; @@ -701,6 +720,18 @@ public ReadBuilder rename(String fullName, String newName) { return this; } + @Override + public InternalData.ReadBuilder setRootType(Class rootClass) { + this.rootType = rootClass; + return this; + } + + @Override + public InternalData.ReadBuilder setCustomType(int fieldId, Class structClass) { + typeMap.put(fieldId, structClass); + return this; + } + public ReadBuilder withNameMapping(NameMapping newNameMapping) { this.nameMapping = newNameMapping; return this; @@ -737,6 +768,10 @@ public AvroIterable build() { ((SupportsCustomRecords) reader).setRenames(renames); } + if (reader instanceof SupportsCustomTypes) { + ((SupportsCustomTypes) reader).setCustomTypes(rootType, typeMap); + } + return new AvroIterable<>( file, new NameMappingDatumReader<>(nameMapping, reader), start, length, reuseContainers); } diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java index 0dab3646d4d4..e7bf0b7553d8 100644 --- a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java @@ -42,7 +42,7 @@ * * @param Java type returned by the reader */ -public class InternalReader implements DatumReader, SupportsRowPosition { +public class InternalReader implements DatumReader, SupportsRowPosition, SupportsCustomTypes { private static final int ROOT_ID = -1; private final Types.StructType expectedType; @@ -76,6 +76,15 @@ public void setSchema(Schema schema) { initReader(); } + @Override + public void setCustomTypes( + Class rootType, Map> typesById) { + setRootType(rootType); + for (Map.Entry> entry 
: typesById.entrySet()) { + setCustomType(entry.getKey(), entry.getValue()); + } + } + public InternalReader setRootType(Class rootClass) { typeMap.put(ROOT_ID, rootClass); return this; diff --git a/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java index c05e206e1307..c29dc28cd8ba 100644 --- a/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java +++ b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java @@ -20,7 +20,7 @@ import java.util.Map; -/** An interface for Avro DatumReaders to support custom record classes. */ +/** An interface for Avro DatumReaders to support custom record classes by name. */ interface SupportsCustomRecords { void setClassLoader(ClassLoader loader); diff --git a/core/src/main/java/org/apache/iceberg/avro/SupportsCustomTypes.java b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomTypes.java new file mode 100644 index 000000000000..e7e9a060eb2f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomTypes.java @@ -0,0 +1,31 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.iceberg.avro; + +import java.util.Map; +import org.apache.iceberg.StructLike; + +/** An interface to support custom record types by ID. */ +public interface SupportsCustomTypes { + void setCustomTypes( + Class rootType, Map> typesById); +} From 1262417ba9b164fa124118b3a43a716e27e4e383 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 22 Jan 2025 16:31:36 -0800 Subject: [PATCH 02/12] Use InternalData in ManifestWriter and ManifestListWriter. --- .../java/org/apache/iceberg/InternalData.java | 14 +++ .../apache/iceberg/ManifestListWriter.java | 17 ++-- .../org/apache/iceberg/ManifestWriter.java | 28 +++--- .../java/org/apache/iceberg/V1Metadata.java | 93 ++++++++++--------- .../java/org/apache/iceberg/V2Metadata.java | 91 +++++++++--------- .../java/org/apache/iceberg/V3Metadata.java | 91 +++++++++--------- 6 files changed, 181 insertions(+), 153 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/InternalData.java b/core/src/main/java/org/apache/iceberg/InternalData.java index 2faf567c7acc..586853371f13 100644 --- a/core/src/main/java/org/apache/iceberg/InternalData.java +++ b/core/src/main/java/org/apache/iceberg/InternalData.java @@ -119,6 +119,20 @@ public interface WriteBuilder { */ WriteBuilder meta(String property, String value); + /** + * Set a file metadata properties from a Map. + * + *
<p>
Metadata properties are written into file metadata. To alter a writer configuration + * property, use {@link #set(String, String)}. + * + * @param properties a map of file metadata properties + * @return this for method chaining + */ + default WriteBuilder meta(Map properties) { + properties.forEach(this::meta); + return this; + } + /** Overwrite the file if it already exists. */ WriteBuilder overwrite(); diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index b17eedad18af..e70acd4ee159 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Iterator; import java.util.Map; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; @@ -71,7 +70,7 @@ public long length() { } static class V3Writer extends ManifestListWriter { - private final V3Metadata.IndexedManifestFile wrapper; + private final V3Metadata.ManifestFileWrapper wrapper; V3Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { super( @@ -81,7 +80,7 @@ static class V3Writer extends ManifestListWriter { "parent-snapshot-id", String.valueOf(parentSnapshotId), "sequence-number", String.valueOf(sequenceNumber), "format-version", "3")); - this.wrapper = new V3Metadata.IndexedManifestFile(snapshotId, sequenceNumber); + this.wrapper = new V3Metadata.ManifestFileWrapper(snapshotId, sequenceNumber); } @Override @@ -92,7 +91,7 @@ protected ManifestFile prepare(ManifestFile manifest) { @Override protected FileAppender newAppender(OutputFile file, Map meta) { try { - return Avro.write(file) + return InternalData.write(FileFormat.AVRO, file) .schema(V3Metadata.MANIFEST_LIST_SCHEMA) .named("manifest_file") .meta(meta) @@ -106,7 +105,7 @@ protected FileAppender newAppender(OutputFile file, Map newAppender(OutputFile file, Map meta) { try { - return Avro.write(file) + return InternalData.write(FileFormat.AVRO, file) .schema(V2Metadata.MANIFEST_LIST_SCHEMA) .named("manifest_file") .meta(meta) @@ -141,7 +140,7 @@ protected FileAppender newAppender(OutputFile file, Map newAppender(OutputFile file, Map meta) { try { - return Avro.write(file) + return InternalData.write(FileFormat.AVRO, file) .schema(V1Metadata.MANIFEST_LIST_SCHEMA) .named("manifest_file") .meta(meta) diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index fbfc62b94fe4..496fa8d38e83 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -219,11 +219,11 @@ public void close() throws IOException { } static class V3Writer extends ManifestWriter { - private final V3Metadata.IndexedManifestEntry entryWrapper; + private final V3Metadata.ManifestEntryWrapper entryWrapper; V3Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { super(spec, file, snapshotId); - this.entryWrapper = new V3Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); + this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId); } @Override @@ -253,11 +253,11 @@ protected FileAppender> newAppender( } static class V3DeleteWriter extends ManifestWriter { - private final V3Metadata.IndexedManifestEntry entryWrapper; + private 
final V3Metadata.ManifestEntryWrapper entryWrapper; V3DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { super(spec, file, snapshotId); - this.entryWrapper = new V3Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); + this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId); } @Override @@ -270,7 +270,7 @@ protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { Schema manifestSchema = V3Metadata.entrySchema(spec.partitionType()); try { - return Avro.write(file) + return InternalData.write(FileFormat.AVRO, file) .schema(manifestSchema) .named("manifest_entry") .meta("schema", SchemaParser.toJson(spec.schema())) @@ -292,11 +292,11 @@ protected ManifestContent content() { } static class V2Writer extends ManifestWriter { - private final V2Metadata.IndexedManifestEntry entryWrapper; + private final V2Metadata.ManifestEntryWrapper entryWrapper; V2Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { super(spec, file, snapshotId); - this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); + this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId); } @Override @@ -309,7 +309,7 @@ protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); try { - return Avro.write(file) + return InternalData.write(FileFormat.AVRO, file) .schema(manifestSchema) .named("manifest_entry") .meta("schema", SchemaParser.toJson(spec.schema())) @@ -326,11 +326,11 @@ protected FileAppender> newAppender( } static class V2DeleteWriter extends ManifestWriter { - private final V2Metadata.IndexedManifestEntry entryWrapper; + private final V2Metadata.ManifestEntryWrapper entryWrapper; V2DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { super(spec, file, snapshotId); - this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); + this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId); } @Override @@ -343,7 +343,7 @@ protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); try { - return Avro.write(file) + return InternalData.write(FileFormat.AVRO, file) .schema(manifestSchema) .named("manifest_entry") .meta("schema", SchemaParser.toJson(spec.schema())) @@ -365,11 +365,11 @@ protected ManifestContent content() { } static class V1Writer extends ManifestWriter { - private final V1Metadata.IndexedManifestEntry entryWrapper; + private final V1Metadata.ManifestEntryWrapper entryWrapper; V1Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) { super(spec, file, snapshotId); - this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType()); + this.entryWrapper = new V1Metadata.ManifestEntryWrapper(); } @Override @@ -382,7 +382,7 @@ protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType()); try { - return Avro.write(file) + return InternalData.write(FileFormat.AVRO, file) .schema(manifestSchema) .named("manifest_entry") .meta("schema", SchemaParser.toJson(spec.schema())) diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java index 81fd65d99803..34bcd691a9cf 100644 --- a/core/src/main/java/org/apache/iceberg/V1Metadata.java +++ 
b/core/src/main/java/org/apache/iceberg/V1Metadata.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.avro.generic.IndexedRecord; -import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.types.Types; class V1Metadata { @@ -52,10 +50,7 @@ private V1Metadata() {} *
<p>
This is used to maintain compatibility with v1 by writing manifest list files with the old * schema, instead of writing a sequence number into metadata files in v1 tables. */ - static class IndexedManifestFile implements ManifestFile, IndexedRecord { - private static final org.apache.avro.Schema AVRO_SCHEMA = - AvroSchemaUtil.convert(MANIFEST_LIST_SCHEMA, "manifest_file"); - + static class ManifestFileWrapper implements ManifestFile, StructLike { private ManifestFile wrapped = null; public ManifestFile wrap(ManifestFile file) { @@ -64,17 +59,21 @@ public ManifestFile wrap(ManifestFile file) { } @Override - public org.apache.avro.Schema getSchema() { - return AVRO_SCHEMA; + public int size() { + return MANIFEST_LIST_SCHEMA.columns().size(); } @Override - public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot modify IndexedManifestFile wrapper via put"); + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot modify ManifestFileWrapper wrapper via set"); } @Override - public Object get(int pos) { + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + private Object get(int pos) { switch (pos) { case 0: return path(); @@ -236,34 +235,38 @@ static Types.StructType dataFileSchema(Types.StructType partitionType) { } /** Wrapper used to write a ManifestEntry to v1 metadata. */ - static class IndexedManifestEntry implements ManifestEntry, IndexedRecord { - private final org.apache.avro.Schema avroSchema; - private final IndexedDataFile fileWrapper; + static class ManifestEntryWrapper implements ManifestEntry, StructLike { + private final int size; + private final DataFileWrapper fileWrapper; private ManifestEntry wrapped = null; - IndexedManifestEntry(Types.StructType partitionType) { - this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry"); - this.fileWrapper = new IndexedDataFile(avroSchema.getField("data_file").schema()); + ManifestEntryWrapper() { + this.size = entrySchema(Types.StructType.of()).columns().size(); + this.fileWrapper = new DataFileWrapper(); } - public IndexedManifestEntry wrap(ManifestEntry entry) { + public ManifestEntryWrapper wrap(ManifestEntry entry) { this.wrapped = entry; return this; } @Override - public org.apache.avro.Schema getSchema() { - return avroSchema; + public int size() { + return size; } @Override - public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot modify IndexedManifestEntry wrapper via put"); + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot modify ManifestEntryWrapper wrapper via set"); } @Override - public Object get(int i) { - switch (i) { + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + private Object get(int pos) { + switch (pos) { case 0: return wrapped.status().id(); case 1: @@ -275,7 +278,7 @@ public Object get(int i) { } return null; default: - throw new UnsupportedOperationException("Unknown field ordinal: " + i); + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } } @@ -330,32 +333,44 @@ public ManifestEntry copyWithoutStats() { } } - static class IndexedDataFile implements DataFile, IndexedRecord { + static class DataFileWrapper implements DataFile, StructLike { private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024; - private final org.apache.avro.Schema avroSchema; - private final IndexedStructLike partitionWrapper; + private final int size; private DataFile wrapped = null; - 
IndexedDataFile(org.apache.avro.Schema avroSchema) { - this.avroSchema = avroSchema; - this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema()); + DataFileWrapper() { + this.size = dataFileSchema(Types.StructType.of()).fields().size(); } - IndexedDataFile wrap(DataFile file) { + DataFileWrapper wrap(DataFile file) { this.wrapped = file; return this; } @Override - public Object get(int pos) { + public int size() { + return size; + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot modify DataFileWrapper wrapper via set"); + } + + @Override + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + private Object get(int pos) { switch (pos) { case 0: return wrapped.location(); case 1: return wrapped.format() != null ? wrapped.format().toString() : null; case 2: - return partitionWrapper.wrap(wrapped.partition()); + return wrapped.partition(); case 3: return wrapped.recordCount(); case 4: @@ -384,16 +399,6 @@ public Object get(int pos) { throw new IllegalArgumentException("Unknown field ordinal: " + pos); } - @Override - public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot modify IndexedDataFile wrapper via put"); - } - - @Override - public org.apache.avro.Schema getSchema() { - return avroSchema; - } - @Override public Long pos() { return null; diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index 2b98cd767c94..832e5c383fe5 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.avro.generic.IndexedRecord; -import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types; @@ -56,15 +54,12 @@ private V2Metadata() {} *
<p>
This is used to maintain compatibility with v2 by writing manifest list files with the old * schema, instead of writing a sequence number into metadata files in v2 tables. */ - static class IndexedManifestFile implements ManifestFile, IndexedRecord { - private static final org.apache.avro.Schema AVRO_SCHEMA = - AvroSchemaUtil.convert(MANIFEST_LIST_SCHEMA, "manifest_file"); - + static class ManifestFileWrapper implements ManifestFile, StructLike { private final long commitSnapshotId; private final long sequenceNumber; private ManifestFile wrapped = null; - IndexedManifestFile(long commitSnapshotId, long sequenceNumber) { + ManifestFileWrapper(long commitSnapshotId, long sequenceNumber) { this.commitSnapshotId = commitSnapshotId; this.sequenceNumber = sequenceNumber; } @@ -75,17 +70,21 @@ public ManifestFile wrap(ManifestFile file) { } @Override - public org.apache.avro.Schema getSchema() { - return AVRO_SCHEMA; + public int size() { + return MANIFEST_LIST_SCHEMA.columns().size(); } @Override - public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot modify IndexedManifestFile wrapper via put"); + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot modify ManifestFileWrapper wrapper via set"); } @Override - public Object get(int pos) { + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + private Object get(int pos) { switch (pos) { case 0: return wrapped.path(); @@ -278,37 +277,41 @@ static Types.StructType fileType(Types.StructType partitionType) { DataFile.REFERENCED_DATA_FILE); } - static class IndexedManifestEntry> - implements ManifestEntry, IndexedRecord { - private final org.apache.avro.Schema avroSchema; + static class ManifestEntryWrapper> + implements ManifestEntry, StructLike { + private final int size; private final Long commitSnapshotId; - private final IndexedDataFile fileWrapper; + private final DataFileWrapper fileWrapper; private ManifestEntry wrapped = null; - IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) { - this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry"); + ManifestEntryWrapper(Long commitSnapshotId) { + this.size = entrySchema(Types.StructType.of()).columns().size(); this.commitSnapshotId = commitSnapshotId; - this.fileWrapper = new IndexedDataFile<>(partitionType); + this.fileWrapper = new DataFileWrapper<>(); } - public IndexedManifestEntry wrap(ManifestEntry entry) { + public ManifestEntryWrapper wrap(ManifestEntry entry) { this.wrapped = entry; return this; } @Override - public org.apache.avro.Schema getSchema() { - return avroSchema; + public int size() { + return size; } @Override - public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot modify IndexedManifestEntry wrapper via put"); + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot modify ManifestEntryWrapper wrapper via set"); } @Override - public Object get(int i) { - switch (i) { + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + private Object get(int pos) { + switch (pos) { case 0: return wrapped.status().id(); case 1: @@ -337,7 +340,7 @@ public Object get(int i) { case 4: return fileWrapper.wrap(wrapped.file()); default: - throw new UnsupportedOperationException("Unknown field ordinal: " + i); + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } } @@ -393,29 +396,36 @@ public ManifestEntry copyWithoutStats() { } /** 
Wrapper used to write DataFile or DeleteFile to v2 metadata. */ - static class IndexedDataFile implements ContentFile, IndexedRecord { - private final org.apache.avro.Schema avroSchema; - private final IndexedStructLike partitionWrapper; + static class DataFileWrapper implements ContentFile, StructLike { + private final int size; private ContentFile wrapped = null; - IndexedDataFile(Types.StructType partitionType) { - this.avroSchema = AvroSchemaUtil.convert(fileType(partitionType), "data_file"); - this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema()); + DataFileWrapper() { + this.size = fileType(Types.StructType.of()).fields().size(); } @SuppressWarnings("unchecked") - IndexedDataFile wrap(ContentFile file) { + DataFileWrapper wrap(ContentFile file) { this.wrapped = (ContentFile) file; return this; } @Override - public org.apache.avro.Schema getSchema() { - return avroSchema; + public int size() { + return size; } @Override - public Object get(int pos) { + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot modify DataFileWrapper wrapper via set"); + } + + @Override + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + private Object get(int pos) { switch (pos) { case 0: return wrapped.content().id(); @@ -424,7 +434,7 @@ public Object get(int pos) { case 2: return wrapped.format() != null ? wrapped.format().toString() : null; case 3: - return partitionWrapper.wrap(wrapped.partition()); + return wrapped.partition(); case 4: return wrapped.recordCount(); case 5: @@ -459,11 +469,6 @@ public Object get(int pos) { throw new IllegalArgumentException("Unknown field ordinal: " + pos); } - @Override - public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot modify IndexedDataFile wrapper via put"); - } - @Override public Long pos() { return null; diff --git a/core/src/main/java/org/apache/iceberg/V3Metadata.java b/core/src/main/java/org/apache/iceberg/V3Metadata.java index 458796d4752a..d9134951dabf 100644 --- a/core/src/main/java/org/apache/iceberg/V3Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V3Metadata.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.avro.generic.IndexedRecord; -import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types; @@ -56,15 +54,12 @@ private V3Metadata() {} *
<p>
This is used to maintain compatibility with v3 by writing manifest list files with the old * schema, instead of writing a sequence number into metadata files in v3 tables. */ - static class IndexedManifestFile implements ManifestFile, IndexedRecord { - private static final org.apache.avro.Schema AVRO_SCHEMA = - AvroSchemaUtil.convert(MANIFEST_LIST_SCHEMA, "manifest_file"); - + static class ManifestFileWrapper implements ManifestFile, StructLike { private final long commitSnapshotId; private final long sequenceNumber; private ManifestFile wrapped = null; - IndexedManifestFile(long commitSnapshotId, long sequenceNumber) { + ManifestFileWrapper(long commitSnapshotId, long sequenceNumber) { this.commitSnapshotId = commitSnapshotId; this.sequenceNumber = sequenceNumber; } @@ -75,17 +70,21 @@ public ManifestFile wrap(ManifestFile file) { } @Override - public org.apache.avro.Schema getSchema() { - return AVRO_SCHEMA; + public int size() { + return MANIFEST_LIST_SCHEMA.columns().size(); } @Override - public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot modify IndexedManifestFile wrapper via put"); + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot modify ManifestFileWrapper wrapper via set"); } @Override - public Object get(int pos) { + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + private Object get(int pos) { switch (pos) { case 0: return wrapped.path(); @@ -280,37 +279,41 @@ static Types.StructType fileType(Types.StructType partitionType) { DataFile.CONTENT_SIZE); } - static class IndexedManifestEntry> - implements ManifestEntry, IndexedRecord { - private final org.apache.avro.Schema avroSchema; + static class ManifestEntryWrapper> + implements ManifestEntry, StructLike { + private final int size; private final Long commitSnapshotId; - private final IndexedDataFile fileWrapper; + private final DataFileWrapper fileWrapper; private ManifestEntry wrapped = null; - IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) { - this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry"); + ManifestEntryWrapper(Long commitSnapshotId) { + this.size = entrySchema(Types.StructType.of()).columns().size(); this.commitSnapshotId = commitSnapshotId; - this.fileWrapper = new IndexedDataFile<>(partitionType); + this.fileWrapper = new DataFileWrapper<>(); } - public IndexedManifestEntry wrap(ManifestEntry entry) { + public ManifestEntryWrapper wrap(ManifestEntry entry) { this.wrapped = entry; return this; } @Override - public org.apache.avro.Schema getSchema() { - return avroSchema; + public int size() { + return size; } @Override - public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot modify IndexedManifestEntry wrapper via put"); + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot modify ManifestEntryWrapper wrapper via set"); } @Override - public Object get(int i) { - switch (i) { + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + private Object get(int pos) { + switch (pos) { case 0: return wrapped.status().id(); case 1: @@ -339,7 +342,7 @@ public Object get(int i) { case 4: return fileWrapper.wrap(wrapped.file()); default: - throw new UnsupportedOperationException("Unknown field ordinal: " + i); + throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } } @@ -395,29 +398,36 @@ public ManifestEntry copyWithoutStats() { } /** Wrapper used 
to write DataFile or DeleteFile to v3 metadata. */ - static class IndexedDataFile implements ContentFile, IndexedRecord { - private final org.apache.avro.Schema avroSchema; - private final IndexedStructLike partitionWrapper; + static class DataFileWrapper implements ContentFile, StructLike { + private final int size; private ContentFile wrapped = null; - IndexedDataFile(Types.StructType partitionType) { - this.avroSchema = AvroSchemaUtil.convert(fileType(partitionType), "data_file"); - this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema()); + DataFileWrapper() { + this.size = fileType(Types.StructType.of()).fields().size(); } @SuppressWarnings("unchecked") - IndexedDataFile wrap(ContentFile file) { + DataFileWrapper wrap(ContentFile file) { this.wrapped = (ContentFile) file; return this; } @Override - public org.apache.avro.Schema getSchema() { - return avroSchema; + public int size() { + return size; } @Override - public Object get(int pos) { + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot modify DataFileWrapper wrapper via set"); + } + + @Override + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + private Object get(int pos) { switch (pos) { case 0: return wrapped.content().id(); @@ -426,7 +436,7 @@ public Object get(int pos) { case 2: return wrapped.format() != null ? wrapped.format().toString() : null; case 3: - return partitionWrapper.wrap(wrapped.partition()); + return wrapped.partition(); case 4: return wrapped.recordCount(); case 5: @@ -473,11 +483,6 @@ public Object get(int pos) { throw new IllegalArgumentException("Unknown field ordinal: " + pos); } - @Override - public void put(int i, Object v) { - throw new UnsupportedOperationException("Cannot modify IndexedDataFile wrapper via put"); - } - @Override public String manifestLocation() { return null; From 3835e04835a0a87e3738b198ebbaf0ac4bb16534 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 22 Jan 2025 16:40:57 -0800 Subject: [PATCH 03/12] Implement Parquet registration for InternalData. 
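
With registration in place, metadata readers and writers can go through
the InternalData entry points without format-specific branching. A rough
caller-side sketch (illustrative only: inputFile and projection are
placeholders, and Parquet's setRootType/setCustomType still throw
UnsupportedOperationException at this point):

    // Hypothetical call site; inputFile and projection are not part
    // of this change.
    CloseableIterable<StructLike> rows =
        InternalData.read(FileFormat.PARQUET, inputFile)
            .project(projection)
            .reuseContainers()
            .build();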
--- .../org/apache/iceberg/parquet/Parquet.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 310435209bac..04ca9b15aca3 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -62,6 +62,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.InternalData; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -125,6 +126,19 @@ public class Parquet { private Parquet() {} + public void register() { + InternalData.register(FileFormat.PARQUET, Parquet::writeInternal, Parquet::readInternal); + } + + private static WriteBuilder writeInternal(OutputFile outputFile) { + return write(outputFile); + } + + private static ReadBuilder readInternal(InputFile inputFile) { + return read(inputFile); + } + + private static final Collection READ_PROPERTIES_TO_REMOVE = Sets.newHashSet( "parquet.read.filter", @@ -133,6 +147,10 @@ private Parquet() {} "parquet.crypto.factory.class"); public static WriteBuilder write(OutputFile file) { + if (file instanceof EncryptedOutputFile) { + return write((EncryptedOutputFile) file); + } + return new WriteBuilder(file); } @@ -147,7 +165,7 @@ public static WriteBuilder write(EncryptedOutputFile file) { } } - public static class WriteBuilder { + public static class WriteBuilder implements InternalData.WriteBuilder { private final OutputFile file; private final Configuration conf; private final Map metadata = Maps.newLinkedHashMap(); @@ -1056,7 +1074,7 @@ public static ReadBuilder read(InputFile file) { } } - public static class ReadBuilder { + public static class ReadBuilder implements InternalData.ReadBuilder { private final InputFile file; private final Map properties = Maps.newHashMap(); private Long start = null; @@ -1171,6 +1189,16 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) { return this; } + @Override + public ReadBuilder setRootType(Class rootClass) { + throw new UnsupportedOperationException("Custom types are not yet supported"); + } + + @Override + public ReadBuilder setCustomType(int fieldId, Class structClass) { + throw new UnsupportedOperationException("Custom types are not yet supported"); + } + public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { this.fileEncryptionKey = encryptionKey; return this; From 183cd19d67ef12ce0b03396dd236f76bc77b4119 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 22 Jan 2025 16:50:32 -0800 Subject: [PATCH 04/12] Apply spotless. --- .../java/org/apache/iceberg/InternalData.java | 31 +++++++++---------- .../java/org/apache/iceberg/avro/Avro.java | 3 +- .../iceberg/avro/SupportsCustomTypes.java | 31 +++++++++---------- .../org/apache/iceberg/parquet/Parquet.java | 1 - 4 files changed, 30 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/InternalData.java b/core/src/main/java/org/apache/iceberg/InternalData.java index 586853371f13..8f9aaa03bb96 100644 --- a/core/src/main/java/org/apache/iceberg/InternalData.java +++ b/core/src/main/java/org/apache/iceberg/InternalData.java @@ -1,24 +1,21 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.iceberg; import java.io.IOException; diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 4588ec3f80df..5907028fb68a 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -727,7 +727,8 @@ public InternalData.ReadBuilder setRootType(Class rootClas } @Override - public InternalData.ReadBuilder setCustomType(int fieldId, Class structClass) { + public InternalData.ReadBuilder setCustomType( + int fieldId, Class structClass) { typeMap.put(fieldId, structClass); return this; } diff --git a/core/src/main/java/org/apache/iceberg/avro/SupportsCustomTypes.java b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomTypes.java index e7e9a060eb2f..1fd1d5dada6e 100644 --- a/core/src/main/java/org/apache/iceberg/avro/SupportsCustomTypes.java +++ b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomTypes.java @@ -1,24 +1,21 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. 
You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.iceberg.avro; import java.util.Map; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 04ca9b15aca3..7a975756cbac 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -138,7 +138,6 @@ private static ReadBuilder readInternal(InputFile inputFile) { return read(inputFile); } - private static final Collection READ_PROPERTIES_TO_REMOVE = Sets.newHashSet( "parquet.read.filter", From 3373d06e53a7e7b681e493b46f15cac3c5064581 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 22 Jan 2025 17:04:28 -0800 Subject: [PATCH 05/12] Make Parquet.register() static. --- parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 7a975756cbac..c2570a3d8c30 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -126,7 +126,7 @@ public class Parquet { private Parquet() {} - public void register() { + public static void register() { InternalData.register(FileFormat.PARQUET, Parquet::writeInternal, Parquet::readInternal); } From 4127a4dc9b091959d70ee531c4256f92d780f1d8 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 27 Jan 2025 09:42:17 -0800 Subject: [PATCH 06/12] Parquet: Use InternalReader and InternalWriter for InternalData. 
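
The two-argument reader function lets a reader see both the expected
Iceberg schema and the file's Parquet type, which InternalReader needs
in order to resolve its projection against the file. At build time the
two-argument variant is adapted by capturing the projection schema;
roughly (a sketch of the logic added to build()):

    // fileType is the Parquet MessageType read from the file footer
    Function<MessageType, ParquetValueReader<?>> readBuilder =
        readerFuncWithSchema != null
            ? fileType -> readerFuncWithSchema.apply(schema, fileType)
            : readerFunc;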
--- .../org/apache/iceberg/parquet/Parquet.java | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index c2570a3d8c30..8e59e21c4c0a 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -56,6 +56,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -73,6 +74,8 @@ import org.apache.iceberg.Table; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.data.parquet.InternalReader; +import org.apache.iceberg.data.parquet.InternalWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -131,11 +134,11 @@ public static void register() { } private static WriteBuilder writeInternal(OutputFile outputFile) { - return write(outputFile); + return write(outputFile).createWriterFunc(InternalWriter::create); } private static ReadBuilder readInternal(InputFile inputFile) { - return read(inputFile); + return read(inputFile).createReaderFunc(InternalReader::create); } private static final Collection READ_PROPERTIES_TO_REMOVE = @@ -1083,6 +1086,7 @@ public static class ReadBuilder implements InternalData.ReadBuilder { private ReadSupport readSupport = null; private Function> batchedReaderFunc = null; private Function> readerFunc = null; + private BiFunction> readerFuncWithSchema = null; private boolean filterRecords = true; private boolean caseSensitive = true; private boolean callInit = false; @@ -1146,15 +1150,33 @@ public ReadBuilder createReaderFunc( Function> newReaderFunction) { Preconditions.checkArgument( this.batchedReaderFunc == null, - "Reader function cannot be set since the batched version is already set"); + "Cannot set reader function: batched reader function already set"); + Preconditions.checkArgument( + this.readerFuncWithSchema == null, + "Cannot set reader function: 2-argument reader function already set"); this.readerFunc = newReaderFunction; return this; } + private ReadBuilder createReaderFunc( + BiFunction> newReaderFunction) { + Preconditions.checkArgument( + this.readerFunc == null, + "Cannot set 2-argument reader function: reader function already set"); + Preconditions.checkArgument( + this.batchedReaderFunc == null, + "Cannot set 2-argument reader function: batched reader function already set"); + this.readerFuncWithSchema = newReaderFunction; + return this; + } + public ReadBuilder createBatchedReaderFunc(Function> func) { Preconditions.checkArgument( this.readerFunc == null, - "Batched reader function cannot be set since the non-batched version is already set"); + "Cannot set batched reader function: reader function already set"); + Preconditions.checkArgument( + this.readerFuncWithSchema == null, + "Cannot set batched reader function: 2-argument reader function already set"); this.batchedReaderFunc = func; return this; } @@ -1223,7 +1245,7 @@ public CloseableIterable build() { Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); } - if (readerFunc != null || batchedReaderFunc != null) { + if (readerFunc != 
null || readerFuncWithSchema != null || batchedReaderFunc != null) { ParquetReadOptions.Builder optionsBuilder; if (file instanceof HadoopInputFile) { // remove read properties already set that may conflict with this read @@ -1271,8 +1293,12 @@ public CloseableIterable build() { caseSensitive, maxRecordsPerBatch); } else { + Function> readBuilder = + readerFuncWithSchema != null + ? (fileType) -> readerFuncWithSchema.apply(schema, fileType) + : readerFunc; return new org.apache.iceberg.parquet.ParquetReader<>( - file, schema, options, readerFunc, mapping, filter, reuseContainers, caseSensitive); + file, schema, options, readBuilder, mapping, filter, reuseContainers, caseSensitive); } } From acb5778b22682a68fe0f9accd1effc9cb3e4cd8e Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 27 Jan 2025 14:39:13 -0800 Subject: [PATCH 07/12] Update registration to package-private. --- .../java/org/apache/iceberg/InternalData.java | 10 +++-- .../java/org/apache/iceberg/avro/Avro.java | 12 ------ .../org/apache/iceberg/InternalParquet.java | 43 +++++++++++++++++++ .../org/apache/iceberg/parquet/Parquet.java | 16 +------ 4 files changed, 51 insertions(+), 30 deletions(-) create mode 100644 parquet/src/main/java/org/apache/iceberg/InternalParquet.java diff --git a/core/src/main/java/org/apache/iceberg/InternalData.java b/core/src/main/java/org/apache/iceberg/InternalData.java index 8f9aaa03bb96..f07b0f3d85e1 100644 --- a/core/src/main/java/org/apache/iceberg/InternalData.java +++ b/core/src/main/java/org/apache/iceberg/InternalData.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.function.Function; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.InternalReader; +import org.apache.iceberg.avro.InternalWriter; import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; @@ -41,12 +43,14 @@ private InternalData() {} Maps.newConcurrentMap(); static { - Avro.register(); + InternalData.register(FileFormat.AVRO, + outputFile -> Avro.write(outputFile).createWriterFunc(InternalWriter::create), + inputFile -> Avro.read(inputFile).createResolvingReader(InternalReader::create)); try { DynMethods.StaticMethod registerParquet = DynMethods.builder("register") - .impl("org.apache.iceberg.parquet.Parquet") + .impl("org.apache.iceberg.InternalParquet") .buildStaticChecked(); registerParquet.invoke(); @@ -57,7 +61,7 @@ private InternalData() {} } } - public static void register( + static void register( FileFormat format, Function writeBuilder, Function readBuilder) { diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 5907028fb68a..b1814d707f9e 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -72,18 +72,6 @@ public class Avro { private Avro() {} - public static void register() { - InternalData.register(FileFormat.AVRO, Avro::writeInternal, Avro::readInternal); - } - - private static WriteBuilder writeInternal(OutputFile outputFile) { - return write(outputFile).createWriterFunc(InternalWriter::create); - } - - private static ReadBuilder readInternal(InputFile inputFile) { - return read(inputFile).createResolvingReader(InternalReader::create); - } - private enum Codec { UNCOMPRESSED, SNAPPY, diff --git a/parquet/src/main/java/org/apache/iceberg/InternalParquet.java b/parquet/src/main/java/org/apache/iceberg/InternalParquet.java new file mode 100644 index 
000000000000..3e162c01186a --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/InternalParquet.java @@ -0,0 +1,43 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, + * * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * * KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations + * * under the License. + * + */ + +package org.apache.iceberg; + +import org.apache.iceberg.data.parquet.InternalReader; +import org.apache.iceberg.data.parquet.InternalWriter; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; + +public class InternalParquet { + public static void register() { + InternalData.register( + FileFormat.PARQUET, InternalParquet::writeInternal, InternalParquet::readInternal); + } + + private static Parquet.WriteBuilder writeInternal(OutputFile outputFile) { + return Parquet.write(outputFile).createWriterFunc(InternalWriter::create); + } + + private static Parquet.ReadBuilder readInternal(InputFile inputFile) { + return Parquet.read(inputFile).createReaderFunc(InternalReader::create); + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 8e59e21c4c0a..d8de89de3cc5 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -74,8 +74,6 @@ import org.apache.iceberg.Table; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.data.parquet.GenericParquetWriter; -import org.apache.iceberg.data.parquet.InternalReader; -import org.apache.iceberg.data.parquet.InternalWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -129,18 +127,6 @@ public class Parquet { private Parquet() {} - public static void register() { - InternalData.register(FileFormat.PARQUET, Parquet::writeInternal, Parquet::readInternal); - } - - private static WriteBuilder writeInternal(OutputFile outputFile) { - return write(outputFile).createWriterFunc(InternalWriter::create); - } - - private static ReadBuilder readInternal(InputFile inputFile) { - return read(inputFile).createReaderFunc(InternalReader::create); - } - private static final Collection READ_PROPERTIES_TO_REMOVE = Sets.newHashSet( "parquet.read.filter", @@ -1158,7 +1144,7 @@ public ReadBuilder createReaderFunc( return this; } - private ReadBuilder createReaderFunc( + public ReadBuilder createReaderFunc( BiFunction> newReaderFunction) { Preconditions.checkArgument( this.readerFunc == null, From 799914b02dcf81618788bdecbc9ccb1ab4f29151 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Thu, 6 Feb 2025 16:39:31 -0800 Subject: [PATCH 08/12] Apply spotless. 
From 799914b02dcf81618788bdecbc9ccb1ab4f29151 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Thu, 6 Feb 2025 16:39:31 -0800
Subject: [PATCH 08/12] Apply spotless.

---
 .../java/org/apache/iceberg/InternalData.java |  3 +-
 .../org/apache/iceberg/InternalParquet.java   | 31 +++++++++----------
 2 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/InternalData.java b/core/src/main/java/org/apache/iceberg/InternalData.java
index f07b0f3d85e1..fd2615e4d4db 100644
--- a/core/src/main/java/org/apache/iceberg/InternalData.java
+++ b/core/src/main/java/org/apache/iceberg/InternalData.java
@@ -43,7 +43,8 @@ private InternalData() {}
       Maps.newConcurrentMap();

   static {
-    InternalData.register(FileFormat.AVRO,
+    InternalData.register(
+        FileFormat.AVRO,
         outputFile -> Avro.write(outputFile).createWriterFunc(InternalWriter::create),
         inputFile -> Avro.read(inputFile).createResolvingReader(InternalReader::create));

diff --git a/parquet/src/main/java/org/apache/iceberg/InternalParquet.java b/parquet/src/main/java/org/apache/iceberg/InternalParquet.java
index 3e162c01186a..76752eefdcd3 100644
--- a/parquet/src/main/java/org/apache/iceberg/InternalParquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/InternalParquet.java
@@ -1,24 +1,21 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.iceberg;

 import org.apache.iceberg.data.parquet.InternalReader;
From f432b2beb551775e52e3fd9740aa4bec3c32909f Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 11 Feb 2025 13:29:40 -0800
Subject: [PATCH 09/12] Add missing @Override annotations.

---
 core/src/main/java/org/apache/iceberg/avro/Avro.java  | 11 +++++++++++
 .../main/java/org/apache/iceberg/parquet/Parquet.java | 10 ++++++++++
 2 files changed, 21 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index b1814d707f9e..557a20daf303 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -124,11 +124,13 @@ public WriteBuilder forTable(Table table) {
       return this;
     }

+    @Override
     public WriteBuilder schema(org.apache.iceberg.Schema newSchema) {
       this.schema = newSchema;
       return this;
     }

+    @Override
     public WriteBuilder named(String newName) {
       this.name = newName;
       return this;
@@ -139,6 +141,7 @@ public WriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> writerFunc
       return this;
     }

+    @Override
     public WriteBuilder set(String property, String value) {
       config.put(property, value);
       return this;
@@ -149,11 +152,13 @@ public WriteBuilder setAll(Map<String, String> properties) {
       return this;
     }

+    @Override
     public WriteBuilder meta(String property, String value) {
       metadata.put(property, value);
       return this;
     }

+    @Override
     public WriteBuilder meta(Map<String, String> properties) {
       metadata.putAll(properties);
       return this;
@@ -164,6 +169,7 @@ public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
       return this;
     }

+    @Override
     public WriteBuilder overwrite() {
       return overwrite(true);
     }
@@ -180,6 +186,7 @@ private WriteBuilder createContextFunc(
       return this;
     }

+    @Override
     public <D> FileAppender<D> build() throws IOException {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
@@ -682,17 +689,20 @@ public ReadBuilder createReaderFunc(
      * @param newLength the length of the range this read should scan
      * @return this builder for method chaining
      */
+    @Override
     public ReadBuilder split(long newStart, long newLength) {
       this.start = newStart;
       this.length = newLength;
       return this;
     }

+    @Override
     public ReadBuilder project(org.apache.iceberg.Schema projectedSchema) {
       this.schema = projectedSchema;
       return this;
     }

+    @Override
     public ReadBuilder reuseContainers() {
       this.reuseContainers = true;
       return this;
@@ -731,6 +741,7 @@ public ReadBuilder classLoader(ClassLoader classLoader) {
       return this;
     }

+    @Override
     @SuppressWarnings("unchecked")
     public <D> AvroIterable<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index d8de89de3cc5..7677a53de0c1 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -185,11 +185,13 @@ public WriteBuilder forTable(Table table) {
       return this;
     }

+    @Override
     public WriteBuilder schema(Schema newSchema) {
       this.schema = newSchema;
       return this;
     }

+    @Override
     public WriteBuilder named(String newName) {
       this.name = newName;
       return this;
@@ -200,6 +202,7 @@ public WriteBuilder writeSupport(WriteSupport<?> newWriteSupport) {
       return this;
     }

+    @Override
     public WriteBuilder set(String property, String value) {
       config.put(property, value);
       return this;
@@ -210,6 +213,7 @@ public WriteBuilder setAll(Map<String, String> properties) {
       return this;
     }

+    @Override
     public WriteBuilder meta(String property, String value) {
       metadata.put(property, value);
       return this;
@@ -226,6 +230,7 @@ public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
       return this;
     }

+    @Override
     public WriteBuilder overwrite() {
       return overwrite(true);
     }
@@ -315,6 +320,7 @@ private void setBloomFilterConfig(
         });
   }

+    @Override
     public <D> FileAppender<D> build() throws IOException {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
@@ -1093,12 +1099,14 @@ private ReadBuilder(InputFile file) {
      * @param newLength the length of the range this read should scan
      * @return this builder for method chaining
      */
+    @Override
     public ReadBuilder split(long newStart, long newLength) {
       this.start = newStart;
       this.length = newLength;
       return this;
     }

+    @Override
     public ReadBuilder project(Schema newSchema) {
       this.schema = newSchema;
       return this;
@@ -1181,6 +1189,7 @@ public ReadBuilder callInit() {
       return this;
     }

+    @Override
     public ReadBuilder reuseContainers() {
       this.reuseContainers = true;
       return this;
@@ -1216,6 +1225,7 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
       return this;
     }

+    @Override
    @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
    public <D> CloseableIterable<D> build() {
      FileDecryptionProperties fileDecryptionProperties = null;
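
Patch 09 makes the relationship explicit: the Avro and Parquet builders implement the shared InternalData.WriteBuilder and InternalData.ReadBuilder interfaces, so every shared method now carries @Override and the compiler will flag any drift between the generic interface and a format-specific builder. A cut-down illustration of what the annotation buys, using hypothetical stand-in names rather than the Iceberg classes:

    interface MetadataWriteBuilder {
      MetadataWriteBuilder schema(String schema);
    }

    class AvroLikeBuilder implements MetadataWriteBuilder {
      private String schema;

      // With @Override, renaming or retyping the interface method becomes a
      // compile error here instead of silently leaving behind an unrelated
      // overload that registry callers would never invoke.
      @Override
      public MetadataWriteBuilder schema(String newSchema) {
        this.schema = newSchema;
        return this;
      }
    }
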
From c3b804524f15430c0a59737682ea178e68dfadd4 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 11 Feb 2025 13:36:42 -0800
Subject: [PATCH 10/12] Fix errorprone issues.

---
 .../java/org/apache/iceberg/InternalData.java | 21 ++++++++++++-------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/InternalData.java b/core/src/main/java/org/apache/iceberg/InternalData.java
index fd2615e4d4db..fa39d23e43fe 100644
--- a/core/src/main/java/org/apache/iceberg/InternalData.java
+++ b/core/src/main/java/org/apache/iceberg/InternalData.java
@@ -42,7 +42,16 @@ private InternalData() {}
   private static final Map<FileFormat, Function<InputFile, ReadBuilder>> READ_BUILDERS =
       Maps.newConcurrentMap();

-  static {
+  static void register(
+      FileFormat format,
+      Function<OutputFile, WriteBuilder> writeBuilder,
+      Function<InputFile, ReadBuilder> readBuilder) {
+    WRITE_BUILDERS.put(format, writeBuilder);
+    READ_BUILDERS.put(format, readBuilder);
+  }
+
+  @SuppressWarnings("CatchBlockLogException")
+  private static void registerSupportedFormats() {
     InternalData.register(
         FileFormat.AVRO,
         outputFile -> Avro.write(outputFile).createWriterFunc(InternalWriter::create),
@@ -62,12 +71,8 @@ private InternalData() {}
     }
   }

-  static void register(
-      FileFormat format,
-      Function<OutputFile, WriteBuilder> writeBuilder,
-      Function<InputFile, ReadBuilder> readBuilder) {
-    WRITE_BUILDERS.put(format, writeBuilder);
-    READ_BUILDERS.put(format, readBuilder);
+  static {
+    registerSupportedFormats();
   }

   public static WriteBuilder write(FileFormat format, OutputFile file) {
@@ -147,7 +152,7 @@ public interface ReadBuilder {
     ReadBuilder project(Schema projectedSchema);

     /** Read only the split that is {@code length} bytes starting at {@code start}. */
-    ReadBuilder split(long start, long length);
+    ReadBuilder split(long newStart, long newLength);

     /** Reuse container classes, like structs, lists, and maps. */
     ReadBuilder reuseContainers();
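
Patch 10 restructures the static initializer for a reason the diff only hints at: Java does not allow annotations on initializer blocks, so the registration logic moves into a private registerSupportedFormats() method that can carry @SuppressWarnings("CatchBlockLogException"), and the block itself shrinks to a single call. One ordering rule worth keeping in mind with this pattern (a self-contained illustration, not Iceberg code):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class InitOrder {
      // Static field initializers and static blocks run in textual order, so
      // REGISTRY must be declared before the block that populates it; declared
      // after, register() would see a null map during class initialization.
      private static final Map<String, Runnable> REGISTRY = new ConcurrentHashMap<>();

      static {
        register("example", () -> {});
      }

      // Method declarations are not subject to initialization order; only the
      // relative position of field initializers and static blocks matters.
      static void register(String name, Runnable task) {
        REGISTRY.put(name, task);
      }
    }
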
From 0c25b3851e8530ccb3f84a38333b6b0aa685124f Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 11 Feb 2025 13:44:08 -0800
Subject: [PATCH 11/12] Core: Update v3 data manifests writer to use InternalData.

---
 core/src/main/java/org/apache/iceberg/ManifestWriter.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 496fa8d38e83..cda20c759352 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -20,7 +20,6 @@

 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileAppender;
@@ -236,7 +235,7 @@ protected FileAppender<ManifestEntry<DataFile>> newAppender(
         PartitionSpec spec, OutputFile file) {
       Schema manifestSchema = V3Metadata.entrySchema(spec.partitionType());
       try {
-        return Avro.write(file)
+        return InternalData.write(FileFormat.AVRO, file)
             .schema(manifestSchema)
             .named("manifest_entry")
             .meta("schema", SchemaParser.toJson(spec.schema()))

From 952e89bb77113d311921525b63f7b43f999bd1ac Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 11 Feb 2025 13:46:46 -0800
Subject: [PATCH 12/12] Parquet: Add missing private constructor for util class.

---
 parquet/src/main/java/org/apache/iceberg/InternalParquet.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/parquet/src/main/java/org/apache/iceberg/InternalParquet.java b/parquet/src/main/java/org/apache/iceberg/InternalParquet.java
index 76752eefdcd3..701245332f80 100644
--- a/parquet/src/main/java/org/apache/iceberg/InternalParquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/InternalParquet.java
@@ -25,6 +25,8 @@
 import org.apache.iceberg.parquet.Parquet;

 public class InternalParquet {
+  private InternalParquet() {}
+
   public static void register() {
     InternalData.register(
         FileFormat.PARQUET, InternalParquet::writeInternal, InternalParquet::readInternal);
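
With patch 11, the v3 manifest writer goes through the registry instead of calling Avro directly, so switching metadata to another registered format is a one-argument change. A sketch of the resulting call shape, following the builder methods shown in the diffs above; `outputFile`, `manifestSchema`, and `spec` are illustrative stand-ins for values the manifest writer already holds:

    // Write internal metadata records without naming a concrete format class;
    // the registry supplies the Avro (or Parquet) builder behind the interface.
    FileAppender<ManifestEntry<DataFile>> appender =
        InternalData.write(FileFormat.AVRO, outputFile)
            .schema(manifestSchema)
            .named("manifest_entry")
            .meta("schema", SchemaParser.toJson(spec.schema()))
            .build();
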