Skip to content

Commit

Permalink
Merge pull request #1753 from rabbitmq/rabbitmq-dotnet-client-1750
Browse files Browse the repository at this point in the history
Handle `OperationCanceledException` in RPC continuations
  • Loading branch information
lukebakken authored Jan 16, 2025
2 parents db37681 + cc16b10 commit bcba9e4
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 172 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
id: install-start-rabbitmq
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Integration Tests
timeout-minutes: 25
timeout-minutes: 45
run: |
Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" | Out-File -LiteralPath $env:APPDATA\RabbitMQ\log\toxiproxy-log.txt }; `
dotnet test `
Expand Down Expand Up @@ -113,7 +113,7 @@ jobs:
id: install-start-rabbitmq
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Sequential Integration Tests
timeout-minutes: 25
timeout-minutes: 45
run: dotnet test `
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,61 +83,65 @@ internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)

public ushort Concurrency => _concurrency;

public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (false == _disposed && false == _quiesce)
{
AddConsumer(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
return _writer.WriteAsync(work, cancellationToken);
}
else
{
return default;
try
{
AddConsumer(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
catch
{
_ = GetAndRemoveConsumer(consumerTag);
throw;
}
}
}

public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (false == _disposed && false == _quiesce)
{
IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
return _writer.WriteAsync(work, cancellationToken);
}
else
{
return default;
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
}

public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken)
public async ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (false == _disposed && false == _quiesce)
{
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
return _writer.WriteAsync(work, cancellationToken);
}
else
{
return default;
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
}

public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken)
public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (false == _disposed && false == _quiesce)
{
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
return _writer.WriteAsync(work, cancellationToken);
}
else
{
return default;
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
}

Expand Down
Loading

0 comments on commit bcba9e4

Please sign in to comment.