diff --git a/Base/src/main/java/io/deephaven/base/FileUtils.java b/Base/src/main/java/io/deephaven/base/FileUtils.java index c0196aba58a..da11eef1289 100644 --- a/Base/src/main/java/io/deephaven/base/FileUtils.java +++ b/Base/src/main/java/io/deephaven/base/FileUtils.java @@ -3,7 +3,6 @@ // package io.deephaven.base; -import io.deephaven.base.verify.Assert; import io.deephaven.base.verify.Require; import org.jetbrains.annotations.Nullable; diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java index 2927ce5b706..bb4f5eecab4 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java @@ -58,7 +58,15 @@ enum ChannelType { private final RAPriQueue releasePriority = new RAPriQueue<>(8, PerPathPool.RAPQ_ADAPTER, PerPathPool.class); - public CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider, + public static CachedChannelProvider create(@NotNull final SeekableChannelsProvider wrappedProvider, + final int maximumPooledCount) { + if (wrappedProvider instanceof CachedChannelProvider) { + throw new IllegalArgumentException("Cannot wrap a CachedChannelProvider in another CachedChannelProvider"); + } + return new CachedChannelProvider(wrappedProvider, maximumPooledCount); + } + + private CachedChannelProvider(@NotNull final SeekableChannelsProvider wrappedProvider, final int maximumPooledCount) { this.wrappedProvider = wrappedProvider; this.maximumPooledCount = Require.gtZero(maximumPooledCount, "maximumPooledCount"); diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java index 343bf3f7683..98ab4d8c583 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java @@ -37,18 +37,19 @@ private SeekableChannelsProviderLoader() { } /** - * Create a new {@link SeekableChannelsProvider} based on given URI and object using the plugins loaded by the - * {@link ServiceLoader}. For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can - * read files from S3. + * Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the + * plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a + * {@link SeekableChannelsProvider} which can read files from S3. * * @param uri The URI - * @param object An optional object to pass to the {@link SeekableChannelsProviderPlugin} implementations. + * @param specialInstructions An optional object to pass special instructions to the provider. * @return A {@link SeekableChannelsProvider} for the given URI. 
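For illustration, here is a rough sketch of how the new create(...) factory and the service-loader entry point are meant to compose; the class name, example URI, and pool size are assumptions made for this sketch only, not part of the change:

import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;

import java.io.File;
import java.net.URI;

public class CachedProviderExample { // hypothetical example class
    public static void main(String[] args) {
        // Arbitrary local-file URI; any URI with a registered plugin resolves the same way.
        final URI uri = new File("/tmp/example.parquet").toURI();
        final SeekableChannelsProvider provider =
                SeekableChannelsProviderLoader.getInstance().fromServiceLoader(uri, null);
        // Pool up to 128 channels around the underlying provider.
        final SeekableChannelsProvider cached = CachedChannelProvider.create(provider, 1 << 7);
        // Re-wrapping is now rejected rather than silently layering caches:
        // CachedChannelProvider.create(cached, 1 << 7); // would throw IllegalArgumentException
    }
}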
*/ - public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, @Nullable final Object object) { + public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, + @Nullable final Object specialInstructions) { for (final SeekableChannelsProviderPlugin plugin : providers) { - if (plugin.isCompatible(uri, object)) { - return plugin.createProvider(uri, object); + if (plugin.isCompatible(uri, specialInstructions)) { + return plugin.createProvider(uri, specialInstructions); } } throw new UnsupportedOperationException("No plugin found for uri: " + uri); diff --git a/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java b/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java index 4eed02d427e..1bd3339df60 100644 --- a/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java +++ b/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java @@ -20,6 +20,7 @@ import java.util.function.Supplier; import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; @@ -32,7 +33,7 @@ public class CachedChannelProviderTest { @Test public void testSimpleRead() throws IOException { final SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); - final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100); + final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); for (int ii = 0; ii < 100; ++ii) { final SeekableByteChannel[] sameFile = new SeekableByteChannel[10]; for (int jj = 0; jj < sameFile.length; ++jj) { @@ -55,7 +56,7 @@ public void testSimpleRead() throws IOException { @Test public void testSimpleReadWrite() throws IOException { SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); - CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100); + CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); for (int i = 0; i < 1000; i++) { SeekableByteChannel rc = ((i / 100) % 2 == 0 ? 
cachedChannelProvider.getReadChannel(wrappedProvider.makeContext(), "r" + i) @@ -69,7 +70,7 @@ public void testSimpleReadWrite() throws IOException { @Test public void testSimpleWrite() throws IOException { SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); - CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100); + CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); for (int i = 0; i < 1000; i++) { SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("w" + i, false); // Call write to hit the assertions inside the mock channel @@ -86,7 +87,7 @@ public void testSimpleWrite() throws IOException { @Test public void testSimpleAppend() throws IOException { SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); - CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100); + CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); for (int i = 0; i < 1000; i++) { SeekableByteChannel rc = cachedChannelProvider.getWriteChannel("a" + i, true); rc.close(); @@ -100,7 +101,7 @@ public void testSimpleAppend() throws IOException { @Test public void testCloseOrder() throws IOException { SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); - CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100); + CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); for (int i = 0; i < 20; i++) { List channels = new ArrayList<>(); for (int j = 0; j < 50; j++) { @@ -121,7 +122,7 @@ public void testCloseOrder() throws IOException { @Test public void testReuse() throws IOException { final SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); - final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 50); + final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 50); final SeekableByteChannel[] someResult = new SeekableByteChannel[50]; final ByteBuffer buffer = ByteBuffer.allocate(1); for (int ci = 0; ci < someResult.length; ++ci) { @@ -149,7 +150,7 @@ public void testReuse() throws IOException { @Test public void testReuse10() throws IOException { final SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); - final CachedChannelProvider cachedChannelProvider = new CachedChannelProvider(wrappedProvider, 100); + final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); final SeekableByteChannel[] someResult = new SeekableByteChannel[100]; for (int pi = 0; pi < 10; ++pi) { for (int ci = 0; ci < 10; ++ci) { @@ -173,6 +174,17 @@ public void testReuse10() throws IOException { assertEquals(0, closed.size()); } + @Test + void testRewrapCachedChannelProvider() { + final SeekableChannelsProvider wrappedProvider = new TestChannelProvider(); + final CachedChannelProvider cachedChannelProvider = CachedChannelProvider.create(wrappedProvider, 100); + try { + CachedChannelProvider.create(cachedChannelProvider, 100); + fail("Expected IllegalArgumentException on rewrapping CachedChannelProvider"); + } catch (final IllegalArgumentException expected) { + } + } + private class TestChannelProvider implements SeekableChannelsProvider { diff --git a/Util/src/main/java/io/deephaven/util/thread/ThreadHelpers.java b/Util/src/main/java/io/deephaven/util/thread/ThreadHelpers.java new file mode 
100644 index 00000000000..e9e2f9af791 --- /dev/null +++ b/Util/src/main/java/io/deephaven/util/thread/ThreadHelpers.java @@ -0,0 +1,26 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.util.thread; + +import io.deephaven.configuration.Configuration; + +public class ThreadHelpers { + /** + * Get the number of threads to use for a given configuration key, defaulting to the number of available processors + * if the configuration key is set to a non-positive value, or the configuration key is not set and the provided + * default is non-positive. + * + * @param configKey The configuration key to look up + * @param defaultValue The default value to use if the configuration key is not set + * @return The number of threads to use + */ + public static int getOrComputeThreadCountProperty(final String configKey, final int defaultValue) { + final int numThreads = Configuration.getInstance().getIntegerWithDefault(configKey, defaultValue); + if (numThreads <= 0) { + return Runtime.getRuntime().availableProcessors(); + } else { + return numThreads; + } + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index 43498b1029c..6ac54c0f980 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -4,7 +4,6 @@ package io.deephaven.engine.table.impl; import io.deephaven.chunk.util.pools.MultiChunkPool; -import io.deephaven.configuration.Configuration; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.util.thread.NamingThreadFactory; @@ -17,6 +16,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static io.deephaven.util.thread.ThreadHelpers.getOrComputeThreadCountProperty; + /** * Implementation of OperationInitializer that delegates to a pool of threads. 
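As a quick sketch of how the new helper is meant to be consumed; the property key "MyComponent.threads" and the class name are illustrative placeholders, not real call sites:

import static io.deephaven.util.thread.ThreadHelpers.getOrComputeThreadCountProperty;

public class ThreadCountExample { // hypothetical example class
    // A non-positive configured value, or an unset key combined with a non-positive default,
    // falls back to Runtime.getRuntime().availableProcessors().
    private static final int NUM_THREADS = getOrComputeThreadCountProperty("MyComponent.threads", -1);

    public static void main(String[] args) {
        System.out.println("Using " + NUM_THREADS + " threads");
    }
}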
*/ @@ -25,17 +26,8 @@ public class OperationInitializationThreadPool implements OperationInitializer { /** * The number of threads that will be used for parallel initialization in this process */ - public static final int NUM_THREADS; - - static { - final int numThreads = - Configuration.getInstance().getIntegerWithDefault("OperationInitializationThreadPool.threads", -1); - if (numThreads <= 0) { - NUM_THREADS = Runtime.getRuntime().availableProcessors(); - } else { - NUM_THREADS = numThreads; - } - } + private static final int NUM_THREADS = + getOrComputeThreadCountProperty("OperationInitializationThreadPool.threads", -1); private final ThreadLocal isInitializationThread = ThreadLocal.withInitial(() -> false); private final ThreadPoolExecutor executorService; diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java index 0456fb78dcc..edfa267f894 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java @@ -3890,7 +3890,7 @@ public void testMultiPartitionSymbolTableBy() throws IOException { t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num"); final Table loaded = ParquetTools.readPartitionedTableInferSchema( - new ParquetKeyValuePartitionedLayout(testRootFile, 2, ParquetInstructions.EMPTY), + new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY), ParquetInstructions.EMPTY); // verify the sources are identical diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index 3b423e9cff4..1cca18b8b4b 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -3,21 +3,19 @@ // package io.deephaven.parquet.base; -import io.deephaven.UncheckedDeephavenException; import io.deephaven.util.channel.CachedChannelProvider; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; -import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.apache.parquet.format.*; import org.apache.parquet.format.ColumnOrder; import org.apache.parquet.format.Type; import org.apache.parquet.schema.*; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.net.URI; import java.nio.channels.SeekableByteChannel; import java.util.*; @@ -44,94 +42,50 @@ public class ParquetFileReader { /** * Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as - * {@link UncheckedDeephavenException}. + * {@link UncheckedIOException}. 
* * @param parquetFile The parquet file or the parquet metadata file - * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating - * channels + * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file * @return The new {@link ParquetFileReader} */ public static ParquetFileReader create( @NotNull final File parquetFile, - @Nullable final Object specialInstructions) { + @NotNull final SeekableChannelsProvider channelsProvider) { try { - return createChecked(parquetFile, specialInstructions); - } catch (IOException e) { - throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFile, e); + return new ParquetFileReader(convertToURI(parquetFile, false), channelsProvider); + } catch (final IOException e) { + throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFile, e); } } /** * Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as - * {@link UncheckedDeephavenException}. + * {@link UncheckedIOException}. * * @param parquetFileURI The URI for the parquet file or the parquet metadata file - * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating - * channels + * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file * @return The new {@link ParquetFileReader} */ public static ParquetFileReader create( @NotNull final URI parquetFileURI, - @Nullable final Object specialInstructions) { + @NotNull final SeekableChannelsProvider channelsProvider) { try { - return createChecked(parquetFileURI, specialInstructions); - } catch (IOException e) { - throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFileURI, e); + return new ParquetFileReader(parquetFileURI, channelsProvider); + } catch (final IOException e) { + throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFileURI, e); } } - /** - * Make a {@link ParquetFileReader} for the supplied {@link File}. - * - * @param parquetFile The parquet file or the parquet metadata file - * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating - * channels - * @return The new {@link ParquetFileReader} - * @throws IOException if an IO exception occurs - */ - public static ParquetFileReader createChecked( - @NotNull final File parquetFile, - @Nullable final Object specialInstructions) throws IOException { - return createChecked(convertToURI(parquetFile, false), specialInstructions); - } - - /** - * Make a {@link ParquetFileReader} for the supplied {@link URI}. - * - * @param parquetFileURI The URI for the parquet file or the parquet metadata file - * @param specialInstructions Optional read instructions to pass to {@link SeekableChannelsProvider} while creating - * channels - * @return The new {@link ParquetFileReader} - * @throws IOException if an IO exception occurs - */ - public static ParquetFileReader createChecked( - @NotNull final URI parquetFileURI, - @Nullable final Object specialInstructions) throws IOException { - final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader( - parquetFileURI, specialInstructions); - return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(provider, 1 << 7)); - } - - /** - * Create a new ParquetFileReader for the provided source. 
- * - * @param source The source path or URI for the parquet file or the parquet metadata file - * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file - */ - public ParquetFileReader(final String source, final SeekableChannelsProvider channelsProvider) - throws IOException { - this(convertToURI(source, false), channelsProvider); - } - /** * Create a new ParquetFileReader for the provided source. * * @param parquetFileURI The URI for the parquet file or the parquet metadata file - * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file + * @param provider The {@link SeekableChannelsProvider} to use for reading the file */ - public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider) - throws IOException { - this.channelsProvider = channelsProvider; + private ParquetFileReader( + @NotNull final URI parquetFileURI, + @NotNull final SeekableChannelsProvider provider) throws IOException { + this.channelsProvider = CachedChannelProvider.create(provider, 1 << 7); if (!parquetFileURI.getRawPath().endsWith(".parquet") && FILE_URI_SCHEME.equals(parquetFileURI.getScheme())) { // Construct a new file URI for the parent directory rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true); @@ -270,7 +224,7 @@ private Set calculateColumnsWithDictionaryUsedOnEveryDataPage() { /** * Create a {@link RowGroupReader} object for provided row group number - * + * * @param version The "version" string from deephaven specific parquet metadata, or null if it's not present. */ public RowGroupReader getRowGroup(final int groupNumber, final String version) { @@ -506,7 +460,7 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedTyp /** * Helper method to determine if a logical type is adjusted to UTC. - * + * * @param logicalType the logical type to check * @return true if the logical type is a timestamp adjusted to UTC, false otherwise */ diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java index f67e11bd650..e7e773e1bfe 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -678,13 +678,13 @@ public static class Builder { private TableDefinition tableDefinition; private Collection> indexColumns; - public Builder() {} - /** - * Creates a new {@link ParquetInstructions} object by only copying the column name to instructions mapping and - * parquet column name to instructions mapping from the given {@link ParquetInstructions} object. For copying - * all properties, use something like {@link ParquetInstructions#withTableDefinition}. 
+ * For each additional field added, make sure to update the copy constructor builder + * {@link #Builder(ParquetInstructions)} */ + + public Builder() {} + public Builder(final ParquetInstructions parquetInstructions) { if (parquetInstructions == EMPTY) { return; @@ -692,6 +692,18 @@ public Builder(final ParquetInstructions parquetInstructions) { final ReadOnly readOnlyParquetInstructions = (ReadOnly) parquetInstructions; columnNameToInstructions = readOnlyParquetInstructions.copyColumnNameToInstructions(); parquetColumnNameToInstructions = readOnlyParquetInstructions.copyParquetColumnNameToInstructions(); + compressionCodecName = readOnlyParquetInstructions.getCompressionCodecName(); + maximumDictionaryKeys = readOnlyParquetInstructions.getMaximumDictionaryKeys(); + maximumDictionarySize = readOnlyParquetInstructions.getMaximumDictionarySize(); + isLegacyParquet = readOnlyParquetInstructions.isLegacyParquet(); + targetPageSize = readOnlyParquetInstructions.getTargetPageSize(); + isRefreshing = readOnlyParquetInstructions.isRefreshing(); + specialInstructions = readOnlyParquetInstructions.getSpecialInstructions(); + generateMetadataFiles = readOnlyParquetInstructions.generateMetadataFiles(); + baseNameForPartitionedParquetData = readOnlyParquetInstructions.baseNameForPartitionedParquetData(); + fileLayout = readOnlyParquetInstructions.getFileLayout().orElse(null); + tableDefinition = readOnlyParquetInstructions.getTableDefinition().orElse(null); + indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null); } private void newColumnNameToInstructionsMap() { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java index a8e599b04e0..466b7ee1096 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java @@ -34,7 +34,6 @@ import org.apache.parquet.schema.PrimitiveType; import org.jetbrains.annotations.NotNull; -import java.io.File; import java.io.IOException; import java.math.BigInteger; import java.time.Instant; @@ -95,30 +94,6 @@ void reset() { } } - /** - * Obtain schema information from a parquet file - * - * @param filePath Location for input parquet file - * @param readInstructions Parquet read instructions specifying transformations like column mappings and codecs. - * Note a new read instructions based on this one may be returned by this method to provide necessary - * transformations, eg, replacing unsupported characters like ' ' (space) in column names. - * @param consumer A ColumnDefinitionConsumer whose accept method would be called for each column in the file - * @return Parquet read instructions, either the ones supplied or a new object based on the supplied with necessary - * transformations added. 
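To make the intended copy semantics concrete, a small sketch; the instruction values below are placeholders, and the point is only that every field now survives the round trip through the copy constructor:

import io.deephaven.parquet.table.ParquetInstructions;

public class InstructionsCopyExample { // hypothetical example class
    public static void main(String[] args) {
        final ParquetInstructions original = ParquetInstructions.builder()
                .setGenerateMetadataFiles(true)
                .setBaseNameForPartitionedParquetData("data")
                .build();
        // The copy constructor now carries over every field (codec, page size, special
        // instructions, layout, definition, ...), not just the column-name mappings.
        final ParquetInstructions copy = new ParquetInstructions.Builder(original)
                .setBaseNameForPartitionedParquetData("data-override") // tweak a single field
                .build();
        System.out.println(copy);
    }
}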
- */ - public static ParquetInstructions readParquetSchema( - @NotNull final String filePath, - @NotNull final ParquetInstructions readInstructions, - @NotNull final ColumnDefinitionConsumer consumer, - @NotNull final BiFunction, String> legalizeColumnNameFunc) throws IOException { - final ParquetFileReader parquetFileReader = - ParquetFileReader.createChecked(new File(filePath), readInstructions.getSpecialInstructions()); - final ParquetMetadata parquetMetadata = - new ParquetMetadataConverter().fromParquetMetadata(parquetFileReader.fileMetaData); - return readParquetSchema(parquetFileReader.getSchema(), parquetMetadata.getFileMetaData().getKeyValueMetaData(), - readInstructions, consumer, legalizeColumnNameFunc); - } - public static Optional parseMetadata(@NotNull final Map keyValueMetadata) { final String tableInfoRaw = keyValueMetadata.get(METADATA_KEY); if (tableInfoRaw == null) { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index 9b56ae15bb8..a5adb0915c5 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -41,7 +41,6 @@ import io.deephaven.parquet.table.layout.ParquetFlatPartitionedLayout; import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout; import io.deephaven.parquet.table.layout.ParquetMetadataFileLayout; -import io.deephaven.parquet.table.layout.ParquetSingleFileLayout; import io.deephaven.parquet.table.location.ParquetTableLocationFactory; import io.deephaven.parquet.table.location.ParquetTableLocationKey; import io.deephaven.parquet.table.ParquetInstructions.ParquetFileLayout; @@ -103,7 +102,6 @@ private ParquetTools() {} * * @param source The path or URI of file or directory to examine * @return table - * @see ParquetSingleFileLayout * @see ParquetMetadataFileLayout * @see ParquetKeyValuePartitionedLayout * @see ParquetFlatPartitionedLayout @@ -127,7 +125,6 @@ public static Table readTable(@NotNull final String source) { * @param source The path or URI of file or directory to examine * @param readInstructions Instructions for customizations while reading * @return table - * @see ParquetSingleFileLayout * @see ParquetMetadataFileLayout * @see ParquetKeyValuePartitionedLayout * @see ParquetFlatPartitionedLayout @@ -174,7 +171,6 @@ public static Table readTable( * * @param sourceFile The file or directory to examine * @return table - * @see ParquetSingleFileLayout * @see ParquetMetadataFileLayout * @see ParquetKeyValuePartitionedLayout * @see ParquetFlatPartitionedLayout @@ -204,7 +200,6 @@ public static Table readTable(@NotNull final File sourceFile) { * @param sourceFile The file or directory to examine * @param readInstructions Instructions for customizations while reading * @return table - * @see ParquetSingleFileLayout * @see ParquetMetadataFileLayout * @see ParquetKeyValuePartitionedLayout * @see ParquetFlatPartitionedLayout @@ -1378,8 +1373,8 @@ public static Table readPartitionedTableInferSchema( } private static Pair infer( - KnownLocationKeyFinder inferenceKeys, - ParquetInstructions readInstructions) { + final KnownLocationKeyFinder inferenceKeys, + final ParquetInstructions readInstructions) { // TODO(deephaven-core#877): Support schema merge when discovering multiple parquet files final ParquetTableLocationKey lastKey = inferenceKeys.getLastKey().orElse(null); if (lastKey == 
null) { @@ -1661,14 +1656,13 @@ private static Table readSingleFileTable( @NotNull final URI parquetFileURI, @NotNull final ParquetInstructions readInstructions) { verifyFileLayout(readInstructions, ParquetFileLayout.SINGLE_FILE); + final ParquetTableLocationKey locationKey = + new ParquetTableLocationKey(parquetFileURI, 0, null, readInstructions); if (readInstructions.getTableDefinition().isPresent()) { - return readTable(new ParquetTableLocationKey(parquetFileURI, 0, null, readInstructions), - readInstructions); + return readTable(locationKey, readInstructions); } // Infer the table definition - final TableLocationKeyFinder singleFileLayout = - new ParquetSingleFileLayout(parquetFileURI, readInstructions); - final KnownLocationKeyFinder inferenceKeys = toKnownKeys(singleFileLayout); + final KnownLocationKeyFinder inferenceKeys = new KnownLocationKeyFinder<>(locationKey); final Pair inference = infer(inferenceKeys, readInstructions); final TableDefinition inferredTableDefinition = inference.getFirst(); final ParquetInstructions inferredInstructions = inference.getSecond(); @@ -1722,8 +1716,9 @@ public static Table readParquetSchemaAndTable( @NotNull final File source, @NotNull final ParquetInstructions readInstructionsIn, @Nullable final MutableObject mutableInstructionsOut) { + final URI sourceURI = convertToURI(source, false); final ParquetTableLocationKey tableLocationKey = - new ParquetTableLocationKey(source, 0, null, readInstructionsIn); + new ParquetTableLocationKey(sourceURI, 0, null, readInstructionsIn); final Pair>, ParquetInstructions> schemaInfo = ParquetSchemaReader.convertSchema( tableLocationKey.getFileReader().getSchema(), tableLocationKey.getMetadata().getFileMetaData().getKeyValueMetaData(), diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java index 056312818c7..830e3e719cf 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java @@ -9,6 +9,8 @@ import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.parquet.table.location.ParquetTableLocationKey; import io.deephaven.util.annotations.VisibleForTesting; +import io.deephaven.util.channel.SeekableChannelsProvider; +import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -24,6 +26,10 @@ import java.util.function.Consumer; import java.util.function.Predicate; +import java.net.URI; + +import static io.deephaven.base.FileUtils.convertToURI; + /** * {@link TableLocationKeyFinder} that will traverse a directory hierarchy laid out in Deephaven's "nested-partitioned" * format, e.g. 
@@ -47,13 +53,16 @@ public static DeephavenNestedPartitionLayout forParquet @NotNull final String columnPartitionKey, @Nullable final Predicate internalPartitionValueFilter, @NotNull final ParquetInstructions readInstructions) { + final SeekableChannelsProvider channelsProvider = + SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(tableRootDirectory, true), + readInstructions.getSpecialInstructions()); return new DeephavenNestedPartitionLayout<>(tableRootDirectory, tableName, columnPartitionKey, internalPartitionValueFilter) { @Override protected ParquetTableLocationKey makeKey(@NotNull Path tableLeafDirectory, @NotNull Map> partitions) { - return new ParquetTableLocationKey(tableLeafDirectory.resolve(PARQUET_FILE_NAME).toFile(), 0, - partitions, readInstructions); + final URI fileURI = convertToURI(tableLeafDirectory.resolve(PARQUET_FILE_NAME), false); + return new ParquetTableLocationKey(fileURI, 0, partitions, readInstructions, channelsProvider); } }; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java index 31914804dc1..71f0068383b 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java @@ -3,7 +3,6 @@ // package io.deephaven.parquet.table.layout; -import io.deephaven.base.FileUtils; import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; import io.deephaven.parquet.base.ParquetUtils; @@ -30,23 +29,10 @@ */ public final class ParquetFlatPartitionedLayout implements TableLocationKeyFinder { - private static ParquetTableLocationKey locationKey(@NotNull final URI uri, - @NotNull final ParquetInstructions readInstructions) { - return new ParquetTableLocationKey(uri, 0, null, readInstructions); - } - private final URI tableRootDirectory; private final Map cache; private final ParquetInstructions readInstructions; - - /** - * @param tableRootDirectory The directory to search for .parquet files. - * @param readInstructions the instructions for customizations while reading - */ - public ParquetFlatPartitionedLayout(@NotNull final File tableRootDirectory, - @NotNull final ParquetInstructions readInstructions) { - this(FileUtils.convertToURI(tableRootDirectory, true), readInstructions); - } + private final SeekableChannelsProvider channelsProvider; /** * @param tableRootDirectoryURI The directory URI to search for .parquet files. 
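For context, the provider-sharing pattern being introduced across these layouts looks roughly like the following sketch; the directory path, file name, and class name are invented for illustration:

import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;

import java.io.File;
import java.net.URI;

import static io.deephaven.base.FileUtils.convertToURI;

public class SharedProviderExample { // hypothetical example class
    public static void main(String[] args) {
        final ParquetInstructions instructions = ParquetInstructions.EMPTY;
        // Resolve one provider from the table root...
        final URI rootUri = convertToURI(new File("/data/myTable"), true);
        final SeekableChannelsProvider channelsProvider = SeekableChannelsProviderLoader.getInstance()
                .fromServiceLoader(rootUri, instructions.getSpecialInstructions());
        // ...and share it across every location key discovered under that root.
        final URI fileUri = convertToURI(new File("/data/myTable/part0/data.parquet"), false);
        final ParquetTableLocationKey key =
                new ParquetTableLocationKey(fileUri, 0, null, instructions, channelsProvider);
        System.out.println(key);
    }
}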
@@ -57,6 +43,8 @@ public ParquetFlatPartitionedLayout(@NotNull final URI tableRootDirectoryURI, this.tableRootDirectory = tableRootDirectoryURI; this.cache = Collections.synchronizedMap(new HashMap<>()); this.readInstructions = readInstructions; + this.channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, + readInstructions.getSpecialInstructions()); } public String toString() { @@ -74,19 +62,14 @@ public void findKeys(@NotNull final Consumer locationKe } else { uriFilter = uri -> uri.getPath().endsWith(ParquetUtils.PARQUET_FILE_EXTENSION); } - try (final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader( - tableRootDirectory, readInstructions.getSpecialInstructions()); - final Stream stream = provider.list(tableRootDirectory)) { + try (final Stream stream = channelsProvider.list(tableRootDirectory)) { stream.filter(uriFilter).forEach(uri -> { cache.compute(uri, (key, existingLocationKey) -> { if (existingLocationKey != null) { locationKeyObserver.accept(existingLocationKey); return existingLocationKey; } - final ParquetTableLocationKey newLocationKey = locationKey(uri, readInstructions); - if (!newLocationKey.verifyFileReader()) { - return null; - } + final ParquetTableLocationKey newLocationKey = locationKey(uri); locationKeyObserver.accept(newLocationKey); return newLocationKey; }); @@ -95,4 +78,8 @@ public void findKeys(@NotNull final Consumer locationKe throw new TableDataException("Error finding parquet locations under " + tableRootDirectory, e); } } + + private ParquetTableLocationKey locationKey(@NotNull final URI uri) { + return new ParquetTableLocationKey(uri, 0, null, readInstructions, channelsProvider); + } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java index 4836a176066..53a057c0c95 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java @@ -19,7 +19,6 @@ import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.jetbrains.annotations.NotNull; -import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.file.Path; @@ -48,42 +47,50 @@ public class ParquetKeyValuePartitionedLayout extends URIStreamKeyValuePartitionLayout implements TableLocationKeyFinder { - private final ParquetInstructions readInstructions; + private final SeekableChannelsProvider channelsProvider; public ParquetKeyValuePartitionedLayout( - @NotNull final File tableRootDirectory, + @NotNull final URI tableRootDirectory, @NotNull final TableDefinition tableDefinition, @NotNull final ParquetInstructions readInstructions) { - this(convertToURI(tableRootDirectory, true), tableDefinition, readInstructions); + this(tableRootDirectory, tableDefinition, readInstructions, + SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, + readInstructions.getSpecialInstructions())); } - public ParquetKeyValuePartitionedLayout( + private ParquetKeyValuePartitionedLayout( @NotNull final URI tableRootDirectory, @NotNull final TableDefinition tableDefinition, - @NotNull final ParquetInstructions readInstructions) { + @NotNull final ParquetInstructions readInstructions, + @NotNull final 
SeekableChannelsProvider channelsProvider) { super(tableRootDirectory, () -> new LocationTableBuilderDefinition(tableDefinition), - (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions), + (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions, + channelsProvider), Math.toIntExact(tableDefinition.getColumnStream().filter(ColumnDefinition::isPartitioning).count())); - this.readInstructions = readInstructions; + this.channelsProvider = channelsProvider; } public ParquetKeyValuePartitionedLayout( - @NotNull final File tableRootDirectory, + @NotNull final URI tableRootDirectory, final int maxPartitioningLevels, @NotNull final ParquetInstructions readInstructions) { - this(convertToURI(tableRootDirectory, true), maxPartitioningLevels, readInstructions); + this(tableRootDirectory, maxPartitioningLevels, readInstructions, + SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, + readInstructions.getSpecialInstructions())); } - public ParquetKeyValuePartitionedLayout( + private ParquetKeyValuePartitionedLayout( @NotNull final URI tableRootDirectory, final int maxPartitioningLevels, - @NotNull final ParquetInstructions readInstructions) { + @NotNull final ParquetInstructions readInstructions, + @NotNull final SeekableChannelsProvider channelsProvider) { super(tableRootDirectory, () -> new LocationTableBuilderCsv(tableRootDirectory), - (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions), + (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions, + channelsProvider), maxPartitioningLevels); - this.readInstructions = readInstructions; + this.channelsProvider = channelsProvider; } @Override @@ -95,11 +102,8 @@ public final void findKeys(@NotNull final Consumer loca } else { uriFilter = uri -> uri.getPath().endsWith(ParquetUtils.PARQUET_FILE_EXTENSION); } - try (final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader( - tableRootDirectory, readInstructions.getSpecialInstructions()); - final Stream uriStream = provider.walk(tableRootDirectory)) { - final Stream filteredStream = uriStream.filter(uriFilter); - findKeys(filteredStream, locationKeyObserver); + try (final Stream filteredUriStream = channelsProvider.walk(tableRootDirectory).filter(uriFilter)) { + findKeys(filteredUriStream, locationKeyObserver); } catch (final IOException e) { throw new TableDataException("Error finding parquet locations under " + tableRootDirectory, e); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java index 82eee8e2353..bd0588770b5 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java @@ -16,6 +16,8 @@ import io.deephaven.parquet.table.location.ParquetTableLocationKey; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.parquet.base.ParquetFileReader; +import io.deephaven.util.channel.SeekableChannelsProvider; +import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.apache.commons.io.FilenameUtils; import org.apache.parquet.format.converter.ParquetMetadataConverter; import io.deephaven.util.type.TypeUtils; @@ -28,6 +30,7 @@ import 
java.io.File; import java.io.IOException; +import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; @@ -39,6 +42,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static io.deephaven.base.FileUtils.convertToURI; import static io.deephaven.parquet.base.ParquetUtils.COMMON_METADATA_FILE_NAME; import static io.deephaven.parquet.base.ParquetUtils.METADATA_FILE_NAME; import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY; @@ -69,6 +73,7 @@ public class ParquetMetadataFileLayout implements TableLocationKeyFinder keys; + private final SeekableChannelsProvider channelsProvider; public ParquetMetadataFileLayout(@NotNull final File directory) { this(directory, ParquetInstructions.EMPTY); @@ -96,12 +101,13 @@ public ParquetMetadataFileLayout( } this.metadataFile = metadataFile; this.commonMetadataFile = commonMetadataFile; + channelsProvider = + SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(metadataFile, false), + inputInstructions.getSpecialInstructions()); if (!metadataFile.exists()) { throw new TableDataException(String.format("Parquet metadata file %s does not exist", metadataFile)); } - final ParquetFileReader metadataFileReader = - ParquetFileReader.create(metadataFile, inputInstructions.getSpecialInstructions()); - + final ParquetFileReader metadataFileReader = ParquetFileReader.create(metadataFile, channelsProvider); final ParquetMetadataConverter converter = new ParquetMetadataConverter(); final ParquetMetadata metadataFileMetadata = convertMetadata(metadataFile, metadataFileReader, converter); final Pair>, ParquetInstructions> leafSchemaInfo = ParquetSchemaReader.convertSchema( @@ -111,7 +117,7 @@ public ParquetMetadataFileLayout( if (commonMetadataFile != null && commonMetadataFile.exists()) { final ParquetFileReader commonMetadataFileReader = - ParquetFileReader.create(commonMetadataFile, inputInstructions.getSpecialInstructions()); + ParquetFileReader.create(commonMetadataFile, channelsProvider); final Pair>, ParquetInstructions> fullSchemaInfo = ParquetSchemaReader.convertSchema( commonMetadataFileReader.getSchema(), @@ -202,9 +208,9 @@ public ParquetMetadataFileLayout( partitions.put(partitionKey, partitionValue); } } - final File partitionFile = new File(directory, relativePathString); - final ParquetTableLocationKey tlk = new ParquetTableLocationKey(partitionFile, - partitionOrder.getAndIncrement(), partitions, inputInstructions); + final URI partitionFileURI = convertToURI(new File(directory, relativePathString), false); + final ParquetTableLocationKey tlk = new ParquetTableLocationKey(partitionFileURI, + partitionOrder.getAndIncrement(), partitions, inputInstructions, channelsProvider); tlk.setFileReader(metadataFileReader); tlk.setMetadata(getParquetMetadataForFile(relativePathString, metadataFileMetadata)); tlk.setRowGroupIndices(rowGroupIndices); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetSingleFileLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetSingleFileLayout.java deleted file mode 100644 index cc328eaba63..00000000000 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetSingleFileLayout.java +++ /dev/null @@ -1,39 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.parquet.table.layout; - -import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; -import 
io.deephaven.parquet.table.ParquetInstructions; -import io.deephaven.parquet.table.location.ParquetTableLocationKey; -import org.jetbrains.annotations.NotNull; - -import java.net.URI; -import java.util.function.Consumer; - -/** - * Parquet {@link TableLocationKeyFinder location finder} that will discover a single file. - */ -public final class ParquetSingleFileLayout implements TableLocationKeyFinder { - private final URI parquetFileUri; - private final ParquetInstructions readInstructions; - - /** - * @param parquetFileUri URI of single parquet file to find - * @param readInstructions the instructions for customizations while reading - */ - public ParquetSingleFileLayout(@NotNull final URI parquetFileUri, - @NotNull final ParquetInstructions readInstructions) { - this.parquetFileUri = parquetFileUri; - this.readInstructions = readInstructions; - } - - public String toString() { - return ParquetSingleFileLayout.class.getSimpleName() + '[' + parquetFileUri + ']'; - } - - @Override - public void findKeys(@NotNull final Consumer locationKeyObserver) { - locationKeyObserver.accept(new ParquetTableLocationKey(parquetFileUri, 0, null, readInstructions)); - } -} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 7d77ba5e240..da0efbefbe8 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -82,6 +82,8 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, final ParquetMetadata parquetMetadata; // noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (tableLocationKey) { + // Following methods are internally synchronized, we synchronize them together here to minimize lock/unlock + // calls parquetFileReader = tableLocationKey.getFileReader(); parquetMetadata = tableLocationKey.getMetadata(); rowGroupIndices = tableLocationKey.getRowGroupIndices(); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java index a33cd48f609..cafff0a3210 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java @@ -8,6 +8,8 @@ import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.table.impl.locations.TableLocationKey; import io.deephaven.parquet.base.ParquetFileReader; +import io.deephaven.util.channel.SeekableChannelsProvider; +import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.apache.commons.io.FilenameUtils; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.format.RowGroup; @@ -15,7 +17,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.io.File; import java.io.IOException; import java.net.URI; import java.util.List; @@ -35,23 +36,27 @@ public class ParquetTableLocationKey extends URITableLocationKey { private ParquetFileReader fileReader; private ParquetMetadata metadata; private int[] rowGroupIndices; - private final ParquetInstructions readInstructions; + 
private SeekableChannelsProvider channelsProvider; /** - * Construct a new ParquetTableLocationKey for the supplied {@code file} and {@code partitions}. + * Construct a new ParquetTableLocationKey for the supplied {@code parquetFileUri} and {@code partitions}. + *

+ * This constructor will create a new {@link SeekableChannelsProvider} for reading the file. If you have multiple + * location keys that should share a provider, use the other constructor and set the provider manually. * - * @param file The parquet file that backs the keyed location. Will be adjusted to an absolute path. + * @param parquetFileUri The parquet file that backs the keyed location. Will be adjusted to an absolute path. * @param order Explicit ordering index, taking precedence over other fields * @param partitions The table partitions enclosing the table location keyed by {@code this}. Note that if this * parameter is {@code null}, the location will be a member of no partitions. An ordered copy of the map will * be made, so the calling code is free to mutate the map after this call * @param readInstructions the instructions for customizations while reading */ - public ParquetTableLocationKey(@NotNull final File file, final int order, + public ParquetTableLocationKey(@NotNull final URI parquetFileUri, final int order, @Nullable final Map> partitions, @NotNull final ParquetInstructions readInstructions) { - super(validateParquetFile(file), order, partitions); - this.readInstructions = readInstructions; + this(parquetFileUri, order, partitions, readInstructions, + SeekableChannelsProviderLoader.getInstance().fromServiceLoader(parquetFileUri, + readInstructions.getSpecialInstructions())); } /** @@ -63,21 +68,20 @@ public ParquetTableLocationKey(@NotNull final File file, final int order, * parameter is {@code null}, the location will be a member of no partitions. An ordered copy of the map will * be made, so the calling code is free to mutate the map after this call * @param readInstructions the instructions for customizations while reading + * @param channelsProvider the provider for reading the file */ public ParquetTableLocationKey(@NotNull final URI parquetFileUri, final int order, @Nullable final Map> partitions, - @NotNull final ParquetInstructions readInstructions) { + @NotNull final ParquetInstructions readInstructions, + @NotNull final SeekableChannelsProvider channelsProvider) { super(validateParquetFile(parquetFileUri), order, partitions); - this.readInstructions = readInstructions; - } - - private static URI validateParquetFile(@NotNull final File file) { - return validateParquetFile(convertToURI(file, false)); + this.channelsProvider = channelsProvider; } private static URI validateParquetFile(@NotNull final URI parquetFileUri) { if (!parquetFileUri.getRawPath().endsWith(PARQUET_FILE_EXTENSION)) { - throw new IllegalArgumentException("Parquet file must end in " + PARQUET_FILE_EXTENSION); + throw new IllegalArgumentException("Parquet file must end in " + PARQUET_FILE_EXTENSION + ", found: " + + parquetFileUri.getRawPath()); } return parquetFileUri; } @@ -87,36 +91,6 @@ public String getImplementationName() { return IMPLEMENTATION_NAME; } - /** - * Returns {@code true} if a previous {@link ParquetFileReader} has been created, or if one was successfully created - * on-demand. - * - *

- * When {@code false}, this may mean that the file: - * <ol> - * <li>does not exist, or is otherwise inaccessible</li> - * <li>is in the process of being written, and is not yet a valid parquet file</li> - * <li>is _not_ a parquet file</li> - * <li>is a corrupt parquet file</li> - * </ol>
- * - * Callers wishing to handle these cases more explicit may call - * {@link ParquetFileReader#createChecked(URI, Object)}. - * - * @return true if the file reader exists or was successfully created - */ - public synchronized boolean verifyFileReader() { - if (fileReader != null) { - return true; - } - try { - fileReader = ParquetFileReader.createChecked(uri, readInstructions.getSpecialInstructions()); - } catch (IOException e) { - return false; - } - return true; - } - /** * Get a previously-{@link #setFileReader(ParquetFileReader) set} or on-demand created {@link ParquetFileReader} for * this location key's {@code file}. @@ -127,7 +101,7 @@ public synchronized ParquetFileReader getFileReader() { if (fileReader != null) { return fileReader; } - return fileReader = ParquetFileReader.create(uri, readInstructions.getSpecialInstructions()); + return fileReader = ParquetFileReader.create(uri, channelsProvider); } /** diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 44ed9586a61..1fc4cf47f16 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -746,6 +746,13 @@ public void flatPartitionedParquetWithMetadataTest() throws IOException { final Table fromDiskWithMetadataWithoutData = readTable(metadataFile); assertEquals(source.size(), fromDiskWithMetadataWithoutData.size()); + // If we call select now, this should fail because the data files are empty + try { + fromDiskWithMetadataWithoutData.select(); + fail("Expected exception when reading table with empty data files"); + } catch (final RuntimeException expected) { + } + // Now write with flat partitioned parquet files to different directories with metadata file parentDir.delete(); final File updatedSecondDataFile = new File(rootFile, "testDir/data2.parquet"); @@ -786,6 +793,56 @@ public void flatPartitionedParquetWithBigDecimalMetadataTest() throws IOExceptio assertTableEquals(expected, fromDiskWithMetadata); } + @Test + public void keyValuePartitionedWithMetadataTest() throws IOException { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofInt("PC2").withPartitioning(), + ColumnDefinition.ofLong("I")); + final Table source = ((QueryTable) TableTools.emptyTable(1_000_000) + .updateView("PC1 = (int)(ii%3)", + "PC2 = (int)(ii%2)", + "I = ii")) + .withDefinitionUnsafe(definition); + + final File parentDir = new File(rootFile, "keyValuePartitionedWithMetadataTest"); + final ParquetInstructions writeInstructions = ParquetInstructions.builder() + .setGenerateMetadataFiles(true) + .setBaseNameForPartitionedParquetData("data") + .build(); + writeKeyValuePartitionedTable(source, parentDir.getAbsolutePath(), writeInstructions); + + final Table fromDisk = readTable(parentDir); + assertTableEquals(source.sort("PC1", "PC2"), fromDisk.sort("PC1", "PC2")); + + final File metadataFile = new File(parentDir, "_metadata"); + final Table fromDiskWithMetadata = readTable(metadataFile); + assertTableEquals(source.sort("PC1", "PC2"), fromDiskWithMetadata.sort("PC1", "PC2")); + + final File firstDataFile = + new File(parentDir, "PC1=0" + File.separator + "PC2=0" + File.separator + "data.parquet"); + final File secondDataFile = + new File(parentDir, "PC1=0" + 
File.separator + "PC2=1" + File.separator + "data.parquet"); + assertTrue(firstDataFile.exists()); + assertTrue(secondDataFile.exists()); + + // Now replace the underlying data files with empty files and read the size from metadata file verifying that + // we can read the size without touching the data + firstDataFile.delete(); + firstDataFile.createNewFile(); + secondDataFile.delete(); + secondDataFile.createNewFile(); + final Table fromDiskWithMetadataWithoutData = readTable(metadataFile); + assertEquals(source.size(), fromDiskWithMetadataWithoutData.size()); + + // If we call select now, this should fail because the data files are empty + try { + fromDiskWithMetadataWithoutData.select(); + fail("Expected exception when reading table with empty data files"); + } catch (final RuntimeException expected) { + } + } + @Test public void writeKeyValuePartitionedDataWithIntegerPartitionsTest() { final TableDefinition definition = TableDefinition.of( @@ -1404,7 +1461,8 @@ public void testArrayColumns() { writeReadTableTest(arrayTable, dest, writeInstructions); // Make sure the column didn't use dictionary encoding - ParquetMetadata metadata = new ParquetTableLocationKey(dest, 0, null, ParquetInstructions.EMPTY).getMetadata(); + ParquetMetadata metadata = + new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata(); String firstColumnMetadata = metadata.getBlocks().get(0).getColumns().get(0).toString(); assertTrue(firstColumnMetadata.contains("someStringArrayColumn") && !firstColumnMetadata.contains("RLE_DICTIONARY")); @@ -1413,7 +1471,7 @@ public void testArrayColumns() { writeReadTableTest(vectorTable, dest, writeInstructions); // Make sure the column didn't use dictionary encoding - metadata = new ParquetTableLocationKey(dest, 0, null, ParquetInstructions.EMPTY).getMetadata(); + metadata = new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata(); firstColumnMetadata = metadata.getBlocks().get(0).getColumns().get(0).toString(); assertTrue(firstColumnMetadata.contains("someStringArrayColumn") && !firstColumnMetadata.contains("RLE_DICTIONARY")); @@ -1699,7 +1757,7 @@ public void testReadOldParquetData() { String path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetData.parquet").getFile(); readParquetFileFromGitLFS(new File(path)).select(); final ParquetMetadata metadata = - new ParquetTableLocationKey(new File(path), 0, null, ParquetInstructions.EMPTY).getMetadata(); + new ParquetTableLocationKey(new File(path).toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata(); assertTrue(metadata.getFileMetaData().getKeyValueMetaData().get("deephaven").contains("\"version\":\"0.4.0\"")); path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetVectorData.parquet").getFile(); @@ -2083,7 +2141,7 @@ private void indexedColumnsBasicWriteTestsImpl(TestParquetTableWriter writer) { // Verify that the key-value metadata in the file has the correct name ParquetTableLocationKey tableLocationKey = - new ParquetTableLocationKey(destFile, 0, null, ParquetInstructions.EMPTY); + new ParquetTableLocationKey(destFile.toURI(), 0, null, ParquetInstructions.EMPTY); String metadataString = tableLocationKey.getMetadata().getFileMetaData().toString(); assertTrue(metadataString.contains(vvvIndexFilePath)); @@ -2116,7 +2174,7 @@ public void legacyGroupingFileReadTest() { // Verify that the key-value metadata in the file has the correct legacy grouping file name final ParquetTableLocationKey tableLocationKey = - new 
ParquetTableLocationKey(destFile, 0, null, ParquetInstructions.EMPTY); + new ParquetTableLocationKey(destFile.toURI(), 0, null, ParquetInstructions.EMPTY); final String metadataString = tableLocationKey.getMetadata().getFileMetaData().toString(); String groupingFileName = ParquetTools.legacyGroupingFileName(destFile, groupingColName); assertTrue(metadataString.contains(groupingFileName)); @@ -2281,10 +2339,10 @@ public void writeMultiTableIndexTest() { // Verify that the key-value metadata in the file has the correct name ParquetTableLocationKey tableLocationKey = - new ParquetTableLocationKey(firstDestFile, 0, null, ParquetInstructions.EMPTY); + new ParquetTableLocationKey(firstDestFile.toURI(), 0, null, ParquetInstructions.EMPTY); String metadataString = tableLocationKey.getMetadata().getFileMetaData().toString(); assertTrue(metadataString.contains(firstIndexFilePath)); - tableLocationKey = new ParquetTableLocationKey(secondDestFile, 0, null, ParquetInstructions.EMPTY); + tableLocationKey = new ParquetTableLocationKey(secondDestFile.toURI(), 0, null, ParquetInstructions.EMPTY); metadataString = tableLocationKey.getMetadata().getFileMetaData().toString(); assertTrue(metadataString.contains(secondIndexFilePath)); @@ -2334,7 +2392,7 @@ private void indexOverwritingTestsImpl(TestParquetTableWriter writer) { checkSingleTable(anotherTableToSave, destFile); ParquetTableLocationKey tableLocationKey = - new ParquetTableLocationKey(destFile, 0, null, ParquetInstructions.EMPTY); + new ParquetTableLocationKey(destFile.toURI(), 0, null, ParquetInstructions.EMPTY); String metadataString = tableLocationKey.getMetadata().getFileMetaData().toString(); assertTrue(metadataString.contains(xxxIndexFilePath) && !metadataString.contains(vvvIndexFilePath)); @@ -2350,7 +2408,7 @@ private void indexOverwritingTestsImpl(TestParquetTableWriter writer) { Map.of("vvv", new String[] {vvvIndexFilePath}, "xxx", new String[] {xxxIndexFilePath})); - tableLocationKey = new ParquetTableLocationKey(destFile, 0, null, ParquetInstructions.EMPTY); + tableLocationKey = new ParquetTableLocationKey(destFile.toURI(), 0, null, ParquetInstructions.EMPTY); metadataString = tableLocationKey.getMetadata().getFileMetaData().toString(); assertTrue(metadataString.contains(xxxIndexFilePath) && !metadataString.contains(vvvIndexFilePath) && !metadataString.contains(backupXXXIndexFileName)); @@ -2442,7 +2500,7 @@ public void dictionaryEncodingTest() { // Verify that string columns are properly dictionary encoded final ParquetMetadata metadata = - new ParquetTableLocationKey(dest, 0, null, ParquetInstructions.EMPTY).getMetadata(); + new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata(); final String firstColumnMetadata = metadata.getBlocks().get(0).getColumns().get(0).toString(); assertTrue(firstColumnMetadata.contains("shortStringColumn") && firstColumnMetadata.contains("RLE_DICTIONARY")); final String secondColumnMetadata = metadata.getBlocks().get(0).getColumns().get(1).toString(); @@ -2515,7 +2573,8 @@ private static ColumnChunkMetaData overflowingStringsTestHelper(final Collection writeTable(stringTable, dest, writeInstructions); checkSingleTable(stringTable, dest); - ParquetMetadata metadata = new ParquetTableLocationKey(dest, 0, null, ParquetInstructions.EMPTY).getMetadata(); + ParquetMetadata metadata = + new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata(); return metadata.getBlocks().get(0).getColumns().get(0); } @@ -2539,7 +2598,7 @@ public void 
overflowingCodecsTest() { checkSingleTable(table, dest); final ParquetMetadata metadata = - new ParquetTableLocationKey(dest, 0, null, ParquetInstructions.EMPTY).getMetadata(); + new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata(); final String metadataStr = metadata.getFileMetaData().getKeyValueMetaData().get("deephaven"); assertTrue( metadataStr.contains("VariableWidthByteArrayColumn") && metadataStr.contains("SimpleByteArrayCodec")); @@ -2600,7 +2659,7 @@ public void readWriteDateTimeTest() { // Verify that the types are correct in the schema final ParquetMetadata metadata = - new ParquetTableLocationKey(dest, 0, null, ParquetInstructions.EMPTY).getMetadata(); + new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata(); final ColumnChunkMetaData dateColMetadata = metadata.getBlocks().get(0).getColumns().get(0); assertTrue(dateColMetadata.toString().contains("someDateColumn")); assertEquals(PrimitiveType.PrimitiveTypeName.INT32, dateColMetadata.getPrimitiveType().getPrimitiveTypeName()); @@ -3061,7 +3120,7 @@ public void readSingleColumn() { private void assertTableStatistics(Table inputTable, File dest) { // Verify that the columns have the correct statistics. final ParquetMetadata metadata = - new ParquetTableLocationKey(dest, 0, null, ParquetInstructions.EMPTY).getMetadata(); + new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY).getMetadata(); final String[] colNames = inputTable.getDefinition().getColumnNamesArray(); for (int colIdx = 0; colIdx < inputTable.numColumns(); ++colIdx) { diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java index db959e8483e..644fda8bbe7 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java @@ -353,7 +353,7 @@ public void testPartitionedRead() { final TableDefinition partitionedDefinition = TableDefinition.of(allColumns); final Table result = ParquetTools.readPartitionedTableInferSchema( - new ParquetKeyValuePartitionedLayout(testRootFile, 2, ParquetInstructions.EMPTY), + new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY), ParquetInstructions.EMPTY); TestCase.assertEquals(partitionedDefinition, result.getDefinition()); final Table expected = TableTools.merge( diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java new file mode 100644 index 00000000000..2b2071fde08 --- /dev/null +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java @@ -0,0 +1,151 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.s3; + +import io.deephaven.internal.log.LoggerFactory; +import io.deephaven.io.logger.Logger; +import org.jetbrains.annotations.NotNull; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; +import software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import 
software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.utils.ThreadFactoryBuilder;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static io.deephaven.util.thread.ThreadHelpers.getOrComputeThreadCountProperty;
+
+class S3AsyncClientFactory {
+
+ private static final int NUM_FUTURE_COMPLETION_THREADS =
+ getOrComputeThreadCountProperty("S3.numFutureCompletionThreads", -1);
+ private static final int NUM_SCHEDULED_EXECUTOR_THREADS =
+ getOrComputeThreadCountProperty("S3.numScheduledExecutorThreads", 5);
+
+ private static final Logger log = LoggerFactory.getLogger(S3AsyncClientFactory.class);
+ private static final Map<HttpClientConfig, SdkAsyncHttpClient> httpClientCache = new ConcurrentHashMap<>();
+
+ private static volatile Executor futureCompletionExecutor;
+ private static volatile ScheduledExecutorService scheduledExecutor;
+
+ static S3AsyncClient getAsyncClient(@NotNull final S3Instructions instructions) {
+ final S3AsyncClientBuilder builder = S3AsyncClient.builder()
+ .asyncConfiguration(
+ b -> b.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
+ ensureAsyncFutureCompletionExecutor()))
+ .httpClient(getOrBuildHttpClient(instructions))
+ .overrideConfiguration(ClientOverrideConfiguration.builder()
+ // If we find that the STANDARD retry policy does not work well in all situations, we might
+ // try experimenting with ADAPTIVE retry policy, potentially with fast fail.
+ // .retryPolicy(RetryPolicy.builder(RetryMode.ADAPTIVE).fastFailRateLimiting(true).build())
+ .retryPolicy(RetryMode.STANDARD)
+ .apiCallAttemptTimeout(instructions.readTimeout().dividedBy(3))
+ .apiCallTimeout(instructions.readTimeout())
+ // Adding a metrics publisher may be useful for debugging, but it's very verbose.
+ // .addMetricPublisher(LoggingMetricPublisher.create(Level.INFO, Format.PRETTY))
+ .scheduledExecutorService(ensureScheduledExecutor())
+ .build())
+ .region(Region.of(instructions.regionName()))
+ .credentialsProvider(instructions.awsV2CredentialsProvider());
+ instructions.endpointOverride().ifPresent(builder::endpointOverride);
+ final S3AsyncClient ret = builder.build();
+ if (log.isDebugEnabled()) {
+ log.debug().append("Building S3AsyncClient with instructions: ").append(instructions).endl();
+ }
+ return ret;
+ }
+
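An aside on the timeout settings configured above: RetryMode.STANDARD allows up to three attempts by default, so budgeting a third of the configured read timeout per attempt keeps the worst case roughly within the overall apiCallTimeout. A minimal, self-contained sketch of that arithmetic (the class name and values below are hypothetical, not part of this change):

import java.time.Duration;

final class TimeoutBudgetSketch {
    public static void main(String[] args) {
        // Hypothetical read timeout; in the change above it comes from instructions.readTimeout()
        final Duration readTimeout = Duration.ofSeconds(6);
        final Duration perAttempt = readTimeout.dividedBy(3); // mirrors apiCallAttemptTimeout: 2s per attempt
        final Duration overall = readTimeout; // mirrors apiCallTimeout: 6s across all retries
        System.out.println("per attempt = " + perAttempt + ", overall = " + overall);
    }
}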
+ private static class HttpClientConfig {
+ private final int maxConcurrentRequests;
+ private final Duration connectionTimeout;
+
+ HttpClientConfig(final int maxConcurrentRequests, final Duration connectionTimeout) {
+ this.maxConcurrentRequests = maxConcurrentRequests;
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ int maxConcurrentRequests() {
+ return maxConcurrentRequests;
+ }
+
+ Duration connectionTimeout() {
+ return connectionTimeout;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = maxConcurrentRequests;
+ result = 31 * result + connectionTimeout.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ final HttpClientConfig that = (HttpClientConfig) other;
+ return maxConcurrentRequests == that.maxConcurrentRequests
+ && connectionTimeout.equals(that.connectionTimeout);
+ }
+ }
+
+ private static SdkAsyncHttpClient getOrBuildHttpClient(@NotNull final S3Instructions instructions) {
+ final HttpClientConfig config = new HttpClientConfig(instructions.maxConcurrentRequests(),
+ instructions.connectionTimeout());
+ return httpClientCache.computeIfAbsent(config, key -> AwsCrtAsyncHttpClient.builder()
+ .maxConcurrency(config.maxConcurrentRequests())
+ .connectionTimeout(config.connectionTimeout())
+ .build());
+ }
+
+ /**
+ * The following executor will be used to complete the futures returned by the async client. This is a shared
+ * executor across all clients with a fixed number of threads. This pattern is inspired by the default executor
+ * used by the SDK
+ * ({@code software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder#resolveAsyncFutureCompletionExecutor}).
+ */
+ private static Executor ensureAsyncFutureCompletionExecutor() {
+ if (futureCompletionExecutor == null) {
+ synchronized (S3AsyncClientFactory.class) {
+ if (futureCompletionExecutor == null) {
+ futureCompletionExecutor = Executors.newFixedThreadPool(NUM_FUTURE_COMPLETION_THREADS,
+ new ThreadFactoryBuilder().threadNamePrefix("s3-async-future-completion").build());
+ }
+ }
+ }
+ return futureCompletionExecutor;
+ }
+
+ /**
+ * The following executor will be used to schedule tasks for the async client, such as timeouts and retries. This
+ * is a shared executor across all clients with a fixed number of threads. This pattern is inspired by the default
+ * executor used by the SDK
+ * ({@code software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder#resolveScheduledExecutorService}).
+ */ + private static ScheduledExecutorService ensureScheduledExecutor() { + if (scheduledExecutor == null) { + synchronized (S3AsyncClientFactory.class) { + if (scheduledExecutor == null) { + scheduledExecutor = Executors.newScheduledThreadPool(NUM_SCHEDULED_EXECUTOR_THREADS, + new ThreadFactoryBuilder().threadNamePrefix("s3-scheduled-executor").build()); + } + } + } + return scheduledExecutor; + } +} diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java index 3579961b3a3..6f0d6ffa057 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java @@ -41,8 +41,12 @@ final class S3SeekableByteChannel implements SeekableByteChannel, CachedChannelP private long size; S3SeekableByteChannel(final S3Uri uri) { + this(uri, UNINITIALIZED_SIZE); + } + + S3SeekableByteChannel(final S3Uri uri, final long size) { this.uri = Objects.requireNonNull(uri); - this.size = UNINITIALIZED_SIZE; + this.size = size; this.position = INIT_POSITION; } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java index 5d773ea484b..414ae9fd715 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java @@ -5,29 +5,29 @@ import io.deephaven.UncheckedDeephavenException; import io.deephaven.base.verify.Assert; +import io.deephaven.base.verify.Require; +import io.deephaven.hash.KeyedObjectHashMap; +import io.deephaven.hash.KeyedObjectKey; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.util.channel.Channels; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; import org.jetbrains.annotations.NotNull; -import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; -import software.amazon.awssdk.core.retry.RetryMode; -import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Uri; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import java.io.IOException; import java.io.InputStream; +import java.lang.ref.SoftReference; import java.net.URI; import java.net.URISyntaxException; import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; import java.util.Iterator; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Spliterator; import java.util.Spliterators; @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -62,36 +63,17 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider { private final S3AsyncClient s3AsyncClient; private final S3Instructions s3Instructions; + @SuppressWarnings("rawtypes") + 
private static final AtomicReferenceFieldUpdater<S3SeekableChannelProvider, SoftReference> FILE_SIZE_CACHE_REF_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(S3SeekableChannelProvider.class, SoftReference.class,
+ "fileSizeCacheRef");
+
+ private volatile SoftReference<Map<URI, FileSizeInfo>> fileSizeCacheRef;
+
 S3SeekableChannelProvider(@NotNull final S3Instructions s3Instructions) {
- // TODO(deephaven-core#5062): Add support for async client recovery and auto-close
- // TODO(deephaven-core#5063): Add support for caching clients for re-use
- this.s3AsyncClient = buildClient(s3Instructions);
+ this.s3AsyncClient = S3AsyncClientFactory.getAsyncClient(s3Instructions);
 this.s3Instructions = s3Instructions;
- }
-
- private static S3AsyncClient buildClient(@NotNull S3Instructions s3Instructions) {
- final S3AsyncClientBuilder builder = S3AsyncClient.builder()
- .httpClient(AwsCrtAsyncHttpClient.builder()
- .maxConcurrency(s3Instructions.maxConcurrentRequests())
- .connectionTimeout(s3Instructions.connectionTimeout())
- .build())
- .overrideConfiguration(ClientOverrideConfiguration.builder()
- // If we find that the STANDARD retry policy does not work well in all situations, we might
- // try experimenting with ADAPTIVE retry policy, potentially with fast fail.
- // .retryPolicy(RetryPolicy.builder(RetryMode.ADAPTIVE).fastFailRateLimiting(true).build())
- .retryPolicy(RetryMode.STANDARD)
- .apiCallAttemptTimeout(s3Instructions.readTimeout().dividedBy(3))
- .apiCallTimeout(s3Instructions.readTimeout())
- // Adding a metrics publisher may be useful for debugging, but it's very verbose.
- // .addMetricPublisher(LoggingMetricPublisher.create(Level.INFO, Format.PRETTY))
- .build())
- .region(Region.of(s3Instructions.regionName()))
- .credentialsProvider(s3Instructions.awsV2CredentialsProvider());
- s3Instructions.endpointOverride().ifPresent(builder::endpointOverride);
- if (log.isDebugEnabled()) {
- log.debug().append("Building client with instructions: ").append(s3Instructions).endl();
- }
- return builder.build();
+ this.fileSizeCacheRef = new SoftReference<>(new KeyedObjectHashMap<>(FileSizeInfo.URI_MATCH_KEY));
 }

 @Override
@@ -99,11 +81,15 @@ public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext
 @NotNull final URI uri) {
 final S3Uri s3Uri = s3AsyncClient.utilities().parseUri(uri);
 // context is unused here, will be set before reading from the channel
+ final Map<URI, FileSizeInfo> fileSizeCache = fileSizeCacheRef.get();
+ if (fileSizeCache != null && fileSizeCache.containsKey(uri)) {
+ return new S3SeekableByteChannel(s3Uri, fileSizeCache.get(uri).size);
+ }
 return new S3SeekableByteChannel(s3Uri);
 }

 @Override
- public InputStream getInputStream(SeekableByteChannel channel) {
+ public InputStream getInputStream(final SeekableByteChannel channel) {
 // S3SeekableByteChannel is internally buffered, no need to re-buffer
 return Channels.newInputStreamNoClose(channel);
 }
@@ -213,14 +199,17 @@ private void fetchNextBatch() throws IOException {
 if (path.contains(REPEATED_URI_SEPARATOR)) {
 path = REPEATED_URI_SEPARATOR_PATTERN.matcher(path).replaceAll(URI_SEPARATOR);
 }
+ final URI uri;
 try {
- return new URI(S3_URI_SCHEME, directory.getUserInfo(), directory.getHost(),
+ uri = new URI(S3_URI_SCHEME, directory.getUserInfo(), directory.getHost(),
 directory.getPort(), path, null, null);
 } catch (final URISyntaxException e) {
 throw new UncheckedDeephavenException("Failed to create URI for S3 object with key: "
 + s3Object.key() + " and bucket " + bucketName + " inside directory " + directory, e);
 }
+ updateFileSizeCache(getFileSizeCache(), uri, s3Object.size());
+ return uri;
 }).iterator();
 // The following token is null when the last batch is fetched.
 continuationToken = response.nextContinuationToken();
@@ -230,6 +219,56 @@
 Spliterator.ORDERED | Spliterator.DISTINCT | Spliterator.NONNULL), false);
 }

+ /**
+ * Get a strong reference to the file size cache, creating it if necessary.
+ */
+ private Map<URI, FileSizeInfo> getFileSizeCache() {
+ SoftReference<Map<URI, FileSizeInfo>> cacheRef;
+ Map<URI, FileSizeInfo> cache;
+ while ((cache = (cacheRef = fileSizeCacheRef).get()) == null) {
+ if (FILE_SIZE_CACHE_REF_UPDATER.compareAndSet(this, cacheRef,
+ new SoftReference<>(cache = new KeyedObjectHashMap<>(FileSizeInfo.URI_MATCH_KEY)))) {
+ return cache;
+ }
+ }
+ return cache;
+ }
+
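The getFileSizeCache implementation above follows a common idiom: a volatile SoftReference holds the cache so the JVM may discard it under memory pressure, and a compare-and-set on the field re-creates it on demand so that concurrent callers converge on a single replacement map. A standalone sketch of the same idiom, with hypothetical names and a plain ConcurrentHashMap standing in for KeyedObjectHashMap:

import java.lang.ref.SoftReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

final class SoftCacheHolder {
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<SoftCacheHolder, SoftReference> REF_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(SoftCacheHolder.class, SoftReference.class, "cacheRef");

    private volatile SoftReference<Map<String, Long>> cacheRef = new SoftReference<>(new ConcurrentHashMap<>());

    Map<String, Long> getCache() {
        SoftReference<Map<String, Long>> ref;
        Map<String, Long> cache;
        // Loop until we observe a live map or successfully install a fresh one
        while ((cache = (ref = cacheRef).get()) == null) {
            final Map<String, Long> fresh = new ConcurrentHashMap<>();
            if (REF_UPDATER.compareAndSet(this, ref, new SoftReference<>(fresh))) {
                return fresh;
            }
        }
        return cache;
    }
}

If the CAS fails, another thread has already installed a new reference, so the loop simply re-reads it rather than creating a second map.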
+ /**
+ * Update the given file size cache with the given URI and size.
+ */
+ private static void updateFileSizeCache(
+ @NotNull final Map<URI, FileSizeInfo> fileSizeCache,
+ @NotNull final URI uri,
+ final long size) {
+ fileSizeCache.compute(uri, (key, existingInfo) -> {
+ if (existingInfo == null) {
+ return new FileSizeInfo(uri, size);
+ } else if (existingInfo.size != size) {
+ throw new IllegalStateException("Existing size " + existingInfo.size + " does not match "
+ + "the new size " + size + " for key " + key);
+ }
+ return existingInfo;
+ });
+ }
+
+ private static final class FileSizeInfo {
+ private final URI uri;
+ private final long size;
+
+ FileSizeInfo(@NotNull final URI uri, final long size) {
+ this.uri = Require.neqNull(uri, "uri");
+ this.size = size;
+ }
+
+ private static final KeyedObjectKey<URI, FileSizeInfo> URI_MATCH_KEY = new KeyedObjectKey.Basic<>() {
+ @Override
+ public URI getKey(@NotNull final FileSizeInfo value) {
+ return value.uri;
+ }
+ };
+ }
+
 @Override
 public void close() {
 s3AsyncClient.close();
diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java
index 74e941966f2..0075568581b 100644
--- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java
+++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java
@@ -71,7 +71,7 @@ void readSimpleFiles()
 final ByteBuffer buffer = ByteBuffer.allocate(1);
 try (
 final SeekableChannelsProvider providerImpl = providerImpl(uri);
- final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
+ final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32);
 final SeekableChannelContext context = provider.makeContext();
 final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) {
 assertThat(readChannel.read(buffer)).isEqualTo(-1);
@@ -81,7 +81,7 @@
 final URI uri = uri("hello/world.txt");
 try (
 final SeekableChannelsProvider providerImpl = providerImpl(uri);
- final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
+ final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32);
 final SeekableChannelContext context = provider.makeContext();
 final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) {
 final ByteBuffer bytes = readAll(readChannel, 32);
@@ -103,7 +103,7 @@
 public int read() {
 final ByteBuffer buffer = ByteBuffer.allocate(1);
 try (
 final SeekableChannelsProvider providerImpl = providerImpl(uri);
- final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
+ final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 
32); final SeekableChannelContext context = provider.makeContext(); final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { for (long p = 0; p < numBytes; ++p) {