Skip to content

Commit

Permalink
Added support to read single parquet file hosted in AWS S3 (#4972)
Browse files Browse the repository at this point in the history
Refactored local parquet reading code to support reading from URIs instead of file paths.
  • Loading branch information
malhotrashivam authored Jan 29, 2024
1 parent 2e747b4 commit db04b57
Show file tree
Hide file tree
Showing 77 changed files with 2,704 additions and 441 deletions.
22 changes: 22 additions & 0 deletions Util/channel/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Build script for the Util/channel module, which hosts the io.deephaven.util.channel package.
plugins {
id 'java-library'
// NOTE(review): presumably the Deephaven convention plugin that consumes the
// io.deephaven.project.ProjectType value from gradle.properties -- confirm.
id 'io.deephaven.project.register'
}

dependencies {
implementation project(':Base')

// Needed for SafeCloseable
implementation project(':Util')

// Annotations are only needed on the compile classpath
compileOnly depAnnotations

// Test setup: JUnit 5 (Jupiter) API at test-compile time and its engine at test runtime.
// NOTE(review): the Classpaths.inherit* helpers presumably pull in the shared JUnit Platform / AssertJ
// dependency definitions -- confirm against buildSrc.
Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}

// Run this module's tests on the JUnit Platform
test {
useJUnitPlatform()
}
1 change: 1 addition & 0 deletions Util/channel/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.base.util;
package io.deephaven.util.channel;

import io.deephaven.base.RAPriQueue;
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.hash.KeyedObjectHashMap;
import io.deephaven.hash.KeyedObjectKey;
import io.deephaven.util.annotations.FinalDefault;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
Expand All @@ -22,6 +24,15 @@
*/
public class CachedChannelProvider implements SeekableChannelsProvider {

/**
* Implemented by objects that can have a {@link SeekableChannelContext} attached to them and later detached.
*/
public interface ContextHolder {
/**
* Attach the given context to this holder.
*
* @param channelContext The context to attach; {@link #clearContext()} passes {@code null} here to detach the
* current context, so implementations must accept {@code null}
*/
void setContext(SeekableChannelContext channelContext);

/**
* Detach the current context. Equivalent to {@code setContext(null)}.
*/
@FinalDefault
default void clearContext() {
setContext(null);
}
}

private final SeekableChannelsProvider wrappedProvider;
private final int maximumPooledCount;

Expand Down Expand Up @@ -52,13 +63,27 @@ public CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProv
}

@Override
public SeekableByteChannel getReadChannel(@NotNull final Path path) throws IOException {
final String pathKey = path.toAbsolutePath().toString();
// Context creation is delegated to the wrapped provider; this caching layer adds no context of its own.
public SeekableChannelContext makeContext() {
return wrappedProvider.makeContext();
}

/**
* Check whether the given context can be used with this provider. The decision is delegated entirely to the wrapped
* provider.
*
* @param channelContext The context to test
* @return Whatever the wrapped provider reports for {@code channelContext}
*/
@Override
public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelContext) {
return wrappedProvider.isCompatibleWith(channelContext);
}

@Override
public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext channelContext,
@NotNull final URI uri)
throws IOException {
final String uriString = uri.toString();
final KeyedObjectHashMap<String, PerPathPool> channelPool = channelPools.get(ChannelType.Read);
final CachedChannel result = tryGetPooledChannel(pathKey, channelPool);
return result == null
? new CachedChannel(wrappedProvider.getReadChannel(path), ChannelType.Read, pathKey)
final CachedChannel result = tryGetPooledChannel(uriString, channelPool);
final CachedChannel channel = result == null
? new CachedChannel(wrappedProvider.getReadChannel(channelContext, uri), ChannelType.Read, uriString)
: result.position(0);
channel.setContext(channelContext);
return channel;
}

@Override
Expand Down Expand Up @@ -125,10 +150,15 @@ private long advanceClock() {
return logicalClock = 1;
}

/**
* Close the wrapped provider.
*/
@Override
public void close() {
wrappedProvider.close();
}

/**
* {@link SeekableByteChannel Channel} wrapper for pooled usage.
*/
private class CachedChannel implements SeekableByteChannel {
private class CachedChannel implements SeekableByteChannel, ContextHolder {

private final SeekableByteChannel wrappedChannel;
private final ChannelType channelType;
Expand Down Expand Up @@ -163,7 +193,7 @@ public long position() throws IOException {
}

@Override
public SeekableByteChannel position(final long newPosition) throws IOException {
public CachedChannel position(final long newPosition) throws IOException {
Require.eqTrue(isOpen, "isOpen");
wrappedChannel.position(newPosition);
return this;
Expand Down Expand Up @@ -196,12 +226,20 @@ public boolean isOpen() {
/**
* Logically close this channel: mark it as not open, detach its context and hand it to
* {@code returnPoolableChannel}. The wrapped channel itself is not closed here; that happens in {@code dispose()}.
*
* @throws IOException Declared by the channel interface
*/
public void close() throws IOException {
// This channel must currently be open; Require enforces that precondition
Require.eqTrue(isOpen, "isOpen");
isOpen = false;
// Detach the context before the channel is handed back for pooling
clearContext();
returnPoolableChannel(this);
}

/**
* Close the underlying wrapped channel.
*
* @throws IOException If closing the wrapped channel fails
*/
private void dispose() throws IOException {
wrappedChannel.close();
}

/**
 * Forward the given context to the wrapped channel when that channel is itself a {@link ContextHolder}; otherwise
 * this is a no-op.
 *
 * @param channelContext The context to forward, or {@code null} to clear it
 */
@Override
public final void setContext(@Nullable final SeekableChannelContext channelContext) {
    if (!(wrappedChannel instanceof ContextHolder)) {
        // The wrapped channel does not track a context, so there is nothing to update
        return;
    }
    final ContextHolder holder = (ContextHolder) wrappedChannel;
    holder.setContext(channelContext);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.base.util;
package io.deephaven.util.channel;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.net.URI;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
Expand All @@ -14,8 +16,23 @@
public class LocalFSChannelProvider implements SeekableChannelsProvider {

@Override
public SeekableByteChannel getReadChannel(@NotNull final Path path) throws IOException {
return FileChannel.open(path, StandardOpenOption.READ);
// Always returns the shared SeekableChannelContext.NULL instance rather than allocating a new context.
public SeekableChannelContext makeContext() {
// No additional context required for local FS
return SeekableChannelContext.NULL;
}

/**
* Report whether the given context can be used with this provider.
*
* @param channelContext The context to test; ignored, may be {@code null}
* @return Always {@code true}
*/
@Override
public boolean isCompatibleWith(@Nullable final SeekableChannelContext channelContext) {
// Context is not used, hence always compatible
return true;
}

/**
* Open a read-only {@link FileChannel} on the local file identified by {@code uri}.
*
* @param channelContext Ignored by this provider; may be {@code null}
* @param uri The URI of the file to read; it is converted with {@link Path#of(URI)}
* @return A new channel opened with {@link StandardOpenOption#READ}
* @throws IOException If the file cannot be opened
*/
@Override
public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext channelContext,
@NotNull final URI uri)
throws IOException {
// context is unused here
// NOTE(review): Path.of(URI) rejects URIs it cannot map to an installed file system, so callers are
// presumably expected to pass "file" URIs only -- confirm.
return FileChannel.open(Path.of(uri), StandardOpenOption.READ);
}

@Override
Expand All @@ -31,4 +48,7 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final b
}
return result;
}

/**
* No-op: this method body is empty.
*/
@Override
public void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.deephaven.util.channel;

import io.deephaven.util.SafeCloseable;

/**
* Context object for reading and writing to channels created by {@link SeekableChannelsProvider}.
*/
public interface SeekableChannelContext extends SafeCloseable {

/**
* A shared context instance that relies entirely on this interface's defaults, so its {@link #close()} does
* nothing.
*/
SeekableChannelContext NULL = new SeekableChannelContext() {};

/**
* Release any resources associated with this context. The context should not be used afterward.
* <p>
* The default implementation does nothing.
*/
default void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;

public interface SeekableChannelsProvider extends SafeCloseable {

    /**
     * Turn a source string, which may be either a URI or a plain file path, into a {@link URI}.
     * <p>
     * The string is returned as a parsed URI only when it parses successfully and carries a scheme. In every other
     * case (unparseable, or parseable but scheme-less) it is treated as a file path and converted through
     * {@link File#toURI()}.
     *
     * @param source The file source path or URI
     * @return The URI object
     */
    static URI convertToURI(final String source) {
        try {
            final URI parsed = new URI(source);
            if (parsed.getScheme() != null) {
                return parsed;
            }
            // No scheme: fall through and convert to a "file" URI
        } catch (final URISyntaxException ignored) {
            // Not a valid URI: fall through and assume it is a file path
        }
        return new File(source).toURI();
    }

    /**
     * Make a fresh {@link SeekableChannelContext} to be used when creating read channels through this provider.
     */
    SeekableChannelContext makeContext();

    /**
     * Test whether the supplied context can be used for creating channels with this provider.
     *
     * @param channelContext The context to test
     * @return Whether {@code channelContext} is usable with this provider
     */
    boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext);

    /**
     * Convenience overload that first converts {@code uriStr} with {@link #convertToURI(String)} and then delegates
     * to {@link #getReadChannel(SeekableChannelContext, URI)}.
     *
     * @param channelContext The context to create the channel with
     * @param uriStr The file source path or URI, as a string
     * @return A channel for reading the source
     * @throws IOException If the URI-based overload throws
     */
    default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String uriStr)
            throws IOException {
        final URI resolved = convertToURI(uriStr);
        return getReadChannel(channelContext, resolved);
    }

    /**
     * Create a channel for reading the resource identified by {@code uri}.
     *
     * @param channelContext The context to create the channel with
     * @param uri The URI of the resource to read
     * @return A channel for reading the resource
     * @throws IOException If the channel cannot be created
     */
    SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull URI uri)
            throws IOException;

    /**
     * Convenience overload that converts {@code path} with {@link Paths#get(String, String...)} and then delegates
     * to {@link #getWriteChannel(Path, boolean)}.
     *
     * @param path The file path, as a string
     * @param append Passed through unchanged to the {@link Path}-based overload
     * @return A channel for writing to the file
     * @throws IOException If the {@link Path}-based overload throws
     */
    default SeekableByteChannel getWriteChannel(@NotNull final String path, final boolean append) throws IOException {
        final Path filePath = Paths.get(path);
        return getWriteChannel(filePath, append);
    }

    /**
     * Create a channel for writing to the file at {@code path}.
     *
     * @param path The file path
     * @param append The append flag for the write channel
     * @return A channel for writing to the file
     * @throws IOException If the channel cannot be created
     */
    SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/**
* A service loader class for loading {@link SeekableChannelsProviderPlugin} implementations at runtime and provide
* {@link SeekableChannelsProvider} implementations for different URI schemes, e.g., S3.
*/
public final class SeekableChannelsProviderLoader {

    /**
     * The lazily created singleton. Declared {@code volatile} so that a fully constructed instance is safely
     * published to threads that read it without holding the lock in {@link #getInstance()}.
     */
    private static volatile SeekableChannelsProviderLoader instance;

    /**
     * Get the singleton loader, creating it on first use.
     * <p>
     * Creation is guarded by a lock and re-checked under it, so concurrent first callers all observe the same
     * instance and the {@link ServiceLoader} scan in the constructor runs at most once per successful
     * initialization.
     *
     * @return The shared {@link SeekableChannelsProviderLoader}
     */
    public static SeekableChannelsProviderLoader getInstance() {
        // Read the volatile field once on the fast path
        SeekableChannelsProviderLoader localInstance = instance;
        if (localInstance == null) {
            synchronized (SeekableChannelsProviderLoader.class) {
                // Re-check under the lock: another thread may have initialized it while we were waiting
                localInstance = instance;
                if (localInstance == null) {
                    localInstance = new SeekableChannelsProviderLoader();
                    instance = localInstance;
                }
            }
        }
        return localInstance;
    }

    /**
     * The plugins discovered at construction time, in the order the {@link ServiceLoader} returned them. Never
     * modified after the constructor returns.
     */
    private final List<SeekableChannelsProviderPlugin> providers;

    private SeekableChannelsProviderLoader() {
        providers = new ArrayList<>();
        // Load the plugins
        for (final SeekableChannelsProviderPlugin plugin : ServiceLoader.load(SeekableChannelsProviderPlugin.class)) {
            providers.add(plugin);
        }
    }

    /**
     * Create a new {@link SeekableChannelsProvider} based on given URI and object using the plugins loaded by the
     * {@link ServiceLoader}. For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can
     * read files from S3.
     * <p>
     * Plugins are consulted in load order and the first one that reports itself compatible creates the provider.
     *
     * @param uri The URI
     * @param object An optional object to pass to the {@link SeekableChannelsProviderPlugin} implementations.
     * @return A {@link SeekableChannelsProvider} for the given URI.
     * @throws UnsupportedOperationException If no loaded plugin is compatible with {@code uri} and {@code object}
     */
    public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, @Nullable final Object object) {
        for (final SeekableChannelsProviderPlugin plugin : providers) {
            if (plugin.isCompatible(uri, object)) {
                return plugin.createProvider(uri, object);
            }
        }
        throw new UnsupportedOperationException("No plugin found for uri: " + uri);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;

/**
* A plugin interface for providing {@link SeekableChannelsProvider} implementations for different URI schemes, e.g. S3.
* Check out {@link SeekableChannelsProviderLoader} for more details.
*/
public interface SeekableChannelsProviderPlugin {
/**
* Check if this plugin is compatible with the given URI and config object.
*
* @param uri The URI for which a provider is being requested
* @param config An optional, plugin-specific config object; may be {@code null}
* @return {@code true} if this plugin can create a provider for {@code uri} and {@code config}
*/
boolean isCompatible(@NotNull URI uri, @Nullable Object config);

/**
* Create a {@link SeekableChannelsProvider} for the given URI and config object.
*
* @param uri The URI for which to create a provider
* @param object An optional, plugin-specific config object; may be {@code null}.
* {@link SeekableChannelsProviderLoader} passes the same value here that it passed as {@code config} to
* {@link #isCompatible(URI, Object)}
* @return A provider for {@code uri}
*/
SeekableChannelsProvider createProvider(@NotNull URI uri, @Nullable Object object);
}
Loading

0 comments on commit db04b57

Please sign in to comment.