Skip to content

Commit

Permalink
Integrate Analytics Accelerator Library for Amazon S3
Browse files Browse the repository at this point in the history
This commit is the initial integration of the Analytics Accelerator
Library for Amazon S3 into S3A. It performs the integration by introducing a new
S3ASeekableStream and modifying S3AFileSystem. Use of the Analytics
Accelerator Library is controlled by a configuration option and it is
off by default.
  • Loading branch information
fuatbasik committed Nov 26, 2024
1 parent 65a5bf3 commit 2d23c5b
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 1 deletion.
11 changes: 11 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,17 @@
<artifactId>amazon-s3-encryption-client-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
<artifactId>analyticsaccelerator-s3</artifactId>
<version>0.0.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.29.10</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1760,4 +1760,22 @@ private Constants() {
* Value: {@value}.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";

/**
* Config to enable the Analytics Accelerator Library for Amazon S3.
* See https://github.com/awslabs/analytics-accelerator-s3
* Value: {@value}.
*/
public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY = "fs.s3a.analytics.accelerator.enabled";

/**
* Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY}.
* Value: {@value}.
*/
public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;

/**
* Prefix of the configuration keys used to configure the
* Analytics Accelerator Library.
* Value: {@value}.
*/
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX = "fs.s3a.analytics.accelerator";

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
Expand Down Expand Up @@ -87,6 +89,11 @@
import software.amazon.awssdk.transfer.s3.model.Copy;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;

import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -317,6 +324,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private S3Client s3Client;

/**
* CRT-based S3 async client, created only if the analytics accelerator
* library is enabled. The library can be enabled with
* {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}.
* NOTE(review): the visible initialization code builds this client
* directly and the shutdown code only clears the reference; confirm
* which component is responsible for closing it.
*/
private S3AsyncClient crtClient;

// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
// APIs on an uninitialized filesystem.
Expand Down Expand Up @@ -344,6 +358,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// If true, the prefetching input stream is used for reads.
private boolean prefetchEnabled;

// If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
private boolean analyticsAcceleratorEnabled;

// Size in bytes of a single prefetch block.
private int prefetchBlockSize;

Expand Down Expand Up @@ -525,6 +542,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean s3AccessGrantsEnabled;

/**
* Factory to create S3SeekableInputStream instances; used when
* {@link #analyticsAcceleratorEnabled} is true.
*/
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Expand Down Expand Up @@ -672,6 +694,7 @@ public void initialize(URI name, Configuration originalConf)
s3ExpressStore);

this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
long prefetchBlockSizeLong =
longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, 1);
if (prefetchBlockSizeLong > (long) Integer.MAX_VALUE) {
Expand Down Expand Up @@ -718,6 +741,17 @@ public void initialize(URI name, Configuration originalConf)
// the encryption algorithms)
ClientManager clientManager = createClientManager(name, delegationTokensEnabled);

if (this.analyticsAcceleratorEnabled) {
LOG.info("Using S3SeekableInputStream");
this.crtClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
ConnectorConfiguration configuration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
this.s3SeekableInputStreamFactory =
new S3SeekableInputStreamFactory(
new S3SdkObjectClient(this.crtClient), seekableInputStreamConfiguration);
}

inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE,
Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
Expand Down Expand Up @@ -1876,6 +1910,8 @@ private FSDataInputStream executeOpen(
final Path path,
final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException {


// create the input stream statistics before opening
// the file so that the time to prepare to open the file is included.
S3AInputStreamStatistics inputStreamStats =
Expand All @@ -1892,6 +1928,14 @@ private FSDataInputStream executeOpen(
fileInformation.applyOptions(readContext);
LOG.debug("Opening '{}'", readContext);

if (this.analyticsAcceleratorEnabled) {
return new FSDataInputStream(
new S3ASeekableStream(
this.bucket,
pathToKey(path),
s3SeekableInputStreamFactory));
}

if (this.prefetchEnabled) {
Configuration configuration = getConf();
initLocalDirAllocatorIfNotInitialized(configuration);
Expand Down Expand Up @@ -4421,9 +4465,11 @@ public void close() throws IOException {
protected synchronized void stopAllServices() {
try {
trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
closeAutocloseables(LOG, store);
closeAutocloseables(LOG, store, s3SeekableInputStreamFactory);
store = null;
s3Client = null;
crtClient = null;
s3SeekableInputStreamFactory = null;

// At this point the S3A client is shut down,
// now the executor pools are closed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.apache.hadoop.fs.s3a;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FSInputStream;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

/**
 * Input stream which delegates all reads and seeks to an
 * {@link S3SeekableInputStream} obtained from the Analytics Accelerator
 * Library for Amazon S3.
 *
 * <p>Once {@link #close()} has been called, every read, seek or position
 * query fails with an {@link IOException} naming the object key, rather
 * than with a {@code NullPointerException}.
 */
public class S3ASeekableStream extends FSInputStream {

  /** Stream being wrapped; set to null once this stream is closed. */
  private S3SeekableInputStream inputStream;

  /** Object key; used in the error message raised on a closed stream. */
  private final String key;

  public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);

  /**
   * Open a stream on an object.
   *
   * @param bucket bucket containing the object
   * @param key key of the object within the bucket
   * @param s3SeekableInputStreamFactory factory used to create the wrapped stream
   * @throws IOException failure to create the wrapped stream
   */
  public S3ASeekableStream(String bucket, String key,
      S3SeekableInputStreamFactory s3SeekableInputStreamFactory)
      throws IOException {
    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
    this.key = key;
  }

  /**
   * Read a single byte from the wrapped stream.
   *
   * @return whatever the wrapped stream's {@code read()} returns
   * @throws IOException if the stream is closed or the read fails
   */
  @Override
  public int read() throws IOException {
    return openStream().read();
  }

  /**
   * Seek the wrapped stream to a new position.
   *
   * @param pos position to seek to
   * @throws IOException if the stream is closed or the seek fails
   */
  @Override
  public void seek(long pos) throws IOException {
    openStream().seek(pos);
  }

  /**
   * Get the position reported by the wrapped stream.
   *
   * @return the current position
   * @throws IOException if the stream is closed or the query fails
   */
  @Override
  public long getPos() throws IOException {
    return openStream().getPos();
  }

  /**
   * Close the wrapped stream. Calling this more than once is a no-op.
   *
   * @throws IOException failure to close the wrapped stream
   */
  @Override
  public void close() throws IOException {
    final S3SeekableInputStream stream = inputStream;
    if (stream == null) {
      return;
    }
    // Clear the reference first so that the stream is marked as closed
    // even if closing the wrapped stream fails.
    inputStream = null;
    try {
      stream.close();
    } finally {
      super.close();
    }
  }

  /**
   * Delegate a tail read to the wrapped stream.
   *
   * @param buf buffer to read into
   * @param off offset passed through to the wrapped stream
   * @param n length passed through to the wrapped stream
   * @throws IOException if the stream is closed or the read fails
   */
  public void readTail(byte[] buf, int off, int n) throws IOException {
    openStream().readTail(buf, off, n);
  }

  /**
   * Read bytes into a buffer from the wrapped stream.
   *
   * @param buf buffer to read into
   * @param off offset in the buffer at which to start writing
   * @param len maximum number of bytes to read
   * @return whatever the wrapped stream's read returns
   * @throws IOException if the stream is closed or the read fails
   */
  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    return openStream().read(buf, off, len);
  }

  /**
   * Not supported: this stream never switches to a different source.
   *
   * @param l ignored
   * @return false, always
   */
  @Override
  public boolean seekToNewSource(long l) throws IOException {
    return false;
  }

  /**
   * Return the wrapped stream, verifying that this stream is still open.
   *
   * @return the wrapped stream; never null
   * @throws IOException if this stream has been closed
   */
  private S3SeekableInputStream openStream() throws IOException {
    final S3SeekableInputStream stream = inputStream;
    if (stream == null) {
      throw new IOException(key + ": Stream is closed!");
    }
    return stream;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.apache.hadoop.fs.s3a;

import java.io.IOException;

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;

/**
 * Tests for the integration of the Analytics Accelerator Library for
 * Amazon S3: opening and reading a file with the library enabled, and
 * mapping of Hadoop configuration options onto the library's
 * {@link S3SeekableInputStreamConfiguration}.
 */
public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {

  /** Key segment of the library's physical IO options. */
  static final String PHYSICAL_IO_PREFIX = "physicalio";

  /** Key segment of the library's logical IO options. */
  static final String LOGICAL_IO_PREFIX = "logicalio";

  /**
   * Open a file with the library enabled, then seek and read from it.
   * The filesystem instance is created for this test alone, so it is
   * closed when the test finishes.
   */
  @Test
  public void testConnectorFrameWorkIntegration() throws IOException {
    describe("Verify S3 connector framework integration");

    Configuration conf = getConfiguration();
    removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);

    String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
    Path testPath = new Path(testFile);
    byte[] buffer = new byte[500];

    try (S3AFileSystem s3AFileSystem =
             (S3AFileSystem) FileSystem.newInstance(testPath.toUri(), conf);
         FSDataInputStream inputStream = s3AFileSystem.open(testPath)) {
      inputStream.seek(5);
      int bytesRead = inputStream.read(buffer, 0, buffer.length);
      assertTrue("No data read from " + testFile + "; read() returned " + bytesRead,
          bytesRead > 0);
    }
  }

  /**
   * Options set under the library's prefix must be visible in the
   * configuration built from the Hadoop configuration.
   */
  @Test
  public void testConnectorFrameworkConfigurable() {
    describe("Verify S3 connector framework reads configuration");

    Configuration conf = getConfiguration();
    removeBaseAndBucketOverrides(conf);

    // Set the logical IO prefetching mode to "all"
    conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX
        + "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all");

    // Set the blobstore capacity
    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX
        + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);

    ConnectorConfiguration connectorConfiguration =
        new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);

    S3SeekableInputStreamConfiguration configuration =
        S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
    assertEquals("Prefetching mode",
        PrefetchMode.ALL,
        configuration.getLogicalIOConfiguration().getPrefetchingMode());
    // A JUnit assertion rather than the assert keyword, which is skipped
    // unless the JVM is started with -ea.
    assertTrue("Blobstore capacity should be 1",
        configuration.getPhysicalIOConfiguration().getBlobStoreCapacity() == 1);
  }

  /**
   * An invalid option value must be rejected when the library
   * configuration is built.
   */
  @Test
  public void testInvalidConfigurationThrows() {
    describe("Verify S3 connector framework throws with invalid configuration");

    Configuration conf = getConfiguration();
    removeBaseAndBucketOverrides(conf);
    // A negative blobstore capacity is expected to be rejected
    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX
        + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1);

    ConnectorConfiguration connectorConfiguration =
        new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
    assertThrows(IllegalArgumentException.class, () ->
        S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
  }
}

0 comments on commit 2d23c5b

Please sign in to comment.