Skip to content

Commit

Permalink
Add retry logic to writes
Browse files Browse the repository at this point in the history
  • Loading branch information
caspiano committed Jul 15, 2020
1 parent 20e7222 commit df74fd4
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 14 deletions.
4 changes: 2 additions & 2 deletions shard.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: rethinkdb
version: 0.2.0
crystal: 0.34.0
version: 0.2.1
crystal: ~> 0.34
license: MIT

dependencies:
Expand Down
6 changes: 6 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
require "spec"
require "log"

require "../src/crystal-rethinkdb"
include RethinkDB::Shortcuts

Spec.before_suite do
::Log.setup "*", Log::Severity::Debug
end

module Generators
@@i = 0

Expand Down
43 changes: 31 additions & 12 deletions src/rethinkdb/connection.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "json"
require "log"
require "retriable"
require "socket"
require "socket/tcp_socket"
Expand All @@ -13,6 +14,8 @@ require "./serialization"

module RethinkDB
class Connection
Log = ::Log.for(self)

# Authentication
getter user
private getter password
Expand Down Expand Up @@ -65,16 +68,9 @@ module RethinkDB
end
sock.close
rescue e
Log.error(exception: e) { "reconnecting" }
sock.close
write_lock.synchronize do
reset_channels
reset_id
# Create a new socket
@sock = TCPSocket.new(host, port)
sock.sync = false
connect
authorise(user, password)
end
write_lock.synchronize { reconnect }
raise e
end
end
Expand Down Expand Up @@ -128,14 +124,37 @@ module RethinkDB
protected getter write_lock = Mutex.new(Mutex::Protection::Reentrant)

protected def write(data)
write_lock.synchronize {
try_write do
sock.write(data)
sock.flush
end
end

protected def reconnect
reset_channels
reset_id
# Create a new socket
@sock = TCPSocket.new(host, port)
sock.sync = false
connect
authorise(user, password)
end

protected def try_write
write_lock.synchronize {
yield
}
rescue e
# Retry in the read loop
sock.close
raise e
end

protected def read
sock.gets('\0', true).not_nil!
rescue e : NilAssertionError
sock.close
raise ConnectionException.new("Socket closed")
end

@next_id : UInt64 = 1_u64
Expand Down Expand Up @@ -323,12 +342,12 @@ module RethinkDB
end

query_slice = query.to_slice
conn.write_lock.synchronize {
conn.try_write do
conn.sock.write_bytes(id, IO::ByteFormat::LittleEndian)
conn.sock.write_bytes(query_slice.size, IO::ByteFormat::LittleEndian)
conn.sock.write(query_slice)
conn.sock.flush
}
end
end

private def read_response
Expand Down

0 comments on commit df74fd4

Please sign in to comment.