Skip to content

Commit

Permalink
[VL] Gluten-it: Reuse Spark sessions that share the same configuration (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Jun 19, 2024
1 parent c9a6648 commit 7ac9983
Show file tree
Hide file tree
Showing 13 changed files with 440 additions and 410 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/velox_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ jobs:
cd tools/gluten-it \
&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries \
--local --preset=velox --benchmark-type=ds --error-on-memleak -s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen --random-kill-tasks
--skip-data-gen --random-kill-tasks --no-session-reuse
# run-tpc-test-ubuntu-sf30:
# needs: build-native-lib-centos-7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

import com.google.common.base.Preconditions;
import org.apache.gluten.integration.BaseMixin;
import org.apache.gluten.integration.action.Dim;
import org.apache.gluten.integration.action.DimKv;
import org.apache.gluten.integration.action.DimValue;
import org.apache.commons.lang3.ArrayUtils;
import picocli.CommandLine;
import scala.Tuple2;
Expand Down Expand Up @@ -67,17 +64,17 @@ public class Parameterized implements Callable<Integer> {
public Integer call() throws Exception {
final Map<String, Map<String, List<Map.Entry<String, String>>>> parsed = new LinkedHashMap<>();

final Seq<scala.collection.immutable.Set<DimKv>> excludedCombinations = JavaConverters.asScalaBufferConverter(Arrays.stream(excludedDims).map(d -> {
final Seq<scala.collection.immutable.Set<org.apache.gluten.integration.action.Parameterized.DimKv>> excludedCombinations = JavaConverters.asScalaBufferConverter(Arrays.stream(excludedDims).map(d -> {
final Matcher m = excludedDimsPattern.matcher(d);
Preconditions.checkArgument(m.matches(), "Unrecognizable excluded dims: " + d);
Set<DimKv> out = new HashSet<>();
Set<org.apache.gluten.integration.action.Parameterized.DimKv> out = new HashSet<>();
final String[] dims = d.split(",");
for (String dim : dims) {
final String[] kv = dim.split(":");
Preconditions.checkArgument(kv.length == 2, "Unrecognizable excluded dims: " + d);
out.add(new DimKv(kv[0], kv[1]));
out.add(new org.apache.gluten.integration.action.Parameterized.DimKv(kv[0], kv[1]));
}
return JavaConverters.asScalaSetConverter(out).asScala().<DimKv>toSet();
return JavaConverters.asScalaSetConverter(out).asScala().<org.apache.gluten.integration.action.Parameterized.DimKv>toSet();
}).collect(Collectors.toList())).asScala();

// parse dims
Expand Down Expand Up @@ -121,19 +118,19 @@ public Integer call() throws Exception {
}

// Convert Map<String, Map<String, List<Map.Entry<String, String>>>> to List<Dim>
Seq<Dim> parsedDims = JavaConverters.asScalaBufferConverter(
Seq<org.apache.gluten.integration.action.Parameterized.Dim> parsedDims = JavaConverters.asScalaBufferConverter(
parsed.entrySet().stream().map(e ->
new Dim(e.getKey(), JavaConverters.asScalaBufferConverter(
new org.apache.gluten.integration.action.Parameterized.Dim(e.getKey(), JavaConverters.asScalaBufferConverter(
e.getValue().entrySet().stream().map(e2 ->
new DimValue(e2.getKey(), JavaConverters.asScalaBufferConverter(
new org.apache.gluten.integration.action.Parameterized.DimValue(e2.getKey(), JavaConverters.asScalaBufferConverter(
e2.getValue().stream().map(e3 -> new Tuple2<>(e3.getKey(), e3.getValue()))
.collect(Collectors.toList())).asScala())).collect(Collectors.toList())).asScala()
)).collect(Collectors.toList())).asScala();

org.apache.gluten.integration.action.Parameterized parameterized =
new org.apache.gluten.integration.action.Parameterized(dataGenMixin.getScale(),
dataGenMixin.genPartitionedData(), queriesMixin.queries(),
queriesMixin.explain(), queriesMixin.iterations(), warmupIterations, parsedDims,
queriesMixin.explain(), queriesMixin.iterations(), warmupIterations, queriesMixin.noSessionReuse(), parsedDims,
excludedCombinations, metrics);
return mixin.runActions(ArrayUtils.addAll(dataGenMixin.makeActions(), parameterized));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class Queries implements Callable<Integer> {
public Integer call() throws Exception {
org.apache.gluten.integration.action.Queries queries =
new org.apache.gluten.integration.action.Queries(dataGenMixin.getScale(), dataGenMixin.genPartitionedData(), queriesMixin.queries(),
queriesMixin.explain(), queriesMixin.iterations(), randomKillTasks);
queriesMixin.explain(), queriesMixin.iterations(), randomKillTasks, queriesMixin.noSessionReuse());
return mixin.runActions(ArrayUtils.addAll(dataGenMixin.makeActions(), queries));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Integer call() throws Exception {
org.apache.gluten.integration.action.QueriesCompare queriesCompare =
new org.apache.gluten.integration.action.QueriesCompare(dataGenMixin.getScale(),
dataGenMixin.genPartitionedData(), queriesMixin.queries(),
queriesMixin.explain(), queriesMixin.iterations());
queriesMixin.explain(), queriesMixin.iterations(), queriesMixin.noSessionReuse());
return mixin.runActions(ArrayUtils.addAll(dataGenMixin.makeActions(), queriesCompare));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class QueriesMixin {
@CommandLine.Option(names = {"--iterations"}, description = "How many iterations to run", defaultValue = "1")
private int iterations;

@CommandLine.Option(names = {"--no-session-reuse"}, description = "Recreate new Spark session each time a query is about to run", defaultValue = "false")
private boolean noSessionReuse;

/**
 * Whether query plans should be explained (printed) before execution,
 * as set by the {@code --explain} CLI option.
 */
public boolean explain() {
return explain;
}
Expand All @@ -50,6 +53,10 @@ public int iterations() {
return iterations;
}

/**
 * Whether a fresh Spark session should be created for each query run,
 * as set by the {@code --no-session-reuse} CLI option (defaults to false,
 * i.e. sessions sharing the same configuration are reused).
 */
public boolean noSessionReuse() {
return noSessionReuse;
}

public Actions.QuerySelector queries() {
return new Actions.QuerySelector() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package org.apache.gluten.integration

import com.google.common.base.Preconditions
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.sql.{RunResult, SparkQueryRunner, SparkSession}

import java.io.File

class QueryRunner(val queryResourceFolder: String, val dataPath: String) {
import QueryRunner._

Preconditions.checkState(
new File(dataPath).exists(),
s"Data not found at $dataPath, try using command `<gluten-it> data-gen-only <options>` to generate it first.",
Expand All @@ -37,10 +40,54 @@ class QueryRunner(val queryResourceFolder: String, val dataPath: String) {
caseId: String,
explain: Boolean = false,
metrics: Array[String] = Array(),
randomKillTasks: Boolean = false): RunResult = {
randomKillTasks: Boolean = false): QueryResult = {
val path = "%s/%s.sql".format(queryResourceFolder, caseId)
SparkQueryRunner.runQuery(spark, desc, path, explain, metrics, randomKillTasks)
try {
val r = SparkQueryRunner.runQuery(spark, desc, path, explain, metrics, randomKillTasks)
println(s"Successfully ran query $caseId. Returned row count: ${r.rows.length}")
Success(caseId, r)
} catch {
case e: Exception =>
println(s"Error running query $caseId. Error: ${ExceptionUtils.getStackTrace(e)}")
Failure(caseId, e)
}
}
}

object QueryRunner {}
object QueryRunner {
  /** Outcome of a single query run: either a [[Success]] or a [[Failure]]. */
  sealed trait QueryResult {
    def caseId(): String
    def succeeded(): Boolean
  }

  /** Convenience casts from the abstract [[QueryResult]] to its concrete variants. */
  implicit class QueryResultOps(r: QueryResult) {
    def asSuccessOption(): Option[Success] = r match {
      case s: Success => Some(s)
      case _: Failure => None
    }

    def asFailureOption(): Option[Failure] = r match {
      case _: Success => None
      case f: Failure => Some(f)
    }

    // Unchecked variants: intentionally throw if the result is of the other kind.
    def asSuccess(): Success = asSuccessOption().get

    def asFailure(): Failure = asFailureOption().get
  }

  /** A query that completed normally, carrying the Spark run result. */
  case class Success(override val caseId: String, runResult: RunResult) extends QueryResult {
    override def succeeded(): Boolean = true
  }

  /** A query that threw, carrying the causing exception. */
  case class Failure(override val caseId: String, error: Exception) extends QueryResult {
    override def succeeded(): Boolean = false
  }
}
Loading

0 comments on commit 7ac9983

Please sign in to comment.