From 2d23c5b9e316fbc790b4025dfd9df19983ed720f Mon Sep 17 00:00:00 2001 From: Fuat Basik Date: Tue, 26 Nov 2024 20:38:23 +0000 Subject: [PATCH] Integrate Analytics Accelerator Library for Amazon S3 This commit is the initial integration of Analytics Accelerator Library for Amazon S3 to S3A. It performs integration by introducing a new S3ASeekableStream and modifying S3AFileSystem. Use of the Analytics Accelerator Library is controlled by a configuration and it is off by default. --- hadoop-tools/hadoop-aws/pom.xml | 11 +++ .../org/apache/hadoop/fs/s3a/Constants.java | 18 +++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 48 +++++++++++- .../hadoop/fs/s3a/S3ASeekableStream.java | 68 ++++++++++++++++ .../fs/s3a/ITestS3AS3SeekableStream.java | 78 +++++++++++++++++++ 5 files changed, 222 insertions(+), 1 deletion(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index d28704b7c334e..0fa9cf9b5a166 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -525,6 +525,17 @@ amazon-s3-encryption-client-java provided + + software.amazon.s3.analyticsaccelerator + analyticsaccelerator-s3 + 0.0.1 + compile + + + software.amazon.awssdk.crt + aws-crt + 0.29.10 + org.assertj assertj-core diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index b03c41c7bb1d2..b70dd9cd2aa48 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1760,4 +1760,22 @@ private Constants() { * Value: {@value}. 
*/ public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit"; + + /** + * Config to enable Analytics Accelerator Library for Amazon S3 + * https://github.com/awslabs/analytics-accelerator-s3 + */ + public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY = "fs.s3a.analytics.accelerator.enabled"; + + /** + * Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY } + * Value {@value}. + */ + public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false; + + /** + * Prefix to configure Analytics Accelerator Library + */ + public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX = "fs.s3a.analytics.accelerator"; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index c0e530cb5ce40..69859a95d5c49 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -54,7 +54,9 @@ import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest; @@ -87,6 +89,11 @@ import software.amazon.awssdk.transfer.s3.model.Copy; import software.amazon.awssdk.transfer.s3.model.CopyRequest; +import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import 
software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; + import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -317,6 +324,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private S3Client s3Client; + /** + * CRT-Based S3Client created if analytics accelerator library is enabled + * and managed by the ClientManager. Analytics accelerator library can be + * enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY} + */ + private S3AsyncClient crtClient; + // initial callback policy is fail-once; it's there just to assist // some mock tests and other codepaths trying to call the low level // APIs on an uninitialized filesystem. @@ -344,6 +358,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, // If true, the prefetching input stream is used for reads. private boolean prefetchEnabled; + // If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used. + private boolean analyticsAcceleratorEnabled; + // Size in bytes of a single prefetch block. private int prefetchBlockSize; @@ -525,6 +542,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private boolean s3AccessGrantsEnabled; + /** + * Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true + */ + private S3SeekableInputStreamFactory s3SeekableInputStreamFactory; + /** Add any deprecated keys. 
*/ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -672,6 +694,7 @@ public void initialize(URI name, Configuration originalConf) s3ExpressStore); this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT); + this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT); long prefetchBlockSizeLong = longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, 1); if (prefetchBlockSizeLong > (long) Integer.MAX_VALUE) { @@ -718,6 +741,17 @@ public void initialize(URI name, Configuration originalConf) // the encryption algorithms) ClientManager clientManager = createClientManager(name, delegationTokensEnabled); + if (this.analyticsAcceleratorEnabled) { + LOG.info("Using S3SeekableInputStream"); + this.crtClient = S3CrtAsyncClient.builder().maxConcurrency(600).build(); + ConnectorConfiguration configuration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + S3SeekableInputStreamConfiguration seekableInputStreamConfiguration = + S3SeekableInputStreamConfiguration.fromConfiguration(configuration); + this.s3SeekableInputStreamFactory = + new S3SeekableInputStreamFactory( + new S3SdkObjectClient(this.crtClient), seekableInputStreamConfiguration); + } + inputPolicy = S3AInputPolicy.getPolicy( conf.getTrimmed(INPUT_FADVISE, Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT), @@ -1876,6 +1910,8 @@ private FSDataInputStream executeOpen( final Path path, final OpenFileSupport.OpenFileInformation fileInformation) throws IOException { + + // create the input stream statistics before opening // the file so that the time to prepare to open the file is included. 
S3AInputStreamStatistics inputStreamStats = @@ -1892,6 +1928,14 @@ private FSDataInputStream executeOpen( fileInformation.applyOptions(readContext); LOG.debug("Opening '{}'", readContext); + if (this.analyticsAcceleratorEnabled) { + return new FSDataInputStream( + new S3ASeekableStream( + this.bucket, + pathToKey(path), + s3SeekableInputStreamFactory)); + } + if (this.prefetchEnabled) { Configuration configuration = getConf(); initLocalDirAllocatorIfNotInitialized(configuration); @@ -4421,9 +4465,11 @@ public void close() throws IOException { protected synchronized void stopAllServices() { try { trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> { - closeAutocloseables(LOG, store); + closeAutocloseables(LOG, store, s3SeekableInputStreamFactory); store = null; s3Client = null; + crtClient = null; + s3SeekableInputStreamFactory = null; // At this point the S3A client is shut down, // now the executor pools are closed diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java new file mode 100644 index 0000000000000..23b4a603e640c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java @@ -0,0 +1,68 @@ +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FSInputStream; + +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import software.amazon.s3.analyticsaccelerator.util.S3URI; + +public class S3ASeekableStream extends FSInputStream { + + private S3SeekableInputStream inputStream; + private final String key; + + public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class); + + + public S3ASeekableStream(String bucket, String key, 
S3SeekableInputStreamFactory s3SeekableInputStreamFactory) + throws IOException { + this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key)); + this.key = key; + } + + @Override + public int read() throws IOException { + return inputStream.read(); + } + + @Override + public void seek(long pos) throws IOException { + inputStream.seek(pos); + } + + @Override + public long getPos() throws IOException { + return inputStream.getPos(); + } + + @Override + public void close() throws IOException { + if (inputStream != null) { + inputStream.close(); + inputStream = null; + super.close(); + } + } + + + public void readTail(byte[] buf, int off, int n) throws IOException { + inputStream.readTail(buf, off, n); + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + return inputStream.read(buf, off, len); + } + + + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java new file mode 100644 index 0000000000000..aa2de18dc231a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java @@ -0,0 +1,78 @@ +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; +import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; + +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; 
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; +import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; + +public class ITestS3AS3SeekableStream extends AbstractS3ATestBase { + + final String PHYSICAL_IO_PREFIX = "physicalio"; + final String LOGICAL_IO_PREFIX = "logicalio"; + + @Test + public void testConnectorFrameWorkIntegration() throws IOException { + describe("Verify S3 connector framework integration"); + + Configuration conf = getConfiguration(); + removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY); + conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true); + + String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz"; + S3AFileSystem s3AFileSystem = (S3AFileSystem) FileSystem.newInstance(new Path(testFile).toUri(), conf); + byte[] buffer = new byte[500]; + + try (FSDataInputStream inputStream = s3AFileSystem.open(new Path(testFile))) { + inputStream.seek(5); + inputStream.read(buffer, 0, 500); + } + + } + + + @Test + public void testConnectorFrameworkConfigurable() { + describe("Verify S3 connector framework reads configuration"); + + Configuration conf = getConfiguration(); + removeBaseAndBucketOverrides(conf); + + //Disable Predictive Prefetching + conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all"); + + //Set Blobstore Capacity + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." 
+ PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1); + + ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + + S3SeekableInputStreamConfiguration configuration = S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration); + assertEquals(configuration.getLogicalIOConfiguration().getPrefetchingMode(), PrefetchMode.ALL); + assert configuration.getPhysicalIOConfiguration().getBlobStoreCapacity() == 1; + } + + @Test + public void testInvalidConfigurationThrows() { + describe("Verify S3 connector framework throws with invalid configuration"); + + Configuration conf = getConfiguration(); + removeBaseAndBucketOverrides(conf); + //Disable Sequential Prefetching + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1); + + ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + assertThrows(IllegalArgumentException.class, () -> + S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration)); + } +}