-
Notifications
You must be signed in to change notification settings - Fork 454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GLUTEN-3378][CORE] Datasource V2 data lake read support #3843
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on Github Issues? https://github.com/oap-project/gluten/issues Then could you also rename commit message and pull request title in the following format?
See also: |
Run Gluten Clickhouse CI |
4 similar comments
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
@YannByron @yma11 @rui-mo Could you help review? |
Run Gluten Clickhouse CI |
} | ||
val scan = batchScanExec.scan | ||
scan match { | ||
case _ if scan.getClass.getName == IcebergScanClassName => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, supporting a new Scan
type (e.g. adding a non-Iceberg data source) will require modifying this match
code, as well as the supportedBatchScan()
method below.
Would it please be possible to allow other plugins to register themselves, so that adding a new format will not require changing ScanTransformerFactory
code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rz-vastdata We can achieve this using the Service Loader, similar to DataSourceRegister
in Spark. However, the current ScanTransformer
in Gluten extends Spark's ScanExec
and requires a constructor with parameters. This makes it a bit difficult to handle with Service Loader. I will think about whether there is another way to use the Service Loader.
|
||
public class IcebergLocalFilesNode extends LocalFilesNode { | ||
|
||
class DeleteFile { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it's used by this PR...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't use it. This is a TODO.
My fault in design. I noticed that there are many modification that rename |
+1 |
import org.apache.spark.sql.catalyst.TableIdentifier | ||
import org.apache.spark.sql.sources.BaseRelation | ||
|
||
trait DatasourceScanTransformer extends BaseScanTransformer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this added to just keep consistent with vanilla Spark? If so, I think we can deduct this hierarchy to make the inheritance not so complicated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delta will extends this trait.
@@ -52,7 +52,7 @@ class HiveTableScanExecTransformer( | |||
relation: HiveTableRelation, | |||
partitionPruningPred: Seq[Expression])(session: SparkSession) | |||
extends HiveTableScanExec(requestedAttributes, relation, partitionPruningPred)(session) | |||
with BasicScanExecTransformer { | |||
with BaseScanTransformer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to still use BasicScanExecTransformer
as this change is not necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I will revert these changes first.
import org.apache.spark.sql.connector.read.InputPartition | ||
import org.apache.spark.sql.types.StructType | ||
|
||
trait BaseDataSource extends SupportFormat { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems BasicScanTransformer
is the only implementation of SupportFormat
, maybe you can add the fields directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add this interface to BaseDataSource
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean SupportFormat
.
|
||
// TODO: Add delete file support for MOR iceberg table | ||
|
||
IcebergLocalFilesNode( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will we also need to use service loader for this LocalFilesNode serialization? I am not sure how much in common for IcebergLocalFilesNode
and DeltaLocalFilesNode
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. It is not used by gluten-core
. The incremental part of the delta and iceberg definitions of the MOR table is expected to have significant differences.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should be a toProtobuf()
method for IcebergLocalFilesNode
and it will be called in gluten-core/substrait/../PlanNode for serialization. Or how do you plan to pass these new added fields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current PR does not involve deleting files. To support deleting files in the future, we will need to implement a specific toProtobuf
method and modify the proto file.
Run Gluten Clickhouse CI |
@@ -70,11 +70,11 @@ class HiveTableScanExecTransformer( | |||
|
|||
override def getPartitions: Seq[InputPartition] = partitions | |||
|
|||
override def getPartitionSchemas: StructType = relation.tableMeta.partitionSchema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yma11 I think the names of these interfaces need to be changed. Here, you can refer to the corresponding interfaces in Spark, and we should not use plurals.
d013400
to
6aa2493
Compare
Run Gluten Clickhouse CI |
6aa2493
to
0dc0f5f
Compare
Run Gluten Clickhouse CI |
2 similar comments
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
bb08d4c
to
5b723e7
Compare
Run Gluten Clickhouse CI |
@yma11 @YannByron @rz-vastdata I have modified the reflection to use the service loader. Can you please help review? |
Run Gluten Clickhouse CI |
a4f915c
to
af544ab
Compare
Run Gluten Clickhouse CI |
* their data source v2 transformer. This allows users to give the data source v2 transformer alias | ||
* as the format type over the fully qualified class name. | ||
*/ | ||
trait DataSourceV2TransformerRegister { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, it's ok. When support other datasource based on v1, i'll update here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. DataSource V1 has different interface parameters.
|
||
private val dataSourceV2TransformerMap = new ConcurrentHashMap[String, Class[_]]() | ||
|
||
def createFileSourceScanTransformer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we combine the two createFileSourceScanTransformer
methods ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course, initially they were combined together, but we need to add some optional "Option" parameters.
af544ab
to
b37c3db
Compare
Run Gluten Clickhouse CI |
b37c3db
to
a8bdbda
Compare
Run Gluten Clickhouse CI |
ScanTransformerFactory.createFileSourceScanTransformer( | ||
fileSourceScan, | ||
reuseSubquery, | ||
extraFilters = leftFilters) | ||
case batchScan: BatchScanExec => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your fix. Can we avoid the extra filter pushdown for BatchScan by not calling applyFilterPushdownToScan
for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed this in d879fd0.
a8bdbda
to
d879fd0
Compare
Run Gluten Clickhouse CI |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @liujiayi771, thanks for your update!
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
|
What changes were proposed in this pull request?
IcebergScanTransformer
is defined that extendsBatchScanExecTransformer
. Although theIcebergLocalFilesNode
is currently the same as theLocalFilesNode
, it is being prepared for future implementation of the "delete file" functionality.ScanTransformerFactory
is used to construct various types ofScanTransformer
. For Iceberg, theIcebergScanTransformer
will be constructed using service loader in order to avoid the dependency ofgluten-core
ongluten-iceberg
.BatchScan
only supports DPP's runtime filter for partition filtering.transformDynamicPruningExpr
method. This allows bothBatchScan
andFileSourceScan
to share this logic, as it is also required byBatchScan
.How was this patch tested?
VeloxTPCHIcebergSuite
andVeloxIcebergSuite
.