diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java index cd153b55c6..6a3c11956b 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java @@ -98,14 +98,14 @@ public static void run( LoggingContextHolder.INSTANCE.setConfig(jobModel.getConfig()); DiagnosticsUtil.writeMetadataFile(jobName, jobId, containerId, executionEnvContainerId, config); - run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId, jobModel, config, + int exitCode = run(appDesc, jobName, jobId, containerId, executionEnvContainerId, samzaEpochId, jobModel, config, buildExternalContext(config)); - exitProcess(0); + exitProcess(exitCode); } @VisibleForTesting - static void run( + static int run( ApplicationDescriptorImpl appDesc, String jobName, String jobId, @@ -208,13 +208,7 @@ static void run( exitCode = 1; } finally { coordinatorStreamStore.close(); - /* - * Only exit in the scenario of non-zero exit code in order to maintain parity with current implementation where - * the method completes when no errors are encountered. - */ - if (exitCode != 0) { - exitProcess(exitCode); - } + return exitCode; } } diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java index ec579918c3..51e0175abb 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestContainerLaunchUtil.java @@ -61,8 +61,30 @@ public void testRunWithException() throws Exception { .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), eq(JOB_ID), eq(CONTAINER_ID), any(), any(), eq(JOB_MODEL), eq(CONFIG), any()); - ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, Optional.empty(), Optional.empty(), JOB_MODEL, + int exitCode = ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, Optional.empty(), Optional.empty(), JOB_MODEL, CONFIG, Optional.empty()); + assertEquals(1, exitCode); + } + + @Test + public void testRunSuccessfully() throws Exception { + int exitCode = 0; + final CountDownLatch completionLatch = new CountDownLatch(1); + PowerMockito.mockStatic(ContainerLaunchUtil.class); + PowerMockito.doReturn(mock(CoordinatorStreamStore.class)) + .when(ContainerLaunchUtil.class, "buildCoordinatorStreamStore", eq(CONFIG), any()); + PowerMockito.doAnswer(invocation -> { + completionLatch.countDown(); + return null; + }).when(ContainerLaunchUtil.class, "exitProcess", eq(exitCode)); + PowerMockito.doReturn(exitCode) + .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), eq(JOB_ID), eq(CONTAINER_ID), any(), any(), + eq(JOB_MODEL), eq(CONFIG), any()); + PowerMockito.doCallRealMethod() + .when(ContainerLaunchUtil.class, "run", eq(APP_DESC), eq(JOB_NAME), eq(JOB_ID), eq(CONTAINER_ID), any(), any(), + eq(JOB_MODEL)); + + ContainerLaunchUtil.run(APP_DESC, JOB_NAME, JOB_ID, CONTAINER_ID, Optional.empty(), Optional.empty(), JOB_MODEL); assertTrue(completionLatch.await(1, TimeUnit.SECONDS)); } }