Skip to content

Commit

Permalink
NIFI-14266 - Allow ExecuteSQL to not overwrite flowfile content when …
Browse files Browse the repository at this point in the history
…no result

Co-authored-by: Marcin Gemra <[email protected]>
  • Loading branch information
pvillard31 and sfc-gh-mgemra committed Feb 25, 2025
1 parent 96eea40 commit 4027d8f
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
Expand Down Expand Up @@ -101,9 +103,9 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("SQL select query")
.description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
public static final PropertyDescriptor SQL_QUERY = new PropertyDescriptor.Builder()
.name("SQL Query")
.description("The SQL query to execute. The query can be empty, a constant value, or built from attributes "
+ "using Expression Language. If this property is specified, it will be used regardless of the content of "
+ "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+ "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
Expand Down Expand Up @@ -188,6 +190,17 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
.required(true)
.build();

public static final PropertyDescriptor CONTENT_OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
.name("Content Output Strategy")
.description("""
If the query didn't return any result, this property specifies if the processor should overwrite the
FlowFile's content or ignore it (when the processor is triggered by an incoming FlowFile).
""")
.allowableValues(ContentOutputStrategy.class)
.defaultValue(ContentOutputStrategy.EMPTY_RESULT)
.required(true)
.build();

protected List<PropertyDescriptor> propDescriptors;

protected DBCPService dbcpService;
Expand All @@ -202,10 +215,15 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
}

@Override
public void migrateProperties(final PropertyConfiguration config) {
config.renameProperty("SQL select query", SQL_QUERY.getName());
}

@OnScheduled
public void setup(ProcessContext context) {
// If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
if (!context.getProperty(SQL_QUERY).isSet() && !context.hasIncomingConnection()) {
final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
+ "providing flowfile(s) containing a SQL select query";
getLogger().error(errorString);
Expand Down Expand Up @@ -244,8 +262,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess);

String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
if (context.getProperty(SQL_QUERY).isSet()) {
selectQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
} else {
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
Expand Down Expand Up @@ -456,7 +474,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
session.remove(fileToProcess);
} else {
// If we had no results then transfer the original flow file downstream to trigger processors
session.transfer(setFlowFileEmptyResults(session, fileToProcess, sqlWriter), REL_SUCCESS);
if (context.getProperty(CONTENT_OUTPUT_STRATEGY).asAllowableValue(ContentOutputStrategy.class) == null
|| context.getProperty(CONTENT_OUTPUT_STRATEGY).asAllowableValue(ContentOutputStrategy.class) == ContentOutputStrategy.EMPTY_RESULT) {
session.transfer(setFlowFileEmptyResults(session, fileToProcess, sqlWriter), REL_SUCCESS);
} else {
session.transfer(fileToProcess, REL_SUCCESS);
}
}
} else if (resultCount == 0) {
// If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
Expand Down Expand Up @@ -531,4 +554,38 @@ protected List<String> getQueries(final String value) {
}

protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess);

enum ContentOutputStrategy implements DescribedValue {
EMPTY_RESULT(
"Overwrite Content",
"Overwrites the FlowFile content with the empty result set."
),
IGNORED(
"Ignore Results",
"Ignores the result and passes the incoming FlowFile content to the next processor."
);

private final String value;
private final String description;

ContentOutputStrategy(final String value, final String description) {
this.value = value;
this.description = description;
}

@Override
public String getValue() {
return this.value;
}

@Override
public String getDisplayName() {
return this.value;
}

@Override
public String getDescription() {
return this.description;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.util.db.AvroUtil.CodecType;
import org.apache.nifi.util.db.JdbcCommon;

import java.util.List;
import java.util.Set;

import static org.apache.nifi.util.db.AvroUtil.CodecType;
import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;
import static org.apache.nifi.util.db.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO;
Expand Down Expand Up @@ -150,7 +150,7 @@ public ExecuteSQL() {
propDescriptors = List.of(
DBCP_SERVICE,
SQL_PRE_QUERY,
SQL_SELECT_QUERY,
SQL_QUERY,
SQL_POST_QUERY,
QUERY_TIMEOUT,
NORMALIZE_NAMES_FOR_AVRO,
Expand All @@ -161,7 +161,8 @@ public ExecuteSQL() {
MAX_ROWS_PER_FLOW_FILE,
OUTPUT_BATCH_SIZE,
FETCH_SIZE,
AUTO_COMMIT
AUTO_COMMIT,
CONTENT_OUTPUT_STRATEGY
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
DBCP_SERVICE,
SQL_PRE_QUERY,
SQL_SELECT_QUERY,
SQL_QUERY,
SQL_POST_QUERY,
QUERY_TIMEOUT,
RECORD_WRITER_FACTORY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void setup() throws InitializationException {
@Test
public void testIncomingConnectionWithNoFlowFile() {
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM persons");
runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM persons");
runner.run();
runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0);
runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0);
Expand Down Expand Up @@ -203,14 +204,79 @@ public void testWithNullIntColumn() throws SQLException {
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");

runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM TEST_NULL_INT");
runner.run();

runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).getFirst().assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "2");
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).getFirst().assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
}

@Test
public void testDropTableWithOverwrite() throws SQLException, IOException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();

// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();

try {
stmt.execute("drop table TEST_DROP_TABLE");
} catch (final SQLException ignored) {
}

stmt.execute("create table TEST_DROP_TABLE (id integer not null, val1 integer, val2 integer)");

stmt.execute("insert into TEST_DROP_TABLE (id, val1, val2) VALUES (0, NULL, 1)");
stmt.execute("insert into TEST_DROP_TABLE (id, val1, val2) VALUES (1, 1, 1)");

runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_QUERY, "DROP TABLE TEST_DROP_TABLE");
runner.enqueue("some data");
runner.run();

runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);

final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
final InputStream in = new ByteArrayInputStream(flowfiles.getFirst().toByteArray());
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
assertFalse(dataFileReader.hasNext());
}
}

@Test
public void testDropTableNoOverwrite() throws SQLException, IOException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();

// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();

try {
stmt.execute("drop table TEST_TRUNCATE_TABLE");
} catch (final SQLException ignored) {
}

stmt.execute("create table TEST_TRUNCATE_TABLE (id integer not null, val1 integer, val2 integer)");

stmt.execute("insert into TEST_TRUNCATE_TABLE (id, val1, val2) VALUES (0, NULL, 1)");
stmt.execute("insert into TEST_TRUNCATE_TABLE (id, val1, val2) VALUES (1, 1, 1)");

runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.CONTENT_OUTPUT_STRATEGY, AbstractExecuteSQL.ContentOutputStrategy.IGNORED);
runner.setProperty(ExecuteSQL.SQL_QUERY, "TRUNCATE TABLE TEST_TRUNCATE_TABLE");
runner.enqueue("some data");
runner.run();

runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
runner.assertContents(ExecuteSQL.REL_SUCCESS, List.of("some data"));
}

@Test
public void testCompression() throws SQLException, IOException {
// remove previous test database, if any
Expand All @@ -233,7 +299,7 @@ public void testCompression() throws SQLException, IOException {

runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQL.COMPRESSION_FORMAT, AvroUtil.CodecType.BZIP2.name());
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM TEST_NULL_INT");
runner.run();

runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
Expand Down Expand Up @@ -269,7 +335,7 @@ public void testWithOutputBatching() throws SQLException {
runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "5");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM TEST_NULL_INT");
runner.run();

runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
Expand Down Expand Up @@ -311,7 +377,6 @@ public void testWithOutputBatchingAndIncomingFlowFile() throws SQLException {
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
}


Map<String, String> attrMap = new HashMap<>();
String testAttrName = "attr1";
String testAttrValue = "value1";
Expand Down Expand Up @@ -342,8 +407,6 @@ public void testWithOutputBatchingAndIncomingFlowFile() throws SQLException {
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
lastFlowFile.assertAttributeEquals(testAttrName, testAttrValue);
lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, inputFlowFile.getAttribute(CoreAttributes.UUID.key()));


}

@Test
Expand Down Expand Up @@ -371,7 +434,7 @@ public void testMaxRowsPerFlowFile() throws SQLException {
runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(AbstractExecuteSQL.FETCH_SIZE, "5");
runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "0");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT * FROM TEST_NULL_INT");
runner.run();

runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
Expand Down Expand Up @@ -410,7 +473,7 @@ public void testInsertStatementCreatesFlowFile() throws SQLException {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");

runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
runner.setProperty(ExecuteSQL.SQL_QUERY, "insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
runner.run();

runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
Expand All @@ -435,7 +498,7 @@ public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");

runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from TEST_NULL_INT");
runner.enqueue("Hello".getBytes());
runner.run();

Expand Down Expand Up @@ -481,7 +544,7 @@ public void testWithDuplicateColumns() throws SQLException {
stmt.execute("insert into host2 values(1,'host2')");
stmt.execute("select a.host as hostA,b.host as hostB from host1 a join host2 b on b.id=a.id");
runner.setIncomingConnection(false);
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select a.host as hostA,b.host as hostB from host1 a join host2 b on b.id=a.id");
runner.setProperty(ExecuteSQL.SQL_QUERY, "select a.host as hostA,b.host as hostB from host1 a join host2 b on b.id=a.id");
runner.run();

runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
Expand All @@ -507,7 +570,7 @@ public void testWithSqlException() throws SQLException {

runner.setIncomingConnection(false);
// Try a valid SQL statement that will generate an error (val1 does not exist, e.g.)
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
runner.setProperty(ExecuteSQL.SQL_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
runner.run();

//No incoming flow file containing a query, and an exception causes no outbound flowfile.
Expand Down Expand Up @@ -584,7 +647,7 @@ public void invokeOnTrigger(final Integer queryTimeout, final String query, fina
}

if (setQueryProperty) {
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
runner.setProperty(ExecuteSQL.SQL_QUERY, query);
}

runner.run();
Expand Down Expand Up @@ -639,7 +702,7 @@ public void testPreQuery() throws Exception {

runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from TEST_NULL_INT");
runner.enqueue("test".getBytes());
runner.run();

Expand Down Expand Up @@ -684,7 +747,7 @@ public void testPostQuery() throws Exception {

runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
runner.enqueue("test".getBytes());
runner.run();
Expand Down Expand Up @@ -730,7 +793,7 @@ public void testPreQueryFail() throws Exception {
runner.setIncomingConnection(true);
// Simulate failure by not provide parameter
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from TEST_NULL_INT");
runner.enqueue("test".getBytes());
runner.run();

Expand All @@ -756,7 +819,7 @@ public void testPostQueryFail() throws Exception {

runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
runner.setProperty(ExecuteSQL.SQL_QUERY, "select * from TEST_NULL_INT");
// Simulate failure by not provide parameter
runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
runner.enqueue("test".getBytes());
Expand Down
Loading

0 comments on commit 4027d8f

Please sign in to comment.