Skip to content

Commit

Permalink
Review comments part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Nov 20, 2023
1 parent 2911749 commit 8fefcc1
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
import java.util.function.Supplier;

public interface ColumnChunkReader {
/**
* @return -1 if the current column doesn't guarantee fixed page size, otherwise the fixed page size
*/
int getPageFixedSize();

/**
* @return The number of rows in this ColumnChunk, or -1 if it's unknown.
*/
Expand All @@ -42,22 +37,28 @@ public interface ColumnChunkReader {

interface ColumnPageReaderIterator extends Iterator<ColumnPageReader>, AutoCloseable {
@Override
void close() throws IOException;
void close();
}

/**
* @return An iterator over individual parquet pages
*/
ColumnPageReaderIterator getPageIterator() throws IOException;

interface ColumnPageDirectAccessor extends AutoCloseable {
/**
* Directly access a page reader for a given page number. This is an optional method that may not be
* implemented. Note that the user should either use {@link Iterator} methods or this method, but not both.
* Directly access a page reader for a given page number.
*/
@Nullable
default ColumnPageReader getPageReader(final int pageNum) {
return null;
};
ColumnPageReader getPageReader(final int pageNum);

@Override
void close();
}

/**
* @return An iterator over individual parquet pages
* @return An accessor for individual parquet pages
*/
ColumnPageReaderIterator getPageIterator() throws IOException;
ColumnPageDirectAccessor getPageAccessor();

/**
* @return Whether this column chunk uses a dictionary-based encoding on every page
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
Expand Down Expand Up @@ -77,11 +78,6 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {
this.version = version;
}

@Override
public int getPageFixedSize() {
return -1;
}

@Override
public long numRows() {
return numRows;
Expand Down Expand Up @@ -112,6 +108,14 @@ public ColumnPageReaderIterator getPageIterator() {
}
}

/**
 * Creates a direct accessor over the pages of this column chunk.
 *
 * @return a new {@link ColumnPageDirectAccessorImpl} built from this reader's column descriptor and channels
 *         provider
 * @throws UnsupportedOperationException if this reader has no offset index
 */
@Override
public final ColumnPageDirectAccessor getPageAccessor() {
    // Direct access is only possible when an offset index is present.
    if (offsetIndex != null) {
        return new ColumnPageDirectAccessorImpl(path, channelsProvider);
    }
    throw new UnsupportedOperationException("Cannot use direct accessor without offset index");
}

private Path getFilePath() {
if (filePath != null) {
return filePath;
Expand Down Expand Up @@ -306,23 +310,39 @@ public boolean hasNext() {
@Override
public ColumnPageReader next() {
if (!hasNext()) {
throw new RuntimeException("No next element");
throw new NoSuchElementException("No next element");
}
int rowCount =
(int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values())
- offsetIndex.getFirstRowIndex(pos) + 1);
// The following logic assumes that offsetIndex stores the number of values for a page rather than the number
// of rows (the two can differ for array and vector columns). This is the result of a bug on the parquet
// writing side that was fixed in deephaven-core/pull/4844; the logic is kept only to support reading
// parquet files written before that fix.
final int numValues = (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values())
- offsetIndex.getFirstRowIndex(pos) + 1);
ColumnPageReaderImpl columnPageReader =
new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier,
nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset(pos), null,
rowCount);
numValues);
pos++;
return columnPageReader;
}

@Override
public void close() {}
}

class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {
private final SeekableChannelsProvider channelsProvider;
private final ColumnDescriptor path;

ColumnPageDirectAccessorImpl(final ColumnDescriptor path, final SeekableChannelsProvider channelsProvider) {
this.path = path;
this.channelsProvider = channelsProvider;
}

@Override
public ColumnPageReader getPageReader(final int pageNum) {
if (pageNum > offsetIndex.getPageCount()) {
throw new RuntimeException(
throw new NoSuchElementException(
"pageNum=" + pageNum + " > offsetIndex.getPageCount()=" + offsetIndex.getPageCount());
}
final int numValues = -1; // Will be populated properly when we read the page header
Expand All @@ -331,6 +351,6 @@ public ColumnPageReader getPageReader(final int pageNum) {
}

@Override
public void close() throws IOException {}
public void close() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.jetbrains.annotations.VisibleForTesting;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -36,7 +35,6 @@ public abstract class ColumnChunkPageStore<ATTR extends Any>
private final ToPage<ATTR, ?> toPage;

private final long numRows;
final ColumnChunkReader.ColumnPageReaderIterator columnPageReaderIterator;

public static class CreatorResult<ATTR extends Any> {

Expand Down Expand Up @@ -75,7 +73,7 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun
private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)");

/**
* Check if the version is greater than 0.31.0
* Check if the version is greater than or equal to 0.31.0
*/
@VisibleForTesting
public static boolean satisfiesMinimumVersionRequirements(@NotNull final String version) {
Expand Down Expand Up @@ -123,7 +121,6 @@ public static <ATTR extends Any> CreatorResult<ATTR> create(
this.toPage = toPage;

this.numRows = Require.inRange(columnChunkReader.numRows(), "numRows", mask, "mask");
this.columnPageReaderIterator = columnChunkReader.getPageIterator();
}

ChunkPage<ATTR> toPage(final long offset, @NotNull final ColumnPageReader columnPageReader)
Expand Down Expand Up @@ -170,11 +167,5 @@ public boolean usesDictionaryOnEveryPage() {
}

@Override
public void close() {
try {
columnPageReaderIterator.close();
} catch (IOException except) {
throw new UncheckedIOException(except);
}
}
public void close() {}
}

This file was deleted.

Loading

0 comments on commit 8fefcc1

Please sign in to comment.