From 3b09737627e9e3a62525fb35b6d226b2f0877d0c Mon Sep 17 00:00:00 2001 From: Emanuela Epure <67077116+emanuelaepure10@users.noreply.github.com> Date: Thu, 14 Dec 2023 22:56:22 +0100 Subject: [PATCH] feat: add Transformer Add Transformer and related classes no-ticket --- .gitignore | 2 + .../hale/transformer/CustomTarget.java | 15 +- .../transformer/ResultCallbackHelper.java | 127 +++++ .../hale/transformer/RunContext.java | 28 ++ .../hale/transformer/SourceConfig.java | 10 +- .../hale/transformer/TargetConfig.java | 3 +- .../hale/transformer/Transformer.java | 434 ++++++++++++++++-- .../hale/transformer/TransformerConfig.java | 158 ++++++- .../hale/transformer/api/Init.java | 56 +++ .../api/TransformerApiApplication.java | 86 ++-- .../CustomMetaClassCreationHandle.java | 78 ++++ .../TransformationMessageConsumer.java | 62 +-- .../CustomURLStreamHandlerFactory.java | 32 ++ .../api/urlhandler/ServiceURLConnection.java | 128 ++++++ .../urlhandler/ServiceURLStreamHandler.java | 26 ++ .../api/urlhandler/TokenProvider.java | 8 + 16 files changed, 1120 insertions(+), 133 deletions(-) create mode 100644 src/main/java/to/wetransform/hale/transformer/ResultCallbackHelper.java create mode 100644 src/main/java/to/wetransform/hale/transformer/RunContext.java create mode 100644 src/main/java/to/wetransform/hale/transformer/api/Init.java create mode 100644 src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java create mode 100644 src/main/java/to/wetransform/hale/transformer/api/urlhandler/CustomURLStreamHandlerFactory.java create mode 100644 src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLConnection.java create mode 100644 src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLStreamHandler.java create mode 100644 src/main/java/to/wetransform/hale/transformer/api/urlhandler/TokenProvider.java diff --git a/.gitignore b/.gitignore index bec0c6a..8f0e027 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,5 @@ out/ *.code-workspace # Local History for Visual Studio Code .history/ +.vscode/settings.json + diff --git a/src/main/java/to/wetransform/hale/transformer/CustomTarget.java b/src/main/java/to/wetransform/hale/transformer/CustomTarget.java index 63c1f8d..29acfa1 100644 --- a/src/main/java/to/wetransform/hale/transformer/CustomTarget.java +++ b/src/main/java/to/wetransform/hale/transformer/CustomTarget.java @@ -3,11 +3,20 @@ import java.util.HashMap; import java.util.Map; +import org.json.JSONObject; + import eu.esdihumboldt.hale.common.core.io.Value; public record CustomTarget(String providerId, Map settings) { - public CustomTarget(String providerId) { - this(providerId, new HashMap<>()); - } + public CustomTarget(String providerId) { + this(providerId, new HashMap<>()); + } + + public void setProviderId(String string) { + } + + public JSONObject getSettings() { + return null; + } } diff --git a/src/main/java/to/wetransform/hale/transformer/ResultCallbackHelper.java b/src/main/java/to/wetransform/hale/transformer/ResultCallbackHelper.java new file mode 100644 index 0000000..dbb157e --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/ResultCallbackHelper.java @@ -0,0 +1,127 @@ +package to.wetransform.hale.transformer; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.zip.GZIPOutputStream; + +import org.json.JSONException; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper to perform transformation result callbacks. + */ +public class ResultCallbackHelper { + + /** + * The logger instance for this class. + */ + private static Logger logger = LoggerFactory.getLogger(ResultCallbackHelper.class); + + /** + * Perform the transformation result callback. + * + * @param success Whether or not the transformation has been successful. + * @param logs The transformation log. + * @param callbackUrl The callback url. + * @param token The jwt token for the request. + * @param jobId The transformation job id. + */ + public static void performResultCallback(boolean success, String logs, Optional stats, + String callbackUrl, String token, Optional jobId) { + try { + // Create transformation result object. + JSONObject resultObj = new JSONObject(); + resultObj.put("success", success); + + JSONObject propertiesObj = new JSONObject(); + if (Optional.ofNullable(logs).isPresent()) { + propertiesObj.put("logs", logs); + } + + if (jobId.isPresent()) { + propertiesObj.put("jobId", jobId.get()); + } else { + logger.warn("Transformation result callback does not contain a jobId."); + } + + if (stats.isPresent()) { + propertiesObj.put("stats", stats.get()); + } + + resultObj.put("properties", propertiesObj); + + // Send result object to callback URL. + sendTransformationResult(resultObj, callbackUrl, token); + + } catch (JSONException e) { + logger.warn("Failed to create result object to be sent "); + } + } + + /** + * Send the transformation result object to the given url. + * + * @param result The transformation result object. + * @param url The callback url. + * @param token The jwt token. + */ + private static void sendTransformationResult(JSONObject result, String url, String token) { + + try { + logger.info("Sending transformation result to " + url.toString()); + + // Create http request to send result object. + URL sendUrl = new URL(url); + HttpURLConnection http = (HttpURLConnection) sendUrl.openConnection(); + if (token != null) { + http.setRequestProperty("Authorization", "Bearer " + token); + } else { + logger.warn("Transformation callback is being performed without JWT token!"); + } + http.setRequestProperty("Content-Type", "application/json; charset=UTF-8"); + http.setRequestProperty("Content-Encoding", "gzip"); + http.setDoOutput(true); + http.setDoInput(true); + http.setRequestMethod("PUT"); + http.setConnectTimeout(10000); + + // write zipped content + OutputStream os = http.getOutputStream(); + OutputStream gzip = new GZIPOutputStream(os); + + try { + OutputStreamWriter osw = new OutputStreamWriter(gzip, StandardCharsets.UTF_8); + osw.write(result.toString()); + osw.flush(); + osw.close(); + } catch (IOException e) { + logger.warn("Failed to write result callback request body.", e); + } finally { + os.close(); + gzip.close(); + } + + // handle http response + int statusCode = http.getResponseCode(); + String statusMessage = http.getResponseMessage(); + String resLogMsg = "Transformation result callback endpoint responded with status code " + statusCode + ": " + + statusMessage; + if (statusCode != 200) { + logger.warn(resLogMsg); + } else { + logger.info(resLogMsg); + } + + http.disconnect(); + } catch (Throwable t) { + logger.warn("Failed to perform transformation result callback.", t); + } + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/RunContext.java b/src/main/java/to/wetransform/hale/transformer/RunContext.java new file mode 100644 index 0000000..50a722f --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/RunContext.java @@ -0,0 +1,28 @@ +package to.wetransform.hale.transformer; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; + +public class RunContext { + + private final List tempFiles = new ArrayList<>(); + + public File createTempDir() throws IOException { + Path path = Files.createTempDirectory("hale-progen"); + tempFiles.add(path); + return path.toFile(); + } + + public void cleanUp() throws IOException { + for (Path path : tempFiles) { + FileUtils.deleteDirectory(path.toFile()); + } + tempFiles.clear(); + } +} \ No newline at end of file diff --git a/src/main/java/to/wetransform/hale/transformer/SourceConfig.java b/src/main/java/to/wetransform/hale/transformer/SourceConfig.java index 927bae2..4bf2dff 100644 --- a/src/main/java/to/wetransform/hale/transformer/SourceConfig.java +++ b/src/main/java/to/wetransform/hale/transformer/SourceConfig.java @@ -8,10 +8,10 @@ import eu.esdihumboldt.hale.common.core.io.Value; -public record SourceConfig( - URI location, String providerId, Map settings, boolean transform, List attachments) { +public record SourceConfig(URI location, String providerId, Map settings, boolean transform, + List attachments) { - public SourceConfig(URI location, String providerId) { - this(location, providerId, new HashMap<>(), true, new ArrayList<>()); - } + public SourceConfig(URI location, String providerId) { + this(location, providerId, new HashMap<>(), true, new ArrayList<>()); + } } diff --git a/src/main/java/to/wetransform/hale/transformer/TargetConfig.java b/src/main/java/to/wetransform/hale/transformer/TargetConfig.java index 38a6548..986fcb8 100644 --- a/src/main/java/to/wetransform/hale/transformer/TargetConfig.java +++ b/src/main/java/to/wetransform/hale/transformer/TargetConfig.java @@ -1,3 +1,4 @@ package to.wetransform.hale.transformer; -public record TargetConfig(String filename, String preset, CustomTarget customTarget) {} +public record TargetConfig(String filename, String preset, CustomTarget customTarget) { +} diff --git a/src/main/java/to/wetransform/hale/transformer/Transformer.java b/src/main/java/to/wetransform/hale/transformer/Transformer.java index 4f1465b..f7fbd45 100644 --- a/src/main/java/to/wetransform/hale/transformer/Transformer.java +++ b/src/main/java/to/wetransform/hale/transformer/Transformer.java @@ -1,80 +1,416 @@ package to.wetransform.hale.transformer; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; import java.text.MessageFormat; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; + +import org.osgi.framework.Version; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; + +import org.apache.commons.collections.buffer.CircularFifoBuffer; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.LineIterator; +import org.apache.commons.io.output.TeeOutputStream; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.json.JSONObject; import eu.esdihumboldt.hale.app.transform.ExecContext; +import eu.esdihumboldt.hale.app.transform.ExecTransformation; import eu.esdihumboldt.hale.common.core.HalePlatform; +import eu.esdihumboldt.hale.common.core.io.HaleIO; +import eu.esdihumboldt.hale.common.core.io.Value; +import eu.esdihumboldt.hale.common.core.io.extension.IOProviderDescriptor; +import eu.esdihumboldt.hale.common.core.io.project.model.IOConfiguration; import eu.esdihumboldt.hale.common.core.io.project.model.Project; import eu.esdihumboldt.hale.common.core.io.supplier.DefaultInputSupplier; +import eu.esdihumboldt.hale.common.instance.io.InstanceIO; +import eu.esdihumboldt.hale.common.instance.io.InstanceWriter; import eu.esdihumboldt.util.io.IOUtils; -import org.osgi.framework.Version; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import to.wetransform.halecli.internal.Init; +import to.wetransform.hale.transformer.api.Init; public class Transformer { - private static final Logger LOG = LoggerFactory.getLogger(Transformer.class); + /** + * Limit for log size, to include complete log. + * + * Current value: ~ 5MB with 4 byte characters (so likely ~1.3MB) We need to + * stay well below 16 MB due to MongoDB restrictions (plus megabytes of logs are + * not really helpful) + */ + private static final int MAX_LOG_SIZE_CHARACTERS = 1310720; + + private static final String MESSAGE_LINE_TRUNCATED = "[Line truncated...]"; + private static final String MESSAGE_LOG_TRUNCATED = "[Log truncated...]"; + + /** + * If the log is truncated, include lines from the end. + */ + private static final int TRUNCATED_INCLUDE_END_MAX_LINES = 50; + /** + * If the lines at the end to include exceed this limit, they are not included. + */ + private static final int TRUNCATED_INCLUDE_END_MAX_CHARACTERS = 25600; + + private static final Logger LOG = LoggerFactory.getLogger(Transformer.class); + + private CountDownLatch latch = new CountDownLatch(1); + private final TransformerConfig config; + + public Transformer(TransformerConfig config) { + this.config = config; + } + + public void transform(String sourceDataURL, String projectURL, String targetURL) { + boolean success = false; + PrintStream sysOut = System.out; + File transformationLogFile = null; + + RunContext run = new RunContext(); + + // init working directory + File workingDir = null; + File reportFile = null; + try { + if (config.getWorkingDir() != null) { + workingDir = new File(config.getWorkingDir()); + Files.createDirectories(workingDir.toPath()); + } else { + workingDir = run.createTempDir(); + } + } catch (Throwable t) { + performCallback(success, Optional.ofNullable(transformationLogFile), Optional.empty(), + Optional.ofNullable(t)); + return; + } + + LOG.info("Using working directory " + workingDir.getAbsolutePath()); + + try { + // setup report file + reportFile = new File(workingDir, "reports.log"); + reportFile.delete(); + reportFile.createNewFile(); + } catch (Throwable t) { + performCallback(success, Optional.ofNullable(transformationLogFile), Optional.empty(), + Optional.ofNullable(t)); + return; + } + + try { + // setup log file + transformationLogFile = new File(workingDir, "transformation.log"); + transformationLogFile.delete(); + transformationLogFile.createNewFile(); + + // fork streams + FileOutputStream transformationLogOut = new FileOutputStream(transformationLogFile, true); + OutputStream tOut = new TeeOutputStream(sysOut, transformationLogOut); + System.setOut(new PrintStream(tOut)); + } catch (Throwable t) { + performCallback(success, Optional.ofNullable(transformationLogFile), Optional.empty(), + Optional.ofNullable(t)); + return; + } + + try { + LOG.info("Startup..."); + + long heapMaxSize = Runtime.getRuntime().maxMemory(); + LOG.info("Maximum heap size configured as " + IOUtils.humanReadableByteCount(heapMaxSize, false)); + + // initialize + Init.init(config); + + Version version = HalePlatform.getCoreVersion(); + LOG.info(MessageFormat.format("Launching hale-transformer {0}...", version.toString())); + + ExecContext context = new ExecContext(); + + // Set up project URI + URI projectUri = new URI(projectURL); + context.setProject(projectUri); + + // Load project + Project project = loadProject(projectUri); + + // Set up sources + List sourceList = List.of(new URI(sourceDataURL)); + context.setSources(sourceList); + + // Set up source provider IDs + List sourceProviderIds = List.of("eu.esdihumboldt.hale.io.gml.reader"); + context.setSourceProviderIds(sourceProviderIds); + + // Set up custom defaultSrs + List> sourcesSettings = List.of(Map.of("defaultSrs", Value.of("something"))); + context.setSourcesSettings(sourcesSettings); + + // extract detected source data crs and use in target config + Value sourceCrs = null; + if (!sourceConfigs.isEmpty()) { + Throwable error = null; + // try each source config, in case it is not set for some + for (int i = 0; i < sourceConfigs.size() && (sourceCrs == null || sourceCrs.isEmpty()); i++) { + try { + sourceCrs = sourceConfigs.get(i).getSettings().get("defaultSrs"); + } catch (Throwable t) { + error = t; + } + } + + if (error != null) { + log.warn("Could not determine source data CRS", error); + } + else if (sourceCrs == null || sourceCrs.isEmpty()) { + log.warn("Unable to determine source data CRS: None of {} sources is configured with a CRS", + sourceConfigs.size()); + } + } + else { + log.warn("Unable to determine source data CRS: No source configured"); + } + } catch (URISyntaxException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + // TargetConfig targetConfig = configureTarget(project, sourceCrs); + + try { + // run the transformation + LOG.info("Transforming..."); + new ExecTransformation().run(context); + + LOG.info("Transformation complete."); + } catch (Throwable t) { + LOG.error("Failed to execute transformation: " + t.getMessage(), t); + } finally { + latch.countDown(); + } + } + + /** + * Perform the result callback if configured. + * + * @param success The success state of the transformation + * @param optThrowable An optional throwable + */ + private void performCallback(boolean success, Optional optLogFile, Optional stats, + Optional optThrowable) { + if (config.getCallbackUrl().isPresent()) { + String logs = null; + + // get logs from log file + if (optLogFile.isPresent()) { + File logFile = optLogFile.get(); + try (LineIterator it = FileUtils.lineIterator(logFile, "UTF-8")) { + logs = processLog(it, MAX_LOG_SIZE_CHARACTERS, TRUNCATED_INCLUDE_END_MAX_LINES, + TRUNCATED_INCLUDE_END_MAX_CHARACTERS); + } catch (IOException e) { + logs = e.getMessage(); + } + logFile.delete(); + } + + // append stacktrace from exception if any + if (logs != null) { + if (optThrowable.isPresent()) { + Throwable t = optThrowable.get(); + StringBuilder lb = new StringBuilder(logs); + lb.append(System.lineSeparator()); + lb.append(ExceptionUtils.getStackTrace(t)); + logs = lb.toString(); + } + } + + // invoke callback url + String url = config.getCallbackUrl().get(); + ResultCallbackHelper.performResultCallback(success, logs, stats, url, config.getJwtToken(), + config.getJobId()); + } + } + + public static String processLog(LineIterator it, int maxLogSizeCharacters, int maxEndLines, + int maxEndLinesCharacters) { + StringBuilder logBuilder = new StringBuilder(); + + boolean truncated = false; + + while (it.hasNext() && !truncated && logBuilder.length() < maxLogSizeCharacters) { + String line = it.nextLine(); + + int left = maxLogSizeCharacters - logBuilder.length(); + if (left >= line.length()) { + logBuilder.append(line); + logBuilder.append('\n'); + } else { + if (left > MESSAGE_LINE_TRUNCATED.length()) { + logBuilder.append(line, 0, left - MESSAGE_LINE_TRUNCATED.length()); + logBuilder.append(MESSAGE_LINE_TRUNCATED); + logBuilder.append('\n'); + } + truncated = true; + } + } + + if (it.hasNext()) { + truncated = true; + } + + if (truncated) { + // attempt to add lines from the end of the log + CircularFifoBuffer endLines = new CircularFifoBuffer(maxEndLines); + boolean overflow = false; + while (it.hasNext()) { + if (endLines.size() == maxEndLines) { + overflow = true; + } + endLines.add(it.nextLine()); + } + + int endSize = endLines.stream().mapToInt(l -> l.toString().length()).sum(); + if (endSize > maxEndLinesCharacters) { + // don't add end lines + logBuilder.append(MESSAGE_LOG_TRUNCATED); + logBuilder.append('\n'); + } else { + if (!overflow) { + // nothing was truncated -> just add the lines + } else { + logBuilder.append("[Log truncated... following are the last "); + logBuilder.append(endLines.size()); + logBuilder.append(" lines of the log]"); + logBuilder.append('\n'); + } + + // add end lines + for (Object line : endLines) { + logBuilder.append(line.toString()); + logBuilder.append('\n'); + } + } + + } + + return logBuilder.toString(); + } - private CountDownLatch latch = new CountDownLatch(1); + private Project loadProject(URI projectUri) { + DefaultInputSupplier supplier = new DefaultInputSupplier(projectUri); + Project result = null; + try (InputStream in = supplier.getInput()) { + result = Project.load(in); + } catch (Exception e) { + LOG.warn("Could not load project file to determine presets: " + e.getStackTrace()); + } + return result; + } - public void transform(/* TODO add parameters for data and project sources */ ) { - // TODO setup log files for reports and transformation log + // private TargetConfig configureTarget(Project lp, Value sourceCrs) { + // Map presets = getPresets(lp); - long heapMaxSize = Runtime.getRuntime().maxMemory(); - LOG.info("Maximum heap size configured as " + IOUtils.humanReadableByteCount(heapMaxSize, false)); + // // create a target configuration + // //FIXME for now this is a fixed configuration - Init.init(); + // TargetConfig result = new TargetConfig(lp, sourceCrs, null); - Version version = HalePlatform.getCoreVersion(); - LOG.info(MessageFormat.format("Launching hale-transformer {0}...", version.toString())); + // String defaultPreset = "default"; + // String hcPreset = "hale-connect"; - ExecContext context = new ExecContext(); + // if (presets.containsKey(hcPreset)) { + // // project contains hale connect preset + // result.setPreset(hcPreset); + // IOConfiguration preset = presets.get(hcPreset); + // result.setFilename(determineTargetFileName(preset)); + // } + // else if (presets.containsKey(defaultPreset)) { + // // project contains default preset + // result.setPreset(defaultPreset); + // IOConfiguration preset = presets.get(defaultPreset); + // result.setFilename(determineTargetFileName(preset)); + // } + // else { + // CustomTarget target = new CustomTarget(); - // URI projectUri = .... - // context.setProject(projectUri); - // Project project = loadProject(projectUri); + // // WFS 2 FeatureCollection + // // target.setProviderId("eu.esdihumboldt.hale.io.wfs.fc.write-2.0"); + // // GML FeatureCollection (to make bsp happy) + // target.setProviderId("eu.esdihumboldt.hale.io.gml.writer"); - // context.setSources(...) - // context.setSourceProviderIds(...) - // context.setSourcesSettings(...) + // // for testing: + // target.getSettings().put("xml.pretty", Value.of(true)); + // target.getSettings().put("crs.epsg.prefix", + // Value.of("http://www.opengis.net/def/crs/EPSG/0/")); - // Value sourceCrs = null; - // TODO determine source CRS + // // use crs from source data analysis if available and a valid epsg code, + // otherwise fallback to EPSG:4326 + // Value targetCrs = (sourceCrs != null && + // sourceCrs.getStringRepresentation().startsWith("code:EPSG")) ? sourceCrs : + // Value.of("code:EPSG:4326"); + // target.getSettings().put("crs", targetCrs); + // LOG.info("Using " + targetCrs.getStringRepresentation() + " as transformation + // target crs"); - // TargetConfig targetConfig = configureTarget(project, sourceCrs); + // result.setFilename("inspire.gml"); + // result.setCustomTarget(target); + // } - try { - // run the transformation + // return result; + // } - LOG.info("Transforming..."); - TimeUnit.SECONDS.sleep(30); - // new ExecTransformation().run(context); + /** + * Get all export presets from the project. + * + * @param project the hale project object + * @return the map of presets + */ + private Map getPresets(Project project) { + Map exportPresets = new HashMap<>(); + if (project == null) { + return exportPresets; + } - LOG.info("Transformation complete."); - } catch (Throwable t) { - LOG.error("Failed to execute transformation: " + t.getMessage(), t); - } finally { - latch.countDown(); - } - } + for (Entry preset : project.getExportConfigurations().entrySet()) { + IOConfiguration conf = preset.getValue(); + if (InstanceIO.ACTION_SAVE_TRANSFORMED_DATA.equals(conf.getActionId())) { + // configuration for data export + IOConfiguration c = conf.clone(); + String name = preset.getKey(); - private Project loadProject(URI projectUri) { - DefaultInputSupplier supplier = new DefaultInputSupplier(projectUri); - Project result = null; - try (InputStream in = supplier.getInput()) { - result = Project.load(in); - } catch (Exception e) { - LOG.warn("Could not load project file to determine presets: " + e.getStackTrace()); - } - return result; - } + // check provider + IOProviderDescriptor factory = HaleIO.findIOProviderFactory(InstanceWriter.class, null, + c.getProviderId()); + if (factory != null) { + if (Strings.isNullOrEmpty(name)) { + name = factory.getDisplayName(); + } + exportPresets.put(name, c); + } else { + LOG.error(MessageFormat.format("I/O provider {1} for export preset {0} not found", name, + c.getProviderId())); + } + } + } + return exportPresets; + } - public CountDownLatch getLatch() { - return latch; - } + public CountDownLatch getLatch() { + return latch; + } } diff --git a/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java b/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java index e0b76a4..f0ea446 100644 --- a/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java +++ b/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java @@ -1,6 +1,160 @@ package to.wetransform.hale.transformer; +import java.util.Optional; + +/** + * General configuration needed for setting up the transformer, independently of + * the concrete transformation to run. + */ public class TransformerConfig { - // empty for now -} + /** + * The JWT token to authenticate with against services. Should be a user token. + */ + private String jwtToken; + + /** + * Base URL of the service storing transformation projects. + */ + private String projectStoreBaseUrl; + + /** + * Base URL of the service storing datasets. + */ + private String serviceBaseUrl; + + /** + * Base URL of the attachment store. + */ + private String attachmentStoreBaseUrl; + + /** + * Path of the working directory for the transformation result and reports. + */ + private String workingDir; + + /** + * Whether or not the accessUrl property should be used to download source data + * for transformation. If set to false, the /raw endpoint of the bucket-service + * will be used. + */ + private Boolean useAccessUrl; + + /** + * The optional callback URL to be invoked with the result of the + * transformation. + */ + private Optional callbackUrl; + + /** + * The optional job id of this transformation. + */ + private Optional jobId; + + public TransformerConfig(String jwtToken, String projectStoreBaseUrl, String serviceBaseUrl, + String attachmentStoreBaseUrl, String workingDir, Boolean useAccessUrl, Optional callbackUrl, + Optional jobId) { + this.jwtToken = jwtToken; + this.projectStoreBaseUrl = projectStoreBaseUrl; + this.serviceBaseUrl = serviceBaseUrl; + this.attachmentStoreBaseUrl = attachmentStoreBaseUrl; + this.workingDir = workingDir; + this.useAccessUrl = useAccessUrl; + this.callbackUrl = callbackUrl; + this.jobId = jobId; + } + + public static TransformerConfig createDefault(String overrideToken, String overrideDir, + Optional overrideCallbackUrl, Optional overrideJobId) { + String jwtToken = (overrideToken != null) ? overrideToken : System.getenv("JWT_TOKEN"); + if (jwtToken == null) { + throw new IllegalStateException("Missing JWT token configuration"); + } + String projectStoreBaseUrl = System.getenv("PROJECT_STORE_BASE_URL") != null + ? System.getenv("PROJECT_STORE_BASE_URL") + : "http://localdocker:9061"; + String serviceBaseUrl = System.getenv("SERVICE_BASE_URL") != null ? System.getenv("SERVICE_BASE_URL") + : "http://localdocker:9010"; + String attachmentStoreBaseUrl; + if (System.getenv("PROJECT_STORE_BASE_URL") != null && System.getenv("SERVICE_BASE_URL") != null) { + attachmentStoreBaseUrl = (System.getenv("ATTACHMENT_STORE_BASE_URL") != null) + ? System.getenv("ATTACHMENT_STORE_BASE_URL") + : ""; + } else { + attachmentStoreBaseUrl = "http://localdocker:9150"; + } + String workingDir = (overrideDir != null) ? overrideDir : System.getenv("TRANSFORMATION_DIR"); + Optional callbackUrl = (overrideCallbackUrl != null) ? overrideCallbackUrl + : Optional.ofNullable(System.getenv("CALLBACK_URL")); + Optional jobId = (overrideJobId != null) ? overrideJobId : Optional.ofNullable(System.getenv("JOB_ID")); + Boolean useAccessUrl = Boolean.valueOf(System.getenv("USE_ACCESS_URL")); + + return new TransformerConfig(jwtToken, projectStoreBaseUrl, serviceBaseUrl, attachmentStoreBaseUrl, workingDir, + useAccessUrl, callbackUrl, jobId); + } + + // Getters and setters for private fields... + public String getAttachmentStoreBaseUrl() { + return attachmentStoreBaseUrl; + } + + public void setAttachmentStoreBaseUrl(String attachmentStoreBaseUrl) { + this.attachmentStoreBaseUrl = attachmentStoreBaseUrl; + } + + public Optional getCallbackUrl() { + return callbackUrl; + } + + public void setCallbackUrl(Optional callbackUrl) { + this.callbackUrl = callbackUrl; + } + + public Optional getJobId() { + return jobId; + } + + public void setJobId(Optional jobId) { + this.jobId = jobId; + } + + public String getJwtToken() { + return jwtToken; + } + + public void setJwtToken(String jwtToken) { + this.jwtToken = jwtToken; + } + + public String getProjectStoreBaseUrl() { + return projectStoreBaseUrl; + } + + public void setProjectStoreBaseUrl(String projectStoreBaseUrl) { + this.projectStoreBaseUrl = projectStoreBaseUrl; + } + + public String getServiceBaseUrl() { + return serviceBaseUrl; + } + + public void setServiceBaseUrl(String serviceBaseUrl) { + this.serviceBaseUrl = serviceBaseUrl; + } + + public Boolean getUseAccessUrl() { + return useAccessUrl; + } + + public void setUseAccessUrl(Boolean useAccessUrl) { + this.useAccessUrl = useAccessUrl; + } + + public String getWorkingDir() { + return workingDir; + } + + public void setWorkingDir(String workingDir) { + this.workingDir = workingDir; + } +} \ No newline at end of file diff --git a/src/main/java/to/wetransform/hale/transformer/api/Init.java b/src/main/java/to/wetransform/hale/transformer/api/Init.java new file mode 100644 index 0000000..a475f14 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/Init.java @@ -0,0 +1,56 @@ +package to.wetransform.hale.transformer.api; + +import java.net.URL; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.equinox.nonosgi.registry.RegistryFactoryHelper; +import org.slf4j.bridge.SLF4JBridgeHandler; + +import groovy.lang.GroovySystem; +import to.wetransform.hale.transformer.TransformerConfig; +import to.wetransform.hale.transformer.api.internal.CustomMetaClassCreationHandle; +import to.wetransform.hale.transformer.api.urlhandler.CustomURLStreamHandlerFactory; +import to.wetransform.hale.transformer.api.urlhandler.ServiceURLStreamHandler; +import to.wetransform.hale.transformer.api.urlhandler.TokenProvider; + +public class Init { + + private static AtomicBoolean initialized = new AtomicBoolean(false); + + public static void init(final TransformerConfig config) { + TokenProvider tokenProvider = new TokenProvider() { + + @Override + public String getAuthenticationToken() { + return config.getJwtToken(); + } + }; + init(config, tokenProvider); + } + + public static void init(final TransformerConfig config, final TokenProvider tokenProvider) { + if (initialized.compareAndSet(false, true)) { + SLF4JBridgeHandler.install(); + + // initialize registry + RegistryFactoryHelper.getRegistry(); + + // initialize meta extensions + GroovySystem.getMetaClassRegistry().setMetaClassCreationHandle(new CustomMetaClassCreationHandle()); + + // initialize URL handlers + CustomURLStreamHandlerFactory factory = new CustomURLStreamHandlerFactory(); + + factory.addHandler("bucket", new ServiceURLStreamHandler(config.getServiceBaseUrl(), tokenProvider, false)); + factory.addHandler("project", + new ServiceURLStreamHandler(config.getProjectStoreBaseUrl(), tokenProvider, false)); + if (!config.getAttachmentStoreBaseUrl().equals("")) { + factory.addHandler("attachment", + new ServiceURLStreamHandler(config.getAttachmentStoreBaseUrl(), tokenProvider, false)); + } + + URL.setURLStreamHandlerFactory(factory); + } + } + +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/TransformerApiApplication.java b/src/main/java/to/wetransform/hale/transformer/api/TransformerApiApplication.java index d8c6d80..244febb 100644 --- a/src/main/java/to/wetransform/hale/transformer/api/TransformerApiApplication.java +++ b/src/main/java/to/wetransform/hale/transformer/api/TransformerApiApplication.java @@ -17,47 +17,47 @@ @EnableScheduling public class TransformerApiApplication { - // TODO Should be configurable - private static final String ROUTING_KEY = "hale.transformation.#"; - - // TODO Should be configurable - public static final String TOPIC_EXCHANGE_NAME = "hale-transformer-exchange"; - - // TODO Should be configurable - public static final String QUEUE_NAME = "hale-transformation"; - - @Bean - Queue queue() { - // TODO Queue should be declared passively, i.e. it should be created - // outside of this application - return new Queue(QUEUE_NAME, false); - } - - @Bean - TopicExchange exchange() { - // TODO Exchange should be declared passively, i.e. it should be created - // outside of this application - return new TopicExchange(TOPIC_EXCHANGE_NAME); - } - - @Bean - Binding binding(Queue queue, TopicExchange exchange) { - return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); - } - - @Bean - public MessageConverter jsonMessageConverter() { - return new Jackson2JsonMessageConverter(); - } - - @Bean - public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { - final var rabbitTemplate = new RabbitTemplate(connectionFactory); - rabbitTemplate.setMessageConverter(jsonMessageConverter()); - return rabbitTemplate; - } - - public static void main(String[] args) { - SpringApplication.run(TransformerApiApplication.class, args); - } + // TODO Should be configurable + private static final String ROUTING_KEY = "hale.transformation.#"; + + // TODO Should be configurable + public static final String TOPIC_EXCHANGE_NAME = "hale-transformer-exchange"; + + // TODO Should be configurable + public static final String QUEUE_NAME = "hale-transformation"; + + @Bean + Queue queue() { + // TODO Queue should be declared passively, i.e. it should be created + // outside of this application + return new Queue(QUEUE_NAME, false); + } + + @Bean + TopicExchange exchange() { + // TODO Exchange should be declared passively, i.e. it should be created + // outside of this application + return new TopicExchange(TOPIC_EXCHANGE_NAME); + } + + @Bean + Binding binding(Queue queue, TopicExchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); + } + + @Bean + public MessageConverter jsonMessageConverter() { + return new Jackson2JsonMessageConverter(); + } + + @Bean + public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { + final var rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setMessageConverter(jsonMessageConverter()); + return rabbitTemplate; + } + + public static void main(String[] args) { + SpringApplication.run(TransformerApiApplication.class, args); + } } diff --git a/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java b/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java new file mode 100644 index 0000000..42a0f05 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java @@ -0,0 +1,78 @@ +package to.wetransform.hale.transformer.api.internal; + +import java.lang.reflect.Constructor; + +import org.eclipse.equinox.nonosgi.registry.RegistryFactoryHelper; + +import eu.esdihumboldt.util.groovy.meta.extension.MetaClassDescriptor; +import eu.esdihumboldt.util.groovy.meta.extension.MetaClassExtension; +import groovy.lang.MetaClass; +import groovy.lang.MetaClassRegistry; +import groovy.lang.MetaClassRegistry.MetaClassCreationHandle; + +/** + * Adapts created meta classes with delegating meta classes registered in the + * {@link MetaClassExtension}. + */ +public class CustomMetaClassCreationHandle extends MetaClassCreationHandle { + + private final MetaClassExtension ext; + + public CustomMetaClassCreationHandle() { + // initialize registry + RegistryFactoryHelper.getRegistry(); + + ext = new MetaClassExtension(); + } + + @Override + protected MetaClass createNormalMetaClass(@SuppressWarnings("rawtypes") Class theClass, + MetaClassRegistry registry) { + MetaClass metaClass = super.createNormalMetaClass(theClass, registry); + + for (MetaClassDescriptor descriptor : ext.getElements()) { + if (descriptorApplies(descriptor, theClass)) { + // create meta class + Class delegatingMetaClass = descriptor.getMetaClass(); + try { + Constructor constructor = delegatingMetaClass.getConstructor(MetaClass.class); + metaClass = (MetaClass) constructor.newInstance(metaClass); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + return metaClass; + } + + /** + * Check if a meta class descriptor applies to a given class. + * + * @param descriptor the meta class descriptor + * @param theClass the class for which should be determined if the descriptor + * applies + * @return true if the descriptor applies to the class, + * false otherwise + */ + private boolean descriptorApplies(MetaClassDescriptor descriptor, @SuppressWarnings("rawtypes") Class theClass) { + Class forClass = descriptor.getForClass(); + if (descriptor.isForArray()) { + if (theClass.isArray()) { + Class componentClass = theClass.getComponentType(); + if (componentClass != null) { + return forClass.equals(componentClass) || forClass.isAssignableFrom(componentClass); + } else { + // should actually not happen + return false; + } + } else { + // no array + return false; + } + } else { + return forClass.equals(theClass) || forClass.isAssignableFrom(theClass); + } + } + +} \ No newline at end of file diff --git a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java index 1f5d524..cdd19e4 100644 --- a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java +++ b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java @@ -9,38 +9,40 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import to.wetransform.hale.transformer.Transformer; +import to.wetransform.hale.transformer.TransformerConfig; import to.wetransform.hale.transformer.api.TransformerApiApplication; @Service public class TransformationMessageConsumer { - /** - * - */ - public record TransformationMessage( - @JsonProperty("projectUrl") String projectUrl, @JsonProperty("sourceDataUrl") String sourceDataUrl) - implements Serializable {} - - private static final Logger LOG = LoggerFactory.getLogger(TransformationMessageConsumer.class); - - @RabbitListener(queues = TransformerApiApplication.QUEUE_NAME) - public void receiveMessage(final TransformationMessage message) { - LOG.info("Received projectUrl = " + message.projectUrl + " sourceDataUrl = " + message.sourceDataUrl); - - // TODO Implement mechanism to only accept a message from the queue if no - // transformation is currently running - - if (message.projectUrl != null && message.sourceDataUrl() != null) { - Transformer tx = new Transformer(); - - try { - tx.transform(); - tx.getLatch().await(10, TimeUnit.MINUTES); // TODO make configurable - } catch (InterruptedException e) { - // TODO What should be done when the transformation fails or times out? - // - Simply requeuing the message is probably not helpful - // - Send a message back so that the producer can react? - LOG.error("Transformation process timed out: " + e.getMessage(), e); - } - } - } + /** + * + */ + public record TransformationMessage(@JsonProperty("projectUrl") String projectUrl, + @JsonProperty("sourceDataUrl") String sourceDataUrl) implements Serializable { + } + + private static final Logger LOG = LoggerFactory.getLogger(TransformationMessageConsumer.class); + + @RabbitListener(queues = TransformerApiApplication.QUEUE_NAME) + public void receiveMessage(final TransformationMessage message) { + LOG.info("Received projectUrl = " + message.projectUrl + " sourceDataUrl = " + message.sourceDataUrl); + + // TODO Implement mechanism to only accept a message from the queue if no + // transformation is currently running + TransformerConfig config = TransformerConfig.createDefault(null, null, null, null); + + if (message.projectUrl != null && message.sourceDataUrl() != null) { + Transformer tx = new Transformer(config); + + try { + tx.transform(message.sourceDataUrl(), message.projectUrl, null); + tx.getLatch().await(10, TimeUnit.MINUTES); // TODO make configurable + } catch (InterruptedException e) { + // TODO What should be done when the transformation fails or times out? + // - Simply requeuing the message is probably not helpful + // - Send a message back so that the producer can react? + LOG.error("Transformation process timed out: " + e.getMessage(), e); + } + } + } } diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/CustomURLStreamHandlerFactory.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/CustomURLStreamHandlerFactory.java new file mode 100644 index 0000000..4ac921f --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/CustomURLStreamHandlerFactory.java @@ -0,0 +1,32 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +import java.net.URLStreamHandler; +import java.net.URLStreamHandlerFactory; +import java.util.HashMap; +import java.util.Map; + +/** + * Custom URL handlers to allow authentication for cases where the client does + * not support it, or resolving relative URLs for cases where that is not + * possible. + */ +public class CustomURLStreamHandlerFactory implements URLStreamHandlerFactory { + + private final Map handlers = new HashMap<>(); + + /** + * Add an additional URL handler. + * + * @param protocol the protocol name + * @param handler the handler for the protocol + */ + public void addHandler(String protocol, URLStreamHandler handler) { + handlers.put(protocol.toLowerCase(), handler); + } + + @Override + public URLStreamHandler createURLStreamHandler(String protocol) { + return handlers.get(protocol); + } + +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLConnection.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLConnection.java new file mode 100644 index 0000000..59ccb1b --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLConnection.java @@ -0,0 +1,128 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; + +public class ServiceURLConnection extends URLConnection { + + private String ownerType; + private String ownerId; + private String path; + private String bucket; + + private final String serviceBaseUrl; + private final TokenProvider tokenProvider; + private boolean rawOnly; + + protected ServiceURLConnection(URL url, String serviceBaseUrl, TokenProvider tokenProvider, boolean rawOnly) { + super(url); + this.serviceBaseUrl = serviceBaseUrl; + this.tokenProvider = tokenProvider; + this.rawOnly = rawOnly; + } + + @Override + public void connect() throws IOException { + if (connected) + return; + + doConnect(); + + connected = true; + } + + /** + * Reads the connection information from the URL. + */ + protected void doConnect() throws IOException { + List parts = Splitter.on('/').omitEmptyStrings().splitToList(getURL().getPath()); + if (parts.size() < 3) { + throw new IOException("URL is missing valid owner identification"); + } + ownerType = parts.get(0); + ownerId = parts.get(1); + bucket = parts.get(2); + + // path in bucket + path = Joiner.on('/').join(parts.subList(3, parts.size())); + } + + @Override + public InputStream getInputStream() throws IOException { + if (!connected) { + connect(); + } + + /* + * XXX note that this bypasses any authentication mechanisms by authenticating + * as the service + */ + + /* + * There are different possibilities to implement this: - Query the bucket + * information and access S3 directly via the provided access URL - or access + * via the raw file URL provided by the schema store + * + * For now using the latter option + */ + + // delegate to http connection + StringBuilder urlBuilder = new StringBuilder(serviceBaseUrl); + // TODO what to put here? + urlBuilder.append("/buckets/"); + urlBuilder.append(ownerType); + urlBuilder.append("/"); + urlBuilder.append(ownerId); + urlBuilder.append("/"); + urlBuilder.append(bucket); + if (rawOnly) { + urlBuilder.append("/raw/"); + } else if (!path.isEmpty()) { + urlBuilder.append("/"); + + // Don't encode "/", as they're used for accessing raw files within folders + String encodedPath = Arrays.stream(path.split("/")).map(this::encodeString) + .collect(Collectors.joining("/")); + + urlBuilder.append(encodedPath); + } + + URL fileURL = new URL(urlBuilder.toString()); + HttpURLConnection httpConnection = (HttpURLConnection) fileURL.openConnection(); + + String token; + try { + token = tokenProvider.getAuthenticationToken(); + } catch (Exception e) { + throw new IOException("Unable to create authentication token for schema protocol request", e); + } + + httpConnection.setRequestProperty("Authorization", "Bearer " + token); + httpConnection.setRequestMethod("GET"); + + httpConnection.connect(); + + return httpConnection.getInputStream(); + } + + private String encodeString(String stringToEncode) { + try { + return URLEncoder.encode(stringToEncode, StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + // Not going to happen since StandardCharsets is used + } + return null; + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLStreamHandler.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLStreamHandler.java new file mode 100644 index 0000000..1190cbe --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLStreamHandler.java @@ -0,0 +1,26 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +import java.io.IOException; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; + +public class ServiceURLStreamHandler extends URLStreamHandler { + + private final String serviceBaseUrl; + private final TokenProvider tokenProvider; + private boolean rawOnly; + + public ServiceURLStreamHandler(String serviceBaseUrl, TokenProvider tokenProvider, boolean rawOnly) { + this.serviceBaseUrl = serviceBaseUrl; + this.tokenProvider = tokenProvider; + this.rawOnly = rawOnly; + } + + @Override + protected URLConnection openConnection(URL u) throws IOException { + // Provide your implementation to open a connection for the specified URL + return new ServiceURLConnection(u, serviceBaseUrl, tokenProvider, rawOnly); + } + +} \ No newline at end of file diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/TokenProvider.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/TokenProvider.java new file mode 100644 index 0000000..1cb190b --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/TokenProvider.java @@ -0,0 +1,8 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +public interface TokenProvider { + + String getAuthenticationToken(); + +} +