diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 8e7551fa7738a..3cb63404bcacf 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -177,7 +177,7 @@ jobs: # In benchmark, we use local as master so set driver memory only. Note that GitHub Actions has 7 GB memory limit. bin/spark-submit \ --driver-memory 6g --class org.apache.spark.benchmark.Benchmarks \ - --jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`" \ + --jars "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | paste -sd ',' -`,`find ~/.cache/coursier -name 'curator-test-*.jar'`" \ "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \ "${{ github.event.inputs.class }}" # To keep the directory structure and file permissions, tar them diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt new file mode 100644 index 0000000000000..3312d6feff885 --- /dev/null +++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt @@ -0,0 +1,13 @@ +================================================================================================ +PersistenceEngineBenchmark +================================================================================================ + +OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure +AMD EPYC 7763 64-Core Processor +1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +ZooKeeperPersistenceEngine 1183 1266 129 0.0 1183158.2 1.0X +FileSystemPersistenceEngine 218 222 4 0.0 218005.2 5.4X +BlackHolePersistenceEngine 0 0 0 29.5 34.0 34846.9X + + diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt b/core/benchmarks/PersistenceEngineBenchmark-results.txt new file mode 100644 index 
0000000000000..684963f92e1f0 --- /dev/null +++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt @@ -0,0 +1,13 @@ +================================================================================================ +PersistenceEngineBenchmark +================================================================================================ + +OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure +AMD EPYC 7763 64-Core Processor +1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +ZooKeeperPersistenceEngine 1086 1215 162 0.0 1085606.9 1.0X +FileSystemPersistenceEngine 224 225 1 0.0 223834.2 4.9X +BlackHolePersistenceEngine 0 0 0 40.7 24.6 44209.4X + + diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala new file mode 100644 index 0000000000000..9917be9b1c090 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.spark.deploy.master

import java.net.ServerSocket
import java.util.concurrent.ThreadLocalRandom

import org.apache.curator.test.TestingServer

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils


/**
 * Benchmark for PersistenceEngines.
 * To run this benchmark:
 * {{{
 *   1. without sbt:
 *      bin/spark-submit --class <this class> --jars `find ~/.cache/coursier \
 *        -name 'curator-test-*.jar'` <spark core test jar>
 *   2. build/sbt "core/Test/runMain <this class>"
 *   3. generate result:
 *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>"
 *      Results will be written to "benchmarks/PersistenceEngineBenchmark-results.txt".
 * }}}
 */
object PersistenceEngineBenchmark extends BenchmarkBase {

  val conf = new SparkConf()
  // Java serialization is used for all engines so the serializer cost is identical per case.
  val serializerJava = new JavaSerializer(conf)
  // Embedded ZooKeeper server (Curator test utility) bound to a randomly chosen free port;
  // started eagerly at object construction and stopped in afterAll().
  val zkTestServer = new TestingServer(findFreePort(conf))

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {

    val numIters = 3
    val numWorkers = 1000
    // Pre-build all WorkerInfo fixtures once so per-case timing excludes fixture creation.
    val workers = (1 to numWorkers).map(createWorkerInfo).toArray

    // Point ZooKeeperPersistenceEngine at the embedded test server.
    conf.set(ZOOKEEPER_URL, zkTestServer.getConnectString)

    runBenchmark("PersistenceEngineBenchmark") {
      val benchmark = new Benchmark(s"$numWorkers Workers", numWorkers, output = output)

      // Each case measures a full add -> read-back -> remove cycle for all workers,
      // including engine construction and close().
      benchmark.addCase("ZooKeeperPersistenceEngine", numIters) { _ =>
        val engine = new ZooKeeperPersistenceEngine(conf, serializerJava)
        workers.foreach(engine.addWorker)
        engine.read[WorkerInfo]("worker_")
        workers.foreach(engine.removeWorker)
        engine.close()
      }

      benchmark.addCase("FileSystemPersistenceEngine", numIters) { _ =>
        // Fresh temp directory per iteration so earlier iterations' files don't interfere.
        val dir = Utils.createTempDir().getAbsolutePath
        val engine = new FileSystemPersistenceEngine(dir, serializerJava)
        workers.foreach(engine.addWorker)
        engine.read[WorkerInfo]("worker_")
        workers.foreach(engine.removeWorker)
        engine.close()
      }

      // Baseline: engine that persists nothing, to isolate pure framework overhead.
      benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ =>
        val engine = new BlackHolePersistenceEngine()
        workers.foreach(engine.addWorker)
        engine.read[WorkerInfo]("worker_")
        workers.foreach(engine.removeWorker)
        engine.close()
      }

      benchmark.run()
    }
  }

  /** Shut down the embedded ZooKeeper server once the whole suite has finished. */
  override def afterAll(): Unit = {
    zkTestServer.stop()
  }

  /**
   * Builds a synthetic WorkerInfo with fixed GPU/FPGA resources and a fresh heartbeat,
   * unique only in its id suffix.
   */
  private def createWorkerInfo(id: Int): WorkerInfo = {
    val gpuResource = new WorkerResourceInfo(GPU, Seq("0", "1", "2"))
    val fpgaResource = new WorkerResourceInfo(FPGA, Seq("3", "4", "5"))
    val resources = Map(GPU -> gpuResource, FPGA -> fpgaResource)
    // null endpoint: the benchmark only persists/reads the info, no RPC is exercised.
    val workerInfo = new WorkerInfo(s"worker-20231201000000-255.255.255.255-$id", "host", 8080, 4,
      1234, null, "http://publicAddress:80", resources)
    workerInfo.lastHeartbeat = System.currentTimeMillis()
    workerInfo
  }

  /**
   * Finds a free TCP port for the embedded ZooKeeper server by probing from a random
   * starting point in the non-privileged range (1024-65535); the bind-and-close probe
   * is retried by Utils.startServiceOnPort on collision.
   */
  def findFreePort(conf: SparkConf): Int = {
    val candidatePort = ThreadLocalRandom.current().nextInt(1024, 65536)
    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
      val socket = new ServerSocket(trialPort)
      socket.close()
      (null, trialPort)
    }, conf)._2
  }
}