
[VL] Improve the implementation of Spark function input_file_name #6157

Closed · zhztheplayer opened this issue Jun 20, 2024 · 6 comments · Fixed by #6200
Labels: enhancement (New feature or request)

Comments

@zhztheplayer (Member) commented Jun 20, 2024

Description

Two PRs have been proposed to add support for input_file_name:

#6021 (merged)
#6139 (pending)

However, so far two interdependent rules have been added for this feature: one that pushes the input_file_name function down into the Velox scan, and another that falls back to the vanilla plan when the scan fails to move to Velox.

This kind of re-planning for offload/fallback should be avoided, since it significantly increases the query planner's code complexity. Technically, the fallback rule relies not only on the offload rule but on all of the rules applied before it. We should find another way to add this kind of function support.

To improve:

  1. Remove InputFileNameReplaceRule and InputFileNameReplaceFallbackRule
  2. Remove SparkPlanExecApi#genExtendedColumnarFinalRules, since it is unused
  3. In OffloadSingleNode.scala, add a rule OffloadInputFileName (or similar), sketched below, to:
    • Match on the Project(input_file_name) + Scan pattern
    • Check validity
    • If eligible, replace the pattern with ProjectTransformer(input_file_name) + ScanTransformer(input_file_name)
    • If not eligible, do nothing
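
A minimal sketch of how such a rule could look. The helpers `isScanOffloadable` and `offloadProjectWithScan` are hypothetical stand-ins; the real validation and transformer construction live in Gluten and are not shown here:

```scala
import org.apache.spark.sql.catalyst.expressions.InputFileName
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, SparkPlan}

// Hypothetical sketch: offload Project(input_file_name) + Scan as one unit,
// and do nothing when the scan cannot be offloaded (no fallback rule needed).
case class OffloadInputFileName() extends Rule[SparkPlan] {
  private def hasInputFileName(project: ProjectExec): Boolean =
    project.projectList.exists(_.find(_.isInstanceOf[InputFileName]).isDefined)

  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case p @ ProjectExec(_, scan: FileSourceScanExec) if hasInputFileName(p) =>
      if (isScanOffloadable(scan)) {
        // Eligible: replace with ProjectTransformer(input_file_name) +
        // ScanTransformer(input_file_name) (construction elided).
        offloadProjectWithScan(p, scan)
      } else {
        p // not eligible: leave the vanilla pattern untouched
      }
  }

  // Stand-ins for Gluten's real validation and conversion logic.
  private def isScanOffloadable(scan: FileSourceScanExec): Boolean = ???
  private def offloadProjectWithScan(p: ProjectExec, s: FileSourceScanExec): SparkPlan = ???
}
```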
@zhztheplayer (Member Author) commented
Assigned to @gaoyangxiaozhu

@gaoyangxiaozhu (Contributor) commented

Thanks @zhztheplayer. I also synced with @zhli1142015 offline; he has another proposal: use a new logical rule in the logical-plan phase to replace the input_file_name expression with a partition column instead of a metadata column. With that, we only need to modify the project node instead of project + scan, which keeps things simple.

@zhli1142015, could you share your thoughts here for reference? Then we can finalize which approach to use for refactoring this part. Thanks!

@gaoyangxiaozhu (Contributor) commented Jun 20, 2024

Hey @zhztheplayer, I thought about it for a while. Zhen's suggestion to convert input_file_name to a partition column may be a good workaround, since it doesn't require updating the scan node, but it is not semantically right: file_name is really a metadata column, not a partition column.

However, it brought me to another idea that overcomes that shortcoming.

The previously implemented approach needed a fallback rule because it introduced a new metadata column named $xxx$ that vanilla Spark can't recognize, so the plan had to be reverted by the fallback strategy. But vanilla Spark actually has its own metadata column, _metadata.file_name, with the same semantics as input_file_name. So we can leverage one simple rule to convert input_file_name to _metadata.file_name, which works in both vanilla Spark and native.

With that, we only need one rule, which keeps things simple and working.

For example,

Project [input_file_name() AS input_file_name()#7, id#0]
+- Relation [id#0] parquet

would be

Project [_metadata#6.file_name AS file_name#7, id#0]
+- Relation [id#0,_metadata#6] parquet
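
To illustrate the equivalence this comment proposes, here is the same idea at the DataFrame level (Spark 3.2+ exposes the hidden _metadata column on file sources; the dataset path is hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, input_file_name}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.read.parquet("/tmp/example.parquet") // hypothetical dataset

// What the proposed rule would rewrite: the input_file_name() function call...
df.select(input_file_name().as("via_function")).show(false)
// ...into a reference to Spark's built-in per-file metadata column.
df.select(col("_metadata.file_name").as("via_metadata")).show(false)
```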

I will try it locally to verify that the idea works. Let me know if you think this approach is acceptable.

@zhztheplayer / @FelixYBW / @zhli1142015 FYI

@zhztheplayer (Member Author) commented

> I will try it locally to verify that the idea works. Let me know if you think this approach is acceptable.

That sounds great if it works.

@gaoyangxiaozhu (Contributor) commented

> That sounds great if it works.

Unluckily, we can't directly reuse _metadata.file_path to replace input_file_name, because the two return differently formatted values. For example, input_file_name() returns file:///home/gayangya/Code/example.parquet/part-00001-7a014537-3f38-433a-9509-c63622146eb5-c000.snappy.parquet, while _metadata.file_path returns file:/home/gayangya/Code/example.parquet/part-00001-7a014537-3f38-433a-9509-c63622146eb5-c000.snappy.parquet. One is file:///xxx, the other is file:/xxx.
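
A quick way to see the mismatch side by side in a spark-shell session (which provides `spark`; the dataset path is hypothetical, Spark 3.2+):

```scala
import org.apache.spark.sql.functions.{col, input_file_name}

// Show both URI renderings for each row of a file-based source.
spark.read.parquet("/tmp/example.parquet") // hypothetical dataset
  .select(
    input_file_name().as("input_file_name"),   // prints file:///...
    col("_metadata.file_path").as("file_path") // prints file:/...
  )
  .show(false)
```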

@gaoyangxiaozhu (Contributor) commented Jun 25, 2024

@zhztheplayer I checked the Spark code: if a project node has an input_file_name expression, it can have at most one data-source scan child node. That makes it easy to implement a single rule/method that decides whether to offload or fall back a project node containing input_file_name.

I followed your suggestion to modify OffloadSingleNode. The difference is that I didn't add a new offload path solely for the project + input_file_name scenario; instead, I extended the offload logic for ProjectExec with a code path for the case where a project is not transformable by itself but matches the Project(input_file_name) + Scan pattern and the scan is transformable. In that case we do the conversion and still offload the project. A rough sketch of that decision is below.
Please help review #6200. Thanks!
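
A rough sketch of the extended decision, under the same caveat as before: every helper here is a hypothetical stand-in for Gluten's real validation and conversion logic, not its actual API:

```scala
import org.apache.spark.sql.catalyst.expressions.InputFileName
import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, SparkPlan}

// Hypothetical shape of the extended ProjectExec offload path described above.
object OffloadProjectSketch {
  def offload(project: ProjectExec): SparkPlan =
    if (isTransformable(project)) {
      toTransformer(project) // normal offload path, unchanged
    } else if (hasInputFileName(project) && isScanTransformable(project.child)) {
      rewriteAndOffload(project) // convert input_file_name, then offload project + scan
    } else {
      project // otherwise keep the vanilla node (fallback)
    }

  private def hasInputFileName(p: ProjectExec): Boolean =
    p.projectList.exists(_.find(_.isInstanceOf[InputFileName]).isDefined)

  private def isScanTransformable(plan: SparkPlan): Boolean = plan match {
    case _: FileSourceScanExec => true // placeholder: the real check validates the scan
    case _ => false
  }

  // Stand-ins for Gluten-specific logic.
  private def isTransformable(p: ProjectExec): Boolean = ???
  private def toTransformer(p: ProjectExec): SparkPlan = ???
  private def rewriteAndOffload(p: ProjectExec): SparkPlan = ???
}
```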
