Core: Add InternalData read and write builders #12060

Open · wants to merge 8 commits into main
164 changes: 164 additions & 0 deletions core/src/main/java/org/apache/iceberg/InternalData.java
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.InternalReader;
import org.apache.iceberg.avro.InternalWriter;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InternalData {
private InternalData() {}

private static final Logger LOG = LoggerFactory.getLogger(InternalData.class);
private static final Map<FileFormat, Function<OutputFile, WriteBuilder>> WRITE_BUILDERS =
Maps.newConcurrentMap();
private static final Map<FileFormat, Function<InputFile, ReadBuilder>> READ_BUILDERS =
Maps.newConcurrentMap();

static {
InternalData.register(
FileFormat.AVRO,
outputFile -> Avro.write(outputFile).createWriterFunc(InternalWriter::create),
inputFile -> Avro.read(inputFile).createResolvingReader(InternalReader::create));

try {
DynMethods.StaticMethod registerParquet =
DynMethods.builder("register")
.impl("org.apache.iceberg.InternalParquet")
.buildStaticChecked();
Contributor Author:

This uses DynMethods to call Parquet's register method directly, rather than using a ServiceLoader. There is no need for the complexity because we want to keep the number of supported formats small rather than plugging in custom formats.

I'm also considering refactoring to make the register method here package-private so that no one can easily call it.

Member:

I don't understand this one: why can we call Avro's register() but not Parquet's register()? I'm also not clear on the ServiceLoader comment. Is that just to note that we don't want to make this dynamic and only want hardcoded formats to be supported?

Contributor:

This is due to Gradle project-level isolation. Avro is currently included in core, but Parquet is in a separate subproject. I'm in favor of being explicit about what is supported (i.e. hard-coded), but we would like to keep Parquet in a separate project to reduce dependency proliferation from api/core.

Contributor:

What about using the Java ServiceLoader to load the internal readers and writers?

I have created a WIP for testing out how the DataFile readers could work: #12069

Contributor:

First I implemented it using the registry method, like in this PR (7e171cc), then moved to the ServiceLoader method.

In my head the two problems are very similar.

Contributor Author:

The ServiceLoader framework is error-prone and commonly broken by jar bundling. In addition, we do not want anything else registered, so it is not needed. It would also make it easier to plug in here, which is exactly what we are trying to avoid.
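
For readers following the thread, a minimal sketch of the ServiceLoader approach being discussed and rejected. The InternalDataRegistration interface and ServiceLoaderRegistration class are hypothetical illustrations, not code from this PR or from #12069:

  package org.apache.iceberg;

  import java.util.ServiceLoader;

  // Hypothetical SPI: each format module would ship an implementation plus a
  // META-INF/services entry so that ServiceLoader can discover it.
  interface InternalDataRegistration {
    void register();
  }

  class ServiceLoaderRegistration {
    // Registers every implementation found on the classpath. Shaded or bundled
    // jars often drop or fail to merge META-INF/services entries (the breakage
    // mentioned above), and any jar on the classpath could plug in a format,
    // which is exactly what this PR is trying to prevent.
    static void registerAll() {
      for (InternalDataRegistration registration :
          ServiceLoader.load(InternalDataRegistration.class)) {
        registration.register();
      }
    }
  }

The DynMethods call above gets the same cross-module decoupling with a single, explicit entry point instead.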


registerParquet.invoke();

} catch (NoSuchMethodException e) {
// failing to load Parquet is normal and does not require a stack trace
Member:

Is this normal for now? Don't we expect this to be a bug in the future? I'm also a little interested in when we would actually fail here if we are using the Iceberg repo as-is.

Contributor:

I wouldn't say that it's normal to fail. I'm actually not aware of any situations where the api/core modules are used but parquet isn't included. I think in almost all scenarios, it'll be available.

Contributor Author:

This would be normal whenever the iceberg-parquet module isn't on the classpath. For instance, the manifest read and write tests that currently use InternalData in this PR hit this path but operate normally, because Parquet isn't used.

LOG.info("Unable to register Parquet for metadata files: {}", e.getMessage());
}
}

static void register(
FileFormat format,
Function<OutputFile, WriteBuilder> writeBuilder,
Function<InputFile, ReadBuilder> readBuilder) {
WRITE_BUILDERS.put(format, writeBuilder);
READ_BUILDERS.put(format, readBuilder);
}

public static WriteBuilder write(FileFormat format, OutputFile file) {
Function<OutputFile, WriteBuilder> writeBuilder = WRITE_BUILDERS.get(format);
if (writeBuilder != null) {
return writeBuilder.apply(file);
}

throw new UnsupportedOperationException(
Member:

nit: Personally I think it may be a bit clearer to extract the handling of a missing writer/reader.

Maybe have:

  private static Function<OutputFile, WriteBuilder> writerFor(FileFormat format) {
    Function<OutputFile, WriteBuilder> writer = WRITE_BUILDERS.get(format);
    if (writer == null) {
      throw new UnsupportedOperationException(
          "Cannot write using unregistered internal data format: " + format);
    } else {
      return writer;
    }
  }

so that this code is just:

  return writerFor(format).apply(file);

Mostly I feel a little unease about the implicit else in the current logic, so having an explicit else would also make me feel a little better.

"Cannot write using unregistered internal data format: " + format);
}

public static ReadBuilder read(FileFormat format, InputFile file) {
Function<InputFile, ReadBuilder> readBuilder = READ_BUILDERS.get(format);
if (readBuilder != null) {
return readBuilder.apply(file);
}

throw new UnsupportedOperationException(
"Cannot read using unregistered internal data format: " + format);
}

public interface WriteBuilder {
/** Set the file schema. */
WriteBuilder schema(Schema schema);

/** Set the file schema's root name. */
WriteBuilder named(String name);

/**
* Set a writer configuration property.
*
* <p>Write configuration affects writer behavior. To add file metadata properties, use {@link
Member:

Suggested change:
- * <p>Write configuration affects writer behavior. To add file metadata properties, use {@link
+ * <p>Write configuration affects this writer's behavior. To add metadata properties to the written file use {@link

?

Contributor Author:

"This" doesn't refer to a writer. This is configuring a builder that creates the writer, so I think that the existing language is correct.

* #meta(String, String)}.
*
* @param property a writer config property name
* @param value config value
* @return this for method chaining
*/
WriteBuilder set(String property, String value);

/**
* Set a file metadata property.
*
* <p>Metadata properties are written into file metadata. To alter a writer configuration
* property, use {@link #set(String, String)}.
*
* @param property a file metadata property name
* @param value config value
* @return this for method chaining
*/
WriteBuilder meta(String property, String value);

/**
* Set a file metadata properties from a Map.
Member:

Suggested change:
- * Set a file metadata properties from a Map.
+ * Set file metadata properties from a Map.

*
* <p>Metadata properties are written into file metadata. To alter a writer configuration
* property, use {@link #set(String, String)}.
*
* @param properties a map of file metadata properties
* @return this for method chaining
*/
default WriteBuilder meta(Map<String, String> properties) {
properties.forEach(this::meta);
return this;
}

/** Overwrite the file if it already exists. */
WriteBuilder overwrite();

/** Build the configured {@link FileAppender}. */
<D> FileAppender<D> build() throws IOException;
}

public interface ReadBuilder {
/** Set the projection schema. */
ReadBuilder project(Schema projectedSchema);

/** Read only the split that is {@code length} bytes starting at {@code start}. */
ReadBuilder split(long start, long length);

/** Reuse container classes, like structs, lists, and maps. */
ReadBuilder reuseContainers();

/** Set a custom class for in-memory objects at the schema root. */
ReadBuilder setRootType(Class<? extends StructLike> rootClass);
Member:

I'll probably get to this later in the PR, but I'm interested in why we need this and setCustomType.

Member:

OK, I see how it's used below. I'm wondering if, instead of needing this, we could just automatically set these readers based on the root type, i.e.

setRootType(ManifestEntry) automatically sets field types based on ManifestEntry?

Or do we have a plan for using this in a more custom manner in the future?

Contributor Author:

The problem this is solving is that we don't have an assigned ID for the root type. We could use a sentinel value like -1, but that could technically collide. I just don't want to rely on setCustomType(ROOT_FIELD_ID, SomeObject.class).
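
To make that concrete, this is the call pattern from the ManifestReader change later in this PR, with explanatory comments added; the field IDs are existing constants, and only the comments are new:

  InternalData.read(format, file)
      .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
      // the schema root has no assigned field ID, so its class is set explicitly
      .setRootType(GenericManifestEntry.class)
      // nested structs are matched to classes by their assigned field IDs
      .setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass())
      .setCustomType(DataFile.PARTITION_ID, PartitionData.class)
      .reuseContainers()
      .build();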


/** Set a custom class for in-memory objects at the given field ID. */
ReadBuilder setCustomType(int fieldId, Class<? extends StructLike> structClass);

/** Build the configured reader. */
<D> CloseableIterable<D> build();
}
}
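
As a usage sketch of the two entry points together (not part of the diff; InternalDataExample, the "example" root name, and the "source-snapshot-id" property are hypothetical stand-ins):

  package org.apache.iceberg;

  import java.io.IOException;
  import java.util.List;
  import org.apache.iceberg.io.CloseableIterable;
  import org.apache.iceberg.io.FileAppender;
  import org.apache.iceberg.io.InputFile;
  import org.apache.iceberg.io.OutputFile;

  class InternalDataExample {
    // Writes records with the internal Avro writer, then reads them back.
    static void roundTrip(
        Schema schema, OutputFile outputFile, InputFile inputFile, List<StructLike> records)
        throws IOException {
      try (FileAppender<StructLike> appender =
          InternalData.write(FileFormat.AVRO, outputFile)
              .schema(schema)
              .named("example")                   // root name for the file schema
              .meta("source-snapshot-id", "123")  // stored in file metadata, not writer config
              .overwrite()
              .build()) {
        records.forEach(appender::add);
      }

      try (CloseableIterable<StructLike> reader =
          InternalData.read(FileFormat.AVRO, inputFile)
              .project(schema)      // projection schema for the read
              .reuseContainers()    // reuse structs, lists, and maps across records
              .build()) {
        reader.forEach(System.out::println);
      }
    }
  }

Note the set(String, String) vs meta(String, String) split called out in the Javadoc: set configures the writer itself, while meta lands in the written file's metadata.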
17 changes: 8 additions & 9 deletions core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
@@ -71,7 +70,7 @@ public long length() {
}

static class V3Writer extends ManifestListWriter {
private final V3Metadata.IndexedManifestFile wrapper;
private final V3Metadata.ManifestFileWrapper wrapper;

V3Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) {
super(
@@ -81,7 +80,7 @@ static class V3Writer extends ManifestListWriter {
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"sequence-number", String.valueOf(sequenceNumber),
"format-version", "3"));
this.wrapper = new V3Metadata.IndexedManifestFile(snapshotId, sequenceNumber);
this.wrapper = new V3Metadata.ManifestFileWrapper(snapshotId, sequenceNumber);
}

@Override
@@ -92,7 +91,7 @@ protected ManifestFile prepare(ManifestFile manifest) {
@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
try {
return Avro.write(file)
return InternalData.write(FileFormat.AVRO, file)
.schema(V3Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
@@ -106,7 +105,7 @@ protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
}

static class V2Writer extends ManifestListWriter {
private final V2Metadata.IndexedManifestFile wrapper;
private final V2Metadata.ManifestFileWrapper wrapper;

V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) {
super(
@@ -116,7 +115,7 @@ static class V2Writer extends ManifestListWriter {
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"sequence-number", String.valueOf(sequenceNumber),
"format-version", "2"));
this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber);
this.wrapper = new V2Metadata.ManifestFileWrapper(snapshotId, sequenceNumber);
}

@Override
@@ -127,7 +126,7 @@ protected ManifestFile prepare(ManifestFile manifest) {
@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
try {
return Avro.write(file)
return InternalData.write(FileFormat.AVRO, file)
.schema(V2Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
@@ -141,7 +140,7 @@ protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
}

static class V1Writer extends ManifestListWriter {
private final V1Metadata.IndexedManifestFile wrapper = new V1Metadata.IndexedManifestFile();
private final V1Metadata.ManifestFileWrapper wrapper = new V1Metadata.ManifestFileWrapper();

V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
super(
@@ -163,7 +162,7 @@ protected ManifestFile prepare(ManifestFile manifest) {
@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
try {
return Avro.write(file)
return InternalData.write(FileFormat.AVRO, file)
.schema(V1Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
33 changes: 10 additions & 23 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -25,10 +25,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.io.DatumReader;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.avro.InternalReader;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
@@ -258,29 +256,18 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
fields.addAll(projection.asStruct().fields());
fields.add(MetadataColumns.ROW_POSITION);

switch (format) {
case AVRO:
AvroIterable<ManifestEntry<F>> reader =
Avro.read(file)
.project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
.createResolvingReader(this::newReader)
.reuseContainers()
.build();
CloseableIterable<ManifestEntry<F>> reader =
InternalData.read(format, file)
.project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
.setRootType(GenericManifestEntry.class)
.setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass())
.setCustomType(DataFile.PARTITION_ID, PartitionData.class)
.reuseContainers()
.build();

addCloseable(reader);
addCloseable(reader);

return CloseableIterable.transform(reader, inheritableMetadata::apply);

default:
throw new UnsupportedOperationException("Invalid format for manifest file: " + format);
}
}

private DatumReader<?> newReader(Schema schema) {
return InternalReader.create(schema)
.setRootType(GenericManifestEntry.class)
.setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass())
.setCustomType(DataFile.PARTITION_ID, PartitionData.class);
return CloseableIterable.transform(reader, inheritableMetadata::apply);
}

CloseableIterable<ManifestEntry<F>> liveEntries() {