diff --git a/.gitignore b/.gitignore index 3e8850c4b0..8d9fe0e6bc 100644 --- a/.gitignore +++ b/.gitignore @@ -124,3 +124,9 @@ projects/Unit*/TestResult.xml # Vim .sw? .*.sw? + + +################# +## JetBrains Rider +################# +.idea/ diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index b33443e437..1ceeb45e7a 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -274,6 +274,13 @@ public TimeSpan ContinuationTimeout /// public bool TopologyRecoveryEnabled { get; set; } = true; + /// + /// Force writes to the socket to run on a dedicated thread instead of the thread pool. This may prevent + /// timeouts if a large number of blocking requests are going out simultaneously. Will become obsolete + /// once requests become asynchronous. Defaults to false. + /// + public bool EnableSynchronousWriteLoop { get; set; } = false; + /// /// Filter to include/exclude entities from topology recovery. /// Default filter includes all entities in topology recovery. @@ -640,7 +647,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) { IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, _memoryPool, SocketFactory, - RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout); + RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout, EnableSynchronousWriteLoop); return ConfigureFrameHandler(fh); } diff --git a/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs b/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs index e8c2acd125..8edce34190 100644 --- a/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs +++ b/projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs @@ -45,9 +45,10 @@ public static IFrameHandler CreateFrameHandler( Func socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, - TimeSpan writeTimeout) + TimeSpan writeTimeout, + bool enableSynchronousWriteLoop) { - return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout) + return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout, enableSynchronousWriteLoop) { MemoryPool = pool }; diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index 141e02aee0..f768a54a45 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -63,6 +63,8 @@ public static async Task TimeoutAfter(this Task task, TimeSpan timeout) class SocketFrameHandler : IFrameHandler { private readonly AmqpTcpEndpoint _endpoint; + + // Socket poll timeout in ms. If the socket does not // become writeable in this amount of time, we throw // an exception. @@ -78,19 +80,19 @@ class SocketFrameHandler : IFrameHandler private readonly byte[] _frameHeaderBuffer; private bool _closed; private ArrayPool _pool = ArrayPool.Shared; + private readonly bool _enableSynchronousWriteLoop; public SocketFrameHandler(AmqpTcpEndpoint endpoint, Func socketFactory, - TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout) + TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout, bool enableSynchronousWriteLoop) { _endpoint = endpoint; + _enableSynchronousWriteLoop = enableSynchronousWriteLoop; _frameHeaderBuffer = new byte[6]; var channel = Channel.CreateUnbounded>( new UnboundedChannelOptions { - AllowSynchronousContinuations = false, - SingleReader = true, - SingleWriter = false + AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false }); _channelReader = channel.Reader; @@ -134,7 +136,15 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, _writer = new BufferedStream(netstream, _socket.Client.SendBufferSize); WriteTimeout = writeTimeout; - _writerTask = Task.Run(WriteLoop, CancellationToken.None); + if (_enableSynchronousWriteLoop) + { + TaskCreationOptions tco = TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach; + _writerTask = Task.Factory.StartNew(SynchronousWriteLoop, CancellationToken.None, tco, TaskScheduler.Default); + } + else + { + _writerTask = Task.Run(WriteLoop, CancellationToken.None); + } } public AmqpTcpEndpoint Endpoint @@ -281,6 +291,24 @@ private async Task WriteLoop() } } + private void SynchronousWriteLoop() + { + while (_channelReader.WaitToReadAsync().AsTask().Result) + { + _socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite); + while (_channelReader.TryRead(out var memory)) + { + if (MemoryMarshal.TryGetArray(memory, out ArraySegment segment) && + segment.Array != null) + { + _writer.Write(segment.Array, segment.Offset, segment.Count); + MemoryPool.Return(segment.Array); + } + } + _writer.Flush(); + } + } + private static bool ShouldTryIPv6(AmqpTcpEndpoint endpoint) { return Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork; diff --git a/projects/Unit/TestConnectionFactory.cs b/projects/Unit/TestConnectionFactory.cs index 76ae3d85c3..89a590f9a8 100644 --- a/projects/Unit/TestConnectionFactory.cs +++ b/projects/Unit/TestConnectionFactory.cs @@ -161,6 +161,20 @@ public void TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName() } } + [Test] + public void TestCreateConnectionWithSynchronousWriteLoop() + { + var cf = new ConnectionFactory + { + AutomaticRecoveryEnabled = true, + HostName = "localhost", + EnableSynchronousWriteLoop = true + }; + using (IConnection conn = cf.CreateConnection()){ + Assert.AreEqual(5672, conn.Endpoint.Port); + } + } + [Test] public void TestCreateConnectionUsesDefaultPort() {