Skip to content

Commit

Permalink
Add support for GeoSPARQL functions
Browse files Browse the repository at this point in the history
  • Loading branch information
GordianDziwis committed Nov 28, 2019
1 parent 0b8f923 commit 00c8f9d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 31 deletions.
8 changes: 4 additions & 4 deletions nifi-sparql-integrate-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<packaging>jar</packaging>

<properties>
<jena.version>3.11.0</jena.version>
<jena.version>3.12.0</jena.version>
<jena-sparql-api.subversion>1</jena-sparql-api.subversion>
<jena-sparql-api.version>${jena.version}-${jena-sparql-api.subversion}</jena-sparql-api.version>
<spring-boot.version>1.5.8.RELEASE</spring-boot.version>
Expand Down Expand Up @@ -59,17 +59,17 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.6.0</version>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.6.0</version>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.6.0</version>
<version>1.9.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
*/
package org.aksw.sparql_integrate.processors.sparql_integrate;

import com.google.common.collect.Streams;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -31,12 +30,17 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Stream;
import com.google.common.collect.Streams;
import org.aksw.jena_sparql_api.sparql.ext.http.JenaExtensionHttp;
import org.aksw.jena_sparql_api.sparql.ext.util.JenaExtensionUtil;
import org.aksw.jena_sparql_api.stmt.SparqlStmt;
import org.aksw.jena_sparql_api.stmt.SparqlStmtIterator;
import org.aksw.jena_sparql_api.stmt.SparqlStmtParser;
import org.aksw.jena_sparql_api.stmt.SparqlStmtParserImpl;
import org.aksw.jena_sparql_api.stmt.SparqlStmtQuery;
import org.aksw.jena_sparql_api.stmt.SparqlStmtUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.jena.graph.Triple;
import org.apache.jena.query.Dataset;
import org.apache.jena.query.DatasetFactory;
Expand All @@ -57,6 +61,7 @@
import org.apache.jena.shared.impl.PrefixMappingImpl;
import org.apache.jena.sparql.core.Prologue;
import org.apache.jena.sparql.core.Quad;
import org.apache.jena.sparql.lang.arq.ParseException;
import org.apache.jena.update.UpdateRequest;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
Expand All @@ -68,7 +73,9 @@
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
Expand All @@ -78,6 +85,7 @@
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import io.github.galbiston.geosparql_jena.configuration.GeoSPARQLConfig;

@Tags({"RDF", "SPARQL"})
@CapabilityDescription("This processor takes an SPARQL query as an argument and outputs a RDF-Turtle file.")
Expand Down Expand Up @@ -114,7 +122,7 @@ public interface FLOW_FILE_CONTENTS {
.name("SPARQL_QUERY")
.displayName("SPARQL Query")
.description("The SPARQL query to run.")
.expressionLanguageSupported(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();

Expand Down Expand Up @@ -175,29 +183,18 @@ public void onScheduled(final ProcessContext context) {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

final ComponentLog logger = getLogger();
final String contentFlowFile = context.getProperty(CONTENT_FLOW_FILE).getValue();
final String baseUri = context.getProperty(BASE_URI).getValue();
final String rdfDataInputSyntax = context.getProperty(RDF_DATA_INPUT_SYNTAX).getValue();
final AtomicReference<Stream<SparqlStmt>> stmts = new AtomicReference<>();
// Disable creation of a derby.log file, which is triggered by the GeoSPARQL module
System.setProperty("derby.stream.error.field",
"org.aksw.sparql_integrate.cli.DerbyUtil.DEV_NULL");

// Init geosparql module
GeoSPARQLConfig.setupNoIndex();
FlowFile flowFile = session.get();

String sparqlQuery = new String();
if (contentFlowFile.equals(FLOW_FILE_CONTENTS.SPARQL_QUERY)) {
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
session.exportTo(flowFile, bytes);
sparqlQuery = bytes.toString();
} else {
sparqlQuery = context.getProperty(SPARQL_QUERY_PROPERTY).getValue();
}
SparqlStmtIterator stmtIter = new SparqlStmtIterator(getStmtParser(baseUri), sparqlQuery);
stmts.set(Streams.stream(stmtIter));
try {
} catch (Exception ex) {
ex.printStackTrace();
getLogger().error("Failed to read sparql query.");
}

String baseUri = context.getProperty(BASE_URI).evaluateAttributeExpressions().getValue();
Dataset dataset = DatasetFactory.create();
RDFConnection conn = RDFConnectionFactory.connect(dataset);
Path path = null;
Expand All @@ -206,17 +203,41 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
RDFDataMgr.read(dataset, in, baseUri, RDFLanguages.nameToLang(rdfDataInputSyntax));
RDFDataMgr.read(dataset, in, RDFLanguages.nameToLang(rdfDataInputSyntax));
}
});
break;
case FLOW_FILE_CONTENTS.NON_RDF_DATA:
path = Paths.get(baseUri, flowFile.getAttribute("filename"));
path = new File("/tmp/" + flowFile.getAttribute("filename")).toPath();
logger.error("Path: " + path.toString());
baseUri = path.toAbsolutePath().getParent().toString() + "/";
session.exportTo(flowFile, path, false);
break;
}
String sparqlQuery = new String();
if (contentFlowFile.equals(FLOW_FILE_CONTENTS.SPARQL_QUERY)) {
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
session.exportTo(flowFile, bytes);
sparqlQuery = bytes.toString();
} else {
sparqlQuery = context.getProperty(SPARQL_QUERY_PROPERTY).evaluateAttributeExpressions(flowFile).getValue();
}

logger.error("SparqlQuery: " + sparqlQuery);
logger.error("context.getProperty(SPARQL_QUERY_PROPERTY): " + context.getProperty(SPARQL_QUERY_PROPERTY));
logger
.error("context.getProperty(SPARQL_QUERY_PROPERTY).evaluateAttributeExpressions(): "
+ context.getProperty(SPARQL_QUERY_PROPERTY).evaluateAttributeExpressions());
logger.error("BaseUri: " + baseUri.toString());


SparqlStmtIterator stmtIter;
try {
stmtIter = getStmtIter(baseUri, sparqlQuery);
stmts.set(Streams.stream(stmtIter));
} catch (Exception e) {
e.printStackTrace();
}
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
Expand All @@ -231,10 +252,13 @@ public void process(OutputStream out) throws IOException {
ioException.printStackTrace();
}
}
String filename = FilenameUtils.getBaseName(flowFile.getAttribute("filename")) + ".nt";
session.putAttribute(flowFile, "filename", filename);
session.transfer(flowFile, SUCCESS);
}

private static Function<String, SparqlStmt> getStmtParser(String baseUri) {
private static SparqlStmtIterator getStmtIter(String baseUri, String sparqlQuery)
throws IOException, ParseException {

PrefixMapping pm = new PrefixMappingImpl();
pm.setNsPrefixes(PrefixMapping.Extended);
Expand All @@ -243,7 +267,11 @@ private static Function<String, SparqlStmt> getStmtParser(String baseUri) {
Prologue prologue = new Prologue();
prologue.setPrefixMapping(pm);
prologue.setBaseURI(baseUri);
return SparqlStmtParserImpl.create(Syntax.syntaxARQ, prologue, true);
Function<String, SparqlStmt> rawSparqlStmtParser =
SparqlStmtParserImpl.create(Syntax.syntaxARQ, prologue, true);
SparqlStmtParser sparqlStmtParser = SparqlStmtParser.wrapWithNamespaceTracking(pm, rawSparqlStmtParser);
SparqlStmtIterator stmts = SparqlStmtUtils.parse(IOUtils.toInputStream(sparqlQuery, "UTF-8"), sparqlStmtParser);
return stmts;
}

public static void processStmts(RDFConnection conn, SparqlStmt stmt, OutputStream out) {
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.6.0</version>
<version>1.9.2</version>
</parent>

<groupId>org.aksw.sparql-integrate</groupId>
Expand All @@ -40,7 +40,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nifi-example-processors</artifactId>
<version>1.6.0</version>
<version>1.9.2</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down

0 comments on commit 00c8f9d

Please sign in to comment.