Skip to content

Commit

Permalink
Merge pull request #228 from tumblr/fix-sentbuffer
Browse files Browse the repository at this point in the history
fixing bug with respecting sentBuffer max size
  • Loading branch information
DanSimon committed Jul 13, 2015
2 parents 71d65d9 + c2f3f37 commit 743f0a9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ class MockWriteBuffer(val maxWriteSize: Int, handler: Option[ConnectionHandler]
assert(call == data, s"expected '${data.utf8String}', got '${call.utf8String}'")
}

/**
* Expect exactly `num` writes
*/
def expectNumWrites(num: Int) {
assert(writeCalls.size == num, s"expected exactly $num writes, but ${writeCalls.size} writes occurred")
writeCalls.clear()
}

def expectOneWrite(data: ByteString) {
assert(writeCalls.size == 1, s"expected exactly one write, but ${writeCalls.size} writes occurred")
expectWrite(data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,23 @@ class ServiceClientSpec extends ColossusSpec {
res2 must equal(Some(rep2.message))
}

"not overflow sentBuffer when draining from pending" in {
// here we're checking to make sure that if we've previously paused
// writes, and then resume writing, that we continue to respect the max
// sentBuffer size.
val (endpoint, client, probe) = newClient(true, 1)
val cmds = (0 to 3).map{i =>
client.send(Command(CMD_GET, "foo")).execute()
}
val reply = StatusReply("foo")
(0 to 3).map{i =>
endpoint.expectNumWrites(1)
endpoint.clearBuffer()
endpoint.expectNumWrites(0)
client.receivedData(reply.raw)
}
}

"gracefully disconnect" in {
val cmd1 = Command(CMD_GET, "foo")
val cmd2 = Command(CMD_GET, "bar")
Expand Down Expand Up @@ -400,7 +417,7 @@ class ServiceClientSpec extends ColossusSpec {
}


"not attempt reconnect when autoReconnect is false" taggedAs(Tag("wat")) in {
"not attempt reconnect when autoReconnect is false" in {
withIOSystem{ implicit io =>
val server = Service.serve[Raw]("rawwww", TEST_PORT) {_.handle{con => con.become{
case foo => {
Expand Down
13 changes: 7 additions & 6 deletions colossus/src/main/scala/colossus/service/ServiceClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,16 @@ extends Controller[O,I](codec, ControllerConfig(config.pendingBufferSize, config
s.handler(Failure(new NotConnectedException("Not Connected")))
} else if (isConnected || !failFast) {
val pushed = push(s.message, s.start){
case OutputResult.Success => sentBuffer.enqueue(s)
case OutputResult.Success => {
sentBuffer.enqueue(s)
if (sentBuffer.size >= config.sentBufferSize) {
pauseWrites() //writes resumed in processMessage
}
}
case OutputResult.Failure(err) => s.handler(Failure(err))
case OutputResult.Cancelled(err) => s.handler(Failure(err))
}
if (pushed) {
if (sentBuffer.size >= config.sentBufferSize) {
pauseWrites() //writes resumed in processMessage
}
} else {
if (!pushed) {
s.handler(Failure(new ClientOverloadedException(s"Error sending ${s.message}: Client is overloaded")))
}
} else {
Expand Down

0 comments on commit 743f0a9

Please sign in to comment.