-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAkkaStreamExample.scala
41 lines (34 loc) · 1.16 KB
/
AkkaStreamExample.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package challenge1.akka_stream
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import externalLegacyCodeNotUnderOurControl.PriceService
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
 * Challenge 1: combining the results of "parallel" calls
 *
 * Simple Akka Stream Example: creates `serviceCount` `PriceService` instances, asks each of
 * them for its price through `mapAsyncUnordered`, and prints the average price together with
 * the elapsed wall-clock time and the name of the thread that prints the result.
 *
 * The actor system is terminated as soon as the stream has completed, successfully or not,
 * instead of after a fixed sleep, so the program neither waits longer than necessary nor
 * shuts down before a slow result has arrived.
 *
 * Created by pascal.mengelt on 29.11.2016.
 */
object AkkaStreamExample extends App {
  // Local import keeps this object self-contained; the file-level imports are unchanged.
  import scala.util.{Failure, Success}

  implicit val system: ActorSystem = ActorSystem("AkkaStreams")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val serviceCount = 20
  val start = System.currentTimeMillis()

  // create Price Services
  Source.fromIterator(() => (1 to serviceCount).iterator)
    .map(_ => new PriceService())
    // call services: at most serviceCount calls are in flight, results are emitted as they finish
    // NOTE(review): the Futures run on the global ExecutionContext; if getPrice blocks its
    // thread, real parallelism is presumably capped by that pool's size - confirm against
    // PriceService and consider scala.concurrent.blocking or a dedicated pool.
    .mapAsyncUnordered(serviceCount)(s => Future(s.getPrice))
    // collect the result
    .runWith(Sink.seq)
    // calc the average
    .map(_.sum / serviceCount)
    // print the result, or report why no result could be computed
    .andThen {
      case Success(price) =>
        println(s"The average price is $price (${System.currentTimeMillis() - start} ms): " + Thread.currentThread().getName)
      case Failure(error) =>
        System.err.println(s"Could not compute the average price: $error")
    }
    // shut the actor system down once the outcome has been handled (success or failure);
    // andThen guarantees this runs after the reporting callback above
    .onComplete(_ => system.terminate())
}