-
Notifications
You must be signed in to change notification settings - Fork 0
/
spark-accumulator.scala
32 lines (24 loc) · 1.11 KB
/
spark-accumulator.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Accumulator semantics:
//  - For accumulators updated inside ACTIONS, Spark applies each task's update exactly once.
//  - For accumulators updated inside TRANSFORMATIONS this guarantee does not exist:
//    speculative execution or task retries may apply an update more than once.
//    Consequently, within transformations accumulators should be used only for debugging.
// The merge operation must be:
//   commutative: a op b = b op a
//   associative: (a op b) op c = a op (b op c)
val file = sc.textFile("file:///Users/Zhenglai/git/spark/README.md")

// Use the AccumulatorV2 API — sc.accumulator(0) is deprecated since Spark 2.0.
// Giving the accumulator a name also makes it visible in the Spark web UI.
val blankLines = sc.longAccumulator("blankLines")

val lines = file flatMap { line =>
  if (line.isEmpty) {
    // Worker-side code in the closure: the accumulator is write-only here.
    // Counts blank lines (typical use: counting errors / debugging events).
    blankLines.add(1)
  }
  line.split(" ")
}

// flatMap is lazy: the accumulator stays at 0 until an action actually runs
// the job, so force evaluation before reading the value on the driver.
lines.count()
// lines.saveAsTextFile("wordList.txt")

// Driver-side read of the accumulated value (valid only after an action has run).
println(s"Blank lines: ${blankLines.value}")

// Alternative without accumulators: two separate passes over the same data.
val file2 = sc.textFile("file:///Users/Zhenglai/git/spark/README.md")
val blankCount = file2.filter(_.isEmpty).count()
val words = file2.filter(_.nonEmpty).flatMap(_.split(" "))