Skip to content

Commit

Permalink
Merge pull request #593 from arkivanov/docs-single-2
Browse files Browse the repository at this point in the history
KDocs for Single (part 2)
  • Loading branch information
Arkadii Ivanov authored Mar 1, 2021
2 parents 4408715 + 2c567da commit cba3858
Show file tree
Hide file tree
Showing 20 changed files with 190 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import com.badoo.reaktive.disposable.plusAssign
import com.badoo.reaktive.scheduler.Scheduler
import com.badoo.reaktive.utils.freeze

/**
* Signals `onSuccess` and `onError` events of the [Single] on the specified [Scheduler].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#observeOn-io.reactivex.Scheduler-).
*/
fun <T> Single<T>.observeOn(scheduler: Scheduler): Single<T> =
single { emitter ->
val disposables = CompositeDisposable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package com.badoo.reaktive.single
import com.badoo.reaktive.maybe.Maybe
import com.badoo.reaktive.maybe.map

/**
* Returns [Maybe] that emits the success value of the [Single] if it is an instance of `T`,
* otherwise completes.
*/
inline fun <reified T> Single<*>.ofType(): Maybe<T> =
filter { it is T }
.map { it as T }
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import com.badoo.reaktive.base.subscribeSafe
import com.badoo.reaktive.base.tryCatch
import com.badoo.reaktive.disposable.Disposable

/**
* When the [Single] signals `onError`, resumes the flow with a new [Single] returned by `nextSupplier.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#onErrorResumeNext-io.reactivex.functions.Function-).
*/
fun <T> Single<T>.onErrorResumeNext(nextSupplier: (Throwable) -> Single<T>): Single<T> =
single { emitter ->
subscribe(
Expand All @@ -27,5 +32,10 @@ fun <T> Single<T>.onErrorResumeNext(nextSupplier: (Throwable) -> Single<T>): Sin
)
}

/**
* When the [Single] signals `onError`, resumes the flow with `next` [Single].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#onErrorResumeNext-io.reactivex.Single-).
*/
fun <T> Single<T>.onErrorResumeNext(next: Single<T>): Single<T> =
onErrorResumeNext { next }
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
package com.badoo.reaktive.single

/**
* When the [Single] signals `onError`, emits a value returned `valueSupplier`.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#onErrorReturn-io.reactivex.functions.Function-).
*/
fun <T> Single<T>.onErrorReturn(valueSupplier: (Throwable) -> T): Single<T> =
onErrorResumeNext { throwable -> valueSupplier(throwable).toSingle() }

/**
* When the [Single] signals `onError`, emits the `value`.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#onErrorReturnItem-T-).
*/
fun <T> Single<T>.onErrorReturnValue(value: T): Single<T> =
onErrorResumeNext { value.toSingle() }
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@ package com.badoo.reaktive.single
import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.repeat

/**
* When the [Single] signals `onSuccess`, re-subscribes to the [Single], `count` times.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#repeat-long-).
*/
fun <T> Single<T>.repeat(count: Int = -1): Observable<T> = asObservable().repeat(count = count)
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.observable
import com.badoo.reaktive.utils.atomic.AtomicInt

/**
* When the [Single] signals `onSuccess`, re-subscribes to the [Single] if the `predicate` function returns `false`.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#repeatUntil-io.reactivex.functions.BooleanSupplier-).
*/
fun <T> Single<T>.repeatUntil(predicate: (T) -> Boolean): Observable<T> =
observable { emitter ->
val observer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.observable
import com.badoo.reaktive.utils.atomic.AtomicInt

/**
* When the [Single] signals `onSuccess`, re-subscribes to the [Single] when the [Maybe] returned by the `handler` function emits a value.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#repeatWhen-io.reactivex.functions.Function-).
*/
fun <T> Single<T>.repeatWhen(handler: (repeatNumber: Int, value: T) -> Maybe<*>): Observable<T> =
observable { emitter ->
val observer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import com.badoo.reaktive.base.operator.Retry
import com.badoo.reaktive.base.subscribeSafe
import com.badoo.reaktive.disposable.Disposable

/**
* When the [Single] signals `onError`, re-subscribes to the [Single] if the `predicate` returns `true`.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#retry-io.reactivex.functions.BiPredicate-).
*/
fun <T> Single<T>.retry(predicate: (attempt: Int, Throwable) -> Boolean = { _, _ -> true }): Single<T> =
single { emitter ->
subscribe(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
package com.badoo.reaktive.single

/**
* Creates a [Single] with manual signalling via [SingleEmitter].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#create-io.reactivex.SingleOnSubscribe-).
*/
expect fun <T> single(onSubscribe: (emitter: SingleEmitter<T>) -> Unit): Single<T>
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import com.badoo.reaktive.disposable.DisposableWrapper
import com.badoo.reaktive.disposable.doIfNotDisposed
import com.badoo.reaktive.utils.handleReaktiveError

/**
* Subscribes to the [Single] and provides event callbacks.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#subscribe-io.reactivex.functions.Consumer-io.reactivex.functions.Consumer-).
*
* @param isThreadLocal see [Single.threadLocal]
*/
@UseReturnValue
fun <T> Single<T>.subscribe(
isThreadLocal: Boolean = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.disposable.plusAssign
import com.badoo.reaktive.scheduler.Scheduler

/**
* Returns a [Single] that subscribes to the current [Single] on the specified [Scheduler].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#subscribeOn-io.reactivex.Scheduler-).
*/
fun <T> Single<T>.subscribeOn(scheduler: Scheduler): Single<T> =
single { emitter ->
val disposables = CompositeDisposable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import com.badoo.reaktive.disposable.plusAssign
import com.badoo.reaktive.utils.ThreadLocalDisposableHolder
import com.badoo.reaktive.utils.handleReaktiveError

/**
* Prevents the downstream from freezing by saving the [SingleObserver] in a thread local storage.
*
* Please refer to the corresponding Readme [section](https://github.com/badoo/Reaktive#thread-local-tricks-to-avoid-freezing).
*/
fun <T> Single<T>.threadLocal(): Single<T> =
single {
val disposables = CompositeDisposable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.disposable.addTo
import com.badoo.reaktive.scheduler.Scheduler

/**
* Disposes the current [Single] if it does not signal within the `timeoutMillis` timeout, and subscribes to `other` [Single] if provided.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#timeout-long-java.util.concurrent.TimeUnit-io.reactivex.Scheduler-io.reactivex.SingleSource-).
*/
fun <T> Single<T>.timeout(timeoutMillis: Long, scheduler: Scheduler, other: Single<T>? = null): Single<T> =
single { emitter ->
val onTimeout: () -> Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package com.badoo.reaktive.single

import com.badoo.reaktive.scheduler.Scheduler

/**
* Signals `onSuccess` with `delayMillis` value after the given `delayMillis` delay.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#timer-long-java.util.concurrent.TimeUnit-io.reactivex.Scheduler-).
*/
fun singleTimer(delayMillis: Long, scheduler: Scheduler): Single<Long> =
single { emitter ->
val executor = scheduler.newExecutor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.plugin.onAssembleSingle
import kotlin.native.concurrent.SharedImmutable

/**
* ⚠️ Advanced use only: creates an instance of [Single] without any safeguards by calling `onSubscribe` with a [SingleObserver].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#unsafeCreate-io.reactivex.SingleSource-).
*/
@OptIn(ExperimentalReaktiveApi::class)
inline fun <T> singleUnsafe(crossinline onSubscribe: (observer: SingleObserver<T>) -> Unit): Single<T> =
onAssembleSingle(
Expand All @@ -15,6 +20,11 @@ inline fun <T> singleUnsafe(crossinline onSubscribe: (observer: SingleObserver<T
}
)

/**
* Returns a [Single] that emits the specified `value`.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#just-T-).
*/
fun <T> singleOf(value: T): Single<T> =
singleUnsafe { observer ->
val disposable = Disposable()
Expand All @@ -25,6 +35,9 @@ fun <T> singleOf(value: T): Single<T> =
}
}

/**
* A convenience extensions function for [singleOf].
*/
fun <T> T.toSingle(): Single<T> = singleOf(this)

@SharedImmutable
Expand All @@ -34,8 +47,18 @@ private val singleOfNever by lazy {
}
}

/**
* Returns a [Single] that never signals.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#never--).
*/
fun <T> singleOfNever(): Single<T> = singleOfNever

/**
* Returns a [Single] that signals the specified `error` via `onError`.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#error-java.lang.Throwable-).
*/
fun <T> singleOfError(error: Throwable): Single<T> =
singleUnsafe { observer ->
val disposable = Disposable()
Expand All @@ -46,11 +69,22 @@ fun <T> singleOfError(error: Throwable): Single<T> =
}
}

/**
* A convenience extensions function for [singleOfError].
*/
fun <T> Throwable.toSingleOfError(): Single<T> = singleOfError(this)

/**
* Returns a [Single] that emits the value returned by the `func` shared function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#fromCallable-java.util.concurrent.Callable-).
*/
fun <T> singleFromFunction(func: () -> T): Single<T> =
single { emitter ->
emitter.onSuccess(func())
}

/**
* A convenience extensions function for [singleFromFunction].
*/
fun <T> (() -> T).asSingle(): Single<T> = singleFromFunction(this)
55 changes: 55 additions & 0 deletions reaktive/src/commonMain/kotlin/com/badoo/reaktive/single/Zip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,29 @@ package com.badoo.reaktive.single
import com.badoo.reaktive.observable.firstOrError
import com.badoo.reaktive.observable.zip

/**
* Subscribes to all provided [Single]s, accumulates all their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zip-java.lang.Iterable-io.reactivex.functions.Function-).
*/
fun <T, R> Iterable<Single<T>>.zip(mapper: (List<T>) -> R): Single<R> =
map(Single<T>::asObservable)
.zip(mapper)
.firstOrError()

/**
* Subscribes to all `sources` [Single]s, accumulates all their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zipArray-io.reactivex.functions.Function-io.reactivex.SingleSource...-).
*/
fun <T, R> zip(vararg sources: Single<T>, mapper: (List<T>) -> R): Single<R> =
sources.toList().zip(mapper)

/**
* Subscribes to all `source` [Single]s, accumulates their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zip-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.functions.BiFunction-).
*/
fun <T1, T2, R> zip(
source1: Single<T1>,
source2: Single<T2>,
Expand All @@ -24,6 +39,11 @@ fun <T1, T2, R> zip(
mapper(values[0] as T1, values[1] as T2)
}

/**
* Subscribes to all `source` [Single]s, accumulates their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zip-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.functions.Function3-).
*/
fun <T1, T2, T3, R> zip(
source1: Single<T1>,
source2: Single<T2>,
Expand All @@ -36,6 +56,11 @@ fun <T1, T2, T3, R> zip(
mapper(values[0] as T1, values[1] as T2, values[2] as T3)
}

/**
* Subscribes to all `source` [Single]s, accumulates their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zip-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.functions.Function4-).
*/
fun <T1, T2, T3, T4, R> zip(
source1: Single<T1>,
source2: Single<T2>,
Expand All @@ -49,6 +74,11 @@ fun <T1, T2, T3, T4, R> zip(
mapper(values[0] as T1, values[1] as T2, values[2] as T3, values[3] as T4)
}

/**
* Subscribes to all `source` [Single]s, accumulates their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zip-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.functions.Function5-).
*/
fun <T1, T2, T3, T4, T5, R> zip(
source1: Single<T1>,
source2: Single<T2>,
Expand All @@ -63,6 +93,11 @@ fun <T1, T2, T3, T4, T5, R> zip(
mapper(values[0] as T1, values[1] as T2, values[2] as T3, values[3] as T4, values[4] as T5)
}

/**
* Subscribes to all `source` [Single]s, accumulates their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zip-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.functions.Function6-).
*/
fun <T1, T2, T3, T4, T5, T6, R> zip(
source1: Single<T1>,
source2: Single<T2>,
Expand All @@ -78,6 +113,11 @@ fun <T1, T2, T3, T4, T5, T6, R> zip(
mapper(values[0] as T1, values[1] as T2, values[2] as T3, values[3] as T4, values[4] as T5, values[5] as T6)
}

/**
* Subscribes to all `source` [Single]s, accumulates their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zip-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.functions.Function7-).
*/
fun <T1, T2, T3, T4, T5, T6, T7, R> zip(
source1: Single<T1>,
source2: Single<T2>,
Expand All @@ -102,6 +142,11 @@ fun <T1, T2, T3, T4, T5, T6, T7, R> zip(
)
}

/**
* Subscribes to all `source` [Single]s, accumulates their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zip-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.functions.Function8-).
*/
fun <T1, T2, T3, T4, T5, T6, T7, T8, R> zip(
source1: Single<T1>,
source2: Single<T2>,
Expand All @@ -128,6 +173,11 @@ fun <T1, T2, T3, T4, T5, T6, T7, T8, R> zip(
)
}

/**
* Subscribes to all `source` [Single]s, accumulates their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zip-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.functions.Function9-).
*/
fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> zip(
source1: Single<T1>,
source2: Single<T2>,
Expand Down Expand Up @@ -156,6 +206,11 @@ fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> zip(
)
}

/**
* Subscribes to all `source` [Single]s, accumulates their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zip-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.SingleSource-io.reactivex.functions.Function9-).
*/
fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, R> zip(
source1: Single<T1>,
source2: Single<T2>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
package com.badoo.reaktive.single

/**
* Subscribes to both the current [Single] and the `other` [Single], accumulates their values and emits a value returned by the `mapper` function.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#zipWith-io.reactivex.SingleSource-io.reactivex.functions.BiFunction-).
*/
fun <T, R, I> Single<T>.zipWith(other: Single<R>, mapper: (T, R) -> I): Single<I> =
zip(this, other) { first, second -> mapper(first, second) }
Loading

0 comments on commit cba3858

Please sign in to comment.