diff --git a/.gitignore b/.gitignore index bec0c6a..8f0e027 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,5 @@ out/ *.code-workspace # Local History for Visual Studio Code .history/ +.vscode/settings.json + diff --git a/src/main/java/to/wetransform/hale/transformer/CustomTarget.java b/src/main/java/to/wetransform/hale/transformer/CustomTarget.java index 63c1f8d..e5509cd 100644 --- a/src/main/java/to/wetransform/hale/transformer/CustomTarget.java +++ b/src/main/java/to/wetransform/hale/transformer/CustomTarget.java @@ -4,10 +4,17 @@ import java.util.Map; import eu.esdihumboldt.hale.common.core.io.Value; +import org.json.JSONObject; public record CustomTarget(String providerId, Map settings) { public CustomTarget(String providerId) { this(providerId, new HashMap<>()); } + + public void setProviderId(String string) {} + + public JSONObject getSettings() { + return null; + } } diff --git a/src/main/java/to/wetransform/hale/transformer/ResultCallbackHelper.java b/src/main/java/to/wetransform/hale/transformer/ResultCallbackHelper.java new file mode 100644 index 0000000..50544cc --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/ResultCallbackHelper.java @@ -0,0 +1,132 @@ +package to.wetransform.hale.transformer; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.zip.GZIPOutputStream; + +import org.json.JSONException; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper to perform transformation result callbacks. + */ +public class ResultCallbackHelper { + + /** + * The logger instance for this class. + */ + private static Logger logger = LoggerFactory.getLogger(ResultCallbackHelper.class); + + /** + * Perform the transformation result callback. + * + * @param success Whether or not the transformation has been successful. + * @param logs The transformation log. + * @param callbackUrl The callback url. + * @param token The jwt token for the request. + * @param jobId The transformation job id. + */ + public static void performResultCallback( + boolean success, + String logs, + Optional stats, + String callbackUrl, + String token, + Optional jobId) { + try { + // Create transformation result object. + JSONObject resultObj = new JSONObject(); + resultObj.put("success", success); + + JSONObject propertiesObj = new JSONObject(); + if (Optional.ofNullable(logs).isPresent()) { + propertiesObj.put("logs", logs); + } + + if (jobId.isPresent()) { + propertiesObj.put("jobId", jobId.get()); + } else { + logger.warn("Transformation result callback does not contain a jobId."); + } + + if (stats.isPresent()) { + propertiesObj.put("stats", stats.get()); + } + + resultObj.put("properties", propertiesObj); + + // Send result object to callback URL. + sendTransformationResult(resultObj, callbackUrl, token); + + } catch (JSONException e) { + logger.warn("Failed to create result object to be sent "); + } + } + + /** + * Send the transformation result object to the given url. + * + * @param result The transformation result object. + * @param url The callback url. + * @param token The jwt token. + */ + private static void sendTransformationResult(JSONObject result, String url, String token) { + + try { + logger.info("Sending transformation result to " + url.toString()); + + // Create http request to send result object. + URL sendUrl = new URL(url); + HttpURLConnection http = (HttpURLConnection) sendUrl.openConnection(); + if (token != null) { + http.setRequestProperty("Authorization", "Bearer " + token); + } else { + logger.warn("Transformation callback is being performed without JWT token!"); + } + http.setRequestProperty("Content-Type", "application/json; charset=UTF-8"); + http.setRequestProperty("Content-Encoding", "gzip"); + http.setDoOutput(true); + http.setDoInput(true); + http.setRequestMethod("PUT"); + http.setConnectTimeout(10000); + + // write zipped content + OutputStream os = http.getOutputStream(); + OutputStream gzip = new GZIPOutputStream(os); + + try { + OutputStreamWriter osw = new OutputStreamWriter(gzip, StandardCharsets.UTF_8); + osw.write(result.toString()); + osw.flush(); + osw.close(); + } catch (IOException e) { + logger.warn("Failed to write result callback request body.", e); + } finally { + os.close(); + gzip.close(); + } + + // handle http response + int statusCode = http.getResponseCode(); + String statusMessage = http.getResponseMessage(); + String resLogMsg = "Transformation result callback endpoint responded with status code " + statusCode + ": " + + statusMessage; + if (statusCode != 200) { + logger.warn(resLogMsg); + } else { + logger.info(resLogMsg); + } + + http.disconnect(); + } catch (Throwable t) { + logger.warn("Failed to perform transformation result callback.", t); + } + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/RunContext.java b/src/main/java/to/wetransform/hale/transformer/RunContext.java new file mode 100644 index 0000000..f4a77f7 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/RunContext.java @@ -0,0 +1,28 @@ +package to.wetransform.hale.transformer; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; + +public class RunContext { + + private final List tempFiles = new ArrayList<>(); + + public File createTempDir() throws IOException { + Path path = Files.createTempDirectory("hale-progen"); + tempFiles.add(path); + return path.toFile(); + } + + public void cleanUp() throws IOException { + for (Path path : tempFiles) { + FileUtils.deleteDirectory(path.toFile()); + } + tempFiles.clear(); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/Transformer.java b/src/main/java/to/wetransform/hale/transformer/Transformer.java index 4f1465b..18fcecf 100644 --- a/src/main/java/to/wetransform/hale/transformer/Transformer.java +++ b/src/main/java/to/wetransform/hale/transformer/Transformer.java @@ -1,59 +1,262 @@ package to.wetransform.hale.transformer; -import java.io.InputStream; +import java.io.*; import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.text.MessageFormat; +import java.util.*; +import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import com.google.common.base.Strings; import eu.esdihumboldt.hale.app.transform.ExecContext; +import eu.esdihumboldt.hale.app.transform.ExecTransformation; +import eu.esdihumboldt.hale.app.transform.ExecUtil; import eu.esdihumboldt.hale.common.core.HalePlatform; +import eu.esdihumboldt.hale.common.core.io.HaleIO; +import eu.esdihumboldt.hale.common.core.io.IOProvider; +import eu.esdihumboldt.hale.common.core.io.Value; +import eu.esdihumboldt.hale.common.core.io.extension.IOProviderDescriptor; +import eu.esdihumboldt.hale.common.core.io.extension.IOProviderExtension; +import eu.esdihumboldt.hale.common.core.io.project.model.IOConfiguration; import eu.esdihumboldt.hale.common.core.io.project.model.Project; import eu.esdihumboldt.hale.common.core.io.supplier.DefaultInputSupplier; +import eu.esdihumboldt.hale.common.core.report.Report; +import eu.esdihumboldt.hale.common.core.report.ReportSession; +import eu.esdihumboldt.hale.common.core.report.util.StatisticsHelper; +import eu.esdihumboldt.hale.common.core.report.writer.ReportReader; +import eu.esdihumboldt.hale.common.instance.io.InstanceIO; +import eu.esdihumboldt.hale.common.instance.io.InstanceWriter; +import eu.esdihumboldt.util.groovy.collector.StatsCollector; import eu.esdihumboldt.util.io.IOUtils; +import groovy.json.JsonOutput; +import org.apache.commons.collections.buffer.CircularFifoBuffer; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.LineIterator; +import org.apache.commons.io.output.TeeOutputStream; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.eclipse.core.runtime.content.IContentType; +import org.json.JSONException; +import org.json.JSONObject; import org.osgi.framework.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import to.wetransform.halecli.internal.Init; +import to.wetransform.hale.transformer.api.Init; public class Transformer { - private static final Logger LOG = LoggerFactory.getLogger(Transformer.class); + /** + * Limit for log size, to include complete log. + *

+ * Current value: ~ 5MB with 4 byte characters (so likely ~1.3MB) We need to + * stay well below 16 MB due to MongoDB restrictions (plus megabytes of logs are + * not really helpful) + */ + private static final int MAX_LOG_SIZE_CHARACTERS = 1310720; - private CountDownLatch latch = new CountDownLatch(1); + private static final String MESSAGE_LINE_TRUNCATED = "[Line truncated...]"; + private static final String MESSAGE_LOG_TRUNCATED = "[Log truncated...]"; - public void transform(/* TODO add parameters for data and project sources */ ) { - // TODO setup log files for reports and transformation log + /** + * If the log is truncated, include lines from the end. + */ + private static final int TRUNCATED_INCLUDE_END_MAX_LINES = 50; + /** + * If the lines at the end to include exceed this limit, they are not included. + */ + private static final int TRUNCATED_INCLUDE_END_MAX_CHARACTERS = 25600; - long heapMaxSize = Runtime.getRuntime().maxMemory(); - LOG.info("Maximum heap size configured as " + IOUtils.humanReadableByteCount(heapMaxSize, false)); + private static final Logger LOG = LoggerFactory.getLogger(Transformer.class); + private final TransformerConfig config; + private final CountDownLatch latch = new CountDownLatch(1); - Init.init(); + public Transformer(TransformerConfig config) { + this.config = config; + } - Version version = HalePlatform.getCoreVersion(); - LOG.info(MessageFormat.format("Launching hale-transformer {0}...", version.toString())); + public void transform(String sourceDataURL, String projectURL, String targetURL) { + boolean success = false; + PrintStream sysOut = System.out; + File transformationLogFile = null; - ExecContext context = new ExecContext(); + RunContext run = new RunContext(); - // URI projectUri = .... - // context.setProject(projectUri); - // Project project = loadProject(projectUri); + // init working directory + File workingDir = null; + File reportFile = null; + try { + if (config.getWorkingDir() != null) { + workingDir = new File(config.getWorkingDir()); + Files.createDirectories(workingDir.toPath()); + } else { + workingDir = run.createTempDir(); + } + } catch (Throwable t) { + performCallback( + success, Optional.ofNullable(transformationLogFile), Optional.empty(), Optional.ofNullable(t)); + return; + } - // context.setSources(...) - // context.setSourceProviderIds(...) - // context.setSourcesSettings(...) + LOG.info("Using working directory " + workingDir.getAbsolutePath()); - // Value sourceCrs = null; - // TODO determine source CRS + try { + // setup report file + reportFile = new File(workingDir, "reports.log"); + reportFile.delete(); + reportFile.createNewFile(); + } catch (Throwable t) { + performCallback( + success, Optional.ofNullable(transformationLogFile), Optional.empty(), Optional.ofNullable(t)); + return; + } - // TargetConfig targetConfig = configureTarget(project, sourceCrs); + try { + // setup log file + transformationLogFile = new File(workingDir, "transformation.log"); + transformationLogFile.delete(); + transformationLogFile.createNewFile(); + + // fork streams + FileOutputStream transformationLogOut = new FileOutputStream(transformationLogFile, true); + OutputStream tOut = new TeeOutputStream(sysOut, transformationLogOut); + System.setOut(new PrintStream(tOut)); + } catch (Throwable t) { + performCallback( + success, Optional.ofNullable(transformationLogFile), Optional.empty(), Optional.ofNullable(t)); + return; + } try { + LOG.info("Startup..."); + + long heapMaxSize = Runtime.getRuntime().maxMemory(); + LOG.info("Maximum heap size configured as " + IOUtils.humanReadableByteCount(heapMaxSize, false)); + + // initialize + Init.init(config); + + Version version = HalePlatform.getCoreVersion(); + LOG.info(MessageFormat.format("Launching hale-transformer {0}...", version.toString())); + + ExecContext context = new ExecContext(); + + // Set up project URI + URI projectUri = new URI(projectURL); + context.setProject(projectUri); + + // Load project + Project project = loadProject(projectUri); + + // Set up custom defaultSrs + // TODO change this to something real + + Map sourceMapConfigs = Map.of("defaultSrs", Value.of("something")); + SourceConfig sourceConfig = new SourceConfig( + new URI(sourceDataURL), + "eu.esdihumboldt.hale.io.gml.reader", + sourceMapConfigs, + true, + new ArrayList<>()); + + List sourceConfigs = new ArrayList<>(); + sourceConfigs.add(sourceConfig); + + context.setSources(sourceConfigs.stream() + .filter(config -> config.transform()) + .map(config -> config.location()) + .collect(Collectors.toList())); + context.setSourceProviderIds( + sourceConfigs.stream().map(config -> config.providerId()).collect(Collectors.toList())); + context.setSourcesSettings( + sourceConfigs.stream().map(config -> config.settings()).collect(Collectors.toList())); + + // extract detected source data crs and use in target config + Value sourceCrs = null; + if (!sourceConfigs.isEmpty()) { + Throwable error = null; + // try each source config, in case it is not set for some + for (int i = 0; i < sourceConfigs.size() && (sourceCrs == null || sourceCrs.isEmpty()); i++) { + try { + sourceCrs = sourceConfigs.get(i).settings().get("defaultSrs"); + } catch (Throwable t) { + error = t; + } + } + + if (error != null) { + LOG.warn("Could not determine source data CRS", error); + } else if (sourceCrs == null || sourceCrs.isEmpty()) { + LOG.warn( + "Unable to determine source data CRS: None of {} sources is configured with a CRS", + sourceConfigs.size()); + } + } else { + LOG.warn("Unable to determine source data CRS: No source configured"); + } + + TargetConfig targetConfig = configureTarget(project, sourceCrs); + + File resultDir = new File(workingDir, "result"); + resultDir.mkdir(); + String targetFilename = targetConfig.filename(); + if (targetFilename == null) { + targetFilename = "result.out"; + } + File targetFile = new File(resultDir, targetFilename); + context.setTarget(targetFile.toURI()); + String preset = targetConfig.preset(); + CustomTarget customConfig = targetConfig.customTarget(); + if (preset != null) { + context.setPreset(preset); + } else { + if (customConfig == null || customConfig.providerId() == null) { + throw new IllegalStateException("No configuration on how to write transformed data available"); + } + context.setTargetProviderId(customConfig.providerId()); + } + + if (customConfig != null) { + context.setTargetSettings(customConfig.settings()); + } else { + context.setTargetSettings(new HashMap<>()); + } + + context.setReportsOut(reportFile); + + // general configuration + context.setLogException(true); + + // if (allowUnrestrictedGroovy(projectIdent)) { + // LOG.info("Lifting Groovy restriction..."); + // context.setRestrictGroovy(false); + // } + + // copy attachments + Set attachments = sourceConfigs.stream() + .flatMap(config -> config.attachments().stream()) + .collect(Collectors.toSet()); + copyAttachments(projectUri, attachments, resultDir); + // run the transformation + LOG.info("Transforming started."); + new ExecTransformation().run(context); - LOG.info("Transforming..."); - TimeUnit.SECONDS.sleep(30); - // new ExecTransformation().run(context); + // upload any result + + // evaluate results + ReportReader reader = new ReportReader(); + ReportSession reports = reader.readFile(reportFile); + JSONObject stats = getStats(reports); + success = evaluateReports(reports.getAllReports().values(), false); + LOG.info("Finished"); + + // perform result callback + performCallback( + success, Optional.ofNullable(transformationLogFile), Optional.ofNullable(stats), Optional.empty()); LOG.info("Transformation complete."); } catch (Throwable t) { @@ -63,6 +266,112 @@ public void transform(/* TODO add parameters for data and project sources */ ) { } } + /** + * Perform the result callback if configured. + * + * @param success The success state of the transformation + * @param optThrowable An optional throwable + */ + private void performCallback( + boolean success, Optional optLogFile, Optional stats, Optional optThrowable) { + if (config.getCallbackUrl().isPresent()) { + String logs = null; + + // get logs from log file + if (optLogFile.isPresent()) { + File logFile = optLogFile.get(); + try (LineIterator it = FileUtils.lineIterator(logFile, "UTF-8")) { + logs = processLog( + it, + MAX_LOG_SIZE_CHARACTERS, + TRUNCATED_INCLUDE_END_MAX_LINES, + TRUNCATED_INCLUDE_END_MAX_CHARACTERS); + } catch (IOException e) { + logs = e.getMessage(); + } + logFile.delete(); + } + + // append stacktrace from exception if any + if (logs != null) { + if (optThrowable.isPresent()) { + Throwable t = optThrowable.get(); + String lb = logs + System.lineSeparator() + ExceptionUtils.getStackTrace(t); + logs = lb; + } + } + + // invoke callback url + String url = config.getCallbackUrl().get(); + ResultCallbackHelper.performResultCallback( + success, logs, stats, url, config.getJwtToken(), config.getJobId()); + } + } + + public static String processLog( + LineIterator it, int maxLogSizeCharacters, int maxEndLines, int maxEndLinesCharacters) { + StringBuilder logBuilder = new StringBuilder(); + + boolean truncated = false; + + while (it.hasNext() && !truncated && logBuilder.length() < maxLogSizeCharacters) { + String line = it.nextLine(); + + int left = maxLogSizeCharacters - logBuilder.length(); + if (left >= line.length()) { + logBuilder.append(line); + logBuilder.append('\n'); + } else { + if (left > MESSAGE_LINE_TRUNCATED.length()) { + logBuilder.append(line, 0, left - MESSAGE_LINE_TRUNCATED.length()); + logBuilder.append(MESSAGE_LINE_TRUNCATED); + logBuilder.append('\n'); + } + truncated = true; + } + } + + if (it.hasNext()) { + truncated = true; + } + + if (truncated) { + // attempt to add lines from the end of the log + CircularFifoBuffer endLines = new CircularFifoBuffer(maxEndLines); + boolean overflow = false; + while (it.hasNext()) { + if (endLines.size() == maxEndLines) { + overflow = true; + } + endLines.add(it.nextLine()); + } + + int endSize = endLines.stream().mapToInt(l -> l.toString().length()).sum(); + if (endSize > maxEndLinesCharacters) { + // don't add end lines + logBuilder.append(MESSAGE_LOG_TRUNCATED); + logBuilder.append('\n'); + } else { + if (!overflow) { + // nothing was truncated -> just add the lines + } else { + logBuilder.append("[Log truncated... following are the last "); + logBuilder.append(endLines.size()); + logBuilder.append(" lines of the log]"); + logBuilder.append('\n'); + } + + // add end lines + for (Object line : endLines) { + logBuilder.append(line.toString()); + logBuilder.append('\n'); + } + } + } + + return logBuilder.toString(); + } + private Project loadProject(URI projectUri) { DefaultInputSupplier supplier = new DefaultInputSupplier(projectUri); Project result = null; @@ -74,6 +383,243 @@ private Project loadProject(URI projectUri) { return result; } + private TargetConfig configureTarget(Project lp, Value sourceCrs) { + String filename; + String preset = null; + CustomTarget customTarget = null; + + Map presets = getPresets(lp); + + // Preset names + String defaultPreset = "default"; + String hcPreset = "hale-connect"; + + if (presets.containsKey(hcPreset)) { + // Project contains hale-connect preset + preset = hcPreset; + IOConfiguration ioConfiguration = presets.get(hcPreset); + filename = determineTargetFileName(ioConfiguration); + } else if (presets.containsKey(defaultPreset)) { + // Project contains default preset + preset = defaultPreset; + IOConfiguration ioConfiguration = presets.get(defaultPreset); + filename = determineTargetFileName(ioConfiguration); + } else { + // No specific presets found, creating a custom target configuration + + Map targetMap = new HashMap<>(); + + // Specify target provider for GML FeatureCollection + String targetProvider = "eu.esdihumboldt.hale.io.gml.writer"; + + // Additional settings for testing + targetMap.put("xml.pretty", Value.of(true)); + targetMap.put("crs.epsg.prefix", Value.of("http://www.opengis.net/def/crs/EPSG/0/")); + + // Use CRS from source data analysis if available and a valid EPSG code, + // otherwise fallback to EPSG:4326 + Value targetCrs = + (sourceCrs != null && sourceCrs.getStringRepresentation().startsWith("code:EPSG")) + ? sourceCrs + : Value.of("code:EPSG:4326"); + + targetMap.put("crs", targetCrs); + LOG.info("Using {} as the transformation target CRS", targetCrs.getStringRepresentation()); + + // Create a custom target configuration + CustomTarget target = new CustomTarget(targetProvider, targetMap); + + filename = "inspire.gml"; + customTarget = target; + } + + // Create and return the target configuration + return new TargetConfig(filename, preset, customTarget); + } + + /** + * Determine the name of the target file based on an export preset. + * + * @param preset the export preset + * @return the file name for the target file + */ + public static String determineTargetFileName(IOConfiguration preset) { + // Default extension to "xml" to reflect old behavior + String extension = "xml"; + + IContentType contentType = determineContentType(preset); + if (contentType != null) { + // Derive extension from content type + String[] extensions = contentType.getFileSpecs(IContentType.FILE_EXTENSION_SPEC); + if (extensions != null && extensions.length > 0) { + extension = extensions[0]; // Choose the first one + } + } + + // If extension would be "gml," use "xml" instead for backward compatibility + extension = "gml".equalsIgnoreCase(extension) ? "xml" : extension; + + LOG.info("Chose .{} as the extension for the target file", extension); + + return "result." + extension; + } + + private static IContentType determineContentType(IOConfiguration preset) { + // Usually, the content type is part of the settings + Value value = preset.getProviderConfiguration().get(IOProvider.PARAM_CONTENT_TYPE); + if (value != null && !value.isEmpty()) { + return HalePlatform.getContentTypeManager().getContentType(value.as(String.class)); + } + + // Try to determine based on provider ID + String providerId = preset.getProviderId(); + if (providerId != null) { + IOProviderDescriptor providerDescriptor = + IOProviderExtension.getInstance().getFactory(providerId); + if (providerDescriptor != null) { + Set supportedTypes = providerDescriptor.getSupportedTypes(); + if (!supportedTypes.isEmpty()) { + IContentType contentType = supportedTypes.iterator().next(); + + if (supportedTypes.size() > 1) { + LOG.warn( + "Multiple content types as candidates ({}), chose {}", + supportedTypes.stream().map(IContentType::getId).collect(Collectors.joining(", ")), + contentType.getId()); + } + + return contentType; + } + } + } + + return null; + } + + /** + * Get all export presets from the project. + * + * @param project the hale project object + * @return the map of presets + */ + private Map getPresets(Project project) { + Map exportPresets = new HashMap<>(); + + if (project == null) { + return exportPresets; + } + + for (Entry entry : + project.getExportConfigurations().entrySet()) { + IOConfiguration originalConfiguration = entry.getValue(); + + if (InstanceIO.ACTION_SAVE_TRANSFORMED_DATA.equals(originalConfiguration.getActionId())) { + String presetName = entry.getKey(); + IOConfiguration clonedConfiguration = originalConfiguration.clone(); + + // Check and add the I/O provider to exportPresets + checkAndAddIOProvider(presetName, clonedConfiguration, exportPresets); + } + } + + return exportPresets; + } + + private void checkAndAddIOProvider( + String presetName, IOConfiguration configuration, Map exportPresets) { + String providerId = configuration.getProviderId(); + IOProviderDescriptor factory = HaleIO.findIOProviderFactory(InstanceWriter.class, null, providerId); + + if (factory != null) { + String name = Strings.isNullOrEmpty(presetName) ? factory.getDisplayName() : presetName; + exportPresets.computeIfAbsent(name, k -> configuration); + } else { + LOG.error("I/O provider {} for export preset {} not found", providerId, presetName); + } + } + + private void copyAttachments(URI projectUri, Set attachments, File targetDir) { + if (attachments == null || attachments.isEmpty()) { + return; + } + + for (String attachmentPath : attachments) { + String sourcePath = config.getAttachmentStoreBaseUrl().equals("") ? "bucket:///" : "attachment:///"; + StringBuilder uriPath = new StringBuilder(projectUri.toString()).append("/raw/"); + try { + uriPath.append(URLEncoder.encode(attachmentPath, StandardCharsets.UTF_8.name())); + URI loc = new URI(sourcePath + uriPath); + DefaultInputSupplier supp = new DefaultInputSupplier(loc); + try (InputStream in = supp.getInput()) { + Files.copy(in, new File(targetDir, attachmentPath).toPath()); + } + LOG.info("Successfully copied attachment " + attachmentPath + " to target data set"); + } catch (UnsupportedEncodingException e) { + LOG.error("Could not encode the given attachmentPath: {}", attachmentPath, e); + } catch (URISyntaxException e) { + // No need to terminate the process, just log and skip the attachment that cannot be read. + LOG.error("Could not create URI from the path: {} ", sourcePath + uriPath, e); + } catch (FileNotFoundException e) { + // log a warning message ignoring the exception trace as it might produce a lot of noise + LOG.warn("Attachment file {} not found", attachmentPath); + } catch (Exception e) { + // this is normal, as attachments may not be present in the data bucket, + // but it's good to know when and why they don't, in case they were expected to be copied + LOG.error("Could not copy attachment " + attachmentPath, e); + /* + * XXX Improvements: + * - also check source attachment bucket for files + * - instead of adding the attachments to the data bucket, add them to the target attachment bucket? + */ + } + } + } + + /** + * After transformation, assemble stats from reports. + * + * @param reports the reports + */ + private JSONObject getStats(ReportSession reports) { + StatsCollector root = + new StatisticsHelper().getStatistics(reports.getAllReports().values(), true); + + // FIXME handle stats also for local transformation -> for now just print stats + if (!config.getCallbackUrl().isPresent()) { + System.out.println(JsonOutput.prettyPrint(root.saveToJson(true))); + } + + try { + return new JSONObject(root.saveToJson(false)); + } catch (JSONException e) { + LOG.error("Error assembling stats JSON representation", e); + return null; + } + } + + private boolean evaluateReports(Collection> reports, boolean detailed) { + boolean ok = true; + ExecUtil.info("Transformation tasks summaries:"); + + for (Report report : reports) { + if (!report.isSuccess() || !report.getErrors().isEmpty()) { + ok = false; + + ExecUtil.error(report.getTaskName() + ": " + report.getSummary()); + if (detailed) { + report.getErrors().forEach(e -> { + ExecUtil.error(e.getStackTrace()); + }); + } + } else { + ExecUtil.info(report.getTaskName() + ": " + report.getSummary()); + } + // TODO process information, provide in a usable way? + } + + return ok; + } + public CountDownLatch getLatch() { return latch; } diff --git a/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java b/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java index e0b76a4..31861f4 100644 --- a/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java +++ b/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java @@ -1,6 +1,178 @@ package to.wetransform.hale.transformer; +import java.util.Optional; + +/** + * General configuration needed for setting up the transformer, independently of + * the concrete transformation to run. + */ public class TransformerConfig { - // empty for now + /** + * The JWT token to authenticate with against services. Should be a user token. + */ + private String jwtToken; + + /** + * Base URL of the service storing transformation projects. + */ + private String projectStoreBaseUrl; + + /** + * Base URL of the service storing datasets. + */ + private String serviceBaseUrl; + + /** + * Base URL of the attachment store. + */ + private String attachmentStoreBaseUrl; + + /** + * Path of the working directory for the transformation result and reports. + */ + private String workingDir; + + /** + * Whether or not the accessUrl property should be used to download source data + * for transformation. If set to false, the /raw endpoint of the bucket-service + * will be used. + */ + private Boolean useAccessUrl; + + /** + * The optional callback URL to be invoked with the result of the + * transformation. + */ + private Optional callbackUrl; + + /** + * The optional job id of this transformation. + */ + private Optional jobId; + + public TransformerConfig( + String jwtToken, + String projectStoreBaseUrl, + String serviceBaseUrl, + String attachmentStoreBaseUrl, + String workingDir, + Boolean useAccessUrl, + Optional callbackUrl, + Optional jobId) { + this.jwtToken = jwtToken; + this.projectStoreBaseUrl = projectStoreBaseUrl; + this.serviceBaseUrl = serviceBaseUrl; + this.attachmentStoreBaseUrl = attachmentStoreBaseUrl; + this.workingDir = workingDir; + this.useAccessUrl = useAccessUrl; + this.callbackUrl = callbackUrl; + this.jobId = jobId; + } + + public static TransformerConfig createDefault( + String overrideToken, + String overrideDir, + Optional overrideCallbackUrl, + Optional overrideJobId) { + String jwtToken = (overrideToken != null) ? overrideToken : System.getenv("JWT_TOKEN"); + if (jwtToken == null) { + throw new IllegalStateException("Missing JWT token configuration"); + } + String projectStoreBaseUrl = System.getenv("PROJECT_STORE_BASE_URL") != null + ? System.getenv("PROJECT_STORE_BASE_URL") + : "http://localdocker:9061"; + String serviceBaseUrl = System.getenv("SERVICE_BASE_URL") != null + ? System.getenv("SERVICE_BASE_URL") + : "http://localdocker:9010"; + String attachmentStoreBaseUrl; + if (System.getenv("PROJECT_STORE_BASE_URL") != null && System.getenv("SERVICE_BASE_URL") != null) { + attachmentStoreBaseUrl = (System.getenv("ATTACHMENT_STORE_BASE_URL") != null) + ? System.getenv("ATTACHMENT_STORE_BASE_URL") + : ""; + } else { + attachmentStoreBaseUrl = "http://localdocker:9150"; + } + String workingDir = (overrideDir != null) ? overrideDir : System.getenv("TRANSFORMATION_DIR"); + Optional callbackUrl = (overrideCallbackUrl != null) + ? overrideCallbackUrl + : Optional.ofNullable(System.getenv("CALLBACK_URL")); + Optional jobId = (overrideJobId != null) ? overrideJobId : Optional.ofNullable(System.getenv("JOB_ID")); + Boolean useAccessUrl = Boolean.valueOf(System.getenv("USE_ACCESS_URL")); + + return new TransformerConfig( + jwtToken, + projectStoreBaseUrl, + serviceBaseUrl, + attachmentStoreBaseUrl, + workingDir, + useAccessUrl, + callbackUrl, + jobId); + } + + // Getters and setters for private fields... + public String getAttachmentStoreBaseUrl() { + return attachmentStoreBaseUrl; + } + + public void setAttachmentStoreBaseUrl(String attachmentStoreBaseUrl) { + this.attachmentStoreBaseUrl = attachmentStoreBaseUrl; + } + + public Optional getCallbackUrl() { + return callbackUrl; + } + + public void setCallbackUrl(Optional callbackUrl) { + this.callbackUrl = callbackUrl; + } + + public Optional getJobId() { + return jobId; + } + + public void setJobId(Optional jobId) { + this.jobId = jobId; + } + + public String getJwtToken() { + return jwtToken; + } + + public void setJwtToken(String jwtToken) { + this.jwtToken = jwtToken; + } + + public String getProjectStoreBaseUrl() { + return projectStoreBaseUrl; + } + + public void setProjectStoreBaseUrl(String projectStoreBaseUrl) { + this.projectStoreBaseUrl = projectStoreBaseUrl; + } + + public String getServiceBaseUrl() { + return serviceBaseUrl; + } + + public void setServiceBaseUrl(String serviceBaseUrl) { + this.serviceBaseUrl = serviceBaseUrl; + } + + public Boolean getUseAccessUrl() { + return useAccessUrl; + } + + public void setUseAccessUrl(Boolean useAccessUrl) { + this.useAccessUrl = useAccessUrl; + } + + public String getWorkingDir() { + return workingDir; + } + + public void setWorkingDir(String workingDir) { + this.workingDir = workingDir; + } } diff --git a/src/main/java/to/wetransform/hale/transformer/api/Init.java b/src/main/java/to/wetransform/hale/transformer/api/Init.java new file mode 100644 index 0000000..9be31ef --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/Init.java @@ -0,0 +1,55 @@ +package to.wetransform.hale.transformer.api; + +import java.net.URL; +import java.util.concurrent.atomic.AtomicBoolean; + +import groovy.lang.GroovySystem; +import org.eclipse.equinox.nonosgi.registry.RegistryFactoryHelper; +import org.slf4j.bridge.SLF4JBridgeHandler; +import to.wetransform.hale.transformer.TransformerConfig; +import to.wetransform.hale.transformer.api.internal.CustomMetaClassCreationHandle; +import to.wetransform.hale.transformer.api.urlhandler.CustomURLStreamHandlerFactory; +import to.wetransform.hale.transformer.api.urlhandler.ServiceURLStreamHandler; +import to.wetransform.hale.transformer.api.urlhandler.TokenProvider; + +public class Init { + + private static AtomicBoolean initialized = new AtomicBoolean(false); + + public static void init(final TransformerConfig config) { + TokenProvider tokenProvider = new TokenProvider() { + + @Override + public String getAuthenticationToken() { + return config.getJwtToken(); + } + }; + init(config, tokenProvider); + } + + public static void init(final TransformerConfig config, final TokenProvider tokenProvider) { + if (initialized.compareAndSet(false, true)) { + SLF4JBridgeHandler.install(); + + // initialize registry + RegistryFactoryHelper.getRegistry(); + + // initialize meta extensions + GroovySystem.getMetaClassRegistry().setMetaClassCreationHandle(new CustomMetaClassCreationHandle()); + + // initialize URL handlers + CustomURLStreamHandlerFactory factory = new CustomURLStreamHandlerFactory(); + + factory.addHandler("bucket", new ServiceURLStreamHandler(config.getServiceBaseUrl(), tokenProvider, false)); + factory.addHandler( + "project", new ServiceURLStreamHandler(config.getProjectStoreBaseUrl(), tokenProvider, false)); + if (!config.getAttachmentStoreBaseUrl().equals("")) { + factory.addHandler( + "attachment", + new ServiceURLStreamHandler(config.getAttachmentStoreBaseUrl(), tokenProvider, false)); + } + + URL.setURLStreamHandlerFactory(factory); + } + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java b/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java new file mode 100644 index 0000000..6b6c055 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java @@ -0,0 +1,76 @@ +package to.wetransform.hale.transformer.api.internal; + +import java.lang.reflect.Constructor; + +import eu.esdihumboldt.util.groovy.meta.extension.MetaClassDescriptor; +import eu.esdihumboldt.util.groovy.meta.extension.MetaClassExtension; +import groovy.lang.MetaClass; +import groovy.lang.MetaClassRegistry; +import groovy.lang.MetaClassRegistry.MetaClassCreationHandle; +import org.eclipse.equinox.nonosgi.registry.RegistryFactoryHelper; + +/** + * Adapts created meta classes with delegating meta classes registered in the + * {@link MetaClassExtension}. + */ +public class CustomMetaClassCreationHandle extends MetaClassCreationHandle { + + private final MetaClassExtension ext; + + public CustomMetaClassCreationHandle() { + // initialize registry + RegistryFactoryHelper.getRegistry(); + + ext = new MetaClassExtension(); + } + + @Override + protected MetaClass createNormalMetaClass( + @SuppressWarnings("rawtypes") Class theClass, MetaClassRegistry registry) { + MetaClass metaClass = super.createNormalMetaClass(theClass, registry); + + for (MetaClassDescriptor descriptor : ext.getElements()) { + if (descriptorApplies(descriptor, theClass)) { + // create meta class + Class delegatingMetaClass = descriptor.getMetaClass(); + try { + Constructor constructor = delegatingMetaClass.getConstructor(MetaClass.class); + metaClass = (MetaClass) constructor.newInstance(metaClass); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + return metaClass; + } + + /** + * Check if a meta class descriptor applies to a given class. + * + * @param descriptor the meta class descriptor + * @param theClass the class for which should be determined if the descriptor + * applies + * @return true if the descriptor applies to the class, + * false otherwise + */ + private boolean descriptorApplies(MetaClassDescriptor descriptor, @SuppressWarnings("rawtypes") Class theClass) { + Class forClass = descriptor.getForClass(); + if (descriptor.isForArray()) { + if (theClass.isArray()) { + Class componentClass = theClass.getComponentType(); + if (componentClass != null) { + return forClass.equals(componentClass) || forClass.isAssignableFrom(componentClass); + } else { + // should actually not happen + return false; + } + } else { + // no array + return false; + } + } else { + return forClass.equals(theClass) || forClass.isAssignableFrom(theClass); + } + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java index 1f5d524..7feb2d7 100644 --- a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java +++ b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java @@ -9,6 +9,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import to.wetransform.hale.transformer.Transformer; +import to.wetransform.hale.transformer.TransformerConfig; import to.wetransform.hale.transformer.api.TransformerApiApplication; @Service @@ -28,12 +29,13 @@ public void receiveMessage(final TransformationMessage message) { // TODO Implement mechanism to only accept a message from the queue if no // transformation is currently running + TransformerConfig config = TransformerConfig.createDefault(null, null, null, null); if (message.projectUrl != null && message.sourceDataUrl() != null) { - Transformer tx = new Transformer(); + Transformer tx = new Transformer(config); try { - tx.transform(); + tx.transform(message.sourceDataUrl(), message.projectUrl, null); tx.getLatch().await(10, TimeUnit.MINUTES); // TODO make configurable } catch (InterruptedException e) { // TODO What should be done when the transformation fails or times out? diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/CustomURLStreamHandlerFactory.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/CustomURLStreamHandlerFactory.java new file mode 100644 index 0000000..e159c16 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/CustomURLStreamHandlerFactory.java @@ -0,0 +1,31 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +import java.net.URLStreamHandler; +import java.net.URLStreamHandlerFactory; +import java.util.HashMap; +import java.util.Map; + +/** + * Custom URL handlers to allow authentication for cases where the client does + * not support it, or resolving relative URLs for cases where that is not + * possible. + */ +public class CustomURLStreamHandlerFactory implements URLStreamHandlerFactory { + + private final Map handlers = new HashMap<>(); + + /** + * Add an additional URL handler. + * + * @param protocol the protocol name + * @param handler the handler for the protocol + */ + public void addHandler(String protocol, URLStreamHandler handler) { + handlers.put(protocol.toLowerCase(), handler); + } + + @Override + public URLStreamHandler createURLStreamHandler(String protocol) { + return handlers.get(protocol); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLConnection.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLConnection.java new file mode 100644 index 0000000..178ff5b --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLConnection.java @@ -0,0 +1,127 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; + +public class ServiceURLConnection extends URLConnection { + + private String ownerType; + private String ownerId; + private String path; + private String bucket; + + private final String serviceBaseUrl; + private final TokenProvider tokenProvider; + private boolean rawOnly; + + protected ServiceURLConnection(URL url, String serviceBaseUrl, TokenProvider tokenProvider, boolean rawOnly) { + super(url); + this.serviceBaseUrl = serviceBaseUrl; + this.tokenProvider = tokenProvider; + this.rawOnly = rawOnly; + } + + @Override + public void connect() throws IOException { + if (connected) return; + + doConnect(); + + connected = true; + } + + /** + * Reads the connection information from the URL. + */ + protected void doConnect() throws IOException { + List parts = Splitter.on('/').omitEmptyStrings().splitToList(getURL().getPath()); + if (parts.size() < 3) { + throw new IOException("URL is missing valid owner identification"); + } + ownerType = parts.get(0); + ownerId = parts.get(1); + bucket = parts.get(2); + + // path in bucket + path = Joiner.on('/').join(parts.subList(3, parts.size())); + } + + @Override + public InputStream getInputStream() throws IOException { + if (!connected) { + connect(); + } + + /* + * XXX note that this bypasses any authentication mechanisms by authenticating + * as the service + */ + + /* + * There are different possibilities to implement this: - Query the bucket + * information and access S3 directly via the provided access URL - or access + * via the raw file URL provided by the schema store + * + * For now using the latter option + */ + + // delegate to http connection + StringBuilder urlBuilder = new StringBuilder(serviceBaseUrl); + // TODO what to put here? + urlBuilder.append("/buckets/"); + urlBuilder.append(ownerType); + urlBuilder.append("/"); + urlBuilder.append(ownerId); + urlBuilder.append("/"); + urlBuilder.append(bucket); + if (rawOnly) { + urlBuilder.append("/raw/"); + } else if (!path.isEmpty()) { + urlBuilder.append("/"); + + // Don't encode "/", as they're used for accessing raw files within folders + String encodedPath = + Arrays.stream(path.split("/")).map(this::encodeString).collect(Collectors.joining("/")); + + urlBuilder.append(encodedPath); + } + + URL fileURL = new URL(urlBuilder.toString()); + HttpURLConnection httpConnection = (HttpURLConnection) fileURL.openConnection(); + + String token; + try { + token = tokenProvider.getAuthenticationToken(); + } catch (Exception e) { + throw new IOException("Unable to create authentication token for schema protocol request", e); + } + + httpConnection.setRequestProperty("Authorization", "Bearer " + token); + httpConnection.setRequestMethod("GET"); + + httpConnection.connect(); + + return httpConnection.getInputStream(); + } + + private String encodeString(String stringToEncode) { + try { + return URLEncoder.encode(stringToEncode, StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + // Not going to happen since StandardCharsets is used + } + return null; + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLStreamHandler.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLStreamHandler.java new file mode 100644 index 0000000..25ce254 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLStreamHandler.java @@ -0,0 +1,25 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +import java.io.IOException; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; + +public class ServiceURLStreamHandler extends URLStreamHandler { + + private final String serviceBaseUrl; + private final TokenProvider tokenProvider; + private boolean rawOnly; + + public ServiceURLStreamHandler(String serviceBaseUrl, TokenProvider tokenProvider, boolean rawOnly) { + this.serviceBaseUrl = serviceBaseUrl; + this.tokenProvider = tokenProvider; + this.rawOnly = rawOnly; + } + + @Override + protected URLConnection openConnection(URL u) throws IOException { + // Provide your implementation to open a connection for the specified URL + return new ServiceURLConnection(u, serviceBaseUrl, tokenProvider, rawOnly); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/TokenProvider.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/TokenProvider.java new file mode 100644 index 0000000..dde1c8b --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/TokenProvider.java @@ -0,0 +1,6 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +public interface TokenProvider { + + String getAuthenticationToken(); +}