[CORE] Implement stage-level resourceProfile auto-adjust framework to avoid oom #8018

Open
zjuwangg opened this issue Nov 21, 2024 · 21 comments

@zjuwangg
Contributor

zjuwangg commented Nov 21, 2024

Implement stage-level resourceProfile auto-adjust framework to avoid oom

Background

In our production environment, we suffer a lot from Gluten jobs occasionally throwing heap OOM exceptions.
We have dug into these problems, and there are two major kinds of problems causing our jobs to throw OOM:

  1. The stage contains fallback operators, e.g. UDAFs and other not-yet-supported functions or operators, which require more heap memory than configured.
  2. The stage contains no fallback operator but has a very heavy upstream exchange. Here "heavy" means the upstream exchange carries a huge M * N shuffle status (M is the number of shuffle mappers and N the number of reducers). When this stage begins its shuffle read, the executor side must keep the whole mapStatuses of the upstream shuffle; when M * N is large, this is very likely to cause a heap OOM exception.

The root cause is that, for now, all stages in the same Spark application share the same task heap/off-heap memory config, and the problem appears when different stages require different off-heap/heap fractions. Since #4392 proposed a potential solution for this type of problem, we did some verification based on that idea.

Design

  • Introduce a ResourceProfile setter in WholeStageTransformer and ColumnarShuffleExchangeExec
    Since all underlying native computation gets triggered from WholeStageTransformer or from ColumnarShuffle, we can add
  @transient private var resourceProfile: Option[ResourceProfile] = None

  def withResources(rp: ResourceProfile): Unit = {
    this.resourceProfile = Some(rp)
  }

in WholeStageTransformer, and when doExecuteColumnar gets called, set the resourceProfile on the RDD before it is returned:

    if (resourceProfile.isDefined) {
      logInfo(s"set resource profile ${resourceProfile.get} for child $child")
      rdd.withResources(resourceProfile.get)
    }
    rdd
  • Introduce GlutenDynamicAdjustStageRP in HeuristicApplier
    When AQE is enabled, we can check all operators in this stage and collect all child query stages (if any) that belong to this stage.
    After we have collected all plan nodes belonging to this stage, we know whether fallback exists, and we can also calculate the shuffle status complexity to roughly estimate mapStatus memory occupation. The rule works in the following steps (see the sketch after the step list):
1. Collect all plan nodes belonging to this stage.

2. Analyze the plan nodes in detail, gathering whether fallback exists and whether there are child query stages.

3. Generate a new resource profile
    3.1 Get the default resource profile from sparkContext.resourceProfileManager and initialize task and executor resource requests based on it.
    3.2 Adjust the memory/off-heap requests.

4. Handle Different Scenarios for Resource Profile Adjustment

Scenario 1: Fallback Exists: If both existsC2RorR2C and existsGlutenOperator are true, try to apply the new resource profile to the collected plans.

Scenario 2: Shuffle Status Consideration: Filter the collected plans to get only the ShuffleQueryStageExec instances. If there are any, calculate the complexity of the stage's shuffle status based on the number of mappers and reducers in each ShuffleQueryStageExec. If the calculated complexity meets or exceeds a threshold from the glutenConfig, apply the new resource profile to the collected plans.

5. Apply new resource profile if needed
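
To make these steps concrete, here is a minimal Scala sketch of the rule skeleton. It is not the actual GlutenDynamicAdjustStageRP implementation: the fallback check, the threshold, and the memory numbers are illustrative placeholders, and only Spark's public ResourceProfile APIs are assumed.

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}
    import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
    import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec}

    object StageResourceProfileSketch {

      // Step 1: collect the plan nodes belonging to this stage, stopping at child query stages.
      def collectStageNodes(plan: SparkPlan): Seq[SparkPlan] = plan match {
        case q: QueryStageExec => Seq(q)
        case p => p +: p.children.flatMap(collectStageNodes)
      }

      // Step 2: a crude fallback check; the real rule would look for Gluten's own
      // columnar<->row transition and fallback nodes instead of the vanilla ones.
      def hasFallback(nodes: Seq[SparkPlan]): Boolean =
        nodes.exists(n => n.isInstanceOf[ColumnarToRowExec] || n.isInstanceOf[RowToColumnarExec])

      // Scenario 2: estimate the map-status footprint as the sum of M * N over upstream shuffles.
      def shuffleComplexity(nodes: Seq[SparkPlan]): Long =
        nodes.collect { case s: ShuffleQueryStageExec =>
          s.shuffle.numMappers.toLong * s.shuffle.numPartitions.toLong
        }.sum

      // Step 3: build an adjusted profile; heapMiB/offHeapMiB/cores are placeholders that a real
      // rule would derive from the default profile and Gluten's configuration.
      def buildAdjustedProfile(heapMiB: Long, offHeapMiB: Long, cores: Int): ResourceProfile = {
        val execReqs = new ExecutorResourceRequests()
          .memory(s"${heapMiB}m")           // on-heap executor memory
          .offHeapMemory(s"${offHeapMiB}m") // off-heap memory used by native execution
          .cores(cores)
        val taskReqs = new TaskResourceRequests().cpus(1)
        new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()
      }

      // Steps 4/5: decide whether a new profile is needed; the caller would then push it
      // down to WholeStageTransformer via the withResources(...) setter shown earlier.
      def maybeAdjust(plan: SparkPlan, complexityThreshold: Long): Option[ResourceProfile] = {
        val nodes = collectStageNodes(plan)
        if (hasFallback(nodes) || shuffleComplexity(nodes) >= complexityThreshold) {
          Some(buildAdjustedProfile(heapMiB = 6144, offHeapMiB = 4096, cores = 4))
        } else {
          None
        }
      }
    }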

We have completed a PoC of this design, and it really solved these two types of OOM problems. We are refactoring the code and plan to contribute it to the community.

Requirements

  • AQE must be enabled
  • Meets the stage-level resource conditions (a minimal config sketch follows below)
    • Executor dynamic allocation is enabled: spark.dynamicAllocation.enabled must be true
    • The underlying resource scheduler must support dynamically allocating executors
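
For reference, a minimal SparkConf sketch that satisfies these requirements (values and the shuffle-tracking choice are illustrative; an external shuffle service also works for dynamic allocation):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.sql.adaptive.enabled", "true")                      // AQE
      .set("spark.dynamicAllocation.enabled", "true")                 // executor dynamic allocation
      .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") // or use an external shuffle service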

Other Potential Benefits

  1. Provides a new way to specify other resources, e.g. GPUs, per stage.
  2. External tuning systems can intervene through this mechanism.

FAQ

  1. What if a stage contains multiple WholeStageTransformer nodes; will the multiple resource profiles conflict with each other?

Multiple resource profiles can be merged through Spark's own mechanism (see the sketch after this FAQ).

  2. What if one stage falls back entirely, which means there is no chance to set a ResourceProfile for this stage?

Potential solutions: a) wrap the whole fallen-back plan with a wrapper node that has the interface and ability to set a ResourceProfile; b) set a default resource profile suitable for whole-stage-fallback stages, so there is no need to set one per plan for such a stage.

  3. Other questions?

We'd love to hear more thoughts and receive more comments about this idea!
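
Regarding FAQ 1: by default Spark throws an exception when tasks in one stage end up with conflicting resource profiles; a minimal sketch, assuming only the standard Spark config, is to opt into Spark's merge behavior, which keeps the maximum of each resource across the conflicting profiles:

    import org.apache.spark.SparkConf

    // Illustrative: allow Spark to merge multiple ResourceProfiles within one stage
    // instead of failing; the merged profile keeps the max of each requested resource.
    val conf = new SparkConf()
      .set("spark.scheduler.resource.profileMergeConflicts", "true")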

@zjuwangg added the enhancement label Nov 21, 2024
@zjuwangg
Contributor Author

cc @WangGuangxin @weiting-chen

@FelixYBW
Contributor

@zjuwangg Thank you for your investigation! It's really something we'd like to do.

  • We should also consider the collaboration with RAS.
  • We need to predefine some operators' potential memory usage: e.g. Scan or Project in Velox consumes little memory, but aggregate and join need much. So for a scan + fallback aggregate, we are able to set small off-heap + large on-heap. If it's an offloaded agg + fallen-back join, we would need to set large off-heap + large on-heap; in that case we should fall back the agg or even the whole stage and then set a large on-heap memory.
  • It would be even better if we could specify a different fallback policy when a task is retried, which means some tasks may offload to Velox while some tasks retry with fallback. In theory it's possible but more complex.

@PHILO-HE did some investigation some time ago and noted that some code changes in vanilla Spark are necessary; did you notice that? If so, we may hack the code in Gluten first and then submit a PR to upstream Spark.

@FelixYBW
Contributor

@zhli1142015 @Yohahaha @ulysses-you @jackylee-ch @kecookier @zhztheplayer

A big feature!

@Yohahaha
Contributor

Thank you for proposing this great idea, and glad to see the PoC has gained benefits in your prod env!

Meets the stage-level resource conditions

  1. Executor dynamic allocation is enabled: spark.dynamicAllocation.enabled must be true
  2. The underlying resource scheduler must support dynamically allocating executors

For me, the most interesting thing is that DRA (dynamic resource allocation) must be enabled. I guess the reason is to change the executor's memory settings after OOM is found to occur; otherwise, new executors/pods will still OOM and die, finally leading to the Spark job failing.

I found Uber has proposed an idea to solve pure on-heap OOM; it may help in understanding more context about the reason for the above DRA requirement.
https://www.uber.com/en-JP/blog/dynamic-executor-core-resizing-in-spark/

@zhztheplayer changed the title to "[CORE] Implement stage-level resourceProfile auto-adjust framework to avoid oom" Nov 25, 2024
@zhztheplayer
Member

Thank you for sharing this work. Looking forward to an initial patch to try out.

@jackylee-ch
Contributor

Thanks for the interesting PR! I'm curious how on-heap and off-heap sizes are determined in current production environments; I'm looking forward to seeing it.

@zjuwangg
Contributor Author

zjuwangg commented Nov 25, 2024

@FelixYBW Thanks for the detailed review!

  • We should also consider the collaboration with RAS.
  • RAS should definitely be considered!
  • We need to predefine some operators' potential memory usage: e.g. Scan or Project in Velox consumes little memory, but aggregate and join need much. So for a scan + fallback aggregate, we are able to set small off-heap + large on-heap. If it's an offloaded agg + fallen-back join, we would need to set large off-heap + large on-heap; in that case we should fall back the agg or even the whole stage and then set a large on-heap memory.
  • Yes, we have also done some very simple work this way in our internal Spark to save resources. Our first draft will not consider detailed operators but just focus on stages containing fallback operators or a huge shuffle status. We could consider detailed operators and adjust the resource profile more carefully in the long term.
  • It would be even better if we could specify a different fallback policy when a task is retried, which means some tasks may offload to Velox while some tasks retry with fallback. In theory it's possible but more complex.

We have also considered the situation when a task is retried, but current Spark seems to have no way to simply change a task's retry resources.

@PHILO-HE did some investigation some time ago and noted that some code changes in vanilla Spark are necessary; did you notice that? If so, we may hack the code in Gluten first and then submit a PR to upstream Spark.

If one stage falls back entirely, there will be no interface to change this stage's resource profile! That's what FAQ 2 in the design discusses. Maybe more changes are needed in vanilla Spark; it would be nice to hear more advice.

@PHILO-HE
Contributor

@FelixYBW, what we previously considered is how to get a resource profile applied in Spark in the whole-stage-fallback situation. We thought it may need some hacking into Spark code to achieve that. I note this design tries to use a wrapper or a separate configuration; not sure about the feasibility.

@zjuwangg, in your design, you mentioned that WholeStageTransformer will get the resource profile applied for Gluten plans. But WholeStageTransformer may only wrap part of the offloaded operators for this stage. You have considered this, right?

As @Yohahaha mentioned, the Spark community has a SPIP for DynamicExecutorCoreResizing, which is another attempt to avoid the OOM failure.

@zjuwangg
Contributor Author

Thanks for the interesting PR! I'm curious how on-heap and off-heap sizes are determined in current production environments; I'm looking forward to seeing it.

When vanilla Spark jobs migrate to Spark native jobs, we recompute and set a new memory config. If the total original memory is M (executor.memory + executor.offheap), we set 0.7M as off-heap and 0.3M as on-heap for the native job.
When a huge shuffle status or a fallback operator is detected, we double the on-heap memory, making it 0.6M.
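
As an illustrative worked example of this split (numbers assumed, not from the original jobs): if the original job had executor.memory = 6g and executor off-heap = 4g, then M = 10g, the native job starts with off-heap = 0.7M = 7g and on-heap = 0.3M = 3g, and when a huge shuffle status or a fallback operator is detected the on-heap request is doubled to 0.6M = 6g.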

@zjuwangg
Contributor Author

@FelixYBW, what we previously considered is how to get a resource profile applied in Spark in the whole-stage-fallback situation. We thought it may need some hacking into Spark code to achieve that. I note this design tries to use a wrapper or a separate configuration; not sure about the feasibility.

@zjuwangg, in your design, you mentioned that WholeStageTransformer will get the resource profile applied for Gluten plans. But WholeStageTransformer may only wrap part of the offloaded operators for this stage. You have considered this, right?

As @Yohahaha mentioned, the Spark community has a SPIP for DynamicExecutorCoreResizing, which is another attempt to avoid the OOM failure.

@PHILO-HE

in your design, you mentioned that WholeStageTransformer will get the resource profile applied for Gluten plans. But WholeStageTransformer may only wrap part of the offloaded operators for this stage. You have considered this, right?

There should be no problem: the current dynamic resource profile mechanism searches the underlying RDDs' resource profiles, so once we set the resource profile on WholeStageTransformer, the whole stage is affected (whether native nodes or original Spark nodes).

@zhztheplayer
Member

zhztheplayer commented Nov 25, 2024

we can also calculate the shuffle status complexity to roughly estimate mapStatus memory occupation. The rule works in the following steps

@zjuwangg I think this part is comparatively trickier than the others. Do you think you can start with an individual PR which adds a utility / API to do resource estimation on query plans (if that idea aligns with your approach; I am not sure)? Then in subsequent PRs we can adopt this API in Gluten and the whole-stage transformer for the remaining work.

Moreover, does the feature target batch query scenarios (ETL, nightly, etc.) more? I remember that changing a Spark resource profile usually causes executors to be rebooted, which will cause larger latency for ad-hoc queries.

cc @PHILO-HE

@FelixYBW
Contributor

uber.com/en-JP/blog/dynamic-executor-core-resizing-in-spark

Uber is testing Gluten. Let me ping them to see if they have interest.

@FelixYBW
Contributor

@FelixYBW, what we previously considered is how to get a resource profile applied in Spark in the whole-stage-fallback situation. We thought it may need some hacking into Spark code to achieve that. I note this design tries to use a wrapper or a separate configuration; not sure about the feasibility.

Should we hack the Spark task scheduler to schedule tasks that need large on-heap/off-heap memory to the right executors?

@FelixYBW
Contributor

FelixYBW commented Nov 25, 2024

Moreover, does the feature target batch query scenarios (ETL, nightly, etc.) more? I remember that changing a Spark resource profile usually causes executors to be rebooted, which will cause larger latency for ad-hoc queries.

There were some talks previously with the Pinterest team. We have a few ways to do this:

  1. The normal way: we initially start executors with large off-heap memory; when a task needs large on-heap memory, the driver kills the current executor (including all other tasks running on it in different task threads) and restarts a new executor with large on-heap memory to run the task. So the total executor number and resources are as configured. We may not need to hack Spark in this way.
  2. We start 2 executors sharing the same memory resource, one with large off-heap and one with large on-heap. At any time, the task scheduler schedules a task either to the off-heap or the on-heap executor. In this way we can make sure the resource isn't overcommitted, and it avoids frequent restarts of the off-heap/on-heap executors.
  3. Like 1, but with spark.dynamicAllocation.maxExecutors set, which I'd expect most customers do; we can start new executors with large on-heap memory if resources are still available. It may lead to a situation where the new executor has no opportunity to start.

To create a PoC, we may start from 2 (with 1/2 of executor.instances for off-heap and 1/2 for on-heap) or 3. Your thoughts?

To estimate the off-heap/on-heap ratio, we can start with a configurable value like 8:2 (off-heap:on-heap) for the off-heap profile vs. 0:10 for the on-heap profile.

Another thing to consider is that vanilla Spark also supports off-heap memory, but it still needs large on-heap memory, and I didn't find any guideline on how to configure this. I'm also not sure whether the Spark community is still working on moving all large memory allocations from on-heap to off-heap. If one day all large memory allocations in vanilla Spark can be made off-heap, we won't have such an issue, but then there is the new issue of how Gluten and Spark share off-heap memory, which isn't fully solved today.
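
For context, the vanilla-Spark off-heap settings referred to above are the standard configs below (values illustrative); they control Spark's own Tungsten/unsafe off-heap allocations and are separate from the on-heap executor memory it still requires:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.memory.offHeap.enabled", "true") // let vanilla Spark use off-heap for its own allocations
      .set("spark.memory.offHeap.size", "4g")      // size of that off-heap pool (illustrative)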

@zjuwangg
Contributor Author

@zjuwangg I think this part is comparatively trickier than the others. Do you think you can start with an individual PR which adds a utility / API to do resource estimation on query plans (if that idea aligns with your approach; I am not sure)? Then in subsequent PRs we can adopt this API in Gluten and the whole-stage transformer for the remaining work.

@zhztheplayer Got your meaning; I will try it this way.

Moreover, does the feature target batch query scenarios (ETL, nightly, etc.) more? I remember that changing a Spark resource profile usually causes executors to be rebooted, which will cause larger latency for ad-hoc queries.

cc @PHILO-HE

Yes, changing the Spark resource profile requires the resource manager to allocate more executors, which may cause larger latency. In our production scenario, we focus more on improving ETL stability (avoiding failures).

@zjuwangg
Contributor Author

On the Spark side, this feature needs https://issues.apache.org/jira/browse/SPARK-50421 to get fixed.

@FelixYBW
Contributor

On the Spark side, this feature needs issues.apache.org/jira/browse/SPARK-50421 to get fixed.

You may cherry-pick it into Gluten temporarily.

@zjuwangg
Contributor Author

On the Spark side, this feature needs https://issues.apache.org/jira/browse/SPARK-50421 to get fixed.

This has been fixed in Spark 3.5.4 and the upcoming 4.0.0.

@zjuwangg
Contributor Author

zjuwangg commented Dec 10, 2024

I just opened the first PR #8195, which adds a set/get ResourceProfile interface in GlutenPlan. Please help review it when you guys have time.

@PHILO-HE @FelixYBW @zhztheplayer @jackylee-ch @Yohahaha

@PHILO-HE
Contributor

@zjuwangg, I note you have a Spark PR merged into 3.5.4 & 4.0.0 to make a custom resource profile work. So this proposed Gluten feature will only support 3.5.4 and later versions?

@zjuwangg
Contributor Author

@zjuwangg, I note you have a Spark PR merged into 3.5.4 & 4.0.0 to make a custom resource profile work. So this proposed Gluten feature will only support 3.5.4 and later versions?

In fact, if we just increase the stage heap memory and don't adjust the off-heap memory, this feature can also work on Spark 3.1+ versions.
