Hello,
I'm using the Valve component with akka-stream-kafka to pause and resume our Kafka consumers.
I discovered an issue in the current implementation when trying to flip a valve on a finished stream (either completed or failed). In both cases, the Future returned by flip never completes, because the callback in the ValveSwitch is never invoked.
Here are two tests that show the issue:
```scala
"a valve on an error'd graph" should {
  "return false when trying to flip it" in {
    val ((sourceProbe, switch), sinkProbe) = TestSource.probe[Int]
      .viaMat(Valve(Open))(Keep.both)
      .toMat(TestSink.probe[Int])(Keep.both)
      .run()

    // Fail the stream before touching the switch.
    val error = new RuntimeException("Boom !")
    sourceProbe.sendError(error)
    sinkProbe.request(1)
      .expectError(error)

    // Currently this Future never completes, so whenReady times out.
    whenReady(switch.flip(Close)) {
      _ shouldBe false
    }
  }
}

"a valve on a completed graph" should {
  "return false when trying to flip it" in {
    val ((sourceProbe, switch), sinkProbe) = TestSource.probe[Int]
      .viaMat(Valve(Close))(Keep.both)
      .toMat(TestSink.probe[Int])(Keep.both)
      .run()

    // Complete the stream before touching the switch.
    sourceProbe.sendComplete()
    sinkProbe.expectSubscription()
    sinkProbe.expectComplete()

    // Currently this Future never completes, so whenReady times out.
    whenReady(switch.flip(Open)) {
      _ shouldBe false
    }
  }
}
```
The Valve is also affected by akka/akka#20503.
Here is a gist that fixes the issue for the completed case, but not for the error case: https://gist.github.com/fchaillou/fa34d1bd46d9c0f4db7c54fc8f535fe3
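To make the failure mode easier to discuss outside Akka, here is a minimal plain-Scala sketch of what I believe is happening. ToyStage, NaiveSwitch and GuardedSwitch are hypothetical names I made up for illustration. They are not the contrib classes and not the code from the gist. The sketch assumes that a callback sent to a stopped stage is silently dropped, and that a possible fix is to resolve the Future with false when that happens.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Future, Promise}

// Hypothetical stand-in for a stream stage: it only runs callbacks while alive.
final class ToyStage {
  private val terminated = new AtomicBoolean(false)

  def stop(): Unit = terminated.set(true)

  // Runs the callback and returns true while the stage is alive.
  // Once the stage has stopped, the callback is dropped and false is returned.
  def invoke(callback: () => Unit): Boolean =
    if (terminated.get()) false
    else { callback(); true }
}

// Naive switch: the promise is completed only from inside the callback,
// so a dropped callback leaves the Future pending forever.
final class NaiveSwitch(stage: ToyStage) {
  def flip(): Future[Boolean] = {
    val promise = Promise[Boolean]()
    stage.invoke(() => promise.success(true))
    promise.future
  }
}

// Guarded switch: if the callback was dropped, resolve the Future with false.
final class GuardedSwitch(stage: ToyStage) {
  def flip(): Future[Boolean] = {
    val promise = Promise[Boolean]()
    val delivered = stage.invoke(() => promise.success(true))
    if (!delivered) promise.trySuccess(false)
    promise.future
  }
}

object Demo extends App {
  val stage = new ToyStage
  stage.stop() // simulate a completed or failed stream

  println(new NaiveSwitch(stage).flip().isCompleted) // false: still pending
  println(new GuardedSwitch(stage).flip().value)     // Some(Success(false))
}
```

In the real stage the callback is delivered asynchronously, so the switch cannot simply look at a return value as this toy does. The point is only that flip needs some path that completes the Future once the stage has terminated, whether by completion or by failure.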