Commit 4775901

fix data lake connectors
Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
zhouyuan committed Oct 9, 2024
1 parent ba675b9 commit 4775901
Showing 3 changed files with 11 additions and 11 deletions.
@@ -21,7 +21,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
@@ -66,14 +66,14 @@ case class DeltaScanTransformer(
   override def doCanonicalize(): DeltaScanTransformer = {
     DeltaScanTransformer(
       relation,
-      output.map(QueryPlan.normalizeExpressions(_, output)),
+      output.map(QueryPlanWrapper.normalizeExpressions(_, output)),
       requiredSchema,
-      QueryPlan.normalizePredicates(
+      QueryPlanWrapper.normalizePredicates(
        filterUnusedDynamicPruningExpressions(partitionFilters),
        output),
       optionalBucketSet,
       optionalNumCoalescedBuckets,
-      QueryPlan.normalizePredicates(dataFilters, output),
+      QueryPlanWrapper.normalizePredicates(dataFilters, output),
       None,
       disableBucketedScan
     )
@@ -21,7 +21,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
@@ -61,14 +61,14 @@ case class HudiScanTransformer(
   override def doCanonicalize(): HudiScanTransformer = {
     HudiScanTransformer(
       relation,
-      output.map(QueryPlan.normalizeExpressions(_, output)),
+      output.map(QueryPlanWrapper.normalizeExpressions(_, output)),
       requiredSchema,
-      QueryPlan.normalizePredicates(
+      QueryPlanWrapper.normalizePredicates(
        filterUnusedDynamicPruningExpressions(partitionFilters),
        output),
       optionalBucketSet,
       optionalNumCoalescedBuckets,
-      QueryPlan.normalizePredicates(dataFilters, output),
+      QueryPlanWrapper.normalizePredicates(dataFilters, output),
       None,
       disableBucketedScan
     )
@@ -22,7 +22,7 @@ import org.apache.gluten.substrait.rel.SplitInfo
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlanWrapper
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -71,8 +71,8 @@ case class IcebergScanTransformer(
 
   override def doCanonicalize(): IcebergScanTransformer = {
     this.copy(
-      output = output.map(QueryPlan.normalizeExpressions(_, output)),
-      runtimeFilters = QueryPlan.normalizePredicates(
+      output = output.map(QueryPlanWrapper.normalizeExpressions(_, output)),
+      runtimeFilters = QueryPlanWrapper.normalizePredicates(
        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
        output)
     )
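
All three connectors make the same substitution in doCanonicalize: the Spark-internal helpers QueryPlan.normalizeExpressions and QueryPlan.normalizePredicates are now reached through a QueryPlanWrapper indirection. The wrapper itself is not part of this diff; as a minimal sketch, assuming it simply delegates to Spark's QueryPlan companion object (Gluten's real shim is presumably version-specific), it could look like this:

// Hypothetical sketch only; the actual QueryPlanWrapper in Gluten's shim
// layer is not shown in this diff.
package org.apache.spark.sql.catalyst.plans

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

object QueryPlanWrapper {
  // Forwards to QueryPlan.normalizeExpressions; the Seq[Attribute] argument
  // is adapted to AttributeSeq by the implicit in the expressions package.
  def normalizeExpressions[T <: Expression](e: T, input: Seq[Attribute]): T =
    QueryPlan.normalizeExpressions(e, input)

  // Forwards to QueryPlan.normalizePredicates, which canonicalizes attribute
  // references and orders predicates deterministically for plan comparison.
  def normalizePredicates(
      predicates: Seq[Expression],
      output: Seq[Attribute]): Seq[Expression] =
    QueryPlan.normalizePredicates(predicates, output)
}

Funneling the Delta, Hudi, and Iceberg transformers through one wrapper keeps their doCanonicalize implementations identical even if QueryPlan's API shifts between Spark releases.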
