-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathChallenge3RxJava2ExampleWithThrotteling.java
47 lines (38 loc) · 2.25 KB
/
Challenge3RxJava2ExampleWithThrotteling.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package challenge3.rxjava2;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.apache.commons.lang3.tuple.Triple;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static externalLegacyCodeNotUnderOurControl.PrintlnWithThreadname.println;
// https://github.com/ReactiveMeetupLucerne/AsyncNonBlockingExamplesJVM/issues/9
public class Challenge3RxJava2ExampleWithThrotteling {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> speedyTemperatureSource = Observable.range(1, 100_000_000)
.subscribeOn(Schedulers.io())
.map(tick -> ThreadLocalRandom.current().nextInt(20, 25))
.doOnComplete(() -> println("Speedy temperature source created " + 100_000_000 + " random temperature values. Will do that again..."))
.repeat();
Observable<Triple<Integer, Integer, Integer>> minMaxValuesWithinWindow = speedyTemperatureSource
.sample(1, TimeUnit.MILLISECONDS) // throttle the speedy source a bit
.window(10, TimeUnit.SECONDS)
.flatMap(temperatureValuesWithinWindow ->
temperatureValuesWithinWindow.reduce(
Triple.of(Integer.MAX_VALUE, Integer.MIN_VALUE, 0 /* count */),
(minMaxCountTriple, tempValue) ->
Triple.of(Math.min(tempValue, minMaxCountTriple.getLeft()),
Math.max(tempValue, minMaxCountTriple.getMiddle()),
minMaxCountTriple.getRight() + 1)
).toObservable()
);
minMaxValuesWithinWindow.subscribe(minMaxCountTriple ->
println("Within 10 seconds window: Min="
+ minMaxCountTriple.getLeft()
+ "°Celsius, Max=" + minMaxCountTriple.getMiddle()
+ "°Celsius. Had " + minMaxCountTriple.getRight()
+ " values in window. "
+ "Calculated async and non-blocking TM :-)"));
println("I wasn't blocked");
TimeUnit.MINUTES.sleep(1);
}
}