fix: Various metrics bug fixes and improvements #1111
Conversation
@viirya @parthchandra @mbutrovich This is still WIP, but let me know what you think of the overall approach here if you have time. The current status is that we now log the metrics that we are dropping. Here are two examples from TPC-H q3. In the first, we wrap an aggregate in a projection and drop the aggregate metrics:
In the second, the input to a SortExec is a ScanExec that fetches the input batches from the JVM, and we drop those metrics:
Arc::new(SparkPlan::new_with_additional(
    spark_plan.plan_id,
    projection,
    vec![child],
    vec![aggregate],
)),
This is an example where we are currently dropping the aggregate metrics and just capturing the projection metrics (see the sketch after this exchange for context).
My initial thoughts:
Approach looks good (though I cannot say I understand it completely). The results are definitely what we wanted!
I can possibly break this down into some smaller PRs as well. I may do that.
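For context on the new_with_additional call in the snippet above, here is a minimal sketch of what the SparkPlan node could look like. The field names (native_plan, additional_native_plans) and the plan_id type are assumptions for illustration, not necessarily the actual implementation:

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

/// Sketch of a SparkPlan node that maps 1:1 to a Spark operator while
/// also tracking extra native plans (such as an injected projection)
/// whose metrics would otherwise be dropped.
pub struct SparkPlan {
    pub plan_id: u32,
    /// The root native plan for this Spark operator
    pub native_plan: Arc<dyn ExecutionPlan>,
    /// Child SparkPlan nodes, mirroring the Spark plan's children
    pub children: Vec<Arc<SparkPlan>>,
    /// Additional native plans whose metrics belong to this node
    pub additional_native_plans: Vec<Arc<dyn ExecutionPlan>>,
}

impl SparkPlan {
    pub fn new_with_additional(
        plan_id: u32,
        native_plan: Arc<dyn ExecutionPlan>,
        children: Vec<Arc<SparkPlan>>,
        additional_native_plans: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            plan_id,
            native_plan,
            children,
            additional_native_plans,
        }
    }
}

In the snippet above, the projection becomes the root native plan and the wrapped aggregate goes into additional_native_plans, so its metrics can still be reported against the same Spark operator.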
@@ -365,28 +378,23 @@ struct ScanStream<'a> {
    scan: ScanExec,
    /// Schema representing the data
    schema: SchemaRef,
    /// Metrics
Is it dropped because it repeats what we have on SparkPlan?
I have reverted some of these changes now.
-        partition: usize,
-        baseline_metrics: BaselineMetrics,
-    ) -> Self {
+    pub fn new(scan: ScanExec, schema: SchemaRef, partition: usize, jvm_fetch_time: Time) -> Self {
Perhaps jvm_fetch_time is enough for now, but if you want to expand metrics in the future, it would be better to have a wrapper structure similar to BaselineMetrics?
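To illustrate the suggestion, a hedged sketch of what such a wrapper could look like. The name ScanMetrics and its field set are hypothetical; MetricBuilder::subset_time is the existing DataFusion API for registering a named Time metric:

use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

/// Hypothetical wrapper, analogous to BaselineMetrics, grouping
/// ScanExec-specific timings so future metrics have a single home.
pub struct ScanMetrics {
    /// Time spent fetching batches from the JVM over Arrow FFI
    pub jvm_fetch_time: Time,
}

impl ScanMetrics {
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            jvm_fetch_time: MetricBuilder::new(metrics).subset_time("jvm_fetch_time", partition),
        }
    }
}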
Codecov Report: All modified and coverable lines are covered by tests ✅

@@             Coverage Diff              @@
##               main    #1111      +/-   ##
============================================
+ Coverage     34.33%   34.43%    +0.09%
- Complexity      898      901        +3
============================================
  Files           115      115
  Lines         42986    43477      +491
  Branches       9369     9506      +137
============================================
+ Hits          14761    14971      +210
- Misses        25361    25615      +254
- Partials       2864     2891       +27
Which issue does this PR close?
Closes #1109
Closes #1003
Closes #1110
Closes #935
Rationale for this change
We currently drop some native metrics due to a design flaw in the current metrics code: we assume that the native plan is a 1:1 mapping with the Spark plan, which is often not true. See the issues above for more details.
Improvement 1: Fix bug where metrics were being dropped in some cases
Here are before and after images for a BuildRight hash join where we insert an extra projection on the native side, breaking the assumption that there is a 1:1 mapping between the Spark plan and the native plan:
Improvement 2: Report Arrow FFI time for passing batches from JVM to native
We now include the ScanExec time for transferring batches from the JVM to native code. The following example shows a total scan time of 16.4 seconds, but now also shows an additional 17.7 seconds for transferring those batches to native for the filter operation.
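As a rough sketch of how this timing can be captured, DataFusion's Time metric provides a scoped timer. The fetch_from_jvm closure below stands in for the real Arrow FFI call and is hypothetical:

use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::metrics::Time;

/// Accumulate the wall-clock time spent pulling one batch across the
/// JVM -> native boundary into the jvm_fetch_time metric.
fn timed_fetch(
    jvm_fetch_time: &Time,
    fetch_from_jvm: impl FnOnce() -> Option<RecordBatch>,
) -> Option<RecordBatch> {
    // The timer adds elapsed time to the metric when stopped.
    let mut timer = jvm_fetch_time.timer();
    let batch = fetch_from_jvm();
    timer.stop();
    batch
}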
What changes are included in this PR?
The native planner now builds a tree of SparkPlan nodes that is a 1:1 mapping with the original Spark plan. Each SparkPlan can reference multiple native plans that should be used for metrics collection, as sketched after this paragraph.
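For illustration only, a hedged sketch of how metrics from the additional native plans might be folded into a single set for one SparkPlan node; the helper name collect_metrics is an assumption, while ExecutionPlan::metrics and MetricsSet are existing DataFusion APIs:

use std::sync::Arc;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::ExecutionPlan;

/// Merge the metrics of the root native plan with those of any
/// additional native plans attached to the same SparkPlan node.
fn collect_metrics(
    native_plan: &Arc<dyn ExecutionPlan>,
    additional: &[Arc<dyn ExecutionPlan>],
) -> MetricsSet {
    let mut merged = MetricsSet::new();
    // Visit the root plan first, then each additional native plan.
    for plan in std::iter::once(native_plan).chain(additional.iter()) {
        if let Some(metrics) = plan.metrics() {
            for metric in metrics.iter() {
                merged.push(Arc::clone(metric));
            }
        }
    }
    merged
}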
How are these changes tested?
Existing tests, and new unit tests in the planner.