[VL] Add VeloxTransitionSuite (apache#7324)
zhztheplayer authored and shamirchen committed Oct 14, 2024
1 parent 4afe45b commit b685081
Showing 10 changed files with 379 additions and 161 deletions.
@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi._
import org.apache.gluten.columnarbatch.CHBatch
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
@@ -14,41 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi
package org.apache.gluten.columnarbatch

import org.apache.gluten.extension.columnar.transition.Convention

import org.apache.spark.sql.execution.{CHColumnarToRowExec, RowToCHNativeColumnarExec, SparkPlan}

package object clickhouse {

/**
* ClickHouse batch convention.
*
* [[fromRow]] and [[toRow]] each require a [[TransitionDef]] instance. Scala allows a compact
* way to implement the trait with a lambda function.
*
* Here, the expanded definition of [[CHBatch.fromRow]] is given:
* {{{
* fromRow(new TransitionDef {
* override def create(): Transition = new Transition {
* override protected def apply0(plan: SparkPlan): SparkPlan =
* RowToCHNativeColumnarExec(plan)
* }
* })
* }}}
*/
case object CHBatch extends Convention.BatchType {
fromRow(
() =>
(plan: SparkPlan) => {
RowToCHNativeColumnarExec(plan)
})
/**
* ClickHouse batch convention.
*
* [[fromRow]] and [[toRow]] each require a [[TransitionDef]] instance. Scala allows a compact
* way to implement the trait with a lambda function.
*
* Here, the expanded definition of [[CHBatch.fromRow]] is given:
* {{{
* fromRow(new TransitionDef {
* override def create(): Transition = new Transition {
* override protected def apply0(plan: SparkPlan): SparkPlan =
* RowToCHNativeColumnarExec(plan)
* }
* })
* }}}
*/
object CHBatch extends Convention.BatchType {
fromRow(
() =>
(plan: SparkPlan) => {
RowToCHNativeColumnarExec(plan)
})

toRow(
() =>
(plan: SparkPlan) => {
CHColumnarToRowExec(plan)
})
}
toRow(
() =>
(plan: SparkPlan) => {
CHColumnarToRowExec(plan)
})
}
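
A note on the registration style used above: fromRow and toRow each take a TransitionDef (imported from org.apache.gluten.extension.columnar.transition), and the compact lambda form relies on Scala's SAM (single-abstract-method) conversion; the scaladoc's expanded example spells out what the lambdas stand for. The standalone sketch below uses simplified stand-in traits, not the actual Gluten Transition/TransitionDef types, to show that the two forms behave identically.

// Simplified stand-ins for illustration only; the real traits operate on SparkPlan.
object SamSketch {
  trait Transition { def apply0(plan: String): String }
  trait TransitionDef { def create(): Transition }

  // Expanded, anonymous-class form (mirrors the scaladoc example above).
  val verbose: TransitionDef = new TransitionDef {
    override def create(): Transition = new Transition {
      override def apply0(plan: String): String = s"RowToColumnar($plan)"
    }
  }

  // Compact form: the nested lambdas are SAM-converted to TransitionDef and Transition.
  val compact: TransitionDef = () => (plan: String) => s"RowToColumnar($plan)"

  def main(args: Array[String]): Unit = {
    // Both forms produce the same transition result.
    assert(verbose.create().apply0("scan") == compact.create().apply0("scan"))
  }
}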
@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi._
import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
@@ -14,30 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi
package org.apache.gluten.columnarbatch

import org.apache.gluten.columnarbatch.ArrowBatch
import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.extension.columnar.transition.{Convention, TransitionDef}

import org.apache.spark.sql.execution.SparkPlan

package object velox {
case object VeloxBatch extends Convention.BatchType {
fromRow(
() =>
(plan: SparkPlan) => {
RowToVeloxColumnarExec(plan)
})
object VeloxBatch extends Convention.BatchType {
fromRow(
() =>
(plan: SparkPlan) => {
RowToVeloxColumnarExec(plan)
})

toRow(
() =>
(plan: SparkPlan) => {
VeloxColumnarToRowExec(plan)
})
toRow(
() =>
(plan: SparkPlan) => {
VeloxColumnarToRowExec(plan)
})

// Velox batch is considered one-way compatible with Arrow batch.
// In practice, this is achieved at runtime through the C++ API VeloxColumnarBatch::from.
fromBatch(ArrowBatch, TransitionDef.empty)
}
// Velox batch is considered one-way compatible with Arrow batch.
// In practice, this is achieved at runtime through the C++ API VeloxColumnarBatch::from.
fromBatch(ArrowBatch, TransitionDef.empty)
}
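
The comment above describes a directed relationship: because of the empty transition registered from ArrowBatch, an Arrow batch can be consumed where a Velox batch is expected without inserting an extra plan node, while the reverse direction still needs an explicit conversion. The standalone sketch below illustrates that idea with made-up types; it is not the Gluten transition framework itself.

// Illustrative model only; the real planning logic lives in the Convention/Transition framework.
object OneWaySketch {
  sealed trait BatchKind
  case object ArrowLike extends BatchKind
  case object VeloxLike extends BatchKind

  // Directed set of "free" conversions; intentionally not symmetric.
  private val freeTransitions: Set[(BatchKind, BatchKind)] = Set(ArrowLike -> VeloxLike)

  def needsConversionNode(from: BatchKind, to: BatchKind): Boolean =
    from != to && !freeTransitions.contains(from -> to)

  def main(args: Array[String]): Unit = {
    println(needsConversionNode(ArrowLike, VeloxLike)) // false: covered by the empty transition
    println(needsConversionNode(VeloxLike, ArrowLike)) // true: reverse direction is not registered
  }
}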
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.test;

import org.apache.gluten.GlutenConfig;

import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
import org.apache.spark.api.plugin.PluginContext;
import org.apache.spark.resource.ResourceInformation;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.Map;

public final class MockVeloxBackend {
public static PluginContext mockPluginContext() {
return new PluginContext() {
@Override
public MetricRegistry metricRegistry() {
throw new UnsupportedOperationException();
}

@Override
public SparkConf conf() {
return newSparkConf();
}

@Override
public String executorID() {
throw new UnsupportedOperationException();
}

@Override
public String hostname() {
throw new UnsupportedOperationException();
}

@Override
public Map<String, ResourceInformation> resources() {
throw new UnsupportedOperationException();
}

@Override
public void send(Object message) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public Object ask(Object message) throws Exception {
throw new UnsupportedOperationException();
}
};
}

@NotNull
private static SparkConf newSparkConf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
return conf;
}
}
@@ -16,77 +16,22 @@
*/
package org.apache.gluten.test;

import org.apache.gluten.GlutenConfig;
import org.apache.gluten.backendsapi.ListenerApi;
import org.apache.gluten.backendsapi.velox.VeloxListenerApi;

import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
import org.apache.spark.api.plugin.PluginContext;
import org.apache.spark.resource.ResourceInformation;
import org.jetbrains.annotations.NotNull;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.Map;

public abstract class VeloxBackendTestBase {
private static final ListenerApi API = new VeloxListenerApi();

@BeforeClass
public static void setup() {
API.onExecutorStart(mockPluginContext());
API.onExecutorStart(MockVeloxBackend.mockPluginContext());
}

@AfterClass
public static void tearDown() {
API.onExecutorShutdown();
}

private static PluginContext mockPluginContext() {
return new PluginContext() {
@Override
public MetricRegistry metricRegistry() {
throw new UnsupportedOperationException();
}

@Override
public SparkConf conf() {
return newSparkConf();
}

@Override
public String executorID() {
throw new UnsupportedOperationException();
}

@Override
public String hostname() {
throw new UnsupportedOperationException();
}

@Override
public Map<String, ResourceInformation> resources() {
throw new UnsupportedOperationException();
}

@Override
public void send(Object message) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public Object ask(Object message) throws Exception {
throw new UnsupportedOperationException();
}
};
}

@NotNull
private static SparkConf newSparkConf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
return conf;
}
}
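
For context, a suite built on the refactored base might look like the minimal sketch below. It assumes VeloxBackendTestBase declares no abstract members beyond what the diff shows; the class name and test are illustrative and are not part of this commit (the added VeloxTransitionSuite itself is not shown above).

import org.apache.gluten.test.{MockVeloxBackend, VeloxBackendTestBase}

import org.junit.Assert.assertNotNull
import org.junit.Test

// Hypothetical suite name, for illustration only.
class ExampleVeloxSuite extends VeloxBackendTestBase {
  @Test
  def mockContextProvidesConf(): Unit = {
    // VeloxBackendTestBase#setup() has already started the Velox backend with the
    // mocked PluginContext; here we only check that the mock exposes a usable SparkConf.
    assertNotNull(MockVeloxBackend.mockPluginContext().conf())
  }
}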