Arrow: add support for null vectors #10953

Closed
wants to merge 48 commits into from
Changes from 23 commits
Commits
ac6440a
#10275 - fix NullPointerException
sl255051 May 8, 2024
becf6f7
Change how the unit test asserts the correct exception is thrown
sl255051 May 8, 2024
4e2cb86
Remove test dependency on Apache Spark
sl255051 May 8, 2024
1193d02
Merge branch 'main' into issue-10275
sl255051 May 28, 2024
12bc3de
Add new unit test
sl255051 Jun 11, 2024
d8f3e13
Merge branch 'apache:main' into issue-10275
slessard Jun 11, 2024
bb4e010
Add comments to unit test
sl255051 Jun 12, 2024
6e7a1aa
Merge branch 'issue-10275' of https://github.com/slessard/iceberg int…
sl255051 Jun 12, 2024
28451a5
Update arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowR…
slessard Jun 14, 2024
24a9932
Update arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowR…
slessard Jun 14, 2024
9bcb2b1
Address code review comments
sl255051 Jun 14, 2024
7a25b52
Merge branch 'apache:main' into issue-10275
slessard Jun 14, 2024
a31bf94
Merge branch 'main' into issue-10275
sl255051 Jul 29, 2024
44a7f91
Merge branch 'main' into issue-10275
sl255051 Aug 5, 2024
c2eaf24
Merge branch 'apache:main' into issue-10275
slessard Aug 9, 2024
e323db7
DRAFT: alternate solution 2: hack in support for NullVector
slessard Aug 10, 2024
061ab02
Merge branch 'apache:main' into issue-10275-alt2
slessard Aug 12, 2024
bf0c905
Issue 10275 - Add rough draft vector support for null columns
slessard Aug 13, 2024
5610dd4
Merge branch 'issue-10275-alt2' into issue-10275-alt3
slessard Aug 13, 2024
a13415d
Merge branch 'main' into issue-10275-alt3
slessard Aug 13, 2024
2eaa63f
Merge branch 'main' into issue-10275-alt3
slessard Aug 16, 2024
62108da
remove obsolete comment; adapt unit test to match new functionality
slessard Aug 16, 2024
7115e93
Merge branch 'apache:main' into issue-10275-alt3
slessard Aug 16, 2024
08bb07c
Address code review feedback
slessard Sep 5, 2024
442b381
Add a NullabilityHolder instance to the NullVector instance
slessard Sep 5, 2024
5e7668e
Merge branch 'apache:main' into issue-10275-alt3
slessard Sep 6, 2024
83913a0
Remove test class GenericArrowVectorAccessorFactoryTest
slessard Sep 9, 2024
e2b428e
Fix compile error; format source code
slessard Sep 9, 2024
7ffa7ed
Address code review comments
slessard Sep 11, 2024
cda0423
Adopt changes suggested by @nastra in code review
slessard Sep 17, 2024
9aec9e5
Update unit test to add a second row to the table being tested
slessard Sep 17, 2024
0c87dc7
Code cleanup
slessard Sep 19, 2024
e5eebd0
Undo adding a second row to the table
slessard Sep 20, 2024
fe60793
Expand calls to checkAllVectorTypes and checkAllVectorValues
slessard Sep 20, 2024
1a3896b
replace hard-coded magic values with descriptively named variables
slessard Sep 20, 2024
5c3b460
Add unit tests for VectorHolder
slessard Sep 24, 2024
a2df95c
Update `isDummy` method to remove one condition that would never be r…
slessard Sep 24, 2024
bbc776d
Fix code style issues
slessard Sep 25, 2024
2bf5b2f
Update VectorHolder unit tests for isDummy method
slessard Sep 26, 2024
1edd680
Convert to fluent assertions
slessard Sep 26, 2024
e1b3931
inline variables that are only used once; remove `this.` prefix
slessard Sep 26, 2024
e574623
Merge branch 'main' into issue-10275-alt3
slessard Sep 26, 2024
c8bcc1c
Update arrow/src/main/java/org/apache/iceberg/arrow/vectorized/Vector…
nastra Sep 27, 2024
da9e514
Only create a NullVector when the constant value is null
slessard Sep 27, 2024
fe83726
Merge remote-tracking branch 'origin/issue-10275-alt3' into issue-102…
slessard Sep 27, 2024
01f96f0
Correct the comment in a test method
slessard Sep 30, 2024
f509e47
Add one more unit test
slessard Sep 30, 2024
163ee62
Make style changes as requested in code review feedback
slessard Oct 9, 2024
@@ -50,7 +50,6 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
- // Handle null vector for constant case
columnVectors[i] = new ColumnVector(vectorHolders[i]);
}
return new ColumnarBatch(numRowsToRead, columnVectors);
@@ -35,6 +35,7 @@
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
+ import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
@@ -220,8 +221,11 @@ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getPlai
}
return new FixedSizeBinaryAccessor<>(
(FixedSizeBinaryVector) vector, stringFactorySupplier.get());
+ } else if (vector instanceof NullVector) {
+ return new NullAccessor<>((NullVector) vector);
}
- throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
+ String vectorName = (vector == null) ? "null" : vector.getClass().toString();
+ throw new UnsupportedOperationException("Unsupported vector: " + vectorName);
}

private static boolean isDecimal(PrimitiveType primitive) {
@@ -244,6 +248,18 @@ public final boolean getBoolean(int rowId) {
}
}

+ private static class NullAccessor<
+ DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final NullVector vector;
+
+ NullAccessor(NullVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+ }
+
private static class IntAccessor<
DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
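
The dispatch change in this file can be illustrated in isolation. What follows is a minimal sketch, not the Iceberg implementation: `Vector`, `IntVec`, `NullVec`, `UnknownVec`, and `accessorFor` are hypothetical stand-ins for the Arrow vector classes and the factory's dispatch method. It shows the two behaviours the diff introduces: an all-null vector type gets its own branch, and a `null` reference yields the message "Unsupported vector: null" rather than a NullPointerException from calling `getClass()` on it.

```java
// Hypothetical stand-ins for Arrow vector types; not the real Arrow or Iceberg classes.
final class NullSafeDispatchSketch {
  interface Vector {}

  static final class IntVec implements Vector {}

  static final class NullVec implements Vector {}

  static final class UnknownVec implements Vector {}

  // Returns the name of the accessor that would be chosen for the given vector.
  static String accessorFor(Vector vector) {
    if (vector instanceof IntVec) {
      return "IntAccessor";
    } else if (vector instanceof NullVec) {
      return "NullAccessor";
    }
    // instanceof is false for null, so a null reference also reaches this point.
    // Check for null before calling getClass() to avoid a NullPointerException.
    String vectorName = (vector == null) ? "null" : vector.getClass().toString();
    throw new UnsupportedOperationException("Unsupported vector: " + vectorName);
  }

  public static void main(String[] args) {
    System.out.println(accessorFor(new NullVec()));
    try {
      accessorFor(null);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this sketch the null check lives only in the error path, so the common branches pay no extra cost.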
@@ -131,6 +131,20 @@ public boolean isDummy() {
return vector == null;
}

+ public static class NullVectorHolder extends VectorHolder {
+ private final int numRows;
+
+ public NullVectorHolder(FieldVector vec, Types.NestedField field, int numRows) {
+ super(vec, field, new NullabilityHolder(numRows));
+ this.numRows = numRows;
+ }
+
+ @Override
+ public int numValues() {
+ return this.numRows;
+ }
+ }
+
/**
* A Vector Holder which does not actually produce values, consumers of this class should use the
* constantValue to populate their ColumnVector implementation.
@@ -30,6 +30,7 @@
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
+ import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
@@ -451,10 +452,6 @@ public String toString() {
return columnDescriptor.toString();
}

- public static VectorizedArrowReader nulls() {
- return NullVectorReader.INSTANCE;
- }
-
public static VectorizedArrowReader positions() {
return new PositionVectorReader(false);
}
@@ -463,12 +460,16 @@ public static VectorizedArrowReader positionsWithSetArrowValidityVector() {
return new PositionVectorReader(true);
}

- private static final class NullVectorReader extends VectorizedArrowReader {
- private static final NullVectorReader INSTANCE = new NullVectorReader();
+ public static final class NullVectorReader extends VectorizedArrowReader {
+
+ public NullVectorReader(Types.NestedField icebergField) {
+ super(icebergField);
+ }

@Override
public VectorHolder read(VectorHolder reuse, int numValsToRead) {
- return VectorHolder.dummyHolder(numValsToRead);
+ NullVector vector = new NullVector(icebergField().name(), numValsToRead);
+ return new VectorHolder.NullVectorHolder(vector, icebergField(), numValsToRead);
}

@Override
@@ -28,6 +28,7 @@
import org.apache.iceberg.arrow.ArrowAllocation;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.ConstantVectorReader;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
+ import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.NullVectorReader;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -97,7 +98,7 @@ public VectorizedReader<?> message(
} else if (reader != null) {
reorderedFields.add(reader);
} else {
- reorderedFields.add(VectorizedArrowReader.nulls());
+ reorderedFields.add(new NullVectorReader(field));
}
}
return vectorizedReader(reorderedFields);
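
The fallback branch above substitutes a per-field null reader whenever the data file has no column for an expected field. A rough, self-contained model of that idea follows. It is a sketch under stated assumptions, not Iceberg code: `ColumnReader`, `nullReader`, and `readersFor` are hypothetical names, and columns are modelled as plain lists of boxed integers rather than Arrow vectors.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical model of the reader-selection fallback; none of these are Iceberg types.
final class MissingColumnSketch {
  // Produces numRows boxed values for one column; null marks a missing value.
  interface ColumnReader {
    List<Integer> read(int numRows);
  }

  // Reader for a field that is in the table schema but absent from the data file:
  // every row is null, and the row count still matches the requested batch size.
  static ColumnReader nullReader() {
    return numRows -> Collections.nCopies(numRows, (Integer) null);
  }

  // Keeps the expected field order and substitutes an all-null reader
  // wherever the file provides no reader for a field.
  static List<ColumnReader> readersFor(
      List<String> expectedFields, Map<String, ColumnReader> fileReaders) {
    List<ColumnReader> ordered = new ArrayList<>();
    for (String name : expectedFields) {
      ColumnReader reader = fileReaders.get(name);
      ordered.add(reader != null ? reader : nullReader());
    }
    return ordered;
  }
}
```

The point of the model is that the substitute reader still reports the requested number of rows, so a downstream row-count check sees a consistent batch.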
@@ -19,7 +19,7 @@
package org.apache.iceberg.arrow.vectorized;

import static org.apache.iceberg.Files.localInput;
- import static org.assertj.core.api.Assertions.assertThat;
+ import static org.assertj.core.api.Assertions.*;

import java.io.File;
import java.io.IOException;
@@ -59,6 +59,7 @@
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
+ import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
@@ -262,6 +263,89 @@ public void testReadColumnFilter2() throws Exception {
scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ImmutableList.of("timestamp"));
}

+ @Test
+ public void testReadColumnThatDoesNotExistInParquetSchema() throws Exception {
+ setMaxStackTraceElementsDisplayed(15);
+ rowsWritten = Lists.newArrayList();
+ tables = new HadoopTables();
+
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(1, "a", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "b", Types.IntegerType.get()));
+
+ PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+ Table table1 = tables.create(schema, spec, tableLocation);
+
+ // Add one record to the table
+ GenericRecord rec = GenericRecord.create(schema);
+ rec.setField("a", 1);
+ List<GenericRecord> genericRecords = Lists.newArrayList();
+ genericRecords.add(rec);
+
+ AppendFiles appendFiles = table1.newAppend();
+ appendFiles.appendFile(writeParquetFile(table1, genericRecords));
+ appendFiles.commit();
+
+ // Alter the table schema by adding a new, optional column.
+ // Do not add any data for this new column in the one existing row in the table
+ // and do not insert any new rows into the table.
Comment on lines +299 to +301

Contributor

I might be confusing vectorized read paths, but I'm curious why this isn't reproducible in Spark vectorized reads. Or is it?

Contributor

I can try to reproduce it in a unit test for Spark and see whether that's the case. To be clear, I don't want to hold up this PR on that, since it does seem like a legitimate problem based on the test being done here.

Author

I never tried reproducing the issue in Spark. My guess as to why there was no existing test case is that the lack of null vector support isn't a bug so much as a subfeature that was never implemented. There's no sense in creating a test for a subfeature that was knowingly never implemented.

What makes me say null vector support was knowingly never implemented? Look at the removed comment in arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java. That comment wasn't a "this is how this code works" type of comment. In my opinion, that comment is a TODO comment.

Contributor

It would be nice to explore and see how this looks in Spark's vectorized path.

+ Table table = tables.load(tableLocation);
+ table.updateSchema().addColumn("z", Types.IntegerType.get()).commit();
+
+ // Select all columns, all rows from the table
+ TableScan scan = table.newScan().select("*");
+
+ List<String> columns = ImmutableList.of("a", "b", "z");
+ // Read the data and verify that the returned ColumnarBatches match expected rows.
+ int rowIndex = 0;
+ try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1, false)) {
+ for (ColumnarBatch batch : itr) {
+ List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + 1);
+
+ Map<String, Integer> columnNameToIndex = Maps.newHashMap();
+ for (int i = 0; i < columns.size(); i++) {
+ columnNameToIndex.put(columns.get(i), i);
+ }
+ Set<String> columnSet = columnNameToIndex.keySet();
+
+ assertThat(batch.numRows()).isEqualTo(1);
+ assertThat(batch.numCols()).isEqualTo(columns.size());
+
+ checkColumnarArrayValues(
+ 1,
+ expectedRows,
+ batch,
+ 0,
+ columnSet,
+ "a",
+ (records, i) -> records.get(i).getField("a"),
+ ColumnVector::getInt);
+ checkColumnarArrayValues(
+ 1,
+ expectedRows,
+ batch,
+ 1,
+ columnSet,
+ "b",
+ (records, i) -> records.get(i).getField("b"),
+ (array, i) -> array.isNullAt(i) ? null : array.getInt(i));
+ checkColumnarArrayValues(
+ 1,
+ expectedRows,
+ batch,
+ 2,
+ columnSet,
+ "z",
+ (records, i) -> records.get(i).getField("z"),
+ (array, i) -> array.isNullAt(i) ? null : array.getInt(i));
+ rowIndex += 1;
+ }
+ }
+ // Read the data and verify that the returned Arrow VectorSchemaRoots match expected rows.
+ readAndCheckArrowResult(scan, 1, 1, columns);
+ }
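
The lambdas for columns "b" and "z" in the test above guard the primitive read with `isNullAt`, because a primitive `int` accessor cannot itself represent a missing value. A minimal sketch of that pattern, assuming a hypothetical `NullableIntColumnSketch` type (it is not Iceberg's `ColumnVector` or any Arrow class):

```java
import java.util.Arrays;

// Hypothetical column type for illustration; not Iceberg's ColumnVector or an Arrow vector.
final class NullableIntColumnSketch {
  private final int[] values;
  private final boolean[] isNull;

  NullableIntColumnSketch(int[] values, boolean[] isNull) {
    this.values = values;
    this.isNull = isNull;
  }

  // Builds a column of numRows rows in which every row is null.
  static NullableIntColumnSketch allNull(int numRows) {
    boolean[] nulls = new boolean[numRows];
    Arrays.fill(nulls, true);
    return new NullableIntColumnSketch(new int[numRows], nulls);
  }

  boolean isNullAt(int rowId) {
    return isNull[rowId];
  }

  // Primitive accessor: the result carries no meaning when isNullAt(rowId) is true.
  int getInt(int rowId) {
    return values[rowId];
  }

  // Null-guarded read: check the null flag first, then box the primitive value.
  Integer getBoxed(int rowId) {
    return isNullAt(rowId) ? null : getInt(rowId);
  }
}
```

With this shape, a column built by `allNull(n)` returns `null` from `getBoxed` for every row, which is what the test expects for a column added after the row was written.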

/**
* The test asserts that {@link CloseableIterator#hasNext()} returned by the {@link ArrowReader}
* is idempotent.
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.arrow.vectorized;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.math.BigDecimal;
import java.util.function.Supplier;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.PrimitiveType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

class GenericArrowVectorAccessorFactoryTest {
@Mock
Supplier<GenericArrowVectorAccessorFactory.DecimalFactory<BigDecimal>> decimalFactorySupplier;

@Mock Supplier<GenericArrowVectorAccessorFactory.StringFactory<String>> stringFactorySupplier;

@Mock
Supplier<GenericArrowVectorAccessorFactory.StructChildFactory<Integer>>
structChildFactorySupplier;

@Mock
Supplier<GenericArrowVectorAccessorFactory.ArrayFactory<Integer, Integer[]>> arrayFactorySupplier;

@InjectMocks GenericArrowVectorAccessorFactory genericArrowVectorAccessorFactory;

@BeforeEach
void before() {
MockitoAnnotations.openMocks(this);
}

@Test
void testGetVectorAccessorWithIntVector() {
IntVector vector = mock(IntVector.class);
when(vector.get(0)).thenReturn(88);

Types.NestedField nestedField = Types.NestedField.optional(0, "a1", Types.IntegerType.get());
ColumnDescriptor columnDescriptor =
new ColumnDescriptor(
new String[] {nestedField.name()}, PrimitiveType.PrimitiveTypeName.INT32, 0, 1);
NullabilityHolder nullabilityHolder = new NullabilityHolder(10000);
VectorHolder vectorHolder =
new VectorHolder(columnDescriptor, vector, false, null, nullabilityHolder, nestedField);
ArrowVectorAccessor actual = genericArrowVectorAccessorFactory.getVectorAccessor(vectorHolder);
assertThat(actual).isNotNull();
assertThat(actual).isInstanceOf(ArrowVectorAccessor.class);
int intValue = actual.getInt(0);
assertThat(intValue).isEqualTo(88);
}

@Test
void testGetVectorAccessorWithNullVector() {
assertThatThrownBy(
() -> {
genericArrowVectorAccessorFactory.getVectorAccessor(VectorHolder.dummyHolder(1));
})
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Unsupported vector: null");
}
}