Skip to content

Commit

Permalink
NIFI-14157 Allow InvokeScriptedProcessor scripts to implement OnPrima…
Browse files Browse the repository at this point in the history
…ryNodeStateChange (#9632)

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
mattyb149 authored Jan 24, 2025
1 parent 1c11ce9 commit 584323b
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
Expand Down Expand Up @@ -212,7 +214,6 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
public void setup(final ProcessContext context) {
scriptingComponentHelper.setupVariables(context);
setup();

invokeScriptedProcessorMethod("onScheduled", context);
}

Expand All @@ -232,6 +233,12 @@ public void setup() {
}
}

@OnPrimaryNodeStateChange
public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {

invokeScriptedProcessorMethod("onPrimaryNodeStateChange", newState);
}

/**
* Handles changes to this processor's properties. If changes are made to
* script- or engine-related properties, the script will be reloaded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,29 @@ public void testReadRecordsWithRecordPath() throws Exception {
ff.assertContentEquals("48\n47\n14\n");
}

/**
* Tests a script that has a Groovy Processor that implements its own onPrimaryNodeStateChange
*/
@Test
public void testOnPrimaryNodeStateChange() {
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/groovy/test_OnPrimaryStateChange.groovy");
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/groovy");
InvokeScriptedProcessor invokeScriptedProcessor = ((InvokeScriptedProcessor) scriptingComponent);
invokeScriptedProcessor.setup(runner.getProcessContext());
runner.setIsConfiguredForClustering(true);
runner.run(1, false, true);
runner.setPrimaryNode(true);
runner.clearTransferState();
runner.run(1, true, false);
runner.assertAllFlowFilesTransferred("success");
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship("success");
assertNotNull(flowFiles);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertAttributeEquals("isPrimaryNode", "true");
}

private static class OverrideInvokeScriptedProcessor extends InvokeScriptedProcessor {

private int numTimesModifiedCalled = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange
import org.apache.nifi.annotation.notification.PrimaryNodeState
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship


class MyRecordProcessor extends AbstractProcessor {

def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles are routed here if an error occurs during processing').build()

static boolean primaryNode = false

@OnPrimaryNodeStateChange
void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
primaryNode = true
}

@Override
Set<Relationship> getRelationships() {
[REL_SUCCESS, REL_FAILURE] as Set<Relationship>
}

@Override
void onTrigger(ProcessContext context, ProcessSession session) {
def flowFile = session.create()
session.putAttribute(flowFile, 'isPrimaryNode', primaryNode.toString())
session.transfer(flowFile, REL_SUCCESS)
}
}

processor = new MyRecordProcessor()
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
Expand Down Expand Up @@ -979,6 +981,14 @@ public void setIsConfiguredForClustering(final boolean isConfiguredForClustering

@Override
public void setPrimaryNode(boolean primaryNode) {
if (context.isPrimary() != primaryNode) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, processor,
primaryNode ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED);
} catch (final Exception e) {
Assertions.fail("Could not invoke methods annotated with @OnPrimaryNodeStateChange annotation due to: " + e);
}
}
context.setPrimaryNode(primaryNode);
}

Expand Down

0 comments on commit 584323b

Please sign in to comment.