[SPARK-46193][CORE][TESTS] Add PersistenceEngineBenchmark
### What changes were proposed in this pull request?

This PR aims to provide a new benchmark, `PersistenceEngineBenchmark`.

### Why are the changes needed?

This benefits both developers and users by providing a consistent way to measure and compare `PersistenceEngine` implementations.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review.

```
$ build/sbt "core/Test/runMain org.apache.spark.deploy.master.PersistenceEngineBenchmark"
...
[info] OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Mac OS X 14.2
[info] Apple M1 Max
[info] 1000 Workers:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] ZooKeeperPersistenceEngine                        11179          11198          20          0.0    11179348.5       1.0X
[info] FileSystemPersistenceEngine                         416            422           6          0.0      415745.2      26.9X
[info] BlackHolePersistenceEngine                            0              0           0         22.7          44.1  253597.7X
```

```
$ bin/spark-submit --driver-memory 6g --class org.apache.spark.deploy.master.PersistenceEngineBenchmark --jars `find ~/Library/Caches/Coursier/v1 -name 'curator-test-*.jar'` core/target/scala-2.13/spark-core_2.13-4.0.0-SNAPSHOT-tests.jar
...
OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Mac OS X 14.2
Apple M1 Max
1000 Workers:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
ZooKeeperPersistenceEngine                        11565          11857         373          0.0    11564757.8       1.0X
FileSystemPersistenceEngine                         426            426           1          0.0      425605.0      27.2X
BlackHolePersistenceEngine                            0              0           0         27.4          36.5  316478.5X
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44102 from dongjoon-hyun/SPARK-46193.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun committed Dec 1, 2023
1 parent e93bff6 commit 0e68961
Showing 4 changed files with 141 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
@@ -177,7 +177,7 @@ jobs:
# In benchmark, we use local as master so set driver memory only. Note that GitHub Actions has 7 GB memory limit.
bin/spark-submit \
--driver-memory 6g --class org.apache.spark.benchmark.Benchmarks \
--jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \
--jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`,`find ~/.cache/coursier -name 'curator-test-*.jar'`" \
"`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
"${{ github.event.inputs.class }}"
# To keep the directory structure and file permissions, tar them
13 changes: 13 additions & 0 deletions core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
@@ -0,0 +1,13 @@
================================================================================================
PersistenceEngineBenchmark
================================================================================================

OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
AMD EPYC 7763 64-Core Processor
1000 Workers:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
ZooKeeperPersistenceEngine                         1183           1266         129          0.0     1183158.2       1.0X
FileSystemPersistenceEngine                         218            222           4          0.0      218005.2       5.4X
BlackHolePersistenceEngine                            0              0           0         29.5          34.0   34846.9X


13 changes: 13 additions & 0 deletions core/benchmarks/PersistenceEngineBenchmark-results.txt
@@ -0,0 +1,13 @@
================================================================================================
PersistenceEngineBenchmark
================================================================================================

OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
AMD EPYC 7763 64-Core Processor
1000 Workers:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
ZooKeeperPersistenceEngine                         1086           1215         162          0.0     1085606.9       1.0X
FileSystemPersistenceEngine                         224            225           1          0.0      223834.2       4.9X
BlackHolePersistenceEngine                            0              0           0         40.7          24.6   44209.4X


114 changes: 114 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.master

import java.net.ServerSocket
import java.util.concurrent.ThreadLocalRandom

import org.apache.curator.test.TestingServer

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils


/**
* Benchmark for PersistenceEngines.
* To run this benchmark:
* {{{
* 1. without sbt:
* bin/spark-submit --class <this class> --jars `find ~/.cache/coursier \
* -name 'curator-test-*.jar'` <spark core test jar>
* 2. build/sbt "core/Test/runMain <this class>"
* 3. generate result:
* SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
* Results will be written to "benchmarks/PersistenceEngineBenchmark-results.txt".
* }}}
* */
object PersistenceEngineBenchmark extends BenchmarkBase {

val conf = new SparkConf()
val serializerJava = new JavaSerializer(conf)
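// Embedded ZooKeeper server from curator-test, started on a random free port for ZooKeeperPersistenceEngine.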
val zkTestServer = new TestingServer(findFreePort(conf))

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {

val numIters = 3
val numWorkers = 1000
val workers = (1 to numWorkers).map(createWorkerInfo).toArray

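// Point ZooKeeperPersistenceEngine at the embedded test server.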
conf.set(ZOOKEEPER_URL, zkTestServer.getConnectString)

runBenchmark("PersistenceEngineBenchmark") {
val benchmark = new Benchmark(s"$numWorkers Workers", numWorkers, output = output)

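// Persist all workers to ZooKeeper, read them back by prefix, remove them, then close the client.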
benchmark.addCase("ZooKeeperPersistenceEngine", numIters) { _ =>
val engine = new ZooKeeperPersistenceEngine(conf, serializerJava)
workers.foreach(engine.addWorker)
engine.read[WorkerInfo]("worker_")
workers.foreach(engine.removeWorker)
engine.close()
}

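// Same workload against a fresh temporary directory on the local filesystem.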
benchmark.addCase("FileSystemPersistenceEngine", numIters) { _ =>
val dir = Utils.createTempDir().getAbsolutePath
val engine = new FileSystemPersistenceEngine(dir, serializerJava)
workers.foreach(engine.addWorker)
engine.read[WorkerInfo]("worker_")
workers.foreach(engine.removeWorker)
engine.close()
}

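// No-op engine that discards all writes; serves as a baseline for harness overhead.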
benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ =>
val engine = new BlackHolePersistenceEngine()
workers.foreach(engine.addWorker)
engine.read[WorkerInfo]("worker_")
workers.foreach(engine.removeWorker)
engine.close()
}

benchmark.run()
}
}

override def afterAll(): Unit = {
zkTestServer.stop()
}

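// Builds a WorkerInfo with sample GPU and FPGA resources and a current heartbeat timestamp.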
private def createWorkerInfo(id: Int): WorkerInfo = {
val gpuResource = new WorkerResourceInfo(GPU, Seq("0", "1", "2"))
val fpgaResource = new WorkerResourceInfo(FPGA, Seq("3", "4", "5"))
val resources = Map(GPU -> gpuResource, FPGA -> fpgaResource)
val workerInfo = new WorkerInfo(s"worker-20231201000000-255.255.255.255-$id", "host", 8080, 4,
1234, null, "http://publicAddress:80", resources)
workerInfo.lastHeartbeat = System.currentTimeMillis()
workerInfo
}

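// Picks a random candidate port; Utils.startServiceOnPort retries until a ServerSocket can be bound,
// and only the bound port number is returned (the probe socket is closed immediately).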
def findFreePort(conf: SparkConf): Int = {
val candidatePort = ThreadLocalRandom.current().nextInt(1024, 65536)
Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
}, conf)._2
}
}
