Skip to content

Commit

Permalink
Merge pull request #645 from arkivanov/add-buffer-operator
Browse files Browse the repository at this point in the history
Added remaining buffer operators
  • Loading branch information
Arkadii Ivanov authored Oct 16, 2021
2 parents 2202f0e + b3ced89 commit f1f1738
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 48 deletions.
11 changes: 11 additions & 0 deletions reaktive/api/reaktive.api
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,17 @@ public final class com/badoo/reaktive/observable/BufferCountSkipKt {
public static synthetic fun buffer$default (Lcom/badoo/reaktive/observable/Observable;IIILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/BufferKt {
public static final fun buffer (Lcom/badoo/reaktive/observable/Observable;JJLcom/badoo/reaktive/scheduler/Scheduler;JZ)Lcom/badoo/reaktive/observable/Observable;
public static final fun buffer (Lcom/badoo/reaktive/observable/Observable;JLcom/badoo/reaktive/scheduler/Scheduler;IZ)Lcom/badoo/reaktive/observable/Observable;
public static final fun buffer (Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;JZ)Lcom/badoo/reaktive/observable/Observable;
public static final fun buffer (Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;JZ)Lcom/badoo/reaktive/observable/Observable;
public static synthetic fun buffer$default (Lcom/badoo/reaktive/observable/Observable;JJLcom/badoo/reaktive/scheduler/Scheduler;JZILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
public static synthetic fun buffer$default (Lcom/badoo/reaktive/observable/Observable;JLcom/badoo/reaktive/scheduler/Scheduler;IZILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
public static synthetic fun buffer$default (Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;JZILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
public static synthetic fun buffer$default (Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;JZILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/CollectKt {
public static final fun collect (Lcom/badoo/reaktive/observable/Observable;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Lcom/badoo/reaktive/single/Single;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.completable.Completable
import com.badoo.reaktive.scheduler.Scheduler

/**
* Returns an [Observable] that emits non-overlapping windows of elements it collects from the source [Observable].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#buffer-long-java.util.concurrent.TimeUnit-io.reactivex.rxjava3.core.Scheduler-int-io.reactivex.rxjava3.functions.Supplier-boolean-).
*/
fun <T> Observable<T>.buffer(
spanMillis: Long,
scheduler: Scheduler,
limit: Int = Int.MAX_VALUE,
restartOnLimit: Boolean = false
): Observable<List<T>> =
window(spanMillis = spanMillis, scheduler = scheduler, limit = limit.toLong(), restartOnLimit = restartOnLimit)
.flatMapSingle { it.toList() }

/**
* Returns an [Observable] that emits non-overlapping windows of elements it collects from the source [Observable].
* Window boundaries are determined by the elements emitted by the specified [boundaries][boundaries] [Observable].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#buffer-io.reactivex.rxjava3.core.ObservableSource-).
*/
fun <T> Observable<T>.buffer(
boundaries: Observable<*>,
limit: Long = Long.MAX_VALUE,
restartOnLimit: Boolean = false
): Observable<List<T>> =
window(boundaries = boundaries, limit = limit, restartOnLimit = restartOnLimit)
.flatMapSingle { it.toList() }

/**
* Returns an [Observable] that emits possibly overlapping windows of elements it collects from the source [Observable].
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#buffer-long-long-java.util.concurrent.TimeUnit-io.reactivex.rxjava3.core.Scheduler-).
*/
fun <T> Observable<T>.buffer(
spanMillis: Long,
skipMillis: Long,
scheduler: Scheduler,
limit: Long = Long.MAX_VALUE,
restartOnLimit: Boolean = false
): Observable<List<T>> =
window(spanMillis = spanMillis, skipMillis = skipMillis, scheduler = scheduler, limit = limit, restartOnLimit = restartOnLimit)
.flatMapSingle { it.toList() }

/**
* Returns an [Observable] that emits possibly overlapping windows of elements it collects from the source [Observable].
* Every new window is opened when the [opening][opening] [Observable] emits an element.
* Each window is closed when the corresponding [Observable] returned by the [closing] function completes.
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#buffer-io.reactivex.rxjava3.core.ObservableSource-io.reactivex.rxjava3.functions.Function-).
*/
fun <T, S> Observable<T>.buffer(
opening: Observable<S>,
closing: (S) -> Completable,
limit: Long = Long.MAX_VALUE,
restartOnLimit: Boolean = false
): Observable<List<T>> =
window(opening = opening, closing = closing, limit = limit, restartOnLimit = restartOnLimit)
.flatMapSingle { it.toList() }
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.ErrorCallback
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.utils.SharedList
import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.utils.queue.SharedQueue

/**
* Returns an [Observable] that emits buffered [List]s of elements it collects from the source [Observable].
* The first buffer is started with the first element emitted by the source [Observable].
Expand All @@ -14,45 +8,6 @@ import com.badoo.reaktive.utils.queue.SharedQueue
*
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#buffer-int-int-).
*/
fun <T> Observable<T>.buffer(count: Int, skip: Int = count): Observable<List<T>> {
require(count > 0) { "Count value must be positive" }
require(skip > 0) { "Skip value must be positive" }

return observable { emitter ->
val listQueue = SharedQueue<SharedList<T>>()
val skipCounter = AtomicInt(1)

subscribe(
object : ObservableObserver<T>, ErrorCallback by emitter {
override fun onSubscribe(disposable: Disposable) {
emitter.setDisposable(disposable)
}

override fun onNext(value: T) {
if (skipCounter.addAndGet(-1) == 0) {
skipCounter.value = skip
listQueue.offer(SharedList())
}

listQueue.forEach { it += value }

if (listQueue.peek?.size == count) {
pollAndEmit()
}
}

override fun onComplete() {
while (!listQueue.isEmpty) {
pollAndEmit()
}
emitter.onComplete()
}

private fun pollAndEmit() {
val list = listQueue.poll()!!
emitter.onNext(list)
}
}
)
}
}
fun <T> Observable<T>.buffer(count: Int, skip: Int = count): Observable<List<T>> =
window(count = count.toLong(), skip = skip.toLong())
.flatMapSingle { it.toList() }
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.test.completable.TestCompletable
import com.badoo.reaktive.test.observable.TestObservable
import com.badoo.reaktive.test.observable.assertValues
import com.badoo.reaktive.test.observable.onNext
import com.badoo.reaktive.test.observable.test
import com.badoo.reaktive.test.scheduler.TestScheduler
import kotlin.test.Test

class BufferSimpleTests {

private val upstream = TestObservable<Int>()

@Test
fun buffer_spanMillis_emits_correct_values() {
val scheduler = TestScheduler()
val timer = scheduler.timer
val observer = upstream.buffer(spanMillis = 1000L, scheduler = scheduler, limit = 3).test()

upstream.onNext(1, 2)
timer.advanceBy(999L)
upstream.onNext(3)
timer.advanceBy(1L)
upstream.onNext(4, 5)
timer.advanceBy(1000L)
upstream.onNext(6, 7, 8)
timer.advanceBy(1000L)
upstream.onNext(9, 10, 11, 12)
timer.advanceBy(1000L)
upstream.onNext(13, 14, 15)
timer.advanceBy(1000L)
upstream.onNext(16, 17)
upstream.onComplete()

observer.assertValues(
listOf(1, 2, 3),
listOf(4, 5),
listOf(6, 7, 8),
listOf(9, 10, 11),
listOf(13, 14, 15),
listOf(16, 17),
)
}

@Test
fun buffer_boundaries_emits_correct_values() {
val boundaries = TestObservable<Unit>()
val observer = upstream.buffer(boundaries = boundaries, limit = 3).test()

upstream.onNext(1, 2, 3)
boundaries.onNext(Unit)
upstream.onNext(4, 5)
boundaries.onNext(Unit)
upstream.onNext(6, 7, 8)
boundaries.onNext(Unit)
upstream.onNext(9, 10, 11, 12)
boundaries.onNext(Unit)
upstream.onNext(13, 14, 15)
boundaries.onNext(Unit)
upstream.onNext(16, 17)
upstream.onComplete()

observer.assertValues(
listOf(1, 2, 3),
listOf(4, 5),
listOf(6, 7, 8),
listOf(9, 10, 11),
listOf(13, 14, 15),
listOf(16, 17),
)
}

@Test
fun buffer_spanMillis_skipMillis_emits_correct_values() {
val scheduler = TestScheduler()
val timer = scheduler.timer
val observer = upstream.buffer(spanMillis = 1000L, skipMillis = 600L, scheduler = scheduler, limit = 13).test()

upstream.onNext(1, 2)
timer.advanceBy(599L)
upstream.onNext(3)
timer.advanceBy(1L)
upstream.onNext(4, 5)
timer.advanceBy(399L)
upstream.onNext(6)
timer.advanceBy(1L)
upstream.onNext(7, 8, 9, 10)
timer.advanceBy(200L)
upstream.onNext(11, 12, 13)
timer.advanceBy(200L)
upstream.onNext(14, 15)
timer.advanceBy(200L)
upstream.onNext(16, 17)
timer.advanceBy(200L)
upstream.onNext(18, 19, 20, 21, 22, 23, 24)
upstream.onComplete()

observer.assertValues(
listOf(1, 2, 3, 4, 5, 6),
listOf(4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
listOf(11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23),
listOf(18, 19, 20, 21, 22, 23, 24),
)
}

@Test
fun buffer_opening_closing_emits_correct_values() {
val opening = TestObservable<Int>()
val closing = List(5) { TestCompletable() }
val observer = upstream.buffer(opening = opening, closing = { closing[it] }, limit = 5).test()

upstream.onNext(1)
opening.onNext(0)
upstream.onNext(2)
opening.onNext(1)
upstream.onNext(3)
opening.onNext(2)
upstream.onNext(4)
closing[0].onComplete()
upstream.onNext(5)
closing[2].onComplete()
upstream.onNext(6)
opening.onNext(3)
upstream.onNext(7)
upstream.onNext(8)
upstream.onNext(9)
upstream.onComplete()

observer.assertValues(
listOf(2, 3, 4),
listOf(4, 5),
listOf(3, 4, 5, 6, 7),
listOf(7, 8, 9),
)
}
}

0 comments on commit f1f1738

Please sign in to comment.