Skip to content

Commit

Permalink
Merge pull request #261 from arkivanov/optimize-Observable-flatMap
Browse files Browse the repository at this point in the history
Optimized Observable flatMap
  • Loading branch information
Arkadii Ivanov authored Oct 23, 2019
2 parents 6776d2b + 3b0ed22 commit 5a1e3bb
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.badoo.reaktive.base

import com.badoo.reaktive.disposable.CompositeDisposable
import com.badoo.reaktive.disposable.Disposable

internal open class CompositeDisposableObserver : CompositeDisposable(), Observer {

override fun onSubscribe(disposable: Disposable) {
add(disposable)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package com.badoo.reaktive.disposable
* Thread-safe collection of [Disposable]
*/
@Suppress("EmptyDefaultConstructor")
expect class CompositeDisposable() : Disposable {
expect open class CompositeDisposable() : Disposable {

/**
* Disposes the [CompositeDisposable] and all its [Disposable]s.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,27 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.CompositeDisposableObserver
import com.badoo.reaktive.base.ErrorCallback
import com.badoo.reaktive.base.Observer
import com.badoo.reaktive.base.ValueCallback
import com.badoo.reaktive.base.subscribeSafe
import com.badoo.reaktive.base.tryCatch
import com.badoo.reaktive.completable.CompletableCallbacks
import com.badoo.reaktive.disposable.CompositeDisposable
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.utils.atomic.AtomicInt

fun <T, R> Observable<T>.flatMap(mapper: (T) -> Observable<R>): Observable<R> =
observable { emitter ->
val disposables = CompositeDisposable()
emitter.setDisposable(disposables)
val serializedEmitter = emitter.serialize()

subscribeSafe(
object : ObservableObserver<T>, ErrorCallback by serializedEmitter {
val upstreamObserver =
object : CompositeDisposableObserver(), ObservableObserver<T>, ErrorCallback by serializedEmitter {
private val activeSourceCount = AtomicInt(1)

private val mappedObserver =
object : ObservableObserver<R>, Observer by this, CompletableCallbacks by this,
ValueCallback<R> by serializedEmitter {
}

override fun onSubscribe(disposable: Disposable) {
disposables += disposable
}

override fun onNext(value: T) {
activeSourceCount.addAndGet(1)
serializedEmitter.tryCatch { mapper(value).subscribe(mappedObserver) }
Expand All @@ -40,7 +33,10 @@ fun <T, R> Observable<T>.flatMap(mapper: (T) -> Observable<R>): Observable<R> =
}
}
}
)

emitter.setDisposable(upstreamObserver)

subscribeSafe(upstreamObserver)
}

fun <T, U, R> Observable<T>.flatMap(mapper: (T) -> Observable<U>, resultSelector: (T, U) -> R): Observable<R> =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.CompositeDisposableObserver
import com.badoo.reaktive.base.ErrorCallback
import com.badoo.reaktive.base.Observer
import com.badoo.reaktive.base.subscribeSafe
import com.badoo.reaktive.base.tryCatch
import com.badoo.reaktive.completable.CompletableCallbacks
import com.badoo.reaktive.disposable.CompositeDisposable
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.maybe.Maybe
import com.badoo.reaktive.maybe.MaybeObserver
import com.badoo.reaktive.maybe.map
import com.badoo.reaktive.utils.atomic.AtomicInt

fun <T, R> Observable<T>.flatMapMaybe(mapper: (T) -> Maybe<R>): Observable<R> =
observable { emitter ->
val disposables = CompositeDisposable()
emitter.setDisposable(disposables)
val serializedEmitter = emitter.serialize()

subscribeSafe(
object : ObservableObserver<T>, ErrorCallback by serializedEmitter {
val upstreamObserver =
object : CompositeDisposableObserver(), ObservableObserver<T>, ErrorCallback by serializedEmitter {
private val activeSourceCount = AtomicInt(1)

private val mappedObserver: MaybeObserver<R> =
Expand All @@ -30,10 +27,6 @@ fun <T, R> Observable<T>.flatMapMaybe(mapper: (T) -> Maybe<R>): Observable<R> =
}
}

override fun onSubscribe(disposable: Disposable) {
disposables += disposable
}

override fun onNext(value: T) {
activeSourceCount.addAndGet(1)
serializedEmitter.tryCatch(block = { mapper(value).subscribe(mappedObserver) })
Expand All @@ -45,7 +38,10 @@ fun <T, R> Observable<T>.flatMapMaybe(mapper: (T) -> Maybe<R>): Observable<R> =
}
}
}
)

emitter.setDisposable(upstreamObserver)

subscribeSafe(upstreamObserver)
}

fun <T, U, R> Observable<T>.flatMapMaybe(mapper: (T) -> Maybe<U>, resultSelector: (T, U) -> R): Observable<R> =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.CompositeDisposableObserver
import com.badoo.reaktive.base.ErrorCallback
import com.badoo.reaktive.base.Observer
import com.badoo.reaktive.base.subscribeSafe
import com.badoo.reaktive.base.tryCatch
import com.badoo.reaktive.disposable.CompositeDisposable
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.single.Single
import com.badoo.reaktive.single.SingleObserver
import com.badoo.reaktive.single.map
import com.badoo.reaktive.utils.atomic.AtomicInt

fun <T, R> Observable<T>.flatMapSingle(mapper: (T) -> Single<R>): Observable<R> =
observable { emitter ->
val disposables = CompositeDisposable()
emitter.setDisposable(disposables)
val serializedEmitter = emitter.serialize()

subscribeSafe(
object : ObservableObserver<T>, ErrorCallback by serializedEmitter {
val upstreamObserver =
object : CompositeDisposableObserver(), ObservableObserver<T>, ErrorCallback by serializedEmitter {
private val activeSourceCount = AtomicInt(1)

private val mappedObserver: SingleObserver<R> =
Expand All @@ -29,10 +26,6 @@ fun <T, R> Observable<T>.flatMapSingle(mapper: (T) -> Single<R>): Observable<R>
}
}

override fun onSubscribe(disposable: Disposable) {
disposables += disposable
}

override fun onNext(value: T) {
activeSourceCount.addAndGet(1)
serializedEmitter.tryCatch(block = { mapper(value).subscribe(mappedObserver) })
Expand All @@ -44,7 +37,10 @@ fun <T, R> Observable<T>.flatMapSingle(mapper: (T) -> Single<R>): Observable<R>
}
}
}
)

emitter.setDisposable(upstreamObserver)

subscribeSafe(upstreamObserver)
}

fun <T, U, R> Observable<T>.flatMapSingle(mapper: (T) -> Single<U>, resultSelector: (T, U) -> R): Observable<R> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import kotlin.jvm.Volatile
* Thread-safe collection of [Disposable]
*/
@Suppress("EmptyDefaultConstructor")
actual class CompositeDisposable actual constructor() : Disposable {
actual open class CompositeDisposable actual constructor() : Disposable {

private var list: MutableList<Disposable>? = null
@Volatile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.badoo.reaktive.utils.atomic.getAndUpdate
* Thread-safe collection of [Disposable]
*/
@Suppress("EmptyDefaultConstructor")
actual class CompositeDisposable actual constructor() : Disposable {
actual open class CompositeDisposable actual constructor() : Disposable {

private val list = AtomicReference<List<Disposable>?>(emptyList())
override val isDisposed: Boolean get() = list.value == null
Expand Down

0 comments on commit 5a1e3bb

Please sign in to comment.