diff --git a/.gitignore b/.gitignore index bec0c6a..8f0e027 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,5 @@ out/ *.code-workspace # Local History for Visual Studio Code .history/ +.vscode/settings.json + diff --git a/src/main/java/to/wetransform/hale/transformer/RunContext.java b/src/main/java/to/wetransform/hale/transformer/RunContext.java new file mode 100644 index 0000000..0c5a817 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/RunContext.java @@ -0,0 +1,28 @@ +package to.wetransform.hale.transformer; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; + +public class RunContext { + + private final List tempFiles = new ArrayList<>(); + + public File createTempDir() throws IOException { + Path path = Files.createTempDirectory("hale-transformer"); + tempFiles.add(path); + return path.toFile(); + } + + public void cleanUp() throws IOException { + for (Path path : tempFiles) { + FileUtils.deleteDirectory(path.toFile()); + } + tempFiles.clear(); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/Transformer.java b/src/main/java/to/wetransform/hale/transformer/Transformer.java index 4f1465b..763dc33 100644 --- a/src/main/java/to/wetransform/hale/transformer/Transformer.java +++ b/src/main/java/to/wetransform/hale/transformer/Transformer.java @@ -1,65 +1,238 @@ package to.wetransform.hale.transformer; -import java.io.InputStream; +import java.io.*; import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; import java.text.MessageFormat; +import java.util.*; +import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import com.google.common.base.Strings; import eu.esdihumboldt.hale.app.transform.ExecContext; +import 
eu.esdihumboldt.hale.app.transform.ExecTransformation; +import eu.esdihumboldt.hale.app.transform.ExecUtil; import eu.esdihumboldt.hale.common.core.HalePlatform; +import eu.esdihumboldt.hale.common.core.io.HaleIO; +import eu.esdihumboldt.hale.common.core.io.IOProvider; +import eu.esdihumboldt.hale.common.core.io.Value; +import eu.esdihumboldt.hale.common.core.io.extension.IOProviderDescriptor; +import eu.esdihumboldt.hale.common.core.io.extension.IOProviderExtension; +import eu.esdihumboldt.hale.common.core.io.project.model.IOConfiguration; import eu.esdihumboldt.hale.common.core.io.project.model.Project; import eu.esdihumboldt.hale.common.core.io.supplier.DefaultInputSupplier; +import eu.esdihumboldt.hale.common.core.report.Report; +import eu.esdihumboldt.hale.common.core.report.ReportSession; +import eu.esdihumboldt.hale.common.core.report.util.StatisticsHelper; +import eu.esdihumboldt.hale.common.core.report.writer.ReportReader; +import eu.esdihumboldt.hale.common.instance.io.InstanceIO; +import eu.esdihumboldt.hale.common.instance.io.InstanceWriter; +import eu.esdihumboldt.util.groovy.collector.StatsCollector; import eu.esdihumboldt.util.io.IOUtils; +import org.apache.commons.io.output.TeeOutputStream; +import org.eclipse.core.runtime.content.IContentType; +import org.json.JSONException; +import org.json.JSONObject; import org.osgi.framework.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import to.wetransform.halecli.internal.Init; +import to.wetransform.hale.transformer.api.Init; public class Transformer { + /** + * Limit for log size, to include complete log. + *

+ * Current value: ~ 5MB with 4 byte characters (so likely ~1.3MB) We need to + * stay well below 16 MB due to MongoDB restrictions (plus megabytes of logs are + * not really helpful) + */ + private static final int MAX_LOG_SIZE_CHARACTERS = 1310720; + + private static final String MESSAGE_LINE_TRUNCATED = "[Line truncated...]"; + private static final String MESSAGE_LOG_TRUNCATED = "[Log truncated...]"; + + /** + * If the log is truncated, include lines from the end. + */ + private static final int TRUNCATED_INCLUDE_END_MAX_LINES = 50; + /** + * If the lines at the end to include exceed this limit, they are not included. + */ + private static final int TRUNCATED_INCLUDE_END_MAX_CHARACTERS = 25600; + private static final Logger LOG = LoggerFactory.getLogger(Transformer.class); - private CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch = new CountDownLatch(1); - public void transform(/* TODO add parameters for data and project sources */ ) { - // TODO setup log files for reports and transformation log + public void transform(String sourceDataURL, String projectURL, String targetURL) { - long heapMaxSize = Runtime.getRuntime().maxMemory(); - LOG.info("Maximum heap size configured as " + IOUtils.humanReadableByteCount(heapMaxSize, false)); + boolean success = false; + PrintStream sysOut = System.out; + File transformationLogFile = null; - Init.init(); + RunContext run = new RunContext(); - Version version = HalePlatform.getCoreVersion(); - LOG.info(MessageFormat.format("Launching hale-transformer {0}...", version.toString())); + // init working directory + // Create a temporary directory + Path tempDirectory = null; + try { + tempDirectory = Files.createTempDirectory("hale-transformer"); + File workingFile = null; - ExecContext context = new ExecContext(); + if (tempDirectory != null) { + workingFile = tempDirectory.toFile(); + } else { + workingFile = run.createTempDir(); + } - // URI projectUri = .... 
- // context.setProject(projectUri); - // Project project = loadProject(projectUri); + LOG.info("Using working directory " + tempDirectory.toString()); + File reportFile = null; + try { + // setup report file + reportFile = + Files.createTempFile(tempDirectory, "reports", ".log").toFile(); + reportFile.delete(); + reportFile.createNewFile(); + } catch (Throwable t) { + LOG.error("Error in creating report log file."); + return; + } - // context.setSources(...) - // context.setSourceProviderIds(...) - // context.setSourcesSettings(...) + try { + // setup log file + transformationLogFile = Files.createTempFile(tempDirectory, "transformation", ".log") + .toFile(); + transformationLogFile.delete(); + transformationLogFile.createNewFile(); - // Value sourceCrs = null; - // TODO determine source CRS + // fork streams + FileOutputStream transformationLogOut = new FileOutputStream(transformationLogFile, true); + OutputStream tOut = new TeeOutputStream(sysOut, transformationLogOut); + System.setOut(new PrintStream(tOut)); + } catch (Throwable t) { + LOG.error("Error in creating log file."); + return; + } - // TargetConfig targetConfig = configureTarget(project, sourceCrs); + LOG.info("Startup..."); + + long heapMaxSize = Runtime.getRuntime().maxMemory(); + LOG.info("Maximum heap size configured as " + IOUtils.humanReadableByteCount(heapMaxSize, false)); + + // initialize + Init.init(); + + Version version = HalePlatform.getCoreVersion(); + LOG.info(MessageFormat.format("Launching hale-transformer {0}...", version.toString())); + + ExecContext context = new ExecContext(); + + // Set up project URI + URI projectUri = new URI(projectURL); + context.setProject(projectUri); + + // Load project + Project project = loadProject(projectUri); + + SourceConfig sourceConfig = new SourceConfig( + new URI(sourceDataURL), "eu.esdihumboldt.hale.io.gml.reader", null, true, new ArrayList<>()); + + List sourceConfigs = new ArrayList<>(); + sourceConfigs.add(sourceConfig); + + 
context.setSources(sourceConfigs.stream() + .filter(sourceConfigList -> sourceConfigList.transform()) + .map(sourceConfigList -> sourceConfig.location()) + .collect(Collectors.toList())); + context.setSourceProviderIds(sourceConfigs.stream() + .map(sourceConfigList -> sourceConfigList.providerId()) + .collect(Collectors.toList())); + context.setSourcesSettings(sourceConfigs.stream() + .map(sourceConfigList -> sourceConfigList.settings()) + .collect(Collectors.toList())); + + // extract detected source data crs and use in target config + Value sourceCrs = null; + Throwable error = null; + // try each source config, in case it is not set for some + for (int i = 0; i < sourceConfigs.size() && (sourceCrs == null || sourceCrs.isEmpty()); i++) { + try { + sourceCrs = ((SourceConfig) sourceConfigs.get(i)).settings().get("defaultSrs"); + } catch (Throwable t) { + error = t; + } + } + + if (error != null) { + LOG.warn("Could not determine source data CRS", error); + } else if (sourceCrs == null || sourceCrs.isEmpty()) { + LOG.warn( + "Unable to determine source data CRS: None of {} sources is configured with a CRS", + sourceConfigs.size()); + } + + TargetConfig targetConfig = configureTarget(project, sourceCrs); + + File resultDir = new File(workingFile, "result"); + resultDir.mkdir(); + String targetFilename = targetConfig.filename(); + if (targetFilename == null) { + targetFilename = "result.out"; + } + File targetFile = new File(resultDir, targetFilename); + context.setTarget(targetFile.toURI()); + String preset = targetConfig.preset(); + CustomTarget customConfig = targetConfig.customTarget(); + if (preset != null) { + context.setPreset(preset); + } else { + if (customConfig == null || customConfig.providerId() == null) { + throw new IllegalStateException("No configuration on how to write transformed data available"); + } + context.setTargetProviderId(customConfig.providerId()); + } + + if (customConfig != null) { + context.setTargetSettings(customConfig.settings()); + 
} else { + context.setTargetSettings(new HashMap<>()); + } + + context.setReportsOut(reportFile); + + // general configuration + context.setLogException(true); - try { // run the transformation + LOG.info("Transforming started."); + new ExecTransformation().run(context); + + // upload any result - LOG.info("Transforming..."); - TimeUnit.SECONDS.sleep(30); - // new ExecTransformation().run(context); + // evaluate results + ReportReader reader = new ReportReader(); + ReportSession reports = reader.readFile(reportFile); + JSONObject stats = getStats(reports); + success = evaluateReports(reports.getAllReports().values(), false); LOG.info("Transformation complete."); } catch (Throwable t) { + try { + Files.delete(tempDirectory); + } catch (IOException e) { + LOG.error("Cannot remove the working directory: " + e.getMessage(), e); + } LOG.error("Failed to execute transformation: " + t.getMessage(), t); } finally { latch.countDown(); + try { + Files.delete(tempDirectory); + } catch (IOException e) { + LOG.error("Cannot remove the working directory: " + e.getMessage(), e); + } } } @@ -74,6 +247,201 @@ private Project loadProject(URI projectUri) { return result; } + private TargetConfig configureTarget(Project lp, Value sourceCrs) { + String filename; + String preset = null; + CustomTarget customTarget = null; + + Map presets = getPresets(lp); + + // Preset names + String defaultPreset = "default"; + String hcPreset = "hale-connect"; + + if (presets.containsKey(hcPreset)) { + // Project contains hale-connect preset + preset = hcPreset; + IOConfiguration ioConfiguration = presets.get(hcPreset); + filename = determineTargetFileName(ioConfiguration); + } else if (presets.containsKey(defaultPreset)) { + // Project contains default preset + preset = defaultPreset; + IOConfiguration ioConfiguration = presets.get(defaultPreset); + filename = determineTargetFileName(ioConfiguration); + } else { + // No specific presets found, creating a custom target configuration + + Map targetMap 
= new HashMap<>(); + + // Specify target provider for GML FeatureCollection + String targetProvider = "eu.esdihumboldt.hale.io.gml.writer"; + + // Additional settings for testing + targetMap.put("xml.pretty", Value.of(true)); + targetMap.put("crs.epsg.prefix", Value.of("http://www.opengis.net/def/crs/EPSG/0/")); + + // Use CRS from source data analysis if available and a valid EPSG code, + // otherwise fallback to EPSG:4326 + Value targetCrs = + (sourceCrs != null && sourceCrs.getStringRepresentation().startsWith("code:EPSG")) + ? sourceCrs + : Value.of("code:EPSG:4326"); + + targetMap.put("crs", targetCrs); + LOG.info("Using {} as the transformation target CRS", targetCrs.getStringRepresentation()); + + // Create a custom target configuration + CustomTarget target = new CustomTarget(targetProvider, targetMap); + + filename = "inspire.gml"; + customTarget = target; + } + + // Create and return the target configuration + return new TargetConfig(filename, preset, customTarget); + } + + /** + * Determine the name of the target file based on an export preset. + * + * @param preset the export preset + * @return the file name for the target file + */ + public static String determineTargetFileName(IOConfiguration preset) { + // Default extension to "xml" to reflect old behavior + String extension = "xml"; + + IContentType contentType = determineContentType(preset); + if (contentType != null) { + // Derive extension from content type + String[] extensions = contentType.getFileSpecs(IContentType.FILE_EXTENSION_SPEC); + if (extensions != null && extensions.length > 0) { + extension = extensions[0]; // Choose the first one + } + } + + // If extension would be "gml," use "xml" instead for backward compatibility + extension = "gml".equalsIgnoreCase(extension) ? "xml" : extension; + + LOG.info("Chose .{} as the extension for the target file", extension); + + return "result." 
+ extension; + } + + private static IContentType determineContentType(IOConfiguration preset) { + // Usually, the content type is part of the settings + Value value = preset.getProviderConfiguration().get(IOProvider.PARAM_CONTENT_TYPE); + if (value != null && !value.isEmpty()) { + return HalePlatform.getContentTypeManager().getContentType(value.as(String.class)); + } + + // Try to determine based on provider ID + String providerId = preset.getProviderId(); + if (providerId != null) { + IOProviderDescriptor providerDescriptor = + IOProviderExtension.getInstance().getFactory(providerId); + if (providerDescriptor != null) { + Set supportedTypes = providerDescriptor.getSupportedTypes(); + if (!supportedTypes.isEmpty()) { + IContentType contentType = supportedTypes.iterator().next(); + + if (supportedTypes.size() > 1) { + LOG.warn( + "Multiple content types as candidates ({}), chose {}", + supportedTypes.stream().map(IContentType::getId).collect(Collectors.joining(", ")), + contentType.getId()); + } + + return contentType; + } + } + } + + return null; + } + + /** + * Get all export presets from the project. 
+ * + * @param project the hale project object + * @return the map of presets + */ + private Map getPresets(Project project) { + Map exportPresets = new HashMap<>(); + + if (project == null) { + return exportPresets; + } + + for (Entry entry : + project.getExportConfigurations().entrySet()) { + IOConfiguration originalConfiguration = entry.getValue(); + + if (InstanceIO.ACTION_SAVE_TRANSFORMED_DATA.equals(originalConfiguration.getActionId())) { + String presetName = entry.getKey(); + IOConfiguration clonedConfiguration = originalConfiguration.clone(); + + // Check and add the I/O provider to exportPresets + checkAndAddIOProvider(presetName, clonedConfiguration, exportPresets); + } + } + + return exportPresets; + } + + private void checkAndAddIOProvider( + String presetName, IOConfiguration configuration, Map exportPresets) { + String providerId = configuration.getProviderId(); + IOProviderDescriptor factory = HaleIO.findIOProviderFactory(InstanceWriter.class, null, providerId); + + if (factory != null) { + String name = Strings.isNullOrEmpty(presetName) ? factory.getDisplayName() : presetName; + exportPresets.computeIfAbsent(name, k -> configuration); + } else { + LOG.error("I/O provider {} for export preset {} not found", providerId, presetName); + } + } + + /** + * After transformation, assemble stats from reports. 
+ * + * @param reports the reports + */ + private JSONObject getStats(ReportSession reports) { + StatsCollector root = + new StatisticsHelper().getStatistics(reports.getAllReports().values(), true); + + try { + return new JSONObject(root.saveToJson(false)); + } catch (JSONException e) { + LOG.error("Error assembling stats JSON representation", e); + return null; + } + } + + private boolean evaluateReports(Collection> reports, boolean detailed) { + boolean ok = true; + ExecUtil.info("Transformation tasks summaries:"); + + for (Report report : reports) { + if (!report.isSuccess() || !report.getErrors().isEmpty()) { + ok = false; + + ExecUtil.error(report.getTaskName() + ": " + report.getSummary()); + if (detailed) { + report.getErrors().forEach(e -> { + ExecUtil.error(e.getStackTrace()); + }); + } + } else { + ExecUtil.info(report.getTaskName() + ": " + report.getSummary()); + } + // TODO process information, provide in a usable way? + } + + return ok; + } + public CountDownLatch getLatch() { return latch; } diff --git a/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java b/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java deleted file mode 100644 index e0b76a4..0000000 --- a/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java +++ /dev/null @@ -1,6 +0,0 @@ -package to.wetransform.hale.transformer; - -public class TransformerConfig { - - // empty for now -} diff --git a/src/main/java/to/wetransform/hale/transformer/api/Init.java b/src/main/java/to/wetransform/hale/transformer/api/Init.java new file mode 100644 index 0000000..03f02f2 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/Init.java @@ -0,0 +1,19 @@ +package to.wetransform.hale.transformer.api; + +import groovy.lang.GroovySystem; +import org.eclipse.equinox.nonosgi.registry.RegistryFactoryHelper; +import org.slf4j.bridge.SLF4JBridgeHandler; +import to.wetransform.hale.transformer.api.internal.CustomMetaClassCreationHandle; + +public 
class Init { + + public static void init() { + SLF4JBridgeHandler.install(); + + // initialize registry + RegistryFactoryHelper.getRegistry(); + + // initialize meta extensions + GroovySystem.getMetaClassRegistry().setMetaClassCreationHandle(new CustomMetaClassCreationHandle()); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/internal/CountdownLatchConfig.java b/src/main/java/to/wetransform/hale/transformer/api/internal/CountdownLatchConfig.java new file mode 100644 index 0000000..480a203 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/internal/CountdownLatchConfig.java @@ -0,0 +1,20 @@ +package to.wetransform.hale.transformer.api.internal; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ComponentScan +public class CountdownLatchConfig { + @Value("${countdownLatch.waiting-time}") + private long waitingTime; + + public long getWaitingTime() { + return waitingTime; + } + + public void setWaitingTime(long waitingTime) { + this.waitingTime = waitingTime; + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java b/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java new file mode 100644 index 0000000..6b6c055 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java @@ -0,0 +1,76 @@ +package to.wetransform.hale.transformer.api.internal; + +import java.lang.reflect.Constructor; + +import eu.esdihumboldt.util.groovy.meta.extension.MetaClassDescriptor; +import eu.esdihumboldt.util.groovy.meta.extension.MetaClassExtension; +import groovy.lang.MetaClass; +import groovy.lang.MetaClassRegistry; +import groovy.lang.MetaClassRegistry.MetaClassCreationHandle; +import org.eclipse.equinox.nonosgi.registry.RegistryFactoryHelper; + 
+/** + * Adapts created meta classes with delegating meta classes registered in the + * {@link MetaClassExtension}. + */ +public class CustomMetaClassCreationHandle extends MetaClassCreationHandle { + + private final MetaClassExtension ext; + + public CustomMetaClassCreationHandle() { + // initialize registry + RegistryFactoryHelper.getRegistry(); + + ext = new MetaClassExtension(); + } + + @Override + protected MetaClass createNormalMetaClass( + @SuppressWarnings("rawtypes") Class theClass, MetaClassRegistry registry) { + MetaClass metaClass = super.createNormalMetaClass(theClass, registry); + + for (MetaClassDescriptor descriptor : ext.getElements()) { + if (descriptorApplies(descriptor, theClass)) { + // create meta class + Class delegatingMetaClass = descriptor.getMetaClass(); + try { + Constructor constructor = delegatingMetaClass.getConstructor(MetaClass.class); + metaClass = (MetaClass) constructor.newInstance(metaClass); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + return metaClass; + } + + /** + * Check if a meta class descriptor applies to a given class. 
+ * + * @param descriptor the meta class descriptor + * @param theClass the class for which should be determined if the descriptor + * applies + * @return true if the descriptor applies to the class, + * false otherwise + */ + private boolean descriptorApplies(MetaClassDescriptor descriptor, @SuppressWarnings("rawtypes") Class theClass) { + Class forClass = descriptor.getForClass(); + if (descriptor.isForArray()) { + if (theClass.isArray()) { + Class componentClass = theClass.getComponentType(); + if (componentClass != null) { + return forClass.equals(componentClass) || forClass.isAssignableFrom(componentClass); + } else { + // should actually not happen + return false; + } + } else { + // no array + return false; + } + } else { + return forClass.equals(theClass) || forClass.isAssignableFrom(theClass); + } + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java index 1f5d524..5e713ef 100644 --- a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java +++ b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java @@ -7,9 +7,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import to.wetransform.hale.transformer.Transformer; import to.wetransform.hale.transformer.api.TransformerApiApplication; +import to.wetransform.hale.transformer.api.internal.CountdownLatchConfig; @Service public class TransformationMessageConsumer { @@ -22,23 +24,31 @@ public record TransformationMessage( private static final Logger LOG = LoggerFactory.getLogger(TransformationMessageConsumer.class); + private final CountdownLatchConfig countdownLatchConfig; + + @Autowired + public 
TransformationMessageConsumer(CountdownLatchConfig countdownLatchConfig) { + this.countdownLatchConfig = countdownLatchConfig; + } + @RabbitListener(queues = TransformerApiApplication.QUEUE_NAME) public void receiveMessage(final TransformationMessage message) { LOG.info("Received projectUrl = " + message.projectUrl + " sourceDataUrl = " + message.sourceDataUrl); // TODO Implement mechanism to only accept a message from the queue if no // transformation is currently running - if (message.projectUrl != null && message.sourceDataUrl() != null) { Transformer tx = new Transformer(); try { - tx.transform(); - tx.getLatch().await(10, TimeUnit.MINUTES); // TODO make configurable + LOG.info("Transformation started"); + tx.transform(message.sourceDataUrl(), message.projectUrl, null); + tx.getLatch().await(countdownLatchConfig.getWaitingTime(), TimeUnit.MINUTES); } catch (InterruptedException e) { // TODO What should be done when the transformation fails or times out? // - Simply requeuing the message is probably not helpful // - Send a message back so that the producer can react? + Thread.currentThread().interrupt(); LOG.error("Transformation process timed out: " + e.getMessage(), e); } } diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/CustomURLStreamHandlerFactory.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/CustomURLStreamHandlerFactory.java new file mode 100644 index 0000000..e159c16 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/CustomURLStreamHandlerFactory.java @@ -0,0 +1,31 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +import java.net.URLStreamHandler; +import java.net.URLStreamHandlerFactory; +import java.util.HashMap; +import java.util.Map; + +/** + * Custom URL handlers to allow authentication for cases where the client does + * not support it, or resolving relative URLs for cases where that is not + * possible. 
+ */ +public class CustomURLStreamHandlerFactory implements URLStreamHandlerFactory { + + private final Map handlers = new HashMap<>(); + + /** + * Add an additional URL handler. + * + * @param protocol the protocol name + * @param handler the handler for the protocol + */ + public void addHandler(String protocol, URLStreamHandler handler) { + handlers.put(protocol.toLowerCase(), handler); + } + + @Override + public URLStreamHandler createURLStreamHandler(String protocol) { + return handlers.get(protocol); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLConnection.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLConnection.java new file mode 100644 index 0000000..178ff5b --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLConnection.java @@ -0,0 +1,127 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; + +public class ServiceURLConnection extends URLConnection { + + private String ownerType; + private String ownerId; + private String path; + private String bucket; + + private final String serviceBaseUrl; + private final TokenProvider tokenProvider; + private boolean rawOnly; + + protected ServiceURLConnection(URL url, String serviceBaseUrl, TokenProvider tokenProvider, boolean rawOnly) { + super(url); + this.serviceBaseUrl = serviceBaseUrl; + this.tokenProvider = tokenProvider; + this.rawOnly = rawOnly; + } + + @Override + public void connect() throws IOException { + if (connected) return; + + doConnect(); + + 
connected = true; + } + + /** + * Reads the connection information from the URL. + */ + protected void doConnect() throws IOException { + List parts = Splitter.on('/').omitEmptyStrings().splitToList(getURL().getPath()); + if (parts.size() < 3) { + throw new IOException("URL is missing valid owner identification"); + } + ownerType = parts.get(0); + ownerId = parts.get(1); + bucket = parts.get(2); + + // path in bucket + path = Joiner.on('/').join(parts.subList(3, parts.size())); + } + + @Override + public InputStream getInputStream() throws IOException { + if (!connected) { + connect(); + } + + /* + * XXX note that this bypasses any authentication mechanisms by authenticating + * as the service + */ + + /* + * There are different possibilities to implement this: - Query the bucket + * information and access S3 directly via the provided access URL - or access + * via the raw file URL provided by the schema store + * + * For now using the latter option + */ + + // delegate to http connection + StringBuilder urlBuilder = new StringBuilder(serviceBaseUrl); + // TODO what to put here? 
+ urlBuilder.append("/buckets/"); + urlBuilder.append(ownerType); + urlBuilder.append("/"); + urlBuilder.append(ownerId); + urlBuilder.append("/"); + urlBuilder.append(bucket); + if (rawOnly) { + urlBuilder.append("/raw/"); + } else if (!path.isEmpty()) { + urlBuilder.append("/"); + + // Don't encode "/", as they're used for accessing raw files within folders + String encodedPath = + Arrays.stream(path.split("/")).map(this::encodeString).collect(Collectors.joining("/")); + + urlBuilder.append(encodedPath); + } + + URL fileURL = new URL(urlBuilder.toString()); + HttpURLConnection httpConnection = (HttpURLConnection) fileURL.openConnection(); + + String token; + try { + token = tokenProvider.getAuthenticationToken(); + } catch (Exception e) { + throw new IOException("Unable to create authentication token for schema protocol request", e); + } + + httpConnection.setRequestProperty("Authorization", "Bearer " + token); + httpConnection.setRequestMethod("GET"); + + httpConnection.connect(); + + return httpConnection.getInputStream(); + } + + private String encodeString(String stringToEncode) { + try { + return URLEncoder.encode(stringToEncode, StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + // Not going to happen since StandardCharsets is used + } + return null; + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLStreamHandler.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLStreamHandler.java new file mode 100644 index 0000000..25ce254 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/ServiceURLStreamHandler.java @@ -0,0 +1,25 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +import java.io.IOException; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; + +public class ServiceURLStreamHandler extends URLStreamHandler { + + private final String serviceBaseUrl; + private final TokenProvider 
tokenProvider; + private boolean rawOnly; + + public ServiceURLStreamHandler(String serviceBaseUrl, TokenProvider tokenProvider, boolean rawOnly) { + this.serviceBaseUrl = serviceBaseUrl; + this.tokenProvider = tokenProvider; + this.rawOnly = rawOnly; + } + + @Override + protected URLConnection openConnection(URL u) throws IOException { + // Provide your implementation to open a connection for the specified URL + return new ServiceURLConnection(u, serviceBaseUrl, tokenProvider, rawOnly); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/urlhandler/TokenProvider.java b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/TokenProvider.java new file mode 100644 index 0000000..dde1c8b --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/urlhandler/TokenProvider.java @@ -0,0 +1,6 @@ +package to.wetransform.hale.transformer.api.urlhandler; + +public interface TokenProvider { + + String getAuthenticationToken(); +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 8b13789..695bf6f 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1 +1,2 @@ - +# Configuration for CountDownLatch waiting time in minutes +countdownLatch.waiting-time=10 \ No newline at end of file