diff --git a/wrangler-transform/pom.xml b/wrangler-transform/pom.xml
index edea963e7..bb09467ea 100644
--- a/wrangler-transform/pom.xml
+++ b/wrangler-transform/pom.xml
@@ -10,6 +10,10 @@
  <artifactId>wrangler-transform</artifactId>
  <name>Wrangler Transform</name>

+  <properties>
+    <spark.version>2.3.1</spark.version>
+  </properties>
+
  <dependencies>
    <dependency>
      <groupId>io.cdap.wrangler</groupId>
@@ -28,6 +32,68 @@
      <version>${cdap.version}</version>
      <scope>provided</scope>
    </dependency>
+    <dependency>
+      <groupId>io.cdap.cdap</groupId>
+      <artifactId>cdap-etl-api-spark</artifactId>
+      <version>${cdap.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.cdap.cdap</groupId>
+      <artifactId>cdap-api-spark2_2.11</artifactId>
+      <version>${cdap.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_2.11</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.11</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.esotericsoftware.reflectasm</groupId>
+          <artifactId>reflectasm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-recipes</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.tachyonproject</groupId>
+          <artifactId>tachyon-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty.orbit</groupId>
+          <artifactId>javax.servlet</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.java.dev.jets3t</groupId>
+          <artifactId>jets3t</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/SQLWrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/SQLWrangler.java
new file mode 100644
index 000000000..e27289000
--- /dev/null
+++ b/wrangler-transform/src/main/java/io/cdap/wrangler/SQLWrangler.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.wrangler;
+
+import io.cdap.cdap.api.annotation.RuntimeImplementation;
+import io.cdap.cdap.etl.api.dl.DLDataSet;
+import io.cdap.cdap.etl.api.dl.DLExpressionFactory;
+import io.cdap.cdap.etl.api.dl.DLPluginContext;
+import io.cdap.cdap.etl.api.dl.SimpleDLPluginRuntimeImplementation;
+import io.cdap.cdap.etl.api.engine.sql.StandardSQLCapabilities;
+import io.cdap.wrangler.Wrangler.Config;
+
+/**
+ * Wrangler implementation with BQ pushdown. Currently only the SQL precondition is pushed down.
+ */
+@RuntimeImplementation(pluginClass = Wrangler.class, order = Wrangler.ORDER_SQL)
+public class SQLWrangler implements SimpleDLPluginRuntimeImplementation {
+
+ private final Config config;
+
+ public SQLWrangler(Config config) {
+ this.config = config;
+ if (!Config.EL_SQL.equalsIgnoreCase(config.getExpressionLanguage())) {
+ // Fall back to the next runtime implementation
+ throw new IllegalStateException("This implementation only runs for the SQL expression language");
+ }
+ if (config.getDirectives() != null && !config.getDirectives().trim().isEmpty()) {
+ throw new IllegalStateException("We only run this for empty directives list");
+ }
+ }
+
+ @Override
+ public void initialize(DLPluginContext context) throws Exception {
+ if (!context.getDLContext().getExpressionFactory(StandardSQLCapabilities.SQL).isPresent()) {
+ throw new IllegalStateException("Expression language does not implement SQL");
+ }
+ }
+
+ @Override
+ public DLDataSet transform(DLPluginContext context, DLDataSet input) {
+ DLExpressionFactory expressionFactory = context.getDLContext()
+ .getExpressionFactory(StandardSQLCapabilities.SQL).get();
+ return input.filter(expressionFactory.compile(config.getPrecondition()));
+ }
+}
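
The @RuntimeImplementation annotation and the explicit constructor throws suggest a fallback chain: the lowest-order implementation whose constructor accepts the config wins, so SQLWrangler is tried before SparkWrangler and the plain Wrangler transform. A minimal sketch of that selection contract, assuming the framework simply tries candidates in order and treats a constructor exception as a rejection (an illustration, not the actual CDAP framework code):

import java.util.List;
import java.util.function.Supplier;

// Illustration only: ordered fallback over runtime implementations.
// The real selection is performed by the CDAP framework based on @RuntimeImplementation order.
class RuntimeImplementationFallbackSketch {
  static Object pickFirstAccepting(List<Supplier<Object>> orderedFactories) {
    for (Supplier<Object> factory : orderedFactories) {
      try {
        return factory.get(); // constructor accepted the configuration
      } catch (IllegalStateException e) {
        // constructor rejected the configuration; try the next (higher-order) implementation
      }
    }
    throw new IllegalStateException("No runtime implementation accepted the configuration");
  }
}
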
diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/SparkWrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/SparkWrangler.java
new file mode 100644
index 000000000..d3daa88cc
--- /dev/null
+++ b/wrangler-transform/src/main/java/io/cdap/wrangler/SparkWrangler.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.wrangler;
+
+import io.cdap.cdap.api.annotation.RuntimeImplementation;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.spark.sql.DataFrames;
+import io.cdap.cdap.etl.api.batch.SparkCompute;
+import io.cdap.cdap.etl.api.batch.SparkExecutionPluginContext;
+import io.cdap.wrangler.Wrangler.Config;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Wrangler implementation with Spark SQL. Currently only precondition filtering is supported.
+ */
+@RuntimeImplementation(pluginClass = Wrangler.class, order = Wrangler.ORDER_SPARK)
+public class SparkWrangler extends SparkCompute<StructuredRecord, StructuredRecord> {
+
+ private final Wrangler.Config config;
+
+ public SparkWrangler(Config config) {
+ this.config = config;
+ if (!Config.EL_SQL.equalsIgnoreCase(config.getExpressionLanguage())) {
+ // Fall back to the next runtime implementation
+ throw new IllegalStateException("This implementation only runs for the SQL expression language");
+ }
+ if (config.getDirectives() != null && !config.getDirectives().trim().isEmpty()) {
+ throw new IllegalStateException("We only run this for empty directives list");
+ }
+ }
+
+ @Override
+ public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext context,
+ JavaRDD<StructuredRecord> input) throws Exception {
+ SparkSession session = SparkSession.builder().sparkContext(context.getSparkContext().sc())
+ .getOrCreate();
+ StructType inputSchema = DataFrames.toDataType(context.getInputSchema());
+ Schema outputSchema = context.getOutputSchema();
+ JavaRDD<Row> rowRDD = input.map(record -> DataFrames.toRow(record, inputSchema));
+ Dataset<Row> df = session.createDataFrame(rowRDD, inputSchema);
+ Dataset<Row> result = df.filter(config.getPrecondition());
+ return result.javaRDD()
+ .map(r -> DataFrames.fromRow(r, outputSchema));
+ }
+}
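
SparkWrangler hands the precondition string unchanged to Dataset.filter, so it must be a Spark SQL boolean expression over the input columns, and rows for which it evaluates to true are kept. A standalone sketch of that call with a hypothetical schema and precondition (local session purely for illustration, matching the spark-sql_2.11 dependency added in the pom):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PreconditionFilterSketch {
  public static void main(String[] args) {
    SparkSession session = SparkSession.builder()
        .master("local[1]")
        .appName("precondition-filter-sketch")
        .getOrCreate();
    // Hypothetical input: two columns, name and age.
    Dataset<Row> input = session.sql(
        "SELECT * FROM VALUES ('alice', 34), ('bob', 17) AS people(name, age)");
    // Same call the plugin makes: the precondition is a Spark SQL boolean expression.
    Dataset<Row> kept = input.filter("age > 21");
    kept.show();
    session.stop();
  }
}
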
diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
index 6d4fa8358..cfc987ef5 100644
--- a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
+++ b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
@@ -21,6 +21,7 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.annotation.RuntimeImplementation;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.plugin.PluginConfig;
@@ -82,6 +83,7 @@
@Plugin(type = "transform")
@Name("Wrangler")
@Description("Wrangler - A interactive tool for data cleansing and transformation.")
+@RuntimeImplementation(pluginClass = Wrangler.class, order = Wrangler.ORDER_TRANSFORM)
public class Wrangler extends Transform<StructuredRecord, StructuredRecord> {
private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class);
@@ -92,6 +94,10 @@ public class Wrangler extends Transform {
private static final String ON_ERROR_DEFAULT = "fail-pipeline";
private static final String ERROR_STRATEGY_DEFAULT = "wrangler.error.strategy.default";
+ static final int ORDER_SQL = 1;
+ static final int ORDER_SPARK = 2;
+ static final int ORDER_TRANSFORM = 3;
+
// Plugin configuration.
private final Config config;
@@ -288,6 +294,10 @@ public void prepareRun(StageSubmitterContext context) throws Exception {
public void initialize(TransformContext context) throws Exception {
super.initialize(context);
+ if (Config.EL_SQL.equalsIgnoreCase(config.getExpressionLanguage())) {
+ throw new IllegalStateException("SQL is not supported in MR implementation");
+ }
+
// Parse DSL and initialize the wrangle pipeline.
store = new DefaultTransientStore();
RecipeParser recipe = getRecipeParser(context);
@@ -480,6 +490,9 @@ public static class Config extends PluginConfig {
static final String NAME_UDD = "udd";
static final String NAME_SCHEMA = "schema";
static final String NAME_ON_ERROR = "on-error";
+ static final String NAME_EL = "expression-language";
+
+ static final String EL_SQL = "sql";
@Name(NAME_PRECONDITION)
@Description("Precondition expression specifying filtering before applying directives (true to filter)")
@@ -513,14 +526,21 @@ public static class Config extends PluginConfig {
@Nullable
private final String onError;
+ @Name(NAME_EL)
+ @Description("Expression language to use (jexl, sql). Currently applies to precondition only.")
+ @Macro
+ @Nullable
+ private String expressionLanguage;
+
public Config(String precondition, String directives, String udds,
- String field, String schema, String onError) {
+ String field, String schema, String onError, String expressionLanguage) {
this.precondition = precondition;
this.directives = directives;
this.udds = udds;
this.field = field;
this.schema = schema;
this.onError = onError;
+ this.expressionLanguage = expressionLanguage;
}
/**
@@ -529,6 +549,20 @@ public Config(String precondition, String directives, String udds,
public String getOnError() {
return onError == null ? ON_ERROR_DEFAULT : onError;
}
+
+ @Nullable
+ public String getExpressionLanguage() {
+ return expressionLanguage;
+ }
+
+ public String getPrecondition() {
+ return precondition;
+ }
+
+ @Nullable
+ public String getDirectives() {
+ return directives;
+ }
}
}
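
Putting the pieces together, a pipeline that wants the SQL or Spark path leaves the directives empty, supplies a SQL precondition, and sets expression-language to sql; otherwise the classic JEXL behaviour is unchanged. A hypothetical test-style construction of the new Config, with illustrative field values; the argument order follows the constructor above, and in a real pipeline these values come from the plugin properties rather than a direct constructor call:

// Illustrative only; values are hypothetical.
Wrangler.Config config = new Wrangler.Config(
    "age > 21",      // precondition, interpreted as a SQL expression
    "",              // directives must be empty for the SQL/Spark implementations
    null,            // udds
    "body",          // field to wrangle (illustrative)
    null,            // schema
    "fail-pipeline", // on-error strategy
    "sql");          // expression-language: selects the SQL precondition path
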