Serializing Option fields in a state results in unexpected behaviour during tests #209

Open
lukas-mi opened this issue Feb 1, 2025 · 7 comments


@lukas-mi

lukas-mi commented Feb 1, 2025

I've encountered odd behavior when testing my Flink application that does stateful processing: the state produced by a stateful function contains Option fields which, when they are None, cannot be asserted correctly in the Scala test suite.

Setup:

  • KeyedProcessFunction with a state of a case class type:
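import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers.*

case class MyState(optField: Option[String])

object MyState {
  given typeInfo: TypeInformation[MyState] = deriveTypeInformation
}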
  • the state is created/updated by applying CDC events to it; the events are also Scala case classes
  • the state is registered with the value descriptor new ValueStateDescriptor[MyState]("myState", MyState.typeInfo)
  • a test harness is used in the tests:
val fun = new MyStatefulFun()
val testHarness = new KeyedOneInputStreamOperatorTestHarness[String, CDCEvent, MyState](
  new KeyedProcessOperator(fun),
  _.eventId,
  Types.STRING
)
  • the emitted states are read from the harness output and asserted:
import scala.jdk.CollectionConverters.*

val states = testHarness.extractOutputValues().asScala
states.head.optField shouldBe None
  • the assertion above fails with:
None was not equal to None
ScalaTestFailureLocation: my.package.functions.MyStatefulFun at (MyStatefulFun.scala:10)
Expected :None
Actual   :None
  • it works fine when the field is set to Some("test-value")

Using:

  • Scala @ 3.5.2
  • Java @ 11.0.26 (Temurin)
  • Flink @ 1.19.1
  • Flink Scala API @ 1.19.1_1.2.1

What could be the issue?

lukas-mi changed the title from "Serializing Option fields in a state results in unexpected behaviour" to "Serializing Option fields in a state results in unexpected behaviour during tests" on Feb 1, 2025
@novakov-alexey
Collaborator

I think this could be a familiar issue. CC @arnaud-daroussin

Could you try disabling the Kryo fallback in your test like this?

env.getConfig.disableGenericTypes()

@novakov-alexey
Collaborator

@lukas-mi
Author

lukas-mi commented Feb 3, 2025

env.getConfig.disableGenericTypes() didn't help; however, passing the serializer to the test harness setup did the job 👍🏼
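A minimal sketch of what "passing the serializer to the test harness setup" could look like, assuming Flink 1.19's KeyedOneInputStreamOperatorTestHarness and flink-scala-api's deriveTypeInformation. The real CDCEvent and MyStatefulFun are not shown in the issue, so the versions below are hypothetical stand-ins, and firstOutputState is a hypothetical helper. The presumed reason this helps (not confirmed in this thread) is that, without an explicit serializer, the harness copies output records with a serializer it infers at runtime, which for a Scala case class may be a Kryo-based generic one.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector
import org.apache.flinkx.api.serializers.*
import scala.compiletime.uninitialized
import scala.jdk.CollectionConverters.*

// Hypothetical event type: the real CDCEvent is not shown in the issue.
case class CDCEvent(eventId: String, optField: Option[String])

case class MyState(optField: Option[String])

object MyState {
  given typeInfo: TypeInformation[MyState] = deriveTypeInformation
}

// Hypothetical stand-in for MyStatefulFun: stores the event's field in keyed state and emits the new state.
class MyStatefulFun extends KeyedProcessFunction[String, CDCEvent, MyState] {
  @transient private var state: ValueState[MyState] = uninitialized

  override def open(parameters: Configuration): Unit =
    state = getRuntimeContext.getState(new ValueStateDescriptor[MyState]("myState", MyState.typeInfo))

  override def processElement(
      event: CDCEvent,
      ctx: KeyedProcessFunction[String, CDCEvent, MyState]#Context,
      out: Collector[MyState]
  ): Unit = {
    val updated = MyState(event.optField)
    state.update(updated)
    out.collect(updated)
  }
}

// Hypothetical helper: runs one event through the operator and returns the first emitted state.
def firstOutputState(event: CDCEvent): MyState = {
  val keySelector: KeySelector[CDCEvent, String] = (e: CDCEvent) => e.eventId
  val testHarness = new KeyedOneInputStreamOperatorTestHarness[String, CDCEvent, MyState](
    new KeyedProcessOperator(new MyStatefulFun()),
    keySelector,
    Types.STRING
  )
  // Build the output serializer from the derived case class TypeInformation
  // instead of letting the harness choose one on its own.
  val serializer = MyState.typeInfo.createSerializer(testHarness.getEnvironment.getExecutionConfig)
  testHarness.setup(serializer)
  testHarness.open()
  try {
    testHarness.processElement(event, 0L)
    testHarness.extractOutputValues().asScala.head
  } finally testHarness.close()
}
```

In a ScalaTest suite, firstOutputState(CDCEvent("event-1", None)).optField shouldBe None is the assertion from the issue expressed against this sketch.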

@novakov-alexey
Collaborator

@lukas-mi OK, great. Does Flink fail with a Kryo error when disableGenericTypes() is on?

@lukas-mi
Author

lukas-mi commented Feb 3, 2025

It did not make a difference: the assertion still failed with None was not equal to None, and there was no Kryo error.

@novakov-alexey
Collaborator

I see. Interesting, thanks for checking.

@arnaud-daroussin, do you think that using disableGenericTypes() in this case should fail with an error saying that the Kryo fallback is disabled? As we discussed earlier, None was not equal to None may happen when the old Flink Scala API is on the CLASSPATH and Flink can silently fall back to Kryo. That does not seem to be the case here. What do you think?
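Why None fails while Some("test-value") passes can be illustrated without Flink. The sketch below assumes (this thread does not confirm it) that a Kryo fallback creates a fresh instance of Scala's None$ class instead of reusing the None singleton. allocateLikeKryo is a hypothetical helper that imitates this with sun.misc.Unsafe; it does not call Kryo itself.

```scala
import sun.misc.Unsafe

// Hypothetical helper: allocates an instance of cls without running any constructor,
// roughly what an Objenesis-style instantiator does for classes Kryo has no special serializer for.
def allocateLikeKryo(cls: Class[?]): AnyRef = {
  val field = classOf[Unsafe].getDeclaredField("theUnsafe")
  field.setAccessible(true)
  val unsafe = field.get(null).asInstanceOf[Unsafe]
  unsafe.allocateInstance(cls)
}

@main def noneIdentityDemo(): Unit = {
  val copiedNone = allocateLikeKryo(None.getClass)
  // The case object's synthesized toString still prints "None"...
  println(copiedNone) // None
  // ...but None has no structural equals, so a second instance is not equal to the singleton.
  println(copiedNone == None) // false
  // Some is a case class with structural equality, so a fresh copy still compares equal.
  println(Some("test-value") == Some(new String("test-value"))) // true
}
```

This matches the symptom in the issue: both sides print as None, yet the equality check fails.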

@arnaud-daroussin
Contributor

Hi,
Sorry for the late reply, I was on leave last week.

I think disableGenericTypes() should make the test fail fast with an explicit UnsupportedOperationException: Generic types have been disabled...
When I disable generic types in the CustomTriggerTests you pointed out, the test fails as expected. For example, if I replace line 73 with:

testHarness.getEnvironment.getExecutionConfig.disableGenericTypes()
testHarness.setup()

And when I test:

testHarness.getEnvironment.getExecutionConfig.disableGenericTypes()
testHarness.setup(serializer)

The test passes, meaning no Kryo fallback occurred.

@lukas-mi, can you share your full test with us? Just to be sure we are not missing something.

Thanks
