Skip to content

Commit

Permalink
FlinkProcessTestRunnerSpec: reusing test runner
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadius committed Jan 20, 2025
1 parent 68eb84c commit b14fbd2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package pl.touk.nussknacker.engine.management.testsmechanism
import io.circe.Json
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.api.test.ScenarioTestData
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.testmode.TestProcess.TestResults
import pl.touk.nussknacker.engine.util.ReflectiveMethodInvoker
import pl.touk.nussknacker.engine.util.{MetaDataExtractor, ReflectiveMethodInvoker}

import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -39,10 +40,28 @@ class FlinkProcessTestRunner(modelData: ModelData, parallelism: Int, streamExecu
miniCluster,
streamExecutionEnvironment,
modelData,
canonicalProcess,
rewriteParallelismIfHigherThanMaxParallelism(canonicalProcess),
scenarioTestData
)

private def rewriteParallelismIfHigherThanMaxParallelism(canonicalProcess: CanonicalProcess): CanonicalProcess = {
val scenarioParallelism = MetaDataExtractor
.extractTypeSpecificDataOrDefault[StreamMetaData](canonicalProcess.metaData, StreamMetaData())
.parallelism
.getOrElse(1)
if (scenarioParallelism > parallelism) {
canonicalProcess.copy(metaData =
canonicalProcess.metaData.copy(additionalFields =
canonicalProcess.metaData.additionalFields.copy(properties =
canonicalProcess.metaData.additionalFields.properties + (StreamMetaData.parallelismName -> parallelism.toString)
)
)
)
} else {
canonicalProcess
}
}

def close(): Unit = {
miniCluster.close()
streamExecutionEnvironment.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import java.util.{Date, UUID}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Using

class FlinkProcessTestRunnerSpec
extends AnyWordSpec
Expand Down Expand Up @@ -78,6 +79,7 @@ class FlinkProcessTestRunnerSpec
}

private def runTests(useIOMonadInInterpreter: Boolean): Unit = {
val testRunner = prepareTestRunner(useIOMonadInInterpreter)
"be able to return test results" in {
val process =
ScenarioBuilder
Expand All @@ -92,15 +94,14 @@ class FlinkProcessTestRunnerSpec
val input = SimpleRecord("0", 1, "2", new Date(3), Some(4), 5, "6")
val input2 = SimpleRecord("0", 11, "2", new Date(3), Some(4), 5, "6")

val results = runFlinkTest(
val results = testRunner.runTests(
process,
ScenarioTestData(
List(
ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")),
ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|11|2|3|4|5|6"))
)
),
useIOMonadInInterpreter
)
)

val nodeResults = results.nodeResults
Expand Down Expand Up @@ -150,10 +151,9 @@ class FlinkProcessTestRunnerSpec
.source(sourceNodeId, "input")
.split("splitId1", GraphBuilder.emptySink("out1", "monitor"), GraphBuilder.emptySink("out2", "monitor"))

val results = runFlinkTest(
val results = testRunner.runTests(
process,
ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))),
useIOMonadInInterpreter
)

results.nodeResults("splitId1") shouldBe List(
Expand Down Expand Up @@ -184,10 +184,9 @@ class FlinkProcessTestRunnerSpec
val aggregate = SimpleRecordWithPreviousValue(input, 0, "s")
val aggregate2 = SimpleRecordWithPreviousValue(input2, 1, "s")

val results = runFlinkTest(
val results = testRunner.runTests(
process,
ScenarioTestData(List(createTestRecord(), createTestRecord(value1 = 11))),
useIOMonadInInterpreter
)

val nodeResults = results.nodeResults
Expand Down Expand Up @@ -238,10 +237,9 @@ class FlinkProcessTestRunnerSpec
.emptySink("out", "monitor")

val results =
runFlinkTest(
testRunner.runTests(
process,
ScenarioTestData(createTestRecord() :: List.fill(4)(createTestRecord(value1 = 11))),
useIOMonadInInterpreter
)

val nodeResults = results.nodeResults
Expand All @@ -258,7 +256,7 @@ class FlinkProcessTestRunnerSpec
.filter("filter", "1 / #input.value1 >= 0".spel)
.emptySink("out", "monitor")

val results = runFlinkTest(
val results = testRunner.runTests(
process,
ScenarioTestData(
List(
Expand All @@ -268,7 +266,6 @@ class FlinkProcessTestRunnerSpec
createTestRecord(id = "3", value1 = 4)
)
),
useIOMonadInInterpreter
)

val nodeResults = results.nodeResults
Expand Down Expand Up @@ -315,7 +312,7 @@ class FlinkProcessTestRunnerSpec
.emptySink("out", "monitor")

val exceptionConsumerId = UUID.randomUUID().toString
val results = runFlinkTest(
val results = runTestsWithCustomModel(
process = process,
scenarioTestData = ScenarioTestData(
List(
Expand All @@ -326,7 +323,7 @@ class FlinkProcessTestRunnerSpec
)
),
useIOMonadInInterpreter,
enrichDefaultConfig = RecordingExceptionConsumerProvider.configWithProvider(_, exceptionConsumerId)
enrichDefaultConfig = RecordingExceptionConsumerProvider.configWithProvider(_, exceptionConsumerId),
)

val nodeResults = results.nodeResults
Expand All @@ -347,7 +344,7 @@ class FlinkProcessTestRunnerSpec
.emptySink("out", "monitor")

val run = Future {
runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter)
testRunner.runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))))
}

intercept[JobExecutionException](Await.result(run, 10 seconds))
Expand Down Expand Up @@ -376,7 +373,7 @@ class FlinkProcessTestRunnerSpec
)
)

val results = runFlinkTest(process, testData, useIOMonadInInterpreter)
val results = testRunner.runTests(process, testData)

results.nodeResults(sourceNodeId) should have size 3
results.externalInvocationResults("out") shouldBe
Expand Down Expand Up @@ -406,7 +403,7 @@ class FlinkProcessTestRunnerSpec
.emptySink("out", "valueMonitor", "Value" -> "#additionalOne + '|' + #additionalTwo".spel)
val testData = ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("abc"))))

val results = runFlinkTest(process, testData, useIOMonadInInterpreter)
val results = testRunner.runTests(process, testData)

results.nodeResults(sourceNodeId) should have size 1
results.externalInvocationResults("out") shouldBe
Expand All @@ -427,7 +424,7 @@ class FlinkProcessTestRunnerSpec
.emptySink("out", "sinkForInts", "Value" -> "15 / {0, 1}[0]".spel)

val results =
runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter)
testRunner.runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))))

results.exceptions should have length 1
results.exceptions.head.nodeId shouldBe Some("out")
Expand All @@ -447,7 +444,7 @@ class FlinkProcessTestRunnerSpec
def recordWithSeconds(duration: FiniteDuration) =
ScenarioTestJsonRecord(sourceNodeId, Json.fromString(s"0|0|0|${duration.toMillis}|0|0|0"))

val results = runFlinkTest(
val results = testRunner.runTests(
process,
ScenarioTestData(
List(
Expand All @@ -457,8 +454,7 @@ class FlinkProcessTestRunnerSpec
recordWithSeconds(9 second),
recordWithSeconds(20 second)
)
),
useIOMonadInInterpreter
)
)

val nodeResults = results.nodeResults
Expand All @@ -478,15 +474,14 @@ class FlinkProcessTestRunnerSpec
)
.emptySink("out", "valueMonitor", "Value" -> "#input.field1 + #input.field2".spel)

val results = runFlinkTest(
val results = testRunner.runTests(
process,
ScenarioTestData(
ScenarioTestJsonRecord(
sourceNodeId,
Json.obj("field1" -> Json.fromString("abc"), "field2" -> Json.fromString("def"))
) :: Nil
),
useIOMonadInInterpreter
)
)

results.invocationResults("out").map(_.value) shouldBe List(variable("abcdef"))
Expand All @@ -510,7 +505,7 @@ class FlinkProcessTestRunnerSpec
.emptySink("out", "valueMonitor", "Value" -> "#parsed.size + ' ' + #parsed[0].field2".spel)

val results =
runFlinkTest(process, ScenarioTestData(List(createTestRecord(value1 = valueToReturn))), useIOMonadInInterpreter)
testRunner.runTests(process, ScenarioTestData(List(createTestRecord(value1 = valueToReturn))))

results.invocationResults("out").map(_.value) shouldBe List(variable(s"$countToPass $valueToReturn"))
}
Expand All @@ -533,7 +528,7 @@ class FlinkProcessTestRunnerSpec
val recordTrue = createTestRecord(id = "ala")
val recordFalse = createTestRecord(id = "bela")

val results = runFlinkTest(process, ScenarioTestData(List(recordTrue, recordFalse)), useIOMonadInInterpreter)
val results = testRunner.runTests(process, ScenarioTestData(List(recordTrue, recordFalse)))

val invocationResults = results.invocationResults

Expand Down Expand Up @@ -571,7 +566,7 @@ class FlinkProcessTestRunnerSpec
val recB = createTestRecord(id = "b")
val recC = createTestRecord(id = "c")

val results = runFlinkTest(process, ScenarioTestData(List(recA, recB, recC)), useIOMonadInInterpreter)
val results = testRunner.runTests(process, ScenarioTestData(List(recA, recB, recC)))

results.invocationResults("proc2").map(_.contextId) should contain only (
s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-1-end1",
Expand All @@ -580,7 +575,9 @@ class FlinkProcessTestRunnerSpec
s"$scenarioName-$sourceNodeId-$firstSubtaskIndex-2-end2"
)

results.externalInvocationResults("proc2").map(_.value.asInstanceOf[Json]) should contain theSameElementsAs List(
results
.externalInvocationResults("proc2")
.map(_.value.asInstanceOf[Json]) should contain theSameElementsAs List(
"b",
"a",
"c",
Expand Down Expand Up @@ -626,7 +623,7 @@ class FlinkProcessTestRunnerSpec
val recordC = recordA.copy(id = "c")
val recordD = recordA.copy(id = "d")

val results = runFlinkTest(process, scenarioTestData, useIOMonadInInterpreter)
val results = testRunner.runTests(process, scenarioTestData)

val nodeResults = results.nodeResults
nodeResults("source1") shouldBe List(
Expand Down Expand Up @@ -673,7 +670,7 @@ class FlinkProcessTestRunnerSpec
.emptySink("out", "valueMonitor", "Value" -> "{#componentUseCaseService, #componentUseCaseCustomNode}".spel)

val results =
runFlinkTest(process, ScenarioTestData(List(createTestRecord(sourceId = "start"))), useIOMonadInInterpreter)
testRunner.runTests(process, ScenarioTestData(List(createTestRecord(sourceId = "start"))))

results.invocationResults("out").map(_.value) shouldBe List(
variable(List(ComponentUseCase.TestRuntime, ComponentUseCase.TestRuntime))
Expand All @@ -694,7 +691,7 @@ class FlinkProcessTestRunnerSpec
.emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel)

val run = Future {
runFlinkTest(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))), useIOMonadInInterpreter)
testRunner.runTests(process, ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))))
}
val dictEditorException = intercept[IllegalStateException](Await.result(run, 10 seconds))
dictEditorException.getMessage shouldBe "DictKeyWithLabel expression can only be used with DictParameterEditor, got Some(DualParameterEditor(StringParameterEditor,RAW))"
Expand All @@ -715,7 +712,7 @@ class FlinkProcessTestRunnerSpec
)
.emptySink("out", "valueMonitor", "Value" -> "#input.value1".spel)

val results = runFlinkTest(
val results = runTestsWithCustomModel(
process,
ScenarioTestData(List(createTestRecord(id = "2", value1 = 2))),
useIOMonadInInterpreter,
Expand Down Expand Up @@ -745,10 +742,9 @@ class FlinkProcessTestRunnerSpec

val resolved = FragmentResolver(List(processWithFragmentParameterValidation)).resolve(scenario)

val results = runFlinkTest(
val results = testRunner.runTests(
resolved.valueOr { _ => throw new IllegalArgumentException("Won't happen") },
ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")))),
useIOMonadInInterpreter
)
results.exceptions.length shouldBe 0
}
Expand All @@ -761,7 +757,7 @@ class FlinkProcessTestRunnerSpec
): ScenarioTestJsonRecord =
ScenarioTestJsonRecord(sourceId, Json.fromString(s"$id|$value1|2|3|4|5|6"))

private def runFlinkTest(
private def runTestsWithCustomModel(
process: CanonicalProcess,
scenarioTestData: ScenarioTestData,
useIOMonadInInterpreter: Boolean,
Expand All @@ -786,6 +782,21 @@ class FlinkProcessTestRunnerSpec
.runTests(process, scenarioTestData)
}

private def prepareTestRunner(useIOMonadInInterpreter: Boolean) = {
val config = ConfigFactory
.load("application.conf")
.withValue("globalParameters.useIOMonadInInterpreter", ConfigValueFactory.fromAnyRef(useIOMonadInInterpreter))

// We need to set context loader to avoid forking in sbt
val modelData = ModelData.duringExecution(
ModelConfigs(config, AdditionalModelConfigs(Map.empty)),
ModelClassLoader(getClass.getClassLoader, FlinkTestConfiguration.classpathWorkaround),
resolveConfigs = false
)

new FlinkProcessTestRunner(modelData, parallelism = 1, FlinkTestConfiguration.setupMemory(new Configuration))
}

private def nodeResult(count: Int, vars: (String, Any)*): ResultContext[_] =
nodeResult(count, sourceNodeId, vars: _*)

Expand Down

0 comments on commit b14fbd2

Please sign in to comment.