-
Notifications
You must be signed in to change notification settings - Fork 385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
compression codec interface and Snappy support to reduce buffer size to improve scalability #685
Conversation
Thank you for your pull request. An admin will review this request soon. |
@@ -162,7 +162,8 @@ object SparkBuild extends Build { | |||
"cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty), | |||
"org.apache.mesos" % "mesos" % "0.9.0-incubating", | |||
"io.netty" % "netty-all" % "4.0.0.Beta2", | |||
"org.apache.derby" % "derby" % "10.4.2.0" % "test" | |||
"org.apache.derby" % "derby" % "10.4.2.0" % "test", | |||
"org.xerial.snappy" % "snappy-java" % "1.0.5" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, also update maven build.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure. I will add it.
Hi Gavin, Thanks for submitting this. Is there any particular reason you initialize the codec in wrapForCompression instead of during the class BlockManager construction? |
Jenkins, test this please. Whitelist this person. |
Hi Reynold, Yes. we want to support the user customized compression codec implementation. So we need to initialize it after users' jars are added to class loader in Executor.updateDependencies(), in the construction of BlockManager(in createFromSystemProperties()) that hasn't happened yet. |
@@ -902,8 +904,15 @@ private[spark] class BlockManager( | |||
* Wrap an output stream for compression if block compression is enabled for its block type | |||
*/ | |||
def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { | |||
if (compressionCodec == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Gavin,
As we discussed in person a while ago, you can move the initialization of the compressionCodec to the place it was declared using "lazy val" in Scala.
The biggest scalability issue we observed in our tests is: in order to scale to bigger data size, we need to have more reduce splits, so when we want to scale to massive data, we end up with having too many reduce splits. Similar to what's stated in https://spark-project.atlassian.net/browse/SPARK-751. We believe #669 is very important.
While even with #669 we would still have many reduce splits, like in our case, processing 10 TB data needs 1 million reduce splits. One of the scalability bottleneck of this is, when we have many splits, there would be the same number of open writers as the number of splits. For each writer there's 1 compression stream and 1 buffer stream, which both contain some buffer internally. The current LZF compression library has 64K fixed buffer, which would cause OOM easily when there are too many reduce splits. (https://github.com/ning/compress/blob/master/src/main/java/com/ning/compress/lzf/LZFOutputStream.java)
In this change, compression codec interface is introduced with Snappy compression which support customization of buffer size. (We can also rewrite the output stream implementation of LZF to support configurable buffer size, while snappy already supports this so might be more straightforward to use snappy.) LZF ans Snappy can be configured from system properties. Other compression codec implementation can be defined in driver program as well.
We did some tests, Snappy is slightly faster and has slightly better compression rate than LZF when using the same buffer size. We also tested compression rate of Snappy in different buffer size:
15496 LZF (64k buffer)
15434 snappy (64k buffer)
16594 snappy (32k buffer)
18028 snappy (16k buffer)
So we can reduce the memory footprint by 4x with a slightly worse compression rate.