diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 4926a97eb824..bc16c2d77fe1 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -20,17 +20,16 @@ import org.apache.gluten.{GlutenConfig, GlutenNumaBindingInfo} import org.apache.gluten.backendsapi.IteratorApi import org.apache.gluten.execution._ import org.apache.gluten.expression.ConverterUtils -import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics, NativeMetrics} +import org.apache.gluten.metrics.{IMetrics, NativeMetrics} import org.apache.gluten.substrait.plan.PlanNode import org.apache.gluten.substrait.rel._ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.utils.LogLevelUtil import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator, GeneralOutIterator} -import org.apache.spark.{InterruptibleIterator, SparkConf, SparkContext, TaskContext} +import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext} import org.apache.spark.affinity.CHAffinity import org.apache.spark.internal.Logging -import org.apache.spark.rdd.RDD import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.metric.SQLMetric @@ -315,44 +314,6 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { context.addTaskCompletionListener[Unit](_ => close()) new CloseableCHColumnBatchIterator(resIter, Some(pipelineTime)) } - - /** Generate Native FileScanRDD, currently only for ClickHouse Backend. */ - override def genNativeFileScanRDD( - sparkContext: SparkContext, - wsCtx: WholeStageTransformContext, - splitInfos: Seq[SplitInfo], - scan: BasicScanExecTransformer, - numOutputRows: SQLMetric, - numOutputBatches: SQLMetric, - scanTime: SQLMetric): RDD[ColumnarBatch] = { - val substraitPlanPartition = GlutenTimeMetric.withMillisTime { - val planByteArray = wsCtx.root.toProtobuf.toByteArray - splitInfos.zipWithIndex.map { - case (splitInfo, index) => - val splitInfoByteArray = splitInfo match { - case filesNode: LocalFilesNode => - setFileSchemaForLocalFiles(filesNode, scan) - filesNode.setFileReadProperties(mapAsJavaMap(scan.getProperties)) - filesNode.toProtobuf.toByteArray - case extensionTableNode: ExtensionTableNode => - extensionTableNode.toProtobuf.toByteArray - } - - GlutenPartition( - index, - planByteArray, - Array(splitInfoByteArray), - locations = splitInfo.preferredLocations().asScala.toArray) - } - }(t => logInfo(s"Generating the Substrait plan took: $t ms.")) - - new NativeFileScanColumnarRDD( - sparkContext, - substraitPlanPartition, - numOutputRows, - numOutputBatches, - scanTime) - } } object CHIteratorApi { diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala deleted file mode 100644 index dc1431fa64fa..000000000000 --- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.benchmarks - -import org.apache.gluten.GlutenConfig -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.execution.{FileSourceScanExecTransformer, WholeStageTransformContext} -import org.apache.gluten.expression.ConverterUtils -import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.gluten.substrait.SubstraitContext -import org.apache.gluten.substrait.plan.PlanBuilder -import org.apache.gluten.vectorized.{CHBlockConverterJniWrapper, CHNativeBlock} - -import org.apache.spark.benchmark.Benchmark -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow} -import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} -import org.apache.spark.sql.vectorized.ColumnarBatch - -import com.google.common.collect.Lists - -import scala.collection.JavaConverters._ - -/** - * Benchmark to measure Clickhouse parquet read performance. To run this benchmark: - * {{{ - * 1. Run in IDEA: run this class directly; - * 2. Run without IDEA: bin/spark-submit --class - * --jars ,, - * --conf xxxx=xxx - * backends-clickhouse-XXX-tests.jar - * parameters - * - * Parameters: - * 1. parquet files dir; - * 2. the count of the parquet file to read; - * 3. the fields to read; - * 4. the execution count; - * 5. whether to run vanilla spark benchmarks; - * }}} - */ -object CHParquetReadBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark { - - protected lazy val appName = "CHParquetReadBenchmark" - protected lazy val thrdNum = "1" - protected lazy val memorySize = "4G" - protected lazy val offheapSize = "4G" - - def beforeAll(): Unit = {} - - override def getSparkSession: SparkSession = { - beforeAll() - val conf = getSparkConf - .setIfMissing("spark.sql.columnVector.offheap.enabled", "true") - .set("spark.gluten.sql.columnar.separate.scan.rdd.for.ch", "true") - - SparkSession.builder.config(conf).getOrCreate() - } - - override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { - val (parquetDir, readFileCnt, scanSchema, executedCnt, executedVanilla) = - if (mainArgs.isEmpty) { - ("/data/tpch-data/parquet/lineitem", 3, "l_orderkey,l_receiptdate", 5, true) - } else { - (mainArgs(0), mainArgs(1).toInt, mainArgs(2), mainArgs(3).toInt, mainArgs(4).toBoolean) - } - - val chParquet = spark.sql(s""" - |select $scanSchema from parquet.`$parquetDir` - | - |""".stripMargin) - - // Get the `FileSourceScanExecTransformer` - val chScanPlan = chParquet.queryExecution.executedPlan.collect { - case scan: FileSourceScanExecTransformer => scan - } - - val chFileScan = chScanPlan.head - val outputAttrs = chFileScan.outputAttributes() - val filePartitions = chFileScan.getPartitions - .take(readFileCnt) - .map(_.asInstanceOf[FilePartition]) - - val numOutputRows = chFileScan.longMetric("numOutputRows") - val numOutputVectors = chFileScan.longMetric("outputVectors") - val scanTime = chFileScan.longMetric("scanTime") - // Generate Substrait plan - val substraitContext = new SubstraitContext - val transformContext = chFileScan.transform(substraitContext) - val outNames = new java.util.ArrayList[String]() - for (attr <- outputAttrs) { - outNames.add(ConverterUtils.genColumnNameWithExprId(attr)) - } - val planNode = - PlanBuilder.makePlan(substraitContext, Lists.newArrayList(transformContext.root), outNames) - - val nativeFileScanRDD = BackendsApiManager.getIteratorApiInstance.genNativeFileScanRDD( - spark.sparkContext, - WholeStageTransformContext(planNode, substraitContext), - chFileScan.getSplitInfos, - chFileScan, - numOutputRows, - numOutputVectors, - scanTime - ) - - // Get the total row count - val chRowCnt = nativeFileScanRDD - .mapPartitionsInternal(batches => batches.map(batch => batch.numRows().toLong)) - .collect() - .sum - - val parquetReadBenchmark = - new Benchmark( - s"Parquet Read $readFileCnt files, fields: $scanSchema, total $chRowCnt records", - chRowCnt, - output = output) - - parquetReadBenchmark.addCase(s"ClickHouse Parquet Read", executedCnt) { - _ => - val resultRDD: RDD[Long] = nativeFileScanRDD.mapPartitionsInternal { - batches => - batches.map { - batch => - val block = CHNativeBlock.fromColumnarBatch(batch) - block.totalBytes() - block.close() - batch.numRows().toLong - } - } - resultRDD.collect() - } - - parquetReadBenchmark.addCase(s"ClickHouse Parquet Read to Rows", executedCnt) { - _ => - val resultRDD: RDD[Long] = nativeFileScanRDD.mapPartitionsInternal { - batches => - batches.map { - batch => - val block = CHNativeBlock.fromColumnarBatch(batch) - val info = - CHBlockConverterJniWrapper.convertColumnarToRow(block.blockAddress(), null) - new Iterator[InternalRow] { - var rowId = 0 - val row = new UnsafeRow(batch.numCols()) - var closed = false - - override def hasNext: Boolean = { - val result = rowId < batch.numRows() - if (!result && !closed) { - CHBlockConverterJniWrapper.freeMemory(info.memoryAddress, info.totalSize) - closed = true - } - result - } - - override def next: UnsafeRow = { - if (rowId >= batch.numRows()) throw new NoSuchElementException - - val (offset, length) = (info.offsets(rowId), info.lengths(rowId)) - row.pointTo(null, info.memoryAddress + offset, length.toInt) - rowId += 1 - row - } - }.foreach(_.numFields) - block.close() - - batch.numRows().toLong - } - } - resultRDD.collect() - } - - if (executedVanilla) { - spark.conf.set(GlutenConfig.GLUTEN_ENABLED.key, "false") - - val vanillaParquet = spark.sql(s""" - |select $scanSchema from parquet.`$parquetDir` - | - |""".stripMargin) - - val vanillaScanPlan = vanillaParquet.queryExecution.executedPlan.collect { - case scan: FileSourceScanExec => scan - } - - val fileScan = vanillaScanPlan.head - val fileScanOutput = fileScan.output - val relation = fileScan.relation - val readFile: PartitionedFile => Iterator[InternalRow] = - relation.fileFormat.buildReaderWithPartitionValues( - sparkSession = relation.sparkSession, - dataSchema = relation.dataSchema, - partitionSchema = relation.partitionSchema, - requiredSchema = fileScan.requiredSchema, - filters = Seq.empty, - options = relation.options, - hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options) - ) - - val newFileScanRDD = - SparkShimLoader.getSparkShims - .generateFileScanRDD(spark, readFile, filePartitions, fileScan) - .asInstanceOf[RDD[ColumnarBatch]] - - val rowCnt = newFileScanRDD - .mapPartitionsInternal(batches => batches.map(batch => batch.numRows().toLong)) - .collect() - .sum - assert(chRowCnt == rowCnt, "The row count of the benchmark is not equal.") - - parquetReadBenchmark.addCase(s"Vanilla Spark Parquet Read", executedCnt) { - _ => - val resultRDD: RDD[Long] = newFileScanRDD.mapPartitionsInternal { - batches => batches.map(_.numRows().toLong) - } - resultRDD.collect() - } - - parquetReadBenchmark.addCase(s"Vanilla Spark Parquet Read to Rows", executedCnt) { - _ => - val resultRDD: RDD[Long] = newFileScanRDD.mapPartitionsInternal { - batches => - val toUnsafe = UnsafeProjection.create(fileScanOutput, fileScanOutput) - batches.map { - batch => - // Convert to row and decode parquet value - batch.rowIterator().asScala.map(toUnsafe).foreach(_.numFields) - batch.numRows().toLong - } - } - resultRDD.collect() - } - } - - parquetReadBenchmark.run() - } -} diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index f1fbf3648bb2..5f9b5afa9976 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -27,9 +27,8 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.utils._ import org.apache.gluten.vectorized._ -import org.apache.spark.{SparkConf, SparkContext, TaskContext} +import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.internal.Logging -import org.apache.spark.rdd.RDD import org.apache.spark.softaffinity.SoftAffinity import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} @@ -232,16 +231,4 @@ class VeloxIteratorApi extends IteratorApi with Logging { .create() } // scalastyle:on argcount - - /** Generate Native FileScanRDD, currently only for ClickHouse Backend. */ - override def genNativeFileScanRDD( - sparkContext: SparkContext, - wsCxt: WholeStageTransformContext, - splitInfos: Seq[SplitInfo], - scan: BasicScanExecTransformer, - numOutputRows: SQLMetric, - numOutputBatches: SQLMetric, - scanTime: SQLMetric): RDD[ColumnarBatch] = { - throw new UnsupportedOperationException("Cannot support to generate Native FileScanRDD.") - } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index d999948d7047..53dc8f47861f 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -24,7 +24,6 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.substrait.rel.SplitInfo import org.apache.spark._ -import org.apache.spark.rdd.RDD import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType @@ -81,14 +80,4 @@ trait IteratorApi { partitionIndex: Int, materializeInput: Boolean = false): Iterator[ColumnarBatch] // scalastyle:on argcount - - /** Generate Native FileScanRDD, currently only for ClickHouse Backend. */ - def genNativeFileScanRDD( - sparkContext: SparkContext, - wsCxt: WholeStageTransformContext, - splitInfos: Seq[SplitInfo], - scan: BasicScanExecTransformer, - numOutputRows: SQLMetric, - numOutputBatches: SQLMetric, - scanTime: SQLMetric): RDD[ColumnarBatch] } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index 2dd5aff766a9..af35957ec393 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -22,18 +22,14 @@ import org.apache.gluten.extension.ValidationResult import org.apache.gluten.substrait.`type`.ColumnTypeNode import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.extensions.ExtensionBuilder -import org.apache.gluten.substrait.plan.PlanBuilder import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType} -import org.apache.spark.sql.vectorized.ColumnarBatch -import com.google.common.collect.Lists import com.google.protobuf.StringValue import scala.collection.JavaConverters._ @@ -75,28 +71,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource .genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name))) } - def doExecuteColumnarInternal(): RDD[ColumnarBatch] = { - val numOutputRows = longMetric("numOutputRows") - val numOutputVectors = longMetric("outputVectors") - val scanTime = longMetric("scanTime") - val substraitContext = new SubstraitContext - val transformContext = transform(substraitContext) - val outNames = - filteRedundantField(outputAttributes()).map(ConverterUtils.genColumnNameWithExprId).asJava - val planNode = - PlanBuilder.makePlan(substraitContext, Lists.newArrayList(transformContext.root), outNames) - - BackendsApiManager.getIteratorApiInstance.genNativeFileScanRDD( - sparkContext, - WholeStageTransformContext(planNode, substraitContext), - getSplitInfos, - this, - numOutputRows, - numOutputVectors, - scanTime - ) - } - override protected def doValidateInternal(): ValidationResult = { var fields = schema.fields @@ -182,9 +156,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def filteRedundantField(outputs: Seq[Attribute]): Seq[Attribute] = { var final_output: List[Attribute] = List() val outputList = outputs.toArray - for (i <- 0 to outputList.size - 1) { + for (i <- outputList.indices) { var dup = false - for (j <- 0 to i - 1) { + for (j <- 0 until i) { if (outputList(i).name == outputList(j).name) { dup = true } @@ -193,6 +167,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource final_output = final_output :+ outputList(i) } } - final_output.toSeq + final_output } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala index b0c8c59e7bb5..64d9d6546bd8 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala @@ -22,7 +22,6 @@ import org.apache.gluten.extension.ValidationResult import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan @@ -31,7 +30,6 @@ import org.apache.spark.sql.connector.read.{InputPartition, Scan} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.ColumnarBatch /** Columnar Based BatchScanExec. */ case class BatchScanExecTransformer( @@ -144,10 +142,6 @@ abstract class BatchScanExecTransformerBase( super.doValidateInternal() } - override def doExecuteColumnar(): RDD[ColumnarBatch] = { - doExecuteColumnarInternal() - } - override def metricsUpdater(): MetricsUpdater = BackendsApiManager.getMetricsApiInstance.genBatchScanTransformerMetricsUpdater(metrics) diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala index c3d2da7f0466..ff905251b8ae 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala @@ -21,7 +21,6 @@ import org.apache.gluten.extension.ValidationResult import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan @@ -30,7 +29,6 @@ import org.apache.spark.sql.execution.FileSourceScanExecShim import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.collection.BitSet case class FileSourceScanExecTransformer( @@ -147,10 +145,6 @@ abstract class FileSourceScanExecTransformerBase( super.doValidateInternal() } - override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { - doExecuteColumnarInternal() - } - override def metricsUpdater(): MetricsUpdater = BackendsApiManager.getMetricsApiInstance.genFileSourceScanTransformerMetricsUpdater(metrics) diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index 2952267e5a1d..5dfa85b269a8 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -22,7 +22,6 @@ import org.apache.gluten.extension.ValidationResult import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat -import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression} @@ -34,7 +33,6 @@ import org.apache.spark.sql.hive.HiveTableScanExecTransformer._ import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.{AbstractHiveTableScanExec, HiveTableScanExec} import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat @@ -81,10 +79,6 @@ case class HiveTableScanExecTransformer( override def metricsUpdater(): MetricsUpdater = BackendsApiManager.getMetricsApiInstance.genHiveTableScanTransformerMetricsUpdater(metrics) - override def doExecuteColumnar(): RDD[ColumnarBatch] = { - doExecuteColumnarInternal() - } - @transient private lazy val hivePartitionConverter = new HivePartitionConverter(session.sessionState.newHadoopConf(), session)