Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to read single parquet file hosted in AWS S3 #4972

Merged
merged 41 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
9723094
Initial commit
malhotrashivam Dec 6, 2023
2fe31fb
Moved to AWS SDK V2
malhotrashivam Dec 12, 2023
c69f875
Added support for S3 nio AWS channel
malhotrashivam Dec 14, 2023
4b98551
Moved s3 nio files to local repo
malhotrashivam Dec 18, 2023
6d72723
Made a number of improvements to s3 nio channel files
malhotrashivam Dec 18, 2023
cca5051
Moved to a context based cache
malhotrashivam Dec 26, 2023
b5e2c96
Added s3 specific parquet instructions
malhotrashivam Dec 27, 2023
2cb1c3f
Moved to URIs from files for reading single parquet file
malhotrashivam Dec 28, 2023
af10a1a
Added s3 specific parquet instructions
malhotrashivam Dec 29, 2023
edbd29a
Improved comments and Javadocs
malhotrashivam Dec 28, 2023
404af99
Resolving Devin's comments
malhotrashivam Jan 8, 2024
9aa7549
Code review with Devin part 2
malhotrashivam Jan 9, 2024
ae87404
Merge branch 'main' into sm-s3-pq
malhotrashivam Jan 9, 2024
b85043e
Integrated with Ryan's patch
malhotrashivam Jan 9, 2024
0e5ff03
Resolving more comments
malhotrashivam Jan 10, 2024
98b6f31
Moved to a service loader style pattern for channel providers
malhotrashivam Jan 10, 2024
d7fc6a1
Minor fixes in gradle files
malhotrashivam Jan 10, 2024
aefa5ef
Resolved more comments
malhotrashivam Jan 11, 2024
82a4ef3
Resolving Ryan's comments
malhotrashivam Jan 12, 2024
11acf6e
Fixed one test failure
malhotrashivam Jan 12, 2024
ae5a371
Resolving Devin's comments
malhotrashivam Jan 12, 2024
2dc360e
Cosmetic cleanups
malhotrashivam Jan 12, 2024
2840ec5
Resolving Ryan's comments part 2
malhotrashivam Jan 13, 2024
27e1638
Moved ChannelContext outside of SeekableChannelsProvider
malhotrashivam Jan 13, 2024
3b7521e
Resolving more comments
malhotrashivam Jan 15, 2024
36cf3cb
Resolved python comments and updated corresponding descriptions in ja…
malhotrashivam Jan 19, 2024
ecb35a8
Resolved more python comments
malhotrashivam Jan 19, 2024
36db89c
Resolving Ryan's comments
malhotrashivam Jan 19, 2024
b191ae5
Resolving more comments
malhotrashivam Jan 22, 2024
c30fcfa
Moved files from io.deephaven.parquet.base.util to io.deephaven.util.…
malhotrashivam Jan 23, 2024
6bc9f54
Resolving more comments
malhotrashivam Jan 23, 2024
10172e3
Added buffer pool support and TODOs for future projects
malhotrashivam Jan 23, 2024
7264be1
Added a new module Util/channel
malhotrashivam Jan 23, 2024
e28b993
Minor fix
malhotrashivam Jan 23, 2024
3b3dbbd
Resolved more comments
malhotrashivam Jan 25, 2024
cf2b130
Added support to pass AWS credentials
malhotrashivam Jan 26, 2024
dfb2e4c
Merge branch 'main' into sm-s3-pq
malhotrashivam Jan 26, 2024
661424a
Improved comments
malhotrashivam Jan 26, 2024
5695f2a
Renamed static credentials to basic credentials
malhotrashivam Jan 26, 2024
9847b59
Renamed AwsCredentialsImpl to AwsSdkV2Credentials
malhotrashivam Jan 26, 2024
0750e56
Minor refactoring around python comments
malhotrashivam Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Util/channel/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Build configuration for the Util/channel module.
plugins {
id 'java-library'
id 'io.deephaven.project.register'
}

dependencies {
implementation project(':Base')

// Needed for SafeCloseable
implementation project(':Util')

// Annotations are only needed on the compile classpath
compileOnly depAnnotations

// Test-only dependencies: JUnit Platform, AssertJ and JUnit Jupiter
Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}

test {
// Run the tests on the JUnit Platform
useJUnitPlatform()
}
1 change: 1 addition & 0 deletions Util/channel/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.base.util;
package io.deephaven.util.channel;

import io.deephaven.base.RAPriQueue;
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.hash.KeyedObjectHashMap;
import io.deephaven.hash.KeyedObjectKey;
import io.deephaven.util.annotations.FinalDefault;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
Expand All @@ -22,6 +24,15 @@
*/
public class CachedChannelProvider implements SeekableChannelsProvider {

/**
* Implemented by objects that can have a {@link SeekableChannelContext} set on them and later cleared.
*/
public interface ContextHolder {
/**
* Set the context on this holder.
*
* @param channelContext the context to set; {@link #clearContext()} passes {@code null} here
*/
void setContext(SeekableChannelContext channelContext);

/**
* Clear the context by calling {@link #setContext(SeekableChannelContext)} with {@code null}.
*/
@FinalDefault
default void clearContext() {
setContext(null);
}
}

private final SeekableChannelsProvider wrappedProvider;
private final int maximumPooledCount;

Expand Down Expand Up @@ -52,13 +63,27 @@ public CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProv
}

@Override
public SeekableByteChannel getReadChannel(@NotNull final Path path) throws IOException {
final String pathKey = path.toAbsolutePath().toString();
public SeekableChannelContext makeContext() {
return wrappedProvider.makeContext();
}

/**
* Delegates the compatibility check to the wrapped provider.
*
* @param channelContext the context to test
* @return whatever the wrapped provider reports for {@code channelContext}
*/
@Override
public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelContext) {
return wrappedProvider.isCompatibleWith(channelContext);
}

@Override
public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext channelContext,
@NotNull final URI uri)
throws IOException {
final String uriString = uri.toString();
final KeyedObjectHashMap<String, PerPathPool> channelPool = channelPools.get(ChannelType.Read);
final CachedChannel result = tryGetPooledChannel(pathKey, channelPool);
return result == null
? new CachedChannel(wrappedProvider.getReadChannel(path), ChannelType.Read, pathKey)
final CachedChannel result = tryGetPooledChannel(uriString, channelPool);
final CachedChannel channel = result == null
? new CachedChannel(wrappedProvider.getReadChannel(channelContext, uri), ChannelType.Read, uriString)
: result.position(0);
channel.setContext(channelContext);
return channel;
}

@Override
Expand Down Expand Up @@ -125,10 +150,15 @@ private long advanceClock() {
return logicalClock = 1;
}

/**
* Closes the wrapped provider.
*/
@Override
public void close() {
// NOTE(review): only the wrapped provider is closed here; whether pooled channels need disposing at this
// point cannot be determined from this method alone - confirm.
wrappedProvider.close();
}

/**
* {@link SeekableByteChannel Channel} wrapper for pooled usage.
*/
private class CachedChannel implements SeekableByteChannel {
private class CachedChannel implements SeekableByteChannel, ContextHolder {

private final SeekableByteChannel wrappedChannel;
private final ChannelType channelType;
Expand Down Expand Up @@ -163,7 +193,7 @@ public long position() throws IOException {
}

@Override
public SeekableByteChannel position(final long newPosition) throws IOException {
public CachedChannel position(final long newPosition) throws IOException {
Require.eqTrue(isOpen, "isOpen");
wrappedChannel.position(newPosition);
return this;
Expand Down Expand Up @@ -196,12 +226,20 @@ public boolean isOpen() {
public void close() throws IOException {
Require.eqTrue(isOpen, "isOpen");
isOpen = false;
clearContext();
returnPoolableChannel(this);
}

private void dispose() throws IOException {
wrappedChannel.close();
}

/**
 * Forward the given context to the wrapped channel when that channel is itself a {@link ContextHolder};
 * otherwise do nothing.
 *
 * @param channelContext the context to forward, or {@code null} to clear it
 */
@Override
public final void setContext(@Nullable final SeekableChannelContext channelContext) {
    if (!(wrappedChannel instanceof ContextHolder)) {
        // The wrapped channel has nowhere to store a context
        return;
    }
    final ContextHolder holder = (ContextHolder) wrappedChannel;
    holder.setContext(channelContext);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.base.util;
package io.deephaven.util.channel;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.net.URI;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
Expand All @@ -14,8 +16,23 @@
public class LocalFSChannelProvider implements SeekableChannelsProvider {

@Override
public SeekableByteChannel getReadChannel(@NotNull final Path path) throws IOException {
return FileChannel.open(path, StandardOpenOption.READ);
public SeekableChannelContext makeContext() {
// No additional context required for local FS
return SeekableChannelContext.NULL;
}

/**
* Always returns {@code true}, because this provider does not use the context.
*
* @param channelContext ignored
* @return {@code true}
*/
@Override
public boolean isCompatibleWith(@Nullable final SeekableChannelContext channelContext) {
// Context is not used, hence always compatible
return true;
}

/**
* Open a read-only {@link FileChannel} for the file identified by {@code uri}.
*
* @param channelContext ignored by this provider
* @param uri the URI of the file to read; it is converted to a {@link Path} with {@link Path#of(URI)}
* @return a channel opened with {@link StandardOpenOption#READ}
* @throws IOException if the channel cannot be opened
*/
@Override
public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext channelContext,
@NotNull final URI uri)
throws IOException {
// context is unused here
// NOTE(review): Path.of(URI) rejects URIs it cannot map to a file system path; presumably callers only pass
// "file" URIs here - confirm.
return FileChannel.open(Path.of(uri), StandardOpenOption.READ);
}

@Override
Expand All @@ -31,4 +48,7 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final b
}
return result;
}

/**
* Does nothing.
*/
@Override
public void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.deephaven.util.channel;

import io.deephaven.util.SafeCloseable;

/**
* Context object for reading and writing to channels created by {@link SeekableChannelsProvider}.
*/
public interface SeekableChannelContext extends SafeCloseable {

SeekableChannelContext NULL = new SeekableChannelContext() {};

/**
* Release any resources associated with this context. The context should not be used afterward.
*/
default void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Provider of {@link SeekableByteChannel seekable channels}: read channels are addressed by URI and created with a
 * {@link SeekableChannelContext}, write channels are addressed by path.
 */
public interface SeekableChannelsProvider extends SafeCloseable {

    /**
     * Convert a file source, given either as a path or as a URI string, to a {@link URI}.
     * <p>
     * The source is returned as the parsed URI only when it parses successfully and has a scheme. In every other
     * case it is treated as a file path and converted through {@link File#toURI()}.
     *
     * @param source The file source path or URI
     * @return The URI object
     */
    static URI convertToURI(final String source) {
        URI parsed;
        try {
            parsed = new URI(source);
        } catch (final URISyntaxException ignored) {
            // Not a valid URI, so assume it is a file path
            parsed = null;
        }
        if (parsed != null && parsed.getScheme() != null) {
            return parsed;
        }
        // Either unparseable or scheme-less: convert to a "file" URI
        return new File(source).toURI();
    }

    /**
     * Create a new {@link SeekableChannelContext} to be used when creating read channels via this provider.
     *
     * @return the new context
     */
    SeekableChannelContext makeContext();

    /**
     * Test whether the given context can be used for creating channels with this provider.
     *
     * @param channelContext the context to test
     * @return whether {@code channelContext} is compatible with this provider
     */
    boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext);

    /**
     * Create a read channel for a source given as a string, after converting it with
     * {@link #convertToURI(String)}.
     *
     * @param channelContext the context to use for the channel
     * @param uriStr the file source path or URI
     * @return the read channel
     * @throws IOException if the channel cannot be created
     */
    default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String uriStr)
            throws IOException {
        final URI uri = convertToURI(uriStr);
        return getReadChannel(channelContext, uri);
    }

    /**
     * Create a read channel for the given URI.
     *
     * @param channelContext the context to use for the channel
     * @param uri the URI to read from
     * @return the read channel
     * @throws IOException if the channel cannot be created
     */
    SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull URI uri)
            throws IOException;

    /**
     * Create a write channel for a path given as a string, after converting it with {@link Paths#get}.
     *
     * @param path the path to write to
     * @param append passed through unchanged to {@link #getWriteChannel(Path, boolean)}
     * @return the write channel
     * @throws IOException if the channel cannot be created
     */
    default SeekableByteChannel getWriteChannel(@NotNull final String path, final boolean append) throws IOException {
        final Path target = Paths.get(path);
        return getWriteChannel(target, append);
    }

    /**
     * Create a write channel for the given path.
     *
     * @param path the path to write to
     * @param append the append flag; presumably selects appending over replacing existing content (TODO confirm
     *        against implementations)
     * @return the write channel
     * @throws IOException if the channel cannot be created
     */
    SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/**
 * A service loader class for loading {@link SeekableChannelsProviderPlugin} implementations at runtime and provide
 * {@link SeekableChannelsProvider} implementations for different URI schemes, e.g., S3.
 */
public final class SeekableChannelsProviderLoader {

    /** Lazily created singleton; volatile so a fully constructed instance is safely published across threads. */
    private static volatile SeekableChannelsProviderLoader instance;

    /**
     * Get the singleton loader, creating it on first use.
     * <p>
     * Creation uses double-checked locking so that concurrent first callers all observe the same instance and the
     * plugins are loaded only once.
     *
     * @return the shared {@link SeekableChannelsProviderLoader}
     */
    public static SeekableChannelsProviderLoader getInstance() {
        // Read the volatile field once on the fast path
        SeekableChannelsProviderLoader local = instance;
        if (local == null) {
            synchronized (SeekableChannelsProviderLoader.class) {
                // Re-check under the lock: another thread may have created the instance while we waited
                local = instance;
                if (local == null) {
                    local = new SeekableChannelsProviderLoader();
                    instance = local;
                }
            }
        }
        return local;
    }

    /** Plugins discovered by the {@link ServiceLoader}, in iteration order. */
    private final List<SeekableChannelsProviderPlugin> providers;

    private SeekableChannelsProviderLoader() {
        providers = new ArrayList<>();
        // Load the plugins
        for (final SeekableChannelsProviderPlugin plugin : ServiceLoader.load(SeekableChannelsProviderPlugin.class)) {
            providers.add(plugin);
        }
    }

    /**
     * Create a new {@link SeekableChannelsProvider} based on given URI and object using the plugins loaded by the
     * {@link ServiceLoader}. For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can
     * read files from S3.
     * <p>
     * Plugins are consulted in the order they were loaded; the first one reporting itself compatible creates the
     * provider.
     *
     * @param uri The URI
     * @param object An optional object to pass to the {@link SeekableChannelsProviderPlugin} implementations.
     * @return A {@link SeekableChannelsProvider} for the given URI.
     * @throws UnsupportedOperationException if no loaded plugin is compatible with the given URI and object
     */
    public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, @Nullable final Object object) {
        for (final SeekableChannelsProviderPlugin plugin : providers) {
            if (plugin.isCompatible(uri, object)) {
                return plugin.createProvider(uri, object);
            }
        }
        throw new UnsupportedOperationException("No plugin found for uri: " + uri);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.util.channel;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;

/**
* A plugin interface for providing {@link SeekableChannelsProvider} implementations for different URI schemes, e.g. S3.
* Check out {@link SeekableChannelsProviderLoader} for more details.
*/
public interface SeekableChannelsProviderPlugin {
/**
* Check if this plugin is compatible with the given URI and config object.
*
* @param uri the URI for which a provider is requested
* @param config an optional configuration object; may be {@code null}
* @return whether this plugin can create a provider for the given URI and config object
*/
boolean isCompatible(@NotNull URI uri, @Nullable Object config);

/**
* Create a {@link SeekableChannelsProvider} for the given URI and config object.
*
* @param uri the URI for which a provider is requested
* @param object an optional configuration object; may be {@code null}
* @return a provider for the given URI and config object
*/
SeekableChannelsProvider createProvider(@NotNull URI uri, @Nullable Object object);
}
Loading
Loading