# Persistent Buffer

`PersistentBuffer` is the first of a series of practical Akka Streams flow components. It works like the Akka Streams buffer, with the difference that the content of the buffer is stored in a series of memory-mapped files in the directory given at construction of the `PersistentBuffer`. This allows the buffer size to be virtually limitless, keeps the buffer off the JVM heap, and at the same time provides extremely good performance, in the range of a million messages/second.
## Dependencies
The following dependencies are required for Persistent Buffer to work:
"org.squbs" %% "squbs-pattern" % squbsVersion,
"net.openhft" % "chronicle-queue" % "4.5.13"
## Examples
The following example shows the use of `PersistentBuffer` in a stream:
```scala
implicit val serializer = QueueSerializer[ByteString]()

val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val buffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val counter = Flow[Any].map(_ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val countFuture = source.via(buffer.async).runWith(counter)
```
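The examples assume the usual Akka Streams plumbing is already in scope. A minimal sketch of that setup and the imports involved follows; the `org.squbs.pattern.stream` package path and the Akka 2.4/2.5-style `ActorMaterializer` are assumptions, so adjust to your squbs and Akka versions:

```scala
import java.io.File

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, RunnableGraph, Sink, Source}
import akka.util.ByteString
import org.squbs.pattern.stream.{PersistentBuffer, QueueSerializer}

implicit val system = ActorSystem("persistent-buffer-demo") // illustrative system name
implicit val mat = ActorMaterializer()
```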
This version shows the same in a GraphDSL:
```scala
implicit val serializer = QueueSerializer[ByteString]()

val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val buffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val counter = Flow[Any].map(_ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)

val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(counter) { implicit builder =>
  sink =>
    import GraphDSL.Implicits._
    source ~> buffer.async ~> sink
    ClosedShape
})
val countFuture = streamGraph.run()
```
## Back-Pressure

`PersistentBuffer` does not back-pressure upstream. It takes all the stream elements given to it and grows its storage by increasing, or rotating, the number of queue files. It has no means of determining a limit on the buffer size or the storage capacity. Downstream back-pressure is honored as per Akka Streams and Reactive Streams requirements.

If the `PersistentBuffer` stage gets fused with its downstream, it would not buffer and would actually back-pressure. To make sure `PersistentBuffer` runs at its own pace, add an `async` boundary just after it.
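As a sketch, the only difference between the fused and detached wirings is the `async` boundary after the buffer; `slowSink` below is a hypothetical slow consumer, and only one of the two wirings would be run at a time:

```scala
// A hypothetical slow consumer, for illustration only.
val slowSink = Sink.foreach[ByteString] { _ => Thread.sleep(1) }

// Fused with downstream: buffer and sink run in the same actor, so the buffer
// cannot run ahead of the sink and the stream back-pressures upstream.
source.via(buffer).runWith(slowSink)

// Detached by an async boundary: the buffer keeps absorbing elements into its
// queue files at its own pace while the slow sink drains them independently.
source.via(buffer.async).runWith(slowSink)
```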
## Failure & Recovery

Due to its persistent nature, `PersistentBuffer` can recover from abrupt stream shutdowns, failures, JVM failures, or even potential system failures. A restart of a stream with the `PersistentBuffer` on the same directory will first emit the elements stored in the buffer and not yet consumed, before any newly added elements. Only the elements consumed from the buffer but not yet fully processed at the time of the previous stream failure or shutdown are lost.

Since the buffer is backed by local storage, spindles or SSD, the performance and durability of this buffer also depend on the durability of that storage. A system malfunction or storage corruption may cause total loss of all elements in the buffer. It is therefore important to understand that the durability of this buffer is not at the level of databases or other off-host persistent stores, in exchange for much higher performance.
Akka Streams stages batch requests and buffer records internally. `PersistentBuffer` guarantees the recovery and persistence of records that have reached `onPush`; records still sitting in an Akka Streams stage's internal buffer that have not yet reached `onPush` would be lost during a failure.
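To make the recovery behavior concrete, here is a rough sketch assuming both runs point the buffer at the same directory; the element ranges and the abrupt stop are purely illustrative:

```scala
val dir = new File("/tmp/myqueue")

// Run 1: elements are written into the queue files; suppose the JVM dies
// before the sink has consumed all of them.
Source(1 to 1000).map(n => ByteString(s"Hello $n"))
  .via(new PersistentBuffer[ByteString](dir).async)
  .runWith(Sink.ignore)

// Run 2 (after restart, same directory): the elements left over from run 1 are
// emitted first, followed by the newly added ones. Only elements consumed but
// not fully processed at the time of the crash are lost.
Source(1001 to 2000).map(n => ByteString(s"Hello $n"))
  .via(new PersistentBuffer[ByteString](dir).async)
  .runWith(Sink.foreach(println))
```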
## Commit Guarantee

In case of an unexpected failure, elements emitted from the `PersistentBuffer` stage that have not yet reached a sink would be lost. Sometimes it might be required to avoid such data loss. Using a `commit` stage before a sink can help in such cases. To add a `commit` stage, use `PersistentBufferAtLeastOnce` instead. Please see the example below for `commit` stage usage:
```scala
import scala.collection.JavaConverters._ // for .asJava; ConfigFactory.parseMap expects a java.util.Map

implicit val serializer = QueueSerializer[ByteString]()

val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val tempPath = new File("/tmp/myqueue")
val config = ConfigFactory.parseMap {
  Map(
    "persist-dir" -> s"${tempPath.getAbsolutePath}"
  ).asJava
}
val buffer = new PersistentBufferAtLeastOnce[ByteString](config)
val commit = buffer.commit[ByteString]
val flowSink = // do some transformation or a sink flow with expected failure
val counter = Flow[Any].map(_ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)

val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(counter) { implicit builder =>
  sink =>
    import GraphDSL.Implicits._
    // ensures that records are reprocessed when something fails at the transform flow
    source ~> buffer ~> flowSink ~> commit ~> sink
    ClosedShape
})
val countFuture = streamGraph.run()
```
Please note that `commit` does not prevent the loss of messages held in the internal buffer of a sink (or of any other stage after `commit`).
### Commit Order

The `commit` stage should normally receive the elements in index order. However, a potential bug in a stream may cause an element to be dropped or to reach the `commit` stage out of order. The default `commit-order-policy` is set to `lenient` to let the stream continue in such scenarios. You can set it to `strict` for a `CommitOrderException` to be thrown, and let the `Supervision.Decider` determine what action to take.
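A sketch of the strict variant, reusing the `source` and `tempPath` from the commit example above and assuming `CommitOrderException` is importable from the same package as the buffer classes; the resume-on-exception choice in the decider is only an illustration:

```scala
import akka.stream.{ActorAttributes, Supervision}

val strictConfig = ConfigFactory.parseString(
  s"""
     | persist-dir = ${tempPath.getAbsolutePath}
     | commit-order-policy = strict
   """.stripMargin)

val strictBuffer = new PersistentBufferAtLeastOnce[ByteString](strictConfig)

// Decide how the stream reacts when the commit stage sees an out-of-order element.
val decider: Supervision.Decider = {
  case _: CommitOrderException => Supervision.Resume // skip it and keep going
  case _                       => Supervision.Stop
}

val done = source
  .via(strictBuffer.async)
  .via(strictBuffer.commit[ByteString])
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runWith(Sink.ignore)
```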
## Space Management

A typical directory used for persisting the queue looks like the following:

```
$ ls -l
-rw-r--r--  1 squbs_user  110054053  83886080 May 17 20:00 20160518.cq4
-rw-r--r--  1 squbs_user  110054053      8192 May 17 20:00 tailer.idx
```

Queue files are deleted automatically once all the readers have successfully finished reading them.
## Configuration

The queue can be created by passing just the location of the persistence directory, keeping all default configuration. This is seen in all the examples above. Alternatively, it can be created by passing a `Config` object at construction. The `Config` object is a standard HOCON configuration. The following example shows constructing a `PersistentBuffer` using a `Config`:
```scala
val configText =
  """
    | persist-dir = /tmp/myQueue
    | roll-cycle = xlarge_daily
    | wire-type = compressed_binary
    | block-size = 80m
  """.stripMargin
val config = ConfigFactory.parseString(configText)

// Construct the buffer using a Config.
val buffer = new PersistentBuffer[ByteString](config)
```
The following configuration properties are used for the `PersistentBuffer`:

```
persist-dir = /tmp/myQueue    # Required
roll-cycle = daily            # Optional, defaults to daily
wire-type = binary            # Optional, defaults to binary
block-size = 80m              # Optional, defaults to 64m
index-spacing = 16k           # Optional, defaults to roll-cycle's spacing
index-count = 16              # Optional, defaults to roll-cycle's count
commit-order-policy = lenient # Optional, defaults to lenient
```
The `roll-cycle` can be specified in lower or upper case. Supported values for `roll-cycle` are as follows:

| Roll Cycle   | Capacity                      |
|--------------|-------------------------------|
| MINUTELY     | 64 million entries per minute |
| HOURLY       | 256 million entries per hour  |
| SMALL_DAILY  | 512 million entries per day   |
| DAILY        | 4 billion entries per day     |
| LARGE_DAILY  | 32 billion entries per day    |
| XLARGE_DAILY | 2 trillion entries per day    |
| HUGE_DAILY   | 256 trillion entries per day  |
The `wire-type` can be specified in lower or upper case. Supported values for `wire-type` are as follows:
- TEXT
- BINARY
- FIELDLESS_BINARY
- COMPRESSED_BINARY
- JSON
- RAW
- CSV
Memory sizes such as `block-size` and `index-spacing` are specified according to the memory size format defined in the HOCON specification.
## Serialization

A `QueueSerializer[T]` needs to be implicitly provided for a `PersistentBuffer[T]`, as seen in the examples above:

```scala
implicit val serializer = QueueSerializer[ByteString]()
```
The `QueueSerializer[T]()` call produces a serializer for your target type. It depends on the serialization and deserialization provided by the underlying infrastructure.
### Implementing a Serializer

To control the fine-grained persistent format in the queue, you may want to implement your own serializer as follows:
```scala
case class Person(name: String, age: Int)

class PersonSerializer extends QueueSerializer[Person] {

  override def readElement(wire: WireIn): Option[Person] = {
    for {
      name <- Option(wire.read().`object`(classOf[String]))
      age <- Option(wire.read().int32)
    } yield Person(name, age)
  }

  override def writeElement(element: Person, wire: WireOut): Unit = {
    wire.write().`object`(classOf[String], element.name)
    wire.write().int32(element.age)
  }
}
```
To use this serializer, just declare it implicitly before constructing the `PersistentBuffer`, as follows:

```scala
implicit val serializer = new PersonSerializer()
val buffer = new PersistentBuffer[Person](new File("/tmp/myqueue"))
```
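As a quick, illustrative usage of that buffer (the source and sink below are placeholders, not part of the squbs API):

```scala
val people = Source(List(Person("Alice", 30), Person("Bob", 42)))

// Elements are written through PersonSerializer on the way into the queue files
// and read back through it when emitted downstream.
val done = people.via(buffer.async).runWith(Sink.foreach(println))
```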
## Broadcast Buffer

`BroadcastBuffer` is a variant of the persistent buffer. It works similarly to `PersistentBuffer`, except that stream elements are broadcast to multiple output ports. It is therefore a combination of the buffer and broadcast stages. The configuration takes an additional parameter named `output-ports` which specifies the number of output ports.

A broadcast buffer is especially useful when stream elements are to be emitted from each output port at an independent rate, depending on the speed of downstream demand.
```scala
val configText =
  """
    | persist-dir = /tmp/myQueue
    | roll-cycle = xlarge_daily
    | wire-type = compressed_binary
    | block-size = 80m
    | output-ports = 3
  """.stripMargin
val config = ConfigFactory.parseString(configText)

// Construct the buffer using a Config.
val bcBuffer = new BroadcastBuffer[ByteString](config)
```
## Examples
```scala
implicit val serializer = QueueSerializer[ByteString]()

val in = Source(1 to 100000)
// Illustrative helper stages: turn the ints into ByteStrings and merge the three broadcast branches.
val transform = Flow[Int].map { n => ByteString(s"Hello $n") }
val merge = Merge[ByteString](3)
val flowCounter = Flow[Any].map(_ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)

// config is the BroadcastBuffer Config shown above, with output-ports = 3.
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(flowCounter) { implicit builder =>
  sink =>
    import GraphDSL.Implicits._
    val buffer = new BroadcastBufferAtLeastOnce[ByteString](config)
    val commit = buffer.commit[ByteString]
    val bcBuffer = builder.add(buffer.async)
    val mr = builder.add(merge)
    in ~> transform ~> bcBuffer ~> commit ~> mr ~> sink
                       bcBuffer ~> commit ~> mr
                       bcBuffer ~> commit ~> mr
    ClosedShape
})
val countFuture = streamGraph.run()
```
## Credits

`PersistentBuffer` utilizes Chronicle-Queue 4.x as its high-performance, memory-mapped queue persistence layer.