Is your feature request related to a problem?

Problem statement

A query with a LIMIT clause, such as `SELECT * FROM cloudtrail LIMIT 10`, takes 1.5 minutes to execute when querying a CloudTrail dataset containing more than 10,000 files.

Key issues observed:

- Excessive resource consumption: the query plans more than 10K tasks. With the Dynamic Resource Allocation (DRA) feature, EMR-S Spark spins up additional nodes, which adds a delay of 30+ seconds.
- Inefficient execution: despite Spark's limit optimization, which is designed to minimize unnecessary file scans by reading input splits incrementally, the query does not skip unneeded files.
Root cause analysis
We use the following example to explain the problem. The dataset consists of 225 files, which Spark splits and groups into 12 input splits (see the attached screenshot).

When the user submits the query `SELECT * FROM alb_logs LIMIT 10` directly, the expected behavior is that Spark scans only one split (i.e., one file). If that scan yields the 10 rows, Spark returns the result without scanning additional files; otherwise it scans progressively more files, with the growth rate controlled by `spark.sql.limit.scaleUpFactor`. The execution plan for this direct query (see the attached screenshot) contains a single task that fetches 10 rows from one file without requiring a shuffle stage, and the entire job completes in 24 milliseconds.
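For reference, this fast path can be reproduced with a plain `collect()`. The sketch below assumes the `alb_logs` table is already registered and simply makes the scale-up factor explicit (its default value is 4):

```scala
// Baseline: a plain collect() on a LIMIT query lets Spark scan one input split
// first and only widen the scan (by scaleUpFactor) if too few rows come back.
spark.conf.set("spark.sql.limit.scaleUpFactor", "4") // 4 is already the default

val rows = spark.sql("SELECT * FROM alb_logs LIMIT 10").collect()
println(s"fetched ${rows.length} rows")
```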
However, when the query is submitted through FlintREPL, it interacts with Spark using the following code: `spark.sql("SELECT * FROM alb_logs LIMIT 10").toJSON.collect()`

In this case, the execution plan differs (see the attached screenshot): the toJSON operator causes Spark to split the execution into two stages with a shuffle in between, which adds unnecessary overhead.
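The difference can be seen by comparing the physical plans of the two call paths; this is just a quick way to reproduce the observation above:

```scala
// Direct LIMIT query: plans a CollectLimit over the file scan (single stage).
spark.sql("SELECT * FROM alb_logs LIMIT 10").explain()

// Same query through toJSON, as FlintREPL does: the rewritten plan introduces
// the extra stage/shuffle described above.
spark.sql("SELECT * FROM alb_logs LIMIT 10").toJSON.explain()
```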
What solution would you like?
I would like a solution that progressively plans the InputPartitions and collects only as much data as the limit requires; a rough sketch of the idea follows.
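As an illustration only (not Flint's implementation; the `progressiveCollect` helper below is hypothetical): scan a small set of partitions first and widen the scan only when too few rows come back, mirroring what Spark already does for a plain LIMIT + collect(). Going through `df.rdd` loses some SQL optimizations, so this is just a sketch of the shape of the solution:

```scala
import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical helper: collect up to `limit` rows by scanning partitions in
// progressively larger batches instead of planning all 10K+ tasks up front.
def progressiveCollect(df: DataFrame, limit: Int, scaleUpFactor: Int = 4): Array[Row] = {
  val rdd = df.rdd
  val totalParts = rdd.getNumPartitions
  var collected = Array.empty[Row]
  var scanned = 0
  var batch = 1
  while (collected.length < limit && scanned < totalParts) {
    val parts = scanned until math.min(scanned + batch, totalParts)
    val chunks = df.sparkSession.sparkContext.runJob(
      rdd, (it: Iterator[Row]) => it.take(limit).toArray, parts)
    collected = (collected ++ chunks.flatten).take(limit)
    scanned += parts.length
    batch *= scaleUpFactor // widen the scan, like spark.sql.limit.scaleUpFactor
  }
  collected
}

// e.g. progressiveCollect(spark.table("alb_logs"), 10)
```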
What alternatives have you considered?
n/a
Do you have any additional context?
attached
@penghuo I think this is a bug in Spark. I filed a PR, apache/spark#48407, to fix it. Until it is merged, if this is a very common usage in our Flint, maybe we should remove the toJSON and convert the dataset to JSON inside Flint instead, e.g. along the lines of the sketch below.
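A minimal sketch of that workaround, assuming Spark 3.x and flat (non-nested) result schemas; the serialization here uses json4s, which Spark already bundles, but the actual library choice would be up to Flint:

```scala
import org.apache.spark.sql.Row
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

implicit val formats: DefaultFormats.type = DefaultFormats

val df = spark.sql("SELECT * FROM alb_logs LIMIT 10")
val fieldNames = df.schema.fieldNames

// A plain collect() keeps the single-task CollectLimit path (no toJSON, no shuffle).
val rows: Array[Row] = df.collect()

// Naive driver-side serialization; real code would need to handle nested and
// temporal types properly.
val json: Array[String] = rows.map { row =>
  Serialization.write(fieldNames.zip(row.toSeq).toMap)
}
```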