Skip to content

Commit

Permalink
Core: Add InternalData read and write builders (#12060)
Browse the repository at this point in the history
  • Loading branch information
rdblue authored Feb 13, 2025
1 parent 6f341ab commit b8fdd84
Show file tree
Hide file tree
Showing 13 changed files with 520 additions and 188 deletions.
169 changes: 169 additions & 0 deletions core/src/main/java/org/apache/iceberg/InternalData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.InternalReader;
import org.apache.iceberg.avro.InternalWriter;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point for reading and writing Iceberg's internal metadata files.
 *
 * <p>File formats register factories for {@link WriteBuilder} and {@link ReadBuilder} via {@link
 * #register(FileFormat, Function, Function)}. Avro support is registered unconditionally; Parquet
 * support is registered reflectively when the Parquet module is on the classpath.
 */
public class InternalData {
  private static final Logger LOG = LoggerFactory.getLogger(InternalData.class);

  // concurrent maps: formats may register from multiple threads/classloaders
  private static final Map<FileFormat, Function<OutputFile, WriteBuilder>> WRITE_BUILDERS =
      Maps.newConcurrentMap();
  private static final Map<FileFormat, Function<InputFile, ReadBuilder>> READ_BUILDERS =
      Maps.newConcurrentMap();

  private InternalData() {}

  /**
   * Register builder factories for a file format.
   *
   * @param format a file format
   * @param writeBuilder a function that returns a {@link WriteBuilder} for an {@link OutputFile}
   * @param readBuilder a function that returns a {@link ReadBuilder} for an {@link InputFile}
   */
  static void register(
      FileFormat format,
      Function<OutputFile, WriteBuilder> writeBuilder,
      Function<InputFile, ReadBuilder> readBuilder) {
    WRITE_BUILDERS.put(format, writeBuilder);
    READ_BUILDERS.put(format, readBuilder);
  }

  @SuppressWarnings("CatchBlockLogException")
  private static void registerSupportedFormats() {
    // Avro support ships with this module and is always available
    InternalData.register(
        FileFormat.AVRO,
        outputFile -> Avro.write(outputFile).createWriterFunc(InternalWriter::create),
        inputFile -> Avro.read(inputFile).createResolvingReader(InternalReader::create));

    // Parquet support is optional; register it reflectively if the module is present
    try {
      DynMethods.StaticMethod registerParquet =
          DynMethods.builder("register")
              .impl("org.apache.iceberg.InternalParquet")
              .buildStaticChecked();

      registerParquet.invoke();

    } catch (NoSuchMethodException e) {
      // failing to load Parquet is normal and does not require a stack trace
      LOG.info("Unable to register Parquet for metadata files: {}", e.getMessage());
    }
  }

  // runs after the registry fields above are initialized
  static {
    registerSupportedFormats();
  }

  /**
   * Return a {@link WriteBuilder} for the given format and output file.
   *
   * @param format a registered file format
   * @param file an output file to write
   * @return a write builder for the file
   * @throws UnsupportedOperationException if no builder is registered for the format
   */
  public static WriteBuilder write(FileFormat format, OutputFile file) {
    Function<OutputFile, WriteBuilder> writeBuilder = WRITE_BUILDERS.get(format);
    if (writeBuilder != null) {
      return writeBuilder.apply(file);
    }

    throw new UnsupportedOperationException(
        "Cannot write using unregistered internal data format: " + format);
  }

  /**
   * Return a {@link ReadBuilder} for the given format and input file.
   *
   * @param format a registered file format
   * @param file an input file to read
   * @return a read builder for the file
   * @throws UnsupportedOperationException if no builder is registered for the format
   */
  public static ReadBuilder read(FileFormat format, InputFile file) {
    Function<InputFile, ReadBuilder> readBuilder = READ_BUILDERS.get(format);
    if (readBuilder != null) {
      return readBuilder.apply(file);
    }

    throw new UnsupportedOperationException(
        "Cannot read using unregistered internal data format: " + format);
  }

  /** Builder for writers of internal metadata files. */
  public interface WriteBuilder {
    /** Set the file schema. */
    WriteBuilder schema(Schema schema);

    /** Set the file schema's root name. */
    WriteBuilder named(String name);

    /**
     * Set a writer configuration property.
     *
     * <p>Write configuration affects writer behavior. To add file metadata properties, use {@link
     * #meta(String, String)}.
     *
     * @param property a writer config property name
     * @param value config value
     * @return this for method chaining
     */
    WriteBuilder set(String property, String value);

    /**
     * Set a file metadata property.
     *
     * <p>Metadata properties are written into file metadata. To alter a writer configuration
     * property, use {@link #set(String, String)}.
     *
     * @param property a file metadata property name
     * @param value config value
     * @return this for method chaining
     */
    WriteBuilder meta(String property, String value);

    /**
     * Set file metadata properties from a Map.
     *
     * <p>Metadata properties are written into file metadata. To alter a writer configuration
     * property, use {@link #set(String, String)}.
     *
     * @param properties a map of file metadata properties
     * @return this for method chaining
     */
    default WriteBuilder meta(Map<String, String> properties) {
      properties.forEach(this::meta);
      return this;
    }

    /** Overwrite the file if it already exists. */
    WriteBuilder overwrite();

    /** Build the configured {@link FileAppender}. */
    <D> FileAppender<D> build() throws IOException;
  }

  /** Builder for readers of internal metadata files. */
  public interface ReadBuilder {
    /** Set the projection schema. */
    ReadBuilder project(Schema projectedSchema);

    /** Read only the split that is {@code length} bytes starting at {@code start}. */
    ReadBuilder split(long newStart, long newLength);

    /** Reuse container classes, like structs, lists, and maps. */
    ReadBuilder reuseContainers();

    /** Set a custom class for in-memory objects at the schema root. */
    ReadBuilder setRootType(Class<? extends StructLike> rootClass);

    /** Set a custom class for in-memory objects at the given field ID. */
    ReadBuilder setCustomType(int fieldId, Class<? extends StructLike> structClass);

    /** Build the configured reader. */
    <D> CloseableIterable<D> build();
  }
}
17 changes: 8 additions & 9 deletions core/src/main/java/org/apache/iceberg/ManifestListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
Expand Down Expand Up @@ -71,7 +70,7 @@ public long length() {
}

static class V3Writer extends ManifestListWriter {
private final V3Metadata.IndexedManifestFile wrapper;
private final V3Metadata.ManifestFileWrapper wrapper;

V3Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) {
super(
Expand All @@ -81,7 +80,7 @@ static class V3Writer extends ManifestListWriter {
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"sequence-number", String.valueOf(sequenceNumber),
"format-version", "3"));
this.wrapper = new V3Metadata.IndexedManifestFile(snapshotId, sequenceNumber);
this.wrapper = new V3Metadata.ManifestFileWrapper(snapshotId, sequenceNumber);
}

@Override
Expand All @@ -92,7 +91,7 @@ protected ManifestFile prepare(ManifestFile manifest) {
@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
try {
return Avro.write(file)
return InternalData.write(FileFormat.AVRO, file)
.schema(V3Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
Expand All @@ -106,7 +105,7 @@ protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, St
}

static class V2Writer extends ManifestListWriter {
private final V2Metadata.IndexedManifestFile wrapper;
private final V2Metadata.ManifestFileWrapper wrapper;

V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) {
super(
Expand All @@ -116,7 +115,7 @@ static class V2Writer extends ManifestListWriter {
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"sequence-number", String.valueOf(sequenceNumber),
"format-version", "2"));
this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber);
this.wrapper = new V2Metadata.ManifestFileWrapper(snapshotId, sequenceNumber);
}

@Override
Expand All @@ -127,7 +126,7 @@ protected ManifestFile prepare(ManifestFile manifest) {
@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
try {
return Avro.write(file)
return InternalData.write(FileFormat.AVRO, file)
.schema(V2Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
Expand All @@ -141,7 +140,7 @@ protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, St
}

static class V1Writer extends ManifestListWriter {
private final V1Metadata.IndexedManifestFile wrapper = new V1Metadata.IndexedManifestFile();
private final V1Metadata.ManifestFileWrapper wrapper = new V1Metadata.ManifestFileWrapper();

V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
super(
Expand All @@ -163,7 +162,7 @@ protected ManifestFile prepare(ManifestFile manifest) {
@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
try {
return Avro.write(file)
return InternalData.write(FileFormat.AVRO, file)
.schema(V1Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
Expand Down
33 changes: 10 additions & 23 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.io.DatumReader;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.avro.InternalReader;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
Expand Down Expand Up @@ -258,29 +256,18 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
fields.addAll(projection.asStruct().fields());
fields.add(MetadataColumns.ROW_POSITION);

switch (format) {
case AVRO:
AvroIterable<ManifestEntry<F>> reader =
Avro.read(file)
.project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
.createResolvingReader(this::newReader)
.reuseContainers()
.build();
CloseableIterable<ManifestEntry<F>> reader =
InternalData.read(format, file)
.project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
.setRootType(GenericManifestEntry.class)
.setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass())
.setCustomType(DataFile.PARTITION_ID, PartitionData.class)
.reuseContainers()
.build();

addCloseable(reader);
addCloseable(reader);

return CloseableIterable.transform(reader, inheritableMetadata::apply);

default:
throw new UnsupportedOperationException("Invalid format for manifest file: " + format);
}
}

private DatumReader<?> newReader(Schema schema) {
return InternalReader.create(schema)
.setRootType(GenericManifestEntry.class)
.setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass())
.setCustomType(DataFile.PARTITION_ID, PartitionData.class);
return CloseableIterable.transform(reader, inheritableMetadata::apply);
}

CloseableIterable<ManifestEntry<F>> liveEntries() {
Expand Down
Loading

0 comments on commit b8fdd84

Please sign in to comment.