Skip to content

Commit

Permalink
Remaining review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Nov 22, 2023
1 parent edc5312 commit 46c77d5
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,10 @@ public interface ColumnChunkReader {
@Nullable
OffsetIndex getOffsetIndex();

interface ColumnPageReaderIterator extends Iterator<ColumnPageReader>, AutoCloseable {
@Override
void close();
}

/**
* @return An iterator over individual parquet pages
*/
ColumnPageReaderIterator getPageIterator() throws IOException;
Iterator<ColumnPageReader> getPageIterator() throws IOException;

interface ColumnPageDirectAccessor {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.format.*;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
Expand All @@ -27,6 +28,7 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
Expand Down Expand Up @@ -98,7 +100,7 @@ public final OffsetIndex getOffsetIndex() {
}

@Override
public ColumnPageReaderIterator getPageIterator() {
public Iterator<ColumnPageReader> getPageIterator() {
final long dataPageOffset = columnChunk.meta_data.getData_page_offset();
if (offsetIndex == null) {
return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values());
Expand Down Expand Up @@ -214,7 +216,7 @@ private Dictionary readDictionary(ReadableByteChannel file) throws IOException {
return dictionaryPage.getEncoding().initDictionary(path, dictionaryPage);
}

private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator {
private final class ColumnPageReaderIteratorImpl implements Iterator<ColumnPageReader> {
private long currentOffset;
private long remainingValues;

Expand All @@ -231,7 +233,7 @@ public boolean hasNext() {
@Override
public ColumnPageReader next() {
if (!hasNext()) {
throw new RuntimeException("No next element");
throw new NoSuchElementException("No next element");
}
// NB: The channels provider typically caches channels; this avoids maintaining a handle per column chunk
try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(getFilePath())) {
Expand Down Expand Up @@ -264,24 +266,21 @@ public ColumnPageReader next() {
throw new UncheckedDeephavenException(
"Unknown parquet data page header type " + pageHeader.type);
}
final Supplier<Dictionary> pageDictionarySupplier =
(encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY)
? dictionarySupplier
: () -> NULL_DICTIONARY;
return new ColumnPageReaderImpl(
channelsProvider, decompressor, pageDictionarySupplier,
if ((encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY)
&& dictionarySupplier.get() == NULL_DICTIONARY) {
throw new ParquetDecodingException("Error in decoding page because dictionary data not found for " +
" column " + path + " with encoding " + encoding);
}
return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier,
nullMaterializerFactory, path, getFilePath(), fieldTypes, readChannel.position(), pageHeader,
-1);
ColumnPageReaderImpl.NULL_NUM_VALUES);
} catch (IOException e) {
throw new RuntimeException("Error reading page header", e);
throw new UncheckedDeephavenException("Error reading page header", e);
}
}

@Override
public void close() {}
}

private final class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator {
private final class ColumnPageReaderIteratorIndexImpl implements Iterator<ColumnPageReader> {
private int pos;

ColumnPageReaderIteratorIndexImpl() {
Expand Down Expand Up @@ -311,9 +310,6 @@ nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset(
pos++;
return columnPageReader;
}

@Override
public void close() {}
}

private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {
Expand All @@ -326,9 +322,10 @@ public ColumnPageReader getPageReader(final int pageNum) {
throw new IndexOutOfBoundsException(
"pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount());
}
final int numValues = -1; // Will be populated properly when we read the page header
// Page header and number of values will be populated later when we read the page header from the file
return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory,
path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null, numValues);
path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null,
ColumnPageReaderImpl.NULL_NUM_VALUES);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ColumnPageReaderImpl implements ColumnPageReader {
private static final int MAX_HEADER = 8192;
private static final int START_HEADER = 128;
public static final int NULL_OFFSET = -1;
static final int NULL_NUM_VALUES = -1;

private final SeekableChannelsProvider channelsProvider;
private final CompressorAdapter compressorAdapter;
Expand All @@ -64,8 +65,21 @@ public class ColumnPageReaderImpl implements ColumnPageReader {
private int rowCount = -1;

/**
* Returns a {@link ColumnPageReader} object for reading the column page data from the file.
*
* @param channelsProvider The provider for {@link SeekableByteChannel} for reading the file.
* @param compressorAdapter The adapter for decompressing the data.
* @param dictionarySupplier The supplier for dictionary data, set as {@link ColumnChunkReader#NULL_DICTIONARY} if
* page isn't dictionary encoded
* @param materializerFactory The factory for creating {@link PageMaterializer}.
* @param path The path of the column.
* @param filePath The path of the file.
* @param fieldTypes The types of the fields in the column.
* @param offset The offset for page header if supplied {@code pageHeader} is {@code null}. Else, the offset of data
* following the header in the page.
* @param pageHeader The page header if it is already read from the file. Else, {@code null}.
* @param numValues The number of values in the page if it is already read from the file. Else,
* {@value #NULL_NUM_VALUES}
*/
ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider,
CompressorAdapter compressorAdapter,
Expand All @@ -92,15 +106,13 @@ public class ColumnPageReaderImpl implements ColumnPageReader {
@Override
public Object materialize(Object nullValue) throws IOException {
try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) {
readChannel.position(offset);
ensurePageHeader(readChannel);
return readDataPage(nullValue, readChannel);
}
}

public int readRowCount() throws IOException {
try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) {
readChannel.position(offset);
ensurePageHeader(readChannel);
return readRowCountFromDataPage(readChannel);
}
Expand All @@ -110,53 +122,66 @@ public int readRowCount() throws IOException {
@Override
public IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder) throws IOException {
try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) {
readChannel.position(offset);
ensurePageHeader(readChannel);
return readKeyFromDataPage(keyDest, nullPlaceholder, readChannel);
}
}

/**
* If {@link #pageHeader} is {@code null}, read it from the file and increment the {@link #offset} and file position
* by the length of page header. This method assumes that file position is set to {@link #offset} before calling.
* This method also reads the number of values in the page from the header.
* If {@link #pageHeader} is {@code null}, read it from the file, and increment the {@link #offset} by the length of
* page header. Channel position would be set to the end of page header or beginning of data before returning.
*/
private synchronized void ensurePageHeader(SeekableByteChannel file) throws IOException {
if (pageHeader == null) {
if (file.position() != offset) {
throw new IllegalStateException("File position = " + file.position() + " not equal to expected offset ="
+ offset);
}
int maxHeader = START_HEADER;
boolean success;
do {
ByteBuffer headerBuffer = ByteBuffer.allocate(maxHeader);
file.read(headerBuffer);
headerBuffer.flip();
ByteBufferInputStream bufferedIS = ByteBufferInputStream.wrap(headerBuffer);
try {
pageHeader = Util.readPageHeader(bufferedIS);
offset += bufferedIS.position();
success = true;
} catch (IOException e) {
success = false;
if (maxHeader > MAX_HEADER) {
throw e;
private void ensurePageHeader(final SeekableByteChannel file) throws IOException {
// Set this channel's position to appropriate offset for reading. If pageHeader is null, this offset would be
// the offset of page header, else it would be the offset of data.
file.position(offset);
synchronized (this) {
if (pageHeader == null) {
int maxHeader = START_HEADER;
boolean success;
do {
final ByteBuffer headerBuffer = ByteBuffer.allocate(maxHeader);
file.read(headerBuffer);
headerBuffer.flip();
final ByteBufferInputStream bufferedIS = ByteBufferInputStream.wrap(headerBuffer);
try {
pageHeader = Util.readPageHeader(bufferedIS);
offset += bufferedIS.position();
success = true;
} catch (IOException e) {
success = false;
if (maxHeader > MAX_HEADER) {
throw e;
}
maxHeader <<= 1;
file.position(offset);
}
} while (!success);
file.position(offset);
if (numValues >= 0) {
final int numValuesFromHeader = readNumValuesFromPageHeader(pageHeader);
if (numValues != numValuesFromHeader) {
throw new IllegalStateException(
"numValues = " + numValues + " different from number of values " +
"read from the page header = " + numValuesFromHeader + " for column " + path);
}
maxHeader *= 2;
file.position(offset);
}
} while (!success);
file.position(offset);
if (numValues >= 0) {
// Make sure the number of values are same as those in the header
if (numValues != readNumValuesFromPageHeader(pageHeader)) {
throw new IllegalStateException("numValues = " + numValues + " different from number of values " +
"read from the page header");
}
}
if (numValues == NULL_NUM_VALUES) {
numValues = readNumValuesFromPageHeader(pageHeader);
}
}
}

private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException {
switch (header.type) {
case DATA_PAGE:
return header.getData_page_header().getNum_values();
case DATA_PAGE_V2:
return header.getData_page_header_v2().getNum_values();
default:
throw new IOException(String.format("Unexpected page of type {%s}", header.getType()));
}
ensureNumValues();
}

private int readRowCountFromDataPage(ReadableByteChannel file) throws IOException {
Expand Down Expand Up @@ -573,9 +598,8 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val
if (dataEncoding.usesDictionary()) {
final Dictionary dictionary = dictionarySupplier.get();
if (dictionary == ColumnChunkReader.NULL_DICTIONARY) {
throw new ParquetDecodingException(
"Could not read page in col " + path
+ " as the dictionary was missing for encoding " + dataEncoding);
throw new ParquetDecodingException("Could not read page in col " + path + " as the dictionary was " +
"missing for encoding " + dataEncoding);
}
dataReader = new DictionaryValuesReader(dictionary);
} else {
Expand All @@ -596,33 +620,13 @@ public int numValues() throws IOException {
return numValues;
}
try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) {
readChannel.position(offset);
ensurePageHeader(readChannel);
// Above will automatically populate numValues
Assert.geq(numValues, "numValues", 0);
// Above will block till it populates numValues
Assert.geqZero(numValues, "numValues");
return numValues;
}
}

private void ensureNumValues() throws IOException {
if (numValues >= 0) {
return;
}
Assert.neqNull(pageHeader, "pageHeader");
numValues = readNumValuesFromPageHeader(pageHeader);
}

private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException {
switch (header.type) {
case DATA_PAGE:
return header.getData_page_header().getNum_values();
case DATA_PAGE_V2:
return header.getData_page_header_v2().getNum_values();
default:
throw new IOException(String.format("Unexpected page of type {%s}", header.getType()));
}
}

@NotNull
@Override
public Dictionary getDictionary() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public String version() {
final String version = TableInfo.class.getPackage().getImplementationVersion();
if (version == null) {
// For unit tests
return "0.dev.0";
return "unknown";
}
return version;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun
// part of deephaven-core#4844
final Class<?> columnType = columnDefinition.getDataType();
if (columnType.isArray() || Vector.class.isAssignableFrom(columnType)) {
return satisfiesMinimumVersionRequirements(version);
return hasCorrectVectorOffsetIndexes(version);
}
return true;
}
Expand All @@ -76,7 +76,7 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun
* Check if the version is greater than or equal to 0.31.0, or it doesn't follow the versioning schema X.Y.Z
*/
@VisibleForTesting
public static boolean satisfiesMinimumVersionRequirements(@NotNull final String version) {
public static boolean hasCorrectVectorOffsetIndexes(@NotNull final String version) {
final Matcher matcher = VERSION_PATTERN.matcher(version);
if (!matcher.matches()) {
// Could be unit tests or some other versioning scheme
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ private static final class PageState<ATTR extends Any> {
@NotNull final ToPage<ATTR, ?> toPage) throws IOException {
super(pageCache, columnChunkReader, mask, toPage);
offsetIndex = columnChunkReader.getOffsetIndex();
Assert.assertion(offsetIndex != null, "offsetIndex != null");
Assert.neqNull(offsetIndex, "offsetIndex");
numPages = offsetIndex.getPageCount();
Assert.assertion(numPages > 0, "numPages > 0");
Assert.gtZero(numPages, "numPages");
pageStates = new AtomicReferenceArray<>(numPages);
columnPageDirectAccessor = columnChunkReader.getPageAccessor();

Expand Down Expand Up @@ -85,13 +85,13 @@ private static int findPageNumUsingOffsetIndex(final OffsetIndex offsetIndex, fi
while (low <= high) {
final int mid = (low + high) >>> 1;
final long midVal = offsetIndex.getFirstRowIndex(mid);

if (midVal < row)
if (midVal < row) {
low = mid + 1;
else if (midVal > row)
} else if (midVal > row) {
high = mid - 1;
else
} else {
return mid; // 'row' is the first row of page
}
}
return (low - 1); // 'row' is somewhere in the middle of page
}
Expand Down Expand Up @@ -141,8 +141,8 @@ public ChunkPage<ATTR> getPageContaining(@NotNull final FillContext fillContext,
if (pageNum >= numPages) {
// This can happen if the last page is larger than rest of the pages, which are all the same size.
// We have already checked that row is less than numRows.
Assert.assertion(row >= offsetIndex.getFirstRowIndex(numPages - 1),
"row >= offsetIndex.getFirstRowIndex(numPages - 1)");
Assert.geq(row, "row", offsetIndex.getFirstRowIndex(numPages - 1),
"offsetIndex.getFirstRowIndex(numPages - 1)");
pageNum = (numPages - 1);
}
}
Expand Down
Loading

0 comments on commit 46c77d5

Please sign in to comment.