Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a synchronous write loop for connections. #1389

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,9 @@ projects/Unit*/TestResult.xml
# Vim
.sw?
.*.sw?


#################
## JetBrains Rider
#################
.idea/
9 changes: 8 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ public TimeSpan ContinuationTimeout
/// </summary>
public bool TopologyRecoveryEnabled { get; set; } = true;

/// <summary>
/// Force writes to the socket to run on a dedicated thread instead of the thread pool. This may prevent
/// timeouts if a large number of blocking requests are going out simultaneously. Will become obsolete
/// once requests become asynchronous. Defaults to false.
/// </summary>
public bool EnableSynchronousWriteLoop { get; set; } = false;

/// <summary>
/// Filter to include/exclude entities from topology recovery.
/// Default filter includes all entities in topology recovery.
Expand Down Expand Up @@ -640,7 +647,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
{
IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, _memoryPool, SocketFactory,
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout, EnableSynchronousWriteLoop);
return ConfigureFrameHandler(fh);
}

Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ public static IFrameHandler CreateFrameHandler(
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout,
TimeSpan readTimeout,
TimeSpan writeTimeout)
TimeSpan writeTimeout,
bool enableSynchronousWriteLoop)
{
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout)
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout, enableSynchronousWriteLoop)
{
MemoryPool = pool
};
Expand Down
38 changes: 33 additions & 5 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
class SocketFrameHandler : IFrameHandler
{
private readonly AmqpTcpEndpoint _endpoint;


// Socket poll timeout in ms. If the socket does not
// become writeable in this amount of time, we throw
// an exception.
Expand All @@ -78,19 +80,19 @@ class SocketFrameHandler : IFrameHandler
private readonly byte[] _frameHeaderBuffer;
private bool _closed;
private ArrayPool<byte> _pool = ArrayPool<byte>.Shared;
private readonly bool _enableSynchronousWriteLoop;

public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout, bool enableSynchronousWriteLoop)
{
_endpoint = endpoint;
_enableSynchronousWriteLoop = enableSynchronousWriteLoop;
_frameHeaderBuffer = new byte[6];
var channel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>(
new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleReader = true,
SingleWriter = false
AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false
});

_channelReader = channel.Reader;
Expand Down Expand Up @@ -134,7 +136,15 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);

WriteTimeout = writeTimeout;
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
if (_enableSynchronousWriteLoop)
{
TaskCreationOptions tco = TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach;
_writerTask = Task.Factory.StartNew(SynchronousWriteLoop, CancellationToken.None, tco, TaskScheduler.Default);
}
else
{
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
}
}

public AmqpTcpEndpoint Endpoint
Expand Down Expand Up @@ -281,6 +291,24 @@ private async Task WriteLoop()
}
}

private void SynchronousWriteLoop()
{
while (_channelReader.WaitToReadAsync().AsTask().Result)
{
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
while (_channelReader.TryRead(out var memory))
{
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment) &&
segment.Array != null)
{
_writer.Write(segment.Array, segment.Offset, segment.Count);
MemoryPool.Return(segment.Array);
}
}
_writer.Flush();
}
}

private static bool ShouldTryIPv6(AmqpTcpEndpoint endpoint)
{
return Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork;
Expand Down
14 changes: 14 additions & 0 deletions projects/Unit/TestConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,20 @@ public void TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName()
}
}

[Test]
public void TestCreateConnectionWithSynchronousWriteLoop()
{
var cf = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
HostName = "localhost",
EnableSynchronousWriteLoop = true
};
using (IConnection conn = cf.CreateConnection()){
Assert.AreEqual(5672, conn.Endpoint.Port);
}
}

[Test]
public void TestCreateConnectionUsesDefaultPort()
{
Expand Down
Loading