Skip to content

Commit

Permalink
feat: implement S3 upload of transformation result
Browse files Browse the repository at this point in the history
Adds a simple S3 upload service that uploads the transformations result when
the necessary details and credentials were provided in the transformation
message.

SVC-1398
  • Loading branch information
florianesser committed Feb 26, 2024
1 parent bbeca02 commit 97ea3b1
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 21 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'

// S3 API
implementation 'software.amazon.awssdk:s3:2.20.52'

// hale
implementation 'eu.esdihumboldt.unpuzzled:org.eclipse.equinox.nonosgi.registry:1.0.0'
implementation "to.wetransform:hale-cli:$cliVersion", {
Expand Down
35 changes: 19 additions & 16 deletions src/main/java/to/wetransform/hale/transformer/Transformer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ public class Transformer {
private static final Logger LOG = LoggerFactory.getLogger(Transformer.class);

private final CountDownLatch latch = new CountDownLatch(1);
private TargetConfig targetConfig;
private ExecContext execContext;

public void transform(String sourceDataURL, String projectURL, String targetURL) {
public void transform(String sourceDataUrl, String projectUrl, String targetFileName) {
File transformationLogFile = null;

try {
Expand All @@ -62,22 +64,22 @@ public void transform(String sourceDataURL, String projectURL, String targetURL)
Init.init();
logPlatformVersion();

ExecContext context = new ExecContext();
execContext = new ExecContext();

// Set up project URI
URI projectUri = new URI(projectURL);
context.setProject(projectUri);
URI projectUri = new URI(projectUrl);
execContext.setProject(projectUri);
// Load project
Project project = loadProject(projectUri);

Value sourceCrs = initializeSourceConfig(context, sourceDataURL);
Value sourceCrs = initializeSourceConfig(execContext, sourceDataUrl);

TargetConfig targetConfig = configureTarget(project, sourceCrs);
configureTargetContext(context, tempDirectory, targetConfig, reportFile);
targetConfig = configureTarget(project, sourceCrs, targetFileName);
configureTargetContext(execContext, tempDirectory, targetConfig, reportFile);

// run the transformation
LOG.info("Transforming started.");
new ExecTransformation().run(context);
new ExecTransformation().run(execContext);

// evaluate results
boolean success = evaluateTransformationResults(reportFile);
Expand All @@ -86,11 +88,16 @@ public void transform(String sourceDataURL, String projectURL, String targetURL)
LOG.error("Failed to execute transformation: {}", t.getMessage(), t);
} finally {
latch.countDown();
deleteDir(transformationLogFile.getParentFile());
}
}

// Extracted Methods
public TargetConfig getTargetConfig() {
return targetConfig;
}

public ExecContext getExecContext() {
return execContext;
}

private Path createTempDirectory() throws IOException {
return Files.createTempDirectory("hale-transformer");
Expand Down Expand Up @@ -245,12 +252,11 @@ private Project loadProject(URI projectUri) {
return result;
}

private TargetConfig configureTarget(Project lp, Value sourceCrs) {
String filename;
private TargetConfig configureTarget(Project project, Value sourceCrs, String filename) {
String preset = null;
CustomTarget customTarget = null;

Map<String, IOConfiguration> presets = getPresets(lp);
Map<String, IOConfiguration> presets = getPresets(project);

// Preset names
String defaultPreset = "default";
Expand All @@ -260,12 +266,10 @@ private TargetConfig configureTarget(Project lp, Value sourceCrs) {
// Project contains hale-connect preset
preset = hcPreset;
IOConfiguration ioConfiguration = presets.get(hcPreset);
filename = determineTargetFileName(ioConfiguration);
} else if (presets.containsKey(defaultPreset)) {
// Project contains default preset
preset = defaultPreset;
IOConfiguration ioConfiguration = presets.get(defaultPreset);
filename = determineTargetFileName(ioConfiguration);
} else {
// No specific presets found, creating a custom target configuration

Expand All @@ -291,7 +295,6 @@ private TargetConfig configureTarget(Project lp, Value sourceCrs) {
// Create a custom target configuration
CustomTarget target = new CustomTarget(targetProvider, targetMap);

filename = "inspire.gml";
customTarget = target;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,45 @@
package to.wetransform.hale.transformer.api.messaging;

import java.io.Serializable;
import java.net.URI;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.annotation.JsonProperty;
import eu.esdihumboldt.hale.app.transform.ExecContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.regions.Region;
import to.wetransform.hale.transformer.TargetConfig;
import to.wetransform.hale.transformer.Transformer;
import to.wetransform.hale.transformer.api.TransformerApiApplication;
import to.wetransform.hale.transformer.api.internal.CountdownLatchConfig;
import to.wetransform.hale.transformer.io.s3.S3Service;

@Service
public class TransformationMessageConsumer {
/**
*
*/
public record TransformationMessage(
@JsonProperty("projectUrl") String projectUrl, @JsonProperty("sourceDataUrl") String sourceDataUrl)
implements Serializable {}
@JsonProperty("projectUrl") String projectUrl,
@JsonProperty("sourceDataUrl") String sourceDataUrl,
@JsonProperty("targetFileName") String targetFileName,
@JsonProperty("s3Endpoint") String s3Endpoint,
@JsonProperty("s3Region") String s3Region,
@JsonProperty("s3BucketName") String s3BucketName,
@JsonProperty("s3AccessKey") String s3AccessKey,
@JsonProperty("s3SecretKey") String s3SecretKey)
implements Serializable {

public boolean hasS3Details() {
return s3Region != null && s3BucketName != null && s3AccessKey != null && s3SecretKey != null;
}
}

private static final Logger LOG = LoggerFactory.getLogger(TransformationMessageConsumer.class);

Expand All @@ -33,24 +52,55 @@ public TransformationMessageConsumer(CountdownLatchConfig countdownLatchConfig)

@RabbitListener(queues = TransformerApiApplication.QUEUE_NAME)
public void receiveMessage(final TransformationMessage message) {
LOG.info("Received projectUrl = " + message.projectUrl + " sourceDataUrl = " + message.sourceDataUrl);
LOG.info("Received projectUrl = " + message.projectUrl + " sourceDataUrl = " + message.sourceDataUrl
+ " targetFileName = " + message.targetFileName);

// TODO Implement mechanism to only accept a message from the queue if no
// transformation is currently running
if (message.projectUrl != null && message.sourceDataUrl() != null) {
if (message.projectUrl != null && message.sourceDataUrl() != null && message.targetFileName != null) {
Transformer tx = new Transformer();

try {
LOG.info("Transformation started");
tx.transform(message.sourceDataUrl(), message.projectUrl, null);
tx.transform(message.sourceDataUrl(), message.projectUrl, message.targetFileName);
tx.getLatch().await(countdownLatchConfig.getWaitingTime(), TimeUnit.MINUTES);
} catch (InterruptedException e) {
// TODO What should be done when the transformation fails or times out?
// - Simply requeuing the message is probably not helpful
// - Send a message back so that the producer can react?
Thread.currentThread().interrupt();
LOG.error("Transformation process timed out: " + e.getMessage(), e);
return;
}

if (message.hasS3Details()) {
try (S3Service s3 = buildS3Service(message)) {
ExecContext execContext = tx.getExecContext();
TargetConfig targetConfig = tx.getTargetConfig();
if (execContext != null && targetConfig != null) {
URI target = execContext.getTarget();
s3.putObject(
message.s3BucketName,
message.targetFileName,
Paths.get(target).toFile());
}
} catch (Throwable t) {
LOG.error("Error uploading result: " + t.getMessage(), t);
// TODO What now? Should the result just be discarded? Should we send a message back?
}
}
}
}

private static S3Service buildS3Service(TransformationMessage message) throws IllegalArgumentException {
URI endpoint = null;
if (message.s3Endpoint != null) {
endpoint = URI.create(message.s3Endpoint);
}

return new S3Service(
Region.of(message.s3Region),
AwsBasicCredentials.create(message.s3AccessKey, message.s3SecretKey),
endpoint);
}
}
60 changes: 60 additions & 0 deletions src/main/java/to/wetransform/hale/transformer/io/s3/S3Service.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package to.wetransform.hale.transformer.io.s3;

import java.io.File;
import java.net.URI;
import java.nio.file.Path;

import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

/**
* Adapter for S3 object storages
*/
public class S3Service implements AutoCloseable {
private final S3Client s3Client;

/**
* Creates an S3 service instance with the given parameters.
*
* @param region S3 region to connect to
* @param credentials Credentials for accessing S3
* @param endpoint Optional endpoint URL. If null, AWS S3 endpoints are used.
*/
public S3Service(Region region, AwsCredentials credentials, URI endpoint) {
S3ClientBuilder builder =
S3Client.builder().region(region).credentialsProvider(StaticCredentialsProvider.create(credentials));

if (endpoint != null) {
builder = builder.endpointOverride(endpoint);
}

this.s3Client = builder.build();
}

/**
* Upload a file to an S3 bucket
*
* @param bucketName Name of the target bucket
* @param key Key (file name) of the created object
* @param file File to upload
* @return API response
*/
public PutObjectResponse putObject(String bucketName, String key, File file) {
PutObjectRequest req =
PutObjectRequest.builder().bucket(bucketName).key(key).build();

return s3Client.putObject(req, Path.of(file.toURI()));
}

/**
* Close the S3 connection
*/
public void close() {
this.s3Client.close();
}
}
10 changes: 10 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
spring:
rabbitmq:
host: ${RABBITMQ_HOSTNAME}
username: ${RABBITMQ_USERNAME}
password: ${RABBITMQ_PASSWORD}

management:
health:
probes:
enabled: true

0 comments on commit 97ea3b1

Please sign in to comment.