[SPARK-50806][SQL] Support InputRDDCodegen interruption on task cancellation #49501

Open · wants to merge 3 commits into base: master
Conversation

@Ngone51 Ngone51 (Member) commented Jan 15, 2025

What changes were proposed in this pull request?

This PR adds a new mutable state, taskInterrupted, to InputRDDCodegen. The state indicates whether the task thread has been interrupted. InputRDDCodegen checks the state each time it produces a record and stops immediately if taskInterrupted=true. taskInterrupted is refreshed from TaskContext.isInterrupted() once every 1000 output rows.
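
For illustration, here is a minimal sketch of the loop logic described above (plain Scala, not the actual generated code; consumeWithInterruptionCheck and processRow are placeholder names):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow

// Sketch only: mirrors the described check outside of codegen.
def consumeWithInterruptionCheck(
    input: Iterator[InternalRow])(processRow: InternalRow => Unit): Unit = {
  var taskInterrupted = false
  var numOutputRows = 0L
  while (!taskInterrupted && input.hasNext) {
    processRow(input.next())
    numOutputRows += 1
    // Refresh the flag only once every 1000 rows to keep the per-row cost low.
    if (numOutputRows % 1000 == 0) {
      taskInterrupted = TaskContext.get().isInterrupted()
    }
  }
}
```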

Why are the changes needed?

This facilitates releasing resources promptly when there is a task kill request.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit test.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Jan 15, 2025
@Ngone51 Ngone51 requested a review from cloud-fan January 15, 2025 07:01
```
| InternalRow $row = (InternalRow) $input.next();
| $numOutputRows++;
| if ($numOutputRows % 1000 == 0) {
```
Member
1000 looks like a magic number. So we don't need to interrupt when the number of rows is less than 1000, because it will finish quickly anyway?

Member Author
Yea, it's based on intuition. And yes, we expect 1000 rows to finish quickly.

```scala
val logAppender = new LogAppender("")
withLogAppender(logAppender, level = Some(Level.INFO)) {
  spark.sparkContext.setJobGroup("SPARK-50806", "SPARK-50806", false)
  val slowDF = spark.range(1, 100000).rdd.mapPartitions { iter =>
```
@dongjoon-hyun dongjoon-hyun (Member) Jan 15, 2025
Could you add a comment about this test case's assumption: this should be 100000 (> 1k) because we check the flag once per 1k rows?

Member Author
Updated.

@cloud-fan (Contributor)

@Ngone51 the new test failed

@Ngone51 (Member Author) commented Jan 16, 2025

@cloud-fan is there a way to configure the parallelism for Range?

@dongjoon-hyun (Member)

> is there a way to configure the parallelism for Range?

Do you mean spark.range(..) in your test case? Then, wasn't numPartitions of spark.range(start: Long, end: Long, step: Long, numPartitions: Int) enough?

@dongjoon-hyun (Member)

Gentle ping, @Ngone51 ~

@Ngone51 (Member Author) commented Jan 23, 2025

> Then, wasn't numPartitions of spark.range(start: Long, end: Long, step: Long, numPartitions: Int) enough?

@dongjoon-hyun Thanks, this is what I'm looking for. Let me try it first.

(The test generates 2 tasks locally but 3 in GitHub Actions, so I'd like to explicitly set the number of partitions to make the test stable.)
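
For reference, a minimal way to pin the partition count with that overload (assuming the same bounds and the same `spark` session as the quoted test; the exact test code may differ):

```scala
// Hypothetical snippet: the four-argument overload fixes the number of
// partitions (and hence tasks) so the test no longer depends on the
// local default parallelism.
val df = spark.range(1, 100000, 1, 2)
assert(df.rdd.getNumPartitions == 2)
```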

@dongjoon-hyun (Member)

Could you rebase this PR onto the master branch once more, @Ngone51? The failure is one that has already been fixed in the master branch.

[info] - SPARK-29442 Set `default` mode should override the existing mode *** FAILED *** (6 milliseconds)
[info]   java.lang.NoSuchFieldException: mode

@Ngone51 (Member Author) commented Jan 24, 2025

@dongjoon-hyun Thanks, will update. But this PR's test failure still exists. I'll take a closer look.

2025-01-23T05:14:27.8423862Z [info] - SPARK-50806: HashAggregate codegen should be interrupted on task cancellation *** FAILED *** (1 minute, 5 seconds)
2025-01-23T05:14:27.8426704Z [info]   The code passed to eventually never returned normally. Attempted 3956 times over 1.0001835922166666 minutes. Last failure message: 3 did not equal 2. (HashAggregateCodegenInterruptionSuite.scala:91)

@dongjoon-hyun (Member)

Oh. Got it.

@Ngone51 (Member Author) commented Jan 24, 2025

The failure is not related to numPartitions but to job cancellation. The GitHub Actions Spark logs show that the job group SPARK-50806 fails to cancel because it can't be found in the active job list:

05:13:22.837 dag-scheduler-event-loop INFO DAGScheduler: Got map stage job 0 ($anonfun$withThreadLocalCaptured$2 at FutureTask.java:264) with 2 output partitions
05:13:22.838 dag-scheduler-event-loop INFO DAGScheduler: Final stage: ShuffleMapStage 0 ($anonfun$withThreadLocalCaptured$2 at FutureTask.java:264)
05:13:22.838 dag-scheduler-event-loop INFO DAGScheduler: Parents of final stage: List()
05:13:22.838 dag-scheduler-event-loop INFO DAGScheduler: Missing parents: List()
05:13:22.838 dag-scheduler-event-loop INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[9] at $anonfun$withThreadLocalCaptured$2 at FutureTask.java:264), which has no missing parents
05:13:22.838 dag-scheduler-event-loop INFO MemoryStore: MemoryStore started with capacity 2.4 GiB
05:13:22.839 dag-scheduler-event-loop INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 46.7 KiB, free 2.4 GiB)
05:13:22.851 dag-scheduler-event-loop INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.2 KiB, free 2.4 GiB)
05:13:22.851 dag-scheduler-event-loop INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1656
05:13:22.851 dag-scheduler-event-loop INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[9] at $anonfun$withThreadLocalCaptured$2 at FutureTask.java:264) (first 15 tasks are for partitions Vector(0, 1))
05:13:22.851 dag-scheduler-event-loop INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks resource profile 0
05:13:22.852 dispatcher-event-loop-0 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (localhost,executor driver, partition 0, PROCESS_LOCAL, 9968 bytes) 
05:13:22.852 dispatcher-event-loop-0 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (localhost,executor driver, partition 1, PROCESS_LOCAL, 9968 bytes) 
05:13:22.852 Executor task launch worker-0 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
05:13:22.852 Executor task launch worker-0 INFO Executor: Using REPL class URI: spark://localhost:35095/artifacts/69020f8b-80c6-45e1-9080-16fdff425ac4/classes/
05:13:22.852 Executor task launch worker-0 INFO Executor: Created or updated repl class loader org.apache.spark.executor.ExecutorClassLoader@36057c87 for 69020f8b-80c6-45e1-9080-16fdff425ac4.
05:13:22.852 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
05:13:22.852 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
05:13:22.859 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:35095 after 0 ms (0 ms spent in bootstraps)
05:13:22.864 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO CodeGenerator: Code generated in 7.28645 ms
05:13:22.867 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO CodeGenerator: Code generated in 2.50331 ms
05:13:22.883 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO CodeGenerator: Code generated in 13.913607 ms
05:13:22.891 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO CodeGenerator: Code generated in 6.30746 ms
05:13:22.892 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO SecurityManager: Changing view acls to: runner
05:13:22.892 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO SecurityManager: Changing modify acls to: runner
05:13:22.892 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO SecurityManager: Changing view acls groups to: runner
05:13:22.892 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO SecurityManager: Changing modify acls groups to: runner
05:13:22.892 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: runner groups with view permissions: EMPTY; users with modify permissions: runner; groups with modify permissions: EMPTY; RPC SSL disabled
05:13:22.904 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO CodeGenerator: Code generated in 2.286265 ms
05:13:22.911 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO CodeGenerator: Code generated in 3.089951 ms
05:13:27.820 pool-1-thread-1-ScalaTest-running-HashAggregateCodegenInterruptionSuite INFO DAGScheduler: Asked to cancel job group SPARK-50806 with cancelFutureJobs=false
05:13:27.820 dag-scheduler-event-loop WARN DAGScheduler: Failed to cancel job group SPARK-50806. Cannot find active jobs for it.

However, these are the local Spark logs, where the job group SPARK-50806 is canceled successfully:

15:56:32.453 dag-scheduler-event-loop INFO DAGScheduler: Got map stage job 0 ($anonfun$withThreadLocalCaptured$2 at FutureTask.java:264) with 2 output partitions
15:56:32.455 dag-scheduler-event-loop INFO DAGScheduler: Final stage: ShuffleMapStage 0 ($anonfun$withThreadLocalCaptured$2 at FutureTask.java:264)
15:56:32.455 dag-scheduler-event-loop INFO DAGScheduler: Parents of final stage: List()
15:56:32.456 dag-scheduler-event-loop INFO DAGScheduler: Missing parents: List()
15:56:32.458 dag-scheduler-event-loop INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[9] at $anonfun$withThreadLocalCaptured$2 at FutureTask.java:264), which has no missing parents
15:56:32.474 dag-scheduler-event-loop INFO MemoryStore: MemoryStore started with capacity 2.4 GiB
15:56:32.482 dag-scheduler-event-loop INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 46.8 KiB, free 2.4 GiB)
15:56:33.001 dag-scheduler-event-loop INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.2 KiB, free 2.4 GiB)
15:56:33.008 dag-scheduler-event-loop INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1656
15:56:33.021 dag-scheduler-event-loop INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[9] at $anonfun$withThreadLocalCaptured$2 at FutureTask.java:264) (first 15 tasks are for partitions Vector(0, 1))
15:56:33.022 dag-scheduler-event-loop INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks resource profile 0
15:56:33.040 dispatcher-event-loop-1 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.1.5,executor driver, partition 0, PROCESS_LOCAL, 9970 bytes) 
15:56:33.042 dispatcher-event-loop-1 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.1.5,executor driver, partition 1, PROCESS_LOCAL, 9970 bytes) 
15:56:33.047 Executor task launch worker-0 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
15:56:33.048 Executor task launch worker-0 INFO Executor: Using REPL class URI: spark://192.168.1.5:58016/artifacts/dd408fe5-bacf-4bc5-a02f-0a1b6a038442/classes/
15:56:33.050 Executor task launch worker-0 INFO Executor: Created or updated repl class loader org.apache.spark.executor.ExecutorClassLoader@14e50195 for dd408fe5-bacf-4bc5-a02f-0a1b6a038442.
15:56:33.052 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15:56:33.052 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15:56:33.121 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO TransportClientFactory: Successfully created connection to /192.168.1.5:58016 after 16 ms (0 ms spent in bootstraps)
15:56:33.143 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO CodeGenerator: Code generated in 46.866584 ms
15:56:33.160 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO CodeGenerator: Code generated in 6.688333 ms
15:56:33.195 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO CodeGenerator: Code generated in 28.92075 ms
15:56:33.215 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO CodeGenerator: Code generated in 12.730916 ms
15:56:33.220 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO SecurityManager: Changing view acls to: yi.wu
15:56:33.220 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO SecurityManager: Changing modify acls to: yi.wu
15:56:33.220 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO SecurityManager: Changing view acls groups to: yi.wu
15:56:33.220 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO SecurityManager: Changing modify acls groups to: yi.wu
15:56:33.220 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: yi.wu groups with view permissions: EMPTY; users with modify permissions: yi.wu; groups with modify permissions: EMPTY; RPC SSL disabled
15:56:33.234 Executor task launch worker for task 0.0 in stage 0.0 (TID 0) INFO CodeGenerator: Code generated in 4.3335 ms
15:56:33.264 Executor task launch worker for task 1.0 in stage 0.0 (TID 1) INFO CodeGenerator: Code generated in 5.507833 ms
15:56:37.204 pool-1-thread-1-ScalaTest-running-HashAggregateCodegenInterruptionSuite INFO DAGScheduler: Asked to cancel job group SPARK-50806 with cancelFutureJobs=false
15:56:37.284 dag-scheduler-event-loop INFO TaskSchedulerImpl: Canceling stage 0
15:56:37.284 dag-scheduler-event-loop INFO TaskSchedulerImpl: Killing all running tasks in stage 0: [SPARK_JOB_CANCELLED] Job 0 cancelled part of cancelled job group SPARK-50806 SQLSTATE: XXKDA
15:56:37.286 dag-scheduler-event-loop INFO TaskSchedulerImpl: Stage 0.0 was cancelled

The logs look almost the same. It's unclear why the job can't be found in the GitHub Actions case.
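
For context, the cancellation pattern the test relies on looks roughly like this (a sketch based on the snippets and logs above, not the actual suite code):

```scala
// Sketch only: set a job group, run the slow query in another thread, then cancel it.
spark.sparkContext.setJobGroup("SPARK-50806", "SPARK-50806", interruptOnCancel = false)
// ... start the slow query asynchronously ...
spark.sparkContext.cancelJobGroup("SPARK-50806")
// In the failing GitHub Actions run, the DAGScheduler logged
// "Cannot find active jobs for it" at this point, i.e. no job was registered
// under this group when the cancel request arrived.
```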
