Skip to content

Commit

Permalink
Merge pull request #1456 from Friendseeker/Ichoran-fixed-Gzip-stream
Browse files Browse the repository at this point in the history
[2.x] Replace `ParallelGzipOutputStream` with Ichoran's  implementation
  • Loading branch information
eed3si9n authored Oct 15, 2024
2 parents fec4db9 + d35e028 commit 41dd74e
Show file tree
Hide file tree
Showing 6 changed files with 376 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,25 @@ package sbt.internal.inc.consistent
* additional information regarding copyright ownership.
*/

import java.io.{ File, FileInputStream, FileOutputStream }
import java.util.Optional
import sbt.io.{ IO, Using }
import xsbti.compile.analysis.ReadWriteMappers
import xsbti.compile.{ AnalysisContents, AnalysisStore => XAnalysisStore }

import java.io.{ File, FileInputStream, FileOutputStream }
import java.util.Optional
import scala.util.control.Exception.allCatch
import xsbti.compile.analysis.ReadWriteMappers

import scala.concurrent.ExecutionContext

object ConsistentFileAnalysisStore {
def text(
file: File,
mappers: ReadWriteMappers,
sort: Boolean = true,
ec: ExecutionContext = ExecutionContext.global,
parallelism: Int = Runtime.getRuntime.availableProcessors()
): XAnalysisStore =
new AStore(
file,
new ConsistentAnalysisFormat(mappers, sort),
SerializerFactory.text,
ec,
parallelism
)

Expand All @@ -59,22 +55,19 @@ object ConsistentFileAnalysisStore {
file: File,
mappers: ReadWriteMappers,
sort: Boolean,
ec: ExecutionContext = ExecutionContext.global,
parallelism: Int = Runtime.getRuntime.availableProcessors()
): XAnalysisStore =
new AStore(
file,
new ConsistentAnalysisFormat(mappers, sort),
SerializerFactory.binary,
ec,
parallelism
)

private final class AStore[S <: Serializer, D <: Deserializer](
file: File,
format: ConsistentAnalysisFormat,
sf: SerializerFactory[S, D],
ec: ExecutionContext = ExecutionContext.global,
parallelism: Int = Runtime.getRuntime.availableProcessors()
) extends XAnalysisStore {

Expand All @@ -85,7 +78,7 @@ object ConsistentFileAnalysisStore {
if (!file.getParentFile.exists()) file.getParentFile.mkdirs()
val fout = new FileOutputStream(tmpAnalysisFile)
try {
val gout = new ParallelGzipOutputStream(fout, ec, parallelism)
val gout = new ParallelGzipOutputStream(fout, parallelism)
val ser = sf.serializerFor(gout)
format.write(ser, analysis, setup)
gout.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,123 +1,219 @@
// Original code by Stefan Zeiger (see https://github.com/szeiger/zinc/blob/1d296b2fbeaae1cf14e4c00db0bbc2203f9783a4/internal/zinc-persist/src/main/scala/sbt/internal/inc/consistent/NewParallelGzipOutputStream.scala)
// Modified by Rex Kerr to use Java threads directly rather than Future
package sbt.internal.inc.consistent

import java.io.{ ByteArrayOutputStream, FilterOutputStream, OutputStream }
import java.util.zip.{ CRC32, Deflater, DeflaterOutputStream }

import java.util.concurrent.{ SynchronousQueue, ArrayBlockingQueue, LinkedTransferQueue }

import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.collection.mutable

/**
* Parallel gzip compression. Algorithm based on https://github.com/shevek/parallelgzip
* with additional optimization and simplification. This is essentially a block-buffered
* stream but instead of writing a full block to the underlying output, it is passed to a
* thread pool for compression and the Futures of compressed blocks are collected when
* flushing.
* thread pool for compression and the compressed blocks are collected when flushing.
*/
object ParallelGzipOutputStream {
private val blockSize = 64 * 1024
private val compression = Deflater.DEFAULT_COMPRESSION

private class BufOut(size: Int) extends ByteArrayOutputStream(size) {
def writeTo(buf: Array[Byte]): Unit = System.arraycopy(this.buf, 0, buf, 0, count)
// Holds an input buffer to load data and an output buffer to write
// the compressed data into. Compressing clears the input buffer.
// Compressed data can be retrieved with `output.writeTo(OutputStream)`.
private final class Block(var index: Int) {
val input = new Array[Byte](blockSize)
var inputN = 0
val output = new ByteArrayOutputStream(blockSize + (blockSize >> 3))
val deflater = new Deflater(compression, true)
val dos = new DeflaterOutputStream(output, deflater, true)

def compress(): Unit = {
deflater.reset()
output.reset()
dos.write(input, 0, inputN)
dos.flush()
inputN = 0
}
}

private class Worker {
private[this] val defl = new Deflater(compression, true)
private[this] val buf = new BufOut(blockSize + (blockSize >> 3))
private[this] val out = new DeflaterOutputStream(buf, defl, true)
def compress(b: Block): Unit = {
defl.reset()
buf.reset()
out.write(b.data, 0, b.length)
out.flush()
b.length = buf.size
if (b.length > b.data.length) b.data = new Array[Byte](b.length)
buf.writeTo(b.data)
// Waits for data to appear in a SynchronousQueue.
// When it does, compress it and pass it along. Also put self in a pool of workers awaiting more work.
// If data does not appear but a `None` appears instead, cease running (and do not add self to work queue).
private final class Worker(
val workers: ArrayBlockingQueue[Worker],
val compressed: LinkedTransferQueue[Either[Int, Block]]
) extends Thread {
val work = new SynchronousQueue[Option[Block]]

@tailrec
def loop(): Unit = {
work.take() match {
case Some(block) =>
block.compress()
compressed.put(Right(block))
workers.put(this)
loop()
case _ =>
}
}
}

private val localWorker = new ThreadLocal[Worker] {
override def initialValue = new Worker
override def run(): Unit = {
loop()
}
}

private class Block {
var data = new Array[Byte](blockSize + (blockSize >> 3))
var length = 0
// Waits for data to appear in a LinkedTransferQueue.
// When it does, place it into a sorted tree and, if the data is in order, write it out.
// Once the data has been written, place it into a cache for completed buffers.
// If data does not appear but an integer appears instead, set a mark to quit once
// that many blocks have been written.
private final class Scribe(out: OutputStream, val completed: LinkedTransferQueue[Block])
extends Thread {
val work = new LinkedTransferQueue[Either[Int, Block]]
private val tree = new collection.mutable.TreeMap[Int, Block]
private var next = 0
private var stopAt = Int.MaxValue

@tailrec
def loop(): Unit = {
work.take() match {
case Right(block) =>
tree(block.index) = block
case Left(limit) =>
stopAt = limit
}
while (tree.nonEmpty && tree.head._2.index == next) {
val block = tree.remove(next).get
block.output.writeTo(out)
completed.put(block)
next += 1
}
if (next < stopAt) loop()
}

override def run(): Unit = {
loop()
}
}

private val header = Array[Byte](0x1f.toByte, 0x8b.toByte, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0)
}

final class ParallelGzipOutputStream(out: OutputStream, ec: ExecutionContext, parallelism: Int)
/**
* Implements a parallel chunked compression algorithm (using minimum of two extra threads).
* Note that the methods in this class are not themselves threadsafe; this class
* has "interior concurrency" (c.f. interior mutability). In particular, writing
* concurrent with or after a close operation is not defined.
*/
final class ParallelGzipOutputStream(out: OutputStream, parallelism: Int)
extends FilterOutputStream(out) {
import ParallelGzipOutputStream._

private final val crc = new CRC32
private final val queueLimit = parallelism * 3
// preferred on 2.13: new mutable.ArrayDeque[Future[Block]](queueLimit)
private final val pending = mutable.Queue.empty[Future[Block]]
private var current: Block = new Block
private var free: Block = _
private var total = 0L
private val crc = new CRC32
private var totalBlocks = 0
private var totalCount = 0L

private val bufferLimit = parallelism * 3
private var bufferCount = 1
private var current = new Block(0)

private val workerCount = math.max(1, parallelism - 1)
private val workers = new ArrayBlockingQueue[Worker](workerCount)
private val buffers = new LinkedTransferQueue[Block]()

out.write(header)
private val scribe = new Scribe(out, buffers)
scribe.start()

while (workers.remainingCapacity() > 0) {
val w = new Worker(workers, scribe.work)
workers.put(w)
w.start()
}

override def write(b: Int): Unit = write(Array[Byte]((b & 0xff).toByte))
override def write(b: Array[Byte]): Unit = write(b, 0, b.length)

@tailrec override def write(b: Array[Byte], off: Int, len: Int): Unit = {
val copy = math.min(len, blockSize - current.length)
@tailrec
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
val copy = math.min(len, blockSize - current.inputN)
crc.update(b, off, copy)
total += copy
System.arraycopy(b, off, current.data, current.length, copy)
current.length += copy
totalCount += copy
System.arraycopy(b, off, current.input, current.inputN, copy)
current.inputN += copy
if (copy < len) {
submit()
write(b, off + copy, len - copy)
}
}

private[this] def submit(): Unit = {
flushUntil(queueLimit - 1)
val finalBlock = current
pending += Future { localWorker.get.compress(finalBlock); finalBlock }(ec)
if (free != null) {
current = free
free = null
} else current = new Block()
private def submit(): Unit = {
val w = workers.take()
w.work.put(Some(current))
totalBlocks += 1
current = buffers.poll()
if (current eq null) {
if (bufferCount < bufferLimit) {
current = new Block(totalBlocks)
bufferCount += 1
} else {
current = buffers.take()
}
}
current.index = totalBlocks
}

private def flushUntil(remaining: Int): Unit =
while (pending.length > remaining || pending.headOption.exists(_.isCompleted)) {
val b = Await.result(pending.dequeue(), Duration.Inf)
out.write(b.data, 0, b.length)
b.length = 0
free = b
private def flushImpl(shutdown: Boolean): Unit = {
val fetched = new Array[Block](bufferCount - 1)
var n = 0
// If we have all the buffers, all pending work is done.
while (n < fetched.length) {
fetched(n) = buffers.take()
n += 1
}
if (shutdown) {
// Send stop signal to workers and scribe
n = workerCount
while (n > 0) {
workers.take().work.put(None)
n -= 1
}
scribe.work.put(Left(totalBlocks))
} else {
// Put all the buffers back so we can keep accepting data.
n = 0
while (n < fetched.length) {
buffers.put(fetched(n))
n += 1
}
}
}

/**
* Blocks until all pending data is written. Note that this is a poor use of a parallel data writing class.
* It is preferable to write all data and then close the stream. Note also that a flushed stream will not
* have the trailing CRC checksum and therefore will not be a valid compressed file, so there is little point
* flushing early.
*/
override def flush(): Unit = {
if (current.length > 0) submit()
flushUntil(0)
if (current.inputN > 0) submit()
flushImpl(false)
super.flush()
}

override def close(): Unit = {
flush()
if (current.inputN > 0) submit()
flushImpl(true)

val buf = new Array[Byte](10)
def int(i: Int, off: Int): Unit = {
buf(off) = ((i & 0xff).toByte)
buf(off + 1) = (((i >>> 8) & 0xff).toByte)
buf(off + 2) = (((i >>> 16) & 0xff).toByte)
buf(off + 3) = (((i >>> 24) & 0xff).toByte)
}
buf(0) = 3
int(crc.getValue.toInt, 2)
int((total & 0xffffffffL).toInt, 6)
val bb = java.nio.ByteBuffer.wrap(buf)
bb.order(java.nio.ByteOrder.LITTLE_ENDIAN)
bb.putShort(3)
bb.putInt(crc.getValue.toInt)
bb.putInt((totalCount & 0xffffffffL).toInt)
out.write(buf)

out.close()
total = Integer.MIN_VALUE
free = null
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package sbt.inc.consistent

import java.io.File
import java.io.{ File, FileInputStream }
import java.util.Arrays
import org.scalatest.funsuite.AnyFunSuite
import sbt.internal.inc.consistent.ConsistentFileAnalysisStore
import sbt.internal.inc.{ Analysis, FileAnalysisStore }
import sbt.io.IO
import sbt.io.{ IO, Using }
import xsbti.compile.{ AnalysisContents, AnalysisStore }
import xsbti.compile.analysis.ReadWriteMappers

Expand Down Expand Up @@ -50,6 +50,22 @@ class ConsistentAnalysisFormatIntegrationSuite extends AnyFunSuite {
}
}

test("compression ratio") {
for (d <- data) {
assert(d.exists())
val api = read(FileAnalysisStore.text(d))
val file = write("cbin1.zip", api)
val uncompressedSize = Using.gzipInputStream(new FileInputStream(file)) { in =>
val content = IO.readBytes(in)
content.length
}
val compressedSize = d.length()
val compressionRatio = compressedSize.toDouble / uncompressedSize.toDouble
assert(compressionRatio < 0.85)
// compression rate for each data: 0.8185090254676337, 0.7247774786370688, 0.8346021341469837
}
}

def read(store: AnalysisStore): AnalysisContents = {
val api = store.unsafeGet()
// Force loading of companion file and check that the companion data is present:
Expand Down
Loading

0 comments on commit 41dd74e

Please sign in to comment.