fix ch ci failure issue
Yangyang Gao committed Dec 5, 2023
1 parent 2c23947 commit 79ba0da
Showing 10 changed files with 28 additions and 19 deletions.
@@ -89,6 +89,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
starts,
lengths,
partitionColumns,
new JArrayList[JMap[String, String]](),
fileFormat,
preferredLocations.toList.asJava)
case _ =>
@@ -239,8 +239,10 @@ case class ClickHouseAppendDataExec(
starts,
lengths,
partitionColumns.map(_.asJava).asJava,
mutable.ArrayBuffer.empty[Map[String, String]].map(_.asJava).asJava,
ReadFileFormat.UnknownFormat,
List.empty.asJava)
List.empty.asJava
)
val insertOutputNode = InsertOutputBuilder.makeInsertOutputNode(
SnowflakeIdWorker.getInstance().nextId(),
database,
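Both ClickHouse call sites above now pass one extra argument, an empty per-file metadata-columns list, just before the file format when the local-files node is built. Below is a minimal sketch (not project code; the object name is illustrative) of the two construction styles, assuming JArrayList and JMap are the java.util.ArrayList and java.util.Map aliases used by the surrounding sources:

import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable

object EmptyMetadataColumns {
  // CHIteratorApi style: construct the Java list directly.
  val direct: JList[JMap[String, String]] =
    new JArrayList[JMap[String, String]]()

  // ClickHouseAppendDataExec style: start from a Scala buffer and convert.
  val viaScala: JList[JMap[String, String]] =
    mutable.ArrayBuffer.empty[Map[String, String]].map(_.asJava).asJava
}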
2 changes: 1 addition & 1 deletion cpp/velox/substrait/SubstraitParser.cc
@@ -131,7 +131,7 @@ void SubstraitParser::parsePartitionAndMetadataColumns(
case ::substrait::NamedStruct::METADATA_COL:
isPartitionColumns.emplace_back(false);
isMetadataColumns.emplace_back(true);
break;
break;
default:
VELOX_FAIL("Unspecified column type.");
}
@@ -40,11 +40,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat {
// The key of merge schema option in Parquet reader.
protected val mergeSchemaOptionKey = "mergeschema"

def filterExprs(): Seq[Expression]
def filterExprs(hasMetadataColFilters: Boolean = true): Seq[Expression]

def outputAttributes(): Seq[Attribute]

def getMetadataColumns(): Seq[Attribute]
def getMetadataColumns(): Seq[AttributeReference]

def getPartitions: Seq[InputPartition]

@@ -116,7 +116,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat {
}
}.asJava
// Will put all filter expressions into an AND expression
val transformer = filterExprs()
val transformer = filterExprs(false)
.reduceLeftOption(And)
.map(ExpressionConverter.replaceWithExpressionTransformer(_, output))
val filterNodes = transformer.map(_.doTransform(context.registeredFunction))
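The trait change above carries the core of the fix: filterExprs now takes a hasMetadataColFilters flag that defaults to true, and the read-relation builder asks for filterExprs(false) so that filters on metadata columns are excluded from the Substrait pushdown. A simplified sketch of the call pattern (ScanLike and pushedFilterCondition are illustrative names, not Gluten APIs):

import org.apache.spark.sql.catalyst.expressions.{And, Expression}

trait ScanLike {
  // New signature introduced by this commit: metadata-column filters are
  // included by default; callers opt out with hasMetadataColFilters = false.
  def filterExprs(hasMetadataColFilters: Boolean = true): Seq[Expression]

  // Pushdown path: mirrors filterExprs(false).reduceLeftOption(And) in the diff above.
  def pushedFilterCondition: Option[Expression] =
    filterExprs(hasMetadataColFilters = false).reduceLeftOption(And)
}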
@@ -57,14 +57,14 @@ class BatchScanExecTransformer(
@transient override lazy val metrics: Map[String, SQLMetric] =
BackendsApiManager.getMetricsApiInstance.genBatchScanTransformerMetrics(sparkContext)

override def filterExprs(): Seq[Expression] = scan match {
  override def filterExprs(hasMetadataColFilters: Boolean): Seq[Expression] = scan match {
case fileScan: FileScan =>
fileScan.dataFilters
case _ =>
throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported")
}

override def getMetadataColumns(): Seq[Attribute] = Seq.empty
override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty

override def outputAttributes(): Seq[Attribute] = output

@@ -26,7 +26,7 @@ import io.glutenproject.substrait.rel.ReadRelNode

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.FileSourceScanExecShim
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
@@ -75,9 +75,15 @@ class FileSourceScanExecTransformer(
Map.empty[String, SQLMetric]
}

override def filterExprs(): Seq[Expression] = dataFiltersWithoutMetatadaAttr
override def filterExprs(hasMetadataColFilters: Boolean): Seq[Expression] = {
if (hasMetadataColFilters) {
dataFilters
} else {
dataFiltersWithoutMetadataAttr
}
}

override def getMetadataColumns(): Seq[Attribute] = metadataColumns
override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns

override def outputAttributes(): Seq[Attribute] = output

@@ -27,7 +27,7 @@ import io.glutenproject.substrait.rel.ReadRelNode
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -64,9 +64,9 @@ class HiveTableScanExecTransformer(
hiveQlTable.getOutputFormatClass,
hiveQlTable.getMetadata)

override def filterExprs(): Seq[Expression] = Seq.empty
override def filterExprs(hasMetadataColFilters: Boolean): Seq[Expression] = Seq.empty

override def getMetadataColumns(): Seq[Attribute] = Seq.empty
override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty

override def outputAttributes(): Seq[Attribute] = output

@@ -63,10 +63,9 @@ class FileSourceScanExecShim(

override def canEqual(other: Any): Boolean = other.isInstanceOf[FileSourceScanExecShim]

def dataFiltersWithoutMetatadaAttr: Seq[Expression] = dataFilters

def metadataColumns: Seq[AttributeReference] = Seq.empty

def dataFiltersWithoutMetadataAttr: Seq[Expression] = dataFilters
def hasUnsupportedColumns: Boolean = {
// Below name has special meaning in Velox.
output.exists(a => a.name == "$path" || a.name == "$bucket")
@@ -64,8 +64,9 @@ class FileSourceScanExecShim(

override def canEqual(other: Any): Boolean = other.isInstanceOf[FileSourceScanExecShim]

def dataFiltersWithoutMetatadaAttr: Seq[Expression] = dataFilters.filterNot(_.references.exists {
def dataFiltersWithoutMetadataAttr: Seq[Expression] = dataFilters.filterNot(_.references.exists {
case FileSourceMetadataAttribute(_) => true
case _ => false
})

def hasUnsupportedColumns: Boolean = {
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import io.glutenproject.metrics.GlutenTimeMetric

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceMetadataAttribute, MetadataAttribute, PlanExpression, Predicate}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceMetadataAttribute, PlanExpression, Predicate}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -60,12 +60,12 @@ class FileSourceScanExecShim(

override def canEqual(other: Any): Boolean = other.isInstanceOf[FileSourceScanExecShim]

def dataFiltersWithoutMetatadaAttr: Seq[Expression] = dataFilters.filterNot(_.references.exists {
def dataFiltersWithoutMetadataAttr: Seq[Expression] = dataFilters.filterNot(_.references.exists {
case FileSourceMetadataAttribute(_) => true
case _ => false
})

def metadataColumns: Seq[AttributeReference] = fileConstantMetadataColumns

def hasUnsupportedColumns: Boolean = {
val metadataColumnsNames = metadataColumns.map(_.name)
output
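The per-Spark-version shims differ only in dataFiltersWithoutMetadataAttr: the oldest shim returns dataFilters unchanged, while the newer ones drop every filter that references a file-source metadata attribute. A self-contained sketch of that helper (the enclosing object name is illustrative; it requires a Spark release that provides FileSourceMetadataAttribute):

import org.apache.spark.sql.catalyst.expressions.{Expression, FileSourceMetadataAttribute}

object MetadataFilterPruning {
  // Keep ordinary data filters; drop any filter referencing a file-source
  // metadata attribute (e.g. a predicate on _metadata.file_name).
  def dataFiltersWithoutMetadataAttr(dataFilters: Seq[Expression]): Seq[Expression] =
    dataFilters.filterNot(_.references.exists {
      case FileSourceMetadataAttribute(_) => true
      case _ => false
    })
}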
