Skip to content

Commit

Permalink
Use Channel for stubbed Effect Flow. Also made all Flows cancellable
Browse files Browse the repository at this point in the history
  • Loading branch information
floschu committed Aug 31, 2020
1 parent fb7c2d8 commit a4d5694
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
Expand Down Expand Up @@ -90,10 +89,3 @@ fun <T, U> Flow<T>.takeUntil(other: Flow<U>): Flow<T> = flow {
}

private class TakeUntilException : CancellationException()

/**
* Regular filterNotNull requires T : Any?
*/
internal fun <T> Flow<T?>.filterNotNullCast(): Flow<T> {
return filter { it != null }.map { checkNotNull(it) { "oh shi-" } }
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,31 +100,12 @@ internal class ControllerImplementation<Action, Mutation, State, Effect>(

// endregion

// region effects

private val effectEmitter: (Effect) -> Unit = { effect ->
val canBeOffered = effectsChannel.offer(effect)
if (canBeOffered) {
controllerLog.log { ControllerEvent.Effect(tag, effect.toString()) }
} else {
throw ControllerError.Effect(tag, effect.toString())
}
}

private val effectsChannel = Channel<Effect>(EFFECTS_CAPACITY)
override val effects: Flow<Effect>
get() = if (stubEnabled) stubbedEffectFlow else {
effectsChannel.receiveAsFlow().cancellable()
}

// endregion

// region controller

override val state: Flow<State>
get() = if (stubEnabled) stubbedStateFlow else {
get() = if (stubEnabled) stubbedStateFlow.cancellable() else {
if (controllerStart is ControllerStart.Lazy) start()
mutableStateFlow
mutableStateFlow.cancellable()
}

override val currentState: State
Expand All @@ -144,6 +125,27 @@ internal class ControllerImplementation<Action, Mutation, State, Effect>(

// endregion

// region effects

private val effectEmitter: (Effect) -> Unit = { effect ->
val canBeOffered = effectsChannel.offer(effect)
if (canBeOffered) {
controllerLog.log { ControllerEvent.Effect(tag, effect.toString()) }
} else {
throw ControllerError.Effect(tag, effect.toString())
}
}

private val effectsChannel = Channel<Effect>(EFFECTS_CAPACITY)
override val effects: Flow<Effect>
get() = if (stubEnabled) {
stubbedEffectFlow.receiveAsFlow().cancellable()
} else {
effectsChannel.receiveAsFlow().cancellable()
}

// endregion

// region manual start + stop

internal fun start(): Boolean {
Expand All @@ -162,8 +164,7 @@ internal class ControllerImplementation<Action, Mutation, State, Effect>(

private val stubbedActions = mutableListOf<Action>()
private val stubbedStateFlow = MutableStateFlow(initialState)
private val _stubbedEffectFlow = MutableStateFlow<Effect?>(null)
private val stubbedEffectFlow = _stubbedEffectFlow.filterNotNullCast()
private val stubbedEffectFlow = Channel<Effect>(EFFECTS_CAPACITY)

override val dispatchedActions: List<Action>
get() = stubbedActions
Expand All @@ -173,7 +174,7 @@ internal class ControllerImplementation<Action, Mutation, State, Effect>(
}

override fun emitEffect(effect: Effect) {
_stubbedEffectFlow.value = effect
stubbedEffectFlow.offer(effect)
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
Expand Down Expand Up @@ -61,16 +60,4 @@ internal class ExtensionsTest {
val emptyResult = emptyFlow<Int>().takeUntil(flow { delay(1101); emit(Unit) }).toList()
assertEquals(emptyList(), emptyResult)
}

@Test
fun `filterNotNullCast with empty flow`() = runBlockingTest {
val result = emptyFlow<Int>().filterNotNullCast().toList()
assertEquals(emptyList(), result)
}

@Test
fun `filterNotNullCast with non-empty flow`() = runBlockingTest {
val result = flowOf(null, 1, 2, null).filterNotNullCast().toList()
assertEquals(listOf(1, 2), result)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@ import at.florianschuster.test.flow.expect
import at.florianschuster.test.flow.lastEmission
import at.florianschuster.test.flow.testIn
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Rule
import org.junit.Test
import kotlin.test.assertEquals
Expand Down Expand Up @@ -234,7 +239,7 @@ internal class ImplementationTest {

@Test
fun `effects are received from mutator, reducer and transformer`() {
val sut = testCoroutineScope.createEffectController()
val sut = testCoroutineScope.createEffectTestController()
val states = sut.state.testIn(testCoroutineScope)
val effects = sut.effects.testIn(testCoroutineScope)

Expand All @@ -254,7 +259,7 @@ internal class ImplementationTest {

@Test
fun `effects are only received once collector`() {
val sut = testCoroutineScope.createEffectController()
val sut = testCoroutineScope.createEffectTestController()
val effects = mutableListOf<TestEffect>()
sut.effects.onEach { effects.add(it) }.launchIn(testCoroutineScope)
sut.effects.onEach { effects.add(it) }.launchIn(testCoroutineScope)
Expand All @@ -277,7 +282,7 @@ internal class ImplementationTest {
@Test
fun `effects overflow throws error`() {
val scope = TestCoroutineScope()
val sut = scope.createEffectController()
val sut = scope.createEffectTestController()

repeat(ControllerImplementation.EFFECTS_CAPACITY) { sut.dispatch(1) }
assertTrue(scope.uncaughtExceptions.isEmpty())
Expand All @@ -289,6 +294,40 @@ internal class ImplementationTest {
assertEquals(ControllerError.Effect::class, assertNotNull(error.cause)::class)
}

@Test
fun `state is cancellable`() = runBlockingTest {
val sut = createCounterController()

sut.dispatch(Unit)

var state: Int? = null
launch {
cancel()
state = -1
state = sut.state.first() // this should be cancelled and thus not return a value
}

assertEquals(-1, state)
sut.cancel()
}

@Test
fun `effects are cancellable`() = runBlockingTest {
val sut = createEffectTestController()

sut.dispatch(TestEffect.Mutator.ordinal)

var effect: TestEffect? = null
launch {
cancel()
effect = TestEffect.Reducer
effect = sut.effects.first() // this should be cancelled and thus not return a value
}

assertEquals(TestEffect.Reducer, effect)
sut.cancel()
}

private fun CoroutineScope.createAlwaysSameStateController() =
ControllerImplementation<Unit, Unit, Int, Nothing>(
scope = this,
Expand Down Expand Up @@ -415,7 +454,7 @@ internal class ImplementationTest {
Mutator, Reducer, ActionTransformer, MutationTransformer, StateTransformer
}

private fun CoroutineScope.createEffectController() =
private fun CoroutineScope.createEffectTestController() =
ControllerImplementation<Int, Int, Int, TestEffect>(
scope = this,
dispatcher = defaultScopeDispatcher(),
Expand Down

0 comments on commit a4d5694

Please sign in to comment.