Skip to content

Commit

Permalink
Merge pull request #1578 from rabbitmq/rabbitmq-dotnet-client-650
Browse files Browse the repository at this point in the history
Add test that creates `IChannel` within async consumer callback
  • Loading branch information
lukebakken authored May 23, 2024
2 parents d7b8e2f + 7445af6 commit 41accbd
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,73 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0);
}

[Fact]
public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
{
string exchangeName = GenerateExchangeName();
string queue1Name = GenerateQueueName();
string queue2Name = GenerateQueueName();

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
using var cts = new CancellationTokenSource(WaitSpan);
using CancellationTokenRegistration ctr = cts.Token.Register(() =>
{
tcs.SetCanceled();
});

_conn.ConnectionShutdown += (o, ea) =>
{
HandleConnectionShutdown(_conn, ea, (args) =>
{
MaybeSetException(ea, tcs);
});
};

_channel.ChannelShutdown += (o, ea) =>
{
HandleChannelShutdown(_channel, ea, (args) =>
{
MaybeSetException(ea, tcs);
});
};

// queue1 -> produce click to queue2
// click -> exchange
// queue2 -> consume click from queue1
await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
await _channel.QueueDeclareAsync(queue1Name);
await _channel.QueueBindAsync(queue1Name, exchangeName, queue1Name);
await _channel.QueueDeclareAsync(queue2Name);
await _channel.QueueBindAsync(queue2Name, exchangeName, queue2Name);

var consumer1 = new AsyncEventingBasicConsumer(_channel);
consumer1.Received += async (sender, args) =>
{
using (IChannel innerChannel = await _conn.CreateChannelAsync())
{
await innerChannel.ConfirmSelectAsync();
await innerChannel.BasicPublishAsync(exchangeName, queue2Name, mandatory: true);
await innerChannel.WaitForConfirmsOrDieAsync();
await innerChannel.CloseAsync();
}
};
await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1);

var consumer2 = new AsyncEventingBasicConsumer(_channel);
consumer2.Received += async (sender, args) =>
{
tcs.TrySetResult(true);
await Task.Yield();
};
await _channel.BasicConsumeAsync(queue2Name, autoAck: true, consumer2);

await _channel.ConfirmSelectAsync();
await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024));
await _channel.WaitForConfirmsOrDieAsync();

Assert.True(await tcs.Task);
}

private static void SetException(Exception ex, params TaskCompletionSource<bool>[] tcsAry)
{
foreach (TaskCompletionSource<bool> tcs in tcsAry)
Expand Down

0 comments on commit 41accbd

Please sign in to comment.