Skip to content

Commit

Permalink
Sample commit showing what using URIs in place of files and strings looks like
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Dec 27, 2023
1 parent b8a3bd7 commit 6ea1587
Show file tree
Hide file tree
Showing 16 changed files with 2,543 additions and 2,463 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.net.URI;
import java.util.Map;

/**
Expand All @@ -24,15 +25,16 @@ public class FileTableLocationKey extends PartitionedTableLocationKey {

private static final String IMPLEMENTATION_NAME = FileTableLocationKey.class.getSimpleName();

protected final File file;
protected final URI parquetFileURI;
private final int order;

private int cachedHashCode;

/**
* Construct a new FileTableLocationKey for the supplied {@code parquetFileURI} and {@code partitions}.
*
* @param file The file (or directory) that backs the keyed location. Will be adjusted to an absolute path.
* @param parquetFileURI The URI of the file (or directory) that backs the keyed location. Stored exactly as
* supplied; it is not converted to an absolute path.
* @param order Explicit ordering value for this location key. {@link Comparable#compareTo(Object)} will sort
* FileTableLocationKeys with a lower {@code order} before other keys. Comparing this ordering value takes
* precedence over other fields.
Expand All @@ -41,21 +43,21 @@ public class FileTableLocationKey extends PartitionedTableLocationKey {
* be made, so the calling code is free to mutate the map after this call completes, but the partition keys
* and values themselves <em>must</em> be effectively immutable.
*/
public FileTableLocationKey(@NotNull final File file, final int order,
public FileTableLocationKey(@NotNull final URI parquetFileURI, final int order,
@Nullable final Map<String, Comparable<?>> partitions) {
super(partitions);
this.file = file.getAbsoluteFile();
this.parquetFileURI = parquetFileURI;
this.order = order;
}

public final File getFile() {
return file;
public final URI getURI() {
return parquetFileURI;
}

@Override
public LogOutput append(@NotNull final LogOutput logOutput) {
return logOutput.append(getImplementationName())
.append(":[file=").append(file.getPath())
.append(":[file=").append(parquetFileURI.toString())
.append(",partitions=").append(PartitionsFormatter.INSTANCE, partitions)
.append(']');
}
Expand Down Expand Up @@ -84,15 +86,15 @@ public int compareTo(@NotNull final TableLocationKey other) {
if (partitionComparisonResult != 0) {
return partitionComparisonResult;
}
return file.compareTo(otherTyped.file);
return parquetFileURI.compareTo(otherTyped.parquetFileURI);
}
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
}

@Override
public int hashCode() {
if (cachedHashCode == 0) {
final int computedHashCode = 31 * partitions.hashCode() + file.hashCode();
final int computedHashCode = 31 * partitions.hashCode() + parquetFileURI.hashCode();
// Don't use 0; that's used by StandaloneTableLocationKey, and also our sentinel for the need to compute
if (computedHashCode == 0) {
final int fallbackHashCode = FileTableLocationKey.class.hashCode();
Expand All @@ -113,7 +115,7 @@ public boolean equals(@Nullable final Object other) {
return false;
}
final FileTableLocationKey otherTyped = (FileTableLocationKey) other;
return file.equals(otherTyped.file) && partitions.equals(otherTyped.partitions);
return parquetFileURI.equals(otherTyped.parquetFileURI) && partitions.equals(otherTyped.partitions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;

Expand Down Expand Up @@ -94,14 +96,15 @@ public void setUp() {
}

@Test
public void doColumnsTest() throws IOException {
public void doColumnsTest() throws IOException, URISyntaxException {
final File dir = Files.createTempDirectory(Paths.get(""), "CODEC_TEST").toFile();
final File dest = new File(dir, "Test.parquet");
final URI destURI = new URI(dest.toString());
try {
ParquetTools.writeTable(table, dest, table.getDefinition(), writeInstructions);
final MutableObject<ParquetInstructions> instructionsOut = new MutableObject<>();
final Table result =
ParquetTools.readParquetSchemaAndTable(dest, ParquetInstructions.EMPTY, instructionsOut);
ParquetTools.readParquetSchemaAndTable(destURI, ParquetInstructions.EMPTY, instructionsOut);
TableTools.show(result);
TestCase.assertEquals(TABLE_DEFINITION, result.getDefinition());
final ParquetInstructions readInstructions = instructionsOut.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ public void testFlat() throws IOException {

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 0).findKeys(recorder);
(path, partitions) -> new FileTableLocationKey(path.toUri(), 0, partitions), 0).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());

TestCase.assertEquals(2, results.size());

TestCase.assertEquals(file1.getAbsoluteFile(), results.get(0).getFile());
TestCase.assertEquals(file2.getAbsoluteFile(), results.get(1).getFile());
TestCase.assertEquals(file1.getAbsoluteFile(), results.get(0).getURI());
TestCase.assertEquals(file2.getAbsoluteFile(), results.get(1).getURI());

TestCase.assertTrue(results.get(0).getPartitionKeys().isEmpty());
TestCase.assertTrue(results.get(1).getPartitionKeys().isEmpty());
Expand All @@ -82,14 +82,14 @@ public void testOneLevel() throws IOException {

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 1).findKeys(recorder);
(path, partitions) -> new FileTableLocationKey(path.toUri(), 0, partitions), 1).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());

TestCase.assertEquals(2, results.size());

TestCase.assertEquals(file2.getAbsoluteFile(), results.get(0).getFile());
TestCase.assertEquals(file1.getAbsoluteFile(), results.get(1).getFile());
TestCase.assertEquals(file2.getAbsoluteFile(), results.get(0).getURI());
TestCase.assertEquals(file1.getAbsoluteFile(), results.get(1).getURI());

TestCase.assertEquals(1, results.get(0).getPartitionKeys().size());
TestCase.assertEquals(1, results.get(1).getPartitionKeys().size());
Expand All @@ -115,15 +115,15 @@ public void testThreeLevels() throws IOException {

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(recorder);
(path, partitions) -> new FileTableLocationKey(path.toUri(), 0, partitions), 3).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());

TestCase.assertEquals(3, results.size());

TestCase.assertEquals(file2.getAbsoluteFile(), results.get(0).getFile());
TestCase.assertEquals(file3.getAbsoluteFile(), results.get(1).getFile());
TestCase.assertEquals(file1.getAbsoluteFile(), results.get(2).getFile());
TestCase.assertEquals(file2.getAbsoluteFile(), results.get(0).getURI());
TestCase.assertEquals(file3.getAbsoluteFile(), results.get(1).getURI());
TestCase.assertEquals(file1.getAbsoluteFile(), results.get(2).getURI());

TestCase.assertEquals(3, results.get(0).getPartitionKeys().size());
TestCase.assertEquals(3, results.get(1).getPartitionKeys().size());
Expand Down Expand Up @@ -166,7 +166,7 @@ public void testTypesAndNameLegalization() throws IOException {
for (final Supplier<LocationTableBuilder> locationTableBuilderSupplier : locationTableBuilderSuppliers) {
final TableLocationKeyFinder<FileTableLocationKey> finder = new KeyValuePartitionLayout<>(
dataDirectory, path -> true, locationTableBuilderSupplier,
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3);
(path, partitions) -> new FileTableLocationKey(path.toUri(), 0, partitions), 3);

final RecordingLocationKeyFinder<FileTableLocationKey> recorder1 = new RecordingLocationKeyFinder<>();
finder.findKeys(recorder1);
Expand All @@ -180,9 +180,9 @@ public void testTypesAndNameLegalization() throws IOException {

TestCase.assertEquals(3, results1.size());

TestCase.assertEquals(file2.getAbsoluteFile(), results1.get(0).getFile());
TestCase.assertEquals(file3.getAbsoluteFile(), results1.get(1).getFile());
TestCase.assertEquals(file1.getAbsoluteFile(), results1.get(2).getFile());
TestCase.assertEquals(file2.getAbsoluteFile(), results1.get(0).getURI());
TestCase.assertEquals(file3.getAbsoluteFile(), results1.get(1).getURI());
TestCase.assertEquals(file1.getAbsoluteFile(), results1.get(2).getURI());

TestCase.assertEquals(3, results1.get(0).getPartitionKeys().size());
TestCase.assertEquals(3, results1.get(1).getPartitionKeys().size());
Expand Down Expand Up @@ -219,7 +219,7 @@ public void testMaxDepthEmpty() throws IOException {

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(recorder);
(path, partitions) -> new FileTableLocationKey(path.toUri(), 0, partitions), 3).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());

Expand Down Expand Up @@ -247,15 +247,15 @@ public void testMaxDepth() throws IOException {

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(recorder);
(path, partitions) -> new FileTableLocationKey(path.toUri(), 0, partitions), 3).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());

TestCase.assertEquals(3, results.size());

TestCase.assertEquals(file2.getAbsoluteFile(), results.get(0).getFile());
TestCase.assertEquals(file3.getAbsoluteFile(), results.get(1).getFile());
TestCase.assertEquals(file1.getAbsoluteFile(), results.get(2).getFile());
TestCase.assertEquals(file2.getAbsoluteFile(), results.get(0).getURI());
TestCase.assertEquals(file3.getAbsoluteFile(), results.get(1).getURI());
TestCase.assertEquals(file1.getAbsoluteFile(), results.get(2).getURI());
}

@Test
Expand All @@ -275,7 +275,7 @@ public void testMismatch() throws IOException {

try {
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(ftlk -> {
(path, partitions) -> new FileTableLocationKey(path.toUri(), 0, partitions), 3).findKeys(ftlk -> {
});
TestCase.fail("Expected exception");
} catch (TableDataException expected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SeekableByteChannel;
Expand All @@ -27,20 +28,24 @@ public class ParquetFileReader {
private static final int FOOTER_LENGTH_SIZE = 4;
private static final String MAGIC_STR = "PAR1";
static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
public static final String S3_PARQUET_FILE_URI_SCHEME = "s3";

public final FileMetaData fileMetaData;
private final SeekableChannelsProvider channelsProvider;
private final Path rootPath;
private final MessageType type;

public ParquetFileReader(final String filePath, final SeekableChannelsProvider channelsProvider)
public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider)
throws IOException {
this.channelsProvider = channelsProvider;
// Root path should be this file if a single file, else the parent directory for a metadata
// file
rootPath =
filePath.endsWith(".parquet") ? Paths.get(filePath) : Paths.get(filePath).getParent();

final String filePath = parquetFileURI.toString();
// Root path should be this file if a single file, else the parent directory for a metadata file
if ((parquetFileURI.getScheme() != null && parquetFileURI.getScheme().equals(S3_PARQUET_FILE_URI_SCHEME))
|| parquetFileURI.getRawPath().endsWith(".parquet")) {
rootPath = Path.of(filePath);
} else {
rootPath = Paths.get(parquetFileURI).getParent();
}
final byte[] footer;
try (final SeekableChannelsProvider.ChannelContext context = channelsProvider.makeContext();
final SeekableByteChannel readChannel = channelsProvider.getReadChannel(context, filePath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
import org.apache.parquet.schema.PrimitiveType;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -98,8 +99,13 @@ public static ParquetInstructions readParquetSchema(
@NotNull final ParquetInstructions readInstructions,
@NotNull final ColumnDefinitionConsumer consumer,
@NotNull final BiFunction<String, Set<String>, String> legalizeColumnNameFunc) throws IOException {
final ParquetFileReader parquetFileReader =
ParquetTools.getParquetFileReaderChecked(new File(filePath), readInstructions);
final ParquetFileReader parquetFileReader;
try {
parquetFileReader =
ParquetTools.getParquetFileReaderChecked(new URI(filePath), readInstructions);
} catch (final URISyntaxException e) {
throw new UncheckedDeephavenException("Failed to parse URI " + filePath, e);
}
final ParquetMetadata parquetMetadata =
new ParquetMetadataConverter().fromParquetMetadata(parquetFileReader.fileMetaData);
return readParquetSchema(parquetFileReader.getSchema(), parquetMetadata.getFileMetaData().getKeyValueMetaData(),
Expand Down
Loading

0 comments on commit 6ea1587

Please sign in to comment.