Skip to content

Commit

Permalink
OOZIE-3717 When fork actions parallel submit, becasue ForkedActionSta…
Browse files Browse the repository at this point in the history
…rtXCommand and ActionStartXCommand has the same name, so ForkedActionStartXCommand would be lost, and cause deadlock (chenhd via dionusos)
  • Loading branch information
dionusos committed Aug 7, 2023
1 parent 6039007 commit 3c614c7
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluationException;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.JobUtils;
Expand Down Expand Up @@ -96,6 +95,19 @@ public ActionStartXCommand(WorkflowJobBean job, String actionId, String type) {
this.jobId = wfJob.getId();
}

public ActionStartXCommand(String actionId, String type, String name) {
super(name, type, 0);
this.actionId = actionId;
this.jobId = Services.get().get(UUIDService.class).getId(actionId);
}

public ActionStartXCommand(WorkflowJobBean job, String actionId, String type, String name) {
super(name, type, 0);
this.actionId = actionId;
this.wfJob = job;
this.jobId = wfJob.getId();
}

@Override
protected void setLogInfo() {
LogUtils.setLogInfo(actionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.XCommand;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;

public class ForkedActionStartXCommand extends ActionStartXCommand {

private final static String FORKED_ACTION_START_NAME = "action.forkedstart";

public ForkedActionStartXCommand(String actionId, String type) {
super(actionId, type);
super(actionId, type, FORKED_ACTION_START_NAME);
}

public ForkedActionStartXCommand(WorkflowJobBean wfJob, String id, String type) {
super(wfJob, id, type);
super(wfJob, id, type, FORKED_ACTION_START_NAME);
}

protected ActionExecutorContext execute() throws CommandException {
Expand Down
160 changes: 157 additions & 3 deletions core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
Expand All @@ -62,8 +60,15 @@
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.service.RecoveryService;


import javax.persistence.EntityManager;

public class TestSignalXCommand extends XDataTestCase {

Expand Down Expand Up @@ -554,4 +559,153 @@ public boolean evaluate() throws Exception {
}
}
}

/**
* for test {@link #testDeadlockForForkParallelSubmit()}
*/
public static class TestRecoverForkStartActionCallableQueueService extends CallableQueueService{
private final XLog log = XLog.getLog(getClass());
public static TestSignalXCommand testSignalXCommand;

/**
* Overwrite for test the same action's ActionStartXCommand in queue and the ForkedActionStartXCommand wouldn't lose,
* if ActionStartXCommand and ForkedActionStartXCommand has the same name, ForkedActionStartXCommand couldn't enqueue
* after a ActionStartXCommand in queue waiting for running.
*/
public class CallableWrapper<E> extends CallableQueueService.CallableWrapper<E> {

boolean forkedActionStartXCommandFirstEnter;
public CallableWrapper(XCallable<E> callable, long delay) {
super(callable,delay);
forkedActionStartXCommandFirstEnter = callable instanceof ForkedActionStartXCommand;
}


public void run() {
XCallable<?> callable = getElement();
if (forkedActionStartXCommandFirstEnter && callable instanceof ForkedActionStartXCommand){
// make sure there has a ActionStartXCommand in the queue wait to run,
// and then ForkedActionStartXCommand enqueue
testSignalXCommand.waitFor(15 * 1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return !filterDuplicates();
}
},200);

log.warn("max concurrency for callable [{0}] exceeded, enqueueing with [{1}]ms delay", callable
.getType(), CONCURRENCY_DELAY);
setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);

try {
Method queue = CallableQueueService.class.getDeclaredMethod("queue",
CallableQueueService.CallableWrapper.class, boolean.class);
queue.setAccessible(true);
queue.invoke(TestRecoverForkStartActionCallableQueueService.this,this,true);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
forkedActionStartXCommandFirstEnter = false;
}else {
super.run();
}
}
}

/**
* Replace CallableQueueService.CallableWrapper to TestRecoverForkStartActionCallableQueueService.CallableWrapper
* for text, in order to call TestRecoverForkStartActionCallableQueueService.CallableWrapper for wait
* ActionStartXCommand enqueue before.
*
*/
public <T> Future<T> submit(CallableQueueService.CallableWrapper<T> task) throws InterruptedException {
return super.submit(new TestRecoverForkStartActionCallableQueueService.CallableWrapper<T>(task.getElement(),
task.getInitialDelay()));
}
}


/**
* Test : fork parallel submit, the action has the same XCommand in queue, there will skip enqueue by
* {@link CallableQueueService.CallableWrapper#filterDuplicates()} so if the ActionStartXCommand and
* ForkedActionStartXCommand has the same name, it would be lost.
*
* Note : RecoveryService will check the pending action and try to start it. So if the action's ForkedActionStartXCommand
* wait for run, there may be a ActionStartXCommand add for the same action.
*
*/
public void testDeadlockForForkParallelSubmit() throws Exception {
setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, TestRecoverForkStartActionCallableQueueService.class.getName());
TestRecoverForkStartActionCallableQueueService.testSignalXCommand = this;

services = new Services();
Configuration servicesConf = services.getConf();
servicesConf.setInt(RecoveryService.CONF_WF_ACTIONS_OLDER_THAN, 0);
servicesConf.setInt(RecoveryService.CONF_SERVICE_INTERVAL, 10);
services.init();

ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);

Configuration conf = new XConfiguration();
String workflowUri = getTestCaseFileUri("workflow.xml");
//@formatter:off
String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"wf-fork\">"
+ "<start to=\"fork1\"/>"
+ "<fork name=\"fork1\">"
+ "<path start=\"action1\"/>"
+ "<path start=\"action2\"/>"
+ "<path start=\"action3\"/>"
+ "<path start=\"action4\"/>"
+ "<path start=\"action5\"/>"
+ "</fork>"
+ "<action name=\"action1\">"
+ "<fs></fs>"
+ "<ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action2\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action3\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action4\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<action name=\"action5\">"
+ "<fs></fs><ok to=\"join1\"/>"
+ "<error to=\"kill\"/>"
+ "</action>"
+ "<join name=\"join1\" to=\"end\"/>"
+ "<kill name=\"kill\"><message>killed</message>"
+ "</kill><"
+ "end name=\"end\"/>"
+ "</workflow-app>";
//@Formatter:on

writeToFile(appXml, workflowUri);
conf.set(OozieClient.APP_PATH, workflowUri);
conf.set(OozieClient.USER_NAME, getTestUser());

SubmitXCommand sc = new SubmitXCommand(conf);
final String jobId = sc.call();
new StartXCommand(jobId).call();

waitFor(30 * 1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return WorkflowJobQueryExecutor.getInstance()
.get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
.getStatus() == WorkflowJob.Status.SUCCEEDED;
}
});

assertEquals(WorkflowJobQueryExecutor.getInstance()
.get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
.getStatus(),
WorkflowJob.Status.SUCCEEDED);
}
}
1 change: 1 addition & 0 deletions release-log.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)

OOZIE-3717 When fork actions parallel submit, becasue ForkedActionStartXCommand and ActionStartXCommand has the same name, so ForkedActionStartXCommand would be lost, and cause deadlock (chenhd via dionusos)
OOZIE-3715 Fix fork out more than one transitions submit , one transition submit fail can't execute KillXCommand (chenhd via dionusos)
OOZIE-3716 Invocation of Main class completed Message is skipped when LauncherSecurityManager calls system exit (khr9603 via dionusos)
OOZIE-3695 [sharelib-hive2] Fix current SpotBugs discovered issues in Oozie's sharelib-hive2 module (jmakai via dionusos)
Expand Down

0 comments on commit 3c614c7

Please sign in to comment.