Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shuffle consolidation #669

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open

Shuffle consolidation #669

wants to merge 8 commits into from

Conversation

jason-dai
Copy link

In Spark, it is common practice (and usually preferred) to launch a large number of small tasks, which unfortunately can create an even larger number of very small shuffle files – one of the major shuffle performance issues. To address this issue, this change will combine multiple such files (for the same reduce partition or bucket) into one single large shuffle file. Please see SPARK-751 for the detailed design.

@AmplabJenkins
Copy link

Thank you for your pull request. An admin will review this request soon.

@mateiz
Copy link
Member

mateiz commented Jul 6, 2013

Hey Jason, I read the design document and this looks very good! Thanks for writing that up. I'm going to go over the code in detail in the next few days to see if I have comments.

@mateiz
Copy link
Member

mateiz commented Jul 13, 2013

Jenkins, this is ok to test

@AmplabJenkins
Copy link

Thank you for submitting this pull request. Unfortunately, the automated tests for this request have failed.
Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/199/

@mateiz
Copy link
Member

mateiz commented Jul 14, 2013

Jenkins, retest this please

@AmplabJenkins
Copy link

Thank you for submitting this pull request. Unfortunately, the automated tests for this request have failed.
Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/203/

@rxin
Copy link
Member

rxin commented Jul 14, 2013

Matei - you can review this now but don't merge it. I talked to @jason-dai the other day and he said there were a couple more changes he wanted to add.

@jason-dai
Copy link
Author

I have actually made all the changes I want :-)

@@ -18,33 +18,34 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val blockManager = SparkEnv.get.blockManager

val startTime = System.currentTimeMillis
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
val (mapLocations, blockSizes)= SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put a space after the =

@mateiz
Copy link
Member

mateiz commented Jul 18, 2013

Hey Jason,

I've looked through this in more detail now and made some comments, mostly about style. Overall it looks very good. Before merging it though I also want to test it on a cluster in various scenarios -- we're working on a performance test suite to validate these kinds of changes. But thanks for putting this together!

@AmplabJenkins
Copy link

Thank you for submitting this pull request.

Unfortunately, the automated tests for this request have failed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/244/

@AmplabJenkins
Copy link

Thank you for submitting this pull request.

Unfortunately, the automated tests for this request have failed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/246/

@mateiz
Copy link
Member

mateiz commented Jul 22, 2013

Hey Jason, so FYI, some unit tests are legitimately failing with this (https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/246/) -- it seems to be because the tests compare some arrays of Longs by equality instead of dealing with the individual members.

@AmplabJenkins
Copy link

Thank you for submitting this pull request.

Unfortunately, the automated tests for this request have failed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/300/

@pwendell
Copy link
Contributor

I just tried to run this on a cluster and jobs failed with a fetch failure issue:

13/07/29 05:12:33 INFO cluster.ClusterTaskSetManager: Lost TID 415 (task 1.0:15)
13/07/29 05:12:33 INFO cluster.ClusterTaskSetManager: Loss was due to fetch failure from null

I can try and dig into this more tomorrow.

@jason-dai
Copy link
Author

We are setting up a cluster for large scale testing as well.

@AmplabJenkins
Copy link

Thank you for your pull request. An admin will review this request soon.

@mateiz
Copy link
Member

mateiz commented Aug 6, 2013

Hey Jason, just curious, have you had a chance to look into this? We'd like to code-freeze Spark 0.8 soon (ideally at the end of this week), and it would be nice to get this into it. There can be some time for bug fixing after but we should make sure it's working well before we add it. Of course, if it's too early, we'll just push it to a future release.

@jason-dai
Copy link
Author

Hi Matei - sorry we haven't had enough time to look into this yet. Maybe we should push it to a future release, as we'll be working on the graphx performance in the next couple of weeks.

@mateiz
Copy link
Member

mateiz commented Aug 6, 2013

No worries if it can't be done now, but I'm curious, have you guys been running with this code for a while, or are you still using the old version internally? Basically I'm wondering whether it requires a lot of testing or just a little. We can help with the testing too.

@jason-dai
Copy link
Author

Small scale testing works fine, but we ran into some wired failures in large scale testing and had not had enough time to look into it.

ankurdave pushed a commit to amplab/graphx that referenced this pull request Oct 30, 2013
The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which sees extremely degredaded performance from the OS file system.
This patch seeks to reduce that burden by combining multipe shuffle files into one.

This PR draws upon the work of Jason Dai in mesos/spark#669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.

The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
allowing us to reduce the number of mappers tremendously for large tasks. In order
to accomplish this, we simply create a new set of shuffle files for every parallel
task, and return the files to a pool which will be given out to the next run task.
rxin added a commit to amplab/graphx that referenced this pull request Oct 30, 2013
Basic shuffle file consolidation

The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which sees extremely degredaded performance from the OS file system.
This patch seeks to reduce that burden by combining multipe shuffle files into one.

This PR draws upon the work of @jason-dai in mesos/spark#669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.

The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
allowing us to reduce the number of mappers tremendously for large tasks. In order
to accomplish this, we simply create a new set of shuffle files for every parallel
task, and return the files to a pool which will be given out to the next run task.

I have run some ad hoc query testing on 5 m1.xlarge EC2 nodes with 2g of executor memory and the following microbenchmark:

    scala> val nums = sc.parallelize(1 to 1000, 1000).flatMap(x => (1 to 1e6.toInt))
    scala> def time(x: => Unit) = { val now = System.currentTimeMillis; x; System.currentTimeMillis - now }
    scala> (1 to 8).map(_ => time(nums.map(x => (x % 100000, 2000, x)).reduceByKey(_ + _).count) / 1000.0)

For this particular workload, with 1000 mappers and 2000 reducers, I saw the old method running at around 15 minutes, with the consolidated shuffle files running at around 4 minutes. There was a very sharp increase in running time for the non-consolidated version after around 1 million total shuffle files. Below this threshold, however, there wasn't a significant difference between the two.

Better performance measurement of this patch is warranted, and I plan on doing so in the near future as part of a general investigation of our shuffle file bottlenecks and performance.
markhamstra pushed a commit to alteryx/spark that referenced this pull request Feb 27, 2014
Basic shuffle file consolidation

The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which sees extremely degredaded performance from the OS file system.
This patch seeks to reduce that burden by combining multipe shuffle files into one.

This PR draws upon the work of @jason-dai in mesos/spark#669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.

The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
allowing us to reduce the number of mappers tremendously for large tasks. In order
to accomplish this, we simply create a new set of shuffle files for every parallel
task, and return the files to a pool which will be given out to the next run task.

I have run some ad hoc query testing on 5 m1.xlarge EC2 nodes with 2g of executor memory and the following microbenchmark:

    scala> val nums = sc.parallelize(1 to 1000, 1000).flatMap(x => (1 to 1e6.toInt))
    scala> def time(x: => Unit) = { val now = System.currentTimeMillis; x; System.currentTimeMillis - now }
    scala> (1 to 8).map(_ => time(nums.map(x => (x % 100000, 2000, x)).reduceByKey(_ + _).count) / 1000.0)

For this particular workload, with 1000 mappers and 2000 reducers, I saw the old method running at around 15 minutes, with the consolidated shuffle files running at around 4 minutes. There was a very sharp increase in running time for the non-consolidated version after around 1 million total shuffle files. Below this threshold, however, there wasn't a significant difference between the two.

Better performance measurement of this patch is warranted, and I plan on doing so in the near future as part of a general investigation of our shuffle file bottlenecks and performance.

(cherry picked from commit 48952d6)
Signed-off-by: Reynold Xin <[email protected]>
xiajunluan pushed a commit to xiajunluan/spark that referenced this pull request May 30, 2014
…ributions

Also moves a few lines of code around in make-distribution.sh.

Author: Patrick Wendell <[email protected]>

Closes mesos#669 from pwendell/make-distribution and squashes the following commits:

8bfac49 [Patrick Wendell] Small fix
46918ec [Patrick Wendell] SPARK-1737: Warn rather than fail when Java 7+ is used to create distributions.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants