diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java index 4ae4886d38..a96d137506 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import joptsimple.internal.Strings; import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.config.ApplicationConfig; @@ -167,7 +168,23 @@ JobConfig generateJobConfig(JobNode jobNode, String executionPlanJson) { LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(), generatedConfig); - return new JobConfig(mergeConfig(originalConfig, generatedConfig)); + // Merge generated task.inputs and original task.inputs together as new task.inputs + Map newConfigs = mergeInputs(originalConfig, inputs); + + return new JobConfig(mergeConfig(newConfigs, generatedConfig)); + } + + private Map mergeInputs(Config originalConfig, Set inputs) { + Map mergedConfigs = new HashMap<>(originalConfig); + if (inputs.isEmpty()) { + return mergedConfigs; + } + String originalInputs = originalConfig.get(TaskConfig.INPUT_STREAMS); + if (!Strings.isNullOrEmpty(originalInputs)) { + inputs.add(originalInputs); + } + mergedConfigs.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputs)); + return mergedConfigs; } private Map getReachableTables(Collection reachableOperators, JobNode jobNode) { diff --git a/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java b/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java index d238855557..f614da10b8 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java +++ b/samza-core/src/test/java/org/apache/samza/execution/ExecutionPlannerTestBase.java @@ -101,6 +101,10 @@ String getJobNameAndId() { } void configureJobNode(ApplicationDescriptorImpl mockStreamAppDesc) { + configureJobNode(mockStreamAppDesc, mockConfig); + } + + void configureJobNode(ApplicationDescriptorImpl mockStreamAppDesc, Config mockConfig) { JobGraph jobGraph = new ExecutionPlanner(mockConfig, mock(StreamManager.class)) .createJobGraph(mockStreamAppDesc); mockJobNode = spy(jobGraph.getJobNodes().get(0)); diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java index 2d80e79f3e..1d42fcfef2 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java @@ -21,6 +21,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import java.util.Arrays; +import joptsimple.internal.Strings; import org.apache.samza.SamzaException; import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl; import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl; @@ -121,6 +122,23 @@ public void testGenerateJobConfigWithTaskApplication() { validateStreamConfigures(jobConfig, deserializedSerdes); } + @Test + public void testGenerateJobConfigWithTaskApplicationWhenSpecifiedTaskInputsInConfig() { + Map configs = new HashMap<>(mockConfig); + // specify task.inputs in config explicitly + configs.put(TaskConfig.INPUT_STREAMS, "kafka.topic1,kafka.topic2"); + MapConfig originConfig = new MapConfig(configs); + TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getTaskApplication(), originConfig); + configureJobNode(taskAppDesc, originConfig); + // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + + // Verify the results + Config expectedJobConfig = getExpectedJobConfig(originConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedJobConfig, jobConfig); + } + @Test public void testGenerateJobConfigWithLegacyTaskApplication() { TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getLegacyTaskApplication(), mockConfig); @@ -274,6 +292,10 @@ private Config getExpectedJobConfig(Config originConfig, Map inputs.add(inputEdge.getName()); } } + String originalInputs = originConfig.get(TaskConfig.INPUT_STREAMS); + if (!Strings.isNullOrEmpty(originalInputs)) { + inputs.add(originalInputs); + } if (!inputs.isEmpty()) { configMap.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputs)); }