-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAkkaStreamFlowExample.scala
53 lines (41 loc) · 1.57 KB
/
AkkaStreamFlowExample.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package challenge1.akka_stream
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import externalLegacyCodeNotUnderOurControl.PriceService
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
* Challenge 1: combining the results of "parallel" calls
*
* Akka Stream Example that uses Flows/maps to decompose functionality.
* See 'Clean Code: same level of abstraction'.
* Created by pascal.mengelt on 01.12.2016.
*/
object AkkaStreamFlowExample extends App {
implicit val system = ActorSystem("AkkaStreams")
implicit val materializer = ActorMaterializer()
val serviceCount = 20
val start = System.currentTimeMillis()
// self documenting process
source
.via(createPriceServices)
.via(getPrices)
.runWith(collectPrices)
.map(calcAverage)
.foreach(printAverage)
TimeUnit.SECONDS.sleep(10)
system.terminate()
// implementation of each step
// Source
private lazy val source = Source.fromIterator(() => (1 to serviceCount).iterator)
// Flows
private lazy val createPriceServices = Flow[Int].map(_ => new PriceService())
private lazy val getPrices = Flow[PriceService].mapAsyncUnordered(serviceCount)(s => Future(s.getPrice))
// Sink
private lazy val collectPrices = Sink.seq[Int]
// Post Process
private def calcAverage(prices: Seq[Int]) = prices.sum / serviceCount
private def printAverage(average: Int) = println(s"The average price is $average (${System.currentTimeMillis() - start} ms): " + Thread.currentThread().getName)
}