diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index ce6fd70caa..7a74712cad 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -564,6 +564,73 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); } + [Fact] + public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() + { + string exchangeName = GenerateExchangeName(); + string queue1Name = GenerateQueueName(); + string queue2Name = GenerateQueueName(); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var cts = new CancellationTokenSource(WaitSpan); + using CancellationTokenRegistration ctr = cts.Token.Register(() => + { + tcs.SetCanceled(); + }); + + _conn.ConnectionShutdown += (o, ea) => + { + HandleConnectionShutdown(_conn, ea, (args) => + { + MaybeSetException(ea, tcs); + }); + }; + + _channel.ChannelShutdown += (o, ea) => + { + HandleChannelShutdown(_channel, ea, (args) => + { + MaybeSetException(ea, tcs); + }); + }; + + // queue1 -> produce click to queue2 + // click -> exchange + // queue2 -> consume click from queue1 + await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true); + await _channel.QueueDeclareAsync(queue1Name); + await _channel.QueueBindAsync(queue1Name, exchangeName, queue1Name); + await _channel.QueueDeclareAsync(queue2Name); + await _channel.QueueBindAsync(queue2Name, exchangeName, queue2Name); + + var consumer1 = new AsyncEventingBasicConsumer(_channel); + consumer1.Received += async (sender, args) => + { + using (IChannel innerChannel = await _conn.CreateChannelAsync()) + { + await innerChannel.ConfirmSelectAsync(); + await innerChannel.BasicPublishAsync(exchangeName, queue2Name, mandatory: true); + await innerChannel.WaitForConfirmsOrDieAsync(); + await innerChannel.CloseAsync(); + } + }; + await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1); + + var consumer2 = new AsyncEventingBasicConsumer(_channel); + consumer2.Received += async (sender, args) => + { + tcs.TrySetResult(true); + await Task.Yield(); + }; + await _channel.BasicConsumeAsync(queue2Name, autoAck: true, consumer2); + + await _channel.ConfirmSelectAsync(); + await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024)); + await _channel.WaitForConfirmsOrDieAsync(); + + Assert.True(await tcs.Task); + } + private static void SetException(Exception ex, params TaskCompletionSource[] tcsAry) { foreach (TaskCompletionSource tcs in tcsAry)