Skip to content

Commit

Permalink
Try to detect socket fail in heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
mgravell committed Mar 23, 2014
1 parent 2e77071 commit 71763cb
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,9 @@ public enum ConnectionFailureType
/// The socket was closed
/// </summary>
ConnectionDisposed,
/// <summary>
/// The database is loading and is not available for use
/// </summary>
Loading
}
}
10 changes: 1 addition & 9 deletions StackExchange.Redis/StackExchange/Redis/MessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ internal int Count()
}
}

internal bool HasWork()
internal bool Any()
{
lock(regular)
{
Expand Down Expand Up @@ -108,13 +108,5 @@ internal void GetStormLog(StringBuilder sb)
}
}
}

internal bool Any()
{
lock(regular)
{
return high.Count != 0 || regular.Count != 0;
}
}
}
}
18 changes: 16 additions & 2 deletions StackExchange.Redis/StackExchange/Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Runtime.CompilerServices;
using System.Text;
Expand Down Expand Up @@ -173,7 +174,7 @@ internal int GetOutstandingCount(out int inst, out int qu, out int qs, out int q
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
qu = queue.Count();
var tmp = physical;
qs = tmp == null ? 0 : tmp.GetOutstandingCount();
qs = tmp == null ? 0 : tmp.GetSentAwaitingResponseCount();
qc = completionManager.GetOutstandingCount();
wr = Interlocked.CompareExchange(ref activeWriters, 0, 0);
wq = Interlocked.CompareExchange(ref inWriteQueue, 0, 0);
Expand Down Expand Up @@ -378,6 +379,12 @@ internal void OnHeartbeat(bool ifConnectedOnly)
State oldState;
OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out ignore, out oldState);
}
} else if(!queue.Any() && tmp.GetSentAwaitingResponseCount() != 0)
{
// there's a chance this is a dead socket; sending data will shake that
// up a bit, so if we have an empty unsent queue and a non-empty sent
// queue, test the socket
KeepAlive();
}
}
break;
Expand Down Expand Up @@ -695,6 +702,7 @@ private bool WriteMessageToServer(PhysicalConnection connection, Message message
internal WriteResult WriteQueue(int maxWork)
{
bool weAreWriter = false;
PhysicalConnection conn = null;
try
{
Trace("Writing queue from bridge");
Expand All @@ -706,7 +714,7 @@ internal WriteResult WriteQueue(int maxWork)
return WriteResult.CompetingWriter;
}

var conn = GetConnection();
conn = GetConnection();
if(conn == null)
{
AbortUnsent();
Expand Down Expand Up @@ -745,8 +753,14 @@ internal WriteResult WriteQueue(int maxWork)
}
}
}
catch(IOException ex)
{
if (conn != null) conn.RecordConnectionFailed(ConnectionFailureType.SocketFailure, ex);
AbortUnsent();
}
catch(Exception ex)
{
AbortUnsent();
OnInternalError(ex);
}
finally
Expand Down
9 changes: 5 additions & 4 deletions StackExchange.Redis/StackExchange/Redis/PhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void RecordConnectionFailed(ConnectionFailureType failureType, Exception
lastBeat = Interlocked.Read(ref lastBeatTickCount);

string message = failureType + " on " + Format.ToString(bridge.ServerEndPoint.EndPoint) + "/" + connectionType
+ ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetOutstandingCount()
+ ", input-buffer: " + ioBufferBytes + ", outstanding: " + GetSentAwaitingResponseCount()
+ ", last-read: " + unchecked(now - lastRead) / 1000 + "s ago, last-write: " + unchecked(now - lastWrite) / 1000 + "s ago, keep-alive: " + bridge.ServerEndPoint.WriteEverySeconds + "s, pending: "
+ bridge.GetPendingCount() + ", state: " + oldState + ", last-heartbeat: " + (lastBeat == 0 ? "never" : (unchecked(now - lastBeat) / 1000 + "s ago"))
+ (bridge.IsBeating ? " (mid-beat)" : "") + ", last-mbeat: " + multiplexer.LastHeartbeatSecondsAgo + "s ago, global: "
Expand Down Expand Up @@ -214,8 +214,9 @@ internal static void IdentifyFailureType(Exception exception, ref ConnectionFail
if (exception != null && failureType == ConnectionFailureType.InternalFailure)
{
if (exception is AuthenticationException) failureType = ConnectionFailureType.AuthenticationFailure;
if (exception is EndOfStreamException) failureType = ConnectionFailureType.SocketClosed;
if (exception is SocketException || exception is IOException) failureType = ConnectionFailureType.SocketFailure;
else if (exception is SocketException || exception is IOException) failureType = ConnectionFailureType.SocketFailure;
else if (exception is EndOfStreamException) failureType = ConnectionFailureType.SocketClosed;
else if (exception is ObjectDisposedException) failureType = ConnectionFailureType.SocketClosed;
}
}

Expand All @@ -236,7 +237,7 @@ internal void GetCounters(ConnectionCounters counters)
counters.Subscriptions = SubscriptionCount;
}

internal int GetOutstandingCount()
internal int GetSentAwaitingResponseCount()
{
lock (outstanding)
{
Expand Down
6 changes: 5 additions & 1 deletion StackExchange.Redis/StackExchange/Redis/ResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes

sealed class EstablishConnectionProcessor : ResultProcessor<bool>
{
static readonly byte[] expected = Encoding.UTF8.GetBytes("PONG"), authFail = Encoding.UTF8.GetBytes("ERR operation not permitted");
static readonly byte[] expected = Encoding.UTF8.GetBytes("PONG"), authFail = Encoding.UTF8.GetBytes("ERR operation not permitted"),
loading = Encoding.UTF8.GetBytes("LOADING ");
public override bool SetResult(PhysicalConnection connection, Message message, RawResult result)
{
var final = base.SetResult(connection, message, result);
Expand All @@ -662,6 +663,9 @@ public override bool SetResult(PhysicalConnection connection, Message message, R
if (result.Assert(authFail))
{
connection.RecordConnectionFailed(ConnectionFailureType.AuthenticationFailure);
} else if(result.AssertStarts(loading))
{
connection.RecordConnectionFailed(ConnectionFailureType.Loading);
}
else
{
Expand Down

0 comments on commit 71763cb

Please sign in to comment.