From 00c8f9d3f41d6a82a7a0b94eb2c6849b65a01d0e Mon Sep 17 00:00:00 2001 From: BonaBeavis Date: Thu, 28 Nov 2019 17:35:55 +0100 Subject: [PATCH] Add support for geosparql functions --- nifi-sparql-integrate-processors/pom.xml | 8 +- .../SparqlIntegrateProcessor.java | 78 +++++++++++++------ pom.xml | 4 +- 3 files changed, 59 insertions(+), 31 deletions(-) diff --git a/nifi-sparql-integrate-processors/pom.xml b/nifi-sparql-integrate-processors/pom.xml index de85042..f1644ea 100644 --- a/nifi-sparql-integrate-processors/pom.xml +++ b/nifi-sparql-integrate-processors/pom.xml @@ -26,7 +26,7 @@ jar - 3.11.0 + 3.12.0 1 ${jena.version}-${jena-sparql-api.subversion} 1.5.8.RELEASE @@ -59,17 +59,17 @@ org.apache.nifi nifi-api - 1.6.0 + 1.9.2 org.apache.nifi nifi-utils - 1.6.0 + 1.9.2 org.apache.nifi nifi-mock - 1.6.0 + 1.9.2 test diff --git a/nifi-sparql-integrate-processors/src/main/java/org/aksw/sparql_integrate/processors/sparql_integrate/SparqlIntegrateProcessor.java b/nifi-sparql-integrate-processors/src/main/java/org/aksw/sparql_integrate/processors/sparql_integrate/SparqlIntegrateProcessor.java index 3774dd2..748342e 100644 --- a/nifi-sparql-integrate-processors/src/main/java/org/aksw/sparql_integrate/processors/sparql_integrate/SparqlIntegrateProcessor.java +++ b/nifi-sparql-integrate-processors/src/main/java/org/aksw/sparql_integrate/processors/sparql_integrate/SparqlIntegrateProcessor.java @@ -14,14 +14,13 @@ */ package org.aksw.sparql_integrate.processors.sparql_integrate; -import com.google.common.collect.Streams; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -31,12 +30,17 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Stream; +import com.google.common.collect.Streams; import org.aksw.jena_sparql_api.sparql.ext.http.JenaExtensionHttp; import org.aksw.jena_sparql_api.sparql.ext.util.JenaExtensionUtil; import org.aksw.jena_sparql_api.stmt.SparqlStmt; import org.aksw.jena_sparql_api.stmt.SparqlStmtIterator; +import org.aksw.jena_sparql_api.stmt.SparqlStmtParser; import org.aksw.jena_sparql_api.stmt.SparqlStmtParserImpl; import org.aksw.jena_sparql_api.stmt.SparqlStmtQuery; +import org.aksw.jena_sparql_api.stmt.SparqlStmtUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.io.IOUtils; import org.apache.jena.graph.Triple; import org.apache.jena.query.Dataset; import org.apache.jena.query.DatasetFactory; @@ -57,6 +61,7 @@ import org.apache.jena.shared.impl.PrefixMappingImpl; import org.apache.jena.sparql.core.Prologue; import org.apache.jena.sparql.core.Quad; +import org.apache.jena.sparql.lang.arq.ParseException; import org.apache.jena.update.UpdateRequest; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttributes; @@ -68,7 +73,9 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -78,6 +85,7 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import io.github.galbiston.geosparql_jena.configuration.GeoSPARQLConfig; @Tags({"RDF", "SPARQL"}) @CapabilityDescription("This processor takes an SPARQL query as an argument and outputs a RDF-Turtle file.") @@ -114,7 +122,7 @@ public interface FLOW_FILE_CONTENTS { .name("SPARQL_QUERY") .displayName("SPARQL Query") .description("The SPARQL query to run.") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); @@ -175,29 +183,18 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ComponentLog logger = getLogger(); final String contentFlowFile = context.getProperty(CONTENT_FLOW_FILE).getValue(); - final String baseUri = context.getProperty(BASE_URI).getValue(); final String rdfDataInputSyntax = context.getProperty(RDF_DATA_INPUT_SYNTAX).getValue(); final AtomicReference> stmts = new AtomicReference<>(); + // Disable creation of a derby.log file ; triggered by the GeoSPARQL module + System.setProperty("derby.stream.error.field", + "org.aksw.sparql_integrate.cli.DerbyUtil.DEV_NULL"); + // Init geosparql module + GeoSPARQLConfig.setupNoIndex(); FlowFile flowFile = session.get(); - - String sparqlQuery = new String(); - if (contentFlowFile.equals(FLOW_FILE_CONTENTS.SPARQL_QUERY)) { - final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - session.exportTo(flowFile, bytes); - sparqlQuery = bytes.toString(); - } else { - sparqlQuery = context.getProperty(SPARQL_QUERY_PROPERTY).getValue(); - } - SparqlStmtIterator stmtIter = new SparqlStmtIterator(getStmtParser(baseUri), sparqlQuery); - stmts.set(Streams.stream(stmtIter)); - try { - } catch (Exception ex) { - ex.printStackTrace(); - getLogger().error("Failed to read sparql query."); - } - + String baseUri = context.getProperty(BASE_URI).evaluateAttributeExpressions().getValue(); Dataset dataset = DatasetFactory.create(); RDFConnection conn = RDFConnectionFactory.connect(dataset); Path path = null; @@ -206,17 +203,41 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.read(flowFile, new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { - RDFDataMgr.read(dataset, in, baseUri, RDFLanguages.nameToLang(rdfDataInputSyntax)); + RDFDataMgr.read(dataset, in, RDFLanguages.nameToLang(rdfDataInputSyntax)); } }); break; case FLOW_FILE_CONTENTS.NON_RDF_DATA: - path = Paths.get(baseUri, flowFile.getAttribute("filename")); + path = new File("/tmp/" + flowFile.getAttribute("filename")).toPath(); + logger.error("Path: " + path.toString()); + baseUri = path.toAbsolutePath().getParent().toString() + "/"; session.exportTo(flowFile, path, false); break; } + String sparqlQuery = new String(); + if (contentFlowFile.equals(FLOW_FILE_CONTENTS.SPARQL_QUERY)) { + final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + session.exportTo(flowFile, bytes); + sparqlQuery = bytes.toString(); + } else { + sparqlQuery = context.getProperty(SPARQL_QUERY_PROPERTY).evaluateAttributeExpressions(flowFile).getValue(); + } + logger.error("SparqlQuery: " + sparqlQuery); + logger.error("context.getProperty(SPARQL_QUERY_PROPERTY): " + context.getProperty(SPARQL_QUERY_PROPERTY)); + logger + .error("context.getProperty(SPARQL_QUERY_PROPERTY).evaluateAttributeExpressions(): " + + context.getProperty(SPARQL_QUERY_PROPERTY).evaluateAttributeExpressions()); + logger.error("BaseUri: " + baseUri.toString()); + + SparqlStmtIterator stmtIter; + try { + stmtIter = getStmtIter(baseUri, sparqlQuery); + stmts.set(Streams.stream(stmtIter)); + } catch (Exception e) { + e.printStackTrace(); + } flowFile = session.write(flowFile, new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { @@ -231,10 +252,13 @@ public void process(OutputStream out) throws IOException { ioException.printStackTrace(); } } + String filename = FilenameUtils.getBaseName(flowFile.getAttribute("filename")) + ".nt"; + session.putAttribute(flowFile, "filename", filename); session.transfer(flowFile, SUCCESS); } - private static Function getStmtParser(String baseUri) { + private static SparqlStmtIterator getStmtIter(String baseUri, String sparqlQuery) + throws IOException, ParseException { PrefixMapping pm = new PrefixMappingImpl(); pm.setNsPrefixes(PrefixMapping.Extended); @@ -243,7 +267,11 @@ private static Function getStmtParser(String baseUri) { Prologue prologue = new Prologue(); prologue.setPrefixMapping(pm); prologue.setBaseURI(baseUri); - return SparqlStmtParserImpl.create(Syntax.syntaxARQ, prologue, true); + Function rawSparqlStmtParser = + SparqlStmtParserImpl.create(Syntax.syntaxARQ, prologue, true); + SparqlStmtParser sparqlStmtParser = SparqlStmtParser.wrapWithNamespaceTracking(pm, rawSparqlStmtParser); + SparqlStmtIterator stmts = SparqlStmtUtils.parse(IOUtils.toInputStream(sparqlQuery, "UTF-8"), sparqlStmtParser); + return stmts; } public static void processStmts(RDFConnection conn, SparqlStmt stmt, OutputStream out) { diff --git a/pom.xml b/pom.xml index 9e1da1f..26d08f3 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ org.apache.nifi nifi-nar-bundles - 1.6.0 + 1.9.2 org.aksw.sparql-integrate @@ -40,7 +40,7 @@ org.apache.nifi nifi-nifi-example-processors - 1.6.0 + 1.9.2